Skip to content

Commit

Permalink
fixing task application
Browse files Browse the repository at this point in the history
  • Loading branch information
quantmind committed Dec 6, 2012
1 parent 315575f commit 37be161
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 55 deletions.
8 changes: 7 additions & 1 deletion pulsar/apps/rpc/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from timeit import default_timer

import pulsar
from pulsar import HttpClient
from pulsar import HttpClient, is_async
from pulsar.utils.structures import AttributeDictionary
from pulsar.utils.security import gen_unique_id
from pulsar.utils.httpurl import to_string, range
Expand Down Expand Up @@ -199,6 +199,12 @@ def __call__(self, *args, **kwargs):
# Always make sure the content-type is application/json
self.http.headers['content-type'] = 'application/json'
resp = self.http.post(self.__url, data=body)
if is_async(resp):
return resp.add_callback(lambda r: self._end_call(r, raw))
else:
return self._end_call(resp, raw)

def _end_call(self, resp, raw):
content = resp.content.decode('utf-8')
if resp.is_error:
if 'error' in content:
Expand Down
6 changes: 6 additions & 0 deletions pulsar/apps/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ def job_list(client, actor, jobnames=None):
def next_scheduled(client, actor, jobnames=None):
return actor.app.scheduler.next_scheduled(jobnames=jobnames)

@pulsar.command(commands_set=taskqueue_cmnds)
def wait_for_task(client, actor, id, timeout=3600):
# wait for a task to finish for at most timeout seconds
scheduler = actor.app.scheduler
return scheduler.task_class.wait_for_task(scheduler, id, timeout)


class TaskQueue(CPUboundServer):
'''A :class:`pulsar.CPUboundServer` for consuming
Expand Down
49 changes: 27 additions & 22 deletions pulsar/apps/tasks/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pulsar.apps import rpc

from .exceptions import TaskNotAvailable
from .task import Task


__all__ = ['TaskQueueRpcMixin']
Expand All @@ -18,8 +19,8 @@ def task_to_json(task):
raise rpc.InvalidParams('Job "%s" is not available.'\
% err.task_name)
if isinstance(task, (list, tuple)):
task = [t.tojson() for t in task]
else:
task = [task_to_json(t) for t in task]
elif isinstance(task, Task):
task = task.tojson()
return task

Expand All @@ -31,32 +32,31 @@ class TaskQueueRpcMixin(rpc.JSONRPC):
and a :ref:`task queue <apps-tasks>` application installed in the
:class:`pulsar.Arbiter`.
:parameter taskqueue: set the :attr:`task_queue_manager` attribute. It can be
a :class:`pulsar.apps.tasks.TaskQueue` instance or a name of a taskqueue.
.. attribute:: task_queue_manager
A :class:`pulsar.ActorLink` for facilitating the communication
from the rpc workers to the task queue.
:parameter taskqueue: instance or name of the
:class:`pulsar.apps.tasks.TaskQueue` which exposes the remote procedure
calls.
It exposes the following remote functions:
**Remote Procedure Calls**
.. method:: job_list([jobnames=None])
Return the list of jobs registered with task queue with meta information.
If a list of jobnames is given, it returns only jobs included in the list.
Return the list of :class:`Job` registered with task queue with meta
information. If a list of jobnames is given, it returns only jobs
included in the list.
:rtype: A list of dictionaries
.. method:: run_new_task(jobname, [**kwargs])
Run a new task in the task queue. The task can be of any type
as long as it is registered in the job registry.
Run a new :class:`Task` in the task queue. The task can be of any type
as long as it is registered in the :class:`Job` registry.
:parameter jobname: the name of the job to run.
:parameter jobname: the name of the :class:`Job` to run.
:parameter kwargs: optional key-valued job parameters.
:rtype: a dictionary containing information about the request
:rtype: a dictionary containing information about the
:class:`Task` submitted
.. method:: get_task(id=task_id)
Expand All @@ -68,6 +68,11 @@ class TaskQueueRpcMixin(rpc.JSONRPC):
.. method:: get_tasks(**filters)
Retrieve a list of tasks which satisfy *filters*.
.. method:: wait_for_task(id=task_id)
Wait for a task to have finished.
'''
def __init__(self, taskqueue, **kwargs):
if not isinstance(taskqueue, str):
Expand Down Expand Up @@ -100,19 +105,19 @@ def rpc_get_task(self, request, id=None):
def rpc_get_tasks(self, request, **params):
if params:
return self._rq(request, 'get_tasks', **params).add_callback(task_to_json)

def rpc_wait_for_task(self, request, id=None):
if id:
return self._rq(request, 'wait_for_task', id).add_callback(task_to_json)

