Skip to content

Commit

Permalink
feat(adapter.sarama): add useful types
Browse files Browse the repository at this point in the history
  • Loading branch information
alebabai committed Feb 21, 2024
1 parent 35f0a26 commit c9fc7c3
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 0 deletions.
102 changes: 102 additions & 0 deletions adapter/sarama/consumer_group_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package sarama

import (
"fmt"

"github.com/IBM/sarama"
"github.com/alebabai/go-kafka"
"github.com/alebabai/go-kafka/adapter"
)

// ConsumerGroupHandler is an implementation of [sarama.ConsumerGroupHandler] using [kafka.Handler] for message processing.
type ConsumerGroupHandler struct {
handler kafka.Handler

converter adapter.ToKafkaMessageConverterFunc[sarama.ConsumerMessage]

setupHook ConsumerGroupHandlerHookFunc
cleanupHook ConsumerGroupHandlerHookFunc
}

// ConsumerGroupHandlerHookFunc is a function for custom hooks across [sarama.ConsumerGroupSession] lifecycle events.
type ConsumerGroupHandlerHookFunc func(sarama.ConsumerGroupSession) error

// ConsumerGroupHandlerEmptyHook is a no-op hook for [sarama.ConsumerGroupSession] lifecycle events.
func ConsumerGroupHandlerEmptyHook(_ sarama.ConsumerGroupSession) error {
return nil
}

// NewConsumerGroupHandler returns a pointer to the new instance of [ConsumerGroupHandler] or an error.
func NewConsumerGroupHandler(h kafka.Handler, opts ...ConsumerGroupHandlerOption) (*ConsumerGroupHandler, error) {
cgh := &ConsumerGroupHandler{
handler: h,
converter: ConvertConsumerMessageToKafkaMessage,
setupHook: ConsumerGroupHandlerEmptyHook,
cleanupHook: ConsumerGroupHandlerEmptyHook,
}

for _, opt := range opts {
opt(cgh)
}

if err := cgh.Validate(); err != nil {
return nil, err
}

return cgh, nil
}

// ConsumerGroupHandlerOption is a function type for setting optional parameters for the [ConsumerGroupHandler].
type ConsumerGroupHandlerOption func(*ConsumerGroupHandler)

// ConsumerGroupHandlerWithConverter is an option to set a customer message converter function.
func ConsumerGroupHandlerWithConverter(convFunc adapter.ToKafkaMessageConverterFunc[sarama.ConsumerMessage]) ConsumerGroupHandlerOption {
return func(cgh *ConsumerGroupHandler) {
cgh.converter = convFunc
}
}

// ConsumerGroupHandlerWithSetupHook is an option to set a custom hook for the setup phase in [sarama.ConsumerGroupSession] lifecycle events.
func ConsumerGroupHandlerWithSetupHook(hookFunc ConsumerGroupHandlerHookFunc) ConsumerGroupHandlerOption {
return func(cgh *ConsumerGroupHandler) {
cgh.setupHook = hookFunc
}
}

// ConsumerGroupHandlerWithCleanupHook is an option to set a custom hook for the cleanup phase in [sarama.ConsumerGroupSession] lifecycle events.
func ConsumerGroupHandlerWithCleanupHook(hookFunc ConsumerGroupHandlerHookFunc) ConsumerGroupHandlerOption {
return func(cgh *ConsumerGroupHandler) {
cgh.cleanupHook = hookFunc
}
}

// Setup implements [sarama.ConsumerGroupHandler].
func (cgh *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
return cgh.setupHook(session)
}

// Cleanup implements [sarama.ConsumerGroupHandler].
func (cgh *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
return cgh.cleanupHook(session)
}

// ConsumeClaim implements [sarama.ConsumerGroupHandler].
func (cgh *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()
for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-claim.Messages():
if !ok {
return nil
}

if err := cgh.handler.Handle(ctx, cgh.converter(*msg)); err != nil {
return fmt.Errorf("failed to handle message: %w", err)
}

session.MarkMessage(msg, "")
}
}
}
27 changes: 27 additions & 0 deletions adapter/sarama/consumer_group_handler_validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package sarama

