From 94f02986485c4a9e43b83a70173bdadf06dc527c Mon Sep 17 00:00:00 2001 From: quantmind Date: Fri, 15 Apr 2011 15:29:39 +0100 Subject: [PATCH] working on test suite --- .gitignore | 1 + .../calculator/__init__.py | 0 examples/{ => calculator}/calculator.py | 6 +- examples/calculator/tests.py | 11 ++ examples/helloworld/__init__.py | 0 examples/{ => helloworld}/helloworld.py | 4 +- examples/helloworld/tests.py | 15 +++ pulsar/__init__.py | 16 +-- pulsar/apps/base.py | 6 +- pulsar/http/__init__.py | 2 + pulsar/http/client/__init__.py | 31 ++--- pulsar/http/client/_httplib2.py | 36 ++++++ pulsar/http/client/std.py | 56 ++++++--- pulsar/{utils => http}/rpc/__init__.py | 0 pulsar/{utils => http}/rpc/decorators.py | 0 pulsar/{utils => http}/rpc/exceptions.py | 0 pulsar/{utils => http}/rpc/handlers.py | 9 +- pulsar/{utils => http}/rpc/jsonrpc.py | 4 +- pulsar/utils/__init__.py | 1 - pulsar/utils/eventloop.py | 113 +++++++++++------- pulsar/utils/log.py | 41 +++++++ pulsar/utils/system/windowssystem.py | 3 +- pulsar/utils/{test => }/test.py | 30 ++++- pulsar/utils/test/__init__.py | 1 - pulsar/workers/arbiter.py | 39 +++--- pulsar/workers/base.py | 26 ++-- pulsar/workers/http.py | 2 +- pulsar/workers/http_t.py | 7 +- pulsar/workers/workerpool.py | 8 +- runtests.py | 58 +++++++-- tests/control.py | 46 +++---- 31 files changed, 403 insertions(+), 169 deletions(-) rename pulsar/utils/http.py => examples/calculator/__init__.py (100%) rename examples/{ => calculator}/calculator.py (89%) create mode 100755 examples/calculator/tests.py create mode 100755 examples/helloworld/__init__.py rename examples/{ => helloworld}/helloworld.py (81%) create mode 100755 examples/helloworld/tests.py create mode 100755 pulsar/http/client/_httplib2.py rename pulsar/{utils => http}/rpc/__init__.py (100%) rename pulsar/{utils => http}/rpc/decorators.py (100%) rename pulsar/{utils => http}/rpc/exceptions.py (100%) rename pulsar/{utils => http}/rpc/handlers.py (93%) rename pulsar/{utils => http}/rpc/jsonrpc.py (95%) create mode 100755 pulsar/utils/log.py rename pulsar/utils/{test => }/test.py (67%) delete mode 100644 pulsar/utils/test/__init__.py diff --git a/.gitignore b/.gitignore index cfae9f0c..fd456e14 100755 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ docs/build/* dist/* .project .pydevproject +cfg.py .settings/* lib/src/parser.c pulsar/http/cparser/parser.* \ No newline at end of file diff --git a/pulsar/utils/http.py b/examples/calculator/__init__.py similarity index 100% rename from pulsar/utils/http.py rename to examples/calculator/__init__.py diff --git a/examples/calculator.py b/examples/calculator/calculator.py similarity index 89% rename from examples/calculator.py rename to examples/calculator/calculator.py index a44883a8..72b1a772 100644 --- a/examples/calculator.py +++ b/examples/calculator/calculator.py @@ -2,8 +2,9 @@ A very simple JSON-RPC Calculator ''' import pulsar +from pulsar.http import rpc -class Calculator(pulsar.JSONRPC): +class Calculator(rpc.JSONRPC): def rpc_ping(self, request): return 'pong' @@ -27,4 +28,5 @@ def run(): if __name__ == '__main__': - run() \ No newline at end of file + run() + diff --git a/examples/calculator/tests.py b/examples/calculator/tests.py new file mode 100755 index 00000000..4faceb57 --- /dev/null +++ b/examples/calculator/tests.py @@ -0,0 +1,11 @@ +from pulsar import test +from pulsar.http import rpc + +from .calculator import run + + +class TestCalculatorExample(test.TestCase): + + def setUp(self): + self.p = rpc.JsonProxy('http://localhost:8060') + diff --git a/examples/helloworld/__init__.py b/examples/helloworld/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/examples/helloworld.py b/examples/helloworld/helloworld.py similarity index 81% rename from examples/helloworld.py rename to examples/helloworld/helloworld.py index e4291ab8..bb5ee3c3 100755 --- a/examples/helloworld.py +++ b/examples/helloworld/helloworld.py @@ -12,9 +12,9 @@ def hello(environ, start_response): return iter([data]) -def run(): +def run(**kwargs): wsgi = pulsar.require('wsgi') - wsgi.createServer(callable = hello).run() + wsgi.createServer(callable = hello, **kwargs).run() if __name__ == '__main__': diff --git a/examples/helloworld/tests.py b/examples/helloworld/tests.py new file mode 100755 index 00000000..14471104 --- /dev/null +++ b/examples/helloworld/tests.py @@ -0,0 +1,15 @@ +from pulsar import test +from pulsar.http import HttpClient + +from .helloworld import run + +class TestCalculatorExample(test.TestCase): + uri = 'http://localhost:8060' + + def setUp(self): + self.c = HttpClient() + + def testThreadWorker(self): + run(worker_class = 'http_t') + resp = self.c.request(self.uri) + self.assertEqual(resp,'Hello World!') diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 127f707d..9c5c2ab6 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,20 +1,9 @@ '''Concurrent server and message queues''' - VERSION = (0, 1, 'dev') - def get_version(): return '.'.join(map(str,VERSION)) -SERVER_NAME = "pulsar" -SERVER_SOFTWARE = "{0}/{1}".format(SERVER_NAME,get_version()) - - -def getLogger(name = None): - import logging - name = '{0}.{1}'.format(SERVER_NAME,name) if name else SERVER_NAME - return logging.getLogger(name) - __version__ = get_version() __license__ = "BSD" __author__ = "Luca Sbardella" @@ -34,6 +23,10 @@ def getLogger(name = None): 'Topic :: Utilities' ] +from .utils.log import * + +SERVER_SOFTWARE = "python-{0}/{1}".format(SERVER_NAME,get_version()) + from .utils.exceptions import * from .utils import test from .utils import system @@ -45,6 +38,5 @@ def getLogger(name = None): from .workers.base import * from .workers.arbiter import * from .apps.base import Application, require -from .utils.rpc import * diff --git a/pulsar/apps/base.py b/pulsar/apps/base.py index 8b7eadb8..37bb944a 100644 --- a/pulsar/apps/base.py +++ b/pulsar/apps/base.py @@ -61,8 +61,12 @@ def __setstate__(self, state): def load_config(self, **params): '''Load the application configuration''' - self.cfg = pulsar.Config(self.usage, **params) + self.cfg = pulsar.Config(self.usage) + # add params + for k, v in params.items(): + self.cfg.set(k.lower(), v) + # parse console args parser = self.cfg.parser() opts, args = parser.parse_args() diff --git a/pulsar/http/__init__.py b/pulsar/http/__init__.py index b8d644d1..0fbd749e 100644 --- a/pulsar/http/__init__.py +++ b/pulsar/http/__init__.py @@ -1,5 +1,7 @@ from pulsar.utils.importer import import_module +from .client import HttpClient + def get_httplib(cfg = None): name = None if not cfg else cfg.settings['httplib'].value if name == 'gunicorn': diff --git a/pulsar/http/client/__init__.py b/pulsar/http/client/__init__.py index 1746e674..fb61f512 100644 --- a/pulsar/http/client/__init__.py +++ b/pulsar/http/client/__init__.py @@ -1,29 +1,32 @@ -import urllib - -from .std import - +from .std import HttpClient1, getproxies_environment HttpClients={1:HttpClient1} +try: + from ._httplib2 import HttpClient2 + HttpClients[2] = HttpClient2 +except ImportError: + pass -def getproxy(schema = 'http'): - p = urllib.getproxies_environment() - return p.get(schema,None) +form_headers = {'Content-type': 'application/x-www-form-urlencoded'} def HttpClient(cache = None, proxy_info = None, timeout = None, type = 1, async = False): - '''Create a http client handler: + '''Create a http client handler using different implementation. +It can build a synchronous or an asyncronous handler build on top +of the :class:`pulsar.IOLoop`. - * *cache* is the http cache file. - * *proxy_info* proxy server - * *timeout* - * *type* the type of client. - ''' +:parameter cache: Cache file. Default ``None``. +:parameter proxy_info: Dictionary of proxies. Default ``None``. +:parameter timeout: Connection timeout. Default ``None``. +:parameter type: Handler implementation. Default ``1``. +:parameter async: Synchronous or Asynchronous. Default ``False``. +''' if type not in HttpClients: raise ValueError('HttpClient{0} not available'.format(type)) client = HttpClients[type] proxy = proxy_info if proxy is None: - proxy = getproxy() + proxy = getproxies_environment() return client(proxy_info = proxy, cache = cache, timeout = timeout) diff --git a/pulsar/http/client/_httplib2.py b/pulsar/http/client/_httplib2.py new file mode 100755 index 00000000..fac6083b --- /dev/null +++ b/pulsar/http/client/_httplib2.py @@ -0,0 +1,36 @@ +'''\ +The Httplib2 clinet + +This is a thin layer on top of httplib2 python library. + +http://code.google.com/p/httplib2/ +''' +import httplib2 + +from .std import HttpClientBase + + +class Response(object): + __slots__ = ('response','content') + def __init__(self, response, content): + self.response = response + self.content = content + + +class HttpClient2(HttpClientBase): + + def __init__(self, proxy_info = None, + timeout = None, cache = None, headers = None): + self._opener = httplib2.Http(cache = cache, timeout = timeout, + proxy_info = proxy_info) + + def request(self, uri, method='GET', body=None, headers = None, **kwargs): + r = self._opener.open(uri, + method=method, + body=body, + headers=self.headers(headers)) + return Response(r) + + def add_credentials(self, username, password, domain = ''): + self._opener.add_credentials(username, password, domain) + diff --git a/pulsar/http/client/std.py b/pulsar/http/client/std.py index d4d753ff..e54b49ac 100644 --- a/pulsar/http/client/std.py +++ b/pulsar/http/client/std.py @@ -4,48 +4,68 @@ This is a thin layer on top of urllib2 in python2 / urllib in Python 3 It exposes the httplib1 class from the standard library. ''' -from pulsar.utils.py2py3 import ispy3k -if ispy3k: +import pulsar +if pulsar.ispy3k: # Python 3 from urllib.request import Request, build_opener, install_opener from urllib.request import HTTPCookieProcessor, HTTPPasswordMgrWithDefaultRealm - from urllib.request import HTTPBasicAuthHandler + from urllib.request import HTTPBasicAuthHandler, ProxyHandler + from urllib.request import getproxies_environment from urllib.parse import urlencode else: # Python 2.* from urllib2 import Request, build_opener, install_opener, HTTPCookieProcessor from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler - from urllib import urlencode + from urllib2 import ProxyHandler + from urllib import urlencode, getproxies_environment + class Response(object): def __init__(self, response): self._response = response + + @property + def content(self): + if not hasattr(self,'_content'): + self._content = self.response.read() + return self._content - -class HttpClient1(object): - def __init__(self, proxy_info = None, timeout = None, cache = None): - self._opener = opener +class HttpClientBase(object): + DEFAULT_HEADERS = {'user-agent': pulsar.SERVER_SOFTWARE} + + def headers(self, headers): + d = self.DEFAULT_HEADERS.copy() + if not headers: + return d + else: + d.update(headers) + return d + + +class HttpClient1(HttpClientBase): + + def __init__(self, proxy_info = None, + timeout = None, cache = None, headers = None): + proxy = ProxyHandler(proxy_info) + self._opener = build_opener(proxy) self.timeout = timeout def request(self, url, body=None, **kwargs): - response = self._opener.open(url,data=body,timeout=timeout) - content = c.read() - return (Response(response),content) - - @classmethod - def auth(cls, username, password, uri, realm=None, timeout=None): - '''Create an httplib1 instance witn a basic authentication handler. -The authentication''' + response = self._opener.open(url,data=body,timeout=self.timeout) + return Response(response) + + def add_password(self, username, password, uri, realm=None): + '''Add Basic HTTP Authentication to the opener''' if realm is None: password_mgr = HTTPPasswordMgrWithDefaultRealm() else: password_mgr = HTTPPasswordMgr() password_mgr.add_password(realm, uri, user, passwd) - opener = HTTPBasicAuthHandler(password_mgr) - return cls(opener,timeout) + self._opener.add_handler(HTTPBasicAuthHandler(password_mgr)) + diff --git a/pulsar/utils/rpc/__init__.py b/pulsar/http/rpc/__init__.py similarity index 100% rename from pulsar/utils/rpc/__init__.py rename to pulsar/http/rpc/__init__.py diff --git a/pulsar/utils/rpc/decorators.py b/pulsar/http/rpc/decorators.py similarity index 100% rename from pulsar/utils/rpc/decorators.py rename to pulsar/http/rpc/decorators.py diff --git a/pulsar/utils/rpc/exceptions.py b/pulsar/http/rpc/exceptions.py similarity index 100% rename from pulsar/utils/rpc/exceptions.py rename to pulsar/http/rpc/exceptions.py diff --git a/pulsar/utils/rpc/handlers.py b/pulsar/http/rpc/handlers.py similarity index 93% rename from pulsar/utils/rpc/handlers.py rename to pulsar/http/rpc/handlers.py index 38503a27..482d1551 100755 --- a/pulsar/utils/rpc/handlers.py +++ b/pulsar/http/rpc/handlers.py @@ -3,6 +3,7 @@ import inspect from datetime import datetime +from pulsar import PickableMixin from .exceptions import NoSuchFunction __all__ = ['RpcHandler'] @@ -73,7 +74,7 @@ def __new__(cls, name, bases, attrs): BaseHandler = MetaRpcHandler('BaseRpcHandler',(object,),{'virtual':True}) -class RpcHandler(BaseHandler): +class RpcHandler(BaseHandler,PickableMixin): '''Server Handler. Sub-handlers for prefixed methods (e.g., system.listMethods) can be added with putSubHandler. By default, prefixes are @@ -96,11 +97,7 @@ def __init__(self, self.route = route if route is not None else self.route self.subHandlers = {} self.started = datetime.now() - logger = kwargs.pop('logger',None) - if logger: - self.log = logger - else: - self.log = logging.getLogger(self.__class__.__name__) + self.log = self.getLogger(**kwargs) if subhandlers: for route,handler in subhandlers.items(): if inspect.isclass(handler): diff --git a/pulsar/utils/rpc/jsonrpc.py b/pulsar/http/rpc/jsonrpc.py similarity index 95% rename from pulsar/utils/rpc/jsonrpc.py rename to pulsar/http/rpc/jsonrpc.py index 39c183e5..a9780d6f 100755 --- a/pulsar/utils/rpc/jsonrpc.py +++ b/pulsar/http/rpc/jsonrpc.py @@ -13,10 +13,10 @@ import json from pulsar.utils.crypt import gen_unique_id +from pulsar.http import HttpClient from pulsar.utils.jsontools import DefaultJSONEncoder, DefaultJSONHook from .handlers import RpcHandler -#from unuk.http import httplib __all__ = ['JSONRPC', @@ -161,7 +161,7 @@ def __init__(self, url, name = None, version = None, self.__id = id self.__data = data if data is not None else {} if not http: - self._http = httplib(proxy_info = proxies) + self._http = HttpClient(proxy_info = proxies) else: self._http = http diff --git a/pulsar/utils/__init__.py b/pulsar/utils/__init__.py index 99a80913..d3f5a12f 100644 --- a/pulsar/utils/__init__.py +++ b/pulsar/utils/__init__.py @@ -1,2 +1 @@ - diff --git a/pulsar/utils/eventloop.py b/pulsar/utils/eventloop.py index 062b9a10..ebb54c15 100644 --- a/pulsar/utils/eventloop.py +++ b/pulsar/utils/eventloop.py @@ -1,11 +1,34 @@ import logging +import weakref import time import signal import threading import errno from multiprocessing import Pipe -from .system import IObase, IOpoll, close_on_exec +from .system import IObase, IOpoll, close_on_exec, platform + + +class WeakList(object): + + def __init__(self): + self._list = [] + + def append(self, obj): + if obj: + self._list.append(weakref.ref(obj)) + + def __iter__(self): + if self._list: + ol = self._list + nl = self._list = [] + for v in ol: + obj = v() + if obj: + nl.append(v) + yield obj + else: + raise StopIteration def file_descriptor(fd): @@ -79,45 +102,22 @@ def __init__(self, impl=None, logger = None, pool_timeout = None, commnads = Non self._events = {} self._callbacks = [] self._timeouts = [] - self._loop_tasks = [] + self._loop_tasks = WeakList() self._running = False self._stopped = False '''Called when when the child process is forked''' self._blocking_signal_threshold = None - # Create a pipe that we send bogus data to when we want to wake - # the I/O loop when it is idle - #self._waker_reader, self._waker_writer = Pipe(duplex = False) - #r = self._waker_reader - #self.add_handler(r, self.readbogus, self.READ) + if platform.type == 'posix': + # Create a pipe that we send bogus data to when we want to wake + # the I/O loop when it is idle + self._waker_reader, self._waker_writer = Pipe(duplex = False) + r = self._waker_reader + self.add_handler(r, self.readbogus, self.READ) def readbogus(self, fd, events): r = self._waker_reader while r.poll(): - self.log.debug("Got bogus data {0}".format(r.recv())) - - @classmethod - def instance(cls, logger = None): - """Returns a global IOLoop instance. - - Most single-threaded applications have a single, global IOLoop. - Use this method instead of passing around IOLoop instances - throughout your code. - - A common pattern for classes that depend on IOLoops is to use - a default argument to enable programs with multiple IOLoops - but not require the argument for simpler applications: - - class MyClass(object): - def __init__(self, io_loop=None): - self.io_loop = io_loop or IOLoop.instance() - """ - if not hasattr(cls, "_instance"): - cls._instance = cls(logger = logger) - return cls._instance - - @classmethod - def initialized(cls): - return hasattr(cls, "_instance") + self.log.debug("Got wake up data {0}".format(r.recv())) def add_loop_task(self, task): self._loop_tasks.append(task) @@ -190,6 +190,10 @@ def _startup(self): self._running = True return True + def do_loop_tasks(self): + for task in self._loop_tasks: + task() + def start(self): """Starts the I/O loop. @@ -225,10 +229,9 @@ def start(self): if not self.running(): self.log.info('Exiting event loop') break - - for task in self._loop_tasks: - task() + self.do_loop_tasks() + if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. @@ -297,7 +300,7 @@ def stop(self): self.log.info("Stopping event loop") self._running = False self._stopped = True - #self._wake() + self._wake() def running(self): """Returns true if this IOLoop is currently running.""" @@ -337,10 +340,12 @@ def add_callback(self, callback): def _wake(self): '''Wake up the eventloop''' - try: - self._waker_writer.send(b'x') - except IOError: - pass + if platform.type == 'posix': + if self.running(): + try: + self._waker_writer.send(b'x') + except IOError: + pass def _run_callback(self, callback): try: @@ -361,7 +366,33 @@ def handle_callback_exception(self, callback): in sys.exc_info. """ logging.error("Exception in callback %r", callback, exc_info=True) - + + +class MainIOLoop(IOLoop): + + @classmethod + def instance(cls, logger = None): + """Returns a global MainIOLoop instance. + + Most single-threaded applications have a single, global IOLoop. + Use this method instead of passing around IOLoop instances + throughout your code. + + A common pattern for classes that depend on IOLoops is to use + a default argument to enable programs with multiple IOLoops + but not require the argument for simpler applications: + + class MyClass(object): + def __init__(self, io_loop=None): + self.io_loop = io_loop or IOLoop.instance() + """ + if not hasattr(cls, "_instance"): + cls._instance = cls(logger = logger) + return cls._instance + + @classmethod + def initialized(cls): + return hasattr(cls, "_instance") class _Timeout(object): diff --git a/pulsar/utils/log.py b/pulsar/utils/log.py new file mode 100755 index 00000000..1dbcabae --- /dev/null +++ b/pulsar/utils/log.py @@ -0,0 +1,41 @@ +SERVER_NAME = 'Pulsar' + + +__all__ = ['SERVER_NAME', + 'getLogger', + 'PickableMixin'] + + +def getLogger(name = None): + import logging + name = '{0}.{1}'.format(SERVER_NAME,name) if name else SERVER_NAME + return logging.getLogger(name) + + +class PickableMixin(object): + _class_code = None + + def __getstate__(self): + d = self.__dict__.copy() + d.pop('log',None) + return d + + def __setstate__(self, state): + self.__dict__ = state + self.log = getLogger(self.class_code) + self.configure_logging() + + def getLogger(self, **kwargs): + logger = kwargs.pop('logger',None) + return logger or getLogger(self.class_code) + + @property + def class_code(self): + return self.__class__.code() + + @classmethod + def code(cls): + return cls._class_code or cls.__name__ + + def configure_logging(self): + pass diff --git a/pulsar/utils/system/windowssystem.py b/pulsar/utils/system/windowssystem.py index cb1a50c3..69fd9511 100755 --- a/pulsar/utils/system/windowssystem.py +++ b/pulsar/utils/system/windowssystem.py @@ -15,8 +15,7 @@ def fromfd(fd, family, type, proto=0): Create a socket object from a duplicate of the given file descriptor. The remaining arguments are the same as for socket(). """ - nfd = dup(fd) - return socket(family, type, proto, nfd) + raise NotImplementedError('Cannot duplicate socket from file descriptor') socket.fromfd = fromfd diff --git a/pulsar/utils/test/test.py b/pulsar/utils/test.py similarity index 67% rename from pulsar/utils/test/test.py rename to pulsar/utils/test.py index b9422955..c4c9593c 100644 --- a/pulsar/utils/test/test.py +++ b/pulsar/utils/test.py @@ -1,6 +1,8 @@ import unittest import inspect +from threading import Thread +from pulsar.utils.eventloop import MainIOLoop TextTestRunner = unittest.TextTestRunner TestSuite = unittest.TestSuite @@ -10,12 +12,26 @@ class TestCase(unittest.TestCase): pass +class TestRunnerThread(Thread): + + def __init__(self, suite, verbosity): + Thread.__init__(self) + self.verbosity = verbosity + self.suite = suite + + def run(self): + self.result = TextTestRunner(verbosity = self.verbosity).run(self.suite) + ioloop = MainIOLoop.instance() + ioloop.stop() + + class TestSuiteRunner(object): '''A suite runner with twisted if available.''' def __init__(self, verbosity = 1, itags = None): self.verbosity = verbosity self.itags = itags + self.ioloop = MainIOLoop.instance() def setup_test_environment(self): pass @@ -26,20 +42,22 @@ def teardown_test_environment(self): def run_tests(self, modules): self.setup_test_environment() suite = self.build_suite(modules) - self.run_suite(suite) - - def close_tests(self, result): + result = self.run_suite(suite) self.teardown_test_environment() - return self.suite_result(suite, result) + return self.suite_result(result) def build_suite(self, modules): loader = TestLoader() return loader.loadTestsFromModules(modules, itags = self.itags) def run_suite(self, suite): - return TextTestRunner(verbosity = self.verbosity).run(suite) + ioloop = self.ioloop + self.runner = TestRunnerThread(suite,self.verbosity) + ioloop.add_callback(self.runner.start) + ioloop.start() + return self.runner.result - def suite_result(self, suite, result, **kwargs): + def suite_result(self, result): return len(result.failures) + len(result.errors) diff --git a/pulsar/utils/test/__init__.py b/pulsar/utils/test/__init__.py deleted file mode 100644 index 273ad34e..00000000 --- a/pulsar/utils/test/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .test import * \ No newline at end of file diff --git a/pulsar/workers/arbiter.py b/pulsar/workers/arbiter.py index 61615cbc..cb70f77b 100644 --- a/pulsar/workers/arbiter.py +++ b/pulsar/workers/arbiter.py @@ -16,7 +16,7 @@ from pulsar.utils.py2py3 import iteritems, map, range from pulsar.utils.pidfile import Pidfile from pulsar.utils import system -from pulsar.http import get_httplib +from pulsar.utils.eventloop import MainIOLoop from .workerpool import WorkerPool from .base import ArbiterBase, ThreadQueue @@ -66,32 +66,37 @@ def __init__(self, app): 0: sys.executable } + def get_eventloop(self): + return MainIOLoop.instance(logger = self.log) + def setup(self): self.log.info("Starting pulsar %s" % pulsar.__version__) self.address = self.cfg.address self.debug = self.cfg.debug - self.ioloop.add_loop_task(self.arbiter) + self.ioloop.add_loop_task(self) # Create the listener if not available if not self.LISTENER: self.LISTENER = system.create_socket(self) - worker_class = self.cfg.worker_class - # Setup the server pool of workers - cfg = self.cfg - pool = WorkerPool(worker_class, - cfg.workers, - app = self.app, - timeout = cfg.timeout, - socket = self.LISTENER) - self._pools.append(pool) - - worker_class.modify_arbiter_loop(pool,self.ioloop) + self.addpool(self.cfg, self.LISTENER) if self.debug: self.log.debug("Current configuration:") for config, value in sorted(self.cfg.settings.items()): self.log.debug(" %s: %s" % (config, value.value)) + + def addpool(self, cfg, socket = None, start = True): + worker_class = cfg.worker_class + pool = WorkerPool(self.ioloop, + worker_class, + cfg.workers, + app = self.app, + timeout = cfg.timeout, + socket = socket) + self._pools.append(pool) + if start: + pool.start() def __repr__(self): return self.__class__.__name__ @@ -99,7 +104,8 @@ def __repr__(self): def __str__(self): return self.__repr__() - def arbiter(self): + def __call__(self): + '''Called by the Event loop to perform the Arbiter tasks''' while True: try: sig = self.SIG_QUEUE.get(timeout = self.SIG_TIMEOUT) @@ -127,12 +133,7 @@ def _run(self): Initialize the arbiter. Start listening and set pidfile if needed. """ ioloop = self.ioloop - if ioloop._stopped: - ioloop._stopped = False - return False - assert not ioloop.running(), 'cannot start arbiter twice' self.pid = os.getpid() - if self.cfg.pidfile is not None: self.pidfile = Pidfile(self.cfg.pidfile) self.pidfile.create(self.pid) diff --git a/pulsar/workers/base.py b/pulsar/workers/base.py index 46af90bd..1bac5484 100644 --- a/pulsar/workers/base.py +++ b/pulsar/workers/base.py @@ -17,7 +17,7 @@ from multiprocessing.queues import Queue, Empty from threading import current_thread, Thread -from pulsar import getLogger +import pulsar from pulsar.utils.eventloop import IOLoop from pulsar.utils import system @@ -31,27 +31,30 @@ 'WorkerThread', 'WorkerProcess'] +_main_thread = current_thread() -class Runner(object): + +class Runner(pulsar.PickableMixin): '''Base class for classes with an event loop. ''' DEF_PROC_NAME = 'pulsar' SIG_QUEUE = None - worker_name = None def init_process(self): '''Initialise the runner. This function will block the current thread since it enters the event loop. If the runner is a instance of a subprocess, this function is called after fork by the run method.''' - self.worker_name = self.worker_name or self.__class__.__name__ - self.log = getLogger(self.worker_name) - self.ioloop = IOLoop(impl = self.get_ioimpl(), logger = self.log) + self.log = self.getLogger() + self.ioloop = self.get_eventloop() self.set_proctitle() self.setup() self.install_signals() self._run() + def get_eventloop(self): + return IOLoop(impl = self.get_ioimpl(), logger = self.log) + def get_ioimpl(self): '''Return the event-loop implementation. By default it returns ``None``.''' return None @@ -65,9 +68,14 @@ def set_proctitle(self): proc_name = self.DEF_PROC_NAME system.set_proctitle("{0} - {1}".format(proc_name,self)) + def current_thread(self): + '''Return the current thread''' + return current_thread() + def install_signals(self): '''Initialise signals for correct signal handling.''' - if not self.isthread: + current = self.current_thread() + if current == _main_thread and not self.isthread: self.log.debug('Installing signals') sfun = getattr(self,'signal',None) for name in system.ALL_SIGNALS: @@ -183,6 +191,10 @@ def modify_arbiter_loop(cls, wp, ioloop): ''' pass + @classmethod + def clean_arbiter_loop(cls, wp, ioloop): + pass + def _run(self): self.ioloop.start() diff --git a/pulsar/workers/http.py b/pulsar/workers/http.py index 86443236..bb7a5252 100644 --- a/pulsar/workers/http.py +++ b/pulsar/workers/http.py @@ -104,7 +104,7 @@ def response(self, resp, environ): class Worker(WorkerProcess,HttpMixin): '''A Http worker on a child process''' - worker_name = 'Worker.HttpProcess' + _class_code = 'HttpProcess' def _run(self, ioloop = None): ioloop = self.ioloop diff --git a/pulsar/workers/http_t.py b/pulsar/workers/http_t.py index b4dd5a6e..6092c22f 100644 --- a/pulsar/workers/http_t.py +++ b/pulsar/workers/http_t.py @@ -12,7 +12,7 @@ def handle(self, fd, req): class Worker(WorkerThread,HttpMixin): '''A Http worker on a thread. This worker process http requests from the pool queue.''' - worker_name = 'Worker.HttpThread' + _class_code = 'HttpThread' def get_ioimpl(self): return get_task_loop(self) @@ -28,4 +28,9 @@ def modify_arbiter_loop(cls, wp, ioloop): ioloop.add_handler(wp.socket, HttpPoolHandler(wp), ioloop.READ) + + @classmethod + def clean_arbiter_loop(cls, wp, ioloop): + if wp.socket: + ioloop.remove_handler(wp.socket) diff --git a/pulsar/workers/workerpool.py b/pulsar/workers/workerpool.py index 097ed1d5..ba9936ca 100644 --- a/pulsar/workers/workerpool.py +++ b/pulsar/workers/workerpool.py @@ -1,4 +1,5 @@ import os +import sys import time from multiprocessing import Pipe @@ -36,17 +37,19 @@ class WorkerPool(HttpMixin): _state = 0x0 def __init__(self, + ioloop, worker_class, num_workers, app = None, timeout = 30, socket = None): + self.ioloop = ioloop self.worker_class = worker_class self.num_workers = num_workers self.timeout = timeout self.worker_age = 0 self.app = app - self.log = getLogger(worker_class.worker_name or worker_class.__name__) + self.log = getLogger(worker_class.code()) if self.app: self.cfg = getattr(app,'cfg',None) else: @@ -78,6 +81,7 @@ def arbiter(self): def start(self): if not self._state: + self.worker_class.modify_arbiter_loop(self,self.ioloop) self._state = self.RUN self.spawn_workers() else: @@ -99,11 +103,13 @@ def close(self): self._state = self.CLOSE for wid in list(self.WORKERS): self.stop_worker(wid) + self.worker_class.clean_arbiter_loop(self,self.ioloop) def terminate(self): '''Force each worker to terminate''' self.close() if self._state < self.TERMINATE: + self._state = self.TERMINATE for wid, proc in list(iteritems(self.WORKERS)): w = proc['worker'] if not w.is_alive(): diff --git a/runtests.py b/runtests.py index aef29d32..ceb08019 100644 --- a/runtests.py +++ b/runtests.py @@ -1,10 +1,7 @@ #!/usr/bin/env python import os import sys -from optparse import OptionParser - -import pulsar -from tests.control import run +from optparse import OptionParser def makeoptions(): @@ -32,18 +29,59 @@ def makeoptions(): help="Set the HTTP_PROXY environment variable") return parser + + +class TestExtractor(object): + TESTMAPPING = {'regression':'tests','bench':'bench','profile':'profile'} + def __init__(self, path): + self.path = path + + def testdir(self, testtype): + return os.path.join(self.path,testtype) -if __name__ == '__main__': + def test_module(self, testtype, loc, app): + return '{0}.{1}.tests'.format(loc,app) + + +class ExampleExtractor(TestExtractor): + + def testdir(self, testtype): + return self.path + + def test_module(self, testtype, loc, app): + name = self.TESTMAPPING[testtype] + return '{0}.{1}.{2}'.format(loc,app,name) + + +def run(): + '''To perform preprocessing before tests add a cfg.py module''' + dirs = (('examples',ExampleExtractor), + ('tests',TestExtractor)) + from tests.control import run try: - import _dep + import cfg except ImportError: pass options, tags = makeoptions().parse_args() - if options.proxy: - from dynts.conf import settings - settings.proxies['http'] = options.proxy + p = lambda x : os.path.split(x)[0] + path = p(os.path.abspath(__file__)) + + running_tests = [] + for t,c in dirs: + p = os.path.join(path,t) + if p not in sys.path: + sys.path.insert(0, p) + running_tests.append(c(p)) + + #if options.proxy: + # settings.proxies['http'] = options.proxy run(tags, options.test_type, + directories = running_tests, verbosity=options.verbosity, - show_list=options.show_list) \ No newline at end of file + show_list=options.show_list) + + +if __name__ == '__main__': + run() \ No newline at end of file diff --git a/tests/control.py b/tests/control.py index 951424a5..9f58598d 100644 --- a/tests/control.py +++ b/tests/control.py @@ -17,7 +17,6 @@ def emit(self, record): def get_tests(dirpath): - tests = [] join = os.path.join loc = os.path.split(dirpath)[1] for d in os.listdir(dirpath): @@ -25,22 +24,23 @@ def get_tests(dirpath): yield (loc,d) -def import_tests(tags,testdir): - for loc,app in get_tests(testdir): - if tags and app not in tags: - logger.debug("Skipping tests for %s" % app) - continue - logger.debug("Try to import tests for %s" % app) - test_module = '{0}.{1}.tests'.format(loc,app) +def import_tests(tags,testtype,extractors): + for extractor in extractors: + testdir = extractor.testdir(testtype) + for loc,app in get_tests(testdir): + if tags and app not in tags: + logger.debug("Skipping tests for %s" % app) + continue + logger.debug("Try to import tests for %s" % app) + test_module = extractor.test_module(testtype,loc,app) + try: + mod = importer.import_module(test_module) + except ImportError as e: + logger.debug("Could not import tests for %s: %s" % (test_module,e)) + continue - try: - mod = importer.import_module(test_module) - except ImportError as e: - logger.debug("Could not import tests for %s: %s" % (test_module,e)) - continue - - logger.debug("Adding tests for %s" % app) - yield mod + logger.debug("Adding tests for %s" % app) + yield mod def setup_logging(verbosity): @@ -52,14 +52,16 @@ def setup_logging(verbosity): logger.setLevel(level) -def run(tags = None, testtype = None, verbosity = 1, show_list = False, itags = None): - curdir = filesystem.filedir(__file__) - if curdir not in sys.path: - sys.path.insert(0,curdir) +def run(tags = None, + testtype = None, + directories = None, + verbosity = 1, + show_list = False, itags = None): + if not directories: + return testtype = testtype or 'regression' - testdir = os.path.join(curdir,testtype) setup_logging(verbosity) - modules = import_tests(tags,testdir) + modules = import_tests(tags,testtype,directories) runner = test.TestSuiteRunner(verbosity = verbosity, itags = itags) runner.run_tests(modules) \ No newline at end of file