Skip to content

Commit

Permalink
coap-gw: set not found for empty body from batch observation
Browse files Browse the repository at this point in the history
ra: set not found to unpublished resources
  • Loading branch information
jkralik committed Aug 16, 2022
1 parent 03b5c96 commit e6803bc
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 41 deletions.
1 change: 0 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"TEST_OAUTH_SERVER_ID_TOKEN_PRIVATE_KEY": "${workspaceFolder}/.tmp/privKeys/idTokenKey.pem",
"TEST_OAUTH_SERVER_ACCESS_TOKEN_PRIVATE_KEY": "${workspaceFolder}/.tmp/privKeys/accessTokenKey.pem",
"TEST_HTTP_GW_WWW_ROOT": "${workspaceFolder}/.tmp/usr/local/www",
"ENABLE_RECEIVE_CANCELLATION_NOTIFICATION": false,
// "GOMAXPROCS": 1,
// "GOFLAGS":"-mod=vendor",
// "GRPC_VERBOSITY":"DEBUG",
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ define RUN-DOCKER
-e TEST_OAUTH_SERVER_ID_TOKEN_PRIVATE_KEY=/privKeys/idTokenKey.pem \
-e TEST_OAUTH_SERVER_ACCESS_TOKEN_PRIVATE_KEY=/privKeys/accessTokenKey.pem \
-e TEST_HTTP_GW_WWW_ROOT=/usr/local/www \
-e ENABLE_RECEIVE_CANCELLATION_NOTIFICATION=false \
hub-test \
$(1) ;
endef
Expand Down
27 changes: 19 additions & 8 deletions coap-gateway/coapconv/coapconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,19 @@ var filterOutEmptyResources = []string{
}

