diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 6f51c94a4..d5376bab7 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -693,7 +693,16 @@ async def __call__(self, es, params): pass while not complete: await asyncio.sleep(params.get("poll-period")) - tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) + + try: + tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) + except elasticsearch.ApiError as e: + self.logger.exception("Received API error when fetching force-merge task list: [%s: %s]", e.message, e.status_code) + raise e + except elasticsearch.exceptions.TransportError as e: + self.logger.exception("Received transport error when fetching force-merge task list: [%s]", e) + raise e + if len(tasks["nodes"]) == 0: # empty nodes response indicates no tasks complete = True diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 61abe0dc2..7d43f707d 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -27,6 +27,7 @@ import elastic_transport import elasticsearch import pytest +from elastic_transport import ApiResponseMeta from esrally import client, config, exceptions from esrally.client.asynchronous import RallyAsyncElasticsearch @@ -1534,6 +1535,35 @@ async def test_force_merge_with_polling_and_params(self, es): ) es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000) + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_api_error(self, es, caplog): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.tasks.list = mock.AsyncMock( + side_effect=elasticsearch.ApiError( + "ApiError", ApiResponseMeta(status=400, http_version="1.1", headers={}, duration=0.0, node=None), None + ) + ) + force_merge = runner.ForceMerge() + + with pytest.raises(elasticsearch.ApiError) as exc: + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + assert exc.value.args[0] == "ApiError" + assert exc.value.status_code == 400 + assert f"Received API error when fetching force-merge task list: [{exc.value.args[0]}: {exc.value.status_code}]" in caplog.text + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_transport_error(self, es, caplog): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.tasks.list = mock.AsyncMock(side_effect=elasticsearch.exceptions.TransportError("TransportError")) + force_merge = runner.ForceMerge() + + with pytest.raises(elasticsearch.exceptions.TransportError) as exc: + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + assert exc.value.message == "TransportError" + assert f"Received transport error when fetching force-merge task list: [{exc.value.message}]" in caplog.text + class TestIndicesStatsRunner: @mock.patch("elasticsearch.Elasticsearch")