Skip to content

The .NET package to achieve a clean pipeline architecture.

License

Notifications You must be signed in to change notification settings

fzf003/ObservablePipelines

 
 

Repository files navigation

ObservablePipelines

A simple .NET package to achieve a clean pipeline architecture with Observables and injectable pipes.

When to use

This package is useful in scenarios, where you want to explicitly state what happens step by step. A Pipe would represent the perfect Single Responsibility Object, as it will only handle one explicit case, e.g. filtering, or transforming.

Additionally you can construct Pipelines with Dependency Injection, and thus save a lot of construction code and unnecessary dependencies.

Introduction

This package contains a few simple interfaces and logic, that allows you to configure and build Pipelines extremely clean, fast and dynamic with Dependency Injection and IObservable's. The only reference needed is "Microsoft.Extensions.DependencyInjection", as the PipelineBuilder uses this internally to dynamically construct the Pipelines.

  graph LR;
    IObservable --> lPipe
    lPipe --> fPipe
    fPipe --> tPipe
    tPipe --> IObserver

    IObservable --> fPipe2
    fPipe2 --> tPipe2
    tPipe2 --> IObserver2

    subgraph Pipeline2
        subgraph FilterPipe2
            fPipe2[IPipe< T1, T2 >]
        end
        subgraph TransformPipe2
            tPipe2[IPipe< T2, T3 >]
        end
    end
    subgraph Target2
      IObserver2[IObserver< T3 >]
    end

    subgraph Source
      IObservable[IObservable< T1 >]
    end
    subgraph Pipeline1
        subgraph LoggerPipe
            lPipe[IPipe< T1, T2 >]
        end
        subgraph FilterPipe
            fPipe[IPipe< T2, T3 >]
        end
        subgraph TransformPipe
            tPipe[IPipe< T3, T4 >]
        end
    end
    subgraph Target1
      IObserver[IObserver< T4 >]
    end

Loading

In the diagram above, you can see the principle of a pipeline. You have a single source, which emits values. These values will be sent through one or more pipelines, where they may be filtered, used for side-logic, or transformed.

Observables are very well suited for this job, as they are already extremly well usable with the package System.Reactive.Linq. When using Observables with this package, you are able to handle streams of events with a LINQ like query. They also enable you to filter the event streams. This would not be possible with using T instead of IObservable<T>, as you would need to return null inside a pipe or anything like that.

If you have not yet worked with Observables, I highly recommend checking out this tutorial:

Introduction to Rx

Usage

1. Install the package:

Install-Package ObservablePipelines
or
dotnet add package ObservablePipelines

2. (Optional, but recommended) Install System.Reactive.Linq:

Install-Package System.Reactive.Linq
or
dotnet add package System.Reactive.Linq

3. Add Dependencies to DI-Container

using Microsoft.Extensions.DependencyInjection;

services.AddObservablePipelines()

4. Implement IPipe<TIn, TOut>

internal class LoggerPipe : IPipe<ChatMessage, ChatMessage>
{
    private readonly ILogger<LoggerPipe> logger;

    public LoggerPipe(ILogger<LoggerPipe> logger) {
        this.logger = logger
            ?? throw new ArgumentNullException(nameof(logger));
    }

    public IObservable<ChatMessage> Handle(IObservable<ChatMessage> source) {
        return source
            .Do(m =>
                logger.LogInformation($"Pipeline triggered for message: '{m.Message}'.")
            );
    }
}

Inside the Pipe, I recommend you to use System.Reactive.Linq to handle the event streams. In the constructor you can add all dependencies, that you set up the DI-Container with.

6. Inject and use IPipelineBuilder

var pipeline = pipelineBuilder
    .ConfigureOptions(builder => builder
        .Add(new MessageFilterPipeOptions(Guid.Empty))
    )
    .ConfigurePipeline(builder => builder
        .AddSource(chatMessages)
        .AddStep<LoggerPipe, ChatMessage>()
        .AddStep<MessageFilterPipe, ChatMessage>()
        .AddStep<MessageTransformPipe, IdentifiedChatMessage>()
        .AddStep(new ConsoleLoggerPipe())
    )
    .Build();

pipeline.Subscribe(m =>
    logger.LogInformation($"New Message from {m.SenderName}: {m.Message}.")
);

Here you can add Options-Instances, that were not added to the global DI-Container. The PipelineBuilder clones the global DI-Container and adds all Options-Instances to it. With them you are able to explitly configure your Pipes for certain usecases and you can use the same Pipe with different configurations.

First, you need to add the source of the Pipeline with AddSource(). After that, you can add your IPipes as generics, where you also have to specify the type of the output. This way it will be built, using Dependency Injection.

Alternatively, you can add an instance and do not need to specify generic type arguments.

Configuration

As the PipelineBuilder uses Dependency-Injection, you can add Options-Instances to the builders ServiceCollection, by calling Configure(). The type, that you add to the ConfigurationBuilder should be unique, so that it can be injected into the correct pipe.

internal record MessageFilterPipeOptions(Guid ReceiverId);

internal class MessageFilterPipe : IPipe<ChatMessage, ChatMessage>
{
    private readonly MessageFilterPipeOptions options;

    public MessageFilterPipe(MessageFilterPipeOptions options) {
        this.options = options;
    }
}

pipelineBuilder
    .Configure(builder => builder
        .Add(new MessageFilterPipeOptions(Guid.Empty))
    );

Types to use

  • IPipe<TIn,TOut>: Pipe Interface with ingoing and outgoing IObservable Stream
  • IPipelineBuilder: Builder, to set up a Pipeline with Configurations, a Source and multiple Pipes.

About

The .NET package to achieve a clean pipeline architecture.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • C# 100.0%