Skip to content

Commit

Permalink
✨ Enhanced actions for DDD repositories (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
perdy authored and migduroli committed Sep 3, 2024
1 parent d944180 commit a66d5ee
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 150 deletions.
2 changes: 1 addition & 1 deletion flama/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(

if models:
app = Flama() if not app else app
for (name, url, path) in models:
for name, url, path in models:
app.models.add_model(url, path, name)

self.models = {m[0]: m[1] for m in models or {}}
Expand Down
6 changes: 5 additions & 1 deletion flama/ddd/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__all__ = ["RepositoryException", "IntegrityError", "NotFoundError"]
__all__ = ["RepositoryException", "IntegrityError", "NotFoundError", "MultipleRecordsError"]


class RepositoryException(Exception):
Expand All @@ -11,3 +11,7 @@ class IntegrityError(RepositoryException):

class NotFoundError(RepositoryException):
...


class MultipleRecordsError(RepositoryException):
...
245 changes: 153 additions & 92 deletions flama/ddd/repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,95 +46,95 @@ def __eq__(self, other):
and self.table == other.table
)

@property
def primary_key(self) -> sqlalchemy.Column:
"""Returns the primary key of the model.
async def create(self, *data: t.Union[t.Dict[str, t.Any], types.Schema]) -> t.List[t.Tuple[t.Any, ...]]:
"""Creates new elements in the table.
:return: sqlalchemy.Column: The primary key of the model.
:raises: exceptions.IntegrityError: If the model has a composed primary key.
"""

model_pk_columns = list(sqlalchemy.inspect(self.table).primary_key.columns.values())

if len(model_pk_columns) != 1:
raise exceptions.IntegrityError("Composed primary keys are not supported")

return model_pk_columns[0]

async def create(self, data: t.Union[t.Dict[str, t.Any], types.Schema]) -> t.Optional[t.Tuple[t.Any, ...]]:
"""Creates a new element in the repository.
If the element already exists, it raises an `exceptions.IntegrityError`. If the element is created, it returns
If the element already exists, it raises an `IntegrityError`. If the element is created, it returns
the primary key of the element.
:param data: The data to create the element.
:return: The primary key of the created element.
:raises: exceptions.IntegrityError: If the element already exists.
:raises IntegrityError: If the element already exists or cannot be inserted.
"""
try:
result = await self._connection.execute(sqlalchemy.insert(self.table).values(**data))
result = await self._connection.execute(sqlalchemy.insert(self.table), data)
except sqlalchemy.exc.IntegrityError as e:
raise exceptions.IntegrityError(str(e))
return tuple(result.inserted_primary_key) if result.inserted_primary_key else None
return [tuple(x) for x in result.inserted_primary_key_rows]

async def retrieve(self, id: t.Any) -> types.Schema:
"""Retrieves an element from the repository.
async def retrieve(self, *clauses, **filters) -> types.Schema:
"""Retrieves an element from the table.
If the element does not exist, it raises a `NotFoundError`. If more than one element is found, it raises a
`MultipleRecordsError`. If the element is found, it returns the element.
If the element does not exist, it raises a `NotFoundError`.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements
using exact values to specific columns. Clauses and filters can be combined.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param id: The primary key of the element.
:return: The element.
:raises: exceptions.NotFoundError: If the element does not exist.
:raises NotFoundError: If the element does not exist.
:raises MultipleRecordsError: If more than one element is found.
"""
element = (
await self._connection.execute(
sqlalchemy.select(self.table).where(self.table.c[self.primary_key.name] == id)
)
).first()
query = self._filter_query(sqlalchemy.select(self.table), *clauses, **filters)

if element is None:
raise exceptions.NotFoundError(str(id))
try:
element = (await self._connection.execute(query)).one()
except sqlalchemy.exc.NoResultFound:
raise exceptions.NotFoundError()
except sqlalchemy.exc.MultipleResultsFound:
raise exceptions.MultipleRecordsError()

return types.Schema(element._asdict())

async def update(self, id: t.Any, data: t.Union[t.Dict[str, t.Any], types.Schema]) -> types.Schema:
"""Updates an element in the repository.
async def update(self, data: t.Union[t.Dict[str, t.Any], types.Schema], *clauses, **filters) -> int:
"""Updates elements in the table.
Using clauses and filters, it filters the elements to update. If no clauses or filters are given, it updates
all the elements in the table.
If the element does not exist, it raises a `NotFoundError`. If the element is updated, it returns the updated
element.
:param id: The primary key of the element.
:param data: The data to update the element.
:return: The updated element.
:raises: exceptions.NotFoundError: If the element does not exist.
:return: The number of elements updated.
:raises IntegrityError: If the element cannot be updated.
"""
pk = self.primary_key
result = await self._connection.execute(
sqlalchemy.update(self.table).where(self.table.c[pk.name] == id).values(**data)
)
query = self._filter_query(sqlalchemy.update(self.table), *clauses, **filters).values(**data)

if result.rowcount == 0:
raise exceptions.NotFoundError(id)
try:
result = await self._connection.execute(query)
except sqlalchemy.exc.IntegrityError:
raise exceptions.IntegrityError

return types.Schema({pk.name: id, **data})
return result.rowcount

async def delete(self, id: t.Any) -> None:
"""Deletes an element from the repository.
async def delete(self, *clauses, **filters) -> None:
"""Delete elements from the table.
If the element does not exist, it raises a `NotFoundError`.
If no clauses or filters are given, it deletes all the elements in the repository.
:param id: The primary key of the element.
:raises: exceptions.NotFoundError: If the element does not exist.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements using
exact values to specific columns. Clauses and filters can be combined.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:raises NotFoundError: If the element does not exist.
:raises MultipleRecordsError: If more than one element is found.
"""
result = await self._connection.execute(
sqlalchemy.delete(self.table).where(self.table.c[self.primary_key.name] == id)
)
await self.retrieve(*clauses, **filters)

if result.rowcount == 0:
raise exceptions.NotFoundError(id)
query = self._filter_query(sqlalchemy.delete(self.table), *clauses, **filters)

async def list(self, *clauses, **filters) -> t.List[types.Schema]:
"""Lists all the elements in the repository.
await self._connection.execute(query)

async def list(self, *clauses, **filters) -> t.AsyncIterable[types.Schema]:
"""Lists all the elements in the table.
If no elements are found, it returns an empty list. If no clauses or filters are given, it returns all the
elements in the repository.
Expand All @@ -147,26 +147,61 @@ async def list(self, *clauses, **filters) -> t.List[types.Schema]:
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The elements.
:return: Async iterable of the elements.
"""
query = sqlalchemy.select(self.table)
query = self._filter_query(sqlalchemy.select(self.table), *clauses, **filters)

where_clauses = tuple(clauses) + tuple(self.table.c[k] == v for k, v in filters.items())
if where_clauses:
query = query.where(sqlalchemy.and_(*where_clauses))
result = await self._connection.stream(query)

async for row in result:
yield types.Schema(row._asdict())

async def drop(self, *clauses, **filters) -> int:
"""Drops elements in the table.
return [types.Schema(row._asdict()) async for row in await self._connection.stream(query)]
Returns the number of elements dropped. If no clauses or filters are given, it deletes all the elements in the
table.
async def drop(self) -> int:
"""Drops all the elements in the repository.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements using
exact values to specific columns. Clauses and filters can be combined.
Returns the number of elements dropped.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The number of elements dropped.
"""
result = await self._connection.execute(sqlalchemy.delete(self.table))
query = self._filter_query(sqlalchemy.delete(self.table), *clauses, **filters)

result = await self._connection.execute(query)

return result.rowcount

def _filter_query(self, query, *clauses, **filters):
"""Filters a query using clauses and filters.
Returns the filtered query. If no clauses or filters are given, it returns the query without any applying
filter.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements using
exact values to specific columns. Clauses and filters can be combined.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param query: The query to filter.
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The filtered query.
"""
where_clauses = tuple(clauses) + tuple(self.table.c[k] == v for k, v in filters.items())

if where_clauses:
query = query.where(sqlalchemy.and_(*where_clauses))

return query


class SQLAlchemyTableRepository(SQLAlchemyRepository):
_table: t.ClassVar[sqlalchemy.Table]
Expand All @@ -178,53 +213,70 @@ def __init__(self, connection: "AsyncConnection"):
def __eq__(self, other):
return isinstance(other, SQLAlchemyTableRepository) and self._table == other._table and super().__eq__(other)

async def create(self, data: t.Union[t.Dict[str, t.Any], types.Schema]) -> t.Optional[t.Tuple[t.Any, ...]]:
"""Creates a new element in the repository.
async def create(self, *data: t.Union[t.Dict[str, t.Any], types.Schema]) -> t.List[t.Tuple[t.Any, ...]]:
"""Creates new elements in the repository.
If the element already exists, it raises an `exceptions.IntegrityError`. If the element is created, it returns
the primary key of the element.
:param data: The data to create the element.
:return: The primary key of the created element.
:raises: exceptions.IntegrityError: If the element already exists.
:raises IntegrityError: If the element already exists or cannot be inserted.
"""
return await self._table_manager.create(data)
return await self._table_manager.create(*data)

async def retrieve(self, id: t.Any) -> types.Schema:
async def retrieve(self, *clauses, **filters) -> types.Schema:
"""Retrieves an element from the repository.
If the element does not exist, it raises a `NotFoundError`.
If the element does not exist, it raises a `NotFoundError`. If more than one element is found, it raises a
`MultipleRecordsError`. If the element is found, it returns the element.
:param id: The primary key of the element.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements
using exact values to specific columns. Clauses and filters can be combined.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The element.
:raises: exceptions.NotFoundError: If the element does not exist.
:raises NotFoundError: If the element does not exist.
:raises MultipleRecordsError: If more than one element is found.
"""
return await self._table_manager.retrieve(id)
return await self._table_manager.retrieve(*clauses, **filters)

async def update(self, id: t.Any, data: t.Union[t.Dict[str, t.Any], types.Schema]) -> types.Schema:
async def update(self, data: t.Union[t.Dict[str, t.Any], types.Schema], *clauses, **filters) -> int:
"""Updates an element in the repository.
If the element does not exist, it raises a `NotFoundError`. If the element is updated, it returns the updated
element.
:param id: The primary key of the element.
:param data: The data to update the element.
:return: The updated element.
:raises: exceptions.NotFoundError: If the element does not exist.
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The number of elements updated.
:raises IntegrityError: If the element cannot be updated.
"""
return await self._table_manager.update(id, data)
return await self._table_manager.update(data, *clauses, **filters)

async def delete(self, id: t.Any) -> None:
async def delete(self, *clauses, **filters) -> None:
"""Deletes an element from the repository.
If the element does not exist, it raises a `NotFoundError`.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements
using exact values to specific columns. Clauses and filters can be combined.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param id: The primary key of the element.
:raises: exceptions.NotFoundError: If the element does not exist.
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:raises NotFoundError: If the element does not exist.
:raises MultipleRecordsError: If more than one element is found.
"""
return await self._table_manager.delete(id)
return await self._table_manager.delete(*clauses, **filters)

async def list(self, *clauses, **filters) -> t.List[types.Schema]:
def list(self, *clauses, **filters) -> t.AsyncIterable[types.Schema]:
"""Lists all the elements in the repository.
Lists all the elements in the repository that match the clauses and filters. If no clauses or filters are given,
Expand All @@ -238,15 +290,24 @@ async def list(self, *clauses, **filters) -> t.List[types.Schema]:
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The elements.
:return: Async iterable of the elements.
"""
return await self._table_manager.list(*clauses, **filters)
return self._table_manager.list(*clauses, **filters)

async def drop(self, *clauses, **filters) -> int:
"""Drops elements in the repository.
async def drop(self) -> int:
"""Drops all the elements in the repository.
Returns the number of elements dropped. If no clauses or filters are given, it deletes all the elements in the
repository.
Clauses are used to filter the elements using sqlalchemy clauses. Filters are used to filter the elements using
exact values to specific columns. Clauses and filters can be combined.
Returns the number of elements dropped.
Clause example: `table.c["id"]._in((1, 2, 3))`
Filter example: `id=1`
:param clauses: Clauses to filter the elements.
:param filters: Filters to filter the elements.
:return: The number of elements dropped.
"""
return await self._table_manager.drop()
return await self._table_manager.drop(*clauses, **filters)
Loading

0 comments on commit a66d5ee

Please sign in to comment.