From cd282396bed8012c31ec72e84e67c2e13ffb4273 Mon Sep 17 00:00:00 2001
From: shalberd <21118431+shalberd@users.noreply.github.com>
Date: Fri, 16 Aug 2024 17:25:19 +0200
Subject: [PATCH] RFileOp and PythonFileOp log stdout and stderr output to
 stdout so it appears in system logs, catch non-zero return/exit codes in an
 exception, and skip logging output to a file in S3 COS if the env var
 ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3 is set to false

Signed-off-by: shalberd <21118431+shalberd@users.noreply.github.com>
---
 elyra/airflow/bootstrapper.py | 76 +++++++++++++++++++++++++++++------
 elyra/kfp/bootstrapper.py     | 71 ++++++++++++++++++++++++++------
 2 files changed, 121 insertions(+), 26 deletions(-)

diff --git a/elyra/airflow/bootstrapper.py b/elyra/airflow/bootstrapper.py
index a0f0b2209..c2a5346c2 100644
--- a/elyra/airflow/bootstrapper.py
+++ b/elyra/airflow/bootstrapper.py
@@ -40,6 +40,11 @@
 logger = logging.getLogger("elyra")
 enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
+# not only log File Operations output of NotebookFileOp, RFileOp, PythonFileOp to stdout so it appears
+# in runtime / container logs and also Airflow and KFP GUI logs, but also put output to S3 storage
+enable_generic_node_script_output_to_s3 = (
+    os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
+)
 pipeline_name = None  # global used in formatted logging
 operation_name = None  # global used in formatted logging
@@ -226,21 +231,31 @@ def execute(self) -> None:
             # Really hate to do this but have to invoke Papermill via library as workaround
             import papermill
 
-            papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name)
+            print("Processing file:", notebook)
+            papermill.execute_notebook(
+                notebook,
+                notebook_output,
+                kernel_name=kernel_name,
+                log_output=True,
+                stdout_file=sys.stdout,
+                stderr_file=sys.stderr,
+            )
             duration = time.time() - t0
             OpUtil.log_operation_info("notebook execution completed", duration)
 
             NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
-            self.put_file_to_object_storage(notebook_output, notebook)
-            self.put_file_to_object_storage(notebook_html)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(notebook_output, notebook)
+                self.put_file_to_object_storage(notebook_html)
             self.process_outputs()
         except Exception as ex:
             # log in case of errors
             logger.error(f"Unexpected error: {sys.exc_info()[0]}")
 
             NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
-            self.put_file_to_object_storage(notebook_output, notebook)
-            self.put_file_to_object_storage(notebook_html)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(notebook_output, notebook)
+                self.put_file_to_object_storage(notebook_html)
             raise ex
 
     @staticmethod
@@ -328,20 +343,36 @@ def execute(self) -> None:
                 f"executing python script using " f"'python3 {python_script}' to '{python_script_output}'"
             )
             t0 = time.time()
-            with open(python_script_output, "w") as log_file:
-                subprocess.run(["python3", python_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)
+
+            run_args = ["python3", python_script]
+
+            print("Processing file:", python_script)
+            try:
+                result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
+                output = result.stdout.decode("utf-8")
+                if enable_generic_node_script_output_to_s3:
+                    with open(python_script_output, "w") as log_file:
+                        log_file.write(output)
+                logger.info(f"Output: {output}")
+                logger.info(f"Return code: {result.returncode}")
+            except subprocess.CalledProcessError as e:
+                logger.error("Output: %s", e.output.decode("utf-8"))
+                logger.error("Return code: %s", e.returncode)
+                raise subprocess.CalledProcessError(e.returncode, run_args)
 
             duration = time.time() - t0
             OpUtil.log_operation_info("python script execution completed", duration)
 
-            self.put_file_to_object_storage(python_script_output, python_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(python_script_output, python_script_output)
             self.process_outputs()
         except Exception as ex:
             # log in case of errors
             logger.error(f"Unexpected error: {sys.exc_info()[0]}")
             logger.error(f"Error details: {ex}")
 
-            self.put_file_to_object_storage(python_script_output, python_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(python_script_output, python_script_output)
             raise ex
@@ -357,20 +388,36 @@ def execute(self) -> None:
         try:
             OpUtil.log_operation_info(f"executing R script using " f"'Rscript {r_script}' to '{r_script_output}'")
             t0 = time.time()
-            with open(r_script_output, "w") as log_file:
-                subprocess.run(["Rscript", r_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)
+
+            run_args = ["Rscript", r_script]
+
+            print("Processing file:", r_script)
+            try:
+                result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
+                output = result.stdout.decode("utf-8")
+                if enable_generic_node_script_output_to_s3:
+                    with open(r_script_output, "w") as log_file:
+                        log_file.write(output)
+                logger.info(f"Output: {output}")
+                logger.info(f"Return code: {result.returncode}")
+            except subprocess.CalledProcessError as e:
+                logger.error("Output: %s", e.output.decode("utf-8"))
+                logger.error("Return code: %s", e.returncode)
+                raise subprocess.CalledProcessError(e.returncode, run_args)
 
             duration = time.time() - t0
             OpUtil.log_operation_info("R script execution completed", duration)
 
-            self.put_file_to_object_storage(r_script_output, r_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(r_script_output, r_script_output)
             self.process_outputs()
         except Exception as ex:
             # log in case of errors
             logger.error(f"Unexpected error: {sys.exc_info()[0]}")
             logger.error(f"Error details: {ex}")
 
-            self.put_file_to_object_storage(r_script_output, r_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(r_script_output, r_script_output)
             raise ex
@@ -537,6 +584,9 @@ def main():
     input_params = OpUtil.parse_arguments(sys.argv[1:])
     OpUtil.log_operation_info("starting operation")
     t0 = time.time()
+    # must be commented out in airgapped images if packages from
+    # https://github.com/elyra-ai/elyra/blob/main/etc/generic/requirements-elyra.txt
+    # already installed via central pip env during container build
     OpUtil.package_install()
 
     # Create the appropriate instance, process dependencies and execute the operation
diff --git a/elyra/kfp/bootstrapper.py b/elyra/kfp/bootstrapper.py
index 8915d38a9..a64b3bb55 100644
--- a/elyra/kfp/bootstrapper.py
+++ b/elyra/kfp/bootstrapper.py
@@ -46,6 +46,11 @@
 logger = logging.getLogger("elyra")
 enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
+# not only log File Operations output of NotebookFileOp, RFileOp, PythonFileOp to stdout so it appears
+# in runtime / container logs and also Airflow and KFP GUI logs, but also put output to S3 storage
+enable_generic_node_script_output_to_s3 = (
+    os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
+)
 pipeline_name = None  # global used in formatted logging
 operation_name = None  # global used in formatted logging
@@ -376,21 +381,32 @@ def execute(self) -> None:
 
             import papermill
 
-            papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs)
+            print("Processing file:", notebook)
+            papermill.execute_notebook(
+                notebook,
+                notebook_output,
+                kernel_name=kernel_name,
+                log_output=True,
+                stdout_file=sys.stdout,
+                stderr_file=sys.stderr,
+                **kwargs,
+            )
             duration = time.time() - t0
             OpUtil.log_operation_info("notebook execution completed", duration)
 
             NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
-            self.put_file_to_object_storage(notebook_output, notebook)
-            self.put_file_to_object_storage(notebook_html)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(notebook_output, notebook)
+                self.put_file_to_object_storage(notebook_html)
             self.process_outputs()
         except Exception as ex:
             # log in case of errors
             logger.error(f"Unexpected error: {sys.exc_info()[0]}")
 
             NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
-            self.put_file_to_object_storage(notebook_output, notebook)
-            self.put_file_to_object_storage(notebook_html)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(notebook_output, notebook)
+                self.put_file_to_object_storage(notebook_html)
             raise ex
 
     @staticmethod
@@ -483,20 +499,33 @@ def execute(self) -> None:
             if self.parameter_pass_method == "env":
                 self.set_parameters_in_env()
 
-            with open(python_script_output, "w") as log_file:
-                subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
+            print("Processing file:", python_script)
+            try:
+                result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
+                output = result.stdout.decode("utf-8")
+                if enable_generic_node_script_output_to_s3:
+                    with open(python_script_output, "w") as log_file:
+                        log_file.write(output)
+                logger.info(f"Output: {output}")
+                logger.info(f"Return code: {result.returncode}")
+            except subprocess.CalledProcessError as e:
+                logger.error("Output: %s", e.output.decode("utf-8"))
+                logger.error("Return code: %s", e.returncode)
+                raise subprocess.CalledProcessError(e.returncode, run_args)
 
             duration = time.time() - t0
             OpUtil.log_operation_info("python script execution completed", duration)
 
-            self.put_file_to_object_storage(python_script_output, python_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(python_script_output, python_script_output)
             self.process_outputs()
         except Exception as ex:
             # log in case of errors
             logger.error(f"Unexpected error: {sys.exc_info()[0]}")
             logger.error(f"Error details: {ex}")
 
-            self.put_file_to_object_storage(python_script_output, python_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(python_script_output, python_script_output)
             raise ex
@@ -517,20 +546,33 @@ def execute(self) -> None:
             if self.parameter_pass_method == "env":
                 self.set_parameters_in_env()
 
-            with open(r_script_output, "w") as log_file:
-                subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
+            print("Processing file:", r_script)
+            try:
+                result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
+                output = result.stdout.decode("utf-8")
+                if enable_generic_node_script_output_to_s3:
+                    with open(r_script_output, "w") as log_file:
+                        log_file.write(output)
+                logger.info(f"Output: {output}")
+                logger.info(f"Return code: {result.returncode}")
+            except subprocess.CalledProcessError as e:
+                logger.error("Output: %s", e.output.decode("utf-8"))
+                logger.error("Return code: %s", e.returncode)
+                raise subprocess.CalledProcessError(e.returncode, run_args)
 
             duration = time.time() - t0
             OpUtil.log_operation_info("R script execution completed", duration)
 
-            self.put_file_to_object_storage(r_script_output, r_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(r_script_output, r_script_output)
             self.process_outputs()
         except Exception as ex:
             # log in case of errors
             logger.error(f"Unexpected error: {sys.exc_info()[0]}")
             logger.error(f"Error details: {ex}")
 
-            self.put_file_to_object_storage(r_script_output, r_script_output)
+            if enable_generic_node_script_output_to_s3:
+                self.put_file_to_object_storage(r_script_output, r_script_output)
             raise ex
@@ -726,6 +768,9 @@ def main():
     input_params = OpUtil.parse_arguments(sys.argv[1:])
     OpUtil.log_operation_info("starting operation")
     t0 = time.time()
+    # must be commented out in airgapped images if packages from
+    # https://github.com/elyra-ai/elyra/blob/main/etc/generic/requirements-elyra.txt
+    # already installed via central pip env during container build
     OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
 
     # Create the appropriate instance, process dependencies and execute the operation
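
For reference, the execution pattern the two bootstrappers now share can be sketched outside of Elyra. The snippet below is a minimal, standalone approximation using only the Python standard library: it reads ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3, captures the script's combined stdout/stderr with subprocess, logs it, writes the local log file only when the flag is enabled, and re-raises on a non-zero exit code. The upload_to_object_storage helper is a hypothetical placeholder standing in for the bootstrapper's put_file_to_object_storage call and is not part of the patch.

    import logging
    import os
    import subprocess

    logger = logging.getLogger("elyra")

    # default "true"; any other value skips the local log file and the object-storage upload
    enable_script_output_to_s3 = (
        os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
    )


    def upload_to_object_storage(path: str) -> None:
        """Hypothetical stand-in; Elyra's bootstrapper uses its own S3/COS client here."""
        logger.info("would upload %s to object storage", path)


    def run_script(run_args: list, output_path: str) -> None:
        """Run a script, surface its combined stdout/stderr in the logs, and
        optionally persist it locally for a later object-storage upload."""
        print("Processing:", " ".join(run_args))
        try:
            # capture stdout and stderr together; check=True raises on a non-zero exit code
            result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
            output = result.stdout.decode("utf-8")
            if enable_script_output_to_s3:
                with open(output_path, "w") as log_file:
                    log_file.write(output)
            logger.info(f"Output: {output}")
            logger.info(f"Return code: {result.returncode}")
        except subprocess.CalledProcessError as e:
            logger.error("Output: %s", e.output.decode("utf-8"))
            logger.error("Return code: %s", e.returncode)
            raise
        if enable_script_output_to_s3:
            upload_to_object_storage(output_path)


    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        run_script(["python3", "-c", "print('hello from a generic node')"], "example.log")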