diff --git a/go.mod b/go.mod index 406c632f..18f44402 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-version v1.7.0 github.com/kubernetes-csi/csi-lib-utils v0.19.0 + github.com/pkg/errors v0.9.1 github.com/pkg/xattr v0.4.10 github.com/prometheus/client_golang v1.20.2 github.com/rs/zerolog v1.33.0 diff --git a/go.sum b/go.sum index ece93c7d..c36b7427 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,7 @@ github.com/opencontainers/runc v1.1.13 h1:98S2srgG9vw0zWcDpFMn5TRrh8kLxa/5OFUstu github.com/opencontainers/runc v1.1.13/go.mod h1:R016aXacfp/gwQBYw2FDGa9m+n6atbLWrYY8hNMT/sA= github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk= github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/xattr v0.4.10 h1:Qe0mtiNFHQZ296vRgUjRCoPHPqH7VdTOrZx3g0T+pGA= github.com/pkg/xattr v0.4.10/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= diff --git a/pkg/wekafs/nodeserver.go b/pkg/wekafs/nodeserver.go index 4190666f..c8ec9706 100644 --- a/pkg/wekafs/nodeserver.go +++ b/pkg/wekafs/nodeserver.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" @@ -32,6 +33,7 @@ import ( "path/filepath" "strings" "sync" + "syscall" "time" ) @@ -84,9 +86,94 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, request *csi.NodeExp panic("implement me") } -//goland:noinspection GoUnusedParameter -func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - panic("implement me") +func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + volumeID := req.GetVolumeId() + volumePath := req.GetVolumePath() + + // Validate request fields + if volumeID == "" || volumePath == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID and path must be provided") + } + + // Check if the volume path exists + if ns.getConfig().isInDevMode() { + // In dev mode, we don't have the actual Weka mount, so we just check if the path exists + if _, err := os.Stat(volumePath); err != nil { + return nil, status.Error(codes.NotFound, "Volume path not found") + } + + } else { + // In production mode, we check if the path is indeed a Weka mount (Either NFS or WekaFS) + if !PathIsWekaMount(ctx, volumePath) { + return nil, status.Error(codes.NotFound, "Volume path not found") + } + } + + // Validate Weka volume ID + if err := validateVolumeId(volumeID); err != nil { + return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "invalid volume ID").Error()) + } + + stats, err := getVolumeStats(volumePath) + if err != nil || stats == nil { + return &csi.NodeGetVolumeStatsResponse{ + Usage: nil, + VolumeCondition: &csi.VolumeCondition{ + Abnormal: true, + Message: "Failed to fetch volume stats for volume", + }, + }, status.Errorf(codes.Internal, "Failed to get stats for volume %s: %v", volumeID, err) + } + // Prepare response + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Total: stats.TotalBytes, + Used: stats.UsedBytes, + Available: stats.AvailableBytes, + }, + { + Unit: csi.VolumeUsage_INODES, + Total: stats.TotalInodes, + Used: stats.UsedInodes, + Available: stats.AvailableInodes, + }, + }, + VolumeCondition: &csi.VolumeCondition{ + Abnormal: false, + Message: "volume is healthy", + }, + }, nil +} + +type VolumeStats struct { + TotalBytes int64 + UsedBytes int64 + AvailableBytes int64 + TotalInodes int64 + UsedInodes int64 + AvailableInodes int64 +} + +// getVolumeStats fetches filesystem statistics for the mounted volume path. +func getVolumeStats(volumePath string) (volumeStats *VolumeStats, err error) { + var stat syscall.Statfs_t + + // Use Statfs to get filesystem statistics for the volume path + err = syscall.Statfs(volumePath, &stat) + if err != nil { + return nil, err + } + + // Calculate capacity, available, and used space in bytes + capacityBytes := int64(stat.Blocks) * int64(stat.Bsize) + availableBytes := int64(stat.Bavail) * int64(stat.Bsize) + usedBytes := capacityBytes - availableBytes + inodes := int64(stat.Files) + inodesFree := int64(stat.Ffree) + inodesUsed := inodes - inodesFree + return &VolumeStats{capacityBytes, usedBytes, availableBytes, inodes, inodesUsed, inodesFree}, nil } func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounter AnyMounter, config *DriverConfig) *NodeServer { @@ -94,6 +181,9 @@ func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounte return &NodeServer{ caps: getNodeServiceCapabilities( []csi.NodeServiceCapability_RPC_Type{ + csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + csi.NodeServiceCapability_RPC_VOLUME_CONDITION, //csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }, ),