From fddebb9f815b30229f77b2465746caf862e9eb70 Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Mon, 20 Nov 2023 10:58:11 +1030 Subject: [PATCH 1/2] Capture 'ForceMerge' task list exceptions --- esrally/driver/runner.py | 11 ++++++++++- tests/driver/runner_test.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 6f51c94a4..d5376bab7 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -693,7 +693,16 @@ async def __call__(self, es, params): pass while not complete: await asyncio.sleep(params.get("poll-period")) - tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) + + try: + tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) + except elasticsearch.ApiError as e: + self.logger.exception("Received API error when fetching force-merge task list: [%s: %s]", e.message, e.status_code) + raise e + except elasticsearch.exceptions.TransportError as e: + self.logger.exception("Received transport error when fetching force-merge task list: [%s]", e) + raise e + if len(tasks["nodes"]) == 0: # empty nodes response indicates no tasks complete = True diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 61abe0dc2..3ab4dda73 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -29,6 +29,7 @@ import pytest from esrally import client, config, exceptions +from elastic_transport import ApiResponseMeta from esrally.client.asynchronous import RallyAsyncElasticsearch from esrally.driver import runner @@ -1534,6 +1535,35 @@ async def test_force_merge_with_polling_and_params(self, es): ) es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000) + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_api_error(self, es, caplog): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.tasks.list = mock.AsyncMock( + side_effect=elasticsearch.ApiError( + "ApiError", ApiResponseMeta(status=400, http_version="1.1", headers={}, duration=0.0, node=None), None + ) + ) + force_merge = runner.ForceMerge() + + with pytest.raises(elasticsearch.ApiError) as exc: + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + assert exc.value.args[0] == "ApiError" + assert exc.value.status_code == 400 + assert (f"Received API error when fetching force-merge task list: [{exc.value.args[0]}: {exc.value.status_code}]" in caplog.text) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_transport_error(self, es, caplog): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.tasks.list = mock.AsyncMock(side_effect=elasticsearch.exceptions.TransportError("TransportError")) + force_merge = runner.ForceMerge() + + with pytest.raises(elasticsearch.exceptions.TransportError) as exc: + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + assert exc.value.message == "TransportError" + assert (f"Received transport error when fetching force-merge task list: [{exc.value.message}]" in caplog.text) + class TestIndicesStatsRunner: @mock.patch("elasticsearch.Elasticsearch") From 6fceca7c58bab24722c14fd918477dbd57179f9f Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Mon, 20 Nov 2023 12:01:25 +1030 Subject: [PATCH 2/2] Linting --- tests/driver/runner_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 3ab4dda73..7d43f707d 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -27,9 +27,9 @@ import elastic_transport import elasticsearch import pytest +from elastic_transport import ApiResponseMeta from esrally import client, config, exceptions -from elastic_transport import ApiResponseMeta from esrally.client.asynchronous import RallyAsyncElasticsearch from esrally.driver import runner @@ -1550,7 +1550,7 @@ async def test_force_merge_with_polling_api_error(self, es, caplog): await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) assert exc.value.args[0] == "ApiError" assert exc.value.status_code == 400 - assert (f"Received API error when fetching force-merge task list: [{exc.value.args[0]}: {exc.value.status_code}]" in caplog.text) + assert f"Received API error when fetching force-merge task list: [{exc.value.args[0]}: {exc.value.status_code}]" in caplog.text @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio @@ -1562,7 +1562,7 @@ async def test_force_merge_with_polling_transport_error(self, es, caplog): with pytest.raises(elasticsearch.exceptions.TransportError) as exc: await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) assert exc.value.message == "TransportError" - assert (f"Received transport error when fetching force-merge task list: [{exc.value.message}]" in caplog.text) + assert f"Received transport error when fetching force-merge task list: [{exc.value.message}]" in caplog.text class TestIndicesStatsRunner: