Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update EventListener.handler behavior #1260

Merged
merged 11 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **BREAKING**: `BaseEventListener.publish_event` `flush` argument. Use `BaseEventListener.flush_events()` instead.
- **BREAKING**: Removed `BaseEventListener.publish_event` `flush` argument. Use `BaseEventListener.flush_events()` instead.
- **BREAKING**: Renamed parameter `driver` on `EventListener` to `event_listener_driver`.
- **BREAKING**: Changed default value of parameter `handler` on `EventListener` to `None`.
- **BREAKING**: Updated `EventListener.handler` return value behavior.
- If `EventListener.handler` returns `None`, the event will not be published to the `event_listener_driver`.
- If `EventListener.handler` is None, the event will be published to the `event_listener_driver` as-is.
- Updated `EventListener.handler` return type to `Optional[BaseEvent | dict]`.
- `BaseTask.parent_outputs` type has changed from `dict[str, str | None]` to `dict[str, BaseArtifact]`.
- `Workflow.context["parent_outputs"]` type has changed from `dict[str, str | None]` to `dict[str, BaseArtifact]`.
- `Pipeline.context["parent_output"]` has changed type from `str | None` to `BaseArtifact | None`.
Expand All @@ -36,6 +42,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Structures not flushing events when not listening for `FinishStructureRunEvent`.
- `EventListener.event_types` and the argument to `BaseEventListenerDriver.handler` being out of sync.

## \[0.33.1\] - 2024-10-11

Expand Down
47 changes: 47 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,53 @@

This document provides instructions for migrating your codebase to accommodate breaking changes introduced in new versions of Griptape.

## 0.33.X to 0.34.X

### `EventListener.handler` behavior, `driver` parameter rename

Returning `None` from the `handler` function now causes the event to not be published to the `EventListenerDriver`.
The `handler` function can now return a `BaseEvent` object.

#### Before

```python
def handler_fn_return_none(event: BaseEvent) -> Optional[dict]:
# This causes the `BaseEvent` object to be passed to the EventListenerDriver
return None

def handler_fn_return_dict(event: BaseEvent) -> Optional[dict]:
# This causes the returned dictionary to be passed to the EventListenerDriver
return {
"key": "value
}

EventListener(handler=handler_fn_return_none, driver=driver)
EventListener(handler=handler_fn_return_dict, driver=driver)
```

#### After

```python
def handler_fn_return_none(event: BaseEvent) -> Optional[dict | BaseEvent]:
# This causes the `BaseEvent` object to NOT get passed to the EventListenerDriver
return None

def handler_fn_return_dict(event: BaseEvent) -> Optional[dict | BaseEvent]:
# This causes the returned dictionary to be passed to the EventListenerDriver
return {
"key": "value
}

def handler_fn_return_base_event(event: BaseEvent) -> Optional[dict | BaseEvent]:
# This causes the returned `BaseEvent` object to be passed to the EventListenerDriver
return ChildClassOfBaseEvent()

# `driver` has been renamed to `event_listener_driver`
EventListener(handler=handler_fn_return_none, event_listener_driver=driver)
EventListener(handler=handler_fn_return_dict, event_listener_driver=driver)
EventListener(handler=handler_fn_return_base_event, event_listener_driver=driver)
```

## 0.32.X to 0.33.X

### Removed `DataframeLoader`
Expand Down
2 changes: 1 addition & 1 deletion docs/griptape-framework/data/loaders.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Scrapes web pages using a [WebScraperDriver](../drivers/web-scraper-drivers.md)

## SQL

