# cache.py
# Forked from HumanSignal/label-studio-ml-backend.
import os
import sqlite3
from abc import ABC, abstractmethod
from threading import Lock
from functools import lru_cache
class BaseCache(ABC):
    """
    Abstract interface for caches whose entries are addressed by a
    ``(project_id, key)`` tuple.

    Concrete caches must implement item access (``cache[pid, key]``),
    item assignment, item deletion and membership tests (``in``).
    """

    def __init__(self, path):
        """
        Remember the location given to the cache.

        :param path: location of the cache; stored unchanged as ``self.path``
        """
        self.path = path

    @abstractmethod
    def __getitem__(self, project_id_key: tuple):
        """
        Look up the value stored for an entry.

        :param project_id_key: tuple (project_id, key)
        :return: the stored value
        """

    @abstractmethod
    def __setitem__(self, project_id_key: tuple, value):
        """
        Store a value for an entry.

        :param project_id_key: tuple (project_id, key)
        :param value: value to store
        :return:
        """

    @abstractmethod
    def __delitem__(self, project_id_key: tuple):
        """
        Remove an entry.

        :param project_id_key: tuple (project_id, key)
        :return:
        """

    @abstractmethod
    def __contains__(self, project_id_key: tuple):
        """
        Tell whether an entry is present.

        :param project_id_key: tuple (project_id, key)
        :return: truthy when the entry exists
        """
class SqliteCache(BaseCache):
    """
    Cache stored in a single SQLite file, fronted by a small in-memory
    read memo.

    Rows live in the ``cache`` table and are identified by
    ``(project_id, key)``; values must be strings. Every database access
    opens a fresh connection, runs one statement and closes the connection
    again, all while holding ``self.lock``.

    The read memo is per instance and per process: values written to the
    same database file by another process or another instance are not
    observed for memoized entries until this instance performs a write or
    delete (which empties its memo).
    """

    # Maximum number of memoized lookups kept per instance.
    _MEMO_MAXSIZE = 100

    def __init__(self, path: str, db_name: str = 'cache.db'):
        """
        Create the cache directory and database table if they are missing.

        :param path: directory that holds the database file (created if absent)
        :param db_name: file name of the SQLite database inside ``path``
        """
        super(SqliteCache, self).__init__(path)
        os.makedirs(self.path, exist_ok=True)
        self.db_name = os.path.join(self.path, db_name)
        self.lock = Lock()
        # Per-instance memo of recent lookups, ordered from least to most
        # recently used (relies on dict insertion order). Only touched while
        # holding self.lock.
        self._memo = {}
        # Establish a connection and create table if it doesn't exist
        with self.lock:
            self._execute('''
                CREATE TABLE IF NOT EXISTS cache (
                    project_id TEXT NOT NULL,
                    key TEXT NOT NULL,
                    value TEXT NOT NULL,
                    PRIMARY KEY (project_id, key)
                );
            ''')

    def _execute(self, sql, params=(), fetch=False):
        """
        Run a single statement on a fresh connection and close it afterwards.

        The caller must already hold ``self.lock``.

        :param sql: SQL statement with ``?`` placeholders
        :param params: values bound to the placeholders
        :param fetch: when true, return the first result row (or None)
        :return: first row if ``fetch`` is true, otherwise None
        """
        conn = sqlite3.connect(self.db_name)
        try:
            # ``with conn`` commits on success and rolls back on error, but
            # it does not close the connection - hence the explicit close().
            with conn:
                cursor = conn.execute(sql, params)
                return cursor.fetchone() if fetch else None
        finally:
            conn.close()

    def __getitem__(self, project_id_key):
        """
        Return the value stored for ``(project_id, key)``.

        :param project_id_key: tuple (project_id, key)
        :return: the stored string, or None when no such entry exists
        """
        project_id, key = project_id_key
        memo_key = (project_id, key)
        # The memo check, the database read and the memo store all happen
        # under the same lock so that a concurrent write cannot slip in
        # between the read and the store and leave a stale value memoized.
        with self.lock:
            if memo_key in self._memo:
                # Re-insert to mark the entry as most recently used.
                value = self._memo.pop(memo_key)
                self._memo[memo_key] = value
                return value
            row = self._execute(
                'SELECT value FROM cache WHERE project_id = ? AND key = ?;',
                memo_key, fetch=True)
            value = None if row is None else row[0]
            if len(self._memo) >= self._MEMO_MAXSIZE:
                # Evict the least recently used entry (first in dict order).
                del self._memo[next(iter(self._memo))]
            self._memo[memo_key] = value
            return value

    def __setitem__(self, project_id_key, value):
        """
        Insert or replace the value stored for ``(project_id, key)``.

        :param project_id_key: tuple (project_id, key)
        :param value: string to store
        :raises ValueError: if ``value`` is not a string
        """
        project_id, key = project_id_key
        if not isinstance(value, str):
            raise ValueError('Value must be a string')
        with self.lock:
            self._execute(
                'REPLACE INTO cache (project_id, key, value) VALUES (?, ?, ?);',
                (project_id, key, value))
            # Drop the whole memo rather than a single entry: differently
            # typed keys (e.g. 1 and '1') can address the same TEXT row.
            self._memo.clear()

    def __delitem__(self, project_id_key):
        """
        Delete the entry for ``(project_id, key)``; a missing entry is not an error.

        :param project_id_key: tuple (project_id, key)
        """
        project_id, key = project_id_key
        with self.lock:
            self._execute(
                'DELETE FROM cache WHERE project_id = ? AND key = ?;',
                (project_id, key))
            self._memo.clear()

    def __contains__(self, project_id_key):
        """
        Check the database for an entry with the given ``(project_id, key)``.

        :param project_id_key: tuple (project_id, key)
        :return: True if a row exists, False otherwise
        """
        project_id, key = project_id_key
        with self.lock:
            row = self._execute(
                'SELECT 1 FROM cache WHERE project_id = ? AND key = ?;',
                (project_id, key), fetch=True)
        return row is not None
def create_cache(cache_type, path, **kwargs):
    """
    Build a cache of the requested type.

    :param cache_type: name of the cache implementation; only 'sqlite' is supported
    :param path: location passed to the cache constructor
    :param kwargs: extra keyword arguments forwarded to the cache constructor
    :return: a new SqliteCache instance
    :raises ValueError: if ``cache_type`` is not a supported type
    """
    # Reject unknown types up front so the supported path reads straight through.
    if cache_type != 'sqlite':
        raise ValueError(f"Unsupported cache type: {cache_type}")
    return SqliteCache(path, **kwargs)