support for synthetic_key, makes it possible to reuse cached 'prefix' without repeat computations
karlicoss committed Apr 11, 2022
1 parent c54e1ed commit 1702b5b
Showing 2 changed files with 197 additions and 33 deletions.
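In short: synthetic_key lets a cached function declare one of its keyword arguments as a sorted key set, so that when the set only grows between runs, cachew feeds the previously cached rows back in and only the new keys need to be computed. A minimal sketch of the intended usage, mirroring the test added in this commit (the cache path and expensive_compute are illustrative placeholders, not part of the commit):

from itertools import chain
from typing import Iterable, Iterator, Sequence

from more_itertools import unique_everseen

from cachew import cachew

def expensive_compute(key: str) -> Iterator[str]:
    # placeholder for some slow function we'd rather not rerun for already cached keys
    yield key

@cachew('/tmp/demo-cache', synthetic_key='keys')
def process(keys: Sequence[str], *, cachew_cached: Iterable[str] = []) -> Iterator[str]:
    # when the cache holds a usable prefix, cachew passes it in via 'cachew_cached'
    # and shrinks 'keys' to the missing suffix; otherwise cachew_cached is empty
    yield from unique_everseen(chain(
        cachew_cached,
        *(expensive_compute(key) for key in keys),
    ))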
132 changes: 102 additions & 30 deletions src/cachew/__init__.py
@@ -713,6 +713,7 @@ def cachew(
# you can use 'test_many' to experiment
# - too small values (e.g. 10) are slower than 100 (presumably, too many sql statements)
# - too large values (e.g. 10K) are slightly slower as well (not sure why?)
synthetic_key: Optional[str]=None,
**kwargs,
):
r"""
@@ -793,13 +794,14 @@ def cachew(
# TODO not sure if should be more serious error...

ctx = Context(
func =func,
cache_path =cache_path,
force_file =force_file,
cls =cls,
depends_on =depends_on,
logger =logger,
chunk_by =chunk_by,
synthetic_key=synthetic_key,
)

# hack to avoid extra stack frame (see test_recursive, test_deep-recursive)
@@ -816,16 +818,23 @@ def cname(func: Callable) -> str:
return f'{mod}:{func.__qualname__}'


_CACHEW_CACHED = 'cachew_cached' # TODO add to docs
_SYNTHETIC_KEY = 'synthetic_key'
_SYNTHETIC_KEY_VALUE = 'synthetic_key_value'
_DEPENDENCIES = 'dependencies'


class Context(NamedTuple):
func : Callable
cache_path : PathProvider
force_file : bool
cls : Type
depends_on : HashFunction
logger : logging.Logger
chunk_by : int
synthetic_key: Optional[str]

def composite_hash(self, *args, **kwargs) -> Dict[str, Any]:
fsig = inspect.signature(self.func)
# defaults wouldn't be passed in kwargs, but they can be an implicit dependency (especially in between program runs)
defaults = {
@@ -846,9 +855,16 @@ def composite_hash(self, *args, **kwargs) -> SourceHash:
hash_parts = {
'cachew' : CACHEW_VERSION,
'schema' : schema,
_DEPENDENCIES : str(self.depends_on(*args, **kwargs)),
}
synthetic_key = self.synthetic_key
if synthetic_key is not None:
hash_parts[_SYNTHETIC_KEY ] = synthetic_key
hash_parts[_SYNTHETIC_KEY_VALUE] = kwargs[synthetic_key]
# FIXME assert it's in kwargs in the first place?
# FIXME support positional args too? maybe extract the name from signature somehow? dunno
# need to test it
return hash_parts
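# For illustration (not from the commit): with synthetic_key='keys' and a call like
# fun(keys=['1', '2', '5']), the returned dict would look roughly like
#   {
#       'cachew'             : CACHEW_VERSION,
#       'schema'             : schema,
#       'dependencies'       : '...',
#       'synthetic_key'      : 'keys',
#       'synthetic_key_value': ['1', '2', '5'],
#   }
# i.e. the synthetic key's value stays structured instead of being baked into an
# opaque json string, so cachew_wrapper below can compare the hashes field by field.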


@@ -857,13 +873,14 @@ def cachew_wrapper(
**kwargs,
):
C = _cachew_context
func = C.func
cache_path = C.cache_path
force_file = C.force_file
cls = C.cls
depends_on = C.depends_on
logger = C.logger
chunk_by = C.chunk_by
synthetic_key = C.synthetic_key

cn = cname(func)
if not settings.ENABLE:
@@ -909,7 +926,8 @@

logger.debug('using %s for db cache', dbp)

new_hash_d = C.composite_hash(*args, **kwargs)
new_hash = json.dumps(new_hash_d)
logger.debug('new hash: %s', new_hash)

with DbHelper(dbp, cls) as db, \
@@ -940,21 +958,75 @@

logger.debug('old hash: %s', old_hash)

def cached_items():
rows = conn.execute(table_cache.select())
for row in rows:
yield binder.from_row(row)

if new_hash == old_hash:
logger.debug('hash matched: loading from cache')
yield from cached_items()
return

logger.debug('hash mismatch: computing data and writing to db')

if synthetic_key is not None:
# attempt to use existing cache if possible, as a 'prefix'

old_hash_d: Dict[str, Any] = {}
if old_hash is not None:
try:
old_hash_d = json.loads(old_hash)
except json.JSONDecodeError:
# possible if we used old cachew version (<=0.8.1), hash wasn't json
pass

hash_diffs = {
k: new_hash_d.get(k) == old_hash_d.get(k)
for k in (*new_hash_d.keys(), *old_hash_d.keys())
# the only 'allowed' differences for hash, otherwise need to recompute (e.g. if schema changed)
if k not in {_SYNTHETIC_KEY_VALUE, _DEPENDENCIES}
}
cache_compatible = all(hash_diffs.values())
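# For illustration (not from the commit): if the runs differ only in the synthetic
# key's value (and hence in dependencies), e.g.
#   old: {..., 'synthetic_key': 'keys', 'synthetic_key_value': ['1', '2', '5']}
#   new: {..., 'synthetic_key': 'keys', 'synthetic_key_value': ['1', '2', '5', '6']}
# then every compared field matches and cache_compatible is True; if anything else
# (schema, cachew version, the key name itself) changed, we recompute as usual.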
if cache_compatible:
def missing_keys(cached: List[str], wanted: List[str]) -> Optional[List[str]]:
# FIXME assert both cached and wanted are sorted? since we rely on it
# if not, then the user could use some custom key for caching (e.g. normalise filenames etc)
# although in this case passing it into the function wouldn't make sense?

if len(cached) == 0:
# no point trying to reuse anything, cache should be empty?
return None
if len(wanted) == 0:
# similar, no way to reuse cache
return None
if cached[0] != wanted[0]:
# there is no common prefix, so no way to reuse cache really
return None
last_cached = cached[-1]
# ok, now actually figure out which items are missing
for i, k in enumerate(wanted):
if k > last_cached:
# ok, rest of items are missing
return wanted[i:]
# otherwise too many things are cached, and we seem to want less
return None
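# For illustration (not from the commit), assuming sorted key lists:
#   missing_keys(cached=['1', '2', '5'], wanted=['1', '2', '5', '6', '7']) == ['6', '7']
#   missing_keys(cached=['1', '2', '5'], wanted=['5', '6', '8', '9'])      is None  # no common prefix
#   missing_keys(cached=['1', '2', '5'], wanted=['1', '2'])                is None  # cache holds more than wanted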

new_values: List[str] = new_hash_d[_SYNTHETIC_KEY_VALUE]
old_values: List[str] = old_hash_d[_SYNTHETIC_KEY_VALUE]
missing = missing_keys(cached=old_values, wanted=new_values)
if missing is not None:
# can reuse cache
kwargs[_CACHEW_CACHED] = cached_items()
kwargs[synthetic_key] = missing


# NOTE on recursive calls
# somewhat magically, they should work as expected with no extra database inserts?
# the top level call 'wins' the write transaction and once it's gathered all data, will write it
# the 'intermediate' level calls fail to get it and will pass data through
# the cached 'bottom' level is read only and will be yielded without a write transaction
try:
# first 'write' statement will upgrade transaction to write transaction which might fail due to concurrency
# see https://www.sqlite.org/lang_transaction.html
98 changes: 95 additions & 3 deletions src/cachew/tests/test_cachew.py
@@ -1,6 +1,6 @@
from datetime import datetime, date, timezone
import inspect
from itertools import islice
from itertools import islice, chain
import logging
import os
from pathlib import Path
@@ -10,9 +10,9 @@
import sys
import time
import timeit
from typing import NamedTuple, Iterator, Optional, List, Set, Tuple, cast, Iterable, Dict, Any, Union, Sequence

from more_itertools import one, ilen, last, unique_everseen

import pytz
import pytest # type: ignore
@@ -1183,3 +1183,95 @@ def fun():
r = run(['python3', '-c', prog], cwd=tmp_path, stderr=PIPE, stdout=PIPE, check=True)
assert r.stdout.strip() == b'FINISHED'
assert b'Traceback' not in r.stderr


# tests both modes side by side to demonstrate the difference
@pytest.mark.parametrize('use_synthetic', [False, True])
def test_synthetic_keyset(tmp_path: Path, use_synthetic: bool) -> None:
# just to keep track of which data we had to compute from scratch
_recomputed: List[str] = []

# assume key i is responsible for numbers i and i-1
# in reality this could be some slow function we'd like to avoid calling if its result is already cached
# e.g. the key would typically be a filename (e.g. isoformat timestamp)
# and the returned values could be the results of an export over the month prior to the timestamp, or something like that
# see https://beepb00p.xyz/exports.html#synthetic for more on the motivation
def compute(key: str) -> Iterator[str]:
_recomputed.append(key)
n = int(key)
yield str(n - 1)
yield str(n)


# should result in 01 + 12 + 45 == 01245
keys125 = ['1', '2', '5' ]
# should result in 01 + 12 + 45 + 56 + 67 == 0124567
keys12567 = ['1', '2', '5', '6', '7' ]
# should result in 01 + 12 + 45 + 56 + 78 + 89 == 012456789
keys125689 = ['1', '2', '5', '6', '8', '9']
# should result in 45 + 56 + 78 + 89 == 456789
keys5689 = [ '5', '6', '8', '9']


def recomputed() -> List[str]:
r = list(_recomputed)
_recomputed.clear()
return r


## 'cachew_cached' will just be [] if synthetic key is not used, so no impact on data
@cachew(tmp_path, synthetic_key=('keys' if use_synthetic else None))
def fun_aux(keys: Sequence[str], *, cachew_cached: Iterable[str] = []) -> Iterator[str]:
yield from unique_everseen(chain(
cachew_cached,
*(compute(key) for key in keys),
))

def fun(keys: Sequence[str]) -> Set[str]:
return set(fun_aux(keys=keys))
##

assert fun(keys125) == set('01' '12' '45')
assert recomputed() == keys125
assert fun(keys125) == set('01' '12' '45')
assert recomputed() == [] # should be cached

assert fun(keys12567) == set('01' '12' '45' '56' '67')
if use_synthetic:
# 1, 2 and 5 should be already cached from the previous call
assert recomputed() == ['6', '7']
else:
# but without synthetic key this would cause everything to recompute
assert recomputed() == keys12567
assert fun(keys12567) == set('01' '12' '45' '56' '67')
assert recomputed() == [] # should be cached

assert fun(keys125689) == set('01' '12' '45' '56' '78' '89')
if use_synthetic:
# similarly, 1 2 5 6 7 are cached from the previous call
assert recomputed() == ['8', '9']
else:
# and we need to call against all keys otherwise
assert recomputed() == keys125689
assert fun(keys125689) == set('01' '12' '45' '56' '78' '89')
assert recomputed() == [] # should be cached

assert fun(keys5689) == set('45' '56' '78' '89')
# now the prefix has changed, so if we returned cached items it might return too much
# so we have to recompute everything
assert recomputed() == keys5689
assert fun(keys5689) == set('45' '56' '78' '89')
assert recomputed() == [] # should be cached

# TODO maybe call combined function? so it could return total result and last cached?
# TODO another option is:
# the function yields all cached stuff first
# then the user yields stuff from new
# and then external function does merging
# TODO test with kwargs hash?...
# TODO try without and with simultaneously?
# TODO check what happens when errors happen?
# FIXME check what happens if we switch between modes? (synthetic/non-synthetic)
# FIXME make sure this thing works if len(keys) > chunk size?
# TODO check what happens when we forget to set 'cachew_cached' argument
# TODO check what happens when keys are not str but e.g. Path
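
To tie the test back to the motivating use case (incremental exports, per the link above): a sketch, not from the commit, of using sorted export filenames as the synthetic key; Record and parse_export are hypothetical:

from typing import Iterable, Iterator, NamedTuple, Sequence

from cachew import cachew

class Record(NamedTuple):
    value: str

def parse_export(fname: str) -> Iterator[Record]:
    # hypothetical: parse one (slow) export file into records
    yield Record(value=f'parsed {fname}')

# filenames sorted by isoformat timestamp, so later runs only append new keys
@cachew('/tmp/exports-cache', synthetic_key='fnames')
def all_records(fnames: Sequence[str], *, cachew_cached: Iterable[Record] = []) -> Iterator[Record]:
    yield from cachew_cached  # records recovered from the cached prefix
    for fname in fnames:      # by this point, only the fnames missing from the cache
        yield from parse_export(fname)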
