Skip to content

Commit

Permalink
add heartbeat interval
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Cassell <ben.cassell@databricks.com>
  • Loading branch information
benc-db committed Apr 5, 2024
1 parent 5636b8f commit c4e6cef
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

TIMESTAMP_AS_STRING_CONFIG = "spark.thriftserver.arrowBasedRowSet.timestampAsString"
DEFAULT_SOCKET_TIMEOUT = float(900)
DEFAULT_STATEMENT_HEARTBEAT_INTERVAL = float(25)

# see Connection.__init__ for parameter descriptions.
# - Min/Max avoids unsustainable configs (sane values are far more constrained)
Expand All @@ -77,6 +78,7 @@ class ThriftBackend:
_retry_stop_after_attempts_count: int
_retry_stop_after_attempts_duration: float
_retry_delay_default: float
_statement_heartbeat_interval: float

def __init__(
self,
Expand Down Expand Up @@ -134,6 +136,10 @@ def __init__(
# An integer representing the maximum number of redirects to follow for a request.
# This number must be <= _retry_stop_after_attempts_count.
# (defaults to None)
# _statement_heartbeat_interval
# The number of seconds to wait between sending GetOperationStatus requests to the server.
# (defaults to 25), as the server will already wait 5 seconds before responding to the request if the
# statement is still running, making a 30 second total interval between each heartbeat.
# max_download_threads
# Number of threads for handling cloud fetch downloads. Defaults to 10

Expand Down Expand Up @@ -231,6 +237,10 @@ def __init__(
# setTimeout defaults to 15 minutes and is expected in ms
self._transport.setTimeout(timeout and (float(timeout) * 1000.0))

self._statement_heartbeat_interval = kwargs.get(
"_statement_heartbeat_interval", DEFAULT_STATEMENT_HEARTBEAT_INTERVAL
)

self._transport.setCustomHeaders(dict(http_headers))
protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(self._transport)
self._client = TCLIService.Client(protocol)
Expand Down Expand Up @@ -802,6 +812,7 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp):
ttypes.TOperationState.RUNNING_STATE,
ttypes.TOperationState.PENDING_STATE,
]:
time.sleep(self._statement_heartbeat_interval)
poll_resp = self._poll_for_status(op_handle)
operation_state = poll_resp.operationState
self._check_command_not_in_error_or_closed_state(op_handle, poll_resp)
Expand Down

0 comments on commit c4e6cef

Please sign in to comment.