Skip to content

Commit

Permalink
Merge pull request #447 from brandonforster/issue_446
Browse files Browse the repository at this point in the history
Refactored usage of service clients to be compatible with latest contracts changes.
  • Loading branch information
cloudxxx8 authored Apr 8, 2020
2 parents 79a40a3 + ee9a22e commit 44a3efc
Show file tree
Hide file tree
Showing 26 changed files with 384 additions and 261 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/edgexfoundry/device-sdk-go
require (
github.com/OneOfOne/xxhash v1.2.6
github.com/edgexfoundry/go-mod-bootstrap v0.0.26
github.com/edgexfoundry/go-mod-core-contracts v0.1.36
github.com/edgexfoundry/go-mod-core-contracts v0.1.52
github.com/edgexfoundry/go-mod-registry v0.1.17
github.com/google/uuid v1.1.0
github.com/gorilla/mux v1.7.1
Expand Down
7 changes: 4 additions & 3 deletions internal/cache/devices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"context"
"testing"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/mock"
contract "github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/mock"
)

var ds []contract.Device
Expand All @@ -23,7 +24,7 @@ func init() {
common.ServiceName = "device-cache-test"
common.DeviceClient = &mock.DeviceClientMock{}
ctx := context.WithValue(context.Background(), common.CorrelationHeader, uuid.New().String())
ds, _ = common.DeviceClient.DevicesForServiceByName(common.ServiceName, ctx)
ds, _ = common.DeviceClient.DevicesForServiceByName(ctx, common.ServiceName)
}

