Like的世界

个人总结与随想

基于Python multiprocessing的Actor模型

| Comments

虽然基于Gevent的Actor基于Python 3.5异步的Actor都支持并发(concurrent)计算(仅运行于单进程中),但是不支持并行(parallel)计算,即无法利用多核。

Python内置的multiprocessing模块不仅支持并行计算,而且与Gevent接口相似。所以,模仿Gevent的Actor实现multiprocessing的Actor并不困难。

multiprocessing的Actor实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process, Queue

try:
    from Queue import Empty
except ImportError:
    from queue import Empty


class Actor(Process):
    def __init__(self, receive_timeout=None):
        Process.__init__(self)
        self.inbox = Queue()
        self.receive_timeout = receive_timeout

    def send(self, message):
        self.inbox.put_nowait(message)

    def receive(self, message):
        raise NotImplemented()

    def handle_timeout(self):
        pass

    def run(self):
        self.running = True
        while self.running:
            try:
                message = self.inbox.get(True, self.receive_timeout)
            except Empty:
                self.handle_timeout()
            else:
                self.receive(message)

基于message的扩展

将并行Actor扩展为发布-订阅者模式,基本与Gevent的实现一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from message import observable

from actor import Actor


@observable
class Publisher(Actor):
    def __init__(self, subject, receive_timeout=None):
        self.subject = subject
        Actor.__init__(self, receive_timeout)

    def subcribe(self, observer):
        self.sub(self.subject, observer.send)

    def publish(self, message):
        self.pub(self.subject, message)

基于Publisher实现Ping-Pong,与Gevent的实现差异也不大。

不同的是它实际启动3个进程。除主进程外,每个actor分别运行于独立进程,从而实现多核计算。主进程监督2个actor进程运行,如启动、停止以及异常处理等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import time

from publisher import Publisher


class Pinger(Publisher):
    def receive(self, message):
        print(message)
        time.sleep(2)
        self.publish('ping')

    def handle_timeout(self):
        print('pinger timeout')


class Ponger(Publisher):
    def receive(self, message):
        print(message)
        time.sleep(2)
        self.publish('ping')

    def handle_timeout(self):
        print('ponger timeout')


ping = Pinger('evt.ping', 1)
pong = Ponger('evt.pong', 1)

ping.subcribe(pong)
pong.subcribe(ping)
ping.start()
pong.start()

ping.publish('start')

pong.join()
ping.join()

Comments