############################################################################
## INTERNALS
def task_request_parameters(self, request):
'''Internal function which returns a dictionary of parameters
'''**Internal function** which returns a dictionary of parameters
to be passed to the :class:`Task` class constructor.
This function can be overridden to add information about
the type of request, who made the request and so forth. It must return
a dictionary and it is called by the internal
:meth:`task_callback` method.
By default it returns an empty dictionary.'''
a dictionary. By default it returns an empty dictionary.'''
return {}

def _rq(self, request, action, *args, **kw):
Expand Down
67 changes: 38 additions & 29 deletions pulsar/apps/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,17 @@ def start(self, worker):
if is_async(result):
yield result
result = maybe_async(result)
self.result = result
except Exception as e:
self.result = as_failure(e)
result = as_failure(e)
finally:
yield self.finish(worker, result=self.result)
yield self.finish(worker, result)

def finish(self, worker, result):
'''called when finishing the task.'''
'''Called when the task has finished and a ``result`` is ready.
It sets the :attr:`time_end` attribute if not already set
(in case the :class:`Task` was revoked) and
determined if it was succesful or a failure. Once done it invokes
the :ref:`task callback <tasks-callbacks>` :meth:`on_finish`.'''
if not self.time_end:
self.time_end = datetime.now()
if is_failure(result):
Expand All @@ -127,7 +130,7 @@ def finish(self, worker, result):
elif self.status == STARTED:
self.status = SUCCESS
self.result = result
return self.on_finish(worker)
return self.on_finish(worker)

def to_queue(self, schedulter=None):
'''The task has been received by the scheduler. If its status
Expand All @@ -146,7 +149,8 @@ def needs_queuing(self):
return self.__dict__.pop('_toqueue', False)

def done(self):
'''Return ``True`` if the task has its staus in READY_STATES'''
'''Return ``True`` if the :class:`Task` has finshed
(its status is one of :ref:`READY_STATES <task-state>`).'''
return self.status in READY_STATES

def maybe_revoked(self):
Expand Down Expand Up @@ -186,29 +190,6 @@ def serialize_for_queue(self):
def ack(self):
return self

############################################################################
## FACTORY METHODS
############################################################################
@classmethod
def get_task(cls, scheduler, id):
'''Given a task *id* it retrieves a task instance or ``None`` if
not available.'''
raise NotImplementedError()

@classmethod
def get_tasks(cls, scheduler, **filters):
'''Given *filters* it retrieves task instances which satisfy the
filter criteria.'''
raise NotImplementedError()

@classmethod
def save_task(cls, scheduler, task):
raise NotImplementedError()

@classmethod
def delete_tasks(cls, scheduler, task):
raise NotImplementedError()

############################################################################
# CALLBACKS
############################################################################
Expand Down Expand Up @@ -247,6 +228,34 @@ def emit_log(self, record):
'''Implement the task logging emit method. By default it does nothing.
It can be reimplemented to do something with the log record.'''
pass

############################################################################
## FACTORY METHODS
############################################################################
@classmethod
def get_task(cls, scheduler, id):
'''Given a task *id* it retrieves a task instance or ``None`` if
not available.'''
raise NotImplementedError()

@classmethod
def get_tasks(cls, scheduler, **filters):
'''Given *filters* it retrieves task instances which satisfy the
filter criteria.'''
raise NotImplementedError()

@classmethod
def save_task(cls, scheduler, task):
raise NotImplementedError()

@classmethod
def delete_tasks(cls, scheduler, task):
raise NotImplementedError()

@classmethod
def wait_for_task(cls, scheduler, id, timeout):
'''Wait for a task to have finished.'''
raise NotImplementedError()


class TaskInMemory(Task):
Expand Down
2 changes: 1 addition & 1 deletion pulsar/apps/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def python_path(self):
sys.path.insert(0, path)

def on_config_init(self, cfg, params):
self.plugins = params.get('plugins')
self.plugins = params.get('plugins') or ()
if self.plugins:
for plugin in self.plugins:
cfg.settings.update(plugin.config.settings)
Expand Down
4 changes: 2 additions & 2 deletions pulsar/async/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ def mailbox_address(client, actor, caller, address):
return actor.proxy

@proxy.command(internal=True)
def run(client, actor, caller, callable):
def run(client, actor, caller, callable, *args, **kwargs):
'''Execute a python script in the server'''
return callable(actor)
return callable(actor, *args, **kwargs)

@proxy.command(ack=False, internal=True)
def stop(client, actor, caller):
Expand Down

0 comments on commit 37be161

Please sign in to comment.