Skip to content

Commit

Permalink
all tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
quantmind committed Dec 10, 2012
1 parent 7a0bf5a commit fedefb1
Show file tree
Hide file tree
Showing 30 changed files with 339 additions and 273 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Version 0.4.2
========================
* Fixed bug in boolean validation.
* Refactored :class:`pulsar.apps.test.TestPlugin` to handle multi-parameters.
* **310 regression tests**, **84% coverage**.
* **320 regression tests**, **84% coverage**.

Version 0.4.1 - 2012-Dec-04
==============================
Expand Down
5 changes: 2 additions & 3 deletions examples/calculator/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
if cls.app:
return send('arbiter', 'kill_actor', cls.app.mid)
return send('arbiter', 'kill_actor', cls.app.name)

def setUp(self):
self.assertEqual(self.p.url, self.uri)
Expand All @@ -46,7 +46,6 @@ def testHandler(self):
hnd = root.subHandlers['calc']
self.assertFalse(hnd.isroot())
self.assertEqual(hnd.subHandlers, {})
self.assertTrue(s.mid)

# Pulsar server commands
def testPing(self):
Expand Down Expand Up @@ -82,7 +81,7 @@ def testDivide(self):
result = self.p.calc.divide(50, 25)
self.assertEqual(result, 2)

def testAInfo(self):
def testInfo(self):
result = self.p.server_info()
self.assertTrue('server' in result)
server = result['server']
Expand Down
2 changes: 1 addition & 1 deletion examples/helloworld/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def hello(environ, start_response):
return iter([data])


