diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5ca5339d..fb812b58 100755 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,7 +2,7 @@ Version 0.4.2 ======================== * Fixed bug in boolean validation. * Refactored :class:`pulsar.apps.test.TestPlugin` to handle multi-parameters. -* **310 regression tests**, **84% coverage**. +* **320 regression tests**, **84% coverage**. Version 0.4.1 - 2012-Dec-04 ============================== diff --git a/examples/calculator/tests.py b/examples/calculator/tests.py index 9c2efd1d..ac5841a9 100755 --- a/examples/calculator/tests.py +++ b/examples/calculator/tests.py @@ -25,7 +25,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): if cls.app: - return send('arbiter', 'kill_actor', cls.app.mid) + return send('arbiter', 'kill_actor', cls.app.name) def setUp(self): self.assertEqual(self.p.url, self.uri) @@ -46,7 +46,6 @@ def testHandler(self): hnd = root.subHandlers['calc'] self.assertFalse(hnd.isroot()) self.assertEqual(hnd.subHandlers, {}) - self.assertTrue(s.mid) # Pulsar server commands def testPing(self): @@ -82,7 +81,7 @@ def testDivide(self): result = self.p.calc.divide(50, 25) self.assertEqual(result, 2) - def testAInfo(self): + def testInfo(self): result = self.p.server_info() self.assertTrue('server' in result) server = result['server'] diff --git a/examples/helloworld/manage.py b/examples/helloworld/manage.py index faf8e0b1..28ed38dd 100755 --- a/examples/helloworld/manage.py +++ b/examples/helloworld/manage.py @@ -27,7 +27,7 @@ def hello(environ, start_response): return iter([data]) -def server(description = None, **kwargs): +def server(description=None, **kwargs): description = description or 'Pulsar Hello World Application' return wsgi.WSGIServer(callable=hello, description=description, diff --git a/examples/helloworld/tests.py b/examples/helloworld/tests.py index 268b0695..af13fc05 100755 --- a/examples/helloworld/tests.py +++ b/examples/helloworld/tests.py @@ -17,8 +17,9 @@ def name(cls): @classmethod def setUpClass(cls): - s = server(bind='127.0.0.1:0', name=cls.name(), - concurrency=cls.concurrency) + name = name=cls.name() + kwargs = {'%s__bind' % name: '127.0.0.1:0'} + s = server(name=cls.name(), concurrency=cls.concurrency, **kwargs) outcome = send('arbiter', 'run', s) yield outcome cls.app = outcome.result @@ -27,7 +28,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): if cls.app is not None: - outcome = send('arbiter', 'kill_actor', cls.app.mid) + outcome = send('arbiter', 'kill_actor', cls.app.name) yield outcome @run_on_arbiter @@ -35,6 +36,9 @@ def testMeta(self): app = get_application(self.name()) self.assertEqual(app.name, self.name()) self.assertTrue(app.monitor.running()) + self.assertEqual(app, app.app) + self.assertEqual(str(app), app.name) + self.assertEqual(app.cfg.bind, '127.0.0.1:0') def testResponse(self): c = HttpClient() diff --git a/examples/taskqueue/manage.py b/examples/taskqueue/manage.py index 5681e584..dcafe87d 100644 --- a/examples/taskqueue/manage.py +++ b/examples/taskqueue/manage.py @@ -40,8 +40,7 @@ class server(pulsar.MultiApp): def __call__(self, actor=None): name = self.name params = self.params - tq = tasks.TaskQueue(name=name, callable=dummy, - tasks_path=TASK_PATHS, + tq = tasks.TaskQueue(name=name, tasks_path=TASK_PATHS, script=__file__, **params) self.apps.append(tq) rpcs = wsgi.WSGIServer(rpc.RpcMiddleware(RpcRoot(tq)), diff --git a/examples/taskqueue/tests.py b/examples/taskqueue/tests.py index 73175f8a..94009bc5 100755 --- a/examples/taskqueue/tests.py +++ b/examples/taskqueue/tests.py @@ -45,8 +45,7 @@ def name_rpc(cls): @classmethod def setUpClass(cls): # The name of the task queue application - s = server(cls.name_tq(), - bind ='127.0.0.1:0', + s = server(cls.name_tq(), bind='127.0.0.1:0', concurrency=cls.concurrency) outcome = send('arbiter', 'run', s) yield outcome diff --git a/examples/websocket/tests.py b/examples/websocket/tests.py index 24ce894a..82c5dbcd 100644 --- a/examples/websocket/tests.py +++ b/examples/websocket/tests.py @@ -32,7 +32,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): if cls.app is not None: - outcome = send('arbiter', 'kill_actor', cls.app.mid) + outcome = send('arbiter', 'kill_actor', cls.app.name) yield outcome def headers(self, extensions=None, protocol=None): @@ -57,17 +57,17 @@ def testBadRequests(self): response = outcome.result self.assertEqual(response.status_code, 400) # - outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key','')]) + outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key', '')]) yield outcome response = outcome.result self.assertEqual(response.status_code, 400) # - outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key','bla')]) + outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key', 'bla')]) yield outcome response = outcome.result self.assertEqual(response.status_code, 400) # - outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-version','xxx')]) + outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-version', 'xxx')]) yield outcome response = outcome.result self.assertEqual(response.status_code, 400) diff --git a/pulsar/apps/__init__.py b/pulsar/apps/__init__.py index 83563640..09cd8a55 100644 --- a/pulsar/apps/__init__.py +++ b/pulsar/apps/__init__.py @@ -97,10 +97,6 @@ class Worker(ApplicationHandlerMixin, Actor): The application handler obtained from :meth:`Application.handler`. """ - @property - def class_code(self): - return 'worker %s' % self.app.name - def on_init(self, app=None, **kwargs): self.app = app self.information = LogInformation(self.cfg.logevery) @@ -182,9 +178,6 @@ def on_exit(self): except: pass - def clean_up(self): - self.worker_class.clean_arbiter_loop(self,self.ioloop) - def actorparams(self): '''Override the :meth:`Monitor.actorparams` method to updated actor parameters with information about the application. @@ -291,7 +284,6 @@ class Application(pulsar.Pulsar): cfg = {} _app_name = None description = None - mid = None epilog = None cfg_apps = None config_options_include = None @@ -341,7 +333,10 @@ def __init__(self, def __call__(self, actor=None): if actor is None: actor = get_actor() - if not self.mid and (not actor or actor.is_arbiter()): + monitor = None + if actor and actor.is_arbiter(): + monitor = actor.monitors.get(self.name) + if monitor is None and (not actor or actor.is_arbiter()): # Add events self.local.events = {'on_start': Deferred(), 'on_stop': Deferred()} self.cfg.on_start() @@ -381,7 +376,7 @@ def __repr__(self): return self.name def __str__(self): - return self.name + return self.__repr__() @property def monitor(self): @@ -426,15 +421,6 @@ def get_ioqueue(self): By default it returns ``None``.''' return None - def put(self, request): - queue = self.ioqueue - if queue: - self.logger.debug('Put %s on IO queue', request) - queue.put(('request', request)) - else: - self.logger.error("Trying to put a request on task queue,\ - but there isn't one!") - def on_config_init(self, cfg, params): '''Callback when configuration is initialised but not yet loaded. This is a chance to add extra config parameter or remove unwanted ones. @@ -511,7 +497,7 @@ def load_config(self, argv, version, parse_console, params): self.cfg.settings[k].default = v except AttributeError: if not self.add_to_overrides(k, v, overrides): - setattr(self,k,v) + setattr(self, k, v) # parse console args if parse_console: parser = self.cfg.parser() @@ -613,25 +599,6 @@ def start(self): arbiter.start() return self - def stop(self): - '''Stop the application.''' - arbiter = pulsar.arbiter() - if arbiter: - monitor = arbiter.get_actor(self.name) - if monitor: - monitor.stop() - - def actorlinks(self, links): - if not links: - raise StopIteration - else: - arbiter = pulsar.arbiter() - for name,app in links.items(): - if app.mid in arbiter.monitors: - monitor = arbiter.monitors[app.mid] - monitor.actor_links[self.name] = self - yield name, app - class MultiApp: diff --git a/pulsar/apps/rpc/handlers.py b/pulsar/apps/rpc/handlers.py index 094a2454..c5a0249b 100755 --- a/pulsar/apps/rpc/handlers.py +++ b/pulsar/apps/rpc/handlers.py @@ -1,7 +1,8 @@ import sys import inspect +import logging -from pulsar import LogginMixin, to_bytes, is_failure, log_failure, is_async,\ +from pulsar import to_bytes, is_failure, log_failure, is_async,\ as_failure, maybe_async, HttpException from pulsar.utils.tools import checkarity from pulsar.apps.wsgi import WsgiResponse, WsgiResponseGenerator @@ -12,13 +13,13 @@ __all__ = ['RpcHandler', 'RpcMiddleware'] +LOGGER = logging.getLogger('pulsar.rpc') class RpcRequest(object): def __init__(self, environ, handler, method, func, args, kwargs, id, version): self.environ = environ - self.logger = handler.logger self.handler = handler self.method = method self.func = func @@ -75,7 +76,7 @@ def __iter__(self): request.version, result=result) except Exception as e: - handler.log.error('Could not serialize', exc_info=True) + LOGGER.error('Could not serialize', exc_info=True) status_code = 500 result = handler.dumps(request.id, request.version, @@ -124,10 +125,7 @@ def __new__(cls, name, bases, attrs): return make(cls, name, bases, attrs) -BaseHandler = MetaRpcHandler('BaseRpcHandler',(LogginMixin,),{'virtual':True}) - - -class RpcHandler(BaseHandler): +class RpcHandler(MetaRpcHandler('_RpcHandler', (object,), {'virtual': True})): '''The base class for rpc handlers. .. attribute:: content_type @@ -152,7 +150,6 @@ def __init__(self, subhandlers=None, title=None, documentation=None, self.subHandlers = {} self.title = title or self.__class__.__name__ self.documentation = documentation or '' - self.setlog(**kwargs) if subhandlers: for prefix,handler in subhandlers.items(): if inspect.isclass(handler): @@ -189,14 +186,14 @@ def get_method_and_args(self, data): raise NotImplementedError() def __getstate__(self): - d = super(RpcHandler,self).__getstate__() + d = self.__dict__.copy() if not self.isroot(): # Avoid duplicating handlers d['_parent'] = True return d def __setstate__(self, state): - super(RpcHandler,self).__setstate__(state) + self.__dict__ = state for handler in self.subHandlers.values(): handler._parent = self diff --git a/pulsar/apps/rpc/jsonrpc.py b/pulsar/apps/rpc/jsonrpc.py index 960290b4..06f8e4df 100755 --- a/pulsar/apps/rpc/jsonrpc.py +++ b/pulsar/apps/rpc/jsonrpc.py @@ -49,7 +49,7 @@ class JSONRPC(RpcHandler): def get_method_and_args(self, data): '''Overrides the :meth:`RpcHandler:get_method_and_args` to obtain method data from the JSON *data* string.''' - if not isinstance(data,dict): + if not isinstance(data, dict): data = self._json.loads(data) method = data.get('method',None) params = data.get('params',None) @@ -57,8 +57,8 @@ def get_method_and_args(self, data): version = data.get('jsonrpc',None) kwargs = {} args = () - if isinstance(params,dict): - for k,v in params.items(): + if isinstance(params, dict): + for k, v in params.items(): kwargs[str(k)] = v elif params: args = tuple(params) diff --git a/pulsar/apps/socket/__init__.py b/pulsar/apps/socket/__init__.py index 9f5b0be9..ba7eabbc 100644 --- a/pulsar/apps/socket/__init__.py +++ b/pulsar/apps/socket/__init__.py @@ -47,26 +47,24 @@ def monitor_init(self, monitor): if not pulsar.platform.multiProcessSocket()\ or cfg.concurrency == 'thread': cfg.set('workers', 0) + if not self.socket_server_class: + raise TypeError('Socket server class not specified.') def monitor_start(self, monitor): # Open the socket and bind to address address = self.cfg.address if address: - socket = pulsar.create_socket(address, - logger=monitor.logger, - backlog=self.cfg.backlog) + socket = pulsar.create_socket(address, backlog=self.cfg.backlog) else: raise pulsar.ImproperlyConfigured('Could not open a socket. ' 'No address to bind to') self.logger.info('Listening on %s', socket) - monitor.socket = socket + monitor.params.socket = socket self.address = socket.name def worker_start(self, worker): # Start the worker by starting the socket server - if not self.socket_server_class: - raise TypeError('Socket server class not specified.') - s = self.socket_server_class(worker, worker.socket) + s = self.socket_server_class(worker, worker.params.socket) # We add the file descriptor handler s.on_connection_callbacks.append(worker.handle_fd_event) worker.socket_server = s.start() diff --git a/pulsar/apps/test/utils.py b/pulsar/apps/test/utils.py index 7a537551..8b977b91 100644 --- a/pulsar/apps/test/utils.py +++ b/pulsar/apps/test/utils.py @@ -182,7 +182,7 @@ def all_spawned(self): def spawn(self, concurrency=None, **kwargs): concurrency = concurrency or self.concurrency - ad = pulsar.spawn(concurrency=concurrency,**kwargs) + ad = pulsar.spawn(concurrency=concurrency, **kwargs) self.assertTrue(ad.aid) self.assertTrue(isinstance(ad, pulsar.ActorProxyDeferred)) yield ad diff --git a/pulsar/async/access.py b/pulsar/async/access.py index 821af91d..3c443aa6 100644 --- a/pulsar/async/access.py +++ b/pulsar/async/access.py @@ -62,10 +62,15 @@ def thread_ioloop(ioloop=None): def set_actor(actor): '''Returns the actor running the current thread.''' actor = thread_local_data('actor', value=actor) - if actor.impl == 'thread': + if actor.impl.kind == 'thread': process_local_data('thread_actors')[actor.aid] = actor return actor +def remove_actor(actor): + '''Remove actor from threaded_actors dictionary''' + if actor.impl.kind == 'thread': + process_local_data('thread_actors').pop(actor.aid, None) + def get_actor_from_id(aid): '''Retrieve an actor from its actor id. This function can be used by actors with thread concurrency ince they live in the arbiter process domain.''' diff --git a/pulsar/async/actor.py b/pulsar/async/actor.py index 02b5ccf9..c2a6d696 100644 --- a/pulsar/async/actor.py +++ b/pulsar/async/actor.py @@ -18,7 +18,7 @@ from .defer import make_async, is_failure, iteritems, itervalues,\ pickle, safe_async, async, log_failure, make_async from .mailbox import IOQueue, mailbox -from .access import set_local_data, is_mainthread, get_actor +from .access import set_local_data, is_mainthread, get_actor, remove_actor __all__ = ['is_actor', 'send', 'Actor', 'ACTOR_STATES', 'Pulsar'] @@ -39,6 +39,9 @@ # times timeout. So for a timeout of 30 seconds, the messages will # go after tolerance*30 seconds (18 secs for tolerance = 0.6). ACTOR_TIMEOUT_TOLERANCE = 0.6 +# Timeout of when joining an actor which has been terminated +ACTOR_TERMINATE_TIMEOUT = 2 +ACTOR_STOPPING_LOOPS = 10 EMPTY_TUPLE = () EMPTY_DICT = {} @@ -74,12 +77,16 @@ def send(target, action, *args, **params): class Pulsar(LogginMixin): def configure_logging(self, **kwargs): - super(Pulsar, self).configure_logging(logger=self.name, - config=self.cfg.logconfig, - level=self.cfg.loglevel, - handlers=self.cfg.loghandlers) - - + configure_logging = super(Pulsar, self).configure_logging + configure_logging(logger='pulsar', + config=self.cfg.logconfig, + level=self.cfg.loglevel, + handlers=self.cfg.loghandlers) + configure_logging(logger='pulsar.%s' % self.name, + config=self.cfg.logconfig, + level=self.cfg.loglevel, + handlers=self.cfg.loghandlers) + class Actor(Pulsar): '''The base class for concurrent programming in pulsar. In computer science, @@ -153,6 +160,7 @@ class Actor(Pulsar): String indicating the logging level for the actor. ''' exit_code = None + mailbox = None stopping_start = None stopping_end = None @@ -173,12 +181,13 @@ def __init__(self, impl): self.concurrent_requests = 0 self.state = ACTOR_STATES.INITIAL self.ioqueue = ioqueue - self.linked_actors = {} + self.linked_actors = impl.params.pop('linked_actors', {}) self.monitors = impl.params.pop('monitors', {}) - self.arbiter = impl.params.pop('arbiter', {}) - self.monitor = impl.params.pop('monitor', {}) + self.arbiter = impl.params.pop('arbiter', None) + self.monitor = impl.params.pop('monitor', None) self.proxy_mailboxes = {} - impl.params = self.on_init(**impl.params) or {} + self.params = AttributeDictionary(self.on_init(**impl.params)) + del impl.params def __repr__(self): return self.impl.unique_name @@ -256,10 +265,8 @@ def send(self, target, action, *args, **params): '''Send a message to *target* to perform *action* with given parameters *params*. It return a :class:`ActorMessage`.''' if not isinstance(target, ActorProxy): - tg = self.get_actor(target) - else: - tg = target - return get_proxy(tg).receive_from(self, action, *args, **params) + target = get_proxy(self.get_actor(target)) + return target.receive_from(self, action, *args, **params) def put(self, request): '''Put a *request* into the :attr:`ioqueue` if available.''' @@ -269,15 +276,7 @@ def put(self, request): else: self.logger.error("Trying to put a request on task queue,\ but there isn't one!") - - def run_on_arbiter(self, callable): - '''Run a *callable* in the arbiter event loop. - -:parameter callable: a pickable, therefore it must be a pickable callable object - or a function. -:rtype: a :class:`Deferred`''' - return self.send('arbiter', 'run', callable) - + ############################################################### STATES def running(self): '''``True`` if actor is running.''' @@ -313,7 +312,7 @@ def is_monitor(self): def isprocess(self): '''boolean indicating if this is an actor on a child process.''' - return self.impl == 'process' + return self.impl.kind == 'process' def ready(self): return self.arbiter.aid in self.linked_actors @@ -434,6 +433,7 @@ def stop(self, force=False, exit_code=0): self.on_exit() self.stopping_end = time() self.logger.info('%s exited', self) + remove_actor(self) def linked_proxy_actors(self): '''Iterator over linked-actor proxies.''' @@ -446,7 +446,11 @@ def get_actor(self, aid): if aid == self.aid: return self elif aid == 'arbiter': + return self.arbiter or self + elif self.arbiter and aid == self.arbiter.aid: return self.arbiter + elif self.monitor and aid == self.monitor.aid: + return self.monitor elif aid in self.linked_actors: return self.linked_actors[aid] else: @@ -468,11 +472,11 @@ def __call__(self): nt = None if nt: self.last_notified = nt - info = self.get_info() + info = self.info() self.send('arbiter', 'notify', info) self.on_task() - def get_info(self): + def info(self): '''return A dictionary of information related to the actor status and performance.''' if not self.started(): @@ -550,7 +554,7 @@ def _on_run(self): # Inject self as the actor of this thread ioq = self.ioqueue self.requestloop = IOLoop(io=IOQueue(ioq, self) if ioq else None, - pool_timeout=self.impl.pool_timeout, + pool_timeout=self.params.pool_timeout, logger=self.logger, name=self.name, ready=not self.cpubound) diff --git a/pulsar/async/arbiter.py b/pulsar/async/arbiter.py index e2857262..ee1fc479 100755 --- a/pulsar/async/arbiter.py +++ b/pulsar/async/arbiter.py @@ -12,7 +12,7 @@ from pulsar import HaltServer from .defer import itervalues, iteritems, multi_async -from .actor import Actor, send +from .actor import Actor, ACTOR_TERMINATE_TIMEOUT, ACTOR_STOPPING_LOOPS from .monitor import PoolMixin, _spawn_actor from .access import get_actor, set_actor from . import proxy @@ -42,7 +42,7 @@ def arbiter(commands_set=None, **params): return arbiter -def spawn(**kwargs): +def spawn(cfg=None, **kwargs): '''Spawn a new :class:`Actor` and return an :class:`ActorProxyDeferred`. This method can be used from any :class:`Actor`. If not in the :class:`Arbiter` domain, @@ -76,8 +76,8 @@ def spawn(**kwargs): # The actor is not the Arbiter domain. # We send a message to the Arbiter to spawn a new Actor if not isinstance(actor, Arbiter): - msg = send('arbiter', 'spawn', **kwargs)\ - .add_callback(actor.link_actor) + msg = actor.send('arbiter', 'spawn', **kwargs)\ + .add_callback(actor.link_actor) return proxy.ActorProxyDeferred(aid, msg) else: return actor.spawn(**kwargs) @@ -100,7 +100,6 @@ class Arbiter(PoolMixin, Actor): .. _twisted: http://twistedmatrix.com/trac/ .. _tornado: http://www.tornadoweb.org/ ''' - STOPPING_LOOPS = 20 SIG_TIMEOUT = 0.01 EXIT_SIGNALS = (signal.SIGINT, signal.SIGTERM, @@ -135,7 +134,7 @@ def isprocess(self): def get_all_monitors(self): '''A dictionary of all :class:`Monitor` in the arbiter''' return dict(((mon.name, mon.proxy) for mon in\ - itervalues(self.monitors))) + itervalues(self.monitors) if mon.mailbox)) @multi_async def close_monitors(self): @@ -144,18 +143,18 @@ def close_monitors(self): yield pool.stop() def on_info(self, data): - monitors = [p.get_info() for p in itervalues(self.monitors)] + monitors = [p.info() for p in itervalues(self.monitors)] server = data.pop('actor') server.update({'version': pulsar.__version__, 'name': pulsar.SERVER_NAME, 'number_of_monitors': len(self.monitors), - 'number_of_actors': len(self.MANAGED_ACTORS)}) + 'number_of_actors': len(self.managed_actors)}) server.pop('is_process', None) server.pop('ppid', None) server.pop('actor_id', None) server.pop('age', None) data['server'] = server - data['workers'] = [a.info for a in itervalues(self.MANAGED_ACTORS)] + data['workers'] = [a.info for a in itervalues(self.managed_actors)] data['monitors'] = monitors return data @@ -206,16 +205,16 @@ def manage_actor(self, actor): the timeout. Stop the arbiter.''' if self.running() and actor.notified: gap = time() - actor.notified - if gap > actor.timeout: - if actor.stopping_loops < self.STOPPING_LOOPS: + if gap > actor.cfg.timeout: + if actor.stopping_loops < ACTOR_STOPPING_LOOPS: if not actor.stopping_loops: self.logger.info('Stopping %s. Timeout.', actor) self.send(actor, 'stop') + actor.stopping_loops += 1 else: self.logger.warn('Terminating %s. Timeout.', actor) actor.terminate() - actor.join(self.JOIN_TIMEOUT) - actor.stopping_loops += 1 + actor.join(ACTOR_TERMINATE_TIMEOUT) def on_stop(self): '''Stop the pools the message queue and remaining actors.''' diff --git a/pulsar/async/commands.py b/pulsar/async/commands.py index e31558f8..797c6903 100644 --- a/pulsar/async/commands.py +++ b/pulsar/async/commands.py @@ -61,7 +61,7 @@ def config(client, actor, setget, name, *value): def mailbox_address(client, actor, caller, address): '''The remote *caller* register its mailbox ``address``.''' if address: - actor.logger.debug('Registering %s inbox address %s', caller, address) + actor.logger.debug('Registering %s inbox address %s', caller, address) actor.link_actor(caller, address) return actor.proxy @@ -93,7 +93,7 @@ def kill_actor(client, actor, aid): a = actor.get_actor(aid) if a: a.stop() - return 'stopped {0}'.format(a) + return 'stopped %s' % a else: actor.logger.info('Could not kill "%s" no such actor', aid) \ No newline at end of file diff --git a/pulsar/async/concurrency.py b/pulsar/async/concurrency.py index 4c90d9e0..4f9f3049 100644 --- a/pulsar/async/concurrency.py +++ b/pulsar/async/concurrency.py @@ -13,7 +13,7 @@ __all__ = ['Concurrency', 'concurrency'] -def concurrency(kind, actor_class, monitor, commands_set, cfg, params): +def concurrency(kind, actor_class, monitor, commands_set, cfg, **params): '''Function invoked by the :class:`Arbiter` or a :class:`Monitor` when spawning a new :class:`Actor`. It created a :class:`Concurrency` instance which handle the contruction and the lif of an :class:`Actor`. @@ -30,7 +30,7 @@ def concurrency(kind, actor_class, monitor, commands_set, cfg, params): c = ActorProcess() else: raise ValueError('Concurrency %s not supported in pulsar' % kind) - return c.make(kind, actor_class, monitor, commands_set, cfg, params) + return c.make(kind, actor_class, monitor, commands_set, cfg, **params) class Concurrency(object): @@ -48,16 +48,16 @@ class Concurrency(object): ''' _creation_counter = 0 address = None - def make(self, kind, actor_class, monitor, commands_set, cfg, params): + def make(self, kind, actor_class, monitor, commands_set, cfg, name=None, + aid=None, **params): self.__class__._creation_counter += 1 - self.aid = gen_unique_id()[:8] + self.aid = aid or gen_unique_id()[:8] self.age = self.__class__._creation_counter - self.name = params.pop('name', actor_class.__name__.lower()) + self.name = name or actor_class.__name__.lower() self.kind = kind self.commands_set = commands_set self.cfg = cfg self.actor_class = actor_class - self.pool_timeout = params.pop('pool_timeout', None) self.params = params return self.get_actor(monitor) @@ -116,7 +116,8 @@ class ActorThread(ActorConcurrency, Thread): def terminate(self): '''Called by the main thread to force termination.''' actor = get_actor_from_id(self.aid) - result = actor.stop(force=True) + if actor is not None: + actor.stop(force=True) @property def pid(self): diff --git a/pulsar/async/iostream.py b/pulsar/async/iostream.py index 6b07aa5b..0ea6470f 100644 --- a/pulsar/async/iostream.py +++ b/pulsar/async/iostream.py @@ -18,7 +18,7 @@ from .eventloop import IOLoop, loop_timeout from .access import PulsarThread, thread_ioloop, get_actor -iologger = logging.getLogger('pulsar.iostream') +LOGGER = logging.getLogger('pulsar.iostream') __all__ = ['AsyncIOStream', @@ -51,11 +51,11 @@ class AsyncIOStream(IObase, BaseSocket): A :class:`Socket` which might be connected or unconnected. -.. attribute:: _read_callback_timeout +.. attribute:: timeout A timeout in second which is used when waiting for a data to be available for reading. If timeout is a positive number, - every time we the :class:`AsyncIOStream` perform a :meth:`read` + every time the :class:`AsyncIOStream` performs a :meth:`read` operation a timeout is also created on the :attr:`ioloop`. ''' _error = None @@ -76,7 +76,6 @@ def __init__(self, socket=None, max_buffer_size=None, self.read_chunk_size = read_chunk_size or io.DEFAULT_BUFFER_SIZE self._read_buffer = deque() self._write_buffer = deque() - self.log = iologger def __repr__(self): if self.sock: @@ -168,7 +167,7 @@ def connect(self, address): except socket.error as e: # In non-blocking mode connect() always raises an exception if not async_error(e): - self.log.warning('Connect error on %s: %s', self, e) + LOGGER.warning('Connect error on %s: %s', self, e) self.close() return callback = Deferred(description='%s connect callback' % self) @@ -287,7 +286,7 @@ def read_to_buffer(self): break self._read_buffer.append(chunk) if self._read_buffer_size() >= self.max_buffer_size: - self.log.error("Reached maximum read buffer size") + LOGGER.error("Reached maximum read buffer size") self.close() raise IOError("Reached maximum read buffer size") if len(chunk) < length: @@ -327,7 +326,7 @@ def _handle_read(self): raise except Exception: result = 0 - self.log.warning("Read error on %s.", self, exc_info=True) + LOGGER.warning("Read error on %s.", self, exc_info=True) if result == 0: self.close() buffer = self._get_buffer(self._read_buffer) @@ -359,7 +358,7 @@ def _handle_write(self): if async_error(e): break else: - self.log.warning("Write error on %s: %s", self, e) + LOGGER.warning("Write error on %s: %s", self, e) self.close() return if not self._write_buffer and self._write_callback: @@ -380,7 +379,7 @@ def _get_buffer(self, dq): def _handle_events(self, fd, events): # This is the actual callback from the event loop if not self.sock: - self.log.warning("Got events for closed stream %d", fd) + LOGGER.warning("Got events for closed stream %d", fd) return try: if events & self.READ: @@ -463,7 +462,7 @@ def close(self, msg=None): if isinstance(msg.trace[1], Timeout): if not msg.logged: msg.logged = True - self.log.info('Closing %s on timeout.', self) + LOGGER.info('Closing %s on timeout.', self) else: log_failure(msg) self.on_close(msg) @@ -502,7 +501,6 @@ class ClientSocketHandler(BaseSocketHandler): :class:`ClientSocketHandler`. ''' parser_class = EchoParser - log = iologger def __init__(self, socket, address, parser_class=None, timeout=None): '''Create a client or client-connection socket. A parser class is required in order to use :class:`SocketClient`. @@ -535,6 +533,19 @@ def _set_socket(self, sock): if self.async: close_callback = Deferred().add_callback(self.close) self.sock.set_close_callback(close_callback) + + def _get_read_timeout(self): + if self.async: + return self.sock._read_callback_timeout + else: + return self._socket_timeout + def _set_read_timeout(self, value): + if self.async: + self.sock._read_callback_timeout = value + else: + self._socket_timeout = value + sock.settimeout(self._socket_timeout) + read_timeout = property(_get_read_timeout, _set_read_timeout) class ClientSocket(ClientSocketHandler, IOClientRead): @@ -612,7 +623,7 @@ def _parsedata(self, data): try: parsed_data, buffer = self.parser.decode(buffer) except CouldNotParse: - self.log.warn('Could not parse data', exc_info=True) + LOGGER.warn('Could not parse data', exc_info=True) parsed_data = None buffer = bytearray() self.buffer = buffer @@ -710,12 +721,6 @@ def handle(self): yield self.write(data) else: # The response is not ready. release the loop yield NOT_DONE - - def _get_timeout(self): - return self.sock._read_callback_timeout - def _set_timeout(self, value): - self.sock._read_callback_timeout = value - read_timeout = property(_get_timeout, _set_timeout) def request(self, response=None): if self._current_request is None: @@ -736,10 +741,9 @@ def request_data(self): self.parser.connection = self try: parsed_data, buffer = self.parser.decode(buffer) - except CouldNotParse: - self.log.warn('Could not parse data', exc_info=True) - parsed_data = None - buffer = bytearray() + except: + LOGGER.error('Could not parse data', exc_info=True) + raise self.buffer = buffer return parsed_data @@ -747,10 +751,6 @@ def request_data(self): def actor(self): return self.server.actor - @property - def log(self): - return self.actor.log - def on_close(self, failure=None): self.server.connections.discard(self) diff --git a/pulsar/async/mailbox.py b/pulsar/async/mailbox.py index 49fbcfc7..d564cc4a 100644 --- a/pulsar/async/mailbox.py +++ b/pulsar/async/mailbox.py @@ -22,6 +22,9 @@ 'Empty', 'Queue', 'ActorMessage'] +LOGGER = logging.getLogger('pulsar.mailbox') + + def mailbox(actor=None, address=None): '''Creates a :class:`Mailbox` instances for :class:`Actor` instances. If an address is provided, the communication is implemented using a socket, @@ -35,7 +38,7 @@ def mailbox(actor=None, address=None): return Mailbox.make(actor).start() def actorid(actor): - return actor.aid if hasattr(actor,'aid') else actor + return actor.aid if hasattr(actor, 'aid') else actor class MessageParser(object): @@ -239,8 +242,8 @@ def register(self, actor): def unregister(self, actor): if not self.ioloop.remove_loop_task(actor): - self.actor.log.warn('"%s" could not be removed from eventloop'\ - % actor) + LOGGER.warn('"%s" could not be removed from eventloop', actor) + class MonitorMailbox(object): '''A :class:`Mailbox` for a :class:`Monitor`. This is a proxy for the @@ -277,14 +280,14 @@ def __init__(self, queue): self._fd = 'waker' def __str__(self): - return '{0} {1}'.format(self.__class__.__name__,self._fd) + return '%s %s' % (self.__class__.__name__, self._fd) def fileno(self): return self._fd def wake(self): try: - self._queue.put((self._fd,None)) + self._queue.put((self._fd, None)) except (IOError,TypeError): pass diff --git a/pulsar/async/monitor.py b/pulsar/async/monitor.py index d91cced8..90d97773 100644 --- a/pulsar/async/monitor.py +++ b/pulsar/async/monitor.py @@ -5,7 +5,7 @@ import pulsar from . import proxy -from .actor import Actor, ACTOR_STATES +from .actor import Actor, ACTOR_STATES, ACTOR_TERMINATE_TIMEOUT from .eventloop import setid from .concurrency import concurrency from .defer import async, iteritems, itervalues, range, NOT_DONE @@ -15,42 +15,54 @@ __all__ = ['Monitor', 'PoolMixin'] -def _spawn_actor(cls, commands_set=None, monitor=None, cfg=None, **kw): +def _spawn_actor(cls, commands_set=None, monitor=None, cfg=None, name=None, + aid=None, **kw): # Internal function which spawns a new Actor and return its # ActorProxyMonitor. # *monitor* can be either the ariber or a monitor # *actorcls* is the Actor class - commands_set = set(commands_set or proxy.actor_commands) kind = None if issubclass(cls, PoolMixin): kind = 'monitor' + if cfg is None: + if monitor: + cfg = monitor.cfg.copy() + else: + cfg = pulsar.Config() if monitor: params = monitor.actorparams() - params.update(kw) - if cfg is None: - cfg = monitor.cfg + name = params.pop('name', name) + aid = params.pop('aid', aid) + commands_set = params.pop('commands_set', commands_set) + else: + if kind != 'monitor': + raise TypeError('class %s not a valid monitor' % cls) + params = {} + commands_set = set(commands_set or proxy.actor_commands) + for key, value in iteritems(kw): + if key in cfg.settings: + cfg.set(key, value) + else: + params[key] = value + # + if monitor: if not kind: if not issubclass(cls, Actor): raise TypeError('Class %s not a valid actor.' % cls) kind = cfg.concurrency - else: - # No monitor, cls must be a monitor class - if kind != 'monitor': - raise TypeError('class %s not a valid monitor' % cls) - params = kw - if cfg is None: - cfg = pulsar.Config() if not kind: raise TypeError('Cannot spawn class %s. not a valid concurrency.' % cls) - actor_proxy = concurrency(kind, cls, monitor, commands_set, cfg, params) + actor_proxy = concurrency(kind, cls, monitor, commands_set, cfg, + name=name, aid=aid, **params) # Add to the list of managed actors if this is a remote actor if isinstance(actor_proxy, Actor): return actor_proxy else: actor_proxy.monitor = monitor monitor.spawning_actors[actor_proxy.aid] = actor_proxy + deferred = proxy.ActorProxyDeferred(actor_proxy) actor_proxy.start() - return proxy.ActorProxyDeferred(actor_proxy) + return deferred class PoolMixin(object): @@ -64,12 +76,15 @@ class PoolMixin(object): dictionary with keys given by actor's ids and values by :class:`ActorProxyMonitor` instances. These are the actors managed by the pool. + +.. attribute:: spawning_actors + + A dictionary of :class:`ActorProxyMonitor` which are in the process of + being spawned. ''' CLOSE_TIMEOUT = 30000000000000 - JOIN_TIMEOUT = 1.0 actor_class = Actor - DEFAULT_IMPLEMENTATION = 'monitor' '''The class derived form :class:`Actor` which the monitor manages during its life time. @@ -91,20 +106,23 @@ def is_pool(self): def ready(self): return True - def spawn(self, actorcls=None, linked_actors=None, montitor=None, **params): + def spawn(self, actor_class=None, linked_actors=None, montitor=None, + **params): '''Spawn a new :class:`Actor` and return its :class:`ActorProxyMonitor`.''' - actorcls = actorcls or self.actor_class + actor_class = actor_class or self.actor_class if linked_actors: params['linked_actors'] =\ dict(((aid, p.proxy) for aid, p in iteritems(linked_actors))) - return _spawn_actor(actorcls, monitor=self, **params) + return _spawn_actor(actor_class, monitor=self, **params) def actorparams(self): '''Return a dictionary of parameters to be passed to the spawn method when creating new actors.''' arbiter = self.arbiter or self - return {'monitors': arbiter.get_all_monitors()} + params = dict(self.params) + params['monitors'] = arbiter.get_all_monitors() + return params def get_actor(self, aid): a = Actor.get_actor(self, aid) @@ -132,7 +150,7 @@ def manage_actors(self, terminate=False, stop=False, manage=True): items.extend(iteritems(SPAWNING)) for aid, actor in items: if not actor.is_alive(): - actor.join(self.JOIN_TIMEOUT) + actor.join(ACTOR_TERMINATE_TIMEOUT) ACTORS.pop(aid, None) SPAWNING.pop(aid, None) LINKED.pop(aid, None) @@ -143,7 +161,7 @@ def manage_actors(self, terminate=False, stop=False, manage=True): # registered with its monitor yet). if terminate or actor.mailbox is None: actor.terminate() - actor.join(self.JOIN_TIMEOUT) + actor.join(ACTOR_TERMINATE_TIMEOUT) else: actor.stop(self) elif manage: @@ -187,7 +205,7 @@ def link_actor(self, proxy, address): delattr(proxy_monitor,'on_address') try: if not address: - raise valueError('No address received') + raise ValueError('No address received') except: on_address.callback(sys.exc_info()) else: @@ -216,33 +234,6 @@ def close_actors(self): self.logger.warn('terminated %s actors.', to_stop) to_stop = 0 - @classmethod - def _spawn_actor(cls, monitor, actorcls, aid, commands_set, **kwargs): - # Internal function which spawns a new Actor and return its - # ActorProxyMonitor. - # *monitor* can be either the ariber or a monitor - # *actorcls* is the Actor class - commands_set = set(commands_set or proxy.actor_commands) - if monitor: - params = monitor.actorparams() - params.update(kwargs) - else: - params = kwargs - impl = params.pop('concurrency', actorcls.DEFAULT_IMPLEMENTATION) - timeout = max(params.pop('timeout',cls.DEFAULT_ACTOR_TIMEOUT), - cls.MINIMUM_ACTOR_TIMEOUT) - actor_proxy = concurrency(impl, actorcls, timeout, - monitor, aid, commands_set, - params) - # Add to the list of managed actors if this is a remote actor - if isinstance(actor_proxy, Actor): - return actor_proxy - else: - actor_proxy.monitor = monitor - monitor.spawning_actors[actor_proxy.aid] = actor_proxy - actor_proxy.start() - return proxy.ActorProxyDeferred(actor_proxy) - class Monitor(PoolMixin, Actor): '''A monitor is a special :class:`Actor` which shares @@ -327,6 +318,7 @@ def on_stop(self): def _run(self): self.requestloop = self.arbiter.requestloop self.mailbox = mailbox(self) + self.monitors = self.arbiter.monitors setid(self) self.state = ACTOR_STATES.RUN self.on_start() @@ -353,14 +345,14 @@ def stop_actor(self, actor): else: return actor.proxy.stop() - def get_info(self): + def info(self): tq = self.ioqueue - data = {'actor': {'actor_class':self.actor_class.code(), + data = {'actor': {'actor_class':self.actor_class.__name__, 'concurrency':self.cfg.concurrency, 'name':self.name, 'age':self.impl.age, 'workers': len(self.managed_actors)}, - 'workers': [a.get_info() for a in itervalues(self.managed_actors)]} + 'workers': [a.info for a in itervalues(self.managed_actors)]} if tq is not None: if isinstance(tq, Queue): tqs = 'multiprocessing.Queue' diff --git a/pulsar/async/proxy.py b/pulsar/async/proxy.py index 75c0f615..b4995b95 100755 --- a/pulsar/async/proxy.py +++ b/pulsar/async/proxy.py @@ -3,7 +3,6 @@ from collections import deque from pulsar import CommandNotFound, AuthenticationError -from pulsar.utils.log import LocalMixin from .defer import Deferred, is_async, make_async, AlreadyCalledError from .mailbox import mailbox, ActorMessage @@ -110,12 +109,12 @@ def __init__(self, aid, msg=None): # simply listent for the calbacks and errorbacks msg.addBoth(self.callback) - def __str__(self): + def __repr__(self): return '%s(%s)' % (self.__class__, self.aid) - __repr__ = __str__ + __str__ = __repr__ -class ActorProxy(LocalMixin): +class ActorProxy(object): '''This is an important component in pulsar concurrent framework. An instance of this class is as a proxy for a remote `underlying` :class:`Actor`. This is a lightweight class which delegates @@ -146,12 +145,13 @@ class ActorProxy(LocalMixin): last_msg = None def __init__(self, impl): self.aid = impl.aid + self.name = impl.name self.commands_set = impl.commands_set self.address = impl.address self.cfg = impl.cfg def __repr__(self): - return self.aid + return '%s(%s)' % (self.name, self.aid) __str__ = __repr__ @property @@ -178,7 +178,7 @@ def receive_from(self, sender, command, *args, **kwargs): if sender is None: sender = get_actor() if not self.mailbox: - sender.log.critical('Cannot send a message to %s. No\ + sender.logger.critical('Cannot send a message to %s. No\ mailbox available.', self) return cmd = get_command(command, self.commands_set) diff --git a/pulsar/utils/httpurl.py b/pulsar/utils/httpurl.py index bee4aab9..5e9b1765 100644 --- a/pulsar/utils/httpurl.py +++ b/pulsar/utils/httpurl.py @@ -1096,6 +1096,7 @@ def _got_data(self, data): class HttpParseError(ValueError): pass + class HttpResponse(IOClientRead): '''An Http response object. @@ -1224,12 +1225,11 @@ def write(self, data): def parsedata(self, data): '''Called when data is available on the pipeline''' - has_headers = self.parser.is_headers_complete() + had_headers = self.parser.is_headers_complete() if self.parser.execute(data, len(data)) == len(data): if self.streaming: - # if headers are ready, we keep reading (by returning nothing) if - # the body is not yet complete - if has_headers: + # if headers are ready, try to close the response + if had_headers: return self.close() elif self.parser.is_headers_complete(): res = self.request.client.build_response(self) or self @@ -1237,7 +1237,8 @@ def parsedata(self, data): res._read() return res # Not streaming. wait until we have the whole message - elif self.parser.is_message_complete(): + elif self.parser.is_headers_complete() and\ + self.parser.is_message_complete(): return self.request.client.build_response(self) else: # This is an error in the parsing. Raise an error so that the diff --git a/pulsar/utils/structures.py b/pulsar/utils/structures.py index 13d1bca7..8ff5958d 100755 --- a/pulsar/utils/structures.py +++ b/pulsar/utils/structures.py @@ -152,7 +152,7 @@ def __init__(self, *iterable, **kwargs): if len(iterable) > 1: raise TypeError('%s exceped at most 1 arguments, got %s.' %\ (self.__class__.__name__, len(iterable))) - self.update(iterable) + self.update(iterable[0]) if kwargs: self.update(kwargs) diff --git a/tests/actor.py b/tests/actor.py index a9738b80..de1dc634 100644 --- a/tests/actor.py +++ b/tests/actor.py @@ -2,6 +2,7 @@ from time import sleep import pulsar +from pulsar import send from pulsar.async.defer import pickle from pulsar.apps.test import unittest, ActorTestMixin, run_on_arbiter,\ dont_run_with_thread @@ -27,18 +28,18 @@ def test_get_proxy(self): def test_dummy_proxy(self): actor = pulsar.get_actor() self.assertRaises(ValueError, pulsar.concurrency, 'bla', - pulsar.Actor, 5, actor, None, set(), {}) - impl = pulsar.concurrency('thread', pulsar.Actor, 5, actor, None, - set(), {}) + pulsar.Actor, actor, set(), pulsar.Config()) + impl = pulsar.concurrency('thread', pulsar.Actor, actor, set(), + pulsar.Config()) p = pulsar.ActorProxy(impl) self.assertEqual(p.address, None) self.assertEqual(p.receive_from(None, 'dummy'), None) - self.assertEqual(str(p), p.aid) + self.assertEqual(str(p), 'actor(%s)' % p.aid) def testActorCoverage(self): '''test case for coverage''' actor = pulsar.get_actor() - self.assertRaises(ValueError, actor.send, 'sjdcbhjscbhjdbjsj', 'bla') + self.assertRaises(ValueError, send, 'sjdcbhjscbhjdbjsj', 'bla') self.assertRaises(pickle.PicklingError, pickle.dumps, actor) @@ -54,9 +55,8 @@ def testSimpleSpawn(self): proxy_monitor = arbiter.get_actor(proxy.aid) self.assertEqual(proxy_monitor.aid, proxy.aid) self.assertEqual(proxy_monitor.address, proxy.address) - yield self.async.assertEqual(arbiter.send(proxy, 'ping'), 'pong') - yield self.async.assertEqual(arbiter.send(proxy, 'echo', 'Hello!'), - 'Hello!') + yield self.async.assertEqual(send(proxy, 'ping'), 'pong') + yield self.async.assertEqual(send(proxy, 'echo', 'Hello!'), 'Hello!') # We call the ActorTestMixin.stop_actors method here, since the # ActorTestMixin.tearDown method is invoked on the test-worker domain # (here we are in the arbiter domain) @@ -67,24 +67,35 @@ def testSimpleSpawn(self): def testActorSpawn(self): '''Test spawning from actor domain.''' - yield self.spawn(on_task=on_task, concurrency='thread') + yield self.spawn(on_task=on_task, name='pippo') proxy = self.a - actor = pulsar.get_actor() + self.assertEqual(proxy.name, 'pippo') # The current actor is linked with the actor just spawned + actor = pulsar.get_actor() self.assertEqual(actor.get_actor(proxy.aid), proxy) - yield self.async.assertEqual(actor.send(proxy, 'ping'), 'pong') - yield self.async.assertEqual(actor.send(proxy, 'echo', 'Hello!'), - 'Hello!') - yield actor.send(proxy, 'run', check_actor) + yield self.async.assertEqual(send(proxy, 'ping'), 'pong') + yield self.async.assertEqual(send(proxy, 'echo', 'Hello!'), 'Hello!') + yield send(proxy, 'run', check_actor) def testPasswordProtected(self): - yield self.spawn(cfg={'password': 'bla', 'param': 1}) + yield self.spawn(password='bla', name='pluto') proxy = self.a - actor = pulsar.get_actor() - yield self.async.assertEqual(actor.send(proxy, 'ping'), 'pong') + self.assertEqual(proxy.name, 'pluto') + yield self.async.assertEqual(send(proxy, 'ping'), 'pong') yield self.async.assertRaises(pulsar.AuthenticationError, - actor.send(proxy, 'shutdown')) - yield self.async.assertEqual(actor.send(proxy, 'auth', 'bla'), True) + send(proxy, 'shutdown')) + yield self.async.assertEqual(send(proxy, 'auth', 'bla'), True) + + def testInfo(self): + yield self.spawn(on_task=on_task, name='pippo') + proxy = self.a + self.assertEqual(proxy.name, 'pippo') + outcome = send(proxy, 'info') + yield outcome + info = outcome.result + self.assertTrue('actor' in info) + ainfo = info['actor'] + self.assertEqual(ainfo['is_process'], self.concurrency=='process') diff --git a/tests/arbiter.py b/tests/arbiter.py index 224d8350..a83438a4 100644 --- a/tests/arbiter.py +++ b/tests/arbiter.py @@ -1,53 +1,112 @@ '''Tests for arbiter and monitors.''' import os +import time from threading import current_thread import pulsar from pulsar import send, spawn -from pulsar.apps.test import unittest, run_on_arbiter +from pulsar.async.actor import ACTOR_STOPPING_LOOPS +from pulsar.apps.test import unittest, run_on_arbiter, ActorTestMixin -class ActorA(pulsar.Actor): - '''This actor ping application "app1" and once it receive the callback -it fires a message with the result to its monitor.''' - def on_task(self): - '''Ping the application monitor "app1"''' - if not hasattr(self,'_result'): - self._result = None - self.log.debug('{0} pinging "app1"'.format(self)) - server = self.ACTOR_LINKS["app1"] - self.proxy.ping(server).add_callback(self.fire_result) - - def fire_result(self, result): - self.monitor.send(self.aid,result) +class BogusActor(pulsar.Actor): + + def __call__(self): + #This actor does not send notify messages to the arbiter + pass + def stop(self, force=False, exit_code=0): + if not self.params.failstop or\ + (self.params.failstop and force): + return super(BogusActor, self).stop(force=force, exit_code=exit_code) -class TestArbiter(unittest.TestCase): + +class TestArbiter(ActorTestMixin, unittest.TestCase): @run_on_arbiter def testSpawning(self): arbiter = pulsar.get_actor() - self.assertEqual(arbiter.aid, 'arbiter') + self.assertEqual(arbiter.name, 'arbiter') self.assertEqual(len(arbiter.monitors), 1) - self.assertEqual(arbiter.monitors['test']._spawning, {}) + self.assertEqual(arbiter.monitors['test'].spawning_actors, {}) + yield self.spawn(name='foo') + proxy = self.a + self.assertEqual(proxy.name, 'foo') + self.assertEqual(arbiter.spawning_actors, {}) + self.assertTrue(proxy.aid in arbiter.managed_actors) + arbiter.manage_actors() def testArbiter(self): worker = pulsar.get_actor() self.assertEqual(pulsar.arbiter(), None) arbiter = worker.arbiter self.assertTrue(arbiter) + self.assertEqual(arbiter.name, 'arbiter') @run_on_arbiter def testArbiterObject(self): '''Test the arbiter in its process domain''' arbiter = pulsar.get_actor() self.assertTrue(arbiter.is_arbiter()) - self.assertEqual(arbiter.impl, 'monitor') + self.assertEqual(arbiter.impl.kind, 'monitor') self.assertTrue(arbiter.monitors) self.assertEqual(arbiter.ioloop, arbiter.requestloop) self.assertFalse(arbiter.cpubound) - self.assertEqual(arbiter.close_signal, None) + self.assertEqual(arbiter.exit_code, None) info = arbiter.info() - self.assertTrue(info) + self.assertTrue('server' in info) + server = info['server'] + self.assertEqual(server['state'], 'running') + + @run_on_arbiter + def testBadMonitor(self): + arbiter = pulsar.get_actor() + self.assertTrue(arbiter.monitors) + name = list(arbiter.monitors.values())[0].name + self.assertRaises(KeyError, arbiter.add_monitor, pulsar.Monitor, name) + + @run_on_arbiter + def testTimeout(self): + arbiter = pulsar.get_actor() + self.assertTrue(arbiter.is_arbiter()) + yield self.spawn(actor_class=BogusActor, name='foo', timeout=1) + proxy = self.a + self.assertEqual(proxy.name, 'foo') + self.assertEqual(arbiter.spawning_actors, {}) + self.assertTrue(proxy.aid in arbiter.managed_actors) + proxy = arbiter.managed_actors[proxy.aid] + self.assertEqual(proxy.stopping_loops, 0) + time.sleep(1) + n = arbiter.manage_actors() + self.assertTrue(n) + self.assertEqual(proxy.stopping_loops, 1) + while proxy.aid in arbiter.managed_actors: + yield pulsar.NOT_DONE + arbiter.manage_actors() + self.assertEqual(arbiter.manage_actors(), n-1) + self.assertFalse(proxy.aid in arbiter.managed_actors) + thread_actors = pulsar.process_local_data('thread_actors') + self.assertFalse(proxy.aid in thread_actors) + + @run_on_arbiter + def testTerminate(self): + arbiter = pulsar.get_actor() + self.assertTrue(arbiter.is_arbiter()) + yield self.spawn(actor_class=BogusActor, name='foo', timeout=1, + failstop=True) + proxy = self.a + self.assertEqual(proxy.name, 'foo') + proxy = arbiter.managed_actors[proxy.aid] + self.assertEqual(proxy.stopping_loops, 0) + time.sleep(1) + n = arbiter.manage_actors() + self.assertTrue(n) + self.assertEqual(proxy.stopping_loops, 1) + while proxy.aid in arbiter.managed_actors: + yield pulsar.NOT_DONE + arbiter.manage_actors() + self.assertEqual(proxy.stopping_loops, ACTOR_STOPPING_LOOPS) + thread_actors = pulsar.process_local_data('thread_actors') + self.assertFalse(proxy.aid in thread_actors) diff --git a/tests/httpurl.py b/tests/httpurl.py index 07e3338b..d03b966b 100644 --- a/tests/httpurl.py +++ b/tests/httpurl.py @@ -157,6 +157,11 @@ def test_encode_multipart_formdata(self): idx = data.find(b'\r\n') boundary = data[2:idx].decode('utf-8') self.assertEqual(ct, 'multipart/form-data; boundary=%s' % boundary) + + def test_HttpResponse(self): + r = httpurl.HttpResponse(None) + self.assertEqual(r.status_code, None) + self.assertEqual(str(r), '') def request_callback(result): @@ -193,9 +198,9 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): if cls.app is not None: - yield send('arbiter', 'kill_actor', cls.app.mid) + yield send('arbiter', 'kill_actor', cls.app.name) if cls.proxy_app is not None: - yield send('arbiter', 'kill_actor', cls.proxy_app.mid) + yield send('arbiter', 'kill_actor', cls.proxy_app.name) def client(self, **kwargs): kwargs['timeout'] = self.timeout diff --git a/tests/iostream.py b/tests/iostream.py index 3daaad6d..826ec194 100644 --- a/tests/iostream.py +++ b/tests/iostream.py @@ -40,7 +40,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): if cls.server: - yield pulsar.send('arbiter', 'kill_actor', cls.server.mid) + yield pulsar.send('arbiter', 'kill_actor', cls.server.name) def client(self, **kwargs): return pulsar.ClientSocket.connect(self.server.address, **kwargs) @@ -125,6 +125,31 @@ def _call(_): # we need to run this test on the ioloop thread io.ioloop.add_callback(cbk) + def testReadTimeout(self): + client = self.client(timeout=0) + self.assertEqual(client.read_timeout, None) + client.read_timeout = 20 + self.assertEqual(client.read_timeout, 20) + r = client.execute(b'ciao') + yield r + self.assertEqual(r.result, b'ciao') + self.assertTrue(client.sock._read_timeout) + # Remove the read_timeout + client.sock.ioloop.remove_timeout(client.sock._read_timeout) + r = client.execute(b'pippo') + yield r + self.assertEqual(r.result, b'pippo') + + def testMaxBufferSize(self): + client = self.client(timeout=0) + client.sock.max_buffer_size = 10 + msg = b'this will overflow the reading buffer' + r = client.execute(msg) + yield r + self.assertEqual(r.result, msg) + self.assertTrue(client.closed) + + @dont_run_with_thread class TestPulsarStreamsProcess(TestPulsarStreams): diff --git a/tests/me.py b/tests/me.py index 32bd3505..5b9afa2e 100644 --- a/tests/me.py +++ b/tests/me.py @@ -31,12 +31,12 @@ def testWorker(self): self.assertFalse(worker.stopping()) self.assertFalse(worker.closed()) self.assertFalse(worker.stopped()) - self.assertEqual(worker.state, 'running') + self.assertEqual(worker.info_state, 'running') self.assertEqual(worker.tid, current_thread().ident) self.assertEqual(worker.pid, os.getpid()) self.assertTrue(worker.cpubound) self.assertTrue(worker.mailbox.cpubound) - self.assertTrue(worker._impl.daemon) + self.assertTrue(worker.impl.daemon) self.assertFalse(worker.is_pool()) def testWorkerMonitor(self): @@ -126,7 +126,7 @@ def testPingArbiter(self): def test_run_on_arbiter(self): actor = pulsar.get_actor() - result = actor.run_on_arbiter(simple_function) + result = actor.send('arbiter', 'run', simple_function) yield result self.assertEqual(result.result, 'success') diff --git a/tests/utils/tools.py b/tests/utils/tools.py index 7bba9ce8..40c49256 100755 --- a/tests/utils/tools.py +++ b/tests/utils/tools.py @@ -89,7 +89,7 @@ class TestPidfile(ActorTestMixin, unittest.TestCase): concurrency = 'process' def testCreate(self): - yield self.spawn() + yield self.spawn(name='pippo') proxy = self.a r = send(proxy, 'info') yield r diff --git a/tests/wsgi.py b/tests/wsgi.py index 90f672ac..f9cdccc1 100644 --- a/tests/wsgi.py +++ b/tests/wsgi.py @@ -57,7 +57,7 @@ class testWsgiApplication(unittest.TestCase): def testBuildWsgiApp(self): appserver = wsgi.WSGIServer() - self.assertEqual(appserver.mid, None) + self.assertEqual(appserver.name, 'wsgi') self.assertEqual(appserver.callable, None) def testWsgiHandler(self): @@ -70,9 +70,7 @@ def testWsgiHandler(self): def testHttpBinServer(self): from examples.httpbin.manage import server app = server(bind='127.0.0.1:0') - self.assertEqual(app.mid, None) app2 = pickle.loads(pickle.dumps(app)) - self.assertEqual(app2.mid, None) self.assertEqual(len(app.callable.middleware), len(app2.callable.middleware)) \ No newline at end of file