[python] concurrence 모듈을 활용한 message center Programming

concurrence 모듈은 stackless python 또는 greenlet + python 을 통해 함수의 비동기 호출 기능을 제공하는 모듈이다.
이 모듈을 활용하여 개발을 할 때 메시지 처리를 좀 더 유연하게 하기 위해 애플에서 제공하는 notification center와 유사한 기능을 구현해 보았다. 메시지를 수신하고 싶은 비동기 인스턴스는 자신을 message center 에 등록해두면 임의의 비동기 인스턴스에서 날아오는 메시지를 수신하여 적절한 처리를 할 수 있다.


msgcenter.py
# -*- coding: utf-8 -*-
#
#   The global message router for a program that uses the concurrence module.
#   That module can be found at (http://opensource.hyves.org/concurrence/index.html)
#   It follows a pattern similar to a notification center.
#   Any Tasklet can register itself with a MsgCenter instance together with a
#   specific message class that is derived from concurrence.Message.
#   
#   NOTE:
#       This module probably has several logical weaknesses.
#       I expect that you can fix them. :-)
#
#   author: fullc0de (fullc0de@gmail.com)
#

from concurrence import Tasklet, Message, dispatch

class MCMessage(object):
    """Envelope handed to MsgCenter.send().

    It bundles the message class to deliver, the tasklet that created the
    envelope, the intended receiver and an optional user payload.
    """

    def __init__(self, msg, receiver):
        # user-defined data to be passed to the receiver tasklet;
        # stays None until payload() is called
        self.userinfo = None
        # receiving tasklet (the demo script passes None to address everybody)
        self.receiver = receiver
        # the tasklet that is running while the envelope is being built
        self.sender = Tasklet.current()
        # a class derived from concurrence.Message
        self.msg = msg

    def payload(self, userinfo):
        """Attach user-defined data that travels with the message."""
        self.userinfo = userinfo

class MsgCenter(object):
    """Message router for tasklets, modelled on a notification center.

    Tasklets register themselves together with a message class.  Every
    MCMessage passed to send() is first handed to an internal router tasklet,
    which forwards it to each registered tasklet whose message class matches.
    """

    #   CONSTANTS
    #   Indexes into the (task, msg) tuples stored in self._msgs.
    #   Don't change these values.
    TASK = 0
    MSG = 1

    def __init__(self):
        # Registered (receiver tasklet, message class) pairs.
        self._msgs = []
        # Run router tasklet
        self._receiver = Tasklet.new(self._msg_loop)()

    def register(self, task, msg):
        """Register a receiver tasklet for a message class.

        task:   receiver tasklet
        msg:    message class derived from concurrence.Message.

        Registering the same (task, msg) pair more than once has no further
        effect, so a tasklet never receives the same message twice because of
        a repeated registration.
        """
        entry = (task, msg)
        if entry not in self._msgs:
            self._msgs.append(entry)

    def send(self, arg, task=None):
        """Send a message through the router tasklet.

        arg:    message to send; an instance of MCMessage.
        task:   optional receiving tasklet.  When given it replaces
                arg.receiver, so only that tasklet gets the message.
                When omitted, arg.receiver is used unchanged.
        """
        if task is not None:
            arg.receiver = task
        arg.msg.send(self._receiver)(arg)

    def _msg_loop(self):
        """Body of the router tasklet.

        Receives every message sent via send() and forwards it to each
        registration whose message class matches.
        """
        for msg, args, kwargs in Tasklet.receive():
            for m in self._msgs:
                if msg.match(m[self.MSG]):
                    # args[0] is the MCMessage that send() passed along.
                    self._send_msg(m, args[0])

    def _send_msg(self, target, msg):
        """Forward msg to one registration.

        target: a (task, msg class) tuple taken from self._msgs.
        msg:    the MCMessage being routed.

        A message whose receiver is None goes to every matching registration;
        otherwise it is dropped for every tasklet but the addressed one.
        """
        if msg.receiver is not None and msg.receiver != target[self.TASK]:
            return

        target[self.MSG].send(target[self.TASK])(msg)

# This module is a library; running it directly only points at the test script.
if "__main__" == __name__:
    print("use test_msgcenter.py")



다음은 테스트 코드이다.


test_msgcenter.py
# -*- coding: utf-8 -*-
#
#   Test script of msgcenter.py
#
#   author: fullc0de (fullc0de@gmail.com)
#

from concurrence import Tasklet, Message, dispatch, TimeoutError
from msgcenter import MCMessage, MsgCenter

# Message center shared by every tasklet in this script.  It is created at
# import time, i.e. before dispatch(main) is called at the bottom of the file.
g_msg_center = MsgCenter()

class MSG_GREETING(Message):
    """Message type that the printer tasklets answer with a "Hello" line."""
    # Label printed by this script when it logs which message is being sent.
    name = "MSG_GREETING"

class MSG_FAREWELL(Message):
    """Message type that the printer tasklets answer with a "Goodbye" line."""
    # Label printed by this script when it logs which message is being sent.
    name = "MSG_FAREWELL"

def printer1():
    g_msg_center.register(Tasklet.current(), MSG_GREETING)
    g_msg_center.register(Tasklet.current(), MSG_FAREWELL)

    for msg, args, kwargs in Tasklet.receive():
        if msg.match(MSG_GREETING):
            print 'printer1 Hello', args[0].userinfo
        elif msg.match(MSG_FAREWELL):
            print 'printer1 Goodbye', args[0].userinfo
        else:
            pass

def printer2():
    g_msg_center.register(Tasklet.current(), MSG_GREETING)
    g_msg_center.register(Tasklet.current(), MSG_FAREWELL)

    while True:
        try:
            for msg, args, kwargs in Tasklet.receive(1):
                if msg.match(MSG_GREETING):
                    print 'printer2 Hello', args[0].userinfo
                elif msg.match(MSG_FAREWELL):
                    print 'printer2 Goodbye', args[0].userinfo
                else:
                    pass
        except TimeoutError, e:
            pass
        print "do another job.."


def sender1():
    """Sleep for five seconds, then send MSG_FAREWELL with no specific receiver."""
    me = Tasklet.current()
    print("task(%s) sleep 5 sec..." % me.name)
    Tasklet.sleep(5)

    # A receiver of None is not filtered by MsgCenter._send_msg, so every
    # tasklet registered for MSG_FAREWELL gets this message.
    farewell = MCMessage(MSG_FAREWELL, None)
    farewell.payload("everybody")
    print("send task(%s) msg(%s) to ALL" % (me.name, MSG_FAREWELL.name))
    g_msg_center.send(farewell)
    print("continue...")

def main():
    """Spawn the demo tasklets and send one addressed message to each printer."""
    printer1_task = Tasklet.new(printer1)()
    printer2_task = Tasklet.new(printer2)()
    Tasklet.new(sender1)()

    # NOTE(review): presumably yields so the new tasklets can run and register
    # with the message center before anything is sent -- confirm.
    Tasklet.sleep(0)

    addressed = (
        (MSG_GREETING, printer1_task, "kyokook"),
        (MSG_FAREWELL, printer2_task, "fullc0de"),
    )
    for kind, target, info in addressed:
        envelope = MCMessage(kind, target)
        envelope.payload(info)
        print("send task(%s) msg(%s) to task(%s)" % (Tasklet.current().name, kind.name, target.name))
        g_msg_center.send(envelope)


# Script entry point: hand main to dispatch(), imported from concurrence above.
# NOTE(review): this runs at import time; there is no
# `if __name__ == "__main__":` guard.
dispatch(main)





1 2 3 4 5 6 7 8 9 10 다음