Skip to content

Commit

Permalink
Add functionality to commit to storage for OCM shares (#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored May 20, 2020
1 parent fbf3d95 commit 9ca331b
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 163 deletions.
6 changes: 6 additions & 0 deletions cmd/reva/ocm-share-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func ocmShareCreateCommand() *command {
os.Exit(1)
}

if *idp == "" {
fmt.Println("idp cannot be empty: use -idp flag")
fmt.Println(cmd.Usage())
os.Exit(1)
}

fn := cmd.Args()[0]

ctx := getAuthContext()
Expand Down
106 changes: 104 additions & 2 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ func (s *svc) CreateOCMShare(ctx context.Context, req *ocm.CreateOCMShareRequest
return nil, errors.Wrap(err, "gateway: error calling CreateShare")
}

// if we don't need to commit we return earlier
if !s.c.CommitShareToStorageGrant && !s.c.CommitShareToStorageRef {
return res, nil
}

// TODO(labkode): if both commits are enabled they could be done concurrently.
if s.c.CommitShareToStorageGrant {
addGrantStatus, err := s.addGrant(ctx, req.ResourceId, req.Grant.Grantee, req.Grant.Permissions.Permissions)
if addGrantStatus.Code != rpc.Code_CODE_OK {
return &ocm.CreateOCMShareResponse{
Status: addGrantStatus,
}, err
}
}

return res, nil
}

Expand All @@ -58,11 +73,45 @@ func (s *svc) RemoveOCMShare(ctx context.Context, req *ocm.RemoveOCMShareRequest
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling RemoveShare")
}

// if we don't need to commit we return earlier
if !s.c.CommitShareToStorageGrant && !s.c.CommitShareToStorageRef {
return res, nil
}

// if we need to commit the share, we need the resource it points to.
getShareReq := &ocm.GetOCMShareRequest{
Ref: req.Ref,
}
getShareRes, err := c.GetOCMShare(ctx, getShareReq)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling GetShare")
}

if getShareRes.Status.Code != rpc.Code_CODE_OK {
res := &ocm.RemoveOCMShareResponse{
Status: status.NewInternal(ctx, status.NewErrorFromCode(getShareRes.Status.Code, "gateway"),
"error getting share when committing to the storage"),
}
return res, nil
}
share := getShareRes.Share

// TODO(labkode): if both commits are enabled they could be done concurrently.
if s.c.CommitShareToStorageGrant {
removeGrantStatus, err := s.removeGrant(ctx, share.ResourceId, share.Grantee, share.Permissions.Permissions)
if removeGrantStatus.Code != rpc.Code_CODE_OK {
return &ocm.RemoveOCMShareResponse{
Status: removeGrantStatus,
}, err
}
}

return res, nil
}

// TODO(labkode): we need to validate share state vs storage grant and storage ref
// If there are any inconsitencies, the share needs to be flag as invalid and a background process
// If there are any inconsistencies, the share needs to be flag as invalid and a background process
// or active fix needs to be performed.
func (s *svc) GetOCMShare(ctx context.Context, req *ocm.GetOCMShareRequest) (*ocm.GetOCMShareResponse, error) {
return s.getOCMShare(ctx, req)
Expand Down Expand Up @@ -156,7 +205,60 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
},
}, nil
}
return res, nil

// if we don't need to create/delete references then we return early.
if !s.c.CommitShareToStorageGrant && !s.c.CommitShareToStorageRef {
return res, nil
}

// we don't commit to storage invalid update fields or empty display names.
if req.Field.GetState() == ocm.ShareState_SHARE_STATE_INVALID && req.Field.GetDisplayName() == "" {
log.Error().Msg("the update field is invalid, aborting reference manipulation")
return res, nil

}

// TODO(labkode): if update field is displayName we need to do a rename on the storage to align
// share display name and storage filename.
if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID {
if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED {
getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref}
getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq)
if err != nil {
log.Err(err).Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

if getShareRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

share := getShareRes.Share
if share == nil {
panic("gateway: error updating a received share: the share is nil")
}

createRefStatus, err := s.createReference(ctx, share.Share.ResourceId)
return &ocm.UpdateReceivedOCMShareResponse{
Status: createRefStatus,
}, err
}
}

// TODO(labkode): implementing updating display name
err = errors.New("gateway: update of display name is not yet implemented")
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewUnimplemented(ctx, err, "error updating received share"),
}, nil
}

func (s *svc) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedOCMShareRequest) (*ocm.GetReceivedOCMShareResponse, error) {
Expand Down
Loading

0 comments on commit 9ca331b

Please sign in to comment.