Skip to content

Commit

Permalink
feat: Add capability to Publish to MessageBus when using non-MessageB…
Browse files Browse the repository at this point in the history
…us triggers

closes #1439

Signed-off-by: Marc-Philippe Fuller <marc-philippe.fuller@intel.com>
  • Loading branch information
marcpfuller committed Jul 25, 2023
1 parent fd7ae28 commit 0caaaeb
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 26 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/edgexfoundry/go-mod-configuration/v3 v3.1.0-dev.3 // indirect
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.3.0 // indirect
github.com/edgexfoundry/go-mod-secrets/v2 v2.3.0 // indirect
github.com/edgexfoundry/go-mod-secrets/v3 v3.1.0-dev.3 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.1.0-dev.9 h1:sO18almJrbT8H2BB14D6
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.1.0-dev.9/go.mod h1:zVtOPTNUn7jbEchgGFfbjTlLCv/KzOwqTWXxSFGIsVQ=
github.com/edgexfoundry/go-mod-configuration/v3 v3.1.0-dev.3 h1:xWyraOW+RtFwIO+DnCWBKoaa5w1ZX8vRf91zBa2Ll8c=
github.com/edgexfoundry/go-mod-configuration/v3 v3.1.0-dev.3/go.mod h1:yaxBuJh45+VxXX+nSt/Q+1gGEk4/BDe9edYWm8aEtfg=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.3.0 h1:8Svk1HTehXEgwxgyA4muVhSkP3D9n1q+oSHI3B1Ac90=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.3.0/go.mod h1:4/e61acxVkhQWCTjQ4XcHVJDnrMDloFsZZB1B6STCRw=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.1.0-dev.2 h1:H3ls1vyxCv6pigZ/RZlRhj9lUTQq7CiT5/dnZRsgVmQ=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.1.0-dev.2/go.mod h1:dadH49hOQlIKkbOfRj5gYEez8l9kUQ3aj9cc8uA5hv4=
github.com/edgexfoundry/go-mod-messaging/v3 v3.1.0-dev.11 h1:vpyu8SDM9WQ2F9MeRx1MQsAcOLWyFeVFt/NARZw385g=
github.com/edgexfoundry/go-mod-messaging/v3 v3.1.0-dev.11/go.mod h1:sLm8Zs3/S5zg8C4fyqy6u6PxxAWYY4GhLnFNSkGs9gY=
github.com/edgexfoundry/go-mod-registry/v3 v3.1.0-dev.3 h1:lpXaCcxzGp+O5OKIIuPdJK+pdSdoFzzg6Eee6nSYtag=
github.com/edgexfoundry/go-mod-registry/v3 v3.1.0-dev.3/go.mod h1:vcbabBpeASKL+ST/hlKwfX5J4KQiDJmcxZb2ciqC7lI=
github.com/edgexfoundry/go-mod-secrets/v2 v2.3.0 h1:MRehbe0ZdIP2jx3Nv18LktENlWdXSmw0E6x6VKvFOfo=
github.com/edgexfoundry/go-mod-secrets/v2 v2.3.0/go.mod h1:y9l/5p0EEA2IBf2ctvJ0PtjTN/XPab5BWnfIi3Sy6C4=
github.com/edgexfoundry/go-mod-secrets/v3 v3.1.0-dev.3 h1:bSm3RwsXAUIAZZPldFwpkRs1EJrZHLlOU475xGlsgKY=
github.com/edgexfoundry/go-mod-secrets/v3 v3.1.0-dev.3/go.mod h1:n+z3ugFBidF7K6CTzzJGKtwmlOz3N28Kh0FJtLaD9l0=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
Expand Down
47 changes: 47 additions & 0 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package app

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
Expand All @@ -31,12 +32,14 @@ import (

bootstrapHandlers "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/handlers"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/bootstrap/container"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/bootstrap/handlers"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
contractsCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/runtime"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/webserver"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
Expand Down Expand Up @@ -733,3 +736,47 @@ func (svc *Service) setServiceKey(profile string) {
func (svc *Service) BuildContext(correlationId string, contentType string) interfaces.AppFunctionContext {
return appfunction.NewContext(correlationId, svc.dic, contentType)
}

// Publish pushes data to the messagebus
func (svc *Service) Publish(v any) error {
messageClient := bootstrapContainer.MessagingClientFrom(svc.dic.Get)
if messageClient == nil {
return fmt.Errorf("messagebus is disabled via configuration")
}

triggertopic := svc.config.Trigger.PublishTopic
baseTopic := svc.config.MessageBus.BaseTopicPrefix

payload, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("failed to marshal data: %v", err)
}
message := types.NewMessageEnvelope(payload, context.Background())
err = messageClient.Publish(message, contractsCommon.BuildTopic(baseTopic, triggertopic))
if err != nil {
return fmt.Errorf("failed to publish data to messagebus: %v", err)
}
return nil
}

// Publish pushes data to the messagebus for a given topic
func (svc *Service) PublishWithTopic(topic string, data any) error {
messageClient := bootstrapContainer.MessagingClientFrom(svc.dic.Get)
if messageClient == nil {
return fmt.Errorf("messagebus is disabled via configuration")
}

full_topic := commonConstants.BuildTopic(svc.config.MessageBus.BaseTopicPrefix, topic)
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal data: %v", err)
}

message := types.NewMessageEnvelope(payload, context.Background())
err = messageClient.Publish(message, full_topic)
if err != nil {
return fmt.Errorf("failed to publish data to messagebus: %v", err)
}

return nil
}
192 changes: 183 additions & 9 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@ package app

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"reflect"
"testing"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces/mocks"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
clients "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/bootstrap/container"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
Expand All @@ -41,7 +34,15 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/webserver"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
builtin "github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/transforms"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces/mocks"
"github.com/edgexfoundry/go-mod-bootstrap/v3/config"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
clients "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
messageMocks "github.com/edgexfoundry/go-mod-messaging/v3/messaging/mocks"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -1012,3 +1013,176 @@ func TestService_AppContext(t *testing.T) {
// Linter requires use cancel function
cancel()
}