import (
"errors"
)

// Validate validates [ConsumerGroupHandler] and returns an error if validation is failed.
func (cgh ConsumerGroupHandler) Validate() error {
errs := make([]error, 0)
if cgh.handler == nil {
errs = append(errs, errors.New("handler is required"))
}

if cgh.converter == nil {
errs = append(errs, errors.New("converter is required"))
}

if cgh.setupHook == nil {
errs = append(errs, errors.New("setupHook is required"))
}

if cgh.cleanupHook == nil {
errs = append(errs, errors.New("cleanupHook is required"))
}

return errors.Join(errs...)
}
67 changes: 67 additions & 0 deletions adapter/sarama/consumer_group_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package sarama

import (
"context"
"fmt"

"github.com/IBM/sarama"
)

// ConsumerGroupListener is responsible for message consumption of a [sarama.ConsumerGroup] via an infinite loop.
type ConsumerGroupListener struct {
consumerGroup sarama.ConsumerGroup
consumerGroupHandler sarama.ConsumerGroupHandler
}

// NewConsumerGroupHandler returns a pointer to the new instance of [ConsumerGroupListener] or an error.
func NewConsumerGroupListener(
cg sarama.ConsumerGroup,
cgh sarama.ConsumerGroupHandler,
opts ...ConsumerGroupListenerOption,
) (*ConsumerGroupListener, error) {
cgl := &ConsumerGroupListener{
consumerGroup: cg,
consumerGroupHandler: cgh,
}

for _, opt := range opts {
opt(cgl)
}

if err := cgl.Validate(); err != nil {
return nil, err
}

return cgl, nil
}

// ConsumerGroupListenerOption is a function type for setting optional parameters for the [ConsumerGroupListener].
type ConsumerGroupListenerOption func(*ConsumerGroupListener)

// Listen starts the message consumption process on the specified topics, using the provided [sarama.ConsumerGroupHandler] for processing messages.
func (cgl *ConsumerGroupListener) Listen(ctx context.Context, topics ...string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := cgl.consumerGroup.Consume(ctx, topics, cgl.consumerGroupHandler); err != nil {
return fmt.Errorf("failed to consume messages: %w", err)
}
}
}
}

// Errors returns a channel for receiving errors from the [sarama.ConsumerGroup].
func (cgl *ConsumerGroupListener) Errors() <-chan error {
return cgl.consumerGroup.Errors()
}

// Close shuts down the [sarama.ConsumerGroup] and releases any associated resources.
func (cgl *ConsumerGroupListener) Close() error {
if err := cgl.consumerGroup.Close(); err != nil {
return fmt.Errorf("failed to close consumer group: %w", err)
}

return nil
}
19 changes: 19 additions & 0 deletions adapter/sarama/consumer_group_listener_validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package sarama

import (
"errors"
)

// Validate validates [ConsumerGroupListener] and returns an error if validation is failed.
func (cgh ConsumerGroupListener) Validate() error {
errs := make([]error, 0)
if cgh.consumerGroup == nil {
errs = append(errs, errors.New("consumerGroup is required"))
}

if cgh.consumerGroupHandler == nil {
errs = append(errs, errors.New("consumerGroupHandler is required"))
}

return errors.Join(errs...)
}
18 changes: 18 additions & 0 deletions adapter/sarama/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package sarama

import (
"github.com/IBM/sarama"
"github.com/alebabai/go-kafka"
)

// SyncProducer is an adapter interface.
type SyncProducer interface {
sarama.SyncProducer
kafka.Handler
}

// AsyncProducer is an adapter interface.
type AsyncProducer interface {
sarama.AsyncProducer
kafka.Handler
}

0 comments on commit c9fc7c3

Please sign in to comment.