Loads data from a SQL database using a [SQLDriver](../drivers/sql-drivers.md) and loads the resulting data into [ListArtifact](../../griptape-framework/data/artifacts.md#list)s, where each element is a [CsvRowArtifact](../../griptape-framework/data/artifacts.md#csv) containing a row of the SQL query.
Loads data from a SQL database using a [SQLDriver](../drivers/sql-drivers.md) and loads the resulting data into [ListArtifact](../../griptape-framework/data/artifacts.md#list)s, where each element is a [TextArtifact](../../griptape-framework/data/artifacts.md#text) containing a row of the SQL query.

```python
--8<-- "docs/griptape-framework/data/src/loaders_2.py"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
EventBus.add_event_listeners(
[
EventListener(
driver=AmazonSqsEventListenerDriver(
event_listener_driver=AmazonSqsEventListenerDriver(
queue_url=os.environ["AMAZON_SQS_QUEUE_URL"],
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
EventBus.add_event_listeners(
[
EventListener(
driver=AmazonSqsEventListenerDriver(
event_listener_driver=AmazonSqsEventListenerDriver(
queue_url=os.environ["AMAZON_SQS_QUEUE_URL"],
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[
EventListener(
event_types=[FinishStructureRunEvent],
driver=AwsIotCoreEventListenerDriver(
event_listener_driver=AwsIotCoreEventListenerDriver(
topic=os.environ["AWS_IOT_CORE_TOPIC"],
iot_endpoint=os.environ["AWS_IOT_CORE_ENDPOINT"],
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
event_types=[FinishStructureRunEvent],
# By default, GriptapeCloudEventListenerDriver uses the api key provided
# in the GT_CLOUD_API_KEY environment variable.
driver=GriptapeCloudEventListenerDriver(),
event_listener_driver=GriptapeCloudEventListenerDriver(),
),
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
[
EventListener(
event_types=[FinishStructureRunEvent],
driver=WebhookEventListenerDriver(
event_listener_driver=WebhookEventListenerDriver(
webhook_url=os.environ["WEBHOOK_URL"],
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
[
EventListener(
event_types=[FinishStructureRunEvent],
driver=PusherEventListenerDriver(
event_listener_driver=PusherEventListenerDriver(
batched=False,
app_id=os.environ["PUSHER_APP_ID"],
key=os.environ["PUSHER_KEY"],
Expand Down
19 changes: 19 additions & 0 deletions docs/griptape-framework/misc/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,22 @@ User: Write me a poem.
Assistant:
...
```

## `EventListenerDriver.handler` Return Value Behavior

The value that gets returned from the [`EventListener.handler`](../../reference/griptape/events/event_listener.md#griptape.events.event_listener.EventListener.handler) will determine what gets sent to the `event_listener_driver`.
### `EventListener.handler` is None

By default, the `EventListener.handler` function is `None`. Any events that the `EventListener` is listening for will get sent to the `event_listener_driver` as-is.

### Return `BaseEvent` or `dict`

You can return a `BaseEvent` or `dict` object from `EventListener.handler`, and it will get sent to the `event_listener_driver`.

### Return `None`

You can return `None` in the handler function to prevent the event from getting sent to the `event_listener_driver`.

```python
--8<-- "docs/griptape-framework/misc/src/events_no_publish.py"
```
49 changes: 49 additions & 0 deletions docs/griptape-framework/misc/src/events_no_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from typing import Optional

from griptape.artifacts import ErrorArtifact, InfoArtifact
from griptape.drivers import GriptapeCloudEventListenerDriver
from griptape.events import BaseEvent, EventBus, EventListener, FinishStructureRunEvent
from griptape.structures import Agent


def handler_maybe_drop_events(event: FinishStructureRunEvent) -> Optional[BaseEvent | dict]:
if event.structure_id == "some_structure_id":
# Drop the event if the structure_id is "some_structure_id"
return None
if isinstance(event.output_task_output, InfoArtifact):
# Print the output of the task if it is an InfoArtifact
# and then drop the event
print(f"Info: {event.output_task_output}")
return None
if isinstance(event.output_task_output, ErrorArtifact):
# Print the output of the task if it is an ErrorArtifact
# and then convert it to a dictionary and return it
print(f"Error: {event.output_task_output}")
return {
"error": event.output_task_output.to_text(),
"exception_message": str(event.output_task_output.exception),
}

return event


EventBus.add_event_listeners(
[
EventListener(
handler_maybe_drop_events,
event_types=[FinishStructureRunEvent],
# By default, GriptapeCloudEventListenerDriver uses the api key provided
# in the GT_CLOUD_API_KEY environment variable.
event_listener_driver=GriptapeCloudEventListenerDriver(),
),
]
)


agent1 = Agent(id="some_structure_id")
agent1.run("Create a list of 8 questions for an interview with a science fiction author.")

agent2 = Agent(id="another_structure_id")
agent2.run("Create a list of 10 questions for an interview with a theoretical physicist.")
4 changes: 2 additions & 2 deletions docs/griptape-framework/structures/rulesets.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ A [Ruleset](../../reference/griptape/rules/ruleset.md) can be used to define [Ru
This is particularly useful when you need the LLM to return well-formed data, such as JSON objects, with specific fields and data types.

!!! warning
`JsonSchemaRule` may break [ToolkitTask](../structures/tasks.md#toolkit) which relies on a specific [output token](https://github.com/griptape-ai/griptape/blob/e6a04c7b88cf9fa5d6bcf4c833ffebfab89a3258/griptape/tasks/toolkit_task.py#L28).
`JsonSchemaRule` may break [ToolkitTask](../structures/tasks.md#toolkit-task) which relies on a specific [output token](https://github.com/griptape-ai/griptape/blob/e6a04c7b88cf9fa5d6bcf4c833ffebfab89a3258/griptape/tasks/toolkit_task.py#L28).

```python
--8<-- "docs/griptape-framework/structures/src/json_schema_rule.py"
Expand Down Expand Up @@ -96,7 +96,7 @@ You can define a Ruleset at the Structure level if you need to have certain beha

### Rules

You can pass [rules](../../reference/griptape/structures/structure.md#griptape.structures.structure.Structure.rules) directly to the Structure to have a Ruleset created for you.
You can pass [rules](../../reference/griptape/mixins/rule_mixin/#griptape.mixins.rule_mixin.RuleMixin.rules) directly to the Structure to have a Ruleset created for you.

```python
--8<-- "docs/griptape-framework/structures/src/rulesets_2.py"
Expand Down
48 changes: 31 additions & 17 deletions griptape/events/event_listener.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Callable, Optional
from typing import TYPE_CHECKING, Callable, Generic, Optional, TypeVar

from attrs import Factory, define, field
from attrs import define, field

from .base_event import BaseEvent

if TYPE_CHECKING:
from griptape.drivers import BaseEventListenerDriver

from .base_event import BaseEvent

T = TypeVar("T", bound=BaseEvent)


@define
class EventListener:
handler: Callable[[BaseEvent], Optional[dict]] = field(default=Factory(lambda: lambda event: event.to_dict()))
event_types: Optional[list[type[BaseEvent]]] = field(default=None, kw_only=True)
driver: Optional[BaseEventListenerDriver] = field(default=None, kw_only=True)
class EventListener(Generic[T]):
"""An event listener that listens for events and handles them.

Attributes:
handler: The handler function that will be called when an event is published.
The handler function should accept an event and return either the event or a dictionary.
If the handler returns None, the event will not be published.
event_types: A list of event types that the event listener should listen for.
If not provided, the event listener will listen for all event types.
event_listener_driver: The driver that will be used to publish events.
"""

handler: Optional[Callable[[T], Optional[BaseEvent | dict]]] = field(default=None)
event_types: Optional[list[type[T]]] = field(default=None, kw_only=True)
event_listener_driver: Optional[BaseEventListenerDriver] = field(default=None, kw_only=True)

_last_event_listeners: Optional[list[EventListener]] = field(default=None)

Expand All @@ -32,16 +46,16 @@ def __exit__(self, type, value, traceback) -> None: # noqa: ANN001, A002

self._last_event_listeners = None

def publish_event(self, event: BaseEvent, *, flush: bool = False) -> None:
def publish_event(self, event: T, *, flush: bool = False) -> None:
event_types = self.event_types

if event_types is None or type(event) in event_types:
event_payload = self.handler(event)
if self.driver is not None:
if event_payload is not None and isinstance(event_payload, dict):
self.driver.publish_event(event_payload)
else:
self.driver.publish_event(event)

if self.driver is not None and flush:
self.driver.flush_events()
handled_event = event
if self.handler is not None:
handled_event = self.handler(event)

if self.event_listener_driver is not None and handled_event is not None:
self.event_listener_driver.publish_event(handled_event)

if self.event_listener_driver is not None and flush:
self.event_listener_driver.flush_events()
5 changes: 5 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ nav:
- Structure Config YAML: "griptape-cloud/structures/structure-config.md"
- Running Your Structure: "griptape-cloud/structures/run-structure.md"
- Structure Run Events: "griptape-cloud/structures/structure-run-events.md"
- Rules:
- Create a Ruleset: "griptape-cloud/rules/rulesets.md"
- Threads:
- Create a Thread: "griptape-cloud/threads/threads.md"
- Cloud API:
- API Reference: "griptape-cloud/api/api-reference.md"
- Framework:
Expand Down Expand Up @@ -128,6 +132,7 @@ nav:
- Audio Transcription Drivers: "griptape-framework/drivers/audio-transcription-drivers.md"
- Web Search Drivers: "griptape-framework/drivers/web-search-drivers.md"
- Observability Drivers: "griptape-framework/drivers/observability-drivers.md"
- Ruleset Drivers: "griptape-framework/drivers/ruleset-drivers.md"
- Data:
- Overview: "griptape-framework/data/index.md"
- Artifacts: "griptape-framework/data/artifacts.md"
Expand Down
15 changes: 12 additions & 3 deletions tests/unit/events/test_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,28 @@ def test_init(self):

assert _EventBus() is _EventBus()

def test_add_event_listeners(self):
def test_add_event_listeners_same(self):
EventBus.add_event_listeners([EventListener(), EventListener()])
assert len(EventBus.event_listeners) == 1

def test_add_event_listeners(self):
EventBus.add_event_listeners([EventListener(handler=lambda e: e), EventListener()])
assert len(EventBus.event_listeners) == 2

def test_remove_event_listeners(self):
listeners = [EventListener(), EventListener()]
listeners = [EventListener(handler=lambda e: e), EventListener()]
EventBus.add_event_listeners(listeners)
EventBus.remove_event_listeners(listeners)
assert len(EventBus.event_listeners) == 0

def test_add_event_listener(self):
def test_add_event_listener_same(self):
EventBus.add_event_listener(EventListener())
EventBus.add_event_listener(EventListener())
assert len(EventBus.event_listeners) == 1

def test_add_event_listener(self):
EventBus.add_event_listener(EventListener(handler=lambda e: e))
EventBus.add_event_listener(EventListener())

assert len(EventBus.event_listeners) == 2

Expand Down
Loading
Loading