Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash rework #52

Merged
merged 25 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions yellowbox/extras/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from yellowbox.service import YellowService
from yellowbox.retry import RetrySpec
from yellowbox.utils import docker_host_name

__all__ = ['HttpService', 'RouterHTTPRequestHandler']
SideEffectResponse = Union[bytes, str, int]
Expand Down Expand Up @@ -109,11 +110,6 @@ def __getattr__(self, item: str):
raise AttributeError(item)


if platform.system() == "Linux":
_docker_host_name = '172.17.0.1'
else:
_docker_host_name = 'host.docker.internal'


class HttpService(YellowService):
"""
Expand Down Expand Up @@ -151,7 +147,7 @@ def local_url(self):

@property
def container_url(self):
return f'http://{_docker_host_name}:{self.server_port}'
return f'http://{docker_host_name}:{self.server_port}'

@staticmethod
def _to_callback(side_effect: SideEffect):
Expand Down Expand Up @@ -239,7 +235,7 @@ def is_alive(self):
def connect(self, network):
# since the http service is not docker related, it cannot actually connect to the network. However,
# other containers, connected to the network or not, can connect to the service with docker's usual host
return [_docker_host_name]
return [docker_host_name]

def disconnect(self, network):
pass
54 changes: 38 additions & 16 deletions yellowbox/extras/logstash.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
import selectors
bentheiii marked this conversation as resolved.
Show resolved Hide resolved
import socket
import threading
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, cast, TypedDict
from typing import Any, Callable, Dict, Iterable, List, Union, cast
from weakref import WeakMethod

from yellowbox.retry import RetrySpec
from yellowbox.subclasses import YellowService
from yellowbox.utils import _docker_host_name
from yellowbox.utils import docker_host_name

__all__ = ['LogstashService']
__all__ = ['FakeLogstashService']
_logger = logging.getLogger(__name__)
bentheiii marked this conversation as resolved.
Show resolved Hide resolved

_STOP_TIMEOUT = 5 # Timeout for stopping the service
Expand All @@ -19,22 +18,37 @@
_CLOSE_SENTINEL = b"\0"


class LogstashService(YellowService):
class FakeLogstashService(YellowService):
"""Implements a fake logging service that closely resembles Logstash.

Accepts external connections, with logs received in the "json_lines" format.
Accepts external TCP connections, with logs received in the "json_lines"
format.

Attributes:
records: A list of all records received. You are welcome to modify,
clear or iterate over this during runtime. New logs will be added in the
order they were received. Each record is a dict - for possible keys see
attribute docs.
port: Dynamic port external service should connect to. LogstashService
automatically binds a free port during initialization.
port: Dynamic port external service should connect to. FakeLogstashService
automatically binds a free port during initialization unless chosen
otherwise.
local_host: Host to connect to from the local machine (=="localhost")
container_host: Host to connect to from inside containers.
encoding: Encoding of the json lines received. Defaults to utf-8 per
specification.
delimiter: Delimiter splitting between json objects. Defaults to b'\n' per
specification.
delimiter: Delimiter splitting between json objects. Defaults to b'\n'
per specification.

Example:
>>> import socket
>>> import time
>>> ls = FakeLogstashService()
>>> ls.start()
bharel marked this conversation as resolved.
Show resolved Hide resolved
>>> s = socket.create_connection((ls.local_host, ls.port)) # Logstash Handler
>>> s.sendall(b'{"record": "value", "level": "ERROR"}\\n')
>>> time.sleep(0.01) # Wait for service to process message
bharel marked this conversation as resolved.
Show resolved Hide resolved
>>> ls.assert_logs("ERROR")
>>> assert ls.records[0]["record"] == "value"
"""
port: int
records: List[Dict[str, Any]]
bharel marked this conversation as resolved.
Show resolved Hide resolved
bentheiii marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -58,12 +72,17 @@ class LogstashService(YellowService):
delimiter: bytes = b"\n"
encoding: str = "utf-8"

def __init__(self) -> None:
"""Initialize the service."""
def __init__(self, port: int = 0) -> None:
"""Initialize the service.

Args:
port: Port to listen on. By default or if set to 0, port is chosen
by the OS.
"""
self.records = []

root = socket.socket()
root.bind(("0.0.0.0", 0))
root.bind(("0.0.0.0", port))
self._root = root

_background = WeakMethod(self._background_thread)
Expand All @@ -77,6 +96,9 @@ def __init__(self) -> None:

self.port = self._root.getsockname()[1]

self.local_host = "localhost" # Requested by Ben.
self.container_host = docker_host_name
bentheiii marked this conversation as resolved.
Show resolved Hide resolved

def __del__(self):
# Will never happen while thread is running.
try:
Expand Down Expand Up @@ -175,7 +197,7 @@ def start(self, *, retry_spec: None = None) -> None:
self._selector.register(self._root, selectors.EVENT_READ)
self._selector.register(self._rshutdown, selectors.EVENT_READ)
self._thread.start()
super(LogstashService, self).start(retry_spec=retry_spec)
super(FakeLogstashService, self).start(retry_spec=retry_spec)
bentheiii marked this conversation as resolved.
Show resolved Hide resolved
bharel marked this conversation as resolved.
Show resolved Hide resolved

def stop(self) -> None:
"""Stop the service
Expand All @@ -197,7 +219,7 @@ def stop(self) -> None:
self._selector.close()

def is_alive(self) -> bool:
"""Check if LogstashService is alive.
"""Check if FakeLogstashService is alive.

Returns:
Boolean.
Expand All @@ -209,7 +231,7 @@ def connect(self, network: Any) -> List[str]:
# Logstash service is not docker related. It cannot actually connect to
# the network. However, other containers connected to the network can
# connect to the service with docker's usual host
return [_docker_host_name]
return [docker_host_name]
bharel marked this conversation as resolved.
Show resolved Hide resolved

def disconnect(self, network: Any):
"""Does nothing. Conforms to YellowService interface."""
Expand Down
4 changes: 2 additions & 2 deletions yellowbox/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ def get_free_port():
return s.getsockname()[1]

if platform.system() == "Linux":
_docker_host_name = '172.17.0.1'
docker_host_name = '172.17.0.1'
else:
_docker_host_name = 'host.docker.internal'
docker_host_name = 'host.docker.internal'