Add explicit heartbeat interval for GetOperationStatus #385

Closed
11 changes: 11 additions & 0 deletions src/databricks/sql/thrift_backend.py
@@ -55,6 +55,7 @@

TIMESTAMP_AS_STRING_CONFIG = "spark.thriftserver.arrowBasedRowSet.timestampAsString"
DEFAULT_SOCKET_TIMEOUT = float(900)
DEFAULT_STATEMENT_HEARTBEAT_INTERVAL = float(25)

@kravets-levko (Collaborator), Apr 5, 2024

To me, a 25-second delay between GetOperationStatus requests seems too long. If query execution finishes sooner than the delay, the client will still wait the full 25 seconds. In Node.js we poll for operation status at a 100 ms interval, which may be too low but seems far more reasonable.

Collaborator Author

100 ms? Do you actually see that in practice? Today we wait 0 seconds between polls, and the server holds each request for up to 5 seconds. It seems odd to care about subsecond latency on queries that we know take more than 5 seconds.
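The trade-off discussed in this thread can be made concrete with a small worked model. This is only a sketch under stated assumptions, not the driver's behaviour: `observed_completion_time` is a hypothetical helper, time zero is taken to be the moment the initial response reports a still-running statement, and the server is assumed to answer a held request as soon as the statement finishes.

```python
def observed_completion_time(query_duration: float,
                             heartbeat_interval: float,
                             server_hold: float = 5.0) -> float:
    """Return the time at which the client notices that the query finished.

    Simplified model (an assumption, not the real client/server protocol):
    - t = 0 is when the initial response reports a running statement;
    - the client sleeps `heartbeat_interval` seconds, then polls;
    - a poll on a running statement blocks for up to `server_hold` seconds
      and returns early if the statement finishes in the meantime.
    """
    if heartbeat_interval + server_hold <= 0:
        raise ValueError("interval and hold cannot both be zero")
    t = 0.0
    while True:
        t += heartbeat_interval                    # client-side sleep before polling
        if t >= query_duration:
            return t                               # poll sees the final state at once
        t = min(query_duration, t + server_hold)   # server holds the request
        if t >= query_duration:
            return t


# A 6-second query under three client-side sleep settings.
with_25s_sleep = observed_completion_time(6.0, 25.0)
with_no_sleep = observed_completion_time(6.0, 0.0)
with_100ms_sleep = observed_completion_time(6.0, 0.1)
```

Under this model, a 6-second query is noticed after 25 seconds with the proposed default, and after 6 seconds with either no sleep or a 100 ms sleep, because the server-side hold already paces the requests.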


# see Connection.__init__ for parameter descriptions.
# - Min/Max avoids unsustainable configs (sane values are far more constrained)
@@ -77,6 +78,7 @@ class ThriftBackend:
    _retry_stop_after_attempts_count: int
    _retry_stop_after_attempts_duration: float
    _retry_delay_default: float
    _statement_heartbeat_interval: float

    def __init__(
        self,
@@ -134,6 +136,10 @@ def __init__(
        #   An integer representing the maximum number of redirects to follow for a request.
        #   This number must be <= _retry_stop_after_attempts_count.
        #   (defaults to None)
        # _statement_heartbeat_interval
        #   The number of seconds to wait between sending GetOperationStatus requests to the server
        #   (defaults to 25). The server already waits 5 seconds before responding if the
        #   statement is still running, so heartbeats end up roughly 30 seconds apart.
        # max_download_threads
        #   Number of threads for handling cloud fetch downloads. Defaults to 10

@@ -231,6 +237,10 @@ def __init__(
        # setTimeout defaults to 15 minutes and is expected in ms
        self._transport.setTimeout(timeout and (float(timeout) * 1000.0))

        self._statement_heartbeat_interval = kwargs.get(
            "_statement_heartbeat_interval", DEFAULT_STATEMENT_HEARTBEAT_INTERVAL
        )

        self._transport.setCustomHeaders(dict(http_headers))
        protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(self._transport)
        self._client = TCLIService.Client(protocol)
@@ -802,6 +812,7 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp):
            ttypes.TOperationState.RUNNING_STATE,
            ttypes.TOperationState.PENDING_STATE,
        ]:
            time.sleep(self._statement_heartbeat_interval)
            poll_resp = self._poll_for_status(op_handle)
            operation_state = poll_resp.operationState
            self._check_command_not_in_error_or_closed_state(op_handle, poll_resp)
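
The effect of the added `time.sleep` call on the wait loop can be seen in isolation with a stripped-down sketch. Everything here is a stand-in introduced for illustration: `OperationState` replaces `ttypes.TOperationState`, `wait_until_done` is a hypothetical simplification of `_wait_until_command_done` (no error-state checks), and the sleep function is injectable so the demo does not actually pause.

```python
import time
from enum import Enum
from typing import Callable, List


class OperationState(Enum):
    # Stand-in for ttypes.TOperationState; only three states are modelled.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"


def wait_until_done(
    initial_state: OperationState,
    poll: Callable[[], OperationState],
    heartbeat_interval: float,
    sleep: Callable[[float], None] = time.sleep,
) -> OperationState:
    """Poll until the operation leaves the RUNNING/PENDING states.

    As in the diff above, the client sleeps before every poll, so even the
    first status request is delayed by `heartbeat_interval` seconds.
    """
    state = initial_state
    while state in (OperationState.RUNNING, OperationState.PENDING):
        sleep(heartbeat_interval)
        state = poll()
    return state


# Demo with a fake server that reports RUNNING twice and then FINISHED.
responses = iter(
    [OperationState.RUNNING, OperationState.RUNNING, OperationState.FINISHED]
)
sleeps: List[float] = []  # records each requested sleep instead of pausing
final_state = wait_until_done(
    OperationState.PENDING,
    poll=lambda: next(responses),
    heartbeat_interval=25.0,
    sleep=sleeps.append,
)
```

In this demo the loop requests three 25-second sleeps, one before each poll. An operation whose initial state is already final skips the loop and never sleeps.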