Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CSI-288): validate API user role prior to performing ops #365

Merged
merged 2 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/NFS.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ This allows you to use WekaFS as a storage backend for your Kubernetes cluster w
- **Network Configuration**: NFS interface group IP addresses must be accessible from the Kubernetes cluster nodes
- **Security**: NFS transport is less secure than the native WekaFS driver, and may require additional security considerations
- **QoS**: QoS is not supported for NFS transport
- **Organizations and Multitenancy**: NFS transport can be used only for filesystems in `Root` organization.
If you need to use a filesystem in a different organization, you must use the native WekaFS driver.

### Host Network Mode
Weka CSI Plugin will automatically install in `hostNetwork` mode when using NFS transport.
Expand Down
35 changes: 29 additions & 6 deletions pkg/wekafs/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@ import (
"time"
)

type ApiUserRole string

const (
ApiHttpTimeOutSeconds = 60
ApiRetryIntervalSeconds = 1
ApiRetryMaxCount = 5
RetryBackoffExponentialFactor = 1
RootOrganizationName = "Root"
TracerName = "weka-csi"
ApiHttpTimeOutSeconds = 60
ApiRetryIntervalSeconds = 1
ApiRetryMaxCount = 5
RetryBackoffExponentialFactor = 1
RootOrganizationName = "Root"
TracerName = "weka-csi"
ApiUserRoleClusterAdmin ApiUserRole = "ClusterAdmin"
ApiUserRoleOrgAdmin ApiUserRole = "OrgAdmin"
ApiUserRoleReadOnly ApiUserRole = "ReadOnly"
ApiUserRoleCSI ApiUserRole = "CSI"
ApiUserRoleS3 ApiUserRole = "S3"
ApiUserRoleRegular ApiUserRole = "Regular"
)

// ApiClient is a structure that defines Weka API client
Expand Down Expand Up @@ -65,6 +73,8 @@ type ApiClient struct {
clientHash uint32
hostname string
NfsInterfaceGroups map[string]*InterfaceGroup
ApiUserRole ApiUserRole
ApiOrgId int
}

type ApiEndPoint struct {
Expand Down Expand Up @@ -644,6 +654,12 @@ func (a *ApiClient) Login(ctx context.Context) error {
}
logger.Debug().Msg("Successfully connected to cluster API")

err = a.ensureSufficientPermissions(ctx)
if err != nil {
logger.Error().Err(err).Msg("Failed to ensure sufficient permissions for supplied credentials. Cannot continue")
return err
}

if a.Credentials.AutoUpdateEndpoints {
if err := a.UpdateApiEndpoints(ctx); err != nil {
logger.Error().Err(err).Msg("Failed to update actual API endpoints")
Expand Down Expand Up @@ -781,6 +797,13 @@ func (c *Credentials) String() string {
c.HttpScheme, c.Username, c.Organization, c.Endpoints)
}

func (a *ApiClient) HasCSIPermissions() bool {
if a.ApiUserRole != "" {
return a.ApiUserRole == ApiUserRoleCSI || a.ApiUserRole == ApiUserRoleClusterAdmin || a.ApiUserRole == ApiUserRoleOrgAdmin && a.ApiOrgId != 0
}
return false
}

func maskPayload(payload string) string {
masker := mask.NewMasker()
masker.RegisterMaskStringFunc(mask.MaskTypeFilled, masker.MaskFilledString)
Expand Down
42 changes: 40 additions & 2 deletions pkg/wekafs/apiclient/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ const ApiPathRefresh = "login/refresh"

const ApiPathClusterInfo = "cluster"

const ApiContainersInfo = "containers"
const ApiPathContainersInfo = "containers"

const ApiPathWhoami = "users/whoami"

// updateTokensExpiryInterval fetches the refresh token expiry from API
func (a *ApiClient) updateTokensExpiryInterval(ctx context.Context) error {
Expand Down Expand Up @@ -110,6 +112,15 @@ type ClusterInfoResponse struct {
Capacity Capacity `json:"capacity,omitempty"`
}

type WhoamiResponse struct {
OrgId int `json:"org_id,omitempty"`
Username string `json:"username,omitempty"`
Source string `json:"source,omitempty"`
Uid uuid.UUID `json:"uid,omitempty"`
Role ApiUserRole `json:"role,omitempty"`
OrgName string `json:"org_name,omitempty"`
}

type Container struct {
Id string `json:"id,omitempty"`
SwReleaseString string `json:"sw_release_string,omitempty"`
Expand Down Expand Up @@ -176,7 +187,7 @@ type ContainersResponse []Container

func (a *ApiClient) getContainers(ctx context.Context) (*ContainersResponse, error) {
responseData := &ContainersResponse{}
err := a.Get(ctx, ApiContainersInfo, nil, responseData)
err := a.Get(ctx, ApiPathContainersInfo, nil, responseData)
return responseData, err
}

Expand Down Expand Up @@ -231,3 +242,30 @@ func filterFrontendContainers(ctx context.Context, hostname string, containerLis
}
return ret
}

func (a *ApiClient) fetchUserRoleAndOrgId(ctx context.Context) {
logger := log.Ctx(ctx)
ret := &WhoamiResponse{}
err := a.Request(ctx, "GET", ApiPathWhoami, nil, nil, ret)
if err != nil {
logger.Error().Err(err).Msg("Failed to fetch user role. Assuming old cluster version")
return
}
if ret != nil {
a.ApiUserRole = ret.Role
a.ApiOrgId = ret.OrgId
}
}

func (a *ApiClient) ensureSufficientPermissions(ctx context.Context) error {
logger := log.Ctx(ctx)
a.fetchUserRoleAndOrgId(ctx)
if a.ApiUserRole == "" {
logger.Error().Msg("Could not determine user role, assuming old version of WEKA cluster")
}
if !a.HasCSIPermissions() {
logger.Error().Str("username", a.Credentials.Username).Msg("User does not have CSI permissions and cannot be used for WEKA CSI Plugin")
return errors.New(fmt.Sprintf("user %s does not have sufficient permissions for performing CSI operations", a.Credentials.Username))
}
return nil
}
6 changes: 6 additions & 0 deletions pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
logger.Trace().Msg("No API client for mount, cannot proceed")
return errors.New("no API client for mount, cannot do NFS mount")
}
// to validate that the organization is root, otherwise cannot mount NFS volumes
if apiClient.ApiOrgId != 0 {
err := errors.New("cannot mount NFS volumes with non-Root organization")
logger.Error().Err(err).Int("organization_id", apiClient.ApiOrgId).Msg("Cannot mount NFS volumes with non-Root organization")
return err
}

if !m.isInDevMode() {

Expand Down