diff --git a/airflow/providers/exasol/hooks/exasol.py b/airflow/providers/exasol/hooks/exasol.py index c2f8e13d99810a..2233ce1e2c3470 100644 --- a/airflow/providers/exasol/hooks/exasol.py +++ b/airflow/providers/exasol/hooks/exasol.py @@ -134,7 +134,7 @@ def export_to_file( def run( self, sql: Union[str, list], autocommit: bool = False, parameters: Optional[dict] = None, handler=None - ) -> None: + ) -> Optional[list]: """ Runs a command or a list of commands. Pass a list of sql statements to the sql parameter to get them to execute @@ -150,6 +150,11 @@ def run( if isinstance(sql, str): sql = [sql] + if sql: + self.log.debug("Executing %d statements against Exasol DB", len(sql)) + else: + raise ValueError("List of SQL statements is empty") + with closing(self.get_conn()) as conn: if self.supports_autocommit: self.set_autocommit(conn, autocommit) @@ -157,12 +162,23 @@ def run( for query in sql: self.log.info(query) with closing(conn.execute(query, parameters)) as cur: + results = [] + + if handler is not None: + cur = handler(cur) + + for row in cur: + self.log.info("Statement execution info - %s", row) + results.append(row) + self.log.info(cur.row_count) # If autocommit was set to False for db that supports autocommit, - # or if db does not supports autocommit, we do a manual commit. + # or if db does not support autocommit, we do a manual commit. if not self.get_autocommit(conn): conn.commit() + return results + def set_autocommit(self, conn, autocommit: bool) -> None: """ Sets the autocommit flag on the connection diff --git a/tests/providers/exasol/hooks/test_exasol.py b/tests/providers/exasol/hooks/test_exasol.py index 9c8bd6cefd643c..bf85465629c430 100644 --- a/tests/providers/exasol/hooks/test_exasol.py +++ b/tests/providers/exasol/hooks/test_exasol.py @@ -132,6 +132,11 @@ def test_run_multi_queries(self): self.conn.execute.assert_called_with(sql[1], None) self.conn.commit.assert_not_called() + def test_run_no_queries(self): + with pytest.raises(ValueError) as err: + self.db_hook.run(sql=[]) + assert err.value.args[0] == "List of SQL statements is empty" + def test_bulk_load(self): with pytest.raises(NotImplementedError): self.db_hook.bulk_load('table', '/tmp/file')