Skip to content

Commit

Permalink
Add documentation to LinearBoundary stages (#418)
Browse files Browse the repository at this point in the history
Resolves #413

Authors:
  - Devin Robison (https://github.com/drobison00)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #418
  • Loading branch information
drobison00 authored Nov 8, 2022
1 parent 52ffe41 commit 00124d1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 2 deletions.
28 changes: 28 additions & 0 deletions morpheus/pipeline/linear_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,34 @@ def add_stage(self, stage: _pipeline.SinglePortStage):
self._linear_stages.append(stage)

def add_segment_boundary(self, data_type=None, as_shared_pointer=False):
"""
Parameters
----------
data_type : `data_type`
data type that will be passed across the segment boundary, defaults to a general python object
if 'data_type' has no registered edge adapters.
as_shared_pointer : `boolean`
Whether the data type will be wrapped in a shared pointer.
Example
-------
# Create a config and pipeline
config = Config()
pipe = LinearPipeline(config)
# Add a source in Segment #1
pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False))
# Add a segment boundary
# [Current Segment] - [Egress Boundary] ---- [Ingress Boundary] - [Next Segment]
pipe.add_segment_boundary(MessageMeta)
# Add a sink in Segment #2
pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
pipe.run()
"""
if (len(self._linear_stages) == 0):
raise RuntimeError("Cannot create a segment boundary, current segment is empty.")

Expand Down
28 changes: 26 additions & 2 deletions morpheus/stages/boundary/linear_boundary_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,23 @@

class LinearBoundaryEgressStage(SinglePortStage):
"""
TheLinearBoundaryEgressStage acts as an egress point from one linear segment to another. Given an existing linear
pipeline that we want to connect to another segment, a linear boundary egress stage would be added, in conjunction
with a matching LinearBoundaryIngressStage on the target linear segment.
Parameters
----------
c : `morpheus.config.Config`
Pipeline configuration instance.
boundary_port_id : `str`
String indicating the name of the egress port associated with the LinearBoundaryEgressStage; allowing it to be
paired with the corresponding ingress port.
data_type : `typing.Type`
Data type that this Stage will accept and then output to its egress port.
Example
-------
boundary_egress = LinearBoundaryEgressStage(config, "my_boundary_port", int)
"""

def __init__(self, c: Config, boundary_port_id: str, data_type):
Expand Down Expand Up @@ -68,11 +80,23 @@ def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> Strea

class LinearBoundaryIngressStage(SingleOutputSource):
"""
TheLinearBoundaryIngressStage acts as source ingress point from a corresponding egress in another linear segment.
Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would
be added to it and a matching LinearBoundaryIngressStage would be created to receive the e.
Parameters
----------
c : `morpheus.config.Config`
Pipeline configuration instance.
boundary_port_id : `str`
String corresponding to the name of the LinearBoundaryEgressStage's egress port, use to identify and connect
with the appropriate egress.
data_type : `object`
Data type that this Stage will accept, which will correspond to some existing egress output.
Example
-------
boundary_ingress = LinearBoundaryIngressStage(config, "my_boundary_port", int)
"""

def __init__(self, c: Config, boundary_port_id: str, data_type=None):
Expand Down

0 comments on commit 00124d1

Please sign in to comment.