core: switch to new marshalling implementation
see benchmarks/20230912-comparison-with-legacy.org for the justification
karlicoss committed Sep 12, 2023
1 parent 985c9b2 commit 250f766
Showing 2 changed files with 28 additions and 15 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -9,6 +9,7 @@
 install_requires = [
     'appdirs' , # default cache dir
     'sqlalchemy>=1.0', # cache DB interaction
+    'orjson', # fast json serialization
 ]


@@ -49,7 +50,6 @@ def main() -> None:
 
     'enlighten', # used in logging helper, but not really required
 
-    'orjson', # for now test only but may actually use soon
     'cattrs', # benchmarking alternative marshalling implementation
 
     'pyinstrument', # for profiling from within tests
41 changes: 27 additions & 14 deletions src/cachew/__init__.py
@@ -14,6 +14,16 @@
 import dataclasses
 import warnings
 
+try:
+    # orjson might not be available on some architectures, so let's make it defensive just in case
+    from orjson import loads as orjson_loads, dumps as orjson_dumps  # pylint: disable=no-name-in-module
+except:
+    warnings.warn("orjson couldn't be imported. It's _highly_ recommended for better caching performance")
+    def orjson_dumps(*args, **kwargs):  # type: ignore[misc]
+        # sqlite needs a blob
+        return json.dumps(*args, **kwargs).encode('utf8')
+
+    orjson_loads = json.loads

import appdirs
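
Both branches above hand back bytes, which is what the sqlite BLOB column introduced below expects; a minimal round-trip sketch (illustrative, not part of the diff):

    import json

    def fallback_dumps(*args, **kwargs):
        # mirrors the stdlib fallback above: serialize to str, then encode to bytes
        return json.dumps(*args, **kwargs).encode('utf8')

    data = {'id': 1, 'tags': ['a', 'b']}
    blob = fallback_dumps(data)      # b'{"id": 1, "tags": ["a", "b"]}'
    assert json.loads(blob) == data  # json.loads accepts bytes since python 3.6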

@@ -27,8 +37,8 @@
 except Exception as e:
     logging.exception(e)
 
-from .legacy import NTBinder
 from .logging_helper import makeLogger
+from .marshall.cachew import CachewMarshall
 from .utils import (
     is_primitive,
     is_union,
@@ -142,11 +152,10 @@ def do_begin(conn):
         self.meta = sqlalchemy.MetaData()
         self.table_hash = Table('hash', self.meta, Column('value', sqlalchemy.String))
 
-        self.binder = NTBinder.make(tp=cls)
         # actual cache
-        self.table_cache = Table('cache' , self.meta, *self.binder.columns)
+        self.table_cache = Table('cache' , self.meta, Column('data', sqlalchemy.BLOB))
         # temporary table, we use it to insert and then (atomically?) rename to the above table at the very end
-        self.table_cache_tmp = Table('cache_tmp', self.meta, *self.binder.columns)
+        self.table_cache_tmp = Table('cache_tmp', self.meta, Column('data', sqlalchemy.BLOB))
 
     def __enter__(self) -> 'DbHelper':
         return self
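
In other words, rather than one SQL column per field of the cached type (what NTBinder used to generate), each cached object is now stored as a single JSON payload in one BLOB column. A standalone sketch of what the new schema amounts to (hypothetical, outside the diff):

    import sqlalchemy
    from sqlalchemy import Column, MetaData, Table

    meta = MetaData()
    # one row per cached object, one JSON blob per row,
    # i.e. roughly: CREATE TABLE cache (data BLOB)
    cache = Table('cache', meta, Column('data', sqlalchemy.BLOB))
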
@@ -463,8 +472,7 @@ def composite_hash(self, *args, **kwargs) -> Dict[str, Any]:
             if k in hsig.parameters or 'kwargs' in hsig.parameters
         }
         kwargs = {**defaults, **kwargs}
-        binder = NTBinder.make(tp=self.cls_)
-        schema = str(binder.columns)  # todo not super nice, but works fine for now
+        schema = str(self.cls_)
         hash_parts = {
             'cachew' : CACHEW_VERSION,
             'schema' : schema,
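
Note that the schema fingerprint is now just the stringified target type instead of the generated column list; a tiny illustration with a hypothetical class:

    from dataclasses import dataclass

    @dataclass
    class Visit:
        url: str

    # this string feeds into hash_parts['schema']; when it changes,
    # the stored hash no longer matches and the cache is discarded
    print(str(Visit))  # e.g. "<class '__main__.Visit'>"
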
@@ -547,7 +555,7 @@ def cachew_wrapper(
          db.connection.begin():
         # NOTE: deferred transaction
         conn = db.connection
-        binder = db.binder
+        marshall = CachewMarshall(Type_=cls)
         table_cache = db.table_cache
         table_cache_tmp = db.table_cache_tmp

@@ -579,7 +587,9 @@
         def cached_items():
             rows = conn.execute(table_cache.select())
             for row in rows:
-                yield binder.from_row(row)
+                j = orjson_loads(row[0])
+                obj = marshall.load(j)
+                yield obj
 
         if new_hash == old_hash:
             logger.debug('hash matched: loading from cache')
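
Loading is thus a two-step decode: orjson parses the BLOB back into plain json, then CachewMarshall rebuilds the target type from it. A condensed round-trip sketch (hypothetical Point type; marshall/orjson names as used in this commit):

    from typing import NamedTuple

    class Point(NamedTuple):
        x: int
        y: int

    marshall = CachewMarshall(Type_=Point)
    blob = orjson_dumps(marshall.dump(Point(x=1, y=2)))  # bytes, as stored in the BLOB column
    assert marshall.load(orjson_loads(blob)) == Point(x=1, y=2)
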
@@ -683,8 +693,7 @@ def missing_keys(cached: List[str], wanted: List[str]) -> Optional[List[str]]:
         def flush() -> None:
             nonlocal chunk
             if len(chunk) > 0:
-                # TODO hmm, it really doesn't work unless you zip into a dict first
-                # maybe should return dicts from binder instead then?
+                # TODO optimize this, we really don't need to make extra dicts here just to insert
                 chunk_dict = [
                     dict(zip(column_names, row))
                     for row in chunk
@@ -693,15 +702,17 @@ def flush() -> None:
                 chunk = []
 
         total_objects = 0
-        for d in datas:
+        for obj in datas:
             try:
                 total_objects += 1
-                yield d
+                yield obj
             except GeneratorExit:
                 early_exit = True
                 return
 
-            chunk.append(binder.to_row(d))
 
+            dct = marshall.dump(obj)
+            j = orjson_dumps(dct)
+            chunk.append((j,))
             if len(chunk) >= chunk_by:
                 flush()
         flush()
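
So the write path serializes each object to a one-element tuple (matching the single 'data' column), buffers them, and bulk-inserts every chunk_by objects; distilled into a sketch (simplified from the code above):

    chunk = []

    def flush() -> None:
        if chunk:
            conn.execute(table_cache_tmp.insert(), [{'data': blob} for (blob,) in chunk])
            chunk.clear()

    for obj in datas:
        chunk.append((orjson_dumps(marshall.dump(obj)),))
        if len(chunk) >= chunk_by:
            flush()
    flush()  # flush the remainder
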
@@ -732,6 +743,8 @@ def flush() -> None:
         yield from func(*args, **kwargs)
 
 
+from .legacy import NTBinder
+
 __all__ = [
     'cachew',
     'CachewException',