def server(description = None, **kwargs):
def server(description=None, **kwargs):
description = description or 'Pulsar Hello World Application'
return wsgi.WSGIServer(callable=hello,
description=description,
Expand Down
10 changes: 7 additions & 3 deletions examples/helloworld/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ def name(cls):

@classmethod
def setUpClass(cls):
s = server(bind='127.0.0.1:0', name=cls.name(),
concurrency=cls.concurrency)
name = name=cls.name()
kwargs = {'%s__bind' % name: '127.0.0.1:0'}
s = server(name=cls.name(), concurrency=cls.concurrency, **kwargs)
outcome = send('arbiter', 'run', s)
yield outcome
cls.app = outcome.result
Expand All @@ -27,14 +28,17 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
if cls.app is not None:
outcome = send('arbiter', 'kill_actor', cls.app.mid)
outcome = send('arbiter', 'kill_actor', cls.app.name)
yield outcome

@run_on_arbiter
def testMeta(self):
app = get_application(self.name())
self.assertEqual(app.name, self.name())
self.assertTrue(app.monitor.running())
self.assertEqual(app, app.app)
self.assertEqual(str(app), app.name)
self.assertEqual(app.cfg.bind, '127.0.0.1:0')

def testResponse(self):
c = HttpClient()
Expand Down
3 changes: 1 addition & 2 deletions examples/taskqueue/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ class server(pulsar.MultiApp):
def __call__(self, actor=None):
name = self.name
params = self.params
tq = tasks.TaskQueue(name=name, callable=dummy,
tasks_path=TASK_PATHS,
tq = tasks.TaskQueue(name=name, tasks_path=TASK_PATHS,
script=__file__, **params)
self.apps.append(tq)
rpcs = wsgi.WSGIServer(rpc.RpcMiddleware(RpcRoot(tq)),
Expand Down
3 changes: 1 addition & 2 deletions examples/taskqueue/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def name_rpc(cls):
@classmethod
def setUpClass(cls):
# The name of the task queue application
s = server(cls.name_tq(),
bind ='127.0.0.1:0',
s = server(cls.name_tq(), bind='127.0.0.1:0',
concurrency=cls.concurrency)
outcome = send('arbiter', 'run', s)
yield outcome
Expand Down
8 changes: 4 additions & 4 deletions examples/websocket/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
if cls.app is not None:
outcome = send('arbiter', 'kill_actor', cls.app.mid)
outcome = send('arbiter', 'kill_actor', cls.app.name)
yield outcome

def headers(self, extensions=None, protocol=None):
Expand All @@ -57,17 +57,17 @@ def testBadRequests(self):
response = outcome.result
self.assertEqual(response.status_code, 400)
#
outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key','')])
outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key', '')])
yield outcome
response = outcome.result
self.assertEqual(response.status_code, 400)
#
outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key','bla')])
outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-Key', 'bla')])
yield outcome
response = outcome.result
self.assertEqual(response.status_code, 400)
#
outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-version','xxx')])
outcome = c.get(self.ws_uri, headers=[('Sec-Websocket-version', 'xxx')])
yield outcome
response = outcome.result
self.assertEqual(response.status_code, 400)
Expand Down
45 changes: 6 additions & 39 deletions pulsar/apps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ class Worker(ApplicationHandlerMixin, Actor):
The application handler obtained from :meth:`Application.handler`.
"""
@property
def class_code(self):
return 'worker %s' % self.app.name

def on_init(self, app=None, **kwargs):
self.app = app
self.information = LogInformation(self.cfg.logevery)
Expand Down Expand Up @@ -182,9 +178,6 @@ def on_exit(self):
except:
pass

def clean_up(self):
self.worker_class.clean_arbiter_loop(self,self.ioloop)

def actorparams(self):
'''Override the :meth:`Monitor.actorparams` method to
updated actor parameters with information about the application.
Expand Down Expand Up @@ -291,7 +284,6 @@ class Application(pulsar.Pulsar):
cfg = {}
_app_name = None
description = None
mid = None
epilog = None
cfg_apps = None
config_options_include = None
Expand Down Expand Up @@ -341,7 +333,10 @@ def __init__(self,
def __call__(self, actor=None):
if actor is None:
actor = get_actor()
if not self.mid and (not actor or actor.is_arbiter()):
monitor = None
if actor and actor.is_arbiter():
monitor = actor.monitors.get(self.name)
if monitor is None and (not actor or actor.is_arbiter()):
# Add events
self.local.events = {'on_start': Deferred(), 'on_stop': Deferred()}
self.cfg.on_start()
Expand Down Expand Up @@ -381,7 +376,7 @@ def __repr__(self):
return self.name

def __str__(self):
return self.name
return self.__repr__()

@property
def monitor(self):
Expand Down Expand Up @@ -426,15 +421,6 @@ def get_ioqueue(self):
By default it returns ``None``.'''
return None

def put(self, request):
queue = self.ioqueue
if queue:
self.logger.debug('Put %s on IO queue', request)
queue.put(('request', request))
else:
self.logger.error("Trying to put a request on task queue,\
but there isn't one!")

def on_config_init(self, cfg, params):
'''Callback when configuration is initialised but not yet loaded.
This is a chance to add extra config parameter or remove unwanted ones.
Expand Down Expand Up @@ -511,7 +497,7 @@ def load_config(self, argv, version, parse_console, params):
self.cfg.settings[k].default = v
except AttributeError:
if not self.add_to_overrides(k, v, overrides):
setattr(self,k,v)
setattr(self, k, v)
# parse console args
if parse_console:
parser = self.cfg.parser()
Expand Down Expand Up @@ -613,25 +599,6 @@ def start(self):
arbiter.start()
return self

def stop(self):
'''Stop the application.'''
arbiter = pulsar.arbiter()
if arbiter:
monitor = arbiter.get_actor(self.name)
if monitor:
monitor.stop()

def actorlinks(self, links):
if not links:
raise StopIteration
else:
arbiter = pulsar.arbiter()
for name,app in links.items():
if app.mid in arbiter.monitors:
monitor = arbiter.monitors[app.mid]
monitor.actor_links[self.name] = self
yield name, app


class MultiApp:

Expand Down
17 changes: 7 additions & 10 deletions pulsar/apps/rpc/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import sys
import inspect
import logging

from pulsar import LogginMixin, to_bytes, is_failure, log_failure, is_async,\
from pulsar import to_bytes, is_failure, log_failure, is_async,\
as_failure, maybe_async, HttpException
from pulsar.utils.tools import checkarity
from pulsar.apps.wsgi import WsgiResponse, WsgiResponseGenerator
Expand All @@ -12,13 +13,13 @@

__all__ = ['RpcHandler', 'RpcMiddleware']

LOGGER = logging.getLogger('pulsar.rpc')

class RpcRequest(object):

def __init__(self, environ, handler, method, func, args,
kwargs, id, version):
self.environ = environ
self.logger = handler.logger
self.handler = handler
self.method = method
self.func = func
Expand Down Expand Up @@ -75,7 +76,7 @@ def __iter__(self):
request.version,
result=result)
except Exception as e:
handler.log.error('Could not serialize', exc_info=True)
LOGGER.error('Could not serialize', exc_info=True)
status_code = 500
result = handler.dumps(request.id,
request.version,
Expand Down Expand Up @@ -124,10 +125,7 @@ def __new__(cls, name, bases, attrs):
return make(cls, name, bases, attrs)


BaseHandler = MetaRpcHandler('BaseRpcHandler',(LogginMixin,),{'virtual':True})


class RpcHandler(BaseHandler):
class RpcHandler(MetaRpcHandler('_RpcHandler', (object,), {'virtual': True})):
'''The base class for rpc handlers.
.. attribute:: content_type
Expand All @@ -152,7 +150,6 @@ def __init__(self, subhandlers=None, title=None, documentation=None,
self.subHandlers = {}
self.title = title or self.__class__.__name__
self.documentation = documentation or ''
self.setlog(**kwargs)
if subhandlers:
for prefix,handler in subhandlers.items():
if inspect.isclass(handler):
Expand Down Expand Up @@ -189,14 +186,14 @@ def get_method_and_args(self, data):
raise NotImplementedError()

def __getstate__(self):
d = super(RpcHandler,self).__getstate__()
d = self.__dict__.copy()
if not self.isroot():
# Avoid duplicating handlers
d['_parent'] = True
return d

def __setstate__(self, state):
super(RpcHandler,self).__setstate__(state)
self.__dict__ = state
for handler in self.subHandlers.values():
handler._parent = self

Expand Down
6 changes: 3 additions & 3 deletions pulsar/apps/rpc/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ class JSONRPC(RpcHandler):
def get_method_and_args(self, data):
'''Overrides the :meth:`RpcHandler:get_method_and_args` to obtain
method data from the JSON *data* string.'''
if not isinstance(data,dict):
if not isinstance(data, dict):
data = self._json.loads(data)
method = data.get('method',None)
params = data.get('params',None)
id = data.get('id',None)
version = data.get('jsonrpc',None)
kwargs = {}
args = ()
if isinstance(params,dict):
for k,v in params.items():
if isinstance(params, dict):
for k, v in params.items():
kwargs[str(k)] = v
elif params:
args = tuple(params)
Expand Down
12 changes: 5 additions & 7 deletions pulsar/apps/socket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,24 @@ def monitor_init(self, monitor):
if not pulsar.platform.multiProcessSocket()\
or cfg.concurrency == 'thread':
cfg.set('workers', 0)
if not self.socket_server_class:
raise TypeError('Socket server class not specified.')

def monitor_start(self, monitor):
# Open the socket and bind to address
address = self.cfg.address
if address:
socket = pulsar.create_socket(address,
logger=monitor.logger,
backlog=self.cfg.backlog)
socket = pulsar.create_socket(address, backlog=self.cfg.backlog)
else:
raise pulsar.ImproperlyConfigured('Could not open a socket. '
'No address to bind to')
self.logger.info('Listening on %s', socket)
monitor.socket = socket
monitor.params.socket = socket
self.address = socket.name

def worker_start(self, worker):
# Start the worker by starting the socket server
if not self.socket_server_class:
raise TypeError('Socket server class not specified.')
s = self.socket_server_class(worker, worker.socket)
s = self.socket_server_class(worker, worker.params.socket)
# We add the file descriptor handler
s.on_connection_callbacks.append(worker.handle_fd_event)
worker.socket_server = s.start()
Expand Down
2 changes: 1 addition & 1 deletion pulsar/apps/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def all_spawned(self):

def spawn(self, concurrency=None, **kwargs):
concurrency = concurrency or self.concurrency
ad = pulsar.spawn(concurrency=concurrency,**kwargs)
ad = pulsar.spawn(concurrency=concurrency, **kwargs)
self.assertTrue(ad.aid)
self.assertTrue(isinstance(ad, pulsar.ActorProxyDeferred))
yield ad
Expand Down
7 changes: 6 additions & 1 deletion pulsar/async/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,15 @@ def thread_ioloop(ioloop=None):
def set_actor(actor):
'''Returns the actor running the current thread.'''
actor = thread_local_data('actor', value=actor)
if actor.impl == 'thread':
if actor.impl.kind == 'thread':
process_local_data('thread_actors')[actor.aid] = actor
return actor

def remove_actor(actor):
'''Remove actor from threaded_actors dictionary'''
if actor.impl.kind == 'thread':
process_local_data('thread_actors').pop(actor.aid, None)

def get_actor_from_id(aid):
'''Retrieve an actor from its actor id. This function can be used by
actors with thread concurrency ince they live in the arbiter process domain.'''
Expand Down
Loading

0 comments on commit fedefb1

Please sign in to comment.