Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session Token Management APIs #36971

Open
wants to merge 54 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
2950e20
merge from main and resolve conflicts
Aug 14, 2024
7a1a1eb
remove async keyword from changeFeed query in aio package
Aug 18, 2024
b6c53fb
refactor
Aug 18, 2024
5f16b14
refactor
Aug 18, 2024
36990ef
fix pylint
Aug 20, 2024
3c569e8
added public surface methods
tvaron3 Aug 20, 2024
7479b0c
pylint fix
Aug 20, 2024
2e76620
fix
Aug 21, 2024
56bbb9e
added functionality for merging session tokens from logical pk
tvaron3 Aug 21, 2024
8c0aa46
fix mypy
Aug 21, 2024
28394b9
added tests for basic merge and split
tvaron3 Aug 21, 2024
25c3363
resolve comments
Aug 27, 2024
cecdfa5
resolve comments
Aug 28, 2024
65ed132
resolve comments
Aug 28, 2024
4bb30d2
resolve comments
Aug 28, 2024
5addcdc
fix pylint
Aug 29, 2024
59814d7
fix mypy
Aug 29, 2024
ec79b94
merge feed range changes
tvaron3 Aug 22, 2024
66c3f7b
fix tests
Sep 4, 2024
1e7a268
merged with feed range branch
tvaron3 Sep 4, 2024
997b6b0
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Sep 4, 2024
7eda72f
Merge branch 'main' into addFeedRangeSupportInChangeFeed
Sep 4, 2024
3a2e4e1
add tests
Sep 5, 2024
0883dac
fix pylint
Sep 5, 2024
b7d1210
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 5, 2024
195c47c
fix and resolve comments
Sep 6, 2024
246b1be
fix and resolve comments
Sep 6, 2024
10fe387
Added isSubsetFeedRange logic
tvaron3 Sep 9, 2024
6498311
Added request context to crud operations, session token helpers
tvaron3 Sep 11, 2024
5a13ddf
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 11, 2024
f5d0d7b
Merge branch 'main' into addFeedRangeSupportInChangeFeed
Sep 13, 2024
5cde59b
revert unnecessary change
Sep 13, 2024
a494346
Added more tests
tvaron3 Sep 20, 2024
0d75607
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Sep 20, 2024
c8c099f
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 20, 2024
ad3ae4f
Added more tests
tvaron3 Oct 5, 2024
8f466a1
merge with main
tvaron3 Oct 6, 2024
5249d0a
Changed tests to use new public feed range and more test coverage for…
tvaron3 Oct 6, 2024
40523f5
Added more tests
tvaron3 Oct 7, 2024
9f88b4e
Fix tests and add changelog
tvaron3 Oct 7, 2024
7c23e87
fix spell checks
tvaron3 Oct 7, 2024
4d0b058
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 7, 2024
d7c598e
Added tests and pushed request context to client level
tvaron3 Oct 8, 2024
8698098
Added async methods and removed feed range from request context
tvaron3 Oct 8, 2024
c252d88
fix tests
tvaron3 Oct 9, 2024
51e721b
fix tests and pylint
tvaron3 Oct 9, 2024
923055b
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 9, 2024
104e341
Reacting to comments
tvaron3 Oct 10, 2024
5552912
Reacting to comments
tvaron3 Oct 10, 2024
1bbbd0f
pylint and added hpk tests
tvaron3 Oct 10, 2024
a9299ab
reacting to comments
tvaron3 Oct 11, 2024
2155016
fix tests and mypy
tvaron3 Oct 11, 2024
0436355
fix mypy
tvaron3 Oct 11, 2024
103eb41
fix mypy
tvaron3 Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Added option to disable write payload on writes. See [PR 37365](https://github.com/Azure/azure-sdk-for-python/pull/37365)
* Added get feed ranges API. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
* Added feed range support in `query_items_change_feed`. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
* Added helper APIs for managing session tokens. See [PR 36971](https://github.com/Azure/azure-sdk-for-python/pull/36971)

#### Bugs Fixed
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731).
Expand Down
85 changes: 74 additions & 11 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Large diffs are not rendered by default.

29 changes: 27 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ def _compare_helper(a, b):

@staticmethod
def overlaps(range1, range2):

if range1 is None or range2 is None:
return False
if range1.isEmpty() or range2.isEmpty():
Expand All @@ -195,10 +194,36 @@ def overlaps(range1, range2):
cmp1 = Range._compare_helper(range1.min, range2.max)
cmp2 = Range._compare_helper(range2.min, range1.max)

if cmp1 <= 0 or cmp2 <= 0:
if cmp1 <= 0 and cmp2 <= 0:
if (cmp1 == 0 and not (range1.isMinInclusive and range2.isMaxInclusive)) or (
cmp2 == 0 and not (range2.isMinInclusive and range1.isMaxInclusive)
):
return False
return True
return False

def can_merge(self, other):
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
if self.isSingleValue() and other.isSingleValue():
return self.min == other.min
# if share the same boundary, they can merge
overlap_boundary1 = self.max == other.min and self.isMaxInclusive or other.isMinInclusive
overlap_boundary2 = other.max == self.min and other.isMaxInclusive or self.isMinInclusive
if overlap_boundary1 or overlap_boundary2:
return True
return self.overlaps(self, other)

def merge(self, other):
if not self.can_merge(other):
raise ValueError("Ranges do not overlap")
min_val = self.min if self.min < other.min else other.min
max_val = self.max if self.max > other.max else other.max
is_min_inclusive = self.isMinInclusive if self.min < other.min else other.isMinInclusive
is_max_inclusive = self.isMaxInclusive if self.max > other.max else other.isMaxInclusive
return Range(min_val, max_val, is_min_inclusive, is_max_inclusive)

def is_subset(self, parent_range) -> bool:
normalized_parent_range = parent_range.to_normalized_range()
normalized_child_range = self.to_normalized_range()
return normalized_parent_range.contains(normalized_child_range.min) and \
(normalized_parent_range.contains(normalized_child_range.max)
or normalized_parent_range.max == normalized_child_range.max)
191 changes: 191 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_session_token_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal Helper functions for manipulating session tokens.
"""
from azure.cosmos._routing.routing_range import Range
from azure.cosmos._vector_session_token import VectorSessionToken

# pylint: disable=protected-access

def merge_session_tokens_with_same_range(session_token1, session_token2):
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
pk_range_id1, vector_session_token1 = parse_session_token(session_token1)
pk_range_id2, vector_session_token2 = parse_session_token(session_token2)
pk_range_id = pk_range_id1
if pk_range_id1 != pk_range_id2:
pk_range_id = pk_range_id1 \
if vector_session_token1.is_greater(vector_session_token2) else pk_range_id2
vector_session_token = vector_session_token1.merge(vector_session_token2)
return pk_range_id + ":" + vector_session_token.session_token

def is_compound_session_token(session_token):
return "," in session_token

def parse_session_token(session_token):
tokens = session_token.split(":")
return tokens[0], VectorSessionToken.create(tokens[1])

def split_compound_session_tokens(compound_session_tokens):
session_tokens = []
for _, session_token in compound_session_tokens:
if is_compound_session_token(session_token):
tokens = session_token.split(",")
for token in tokens:
session_tokens.append(token)
else:
session_tokens.append(session_token)
return session_tokens

def merge_session_tokens_with_same_pkrangeid(session_tokens):
i = 0
while i < len(session_tokens):
j = i + 1
while j < len(session_tokens):
pk_range_id1, vector_session_token1 = parse_session_token(session_tokens[i])
pk_range_id2, vector_session_token2 = parse_session_token(session_tokens[j])
if pk_range_id1 == pk_range_id2:
vector_session_token = vector_session_token1.merge(vector_session_token2)
session_tokens.append(pk_range_id1 + ":" + vector_session_token.session_token)
remove_session_tokens = [session_tokens[i], session_tokens[j]]
for token in remove_session_tokens:
session_tokens.remove(token)
i = -1
break
j += 1
i += 1

return session_tokens

def merge_ranges_with_subsets(overlapping_ranges):
processed_ranges = []
while len(overlapping_ranges) != 0:
feed_range_cmp, session_token_cmp = overlapping_ranges[0]
# compound session tokens are not considered for merging
if is_compound_session_token(session_token_cmp):
processed_ranges.append(overlapping_ranges[0])
overlapping_ranges.remove(overlapping_ranges[0])
continue
_, vector_session_token_cmp = parse_session_token(session_token_cmp)
subsets = []
# finding the subset feed ranges of the current feed range
for j in range(1, len(overlapping_ranges)):
feed_range = overlapping_ranges[j][0]
if not is_compound_session_token(overlapping_ranges[j][1]) and \
feed_range.is_subset(feed_range_cmp):
subsets.append(overlapping_ranges[j] + (j,))

# go through subsets to see if can create current feed range from the subsets
not_found = True
j = 0
while not_found and j < len(subsets):
merged_range = subsets[j][0]
session_tokens = [subsets[j][1]]
merged_indices = [subsets[j][2]]
if len(subsets) == 1:
_, vector_session_token = parse_session_token(session_tokens[0])
if vector_session_token_cmp.is_greater(vector_session_token):
overlapping_ranges.remove(overlapping_ranges[merged_indices[0]])
else:
for k, subset in enumerate(subsets):
if j == k:
continue
if merged_range.can_merge(subset[0]):
merged_range = merged_range.merge(subset[0])
session_tokens.append(subset[1])
merged_indices.append(subset[2])
if feed_range_cmp == merged_range:
# if feed range can be created from the subsets
# take the subsets if their global lsn is larger
# else take the current feed range
children_more_updated = True
for session_token in session_tokens:
_, vector_session_token = parse_session_token(session_token)
if vector_session_token_cmp.is_greater(vector_session_token):
children_more_updated = False
feed_ranges_to_remove = [overlapping_ranges[i] for i in merged_indices]
for feed_range_to_remove in feed_ranges_to_remove:
overlapping_ranges.remove(feed_range_to_remove)
if children_more_updated:
overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens))))
overlapping_ranges.remove(overlapping_ranges[0])
not_found = False
break

j += 1

processed_ranges.append(overlapping_ranges[0])
overlapping_ranges.remove(overlapping_ranges[0])
return processed_ranges

def get_updated_session_token(feed_ranges_to_session_tokens, target_feed_range):
target_feed_range_normalized = target_feed_range._feed_range_internal.get_normalized_range()
# filter out tuples that overlap with target_feed_range and normalizes all the ranges
overlapping_ranges = [(feed_range_to_session_token[0]._feed_range_internal.get_normalized_range(),
feed_range_to_session_token[1])
for feed_range_to_session_token in feed_ranges_to_session_tokens if Range.overlaps(
target_feed_range_normalized, feed_range_to_session_token[0]._feed_range_internal.get_normalized_range())]
# Is there a feed_range that is a superset of some of the other feed_ranges excluding tuples
# with compound session tokens?
if len(overlapping_ranges) == 0:
raise ValueError('There were no overlapping feed ranges with the target.')

# merge any session tokens that are the same exact feed range
i = 0
j = 1
while i < len(overlapping_ranges) and j < len(overlapping_ranges):
cur_feed_range = overlapping_ranges[i][0]
session_token = overlapping_ranges[i][1]
session_token_1 = overlapping_ranges[j][1]
if (not is_compound_session_token(session_token) and
not is_compound_session_token(overlapping_ranges[j][1]) and
cur_feed_range == overlapping_ranges[j][0]):
session_token = merge_session_tokens_with_same_range(session_token, session_token_1)
feed_ranges_to_remove = [overlapping_ranges[i], overlapping_ranges[j]]
for feed_range_to_remove in feed_ranges_to_remove:
overlapping_ranges.remove(feed_range_to_remove)
overlapping_ranges.append((cur_feed_range, session_token))
i, j = 0, 1
else:
j += 1
if j == len(overlapping_ranges):
i += 1
j = i + 1

# checking for merging of feed ranges that can be created from other feed ranges
processed_ranges = merge_ranges_with_subsets(overlapping_ranges)

# break up session tokens that are compound
remaining_session_tokens = split_compound_session_tokens(processed_ranges)

if len(remaining_session_tokens) == 1:
return remaining_session_tokens[0]
# merging any session tokens with same physical partition key range id
remaining_session_tokens = merge_session_tokens_with_same_pkrangeid(remaining_session_tokens)

updated_session_token = ""
# compound the remaining session tokens
for i in range(len(remaining_session_tokens)):
if i == len(remaining_session_tokens) - 1:
updated_session_token += remaining_session_tokens[i]
else:
updated_session_token += remaining_session_tokens[i] + ","

return updated_session_token
5 changes: 5 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_vector_session_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def equals(self, other):
and self.are_region_progress_equal(other.local_lsn_by_region)
)

def is_greater(self, other):
tvaron3 marked this conversation as resolved.
Show resolved Hide resolved
if self.global_lsn > other.global_lsn:
return True
return False

def merge(self, other):
if other is None:
raise ValueError("Invalid Session Token (should not be None)")
Expand Down
Loading
Loading