Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash rework #52

Merged
merged 25 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions yellowbox/extras/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from yellowbox.service import YellowService
from yellowbox.retry import RetrySpec
from yellowbox.utils import docker_host_name

__all__ = ['HttpService', 'RouterHTTPRequestHandler']
SideEffectResponse = Union[bytes, str, int]
Expand Down Expand Up @@ -109,11 +110,6 @@ def __getattr__(self, item: str):
raise AttributeError(item)


if platform.system() == "Linux":
_docker_host_name = '172.17.0.1'
else:
_docker_host_name = 'host.docker.internal'


class HttpService(YellowService):
"""
Expand Down Expand Up @@ -151,7 +147,7 @@ def local_url(self):

@property
def container_url(self):
return f'http://{_docker_host_name}:{self.server_port}'
return f'http://{docker_host_name}:{self.server_port}'

@staticmethod
def _to_callback(side_effect: SideEffect):
Expand Down Expand Up @@ -239,7 +235,7 @@ def is_alive(self):
def connect(self, network):
# since the http service is not docker related, it cannot actually connect to the network. However,
# other containers, connected to the network or not, can connect to the service with docker's usual host
return [_docker_host_name]
return [docker_host_name]

def disconnect(self, network):
pass
296 changes: 275 additions & 21 deletions yellowbox/extras/logstash.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,284 @@
from typing import Optional
import json
import logging
import selectors
bentheiii marked this conversation as resolved.
Show resolved Hide resolved
import socket
import threading
from typing import Any, Callable, Dict, Iterable, List, Union, cast
from weakref import WeakMethod

from docker import DockerClient
from yellowbox.subclasses import YellowService
from yellowbox.utils import docker_host_name

from yellowbox.retry import RetrySpec
from yellowbox.containers import get_ports, create_and_pull
from yellowbox.subclasses import SingleContainerService, RunMixin
__all__ = ['FakeLogstashService']
_logger = logging.getLogger(__name__)
bentheiii marked this conversation as resolved.
Show resolved Hide resolved

__all__ = ['LogstashService', 'LOGSTASH_DEFAULT_PORT']
_STOP_TIMEOUT = 5 # Timeout for stopping the service

LOGSTASH_DEFAULT_PORT = 5959
# Sent on internal connection to signal closing of background thread
_CLOSE_SENTINEL = b"\0"


class LogstashService(SingleContainerService, RunMixin):
def __init__(self, docker_client: DockerClient, image='logstash:7.8.1', **kwargs):
super().__init__(create_and_pull(
docker_client, image, publish_all_ports=True, detach=True,
ports={LOGSTASH_DEFAULT_PORT: None}
), **kwargs)
class FakeLogstashService(YellowService):
"""Implements a fake logging service that closely resembles Logstash.

def start(self, retry_spec: Optional[RetrySpec] = None):
# for now we just start the service, re-implementing this service is an ongoing ticket
return super().start(retry_spec)
Accepts external TCP connections, with logs received in the "json_lines"
format.

def client_port(self):
return get_ports(self.container)[LOGSTASH_DEFAULT_PORT]
Attributes:
records: A list of all records received. You are welcome to modify,
clear or iterate over this during runtime. New logs will be added in the
order they were received. Each record is a dict - for possible keys see
attribute docs.
port: Dynamic port external service should connect to. FakeLogstashService
automatically binds a free port during initialization unless chosen
otherwise.
local_host: Host to connect to from the local machine (=="localhost")
container_host: Host to connect to from inside containers.
encoding: Encoding of the json lines received. Defaults to utf-8 per
specification.
delimiter: Delimiter splitting between json objects. Defaults to b'\n'
per specification.

