
.NET client for Memphis. Memphis is an event processing platform


halilkocaoz/memphis.net

 
 



Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.


CNCF Silver Member


Sandbox - Docs - Twitter - YouTube
Created and maintained by the Memphis community.
Maintainer: @bazen-teklehaymanot


Memphis is a next-generation alternative to traditional message brokers.

A simple, robust, and durable cloud-native message broker wrapped with
an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases.

Memphis enables the building of modern queue-based applications that require
large volumes of streamed and enriched data, modern protocols, zero ops, rapid development,
extreme cost reduction, and a significantly lower amount of dev time for data-oriented developers and data engineers.

Installation

 dotnet add package Memphis.Client -v ${MEMPHIS_CLIENT_VERSION}

Update

Update-Package Memphis.Client

Importing

using Memphis.Client;

Connecting to Memphis

First, create a ClientOptions instance (or start from the defaults), then connect to Memphis with MemphisClientFactory.CreateClient(ClientOptions opts).

try
{
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<broker-address>";
    options.Username = "<application-type-username>";
    options.ConnectionToken = "<token>"; // issued when the application-type user is created
    var memphisClient = MemphisClientFactory.CreateClient(options);
    ...
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}

Once the client is created, all the functionality offered by Memphis is available.
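In practice the host, username, and token usually come from configuration rather than literals. The following is a minimal sketch of reading them from environment variables. The variable name MEMPHIS_HOST and the Require helper are hypothetical and not part of Memphis.Client. The resulting values would be assigned to options.Host, options.Username, and options.ConnectionToken as in the snippet above.

```csharp
using System;

// Reads a required setting, or fails fast with a clear message if it is missing.
static string Require(string name) =>
    Environment.GetEnvironmentVariable(name)
    ?? throw new InvalidOperationException($"Missing environment variable: {name}");

// Demo only: set the variable in-process so the sketch runs on its own.
// MEMPHIS_HOST is a hypothetical name; use whatever your deployment defines.
Environment.SetEnvironmentVariable("MEMPHIS_HOST", "localhost");

string host = Require("MEMPHIS_HOST");
Console.WriteLine(host); // prints "localhost"

// With the real client, the value would be applied like this:
// options.Host = host;
```

Failing fast on a missing variable keeps a misconfigured deployment from surfacing later as an opaque connection error.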

Disconnecting from Memphis

To disconnect from Memphis, call Dispose() on the MemphisClient.

await memphisClient.Dispose();

Creating a Station

try
{
    // First: creating Memphis client
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<memphis-host>";
    options.Username = "<application type username>";
    options.ConnectionToken = "<broker-token>";
    var client = MemphisClientFactory.CreateClient(options);
    
    // Second: creating the Memphis station
    var station = await client.CreateStation(
        stationOptions: new StationOptions()
        {
            Name = "<station-name>",
            RetentionType = RetentionTypes.MAX_MESSAGE_AGE_SECONDS,
            RetentionValue = 604_800,
            StorageType = StorageTypes.DISK,
            Replicas = 1,
            IdempotencyWindowMs = 0,
            SendPoisonMessageToDls = true,
            SendSchemaFailedMessageToDls = true,
        });
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}

Memphis currently supports the following types of retention:

RetentionTypes.MAX_MESSAGE_AGE_SECONDS

The above means that every message persists for the value set in the retention value field (in seconds).

RetentionTypes.MESSAGES

The above means that after the maximum number of saved messages (set in retention value) has been reached, the oldest messages will be deleted.

RetentionTypes.BYTES

The above means that after the maximum number of saved bytes (set in retention value) has been reached, the oldest messages will be deleted.

Retention Values

The retention values are directly related to the retention types mentioned above,
where the values vary according to the type of retention chosen.

All retention values are of type int, but the unit depends on the retention type:

RetentionTypes.MAX_MESSAGE_AGE_SECONDS is expressed in seconds, RetentionTypes.MESSAGES as a number of messages,
and RetentionTypes.BYTES as a number of bytes.

Once the configured limit is reached, the oldest messages are deleted.
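Because the retention value is a bare int, it helps to make the unit explicit where the number is computed. The sketch below uses only the .NET base library, and the variable names are illustrative. It derives the 604_800 used in the station example from a duration, and a byte limit from a size in GiB.

```csharp
using System;

// Seven days expressed in seconds, for RetentionTypes.MAX_MESSAGE_AGE_SECONDS.
// checked() turns an out-of-range conversion into an exception instead of a silent wrap.
int sevenDaysInSeconds = checked((int)TimeSpan.FromDays(7).TotalSeconds);

// One GiB expressed in bytes, for RetentionTypes.BYTES.
const int oneGiBInBytes = 1 * 1024 * 1024 * 1024;

Console.WriteLine(sevenDaysInSeconds); // prints 604800
Console.WriteLine(oneGiBInBytes);      // prints 1073741824
```

If the value really is a 32-bit int as stated above, a byte-based limit cannot exceed int.MaxValue (2_147_483_647), which is just under 2 GiB.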

Storage Types

Memphis currently supports the following types of message storage:

StorageTypes.DISK

The above means that messages persist on disk.

StorageTypes.MEMORY

The above means that messages are kept in main memory.

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).

await station.DestroyAsync();

Attaching a Schema to an Existing Station

await client.AttachSchema(stationName: "<station-name>", schemaName: "<schema-name>");

Detaching a Schema from Station

await client.DetachSchema(stationName: station.Name);

Produce and Consume messages

The most common client operations are produce to send messages and consume to receive messages.

Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station unless they belong to a consumer group, in which case messages are spread across the members of that group.

Memphis messages are payload agnostic. Payloads are byte[].
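Since the broker only sees bytes, the application chooses the encoding. As an illustrative sketch, assuming JSON is the chosen format (nothing in Memphis requires it), the following round-trips a small event through byte[] using System.Text.Json. The event shape is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical event; any serializable shape works because only bytes are transported.
var order = new Dictionary<string, string>
{
    ["orderId"] = "A-1001",
    ["status"] = "created",
};

// Producer side: object -> UTF-8 JSON bytes. This array is what would be handed to the producer.
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(order);

// Consumer side: bytes -> object. The array would come from the received message.
var decoded = JsonSerializer.Deserialize<Dictionary<string, string>>(payload);

Console.WriteLine(decoded?["status"]); // prints "created"
```

Producer and consumer have to agree on the format out of band, or through a schema attached to the station.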

To stop receiving messages, call consumer.Dispose(). Destroying the consumer terminates it regardless of whether messages are still in flight for the client.

Creating a Producer

try
{
    // First: creating Memphis client
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<memphis-host>";
    options.Username = "<application type username>";
    options.ConnectionToken = "<broker-token>";
    var client = MemphisClientFactory.CreateClient(options);

    // Second: creating the Memphis producer 
    var producer = await client.CreateProducer(
        stationName: "<memphis-station-name>",
        producerName: "<memphis-producer-name>",
        generateRandomSuffix: true);
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}

Producing a message

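// Requires: using System.Collections.Specialized; using System.Text;
var commonHeaders = new NameValueCollection();
commonHeaders.Add("key-1", "value-1");

var text = "<message-text>"; // placeholder for the payload to send
await producer.ProduceAsync(Encoding.UTF8.GetBytes(text), commonHeaders);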

Destroying a Producer

await producer.DestroyAsync();

Creating a Consumer

try
{
    // First: creating Memphis client
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<memphis-host>";
    options.Username = "<application type username>";
    options.ConnectionToken = "<broker-token>";
    var client = MemphisClientFactory.CreateClient(options);
    
    // Second: creating the Memphis consumer
    var consumer = await client.CreateConsumer(new ConsumerOptions
    {
        StationName = "<station-name>",
        ConsumerName = "<consumer-name>",
        ConsumerGroup = "<consumer-group-name>",
    });
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}

Creating message handler for consuming a message

First, create a callback function that receives an args object holding a list of MemphisMessage. Then pass this callback to consumer.ConsumeAsync. The consumer tries to fetch messages every PullIntervalMs (set when the consumer was created) and calls the defined message handler.

EventHandler<MemphisMessageHandlerEventArgs> msgHandler = (sender, args) =>
{
    if (args.Exception != null)
    {
        Console.Error.WriteLine(args.Exception);
        return;
    }

    foreach (var msg in args.MessageList)
    {
        // print the message itself
        Console.WriteLine("Received data: " + Encoding.UTF8.GetString(msg.GetData()));

        // print message headers
        foreach (var headerKey in msg.GetHeaders().Keys)
        {
            Console.WriteLine(
                $"Header Key: {headerKey}, value: {msg.GetHeaders()[headerKey.ToString()]}");
        }

        Console.WriteLine("---------");
        msg.Ack();
    }
};

Consuming a message

await consumer.ConsumeAsync(msgCallbackHandler: msgHandler, dlqCallbackHandler: msgHandler);

Acknowledging a Message

Acknowledging a message tells the Memphis server not to re-send that message to the same consumer or consumer group.

msg.Ack();

Get headers

Retrieve the headers of a message:

msg.GetHeaders();
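When a handler needs repeated header lookups, copying the headers into a dictionary can be convenient. This is a sketch that assumes GetHeaders() returns a NameValueCollection, as the producer-side headers and the indexing in the handler suggest. ToDictionary is a hypothetical helper, and a locally built collection stands in for a received message's headers.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Specialized;

// Copies headers into a dictionary. A key that holds several values yields them
// comma-joined, which is how NameValueCollection's string indexer reports them.
static Dictionary<string, string> ToDictionary(NameValueCollection source)
{
    var result = new Dictionary<string, string>();
    foreach (var key in source.AllKeys)
    {
        if (key is null) continue; // NameValueCollection permits a null key
        result[key] = source[key] ?? string.Empty;
    }
    return result;
}

// Stand-in for the collection a received message would return.
var headers = new NameValueCollection();
headers.Add("key-1", "value-1");
headers.Add("trace-id", "abc123");

var lookup = ToDictionary(headers);
Console.WriteLine(lookup["trace-id"]); // prints "abc123"
```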

Destroying a Consumer

await consumer.DestroyAsync();

Check if broker is connected

memphisClient.IsConnected();
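A connectivity check like this is often wrapped in a short polling loop, for example at startup. The sketch below is a generic helper and not part of Memphis.Client. WaitUntilAsync and its parameters are hypothetical, and a counter stands in for the real check, which would be () => memphisClient.IsConnected().

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Polls `isConnected` until it returns true or `timeout` elapses.
// Returns the result of one last check once the time is up.
static async Task<bool> WaitUntilAsync(Func<bool> isConnected, TimeSpan timeout, TimeSpan pollInterval)
{
    var clock = Stopwatch.StartNew();
    while (clock.Elapsed < timeout)
    {
        if (isConnected()) return true;
        await Task.Delay(pollInterval);
    }
    return isConnected();
}

// Stand-in check that succeeds on the third call.
int calls = 0;
bool ready = await WaitUntilAsync(
    () => ++calls >= 3,
    timeout: TimeSpan.FromSeconds(2),
    pollInterval: TimeSpan.FromMilliseconds(10));

Console.WriteLine(ready);
```

Bounding the wait with a timeout keeps a service from hanging indefinitely when the broker is unreachable.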
