diff --git a/CHANGELOG.md b/CHANGELOG.md index 2526bd1a..8e2f5e75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,9 @@ - Caching metadata calls - Enable cloud fetch by default. To disable, set `use_cloud_fetch=False` when building `databricks.sql.client`. - Add integration tests for Databricks UC Volumes ingestion queries -- Add `_retry_max_redirects` config +- Retries: + - Add `_retry_max_redirects` config + - Set `_enable_v3_retries=True` and warn if users override it ## 2.9.3 (2023-08-24) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index 0b3ad175..3295fd21 100644 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -30,6 +30,7 @@ class CommandType(Enum): EXECUTE_STATEMENT = "ExecuteStatement" CLOSE_SESSION = "CloseSession" CLOSE_OPERATION = "CloseOperation" + GET_OPERATION_STATUS = "GetOperationStatus" OTHER = "Other" @classmethod @@ -314,9 +315,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]: 2. The request received a 501 (Not Implemented) status code Because this request can never succeed. 3. The request received a 404 (Not Found) code and the request CommandType - was CloseSession or CloseOperation. This code indicates that the session - or cursor was already closed. Further retries will always return the same - code. + was GetOperationStatus, CloseSession or CloseOperation. This code indicates + that the command, session or cursor was already closed. Further retries will + always return the same code. 4. The request CommandType was ExecuteStatement and the HTTP code does not appear in the default status_forcelist or force_dangerous_codes list. By default, this means ExecuteStatement is only retried for codes 429 and 503. @@ -343,6 +344,13 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]: if not self._is_method_retryable(method): # type: ignore return False, "Only POST requests are retried" + # Request failed with 404 and was a GetOperationStatus. This is not recoverable. Don't retry. + if status_code == 404 and self.command_type == CommandType.GET_OPERATION_STATUS: + return ( + False, + "GetOperationStatus received 404 code from Databricks. Operation was canceled.", + ) + # Request failed with 404 because CloseSession returns 404 if you repeat the request. if ( status_code == 404 diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index df75e8c7..288c3e10 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -129,7 +129,7 @@ def __init__( # (defaults to 900) # _enable_v3_retries # Whether to use the DatabricksRetryPolicy implemented in urllib3 - # (defaults to False) + # (defaults to True) # _retry_max_redirects # An integer representing the maximum number of redirects to follow for a request. # This number must be <= _retry_stop_after_attempts_count. @@ -185,7 +185,13 @@ def __init__( self._auth_provider = auth_provider # Connector version 3 retry approach - self.enable_v3_retries = kwargs.get("_enable_v3_retries", False) + self.enable_v3_retries = kwargs.get("_enable_v3_retries", True) + + if not self.enable_v3_retries: + logger.warning( + "Legacy retry behavior is enabled for this connection." + " This behaviour is deprecated and will be removed in a future release." + ) self.force_dangerous_codes = kwargs.get("_retry_dangerous_codes", []) additional_transport_args = {} @@ -396,9 +402,6 @@ def attempt_request(attempt): response = method(request) - # Calling `close()` here releases the active HTTP connection back to the pool - self._transport.close() - # We need to call type(response) here because thrift doesn't implement __name__ attributes for thrift responses logger.debug( "Received response: {}()".format(type(response).__name__) @@ -460,6 +463,10 @@ def attempt_request(attempt): error_message = ThriftBackend._extract_error_message_from_headers( getattr(self._transport, "headers", {}) ) + finally: + # Calling `close()` here releases the active HTTP connection back to the pool + self._transport.close() + return RequestErrorInfo( error=error, error_message=error_message, diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py index eb5f5d26..5305c124 100644 --- a/tests/e2e/common/retry_test_mixins.py +++ b/tests/e2e/common/retry_test_mixins.py @@ -119,7 +119,6 @@ class PySQLRetryTestsMixin: # For testing purposes _retry_policy = { - "_enable_v3_retries": True, "_retry_delay_min": 0.1, "_retry_delay_max": 5, "_retry_stop_after_attempts_count": 5, @@ -424,3 +423,19 @@ def test_retry_max_redirects_exceeds_max_attempts_count_warns_user(self): expected_message_was_found = target in log assert expected_message_was_found, "Did not find expected log messages" + + def test_retry_legacy_behavior_warns_user(self): + with self.assertLogs( + "databricks.sql", + level="WARN", + ) as cm: + with self.connection( + extra_params={**self._retry_policy, "_enable_v3_retries": False} + ): + expected_message_was_found = False + for log in cm.output: + if expected_message_was_found: + break + target = "Legacy retry behavior is enabled for this connection." + expected_message_was_found = target in log + assert expected_message_was_found, "Did not find expected log messages" diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 27c23e59..d823a12d 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -31,6 +31,8 @@ from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin +from databricks.sql.exc import SessionAlreadyClosedError + log = logging.getLogger(__name__) unsafe_logger = logging.getLogger("databricks.sql.unsafe") @@ -699,10 +701,9 @@ def test_close_connection_closes_cursors(self): conn.close() # When connection closes, any cursor operations should no longer exist at the server - with self.assertRaises(thrift.Thrift.TApplicationException) as cm: + with self.assertRaises(SessionAlreadyClosedError) as cm: op_status_at_server = ars.thrift_backend._client.GetOperationStatus(status_request) - if hasattr(cm, "exception"): - assert "RESOURCE_DOES_NOT_EXIST" in cm.exception.message + def test_closing_a_closed_connection_doesnt_fail(self):