func TestService_Publish(t *testing.T) {
tests := []struct {
name string
message string
publishErr error
config *common.ConfigurationStruct
expectedError error
expectedTopic string
}{
{
name: "No message bus config",
config: &common.ConfigurationStruct{
MessageBus: config.MessageBusInfo{
Disabled: true,
},
Trigger: common.TriggerInfo{
PublishTopic: "test",
},
},
message: "test",
expectedError: errors.New("messagebus is disabled via configuration"),
},
{
name: "valid publish",
config: &common.ConfigurationStruct{
MessageBus: config.MessageBusInfo{
Disabled: false,
BaseTopicPrefix: "test",
},
Trigger: common.TriggerInfo{
PublishTopic: "test",
},
},
expectedTopic: "test/test",
message: "test",
publishErr: nil,
expectedError: nil,
},
{
name: "publish error",
config: &common.ConfigurationStruct{
MessageBus: config.MessageBusInfo{
Disabled: false,
BaseTopicPrefix: "test",
},
Trigger: common.TriggerInfo{
PublishTopic: "test",
},
},
expectedTopic: "test/test",
message: "test",
publishErr: errors.New("failed"),
expectedError: errors.New("failed to publish data to messagebus: failed"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var mockMessageClient *messageMocks.MessageClient
if !tt.config.MessageBus.Disabled {
mockMessageClient = &messageMocks.MessageClient{}
mockMessageClient.On("Connect").Return(nil)
mockMessageClient.On("Publish", mock.Anything, tt.expectedTopic).Return(tt.publishErr)
}

dic = di.NewContainer(di.ServiceConstructorMap{
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return logger.NewMockClient()
},
bootstrapContainer.MessagingClientName: func(get di.Get) interface{} {
if mockMessageClient == nil {
return nil
}
return mockMessageClient
},
})

svc := &Service{
dic: dic,
config: tt.config,
}

err := svc.Publish(tt.message)
require.Equal(t, tt.expectedError, err)
})
}
}

func TestService_PublishWithTopic(t *testing.T) {
tests := []struct {
name string
topic string
message string
publishErr error
expectedTopic string
expectedError error
config *common.ConfigurationStruct
wantErr bool
}{
{
name: "No message bus config",
config: &common.ConfigurationStruct{
MessageBus: config.MessageBusInfo{
Disabled: true,
},
},
message: "test",
expectedError: errors.New("messagebus is disabled via configuration"),
wantErr: true,
},
{
name: "valid publish",
config: &common.ConfigurationStruct{
MessageBus: config.MessageBusInfo{
Disabled: false,
BaseTopicPrefix: "test",
},
},
topic: "test_topic",
expectedTopic: "test/test_topic",
message: "test",
publishErr: nil,
wantErr: false,
},
{
name: "publish error",
config: &common.ConfigurationStruct{
MessageBus: config.MessageBusInfo{
Disabled: false,
BaseTopicPrefix: "test",
},
},
topic: "test_topic",
expectedTopic: "test/test_topic",
message: "test",
publishErr: errors.New("failed"),
expectedError: errors.New("failed to publish data to messagebus: failed"),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var mockMessageClient *messageMocks.MessageClient
if !tt.config.MessageBus.Disabled {
mockMessageClient = &messageMocks.MessageClient{}
mockMessageClient.On("Connect").Return(nil)
mockMessageClient.On("Publish", mock.Anything, tt.expectedTopic).Return(tt.publishErr)
}

dic = di.NewContainer(di.ServiceConstructorMap{
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return logger.NewMockClient()
},
bootstrapContainer.MessagingClientName: func(get di.Get) interface{} {
// ensures nil is returned in PublishWIthTopic
if mockMessageClient == nil {
return nil
}
return mockMessageClient
},
})

svc := &Service{
dic: dic,
config: tt.config,
}

err := svc.PublishWithTopic(tt.topic, tt.message)
require.Equal(t, tt.expectedError, err)

})
}
}
Loading

0 comments on commit 0caaaeb

Please sign in to comment.