From 00124d19846be8c8434f5b38609c6c98d9912a11 Mon Sep 17 00:00:00 2001 From: Devin Robison Date: Tue, 8 Nov 2022 09:57:35 -0700 Subject: [PATCH] Add documentation to LinearBoundary stages (#418) Resolves #413 Authors: - Devin Robison (https://github.com/drobison00) - Michael Demoret (https://github.com/mdemoret-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/418 --- morpheus/pipeline/linear_pipeline.py | 28 +++++++++++++++++++ .../stages/boundary/linear_boundary_stage.py | 28 +++++++++++++++++-- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/morpheus/pipeline/linear_pipeline.py b/morpheus/pipeline/linear_pipeline.py index dd84a5d62a..f7ea5c9ad1 100644 --- a/morpheus/pipeline/linear_pipeline.py +++ b/morpheus/pipeline/linear_pipeline.py @@ -103,6 +103,34 @@ def add_stage(self, stage: _pipeline.SinglePortStage): self._linear_stages.append(stage) def add_segment_boundary(self, data_type=None, as_shared_pointer=False): + """ + Parameters + ---------- + data_type : `data_type` + data type that will be passed across the segment boundary, defaults to a general python object + if 'data_type' has no registered edge adapters. + + as_shared_pointer : `boolean` + Whether the data type will be wrapped in a shared pointer. + + Example + ------- + # Create a config and pipeline + config = Config() + pipe = LinearPipeline(config) + + # Add a source in Segment #1 + pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) + + # Add a segment boundary + # [Current Segment] - [Egress Boundary] ---- [Ingress Boundary] - [Next Segment] + pipe.add_segment_boundary(MessageMeta) + + # Add a sink in Segment #2 + pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) + + pipe.run() + """ if (len(self._linear_stages) == 0): raise RuntimeError("Cannot create a segment boundary, current segment is empty.") diff --git a/morpheus/stages/boundary/linear_boundary_stage.py b/morpheus/stages/boundary/linear_boundary_stage.py index 584993e338..58304f6e7a 100644 --- a/morpheus/stages/boundary/linear_boundary_stage.py +++ b/morpheus/stages/boundary/linear_boundary_stage.py @@ -27,11 +27,23 @@ class LinearBoundaryEgressStage(SinglePortStage): """ + TheLinearBoundaryEgressStage acts as an egress point from one linear segment to another. Given an existing linear + pipeline that we want to connect to another segment, a linear boundary egress stage would be added, in conjunction + with a matching LinearBoundaryIngressStage on the target linear segment. + Parameters ---------- c : `morpheus.config.Config` Pipeline configuration instance. - + boundary_port_id : `str` + String indicating the name of the egress port associated with the LinearBoundaryEgressStage; allowing it to be + paired with the corresponding ingress port. + data_type : `typing.Type` + Data type that this Stage will accept and then output to its egress port. + + Example + ------- + boundary_egress = LinearBoundaryEgressStage(config, "my_boundary_port", int) """ def __init__(self, c: Config, boundary_port_id: str, data_type): @@ -68,11 +80,23 @@ def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> Strea class LinearBoundaryIngressStage(SingleOutputSource): """ + TheLinearBoundaryIngressStage acts as source ingress point from a corresponding egress in another linear segment. + Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would + be added to it and a matching LinearBoundaryIngressStage would be created to receive the e. + Parameters ---------- c : `morpheus.config.Config` Pipeline configuration instance. - + boundary_port_id : `str` + String corresponding to the name of the LinearBoundaryEgressStage's egress port, use to identify and connect + with the appropriate egress. + data_type : `object` + Data type that this Stage will accept, which will correspond to some existing egress output. + + Example + ------- + boundary_ingress = LinearBoundaryIngressStage(config, "my_boundary_port", int) """ def __init__(self, c: Config, boundary_port_id: str, data_type=None):