def stop(self, signal='SIGKILL'):
# change in default
return super().stop(signal)
Example:
>>> import socket
>>> import time
>>> ls = FakeLogstashService()
>>> ls.start()
bharel marked this conversation as resolved.
Show resolved Hide resolved
>>> s = socket.create_connection((ls.local_host, ls.port)) # Logstash Handler
>>> s.sendall(b'{"record": "value", "level": "ERROR"}\\n')
>>> time.sleep(0.01) # Wait for service to process message
bharel marked this conversation as resolved.
Show resolved Hide resolved
>>> ls.assert_logs("ERROR")
>>> assert ls.records[0]["record"] == "value"
"""
port: int
records: List[Dict[str, Any]]
bharel marked this conversation as resolved.
Show resolved Hide resolved
bentheiii marked this conversation as resolved.
Show resolved Hide resolved
"""
Records generated by Python's numerous Logstash packages have at least the
following keys in common:
* @timestamp: str - Log timestamp in ISO-8601 format.
* @version: str - Logstash format version (always 1)
* message: str - Log message.
* host: str - Host sending the message.
* path: str - Path to the module writing the log.
* tags: List[str] - ?
* type: str - "Logstash"
* level: str - An all upper-case name of the log level
* logger_name: str - Logger name
* stack_info: Optional[str] - Formatted stacktrace if one exists.

More keys may be added by the specific software sending the logs.
"""

delimiter: bytes = b"\n"
encoding: str = "utf-8"

def __init__(self, port: int = 0) -> None:
"""Initialize the service.

Args:
port: Port to listen on. By default or if set to 0, port is chosen
by the OS.
"""
self.records = []

root = socket.socket()
root.bind(("0.0.0.0", port))
self._root = root

_background = WeakMethod(self._background_thread)
self._thread = threading.Thread(target=lambda: _background()(),
bharel marked this conversation as resolved.
Show resolved Hide resolved
bharel marked this conversation as resolved.
Show resolved Hide resolved
daemon=True)

self._selector = selectors.DefaultSelector()

# Sockets used for signalling shutdown
self._rshutdown, self._wshutdown = socket.socketpair()

self.port = self._root.getsockname()[1]

self.local_host = "localhost" # Requested by Ben.
self.container_host = docker_host_name
bentheiii marked this conversation as resolved.
Show resolved Hide resolved

def __del__(self):
# Will never happen while thread is running.
try:
# Order is important.
self._root.close()
self._selector.close()
self._rshutdown.close()
self._wshutdown.close()
except AttributeError:
pass

def _create_data_callback(self, sock: socket.socket) -> Callable[[], None]:
"""Creates a callback to be called every time the socket receives data.

Args:
sock: A connected socket.socket().

Returns:
A callable that should be called every time the socket receives data.
"""
# Data chunks received while waiting for a delimiter.
partial_chunks = []
delimiter = self.delimiter
encoding = self.encoding

def process_socket_data():
"""Process & parse incoming socket data."""
data = sock.recv(1024)

# Socket closed
if not data:
self._selector.unregister(sock)
return

chunks = data.split(delimiter)

# Single partial chunk (no delimiter)
if len(chunks) == 1:
partial_chunks.append(chunks[0])
return

# Combine all partial chunks with first.
partial_chunks.append(chunks[0])
chunks[0] = b"".join(partial_chunks)
partial_chunks[:] = chunks.pop()

# Parse all chunks into records.
try:
for chunk in chunks:
record_dict = json.loads(chunk.decode(encoding))
self.records.append(record_dict)
except json.JSONDecodeError:
self._selector.unregister(sock)
sock.close()
# noinspection PyUnboundLocalVariable
_logger.exception("Failed decoding json, closing socket. "
"Data received: %s", chunk)
return

return process_socket_data

def _background_thread(self):
"""Background thread processing incoming connections and data"""
while True:
events = self._selector.select(timeout=5) # For signal handling
for key, mask in events:
# Handle closing request
if key.fileobj is self._rshutdown:
assert self._rshutdown.recv(
len(_CLOSE_SENTINEL)) == _CLOSE_SENTINEL
return

# Handle new connection
if key.fileobj is self._root:
new_socket, _ = self._root.accept()
self._selector.register(new_socket, selectors.EVENT_READ,
self._create_data_callback(new_socket))
continue

# Data received, run callback
try:
key.data()
except Exception:
_logger.exception("Unknown error occurred, closing connection.")
self._selector.unregister(key.fileobj)
# noinspection PyUnresolvedReferences
key.fileobj.close()

def start(self, *, retry_spec: None = None) -> None:
"""Start the service.

