diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index e7b6dfd1..68927170 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -55,6 +55,7 @@ TIMESTAMP_AS_STRING_CONFIG = "spark.thriftserver.arrowBasedRowSet.timestampAsString" DEFAULT_SOCKET_TIMEOUT = float(900) +DEFAULT_STATEMENT_HEARTBEAT_INTERVAL = float(25) # see Connection.__init__ for parameter descriptions. # - Min/Max avoids unsustainable configs (sane values are far more constrained) @@ -77,6 +78,7 @@ class ThriftBackend: _retry_stop_after_attempts_count: int _retry_stop_after_attempts_duration: float _retry_delay_default: float + _statement_heartbeat_interval: float def __init__( self, @@ -134,6 +136,10 @@ def __init__( # An integer representing the maximum number of redirects to follow for a request. # This number must be <= _retry_stop_after_attempts_count. # (defaults to None) + # _statement_heartbeat_interval + # The number of seconds to wait between sending GetOperationStatus requests to the server. + # (defaults to 25). The server already waits 5 seconds before responding to the request if the + # statement is still running, giving a total interval of 30 seconds between heartbeats. # max_download_threads # Number of threads for handling cloud fetch downloads. 
Defaults to 10 @@ -231,6 +237,10 @@ def __init__( # setTimeout defaults to 15 minutes and is expected in ms self._transport.setTimeout(timeout and (float(timeout) * 1000.0)) + self._statement_heartbeat_interval = kwargs.get( + "_statement_heartbeat_interval", DEFAULT_STATEMENT_HEARTBEAT_INTERVAL + ) + self._transport.setCustomHeaders(dict(http_headers)) protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(self._transport) self._client = TCLIService.Client(protocol) @@ -802,6 +812,7 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp): ttypes.TOperationState.RUNNING_STATE, ttypes.TOperationState.PENDING_STATE, ]: + time.sleep(self._statement_heartbeat_interval) poll_resp = self._poll_for_status(op_handle) operation_state = poll_resp.operationState self._check_command_not_in_error_or_closed_state(op_handle, poll_resp)