Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
[Refact] Created FilterableQuery.
Browse files Browse the repository at this point in the history
  • Loading branch information
ducdetronquito committed Jan 13, 2017
1 parent 4385f42 commit 53ae007
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 58 deletions.
144 changes: 87 additions & 57 deletions plume/plume.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class SQLiteAPI:
TABLE = 'TABLE'
TEXT = 'TEXT'
UNIQUE = 'UNIQUE'

#Delete query
DELETE = 'DELETE'

# Drop table query
DROP = 'DROP'
Expand Down Expand Up @@ -63,8 +66,15 @@ def create_table(cls, name, fields):
SQLiteAPI.CREATE, SQLiteAPI.TABLE, SQLiteAPI.IF,
SQLiteAPI.invert_operator(SQLiteAPI.EXISTS), name.lower(),
SQLiteAPI.to_csv(fields, bracket=True),
]

)
return ' '.join(query)

@classmethod
def delete(cls, table, where=None):
query = (
SQLiteAPI.DELETE, SQLiteAPI.FROM, table.lower(),
SQLiteAPI.WHERE, str(where),
)
return ' '.join(query)

@classmethod
Expand All @@ -76,14 +86,13 @@ def drop_table(cls, name):

@classmethod
def insert_into(cls, table_name, field_names):
query = [
query = (
SQLiteAPI.INSERT, table_name.lower(),
SQLiteAPI.to_csv(field_names, bracket=True),
SQLiteAPI.VALUES,
SQLiteAPI.to_csv([SQLiteAPI.PLACEHOLDER] * len(field_names), bracket=True)
]

return " ".join(query)
)
return ' '.join(query)

@classmethod
def invert_operator(cls, operator):
Expand All @@ -110,7 +119,6 @@ def select(cls, tables, fields=None, where=None, count=None, offset=None):
query.extend((
SQLiteAPI.LIMIT, str(count), SQLiteAPI.OFFSET, str(offset),
))

return ' '.join(query)

@staticmethod
Expand Down Expand Up @@ -189,57 +197,68 @@ def __str__(self):
return ' '.join((str(e) for e in (self.lo, self.op, self.ro) if e is not None))


class SelectQuery:
"""A SelectQuery allows to forge a lazy SQL query.
class FilterableQuery:
__slots__ = ('_where',)

def __init__(self, where=None):
self._where = where

def where(self, *args):
if not len(args):
return self

args = list(args)

if self._where is None:
self._where = args.pop()

for expression in args:
self._where &= expression

return self


class DeleteQuery(FilterableQuery):
"""A DeleteQuery allows to forge a lazy DELETE FROM SQL query."""
__slots__ = ('_model', '_table', '_where')

def __init__(self, model, where=None):
super().__init__(where)
self._model = model
self._table = model.__name__.lower()

def __str__(self):
return ''.join(
('(', SQLiteAPI.delete(self._table, self._where), ')')
)

def execute(self):
self._model._db.delete(self._tables, self._fields)


class SelectQuery(FilterableQuery):
"""A SelectQuery allows to forge a lazy SELECT SQL query.
A SelectQuery represents a Select-From-Where SQL query, and allow the user to define the WHERE
clause. The user is allowed to add dynamically several criteria on a QuerySet. The SelectQuery only
hit the database when it is iterated over or sliced.
"""
__slots__ = ('_model', '_tables', '_fields', '_expression', '_count', '_offset')
__slots__ = ('_model', '_tables', '_fields', '_where', '_count', '_offset')

def __init__(self, model):
def __init__(self, model, fields=None, where=None):
super().__init__(where)
self._model = model
self._tables = [model.__name__.lower()]
self._fields = None
self._expression = None
self._fields = fields
self._count = None
self._offset = None

def __str__(self):
return ''.join((
'(', SQLiteAPI.select(
self._tables, self._fields, self._expression, self._count, self._offset), ')'
self._tables, self._fields, self._where, self._count, self._offset), ')'
))

def where(self, *args):
"""
Allow to filter a SelectQuery.
A QuerySet can filter as a Python list to select a subset of model instances.
It is similar as setting a WHERE clause in a SQL query.
This operation does not hit the database.
Args:
args: a list of Criterion represented in a conjonctive normal form.
Returns:
A SelectQuery
"""
if not len(args):
return self

args = list(args)

if self._expression is None:
self._expression = args.pop()

for expression in args:
self._expression &= expression

return self

def select(self, *args):
# Allow to filter Select-Query on columns.
self._fields = [str(field) for field in args]
Expand Down Expand Up @@ -312,7 +331,7 @@ def __getitem__(self, key):
def _execute(self):
"""Query the database and returns the result as a list of tuples."""
return self._model._db.select(
self._tables, self._fields, self._expression, self._count, self._offset
self._tables, self._fields, self._where, self._count, self._offset,
)

def dicts(self, allow_fields_subset=True):
Expand Down Expand Up @@ -365,11 +384,14 @@ def create(self, **kwargs):

return instance

def delete(self, *args):
return DeleteQuery(model=self._model).where(*args)

def select(self, *args):
return SelectQuery(self._model).select(*args)
return SelectQuery(model=self._model, fields=args)

def where(self, *args):
return SelectQuery(self._model).where(*args)
return SelectQuery(model=self._model).where(*args)


class RelatedManager(Manager):
Expand Down Expand Up @@ -603,10 +625,30 @@ def __init__(self, db_name):

def drop_table(self, model_class):
query = SQLiteAPI.drop_table(model_class.__name__)

with closing(self._connection.cursor()) as cursor:
cursor.execute(query)
self._connection.commit()

def create_table(self, model_class):
fields = [
' '.join(getattr(model_class, fieldname).sql())
for fieldname in model_class._fieldnames
]

query = SQLiteAPI.create_table(model_class.__name__, fields)

with closing(self._connection.cursor()) as cursor:
cursor.execute(query)
self._connection.commit()

def delete(self, table, where=None):
query = SQLiteAPI.delete(table, where)

with closing(self._connection.cursor()) as cursor:
cursor.execute(query)
self._connection.commit()

def insert_into(self, query, values):
last_row_id = None

Expand All @@ -623,18 +665,6 @@ def select(self, tables, fields=None, where=None, count=None, offset=None):
with closing(self._connection.cursor()) as cursor:
return cursor.execute(query).fetchall()

def create_table(self, model_class):
fields = [
' '.join(getattr(model_class, fieldname).sql())
for fieldname in model_class._fieldnames
]

query = SQLiteAPI.create_table(model_class.__name__, fields)

with closing(self._connection.cursor()) as cursor:
cursor.execute(query)
self._connection.commit()

def register(self, *args):
try:
for model_class in args:
Expand Down
1 change: 0 additions & 1 deletion tests/test_criterion.py

This file was deleted.

21 changes: 21 additions & 0 deletions tests/test_deletquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from plume import *
from plume.plume import DeleteQuery
from utils import Trainer

import pytest

class TestDeleteQuery:

def test_is_slotted(self):
with pytest.raises(AttributeError):
DeleteQuery(model=Trainer).__dict__

def test_delete_all_rows(self):
pass

def test_delete_with_one_filter(self):
pass

def test_delete_with_several_filters(self):
pass

0 comments on commit 53ae007

Please sign in to comment.