// inaccessible oic/sec resources have empty content and should be skipped
func filterOutEmptyResource(resource resources.BatchRepresentation) bool {
func filterOutEmptyResource(resource resources.BatchRepresentation) (isEmpty bool, filterOut bool) {
if len(resource.Content) == 2 {
var v map[interface{}]interface{}
if err := cbor.Decode(resource.Content, &v); err == nil && len(v) == 0 {
isEmpty = true
for _, f := range filterOutEmptyResources {
if strings.HasPrefix(resource.Href(), f) {
return true
return isEmpty, true
}
}
}
}
return false
return isEmpty, false
}

func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connectionID string, req *pool.Message) ([]*commands.NotifyResourceChangedRequest, error) {
Expand All @@ -288,19 +289,29 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec

requests := make([]*commands.NotifyResourceChangedRequest, 0, len(rs))
for _, r := range rs {
if filterOutEmptyResource(r) {
isEmpty, filterOut := filterOutEmptyResource(r)
if filterOut {
continue
}
ct := contentFormat
data := r.Content
code := CoapCodeToStatus(req.Code())
if isEmpty {
// if we gets empty content we consider it as not found. Empty message is send when resource is deleted/acls don't allows as to access.
ct = -1
data = nil
code = commands.Status_NOT_FOUND
}

requests = append(requests, &commands.NotifyResourceChangedRequest{
ResourceId: commands.NewResourceID(deviceID, r.Href()),
Content: &commands.Content{
ContentType: getContentFormatString(contentFormat),
CoapContentFormat: contentFormat,
Data: r.Content,
ContentType: getContentFormatString(ct),
CoapContentFormat: ct,
Data: data,
},
CommandMetadata: metadata,
Status: CoapCodeToStatus(req.Code()),
Status: code,
})
}
return requests, nil
Expand Down
46 changes: 17 additions & 29 deletions grpc-gateway/service/createAndDeleteResource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"crypto/tls"
"fmt"
"os"
"testing"
"time"

"github.com/plgd-dev/device/schema"
"github.com/plgd-dev/device/schema/interfaces"
Expand Down Expand Up @@ -160,7 +160,7 @@ func createSwitchResourceExpectedEvents(t *testing.T, deviceID, subID, correlati
}
}

func deleteSwitchResourceExpectedEvents(t *testing.T, deviceID, subID, correlationID, switchID string, isDiscoveryResourceBatchObservable bool) map[string]*pb.Event {
func deleteSwitchResourceExpectedEvents(t *testing.T, deviceID, subID, correlationID, switchID string) map[string]*pb.Event {
deletePending := &pb.Event{
SubscriptionId: subID,
CorrelationId: correlationID,
Expand Down Expand Up @@ -200,35 +200,24 @@ func deleteSwitchResourceExpectedEvents(t *testing.T, deviceID, subID, correlati
},
}

res := pbTest.MakeResourceChanged(t, deviceID, test.TestResourceSwitchesInstanceHref(switchID), "", nil)
res.Status = commands.Status_NOT_FOUND
res.Content.CoapContentFormat = -1
res.Content.ContentType = ""
changedRes := &pb.Event{
SubscriptionId: subID,
CorrelationId: correlationID,
Type: &pb.Event_ResourceChanged{
ResourceChanged: res,
},
}

e := map[string]*pb.Event{
pbTest.GetEventID(deletePending): deletePending,
pbTest.GetEventID(deleted): deleted,
pbTest.GetEventID(unpublished): unpublished,
pbTest.GetEventID(changed): changed,
}

if isDiscoveryResourceBatchObservable {
changedRes := &pb.Event{
SubscriptionId: subID,
CorrelationId: correlationID,
Type: &pb.Event_ResourceChanged{
ResourceChanged: pbTest.MakeResourceChanged(t, deviceID, test.TestResourceSwitchesInstanceHref(switchID), "", map[interface{}]interface{}{}),
},
}
e[pbTest.GetEventID(changedRes)] = changedRes
} else if os.Getenv("ENABLE_RECEIVE_CANCELLATION_NOTIFICATION") == "" {
res := pbTest.MakeResourceChanged(t, deviceID, test.TestResourceSwitchesInstanceHref(switchID), "", nil)
res.Status = commands.Status_NOT_FOUND
res.Content.CoapContentFormat = -1
res.Content.ContentType = ""
changedRes := &pb.Event{
SubscriptionId: subID,
CorrelationId: correlationID,
Type: &pb.Event_ResourceChanged{
ResourceChanged: res,
},
}
e[pbTest.GetEventID(changedRes)] = changedRes
pbTest.GetEventID(changedRes): changedRes,
}

return e
Expand Down Expand Up @@ -281,15 +270,14 @@ func TestCreateAndDeleteResource(t *testing.T) {
subClient, subID := subscribeToAllEvents(t, ctx, c, correlationID)
const switchID = "1"

isDiscoveryResourceBatchObservable := test.IsDiscoveryResourceBatchObservable(ctx, t, deviceID)

for i := 0; i < 5; i++ {
fmt.Printf("iteration %v\n", i)
time.Sleep(time.Millisecond * 500)
createSwitchResource(t, ctx, c, deviceID, switchID)
expectedCreateEvents := createSwitchResourceExpectedEvents(t, deviceID, subID, correlationID, switchID)
validateEvents(t, subClient, expectedCreateEvents)
deleteSwitchResource(t, ctx, c, deviceID, switchID)
expectedDeleteEvents := deleteSwitchResourceExpectedEvents(t, deviceID, subID, correlationID, switchID, isDiscoveryResourceBatchObservable)
expectedDeleteEvents := deleteSwitchResourceExpectedEvents(t, deviceID, subID, correlationID, switchID)
validateEvents(t, subClient, expectedDeleteEvents)
}
}
1 change: 0 additions & 1 deletion resource-aggregate/events/resourceStateSnapshotTaken.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ func (e *ResourceStateSnapshotTaken) HandleEventResourceChanged(ctx context.Cont

func (e *ResourceStateSnapshotTaken) HandleEventResourceDeleted(ctx context.Context, deleted *ResourceDeleted) error {
if deleted.GetStatus() == commands.Status_OK || deleted.GetStatus() == commands.Status_ACCEPTED {
e.LatestResourceChange = nil
e.ResourceCreatePendings = nil
e.ResourceRetrievePendings = nil
e.ResourceDeletePendings = nil
Expand Down
18 changes: 17 additions & 1 deletion resource-aggregate/service/grpcApi.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,23 @@ func (r RequestHandler) UnpublishResourceLinks(ctx context.Context, request *com
log.Errorf("cannot publish resource links unpublished events: %v", err)
}
auditContext := commands.NewAuditContext(owner, "")
return newUnpublishResourceLinksResponse(events, aggregate.DeviceID(), auditContext), nil

resp := newUnpublishResourceLinksResponse(events, aggregate.DeviceID(), auditContext)
for _, href := range resp.GetUnpublishedHrefs() {
_, err = r.NotifyResourceChanged(ctx, &commands.NotifyResourceChangedRequest{
ResourceId: commands.NewResourceID(resp.GetDeviceId(), href),
Content: &commands.Content{
CoapContentFormat: -1,
},
Status: commands.Status_NOT_FOUND,
CommandMetadata: request.GetCommandMetadata(),
})
if err != nil {
log.Errorf("cannot reset content for unpublished resource %v%v : %v", err, resp.GetDeviceId(), href)
}
}

return resp, nil
}

func newUnpublishResourceLinksResponse(events []eventstore.Event, deviceID string, auditContext *commands.AuditContext) *commands.UnpublishResourceLinksResponse {
Expand Down

0 comments on commit e6803bc

Please sign in to comment.