Skip to content

Cloud Events NATS extended protocol: opentracing, subject pool for group

License

Notifications You must be signed in to change notification settings

d7561985/protonats

Repository files navigation

Cloud Events NATS protocol

This package extent opentracing package for trace carrier

Feature:

  • OpenTracing features with "github.com/d7561985/tel" send trace to NATS, read tracing span from NATS

  • Producer uses context.TopicFrom feature for overwrite default subject

  • Consumer subject pool for group

  • Protocol Consumer and Sender struct members are Interfaces and easily could be replaced

  • Trace carried within cloudevents payload that’s why this allows TeleObservability to be ubiquitous for any protocol

Trace feature enable

We use TeleObservability correctly read span from NATS and pack it correctly. But this is only like middleware.

All engine pack/unpack under adapter

package main

import (
	"github.com/d7561985/tel"
	"github.com/d7561985/tel/monitoring/metrics"
	cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
	"github.com/d7561985/protonats"
)

func main()  {
	t := tel.New(tel.GetConfigFromEnv())

	p, err := protonats.NewProtocol(env.NATSServer, "-", "", cenats.NatsOptions())

	metricsss := metrics.NewCollectorMetricsReader()
     ce, err := cloudevents.NewClient(p,
         client.WithObservabilityService(protonats.NewTeleObservability(&t, metricsss)),
     )

}

Consumer Subject Group pool

Use option for protocol - WithConsumerOptions

    p, err := protonats.NewProtocol(env.NATSServer, "-", "",
		cenats.NatsOptions(),
		protonats.WithConsumerOptions(
			protonats.WithQueuePoolSubscriber("MyQueue",
				"MySubject1", "MySubject2",
			),
		),
	)