Skip to content

Commit

Permalink
refactor: TopicMessageQuery
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Akhterov <daniel@launchbadge.com>
  • Loading branch information
janaakhterov committed Jun 10, 2021
1 parent c85349e commit b48f094
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 50 deletions.
8 changes: 3 additions & 5 deletions subscription_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ type SubscriptionHandle struct {
onUnsubscribe func()
}

func newSubscriptionHandle(onUnsubscribe func()) SubscriptionHandle {
return SubscriptionHandle{onUnsubscribe: onUnsubscribe}
}

func (handle SubscriptionHandle) Unsubscribe() {
handle.onUnsubscribe()
if handle.onUnsubscribe != nil {
handle.onUnsubscribe()
}
}
130 changes: 85 additions & 45 deletions topic_message_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@ import (
)

type TopicMessageQuery struct {
pb *mirror.ConsensusTopicQuery
errorHandler func(stat status.Status)
pb *mirror.ConsensusTopicQuery
errorHandler func(stat status.Status)
completionHandler func()
retryHandler func(err error) bool
counter uint64
attempt uint64
maxAttempts uint64
limit *uint64
}

func NewTopicMessageQuery() *TopicMessageQuery {
pb := mirror.ConsensusTopicQuery{}
return &TopicMessageQuery{
pb: &pb,
errorHandler: nil,
pb: &mirror.ConsensusTopicQuery{},
maxAttempts: maxAttempts,
errorHandler: nil,
retryHandler: defaultRetryHandler,
completionHandler: nil,
}
}

Expand Down Expand Up @@ -64,79 +72,87 @@ func (query *TopicMessageQuery) GetEndTime() time.Time {
}

func (query *TopicMessageQuery) SetLimit(limit uint64) *TopicMessageQuery {
query.pb.Limit = limit
query.limit = &limit
return query
}

func (query *TopicMessageQuery) GetLimit() uint64 {
return query.pb.Limit
if query.limit != nil {
return *query.limit
} else {
return 0
}
}

func (query *TopicMessageQuery) SetErrorHandler(errorHandler func(stat status.Status)) *TopicMessageQuery {
query.errorHandler = errorHandler
return query
}

func (query *TopicMessageQuery) SetCompletionHandler(completionHandler func()) *TopicMessageQuery {
query.completionHandler = completionHandler
return query
}

func (query *TopicMessageQuery) SetRetryHandler(retryHandler func(err error) bool) *TopicMessageQuery {
query.retryHandler = retryHandler
return query
}

func (query *TopicMessageQuery) Subscribe(client *Client, onNext func(TopicMessage)) (SubscriptionHandle, error) {
ctx, cancel := context.WithCancel(context.TODO())
handle := newSubscriptionHandle(cancel)
handle := SubscriptionHandle{}

messages := sync.Map{}
messagesMutex := sync.Mutex{}

channel, err := client.mirrorNetwork.getNextMirrorNode().getChannel()
if err != nil {
return handle, err
}

go func() {
var subClient mirror.ConsensusService_SubscribeTopicClient
var err error
attempt := 0
resubscribe := true
channel, err := client.mirrorNetwork.getNextMirrorNode().getChannel()
if err != nil {
cancel()
grpcErr, ok := status.FromError(err)
if query.errorHandler == nil && ok {
query.errorHandler(*grpcErr)
}
}

for {
if resubscribe {
if query.attempt <= query.maxAttempts && subClient == nil {
if query.limit != nil {
query.pb.Limit = *query.limit - query.counter
}

ctx, cancel := context.WithCancel(context.TODO())
handle.onUnsubscribe = cancel

subClient, err = (*channel).SubscribeTopic(ctx, query.pb)

if err != nil {
cancel()
grpcErr, ok := status.FromError(err)
if query.errorHandler == nil && ok {
query.errorHandler(*grpcErr)
}
handle.Unsubscribe()
callErrorHandlerWithGrpcStatus(err, query.errorHandler)
subClient = nil
}
}

resp, err := subClient.Recv()
code := status.Code(err)

if err != nil {
if code == codes.NotFound || code == codes.Unavailable {
if attempt >= 10 {
cancel()
} else {
delay := 250.0 * math.Pow(2.0, float64(attempt))
time.Sleep(time.Duration(delay) * time.Millisecond)
attempt += 1
continue
}
break
} else {
cancel()
grpcErr, ok := status.FromError(err)
if query.errorHandler == nil && ok {
query.errorHandler(*grpcErr)
}
break
if query.attempt <= query.maxAttempts && query.retryHandler(err) {
handle.Unsubscribe()
subClient = nil

delay := 250.0 * math.Pow(2.0, float64(query.attempt))
time.Sleep(time.Duration(delay) * time.Millisecond)
query.attempt += 1
continue
}

handle.Unsubscribe()
callErrorHandlerWithGrpcStatus(err, query.errorHandler)
break
}

resubscribe = false
if resp.ChunkInfo == nil || (resp.ChunkInfo != nil && resp.ChunkInfo.Total == 1) {
query.counter += 1

if resp.ChunkInfo == nil {
onNext(topicMessageOfSingle(resp))
} else {
messagesMutex.Lock()
Expand All @@ -149,6 +165,8 @@ func (query *TopicMessageQuery) Subscribe(client *Client, onNext func(TopicMessa
messages.Store(txID, message)

if int32(len(message)) == resp.ChunkInfo.Total {
query.counter += 1

messages.Delete(txID)
messagesMutex.Unlock()
onNext(topicMessageOfMany(message))
Expand All @@ -157,8 +175,30 @@ func (query *TopicMessageQuery) Subscribe(client *Client, onNext func(TopicMessa
}

}

if query.limit != nil && query.counter == *query.limit {
query.completionHandler()
break
}
}
}()

return handle, nil
}

func defaultRetryHandler(err error) bool {
code := status.Code(err)

switch code {
case codes.NotFound, codes.ResourceExhausted, codes.Internal, codes.Unavailable:
return true
default:
return false
}
}

func callErrorHandlerWithGrpcStatus(err error, errorHandler func(stat status.Status)) {
if grpcErr, ok := status.FromError(err); errorHandler != nil && ok {
errorHandler(*grpcErr)
}
}

0 comments on commit b48f094

Please sign in to comment.