Skip to content

Commit

Permalink
add loop for comsume , add sqs.go for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
truongkienan2020 authored Apr 16, 2021
1 parent b7e5cd5 commit 069ef7d
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 35 deletions.
74 changes: 39 additions & 35 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,46 +29,50 @@ func NewConsumer(client *sqs.SQS, queueURL string, ackOnConsume bool, visibility
}

func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
result, er1 := c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: c.QueueURL,
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
})
if er1 != nil {
handle(ctx, nil, er1)
} else {
if len(result.Messages) > 0 {
m := result.Messages[0]
data := []byte(*m.Body)
attributes := PtrToMap(m.Attributes)
message := mq.Message{
Id: *m.MessageId,
Data: data,
Attributes: attributes,
Raw: m,
}
if c.AckOnConsume {
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: c.QueueURL,
ReceiptHandle: result.Messages[0].ReceiptHandle,
})
if er2 != nil {
handle(ctx, nil, er2)
var result *sqs.ReceiveMessageOutput
var er1 error
loop:
result, er1 = c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: c.QueueURL,
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(c.VisibilityTimeout), // 20 seconds
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
})
if er1 != nil {
handle(ctx, nil, er1)
} else {
if len(result.Messages) > 0 {
m := result.Messages[0]
data := []byte(*m.Body)
attributes := PtrToMap(m.Attributes)
message := mq.Message{
Id: *m.MessageId,
Data: data,
Attributes: attributes,
Raw: m,
}
if c.AckOnConsume {
_, er2 := c.Client.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: c.QueueURL,
ReceiptHandle: result.Messages[0].ReceiptHandle,
})
if er2 != nil {
handle(ctx, nil, er2)
} else {
handle(ctx, &message, nil)
}
} else {
handle(ctx, &message, nil)
}
} else {
handle(ctx, &message, nil)
}
}
}
goto loop
}

func PtrToMap(m map[string]*string) map[string]string {
Expand Down
38 changes: 38 additions & 0 deletions sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package sqs

import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/sqs"
)

type (
Config struct {
Region string `mapstructure:"region" json:"region,omitempty" gorm:"column:region" bson:"region,omitempty" dynamodbav:"region,omitempty" firestore:"region,omitempty"`
AccessKeyID string `mapstructure:"access_key_id" json:"accessKeyID,omitempty" gorm:"column:accessKeyID" bson:"accessKeyID,omitempty" dynamodbav:"accessKeyID,omitempty" firestore:"accessKeyID,omitempty"`
SecretAccessKey string `mapstructure:"secret_access_key" json:"secretAccessKey,omitempty" gorm:"column:secretaccesskey" bson:"secretAccessKey,omitempty" dynamodbav:"secretAccessKey,omitempty" firestore:"secretAccessKey,omitempty"`
QueueName string `mapstructure:"queue_name" json:"queueName,omitempty" gorm:"column:token" bson:"queueName,omitempty" dynamodbav:"queueName,omitempty" firestore:"queueName,omitempty"`
}
)

func NewSession(config Config) (*session.Session, error) {
c := &aws.Config{
Region: aws.String(config.Region),
Credentials: credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, ""),
}
return session.NewSession(c)
}

func Connect(config Config) (*sqs.SQS, error) {
sess, err := NewSession(config)
if err != nil {
return nil, err
}
mySQS := sqs.New(sess)
return mySQS, nil
}

func ConnectWithSession(session *session.Session) *sqs.SQS {
return sqs.New(session)
}

0 comments on commit 069ef7d

Please sign in to comment.