Skip to content

Commit

Permalink
Add nanny logs (#2744)
Browse files Browse the repository at this point in the history
  • Loading branch information
TomAugspurger authored and mrocklin committed Jun 6, 2019
1 parent 0696a1f commit 587be8d
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 55 deletions.
20 changes: 14 additions & 6 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,10 @@ def run(self, function, *args, **kwargs):
wait: boolean (optional)
If the function is asynchronous whether or not to wait until that
function finishes.
nanny : bool, default False
Whether to run ``function`` on the nanny. By default, the function
is run on the worker process. If specified, the addresses in
``workers`` should still be the worker addresses, not the nanny addresses.
Examples
--------
Expand Down Expand Up @@ -3354,7 +3358,7 @@ def get_scheduler_logs(self, n=None):
Parameters
----------
n: int
n : int
Number of logs to retrieve. Maxes out at 10000 by default,
configurable in config.yaml::log-length
Expand All @@ -3364,23 +3368,27 @@ def get_scheduler_logs(self, n=None):
"""
return self.sync(self.scheduler.logs, n=n)

def get_worker_logs(self, n=None, workers=None, nanny=False):
    """ Get logs from workers

    Parameters
    ----------
    n : int
        Number of logs to retrieve.  Maxes out at 10000 by default,
        configurable in config.yaml::log-length
    workers : iterable
        List of worker addresses to retrieve.  Gets all workers by default.
    nanny : bool, default False
        Whether to get the logs from the workers (False) or the nannies (True). If
        specified, the addresses in ``workers`` should still be the worker
        addresses, not the nanny addresses.

    Returns
    -------
    Dictionary mapping worker address to logs.
    Logs are returned in reversed order (newest first)
    """
    # Forward to the scheduler, which broadcasts a ``get_logs`` request to
    # the selected workers (or their nannies when ``nanny=True``).
    return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)

def retire_workers(self, workers=None, close_workers=True, **kwargs):
""" Retire certain workers on the scheduler
Expand Down
2 changes: 2 additions & 0 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(
protocol=None,
**worker_kwargs
):
self._setup_logging(logger)
self.loop = loop or IOLoop.current()
self.security = security or Security()
assert isinstance(self.security, Security)
Expand Down Expand Up @@ -130,6 +131,7 @@ def __init__(
"kill": self.kill,
"restart": self.restart,
# cannot call it 'close' on the rpc side for naming conflict
"get_logs": self.get_logs,
"terminate": self.close,
"close_gracefully": self.close_gracefully,
"run": self.run,
Expand Down
24 changes: 23 additions & 1 deletion distributed/node.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from __future__ import print_function, division, absolute_import

import warnings
import logging

from tornado.ioloop import IOLoop
import dask

from .compatibility import unicode
from .compatibility import unicode, finalize
from .core import Server, ConnectionPool
from .versions import get_versions
from .utils import DequeHandler


class Node(object):
Expand Down Expand Up @@ -131,6 +134,25 @@ def stop_services(self):
def service_ports(self):
return {k: v.port for k, v in self.services.items()}

def _setup_logging(self, logger):
    """Install an in-memory ring-buffer handler on *logger*.

    Capacity and record format come from the ``distributed.admin``
    config section; the captured records are later served by
    :meth:`get_logs`.  A finalizer detaches the handler again when this
    node is garbage collected, so the module-level logger is not left
    holding a reference to us.
    """
    maxlen = dask.config.get("distributed.admin.log-length")
    log_format = dask.config.get("distributed.admin.log-format")
    handler = DequeHandler(n=maxlen)
    handler.setFormatter(logging.Formatter(log_format))
    self._deque_handler = handler
    logger.addHandler(handler)
    finalize(self, logger.removeHandler, handler)

def get_logs(self, comm=None, n=None):
    """Return captured log records as ``(levelname, formatted_text)`` pairs.

    Parameters
    ----------
    comm : optional
        Unused here; present for the RPC handler calling convention.
    n : int, optional
        Maximum number of (most recent) records to return.  If None,
        every buffered record is returned.

    Returns
    -------
    list of tuples
        ``(record.levelname, formatted record)`` in chronological order
        (oldest first), for both branches.
    """
    deque_handler = self._deque_handler
    if n is None:
        L = list(deque_handler.deque)
    else:
        L = deque_handler.deque
        # BUG fix: the previous expression ``range(min(n, len(L)))``
        # started at i == 0, and ``L[-0]`` is ``L[0]`` -- it returned the
        # *oldest* record first and skipped the n-th most recent record
        # entirely.  Take the last ``n`` records, preserving chronological
        # order so both branches agree.
        # NOTE(review): the client docstring says "newest first"; the
        # ``n is None`` branch has always been oldest-first -- confirm
        # which contract is intended.
        L = [L[-i] for i in range(min(n, len(L)), 0, -1)]
    return [(msg.levelname, deque_handler.format(msg)) for msg in L]

async def __aenter__(self):
await self
return self
Expand Down
28 changes: 5 additions & 23 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
key_split,
validate_key,
no_default,
DequeHandler,
parse_timedelta,
parse_bytes,
PeriodicCallback,
Expand Down Expand Up @@ -844,7 +843,7 @@ def __init__(
dashboard_address=None,
**kwargs
):
self._setup_logging()
self._setup_logging(logger)

# Attributes
self.allowed_failures = allowed_failures
Expand Down Expand Up @@ -1329,16 +1328,6 @@ def close_worker(self, stream=None, worker=None, safe=None):
self.worker_send(worker, {"op": "close", "report": False})
self.remove_worker(address=worker, safe=safe)

def _setup_logging(self):
self._deque_handler = DequeHandler(
n=dask.config.get("distributed.admin.log-length")
)
self._deque_handler.setFormatter(
logging.Formatter(dask.config.get("distributed.admin.log-format"))
)
logger.addHandler(self._deque_handler)
finalize(self, logger.removeHandler, self._deque_handler)

###########
# Stimuli #
###########
Expand Down Expand Up @@ -4627,18 +4616,11 @@ def get_profile_metadata(

raise gen.Return({"counts": counts, "keys": keys})

def get_logs(self, comm=None, n=None):
deque_handler = self._deque_handler
if n is None:
L = list(deque_handler.deque)
else:
L = deque_handler.deque
L = [L[-i] for i in range(min(n, len(L)))]
return [(msg.levelname, deque_handler.format(msg)) for msg in L]

@gen.coroutine
def get_worker_logs(self, comm=None, n=None, workers=None):
results = yield self.broadcast(msg={"op": "get_logs", "n": n}, workers=workers)
def get_worker_logs(self, comm=None, n=None, workers=None, nanny=False):
    """Broadcast a ``get_logs`` request and gather the replies.

    ``workers`` selects which workers to query (all by default); when
    ``nanny`` is True the request is routed to each worker's nanny
    instead.  Returns a dict mapping worker address to that worker's
    log entries.
    """
    msg = {"op": "get_logs", "n": n}
    results = yield self.broadcast(msg=msg, workers=workers, nanny=nanny)
    raise gen.Return(results)

###########
Expand Down
16 changes: 14 additions & 2 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5111,7 +5111,7 @@ def test_task_metadata(c, s, a, b):
assert result == {"a": {"c": {"d": 1}}, "b": 2}


@gen_cluster(client=True)
@gen_cluster(client=True, Worker=Nanny)
def test_logs(c, s, a, b):
yield wait(c.map(inc, range(5)))
logs = yield c.get_scheduler_logs(n=5)
Expand All @@ -5121,11 +5121,23 @@ def test_logs(c, s, a, b):
assert "distributed.scheduler" in msg

w_logs = yield c.get_worker_logs(n=5)
assert set(w_logs.keys()) == {a.address, b.address}
assert set(w_logs.keys()) == {a.worker_address, b.worker_address}
for log in w_logs.values():
for _, msg in log:
assert "distributed.worker" in msg

n_logs = yield c.get_worker_logs(nanny=True)
assert set(n_logs.keys()) == {a.worker_address, b.worker_address}
for log in n_logs.values():
for _, msg in log:
assert "distributed.nanny" in msg

n_logs = yield c.get_worker_logs(nanny=True, workers=[a.worker_address])
assert set(n_logs.keys()) == {a.worker_address}
for log in n_logs.values():
for _, msg in log:
assert "distributed.nanny" in msg


@gen_cluster(client=True)
def test_avoid_delayed_finalize(c, s, a, b):
Expand Down
24 changes: 2 additions & 22 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .comm import get_address_host, get_local_address_for, connect
from .comm.utils import offload
from .comm.addressing import address_from_user_args
from .compatibility import unicode, get_thread_identity, finalize, MutableMapping
from .compatibility import unicode, get_thread_identity, MutableMapping
from .core import error_message, CommClosedError, send_recv, pingpong, coerce_to_address
from .diskutils import WorkSpace
from .metrics import time
Expand All @@ -60,7 +60,6 @@
json_load_robust,
key_split,
format_bytes,
DequeHandler,
PeriodicCallback,
parse_bytes,
parse_timedelta,
Expand Down Expand Up @@ -412,7 +411,7 @@ def __init__(
)
profile_cycle_interval = parse_timedelta(profile_cycle_interval, default="ms")

self._setup_logging()
self._setup_logging(logger)

if scheduler_file:
cfg = json_load_robust(scheduler_file)
Expand Down Expand Up @@ -666,16 +665,6 @@ def __repr__(self):
)
)

def _setup_logging(self):
self._deque_handler = DequeHandler(
n=dask.config.get("distributed.admin.log-length")
)
self._deque_handler.setFormatter(
logging.Formatter(dask.config.get("distributed.admin.log-format"))
)
logger.addHandler(self._deque_handler)
finalize(self, logger.removeHandler, self._deque_handler)

@property
def worker_address(self):
""" For API compatibility with Nanny """
Expand Down Expand Up @@ -888,15 +877,6 @@ def gather(self, comm=None, who_has=None):
self.update_data(data=result, report=False)
raise Return({"status": "OK"})

def get_logs(self, comm=None, n=None):
deque_handler = self._deque_handler
if n is None:
L = list(deque_handler.deque)
else:
L = deque_handler.deque
L = [L[-i] for i in range(min(n, len(L)))]
return [(msg.levelname, deque_handler.format(msg)) for msg in L]

#############
# Lifecycle #
#############
Expand Down
2 changes: 1 addition & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ API
Client.get_executor
Client.get_metadata
Client.get_scheduler_logs
Client.get_task_stream
Client.get_worker_logs
Client.get_task_stream
Client.has_what
Client.list_datasets
Client.map
Expand Down

0 comments on commit 587be8d

Please sign in to comment.