Skip to content

Commit

Permalink
more fs wip
Browse files Browse the repository at this point in the history
  • Loading branch information
johankristianss committed Aug 9, 2023
1 parent 3a2ab09 commit aa6b90f
Show file tree
Hide file tree
Showing 351 changed files with 6,369 additions and 130,699 deletions.
13 changes: 6 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ go 1.19

require (
github.com/btcsuite/btcd v0.22.0-beta
github.com/docker/distribution v2.8.2+incompatible
github.com/fergusstrange/embedded-postgres v1.23.0
github.com/gin-contrib/cors v1.3.1
github.com/gin-gonic/gin v1.7.7
github.com/go-playground/assert/v2 v2.0.1
github.com/go-resty/resty/v2 v2.7.0
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213
github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23
github.com/lib/pq v1.10.9
github.com/minio/minio-go/v7 v7.0.61
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/schollz/progressbar/v3 v3.13.1
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.0
Expand All @@ -27,13 +28,11 @@ require (
)

require (
github.com/aws/aws-sdk-go v1.44.318 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -43,22 +42,21 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down Expand Up @@ -94,6 +92,7 @@ require (
go.uber.org/zap v1.17.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
Expand Down
51 changes: 15 additions & 36 deletions go.sum

Large diffs are not rendered by default.

133 changes: 111 additions & 22 deletions pkg/fs/fs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,48 @@ package fs
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/colonyos/colonies/pkg/client"
"github.com/colonyos/colonies/pkg/core"
)

type FSClient struct {
coloniesClient *client.ColoniesClient
colonyID string
executorPrvKey string
s3Client *S3Client
}

type FileInfo struct {
Name string
Checksum string
}

func CreateFSClient(coloniesClient *client.ColoniesClient, colonyID string, executorPrvKey string) *FSClient {
type SyncPlan struct {
Dir string
LocalMissing []*FileInfo
RemoteMissing []*FileInfo
Conflicts []*FileInfo
Label string
}

func CreateFSClient(coloniesClient *client.ColoniesClient, colonyID string, executorPrvKey string) (*FSClient, error) {
fsClient := &FSClient{}
fsClient.coloniesClient = coloniesClient
fsClient.colonyID = colonyID
fsClient.executorPrvKey = executorPrvKey

return fsClient
s3Client, err := CreateS3Client()
fsClient.s3Client = s3Client
if err != nil {
return nil, err
}

return fsClient, nil
}

func checksum(filePath string) (string, error) {
Expand All @@ -47,22 +63,96 @@ func checksum(filePath string) (string, error) {
return hex.EncodeToString(hasher.Sum(nil)), nil
}

func (fsClient *FSClient) CalcSyncPlan(dir string, prefix string) ([]*FileInfo, []*FileInfo, []*FileInfo, []*FileInfo, error) {
func (fsClient *FSClient) uploadFile(syncPlan *SyncPlan, fileInfo *FileInfo) error {
fileStat, err := os.Stat(syncPlan.Dir + "/" + fileInfo.Name)
if err != nil {
return err
}
fmt.Println(fileStat.Size())
fmt.Println(fileInfo.Checksum)
s3Object := core.S3Object{
Server: fsClient.s3Client.Endpoint,
Port: -1,
TLS: fsClient.s3Client.TLS,
AccessKey: fsClient.s3Client.AccessKey,
SecretKey: fsClient.s3Client.SecretKey,
Region: fsClient.s3Client.Region,
EncryptionKey: "",
EncryptionAlg: "",
Object: core.GenerateRandomID(),
Bucket: fsClient.s3Client.BucketName,
}
ref := core.Reference{Protocol: "s3", S3Object: s3Object}
coloniesFile := &core.File{
ColonyID: fsClient.colonyID,
Label: syncPlan.Label,
Name: fileInfo.Name,
Size: fileStat.Size(),
Checksum: fileInfo.Checksum,
ChecksumAlg: "SHA256",
Reference: ref}

err = fsClient.s3Client.Upload(syncPlan.Dir, coloniesFile.Name, coloniesFile.Size)
if err != nil {
return err
}
_, err = fsClient.coloniesClient.AddFile(coloniesFile, fsClient.executorPrvKey)
if err != nil {
return err
}

return nil
}

func (fsClient *FSClient) ApplySyncPlan(colonyID string, syncPlan *SyncPlan, keepLocal bool) error {

// 1. Upload all remote missing files
for _, fileInfo := range syncPlan.RemoteMissing {
fsClient.uploadFile(syncPlan, fileInfo)
}

// 2. Download all local missing files
for _, fileInfo := range syncPlan.LocalMissing {
err := fsClient.s3Client.Download(fileInfo.Name, syncPlan.Dir)
if err != nil {
return err
}
}

// 3. Handle conflicts
// If keepLocalFiles then upload conflicting files to server else download conflicting files to local filesystem
if keepLocal {
for _, fileInfo := range syncPlan.Conflicts {
fsClient.uploadFile(syncPlan, fileInfo)
}
} else {
for _, fileInfo := range syncPlan.LocalMissing {
err := fsClient.s3Client.Download(fileInfo.Name, syncPlan.Dir)
if err != nil {
return err
}
}
}

return nil
}

func (fsClient *FSClient) CalcSyncPlan(dir string, label string) (*SyncPlan, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, nil, nil, nil, err
return nil, err
}

remoteFilenames, err := fsClient.coloniesClient.GetFilenames(fsClient.colonyID, prefix, fsClient.executorPrvKey)
remoteFilenames, err := fsClient.coloniesClient.GetFilenames(fsClient.colonyID, label, fsClient.executorPrvKey)
if err != nil {
return nil, nil, nil, nil, err
return nil, err
}

var remoteFileMap = make(map[string]string)
for _, remoteFilename := range remoteFilenames {
remoteColoniesFile, err := fsClient.coloniesClient.GetFileByName(fsClient.colonyID, prefix, remoteFilename, fsClient.executorPrvKey)
remoteColoniesFile, err := fsClient.coloniesClient.GetFileByName(fsClient.colonyID, label, remoteFilename, fsClient.executorPrvKey)
if err != nil {
return nil, nil, nil, nil, err
return nil, err
}
for _, revision := range remoteColoniesFile {
remoteFileMap[revision.Name] = revision.Checksum
Expand All @@ -74,43 +164,42 @@ func (fsClient *FSClient) CalcSyncPlan(dir string, prefix string) ([]*FileInfo,
if !file.IsDir() {
checksum, err := checksum(dir + "/" + file.Name())
if err != nil {
return nil, nil, nil, nil, err
return nil, err
}
localFileMap[file.Name()] = checksum
}
}

// Find out which files are missing at the server
var remoteMissingFiles []*FileInfo
var remoteOverWrite []*FileInfo
var remoteMissing []*FileInfo
for filename, checksum := range localFileMap {
_, ok := remoteFileMap[filename]
if !ok {
// File missing on server
remoteMissingFiles = append(remoteMissingFiles, &FileInfo{Name: filename, Checksum: checksum})
} else {
// File is on server, but does not match local file
if remoteFileMap[filename] != checksum {
remoteOverWrite = append(remoteOverWrite, &FileInfo{Name: filename, Checksum: checksum})
}
remoteMissing = append(remoteMissing, &FileInfo{Name: filename, Checksum: checksum})
}
}

// Find out which files are missing locally
var localMissingFiles []*FileInfo
var localOverWrite []*FileInfo
var localMissing []*FileInfo
var conflicts []*FileInfo
for filename, checksum := range remoteFileMap {
_, ok := localFileMap[filename]
if !ok {
// File missing locally
localMissingFiles = append(localMissingFiles, &FileInfo{Name: filename, Checksum: checksum})
localMissing = append(localMissing, &FileInfo{Name: filename, Checksum: checksum})
} else {
// File exists locally, but does not match file on server
if localFileMap[filename] != checksum {
localOverWrite = append(localOverWrite, &FileInfo{Name: filename, Checksum: checksum})
conflicts = append(conflicts, &FileInfo{Name: filename, Checksum: checksum})
}
}
}

return localMissingFiles, remoteMissingFiles, localOverWrite, remoteOverWrite, err
return &SyncPlan{
LocalMissing: localMissing,
RemoteMissing: remoteMissing,
Conflicts: conflicts,
Dir: dir,
Label: label}, nil
}
Loading

0 comments on commit aa6b90f

Please sign in to comment.