-
Notifications
You must be signed in to change notification settings - Fork 0
/
cost_evaluation.py
197 lines (160 loc) · 7.47 KB
/
cost_evaluation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import datetime
import logging
from heuristic_advisor.heuristic_utils.what_if_index_creation import WhatIfIndexCreation
class CostEvaluation:
    """Estimate workload costs under a given index configuration, with caching.

    Index configurations are materialized either as hypothetical indexes via
    ``WhatIfIndexCreation`` ("whatif" mode) or as real indexes via
    ``db_connector`` ("actual_runtimes" mode). Per-query results are cached by
    ``(query, relevant_indexes)``, where the relevant indexes are those that
    share at least one column with the query; changing an irrelevant index
    therefore does not trigger a new database request.

    Attributes:
        db_connector: Connector used for plans/costs and, in
            "actual_runtimes" mode, for creating/dropping indexes.
        cost_estimation: Mode string, "whatif" or "actual_runtimes".
        what_if: Helper used to simulate/drop hypothetical indexes.
        current_indexes: Indexes currently simulated/created.
        cost_requests: Number of per-query cost requests made through
            calculate_cost / calculate_cost_and_plans.
        cache_hits: Number of those requests answered from a cache.
        cache: {(query, relevant_indexes): cost}, used by calculate_cost.
        cache_plans: {(query, relevant_indexes): (cost, plan)}, used by
            calculate_cost_and_plans. Kept separate from ``cache`` because the
            value shapes differ.
        completed: Set by complete_cost_estimation; cost calculations are
            refused afterwards.
        relevant_indexes_cache: {(query, frozenset(indexes)): relevant_indexes}.
        costing_time: Accumulated wall-clock time spent in
            calculate_cost_and_plans.
    """

    def __init__(self, db_connector, cost_estimation="whatif"):
        """Create an evaluator bound to ``db_connector``.

        Args:
            db_connector: Database connector. Methods called on it by this
                class: get_plan, get_cost, exec_query, create_index,
                drop_index.
            cost_estimation: "whatif" to simulate indexes hypothetically, or
                "actual_runtimes" to create real indexes and execute queries.
                Other values are accepted, but then indexes are never
                materialized and _get_cost returns None.
        """
        logging.debug("Init cost evaluation")
        self.db_connector = db_connector
        self.cost_estimation = cost_estimation
        logging.info("Cost estimation with %s", self.cost_estimation)
        self.what_if = WhatIfIndexCreation(db_connector)
        self.current_indexes = set()
        # The evaluator expects to start with no simulated indexes, so that
        # current_indexes mirrors what is simulated on the connection.
        assert len(self.what_if.all_simulated_indexes()) == len(self.current_indexes)
        self.cost_requests = 0
        self.cache_hits = 0
        # Cache structure:
        # {(query_object, relevant_indexes): cost}
        self.cache = {}
        # Cache structure:
        # {(query_object, relevant_indexes): (cost, plan)}
        self.cache_plans = {}
        self.completed = False
        self.relevant_indexes_cache = {}
        self.costing_time = datetime.timedelta(0)

    def estimate_size(self, index):
        """Make sure a size estimate is available for ``index``.

        If an equal index is currently simulated, the size is requested via
        its ``hypopg_oid`` (only when ``index.estimated_size`` is still
        falsy). Otherwise the index is simulated/created with
        ``store_size=True`` and remains in ``current_indexes`` afterwards.
        Presumably the what-if helper then stores the size on the index;
        confirm in WhatIfIndexCreation.simulate_index.
        """
        # ``index`` may be a different (but equal) object from the one that
        # was simulated. Only the stored object carries ``hypopg_oid``.
        existing = next((i for i in self.current_indexes if i == index), None)
        if existing is not None:
            if not index.estimated_size:
                index.estimated_size = self.what_if.estimate_index_size(
                    existing.hypopg_oid
                )
        else:
            self._simulate_or_create_index(index, store_size=True)

    def which_indexes_utilized_and_cost(self, query, indexes):
        """Return the indexes appearing in ``query``'s plan, and its cost.

        Materializes exactly ``indexes`` (simulating/creating missing ones
        with ``store_size=True`` and dropping extra ones), then fetches the
        plan. An index counts as utilized when its ``hypopg_name`` occurs in
        the plan's string form. This path bypasses the caches and does not
        increment ``cost_requests``.

        Returns:
            (set of utilized indexes, plan["Total Cost"])
        """
        self._prepare_cost_calculation(indexes, store_size=True)
        plan = self.db_connector.get_plan(query)
        cost = plan["Total Cost"]
        plan_str = str(plan)
        recommended_indexes = set()
        # Iterate over current_indexes rather than ``indexes``: only objects
        # that were actually simulated here are guaranteed to carry a
        # hypopg_name (_prepare_cost_calculation skips already existing ones).
        # After preparation every current index must also be in ``indexes``.
        for index in self.current_indexes:
            assert (
                index in indexes
            ), "Something went wrong with _prepare_cost_calculation."
            if index.hypopg_name in plan_str:
                recommended_indexes.add(index)
        return recommended_indexes, cost

    def _prepare_cost_calculation(self, indexes, store_size=False):
        """Make ``current_indexes`` equal to ``set(indexes)``.

        Missing indexes are simulated/created first; indexes that exist but
        are not requested are then unsimulated/dropped.
        """
        target = set(indexes)
        for index in target - self.current_indexes:
            self._simulate_or_create_index(index, store_size=store_size)
        for index in self.current_indexes - target:
            self._unsimulate_or_drop_index(index)
        assert self.current_indexes == target

    def _simulate_or_create_index(self, index, store_size=False):
        """Materialize ``index`` according to the mode and record it.

        ``store_size`` is only forwarded in "whatif" mode.
        """
        if self.cost_estimation == "whatif":
            self.what_if.simulate_index(index, store_size=store_size)
        elif self.cost_estimation == "actual_runtimes":
            self.db_connector.create_index(index)
        self.current_indexes.add(index)

    def _unsimulate_or_drop_index(self, index):
        """Remove ``index`` according to the mode and forget it.

        Raises KeyError (from set.remove) if ``index`` is not current.
        """
        if self.cost_estimation == "whatif":
            self.what_if.drop_simulated_index(index)
        elif self.cost_estimation == "actual_runtimes":
            self.db_connector.drop_index(index)
        self.current_indexes.remove(index)

    def calculate_cost(self, workload, indexes, store_size=False):
        """Return the frequency-weighted total cost of ``workload``.

        Args:
            workload: Object whose ``queries`` each have ``frequency``.
            indexes: Index configuration to evaluate.
            store_size: Forwarded when new indexes are simulated.
        """
        assert (
            self.completed is False
        ), "Cost Evaluation is completed and cannot be reused."
        self._prepare_cost_calculation(indexes, store_size=store_size)
        total_cost = 0
        for query in workload.queries:
            self.cost_requests += 1
            total_cost += self._request_cache(query, indexes) * query.frequency
        return total_cost

    def calculate_cost_and_plans(self, workload, indexes, store_size=False):
        """Return (weighted total cost, per-query plans, per-query costs).

        Costs here always come from the query plan's "Total Cost", in either
        mode. The elapsed wall-clock time is added to ``costing_time``.
        """
        assert (
            self.completed is False
        ), "Cost Evaluation is completed and cannot be reused."
        start_time = datetime.datetime.now()
        self._prepare_cost_calculation(indexes, store_size=store_size)
        total_cost = 0
        plans = []
        costs = []
        for query in workload.queries:
            self.cost_requests += 1
            cost, plan = self._request_cache_plans(query, indexes)
            total_cost += cost * query.frequency
            plans.append(plan)
            costs.append(cost)
        end_time = datetime.datetime.now()
        self.costing_time += end_time - start_time
        return total_cost, plans, costs

    def _get_cost(self, query):
        """Ask the database for ``query``'s cost according to the mode.

        "whatif": db_connector.get_cost(query).
        "actual_runtimes": first element of db_connector.exec_query(query).
        NOTE(review): any other mode falls through and returns None.
        """
        if self.cost_estimation == "whatif":
            return self.db_connector.get_cost(query)
        elif self.cost_estimation == "actual_runtimes":
            runtime = self.db_connector.exec_query(query)[0]
            return runtime

    def _get_cost_plan(self, query):
        """Return (plan["Total Cost"], plan) for ``query``."""
        # NOTE(review): unlike _get_cost this ignores cost_estimation, so in
        # "actual_runtimes" mode the cost is still the planner estimate.
        query_plan = self.db_connector.get_plan(query)
        return query_plan["Total Cost"], query_plan

    def complete_cost_estimation(self):
        """Mark the evaluator as finished and remove all current indexes."""
        self.completed = True
        # Iterate over a copy: removal mutates current_indexes.
        for index in self.current_indexes.copy():
            self._unsimulate_or_drop_index(index)
        assert self.current_indexes == set()

    def _get_relevant_indexes(self, query, indexes):
        """Return (and memoize) the subset of ``indexes`` relevant to ``query``."""
        key = (query, frozenset(indexes))
        if key not in self.relevant_indexes_cache:
            self.relevant_indexes_cache[key] = self._relevant_indexes(query, indexes)
        return self.relevant_indexes_cache[key]

    def _request_cache(self, query, indexes):
        """Return the cached cost for ``query``, requesting it on a miss."""
        cache_key = (query, self._get_relevant_indexes(query, indexes))
        if cache_key in self.cache:
            self.cache_hits += 1
            return self.cache[cache_key]
        # No cache hit: request the cost from the database system.
        cost = self._get_cost(query)
        self.cache[cache_key] = cost
        return cost

    def _request_cache_plans(self, query, indexes):
        """Return the cached (cost, plan) for ``query``, requesting on a miss.

        Uses ``cache_plans`` rather than ``cache``: both are keyed by
        (query, relevant_indexes), but ``cache`` holds bare costs and sharing
        it would mix value shapes between the two calculation methods.
        """
        cache_key = (query, self._get_relevant_indexes(query, indexes))
        if cache_key in self.cache_plans:
            self.cache_hits += 1
            return self.cache_plans[cache_key]
        # No cache hit: request cost and plan from the database system.
        cost, plan = self._get_cost_plan(query)
        self.cache_plans[cache_key] = (cost, plan)
        return cost, plan

    @staticmethod
    def _relevant_indexes(query, indexes):
        """Return the indexes sharing at least one column with ``query``."""
        return frozenset(
            x for x in indexes if any(c in query.columns for c in x.columns)
        )