Skip to content

Commit

Permalink
allow to pull static of list device resources
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Jun 30, 2020
1 parent a13beea commit a2c05ac
Show file tree
Hide file tree
Showing 20 changed files with 325 additions and 169 deletions.
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
"LISTEN_ACME_DEVICE_ID":"adebc667-1f2b-41e3-bf5c-6d6eabc68cc6",
"LISTEN_ACME_DIRECTORY_URL":"https://localhost:10443/acme/acme/directory",
"TEST_COAP_GW_OVERWRITE_LISTEN_ACME_DIRECTORY_URL": "https://localhost:10443/acme/ocf.gw/directory",
"GOMAXPROCS": 1,
// "GOMAXPROCS": 1,
// "GOFLAGS":"-mod=vendor",
// "GRPC_VERBOSITY":"DEBUG",
// "GRPC_GO_LOG_VERBOSITY_LEVEL":99,
// "GRPC_GO_LOG_SEVERITY_LEVEL":"info",
},
"go.testTimeout": "180s",
"go.testTimeout": "300s",
"go.buildFlags": [
// "-mod=vendor",
]
Expand Down
4 changes: 2 additions & 2 deletions cloud2cloud-connector/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var ContentType_VNDOCFCBOR = message.AppOcfCbor.String()

type EventHeader struct {
CorrelationID string
SubscriptionID string
ID string
ContentType string
EventType EventType
SequenceNumber uint64
Expand Down Expand Up @@ -111,7 +111,7 @@ func ParseEventHeader(r *http.Request) (h EventHeader, _ error) {

return EventHeader{
CorrelationID: correlationID,
SubscriptionID: subscriptionID,
ID: subscriptionID,
ContentType: contentType,
EventType: eventType,
SequenceNumber: sequenceNumber,
Expand Down
23 changes: 15 additions & 8 deletions cloud2cloud-connector/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,24 @@ import (
"github.com/go-ocf/kit/security/oauth/manager"
)

type PullStaticDeviceEvents struct {
CacheSize int `envconfig:"CACHE_SIZE" default:"2048"`
Timeout time.Duration `envconfig:"TIMEOUT" default:"5s"`
MaxParallelGets int64 `envconfig:"MAX_PARALLEL_GETS" default:"128"`
}

//Config represent application configuration
type Config struct {
grpc.Config
AuthServerAddr string `envconfig:"AUTH_SERVER_ADDRESS" default:"127.0.0.1:9100"`
ResourceAggregateAddr string `envconfig:"RESOURCE_AGGREGATE_ADDRESS" default:"127.0.0.1:9100"`
ResourceDirectoryAddr string `envconfig:"RESOURCE_DIRECTORY_ADDRESS" default:"127.0.0.1:9100"`
OAuthCallback string `envconfig:"OAUTH_CALLBACK"`
EventsURL string `envconfig:"EVENTS_URL"`
PullDevicesDisabled bool `envconfig:"PULL_DEVICES_DISABLED" default:"false"`
PullDevicesInterval time.Duration `envconfig:"PULL_DEVICES_INTERVAL" default:"5s"`
OAuth manager.Config `envconfig:"OAUTH"`
AuthServerAddr string `envconfig:"AUTH_SERVER_ADDRESS" default:"127.0.0.1:9100"`
ResourceAggregateAddr string `envconfig:"RESOURCE_AGGREGATE_ADDRESS" default:"127.0.0.1:9100"`
ResourceDirectoryAddr string `envconfig:"RESOURCE_DIRECTORY_ADDRESS" default:"127.0.0.1:9100"`
OAuthCallback string `envconfig:"OAUTH_CALLBACK"`
EventsURL string `envconfig:"EVENTS_URL"`
PullDevicesDisabled bool `envconfig:"PULL_DEVICES_DISABLED" default:"false"`
PullDevicesInterval time.Duration `envconfig:"PULL_DEVICES_INTERVAL" default:"5s"`
PullStaticDeviceEvents PullStaticDeviceEvents `envconfig:"PULL_STATIC_DEVICE_EVENTS"`
OAuth manager.Config `envconfig:"OAUTH"`
}

//String return string representation of Config
Expand Down
17 changes: 7 additions & 10 deletions cloud2cloud-connector/service/deviceSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func cancelDeviceSubscription(ctx context.Context, linkedAccount store.LinkedAcc
return nil
}

func (s *SubscriptionManager) updateCloudStatus(ctx context.Context, deviceID string, online bool, authContext pbCQRS.AuthorizationContext, sequence uint64) error {
func (s *SubscriptionManager) updateCloudStatus(ctx context.Context, deviceID string, online bool, authContext pbCQRS.AuthorizationContext, cmdMetadata pbCQRS.CommandMetadata) error {
status := cloud.Status{
ResourceTypes: cloud.StatusResourceTypes,
Interfaces: cloud.StatusInterfaces,
Expand All @@ -60,11 +60,8 @@ func (s *SubscriptionManager) updateCloudStatus(ctx context.Context, deviceID st
CoapContentFormat: int32(message.AppOcfCbor),
Data: data,
},
Status: pbRA.Status_OK,
CommandMetadata: &pbCQRS.CommandMetadata{
ConnectionId: Cloud2cloudConnectorConnectionId,
Sequence: sequence,
},
Status: pbRA.Status_OK,
CommandMetadata: &cmdMetadata,
AuthorizationContext: &authContext,
}

Expand Down Expand Up @@ -104,14 +101,14 @@ func (s *SubscriptionManager) SubscribeToResource(ctx context.Context, deviceID,
if err != nil {
return fmt.Errorf("cannot cache subscription for device subscriptions: %v", err)
}
sub.SubscriptionID, err = s.subscribeToResource(ctx, linkedAccount, linkedCloud, correlationID.String(), signingSecret, deviceID, href)
sub.ID, err = s.subscribeToResource(ctx, linkedAccount, linkedCloud, correlationID.String(), signingSecret, deviceID, href)
if err != nil {
s.cache.Delete(correlationID.String())
return fmt.Errorf("cannot subscribe to device %v resource %v: %v", deviceID, href, err)
}
_, err = s.store.FindOrCreateSubscription(ctx, sub)
if err != nil {
cancelResourceSubscription(ctx, linkedAccount, linkedCloud, sub.DeviceID, sub.Href, sub.SubscriptionID)
cancelResourceSubscription(ctx, linkedAccount, linkedCloud, sub.DeviceID, sub.Href, sub.ID)
return fmt.Errorf("cannot store resource subscription to DB: %v", err)
}
return nil
Expand Down Expand Up @@ -150,7 +147,7 @@ func (s *SubscriptionManager) HandleResourcesPublished(ctx context.Context, d su
EndpointInformations: endpoints,
},
CommandMetadata: &pbCQRS.CommandMetadata{
ConnectionId: Cloud2cloudConnectorConnectionId,
ConnectionId: d.linkedAccount.ID + "." + d.subscription.ID,
Sequence: header.SequenceNumber,
},
})
Expand Down Expand Up @@ -185,7 +182,7 @@ func (s *SubscriptionManager) HandleResourcesUnpublished(ctx context.Context, d
},
ResourceId: raCqrs.MakeResourceId(link.GetDeviceID(), kitHttp.CanonicalHref(href)),
CommandMetadata: &pbCQRS.CommandMetadata{
ConnectionId: Cloud2cloudConnectorConnectionId,
ConnectionId: d.linkedAccount.ID + "." + d.subscription.ID,
Sequence: header.SequenceNumber,
},
})
Expand Down
4 changes: 2 additions & 2 deletions cloud2cloud-connector/service/deviceSubscriptionHandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (h *loadDevicesHandler) Handle(ctx context.Context, iter store.Subscription
}
linkedAccount, ok := h.linkedAccounts[s.LinkedAccountID]
if !ok {
h.deleteSubs = append(h.deleteSubs, s.SubscriptionID)
h.deleteSubs = append(h.deleteSubs, s.ID)
continue
}
linkedCloud, ok := h.linkedClouds[linkedAccount.LinkedCloudID]
Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *DevicesSubscription) Load(s store.Store) error {

err = s.LoadSubscriptions(ctx, []store.SubscriptionQuery{{Type: store.Type_Resource}}, &h)
for _, ID := range h.deleteSubs {
errDel := s.RemoveSubscriptions(ctx, store.SubscriptionQuery{SubscriptionID: ID})
errDel := s.RemoveSubscriptions(ctx, store.SubscriptionQuery{ID: ID})
if errDel != nil {
log.Errorf("cannot delete subscription %v: %v", ID, errDel)
}
Expand Down
53 changes: 35 additions & 18 deletions cloud2cloud-connector/service/devicesSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func cancelDevicesSubscription(ctx context.Context, linkedAccount store.LinkedAc
return nil
}

func (s *SubscriptionManager) publishCloudDeviceStatus(ctx context.Context, deviceID string, authCtx pbCQRS.AuthorizationContext, sequence uint64) error {
func (s *SubscriptionManager) publishCloudDeviceStatus(ctx context.Context, deviceID string, authCtx pbCQRS.AuthorizationContext, cmdMetadata pbCQRS.CommandMetadata) error {
resource := pbRA.Resource{
Id: raCqrs.MakeResourceId(deviceID, cloud.StatusHref),
Href: cloud.StatusHref,
Expand All @@ -55,10 +55,7 @@ func (s *SubscriptionManager) publishCloudDeviceStatus(ctx context.Context, devi
ResourceId: resource.Id,
Resource: &resource,
TimeToLive: 0,
CommandMetadata: &pbCQRS.CommandMetadata{
Sequence: sequence,
ConnectionId: Cloud2cloudConnectorConnectionId,
},
CommandMetadata: &cmdMetadata,
}

_, err := s.raClient.PublishResource(ctx, &request)
Expand Down Expand Up @@ -92,14 +89,14 @@ func (s *SubscriptionManager) SubscribeToDevice(ctx context.Context, deviceID st
if err != nil {
return fmt.Errorf("cannot cache subscription for device subscriptions: %v", err)
}
sub.SubscriptionID, err = s.subscribeToDevice(ctx, linkedAccount, linkedCloud, correlationID, signingSecret, deviceID)
sub.ID, err = s.subscribeToDevice(ctx, linkedAccount, linkedCloud, correlationID, signingSecret, deviceID)
if err != nil {
s.cache.Delete(correlationID)
return fmt.Errorf("cannot subscribe to device %v: %v", deviceID, err)
}
_, err = s.store.FindOrCreateSubscription(ctx, sub)
if err != nil {
cancelDeviceSubscription(ctx, linkedAccount, linkedCloud, deviceID, sub.SubscriptionID)
cancelDeviceSubscription(ctx, linkedAccount, linkedCloud, deviceID, sub.ID)
return fmt.Errorf("cannot store subscription to DB: %v", err)
}
err = s.devicesSubscription.Add(deviceID, linkedAccount, linkedCloud)
Expand All @@ -120,6 +117,14 @@ func (s *SubscriptionManager) HandleDevicesRegistered(ctx context.Context, d sub
errors = append(errors, err)
continue
}
if d.linkedCloud.SupportedSubscriptionsEvents.StaticDeviceEvents {
s.triggerPullDevice(pullDevice{
linkedAccount: d.linkedAccount,
linkedCloud: d.linkedCloud,
deviceID: device.ID,
})
continue
}
if d.linkedCloud.SupportedSubscriptionsEvents.NeedPullDevice() {
continue
}
Expand Down Expand Up @@ -164,18 +169,24 @@ func (s *SubscriptionManager) HandleDevicesUnregistered(ctx context.Context, sub
}

// HandleDevicesOnline sets device online to resource aggregate and register device to projection.
func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, subscriptionData subscriptionData, header events.EventHeader, devices events.DevicesOnline) error {
func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, d subscriptionData, header events.EventHeader, devices events.DevicesOnline) error {
var errors []error
for _, device := range devices {
authCtx := pbCQRS.AuthorizationContext{
DeviceId: device.ID,
}
err := s.publishCloudDeviceStatus(ctx, device.ID, authCtx, header.SequenceNumber)
err := s.publishCloudDeviceStatus(ctx, device.ID, authCtx, pbCQRS.CommandMetadata{
ConnectionId: d.linkedAccount.ID + "." + d.subscription.ID,
Sequence: header.SequenceNumber,
})
if err != nil {
errors = append(errors, err)
continue
}
err = s.updateCloudStatus(ctx, device.ID, true, authCtx, header.SequenceNumber)
err = s.updateCloudStatus(ctx, device.ID, true, authCtx, pbCQRS.CommandMetadata{
ConnectionId: d.linkedAccount.ID + "." + d.subscription.ID,
Sequence: header.SequenceNumber,
})
if err != nil {
errors = append(errors, fmt.Errorf("cannot set device %v to online: %v", device.ID, err))
}
Expand All @@ -188,18 +199,24 @@ func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, subscript
}

// HandleDevicesOffline sets device off to resource aggregate and unregister device to projection.
func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, subscriptionData subscriptionData, header events.EventHeader, devices events.DevicesOffline) error {
func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, d subscriptionData, header events.EventHeader, devices events.DevicesOffline) error {
var errors []error
for _, device := range devices {
authCtx := pbCQRS.AuthorizationContext{
DeviceId: device.ID,
}
err := s.publishCloudDeviceStatus(ctx, device.ID, authCtx, header.SequenceNumber)
err := s.publishCloudDeviceStatus(ctx, device.ID, authCtx, pbCQRS.CommandMetadata{
ConnectionId: d.linkedAccount.ID + "." + d.subscription.ID,
Sequence: header.SequenceNumber,
})
if err != nil {
errors = append(errors, err)
continue
}
err = s.updateCloudStatus(ctx, device.ID, false, authCtx, header.SequenceNumber)
err = s.updateCloudStatus(ctx, device.ID, false, authCtx, pbCQRS.CommandMetadata{
ConnectionId: d.linkedAccount.ID + "." + d.subscription.ID,
Sequence: header.SequenceNumber,
})

if err != nil {
errors = append(errors, fmt.Errorf("cannot set device %v to offline: %v", device.ID, err))
Expand All @@ -212,7 +229,7 @@ func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, subscrip
return nil
}

func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData subscriptionData) error {
func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header events.EventHeader, body []byte, d subscriptionData) error {
contentReader, err := header.GetContentDecoder()
if err != nil {
return fmt.Errorf("cannot handle device event: %v", err)
Expand All @@ -225,28 +242,28 @@ func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header eve
if err != nil {
return fmt.Errorf("cannot decode devices event: %v", err)
}
return s.HandleDevicesRegistered(ctx, subscriptionData, devices, header)
return s.HandleDevicesRegistered(ctx, d, devices, header)
case events.EventType_DevicesUnregistered:
var devices events.DevicesUnregistered
err = contentReader(body, &devices)
if err != nil {
return fmt.Errorf("cannot decode devices event: %v", err)
}
return s.HandleDevicesUnregistered(ctx, subscriptionData, header.CorrelationID, devices)
return s.HandleDevicesUnregistered(ctx, d, header.CorrelationID, devices)
case events.EventType_DevicesOnline:
var devices events.DevicesOnline
err = contentReader(body, &devices)
if err != nil {
return fmt.Errorf("cannot decode devices event: %v", err)
}
return s.HandleDevicesOnline(ctx, subscriptionData, header, devices)
return s.HandleDevicesOnline(ctx, d, header, devices)
case events.EventType_DevicesOffline:
var devices events.DevicesOffline
err = contentReader(body, &devices)
if err != nil {
return fmt.Errorf("cannot decode devices event: %v", err)
}
return s.HandleDevicesOffline(ctx, subscriptionData, header, devices)
return s.HandleDevicesOffline(ctx, d, header, devices)
}

return fmt.Errorf("cannot decode devices: unsupported Event-Type %v", header.EventType)
Expand Down
18 changes: 18 additions & 0 deletions cloud2cloud-connector/service/getDevices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,24 @@ func TestRequestHandler_GetDevices(t *testing.T) {
},
},
},
{
name: "pull resource, devices + static device events",
args: args{
events: store.Events{
StaticDeviceEvents: true,
},
},
},
{
name: "resource, devices events + static device events",
args: args{
events: store.Events{
Devices: events.AllDevicesEvents,
Resource: events.AllResourceEvents,
StaticDeviceEvents: true,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cloud2cloud-connector/service/oauthCallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (rh *RequestHandler) oAuthCallback(w http.ResponseWriter, r *http.Request)
return http.StatusInternalServerError, err
}
linkedAccount.ID = id.String()
//time.Sleep(time.Second * 10)
err = rh.store.InsertLinkedAccount(r.Context(), linkedAccount)
if err != nil {
return http.StatusBadRequest, fmt.Errorf("cannot store linked account %+v: %v", linkedAccount, err)
Expand Down
Loading

0 comments on commit a2c05ac

Please sign in to comment.