Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

addFeedRangesAndUseFeedRangeInQueryChangeFeed #37687

Merged
merged 19 commits into from
Oct 4, 2024

Conversation

xinlian12
Copy link
Member

@xinlian12 xinlian12 commented Oct 2, 2024

This PR merges PR #36930 and PR #37444

Description

In this PR, we added support for getting the feedRanges of the container and added support for getting change feed by using feed ranges.

Getting feed ranges from container

feedRange - represents a set of co-located logical partitions. For each container, customer will be able to get a list of feedRanges(each representing a hash-range of single physical partition). For query(future) and changeFeed(in this PR), SDK will allow customers to config a feedRange(can map to a single physical partition, span of multiple physical partitions or a subset of single physical partition) to filter the results.

Examples:
created_collection.read_feed_ranges()
await created_collection.read_feed_ranges()

Adding feed range support in change feed query

There are few issues with current changeFeed query API:
image

  • It allows customer to pass in a physical partition id for filtering the results, however there is no official public contract exists today for customer to acquire the physical partition id.
  • Physical partition id is an internal concept which from SDK perspective, we would like to keep as internal implementation details.
  • Currently, the continuation token being returned is simple _lsn, which is not split/merge proof. A new format continuation token will need to be returned
  • Does not support all change feed mode(Not included in this PR, will add in following PR)

Based on the above considerations, except adding feedRange support, we are also going to deprecate partition_key_range_id and is_start_from_beginning parameter. The API will be changed into the following:

 @overload
    def query_items_change_feed(
            self,
            *,
            max_item_count: Optional[int] = None,
            start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
            partition_key: PartitionKeyType,
            priority: Optional[Literal["High", "Low"]] = None,
            **kwargs: Any
    ) -> ItemPaged[Dict[str, Any]]:
  @overload
    def query_items_change_feed(
            self,
            *,
            feed_range:FeedRange,
            max_item_count: Optional[int] = None,
            start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
            priority: Optional[Literal["High", "Low"]] = None,
            **kwargs: Any
    ) -> ItemPaged[Dict[str, Any]]:
    @overload
    def query_items_change_feed(
            self,
            *,
            continuation: str,
            max_item_count: Optional[int] = None,
            priority: Optional[Literal["High", "Low"]] = None,
            **kwargs: Any
    ) -> ItemPaged[Dict[str, Any]]:
    @overload
    def query_items_change_feed(
            self,
            *,
            max_item_count: Optional[int] = None,
            start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
            priority: Optional[Literal["High", "Low"]] = None,
            **kwargs: Any
    ) -> ItemPaged[Dict[str, Any]]:
    @distributed_trace
    def query_items_change_feed(
            self,
            *args: Any,
            **kwargs: Any
    ) -> ItemPaged[Dict[str, Any]]:

Notes:
continuation token v1 -> old/existing formatted continuation token -> simple _lsn format, for example "3"
continuation token v2 -> new formatted continuation token ,it will contain information including containerRid, changeFeed mode, changeFeed start from, and a list of continuationTokens for each sub-range
continuation token when filter by pk value

eyJ2IjogInYyIiwiY29udGFpbmVyUmlkIjogIk1TVXRBTEVqc2FzPSIsICJtb2RlIjogIkluY3JlbWVudGFsIiwgInN0YXJ0RnJvbSI6IHsiVHlwZSI6ICJCZWdpbm5pbmcifSwiY29udGludWF0aW9uIjogeyJ2IjogInYyIiwicmlkIjogIk1TVXRBTEVqc2FzPSIsImNvbnRpbnVhdGlvbiI6IFsgeyAidG9rZW4iOiAiXCIyXCIiLCJyYW5nZSI6IHsibWluIjogIiIsICJtYXgiOiAiRkYiLCJpc01pbkluY2x1c2l2ZSI6IHRydWUsImlzTWF4SW5jbHVzaXZlIjogZmFsc2UgfSB9IF0sIlBLIjogJ3BrJyB9fQ==

continuation token when filter by feed range

eyJ2IjogInYyIiwiY29udGFpbmVyUmlkIjogIk1TVXRBTEVqc2FzPSIsIm1vZGUiOiAiSW5jcmVtZW50YWwiLCJzdGFydEZyb20iOiB7IlR5cGUiOiAiQmVnaW5uaW5nIiB9LCJjb250aW51YXRpb24iOiB7InYiOiAidjIiLCJyaWQiOiAiTVNVdEFMRWpzYXM9IiwiY29udGludWF0aW9uIjogWyB7InRva2VuIjogIlwiMlwiIiwicmFuZ2UiOiB7Im1pbiI6ICIiLCJtYXgiOiAiRkYiLCJpc01pbkluY2x1c2l2ZSI6IHRydWUsImlzTWF4SW5jbHVzaXZlIjogZmFsc2UgfSB9XSwiUmFuZ2UiOiB7Im1pbiI6ICIiLCJtYXgiOiAiRkYiLCAiaXNNaW5JbmNsdXNpdmUiOiB0cnVlLCAiaXNNYXhJbmNsdXNpdmUiOiBmYWxzZSB9fX0=
  • Filter by physical partition id will still be supported, but with the continuation token v1 version being returned, it will not split/merge safe
  • When filter by partition key, if it starts with continuation token none, then new formatted continuation token will be returned. If continuation token v1 is being used, then continuation token v1 version will continue being returned. Otherwise, it will return continuation token v2 version
  • When filter by feed range, if continuation token v1 is being used, an exception will be thrown