func TestNewDeviceCache(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func InitCache() {
}
newValueDescriptorCache(vds)

ds, err := common.DeviceClient.DevicesForServiceByName(common.ServiceName, ctx)
ds, err := common.DeviceClient.DevicesForServiceByName(ctx, common.ServiceName)
if err != nil {
common.LoggingClient.Error(fmt.Sprintf("Device cache initialization failed: %v", err))
ds = make([]contract.Device, 0)
}
newDeviceCache(ds)

pws, err := common.ProvisionWatcherClient.ProvisionWatchersForServiceByName(common.ServiceName, ctx)
pws, err := common.ProvisionWatcherClient.ProvisionWatchersForServiceByName(ctx, common.ServiceName)
if err != nil {
common.LoggingClient.Error(fmt.Sprintf("Provision Watcher cache initialization failed %v", err))
pws = make([]contract.ProvisionWatcher, 0)
Expand Down
7 changes: 4 additions & 3 deletions internal/cache/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"context"
"testing"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/mock"
contract "github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/mock"
)

func TestInitCache(t *testing.T) {
Expand All @@ -29,7 +30,7 @@ func TestInitCache(t *testing.T) {
t.Errorf("the expected number of valuedescriptors in cache is %d but got: %d:", len(vdsBeforeAddingToCache), vl)
}

dsBeforeAddingToCache, _ := common.DeviceClient.DevicesForServiceByName(common.ServiceName, ctx)
dsBeforeAddingToCache, _ := common.DeviceClient.DevicesForServiceByName(ctx, common.ServiceName)
if dl := len(Devices().All()); dl != len(dsBeforeAddingToCache) {
t.Errorf("the expected number of devices in cache is %d but got: %d:", len(dsBeforeAddingToCache), dl)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/cache/provisionwatchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"context"
"testing"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/mock"
contract "github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/mock"
)

var pws []contract.ProvisionWatcher
Expand All @@ -23,7 +24,7 @@ func init() {
common.ServiceName = "watcher-cache-test"
common.ProvisionWatcherClient = &mock.ProvisionWatcherClientMock{}
ctx := context.WithValue(context.Background(), common.CorrelationHeader, uuid.New().String())
pws, _ = common.ProvisionWatcherClient.ProvisionWatchersForServiceByName(common.ServiceName, ctx)
pws, _ = common.ProvisionWatcherClient.ProvisionWatchersForServiceByName(ctx, common.ServiceName)
}

func TestNewProvisionWatcherCache(t *testing.T) {
Expand Down
149 changes: 77 additions & 72 deletions internal/clients/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,27 @@
package clients

import (
"context"
"fmt"
"net"
"net/http"
"sync"
"time"

"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/endpoint"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
"github.com/edgexfoundry/go-mod-core-contracts/clients/general"
"github.com/edgexfoundry/go-mod-core-contracts/clients/metadata"
"github.com/edgexfoundry/go-mod-core-contracts/clients/types"
)

const clientCount int = 8
"github.com/edgexfoundry/device-sdk-go/internal/common"
"github.com/edgexfoundry/device-sdk-go/internal/urlclient"
)

// InitDependencyClients triggers Service Client Initializer to establish connection to Metadata and Core Data Services
// through Metadata Client and Core Data Client.
// Service Client Initializer also needs to check the service status of Metadata and Core Data Services,
// because they are important dependencies of Device Service.
// The initialization process should be pending until Metadata Service and Core Data Service are both available.
func InitDependencyClients() error {
func InitDependencyClients(ctx context.Context, waitGroup *sync.WaitGroup) error {
if err := validateClientConfig(); err != nil {
return err
}
Expand All @@ -39,7 +37,7 @@ func InitDependencyClients() error {
return err
}

initializeClients()
initializeClients(ctx, waitGroup)

common.LoggingClient.Info("Service clients initialize successful.")
return nil
Expand Down Expand Up @@ -155,71 +153,78 @@ func checkServiceAvailableViaRegistry(serviceId string) bool {
return true
}

func checkConsulAvailable() bool {
addr := fmt.Sprintf("%v:%v", common.CurrentConfig.Registry.Host, common.CurrentConfig.Registry.Port)
conn, err := net.Dial("tcp", addr)
if err != nil {
common.LoggingClient.Error(fmt.Sprintf("Consul cannot be reached, address: %v and error is \"%v\" ", addr, err.Error()))
return false
}
conn.Close()
return true
}

func initializeClients() {
var waitGroup sync.WaitGroup
waitGroup.Add(clientCount)

endpoint := &endpoint.Endpoint{RegistryClient: common.RegistryClient, WG: &waitGroup}
metaAddr := common.CurrentConfig.Clients[common.ClientMetadata].Url()
dataAddr := common.CurrentConfig.Clients[common.ClientData].Url()
isRegistry := common.RegistryClient != nil

params := types.EndpointParams{
UseRegistry: isRegistry,
Interval: 15,
}

func initializeClients(ctx context.Context, waitGroup *sync.WaitGroup) {
// initialize Core Metadata clients
params.ServiceKey = clients.CoreMetaDataServiceKey

params.Path = clients.ApiAddressableRoute
params.Url = metaAddr + params.Path
common.AddressableClient = metadata.NewAddressableClient(params, endpoint)

params.Path = clients.ApiDeviceRoute
params.Url = metaAddr + params.Path
common.DeviceClient = metadata.NewDeviceClient(params, endpoint)

params.Path = clients.ApiDeviceServiceRoute
params.Url = metaAddr + params.Path
common.DeviceServiceClient = metadata.NewDeviceServiceClient(params, endpoint)

params.Path = clients.ApiDeviceProfileRoute
params.Url = metaAddr + params.Path
common.DeviceProfileClient = metadata.NewDeviceProfileClient(params, endpoint)

params.Path = clients.ApiConfigRoute
params.Url = metaAddr
common.MetadataGeneralClient = general.NewGeneralClient(params, endpoint)

params.Path = clients.ApiProvisionWatcherRoute
params.Url = metaAddr + params.Path
common.ProvisionWatcherClient = metadata.NewProvisionWatcherClient(params, endpoint)
common.AddressableClient = metadata.NewAddressableClient(
urlclient.NewMetadata(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiAddressableRoute,
),
)

common.DeviceClient = metadata.NewDeviceClient(
urlclient.NewMetadata(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiDeviceRoute,
),
)

common.DeviceServiceClient = metadata.NewDeviceServiceClient(
urlclient.NewMetadata(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiDeviceServiceRoute,
),
)

common.DeviceProfileClient = metadata.NewDeviceProfileClient(
urlclient.NewMetadata(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiDeviceProfileRoute,
),
)

common.MetadataGeneralClient = general.NewGeneralClient(
urlclient.NewMetadata(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiConfigRoute,
),
)

common.ProvisionWatcherClient = metadata.NewProvisionWatcherClient(
urlclient.NewMetadata(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiProvisionWatcherRoute,
),
)

// initialize Core Data clients
params.ServiceKey = clients.CoreDataServiceKey

params.Path = clients.ApiEventRoute
params.Url = dataAddr + params.Path
common.EventClient = coredata.NewEventClient(params, endpoint)

params.Path = common.APIValueDescriptorRoute
params.Url = dataAddr + params.Path
common.ValueDescriptorClient = coredata.NewValueDescriptorClient(params, endpoint)

if isRegistry {
// wait for the first endpoint discovery to make sure all clients work
waitGroup.Wait()
}
common.EventClient = coredata.NewEventClient(
urlclient.NewData(
ctx,
common.RegistryClient,
waitGroup,
clients.ApiEventRoute,
),
)

common.ValueDescriptorClient = coredata.NewValueDescriptorClient(
urlclient.NewData(
ctx,
common.RegistryClient,
waitGroup,
common.APIValueDescriptorRoute,
),
)
}
13 changes: 7 additions & 6 deletions internal/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
"sync"
"time"

dsModels "github.com/edgexfoundry/device-sdk-go/pkg/models"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
contract "github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/google/uuid"

dsModels "github.com/edgexfoundry/device-sdk-go/pkg/models"
)

var (
Expand Down Expand Up @@ -78,12 +79,12 @@ func SendEvent(event *dsModels.Event) {
LoggingClient.Debug("SendEvent: EventClient.MarshalEvent passed through encoded event", clients.CorrelationHeader, correlation)
}
// Call AddBytes to post event to core data
responseBody, errPost := EventClient.AddBytes(event.EncodedEvent, ctx)
responseBody, errPost := EventClient.AddBytes(ctx, event.EncodedEvent)
if errPost != nil {
LoggingClient.Error("SendEvent Failed to push event", "device", event.Device, "response", responseBody, "error", errPost)
} else {
LoggingClient.Info("SendEvent: Pushed event to core data", clients.ContentType, clients.FromContext(clients.ContentType, ctx), clients.CorrelationHeader, correlation)
LoggingClient.Trace("SendEvent: Pushed this event to core data", clients.ContentType, clients.FromContext(clients.ContentType, ctx), clients.CorrelationHeader, correlation, "event", event)
LoggingClient.Info("SendEvent: Pushed event to core data", clients.ContentType, clients.FromContext(ctx, clients.ContentType), clients.CorrelationHeader, correlation)
LoggingClient.Trace("SendEvent: Pushed this event to core data", clients.ContentType, clients.FromContext(ctx, clients.ContentType), clients.CorrelationHeader, correlation, "event", event)
}
}

Expand Down Expand Up @@ -270,7 +271,7 @@ func FilterQueryParams(queryParams string) url.Values {
LoggingClient.Error("Error parsing query parameters: %s\n", err)
}
// Filter out parameters with predefined prefix
for k, _ := range m {
for k := range m {
if strings.HasPrefix(k, SDKReservedPrefix) {
delete(m, k)
}
Expand All @@ -286,7 +287,7 @@ func UpdateLastConnected(name string) {
}

t := time.Now().UnixNano() / int64(time.Millisecond)
err := DeviceClient.UpdateLastConnectedByName(name, t, context.Background())
err := DeviceClient.UpdateLastConnectedByName(context.Background(), name, t)
if err != nil {
LoggingClient.Error("Failed to update last connected value for device: " + name)
}
Expand Down
Loading

0 comments on commit 44a3efc

Please sign in to comment.