Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFileOp PythonFileOp NotebookFileOp log stdout and stderr output to stdout always, S3 file put for run log output file made configurable #3227

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 63 additions & 13 deletions elyra/airflow/bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@

logger = logging.getLogger("elyra")
enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
# When enabled (the default), the output of NotebookFileOp, RFileOp and PythonFileOp file operations is not only
# logged to stdout (so it appears in runtime / container logs and in the Airflow and KFP GUI logs) but also put to S3 storage
enable_generic_node_script_output_to_s3 = (
os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
)
pipeline_name = None # global used in formatted logging
operation_name = None # global used in formatted logging

Expand Down Expand Up @@ -226,21 +231,31 @@ def execute(self) -> None:
# Really hate to do this but have to invoke Papermill via library as workaround
import papermill

papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name)
logger.info("Processing file: %s", notebook)
papermill.execute_notebook(
notebook,
notebook_output,
kernel_name=kernel_name,
log_output=True,
stdout_file=sys.stdout,
stderr_file=sys.stderr,
)
duration = time.time() - t0
OpUtil.log_operation_info("notebook execution completed", duration)

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
raise ex

@staticmethod
Expand Down Expand Up @@ -328,20 +343,36 @@ def execute(self) -> None:
f"executing python script using " f"'python3 {python_script}' to '{python_script_output}'"
)
t0 = time.time()
with open(python_script_output, "w") as log_file:
subprocess.run(["python3", python_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)

run_args = ["python3", python_script]

logger.info("Processing file: %s", python_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(python_script_output, "w") as log_file:
log_file.write(output)
logger.info("Output: %s", output)
logger.info("Return code: %s", result.returncode)
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


Expand All @@ -357,20 +388,36 @@ def execute(self) -> None:
try:
OpUtil.log_operation_info(f"executing R script using " f"'Rscript {r_script}' to '{r_script_output}'")
t0 = time.time()
with open(r_script_output, "w") as log_file:
subprocess.run(["Rscript", r_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)

run_args = ["Rscript", r_script]

logger.info("Processing file: %s", r_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(r_script_output, "w") as log_file:
log_file.write(output)
logger.info("Output: %s", output)
logger.info("Return code: %s", result.returncode)
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


Expand Down Expand Up @@ -537,6 +584,9 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
# This call must be commented out in air-gapped images if the packages from
# https://github.com/elyra-ai/elyra/blob/main/etc/generic/requirements-elyra.txt
# are already installed via the central pip env during the container build
OpUtil.package_install()

# Create the appropriate instance, process dependencies and execute the operation
Expand Down
71 changes: 58 additions & 13 deletions elyra/kfp/bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@

logger = logging.getLogger("elyra")
enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
# When enabled (the default), the output of NotebookFileOp, RFileOp and PythonFileOp file operations is not only
# logged to stdout (so it appears in runtime / container logs and in the Airflow and KFP GUI logs) but also put to S3 storage
enable_generic_node_script_output_to_s3 = (
os.getenv("ELYRA_GENERIC_NODES_ENABLE_SCRIPT_OUTPUT_TO_S3", "true").lower() == "true"
)
pipeline_name = None # global used in formatted logging
operation_name = None # global used in formatted logging

Expand Down Expand Up @@ -376,21 +381,32 @@ def execute(self) -> None:

import papermill

papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs)
logger.info("Processing file: %s", notebook)
papermill.execute_notebook(
notebook,
notebook_output,
kernel_name=kernel_name,
log_output=True,
stdout_file=sys.stdout,
stderr_file=sys.stderr,
**kwargs,
)
duration = time.time() - t0
OpUtil.log_operation_info("notebook execution completed", duration)

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")

NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(notebook_output, notebook)
self.put_file_to_object_storage(notebook_html)
raise ex

@staticmethod
Expand Down Expand Up @@ -483,20 +499,33 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(python_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
logger.info("Processing file: %s", python_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(python_script_output, "w") as log_file:
log_file.write(output)
logger.info("Output: %s", output)
logger.info("Return code: %s", result.returncode)
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


Expand All @@ -517,20 +546,33 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(r_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
logger.info("Processing file: %s", r_script)
try:
result = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
output = result.stdout.decode("utf-8")
if enable_generic_node_script_output_to_s3:
with open(r_script_output, "w") as log_file:
log_file.write(output)
logger.info("Output: %s", output)
logger.info("Return code: %s", result.returncode)
except subprocess.CalledProcessError as e:
logger.error("Output: %s", e.output.decode("utf-8"))
logger.error("Return code: %s", e.returncode)
raise subprocess.CalledProcessError(e.returncode, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
if enable_generic_node_script_output_to_s3:
self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


Expand Down Expand Up @@ -726,6 +768,9 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
# This call must be commented out in air-gapped images if the packages from
# https://github.com/elyra-ai/elyra/blob/main/etc/generic/requirements-elyra.txt
# are already installed via the central pip env during the container build
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))

# Create the appropriate instance, process dependencies and execute the operation
Expand Down
Loading