RFileOp and PythonFileOp log stdout and stderr output to stdout so it appears in system logs, catch a non-zero return/exit code in an exception, and skip logging output to a file in S3 COS if the env var ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3 is set to false

Signed-off-by: shalberd <21118431+shalberd@users.noreply.github.com>
shalberd committed Aug 16, 2024
1 parent 8108696 commit cd28239
Showing 2 changed files with 121 additions and 26 deletions.
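For context, a minimal sketch of the behavior this commit introduces, using a hypothetical script name and output file name (the real code paths live in the two bootstrappers below): the script's combined stdout/stderr always reaches the container log, while writing and uploading the output file is gated by the new environment variable.

import os
import subprocess

# Same parsing as in the bootstrappers: any value other than "false" keeps the S3 upload enabled.
enable_script_output_to_s3 = (
    os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
)

# Hypothetical script; stderr is merged into stdout so a single stream carries everything.
result = subprocess.run(
    ["python3", "my_script.py"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True
)
output = result.stdout.decode("utf-8")
print(output)  # always visible in the system/container logs and the Airflow/KFP UI

if enable_script_output_to_s3:
    # only in this case is the output also written to a local file destined for object storage
    with open("my_script.py.log", "w") as log_file:
        log_file.write(output)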
76 changes: 63 additions & 13 deletions elyra/airflow/bootstrapper.py
@@ -40,6 +40,11 @@

logger = logging.getLogger("elyra")
enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
# Log the output of NotebookFileOp, RFileOp, and PythonFileOp to stdout so it appears in runtime/container
# logs and in the Airflow and KFP GUI logs; optionally also upload the output to S3 storage
enable_generic_node_script_output_to_s3 = (
os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
)
pipeline_name = None # global used in formatted logging
operation_name = None # global used in formatted logging

@@ -226,21 +231,31 @@ def execute(self) -> None:
# Really hate to do this but have to invoke Papermill via library as workaround
import papermill

papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name)
print("Processing file:", notebook)
papermill.execute_notebook(
notebook,
notebook_output,
kernel_name=kernel_name,
log_output=True,
stdout_file=sys.stdout,
stderr_file=sys.stderr,
)
duration = time.time() - t0
OpUtil.log_operation_info("notebook execution completed", duration)

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
raise ex

@staticmethod
@@ -328,20 +343,36 @@ def execute(self) -> None:
f"executing python script using " f"'python3 {python_script}' to '{python_script_output}'"
)
t0 = time.time()
with open(python_script_output, "w") as log_file:
subprocess.run(["python3", python_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)

run_args = ["python3", python_script]

print("Processing file:", python_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(python_script_output, "w") as log_file:
log_file.write(output)
logger.info(f"Output: {output}")
logger.info(f"Return code: {result.returncode}")
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


@@ -357,20 +388,36 @@ def execute(self) -> None:
try:
OpUtil.log_operation_info(f"executing R script using " f"'Rscript {r_script}' to '{r_script_output}'")
t0 = time.time()
with open(r_script_output, "w") as log_file:
subprocess.run(["Rscript", r_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)

run_args = ["Rscript", r_script]

print("Processing file:", r_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(r_script_output, "w") as log_file:
log_file.write(output)
logger.info(f"Output: {output}")
logger.info(f"Return code: {result.returncode}")
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


@@ -537,6 +584,9 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
# must be commented out in air-gapped images if the packages from
# https://github.com/elyra-ai/elyra/blob/main/etc/generic/requirements-elyra.txt
# were already installed into a central pip environment during the container build
OpUtil.package_install()

# Create the appropriate instance, process dependencies and execute the operation
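The notebook change above relies on papermill's own streaming options rather than a subprocess: log_output emits each cell's output through papermill's logger, and stdout_file/stderr_file mirror notebook output into the given streams. A minimal standalone sketch, with placeholder notebook names:

import sys

import papermill

papermill.execute_notebook(
    "input.ipynb",           # placeholder source notebook
    "output.ipynb",          # placeholder executed copy
    kernel_name="python3",
    log_output=True,         # log each cell's output as it is produced
    stdout_file=sys.stdout,  # mirror notebook stdout into the container's stdout
    stderr_file=sys.stderr,  # mirror notebook stderr into the container's stderr
)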
71 changes: 58 additions & 13 deletions elyra/kfp/bootstrapper.py
@@ -46,6 +46,11 @@

logger = logging.getLogger("elyra")
enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
# Log the output of NotebookFileOp, RFileOp, and PythonFileOp to stdout so it appears in runtime/container
# logs and in the Airflow and KFP GUI logs; optionally also upload the output to S3 storage
enable_generic_node_script_output_to_s3 = (
os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
)
pipeline_name = None # global used in formatted logging
operation_name = None # global used in formatted logging

@@ -376,21 +381,32 @@ def execute(self) -> None:

import papermill

papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs)
print("Processing file:", notebook)
papermill.execute_notebook(
notebook,
notebook_output,
kernel_name=kernel_name,
log_output=True,
stdout_file=sys.stdout,
stderr_file=sys.stderr,
**kwargs,
)
duration = time.time() - t0
OpUtil.log_operation_info("notebook execution completed", duration)

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
raise ex

@staticmethod
@@ -483,20 +499,33 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(python_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
print("Processing file:", python_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(python_script_output, "w") as log_file:
log_file.write(output)
logger.info(f"Output: {output}")
logger.info(f"Return code: {result.returncode}")
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


@@ -517,20 +546,33 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(r_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
print("Processing file:", r_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(r_script_output, "w") as log_file:
log_file.write(output)
logger.info(f"Output: {output}")
logger.info(f"Return code: {result.returncode}")
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


@@ -726,6 +768,9 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
# must be commented out in air-gapped images if the packages from
# https://github.com/elyra-ai/elyra/blob/main/etc/generic/requirements-elyra.txt
# were already installed into a central pip environment during the container build
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))

# Create the appropriate instance, process dependencies and execute the operation
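Both bootstrappers now share the same error-handling pattern: stdout and stderr are captured together, logged through the "elyra" logger so they show up in system logs, and a non-zero exit code is re-raised so the surrounding Airflow/KFP task still fails. A self-contained sketch of that pattern, with an illustrative command:

import logging
import subprocess
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger("elyra")


def run_script(run_args):
    """Run a script, mirror its combined stdout/stderr into the log, and re-raise on failure."""
    try:
        result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
        logger.info("Output: %s", result.stdout.decode("utf-8"))
        logger.info("Return code: %s", result.returncode)
    except subprocess.CalledProcessError as e:
        logger.error("Output: %s", e.output.decode("utf-8"))
        logger.error("Return code: %s", e.returncode)
        raise  # propagate, so the operator/task is still marked as failed


run_script(["python3", "-c", "print('hello from a generic node')"])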