Split flow

Continuation token before split:

eyAidiI6ICJ2MiIsImNvbnRhaW5lclJpZCI6ICJNU1V0QUxFanNhcz0iLCJtb2RlIjogIkluY3JlbWVudGFsIiwgInN0YXJ0RnJvbSI6IHsiVHlwZSI6ICJCZWdpbm5pbmcifSwiY29udGludWF0aW9uIjogeyJ2IjogInYyIiwicmlkIjogIk1TVXRBTEVqc2FzPSIsImNvbnRpbnVhdGlvbiI6IFt7InRva2VuIjogIlwiMlwiIiwgInJhbmdlIjogeyJtaW4iOiAiIiwibWF4IjogIkZGIiwiaXNNaW5JbmNsdXNpdmUiOiB0cnVlLCJpc01heEluY2x1c2l2ZSI6IGZhbHNlfSB9XSwiUmFuZ2UiOiB7ICJtaW4iOiAiIiwibWF4IjogIkZGIiwgImlzTWluSW5jbHVzaXZlIjogdHJ1ZSwiaXNNYXhJbmNsdXNpdmUiOiBmYWxzZX19fQ==

Continuation token after split:

eyAidiI6ICJ2MiIsImNvbnRhaW5lclJpZCI6ICJNU1V0QUxFanNhcz0iLCJtb2RlIjogIkluY3JlbWVudGFsIiwic3RhcnRGcm9tIjogeyAiVHlwZSI6ICJCZWdpbm5pbmciIH0sImNvbnRpbnVhdGlvbiI6IHsgInYiOiAidjIiLCJyaWQiOiAiTVNVdEFMRWpzYXM9IiwiY29udGludWF0aW9uIjogWyB7InRva2VuIjogIlwiMTFcIiIsICJyYW5nZSI6IHsibWluIjogIiIsICJtYXgiOiAiMUZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkYiLCJpc01pbkluY2x1c2l2ZSI6IHRydWUsICJpc01heEluY2x1c2l2ZSI6IGZhbHNlfSB9LCB7InRva2VuIjogIlwiMTRcIiIsInJhbmdlIjogeyJtaW4iOiAiMUZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkZGRkYiLCAibWF4IjogIkZGIiwiaXNNaW5JbmNsdXNpdmUiOiB0cnVlLCJpc01heEluY2x1c2l2ZSI6IGZhbHNlIH0gfSBdLCJSYW5nZSI6IHsibWluIjogIiIsICJtYXgiOiAiRkYiLCJpc01pbkluY2x1c2l2ZSI6IHRydWUsImlzTWF4SW5jbHVzaXZlIjogZmFsc2UgfSB9fQ==

What happens when query change feed from multiple partitions

The continuation token will contain a list of tokens for each partition, changes will be read from each partition in round-robin fashion.

using the continuation token from the above split example, it includes to tokens. one for range ["", 1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF), one for [1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, FF). It will read changes from
["", 1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF), then read changes from [1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, FF) and continue the pattern.

Future changes

  • Add full fidelity mode support
  • Add API so customer can re-load balancing based on the existing continuation tokens

@xinlian12 xinlian12 requested review from annatisch and a team as code owners October 2, 2024 16:35
@github-actions github-actions bot added the Cosmos label Oct 2, 2024
@xinlian12 xinlian12 force-pushed the users/xinlian/feature/feedRangeAndChangeFeed branch from b75c260 to 659e439 Compare October 2, 2024 16:45
@azure-sdk
Copy link
Collaborator

API change check

APIView has identified API level changes in this PR and created following API reviews.

azure-cosmos

@xinlian12
Copy link
Member Author

/azp run python - cosmos - tests

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Copy link
Member Author

/azp run python - cosmos - tests

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Copy link
Member

@tvaron3 tvaron3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM Thanks

@xinlian12 xinlian12 merged commit 4a136a4 into main Oct 4, 2024
27 checks passed
@xinlian12 xinlian12 deleted the users/xinlian/feature/feedRangeAndChangeFeed branch October 4, 2024 15:29
w-javed pushed a commit to w-javed/azure-sdk-for-python that referenced this pull request Oct 4, 2024
* Add getFeedRanges API 
* Add feedRange support in query changeFeed