Args:
retry_spec: Ignored.
"""
self._root.listen()
self._selector.register(self._root, selectors.EVENT_READ)
self._selector.register(self._rshutdown, selectors.EVENT_READ)
self._thread.start()
super(FakeLogstashService, self).start(retry_spec=retry_spec)
bentheiii marked this conversation as resolved.
Show resolved Hide resolved
bharel marked this conversation as resolved.
Show resolved Hide resolved

def stop(self) -> None:
"""Stop the service

Raises:
RuntimeError: Failed stopping the service. Operation timed out.
"""
if not self._thread.is_alive():
return
self._wshutdown.send(_CLOSE_SENTINEL)
self._thread.join(_STOP_TIMEOUT) # Should almost never timeout
if self._thread.is_alive():
raise RuntimeError(f"Failed stopping {self.__class__.__name__}.")

for key in self._selector.get_map().values():
sock = cast(socket.socket, key.fileobj)
sock.close()

self._selector.close()

def is_alive(self) -> bool:
"""Check if FakeLogstashService is alive.

Returns:
Boolean.
"""
return self._thread.is_alive()

def connect(self, network: Any) -> List[str]:
"""Does nothing. Conforms to YellowService interface."""
# Logstash service is not docker related. It cannot actually connect to
# the network. However, other containers connected to the network can
# connect to the service with docker's usual host
return [docker_host_name]
bharel marked this conversation as resolved.
Show resolved Hide resolved

def disconnect(self, network: Any):
"""Does nothing. Conforms to YellowService interface."""
pass

def _filter_records(self, level: int) -> Iterable[Dict[str, Any]]:
bharel marked this conversation as resolved.
Show resolved Hide resolved
"""Filter records in the given level or above."""
return (record for record in self.records if
logging.getLevelName(record["level"]) >= level)
bentheiii marked this conversation as resolved.
Show resolved Hide resolved

def assert_logs(self, level: Union[str, int]):
bharel marked this conversation as resolved.
Show resolved Hide resolved
"""Asserts that log messages were received in the given level or above.

Resembles unittest.assertLogs.

Args:
level: Log level by name or number

Raises:
AssertionError: No logs above the given level were received.
"""
if not isinstance(level, int):
level = logging.getLevelName(level.upper())

# In rare cases it might be false, but shouldn't happen generally.
assert isinstance(level, int)

if not any(self._filter_records(level)):
raise AssertionError(f"No logs of level {logging.getLevelName(level)} "
f"or above were received.")

def assert_no_logs(self, level: Union[str, int]):
"""Asserts that no log messages were received in the given level or above.

Args:
level: Log level by name or number

Raises:
AssertionError: A log above the given level was received.
"""
if not isinstance(level, int):
level = logging.getLevelName(level.upper())

# In rare cases it might be false, but shouldn't happen generally.
assert isinstance(level, int)

record = next(self._filter_records(level), None)
if record:
raise AssertionError(f"A log level {record['level']} was received. "
f"Message: {record['message']}")
bharel marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions yellowbox/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from contextlib import AbstractContextManager, contextmanager, nullcontext, closing
from socket import socket, SOL_SOCKET, SO_REUSEADDR, SOCK_STREAM, AF_INET
import platform
from typing import Callable, TypeVar

from yaspin import yaspin
Expand Down Expand Up @@ -31,3 +32,8 @@ def get_free_port():
s.bind(('', 0))
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
return s.getsockname()[1]

if platform.system() == "Linux":
docker_host_name = '172.17.0.1'
else:
docker_host_name = 'host.docker.internal'