Skip to content

Commit

Permalink
Grpc as rd (#43)
Browse files Browse the repository at this point in the history
* upgrade go-coap to v2

* migration coapconv

* cleanup from go-coap/v0

* migrate to gocoapV2

* migrate coap-gw to coapV2

* remove depedency on go-coap/v0

* resolve errors with go-coap/v2

* resolve all compilation errors

* use GetAccept from coap package

* fix tests for coap-gw

* don't release message in observe - it will be released by coap.Server

* fix of compilation error

* fix tests for coap-gateway

* fix race issues

* add Dockerfile.test

* update to the newest kit

* ignore certificate-authority/acme/refImpl/acmeDB directory

* integrate ResourceDirectory to grpc-gw

* cleanup device conv functions

* set old certificate package

* replace go.mod

* update go.sum

* fix compilation error

* fix test in http-gateway

* cancel operation when close was triggered

* fix build

* grpc-gw: fix deviceDirectorty type filter

* replace resource directory with grpc gateway

* move SetUp's to services

* remove Ctx from auth SetUp

* coap-gw: use SetUp of test services

* fix SetUp test

* send service token with userID to another service

* add event ResourceUpdatePending to device subscription

* use UserID from grpc metadata

* fix tests for coap-gw

* grpc-gw - adapt tests

* grpc-gw: fix SubscribeForEvents

* fix tests for grpc-gw and http-gw

* fix test of grpc-gw and http-gw

* fix build errors

* fix authorization tests

* fix grpc-clien and http-gw tests

* fix of parallel get resources/devices

* fix after CR

* fix set of serviceToken to calls from coap-gw

Co-authored-by: Jozef Kralik <jozef.kralik@kistler.com>
  • Loading branch information
jkralik and jkralik committed Jun 12, 2020
1 parent e8868be commit d14109c
Show file tree
Hide file tree
Showing 195 changed files with 7,564 additions and 8,710 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.github
.gitignore
.travis.yml
.tmp
Makefile
Dockerfile
Dockerfile.test
Expand Down
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
"LISTEN_ACME_DEVICE_ID":"adebc667-1f2b-41e3-bf5c-6d6eabc68cc6",
"LISTEN_ACME_DIRECTORY_URL":"https://localhost:10443/acme/acme/directory",
"TEST_COAP_GW_OVERWRITE_LISTEN_ACME_DIRECTORY_URL": "https://localhost:10443/acme/ocf.gw/directory",
//"GOFLAGS":"-mod=vendor",
// "GOFLAGS":"-mod=vendor",
// "GRPC_VERBOSITY":"DEBUG",
// "GRPC_GO_LOG_VERBOSITY_LEVEL":99,
// "GRPC_GO_LOG_SEVERITY_LEVEL":"info",
},
"go.testTimeout": "180s",
"go.buildFlags": [
Expand Down
91 changes: 64 additions & 27 deletions authorization/client/userDevices.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package client

import (
"context"
Expand All @@ -11,6 +11,7 @@ import (

pbAS "github.com/go-ocf/cloud/authorization/pb"
kitSync "github.com/go-ocf/kit/sync"
"github.com/patrickmn/go-cache"
"google.golang.org/grpc/status"
)

Expand All @@ -20,11 +21,12 @@ type UserDevicesManager struct {
asClient pbAS.AuthorizationServiceClient
errFunc ErrFunc

lock sync.RWMutex
users map[string]*kitSync.RefCounter
done chan struct{}
trigger chan triggerUserDevice
doneWg sync.WaitGroup
lock sync.RWMutex
users map[string]*kitSync.RefCounter
done context.CancelFunc
trigger chan triggerUserDevice
doneWg sync.WaitGroup
getUserDevicesCache *cache.Cache
}

// TriggerFunc notifies users remove/add device.
Expand All @@ -35,16 +37,25 @@ type ErrFunc func(err error)

// NewUserDevicesManager creates userID devices manager.
func NewUserDevicesManager(fn TriggerFunc, asClient pbAS.AuthorizationServiceClient, tickFrequency, expiration time.Duration, errFunc ErrFunc) *UserDevicesManager {
c := cache.New(expiration, cache.DefaultExpiration)
c.OnEvicted(func(key string, v interface{}) {
r := v.(*kitSync.RefCounter)
r.Release(context.Background())
})

ctx, cancel := context.WithCancel(context.Background())

m := &UserDevicesManager{
fn: fn,
asClient: asClient,
done: make(chan struct{}),
trigger: make(chan triggerUserDevice, 32),
users: make(map[string]*kitSync.RefCounter),
errFunc: errFunc,
fn: fn,
asClient: asClient,
done: cancel,
trigger: make(chan triggerUserDevice, 32),
users: make(map[string]*kitSync.RefCounter),
errFunc: errFunc,
getUserDevicesCache: c,
}
m.doneWg.Add(1)
go m.run(tickFrequency, expiration)
go m.run(ctx, tickFrequency, expiration)
return m
}

Expand Down Expand Up @@ -74,11 +85,11 @@ func (d *UserDevicesManager) getRef(userID string, create bool) (_ *kitSync.RefC

// Acquire acquires reference counter by 1 for userID.
func (d *UserDevicesManager) Acquire(ctx context.Context, userID string) error {
v, created := d.getRef(userID, true)
_, created := d.getRef(userID, true)
if created {
userDevices, err := getUsersDevices(ctx, d.asClient, []string{userID})
if err != nil {
v.Release(ctx)
d.Release(userID)
return err
}
d.trigger <- triggerUserDevice{
Expand All @@ -95,12 +106,38 @@ func (d *UserDevicesManager) Acquire(ctx context.Context, userID string) error {
return nil
}

// GetUserDevices returns devices which belows to user.
func (d *UserDevicesManager) GetUserDevices(ctx context.Context, userID string) ([]string, error) {
v, created := d.getRef(userID, true)
if created {
userDevices, err := getUsersDevices(ctx, d.asClient, []string{userID})
if err != nil {
d.Release(userID)
return nil, err
}
d.trigger <- triggerUserDevice{
userID: userID,
userDevices: userDevices,
create: true,
}
d.getUserDevicesCache.Add(userID, v, cache.DefaultExpiration)
return userDevices[userID], nil
}
defer d.Release(userID) // getRef increase ref counter
mapDevs := v.Data().(*userDevices).getDevices()
devs := make([]string, 0, len(mapDevs))
for d := range mapDevs {
devs = append(devs, d)
}
return devs, nil
}

func (d *UserDevicesManager) IsUserDevice(userID, deviceID string) bool {
v, _ := d.getRef(userID, false)
if v == nil {
return false
}
defer v.Release(context.Background()) // getRef increase ref counter
defer d.Release(userID) // getRef increase ref counter
return v.Data().(*userDevices).isUserDevice(deviceID)
}

Expand Down Expand Up @@ -133,7 +170,7 @@ func (d *UserDevicesManager) updateDevices(ctx context.Context, userID string, d
return
}
defer func() {
err := v.Release(ctx)
err := d.Release(userID)
if err != nil {
d.errFunc(fmt.Errorf("cannot release userID %v devices: %v", userID, err))
}
Expand All @@ -148,7 +185,7 @@ func (d *UserDevicesManager) getDevices(ctx context.Context, userID string) map[
return nil
}
defer func() {
err := v.Release(ctx)
err := d.Release(userID)
if err != nil {
d.errFunc(fmt.Errorf("cannot release userID %v devices: %v", userID, err))
}
Expand Down Expand Up @@ -196,8 +233,8 @@ func getUsersDevices(ctx context.Context, asClient pbAS.AuthorizationServiceClie
return userDevices, nil
}

func (d *UserDevicesManager) onTick(timeout time.Duration, expiration time.Duration, triggerTime time.Time) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func (d *UserDevicesManager) onTick(ctx context.Context, timeout time.Duration, expiration time.Duration, triggerTime time.Time) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
users := d.getUsers(triggerTime)
if len(users) == 0 {
Expand All @@ -216,8 +253,8 @@ func (d *UserDevicesManager) onTick(timeout time.Duration, expiration time.Durat
}
}

func (d *UserDevicesManager) onTrigger(timeout time.Duration, expiration time.Duration, t triggerUserDevice) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func (d *UserDevicesManager) onTrigger(ctx context.Context, timeout time.Duration, expiration time.Duration, t triggerUserDevice) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if t.create {
for userID, devices := range t.userDevices {
Expand All @@ -230,25 +267,25 @@ func (d *UserDevicesManager) onTrigger(timeout time.Duration, expiration time.Du
}
}

func (d *UserDevicesManager) run(tickFrequency, expiration time.Duration) {
func (d *UserDevicesManager) run(ctx context.Context, tickFrequency, expiration time.Duration) {
ticker := time.NewTicker(tickFrequency)
defer d.doneWg.Done()
defer ticker.Stop()
for {
select {
case <-d.done:
case <-ctx.Done():
return
case triggerTime := <-ticker.C:
d.onTick(tickFrequency, expiration, triggerTime)
d.onTick(ctx, tickFrequency, expiration, triggerTime)
case t := <-d.trigger:
d.onTrigger(tickFrequency, expiration, t)
d.onTrigger(ctx, tickFrequency, expiration, t)
}
}
}

// Close stops userID manager goroutine.
func (d *UserDevicesManager) Close() {
close(d.done)
d.done()
d.doneWg.Wait()
}

Expand Down
42 changes: 33 additions & 9 deletions authorization/client/userDevices_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package client

import (
"context"
Expand All @@ -7,16 +7,14 @@ import (
"testing"
"time"

"github.com/go-ocf/cloud/authorization/pb"
"github.com/go-ocf/cloud/authorization/service"
authService "github.com/go-ocf/cloud/authorization/test"
"github.com/go-ocf/kit/security/certManager"

"github.com/kelseyhightower/envconfig"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/go-ocf/cloud/authorization/pb"
"github.com/go-ocf/cloud/authorization/service"
testService "github.com/go-ocf/cloud/authorization/test/service"
)

type testTrigger struct {
Expand Down Expand Up @@ -99,7 +97,7 @@ func TestAddDeviceAfterRegister(t *testing.T) {
require.NoError(t, err)
cfg.Addr = "localhost:1234"

shutdown := testService.NewAuthServer(t, cfg)
shutdown := authService.NewAuthServer(t, cfg)
defer shutdown()

var acmeCfg certManager.Config
Expand All @@ -114,6 +112,7 @@ func TestAddDeviceAfterRegister(t *testing.T) {
c := pb.NewAuthorizationServiceClient(conn)

m := NewUserDevicesManager(trigger.Trigger, c, time.Millisecond*200, time.Millisecond*500, func(err error) { fmt.Println(err) })
defer m.Close()
err = m.Acquire(context.Background(), t.Name())
require.NoError(t, err)

Expand All @@ -129,6 +128,12 @@ func TestAddDeviceAfterRegister(t *testing.T) {
},
}, trigger.Clone().allDevices)

for i := 0; i < 5; i++ {
devs, err := m.GetUserDevices(context.Background(), t.Name())
require.NoError(t, err)
require.NotEmpty(t, devs)
}

_, err = c.RemoveDevice(context.Background(), &pb.RemoveDeviceRequest{
UserId: t.Name(),
DeviceId: "deviceId_" + t.Name(),
Expand All @@ -139,6 +144,23 @@ func TestAddDeviceAfterRegister(t *testing.T) {

err = m.Release(t.Name())
require.NoError(t, err)

devs, err := m.GetUserDevices(context.Background(), t.Name())
require.NoError(t, err)
require.Empty(t, devs)

_, err = c.AddDevice(context.Background(), &pb.AddDeviceRequest{
UserId: t.Name(),
DeviceId: "deviceId_" + t.Name(),
})
time.Sleep(time.Second * 2)

devs, err = m.GetUserDevices(context.Background(), t.Name())
require.NoError(t, err)
require.NotEmpty(t, devs)

err = m.Release(t.Name())
require.NoError(t, err)
}

func TestUserDevicesManager_Acquire(t *testing.T) {
Expand Down Expand Up @@ -193,7 +215,7 @@ func TestUserDevicesManager_Acquire(t *testing.T) {
require.NoError(t, err)
cfg.Addr = "localhost:1234"

shutdown := testService.NewAuthServer(t, cfg)
shutdown := authService.NewAuthServer(t, cfg)
defer shutdown()

var acmeCfg certManager.Config
Expand All @@ -215,6 +237,7 @@ func TestUserDevicesManager_Acquire(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewUserDevicesManager(tt.fields.trigger.Trigger, c, time.Millisecond*200, time.Second, func(err error) { fmt.Println(err) })
defer m.Close()
err := m.Acquire(context.Background(), tt.args.userID)
if tt.wantErr {
require.Error(t, err)
Expand Down Expand Up @@ -283,7 +306,7 @@ func TestUserDevicesManager_Release(t *testing.T) {
require.NoError(t, err)
cfg.Addr = "localhost:1234"

shutdown := testService.NewAuthServer(t, cfg)
shutdown := authService.NewAuthServer(t, cfg)
defer shutdown()

var acmeCfg certManager.Config
Expand All @@ -305,6 +328,7 @@ func TestUserDevicesManager_Release(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewUserDevicesManager(tt.fields.trigger.Trigger, c, time.Millisecond*200, time.Millisecond*500, func(err error) { fmt.Println(err) })
defer m.Close()
err := m.Acquire(context.Background(), tt.args.userID)
if tt.wantErr {
require.Error(t, err)
Expand Down
Loading

0 comments on commit d14109c

Please sign in to comment.