Co-authored-by: annie-mac <xinlian@microsoft.com>
w-javed pushed a commit to w-javed/azure-sdk-for-python that referenced this pull request Oct 4, 2024
* Add getFeedRanges API 
* Add feedRange support in query changeFeed


Co-authored-by: annie-mac <xinlian@microsoft.com>
from azure.cosmos.aio import _retry_utility_async
from azure.cosmos.exceptions import CosmosHttpResponseError

# pylint: disable=protected-access
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove if this is not in use

Please note v1 does not support split or merge.

"""
def __init__(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the init functions in V1 and V2 are almost identical. Can we add the init function in the base class(ChangeFeedFetcher) and call the base class's init instead? That will reduce the code duplication.

Such as, super().__init__()


return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)

def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you want this method to be public?

If yes, please add this method as an abstract method in the abstract class, ChangeFeedFetcher
If not, please add double leading underscore, such as __fetch_change_feed_items(). In Python, even though there isn't a forcible way to make methods private, but adding underscores will help developers to differentiate between private and public methods.

self._options = options
self._fetch_function = fetch_function
self._collection_link = collection_link
self._change_feed_fetcher: Optional[Union[ChangeFeedFetcherV1, ChangeFeedFetcherV2]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not preferrable, since every time when we add new versions, you will have to update this line. Instead, you could just use the base class type: ChangeFeedFetcher.

self._change_feed_state.populate_feed_options(self._feed_options)
is_s_time_first_fetch = self._change_feed_state._continuation is None
while True:
(fetched_items, response_headers) = self._fetch_function(self._feed_options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there list of fetch functions?

How will you handle exceptions from them?

Comment on lines +120 to +125
self._container_link = container_link
self._container_rid = container_rid
self._change_feed_start_from = change_feed_start_from
self._partition_key_range_id = partition_key_range_id
self._partition_key = partition_key
self._continuation = continuation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If class variables used outside of class, they should be considered as public variables and they shouldn't have a single leading underscore. Please drop the underscores for those variables, such as _continuation and _change_feed_start_from

Comment on lines +80 to +82
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(self._feed_options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is little confusing to me with this dependency:

  • feed_options contains change_feed_state
  • change_feed_state calls its function with feed_options

self._client = client
self._feed_options = feed_options

self._change_feed_state: ChangeFeedStateV1 = self._feed_options.pop("changeFeedState")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you have to pop(), and reassign the item later? It didn't seem to be necessary.

Can we just use self._feed_options["changeFeedState"], whenever we need to update the change feed state?

Also, please don't use the hardcoded text here. Use a static variable like ChangeFeedStateVersion in change_feed_state.py file. It can be CHANGE_FEED_STATE_KEY="changeFeedState"

After that change, we can use self._feed_options[CHANGE_FEED_STATE_KEY]

Comment on lines +141 to +145
if exceptions._partition_range_is_gone(e) or exceptions._is_partition_split_or_merge(e):
# refresh change feed state
self._change_feed_state.handle_feed_range_gone(self._client._routing_map_provider, self._resource_link)
else:
raise e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this error handling was required for v2 only

# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
self._change_feed_state.apply_server_response_continuation(
cast(str, response_headers.get(continuation_key)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get() will return None if the key doesn't exist, and cast will cast None to a string "None".

w-javed added a commit that referenced this pull request Oct 7, 2024
* Azure-AI-Generative-Deprecation-Msg

* addFeedRangesAndUseFeedRangeInQueryChangeFeed (#37687)

* Add getFeedRanges API 
* Add feedRange support in query changeFeed


Co-authored-by: annie-mac <xinlian@microsoft.com>

* Azure-AI-Generative-Deprecation-Msg

* Update release date for core (#37723)

* Improvements to mindependency dev_requirement conflict resolution (#37669)

* during mindependency runs, dev_requirements on local relative paths are now checked for conflict with the targeted set of minimum dependencies
* multiple type clarifications within azure-sdk-tools
* added tests for new conflict resolution logic

---------

Co-authored-by: McCoy Patiño <39780829+mccoyp@users.noreply.github.com>

* update doc settings

* Need to add environment to subscription configuration (#37726)

Co-authored-by: Wes Haggard <Wes.Haggard@microsoft.com>

* Enable samples for formrecognizer (#37676)

---------

Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com>
Co-authored-by: annie-mac <xinlian@microsoft.com>
Co-authored-by: Xiang Yan <xiangsjtu@gmail.com>
Co-authored-by: Scott Beddall <45376673+scbedd@users.noreply.github.com>
Co-authored-by: McCoy Patiño <39780829+mccoyp@users.noreply.github.com>
Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Wes Haggard <Wes.Haggard@microsoft.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

4 participants