From 54397878eae5199900acee18cca57bb0eb4f5605 Mon Sep 17 00:00:00 2001 From: AstaFrode Date: Thu, 22 Feb 2024 14:14:30 +0800 Subject: [PATCH] Fix 079 reg (#243) * update for 0.7.10 * update go mod --- README.md | 2 +- cmd/console/run.go | 62 +++++++- configs/system.go | 2 +- examples/calctag/calctag.go | 40 +++-- go.mod | 2 +- go.sum | 4 +- node/calc_tag.go | 283 +++++++++++++++++++++++------------- node/chall_service.go | 7 +- node/common.go | 85 +++++++++-- node/node.go | 43 ++---- node/pois.go | 5 +- node/restore.go | 117 +++++---------- node/runningState.go | 1 - pkg/confile/confile.go | 5 +- 14 files changed, 396 insertions(+), 262 deletions(-) diff --git a/README.md b/README.md index b5d9c79..49f593d 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ If you are using the test network, Please join the [CESS discord](https://discor ### Method one Download the latest release of the binary application directly at: ``` -wget https://github.com/CESSProject/cess-bucket/releases/download/v0.7.8/bucket0.7.8.linux-amd64.tar.gz +wget https://github.com/CESSProject/cess-bucket/releases/download/v0.7.10/bucket0.7.10.linux-amd64.tar.gz ``` ### Method two Compile the binary program from the storage node source code and follow the process as follows: diff --git a/cmd/console/run.go b/cmd/console/run.go index 9651527..d448207 100644 --- a/cmd/console/run.go +++ b/cmd/console/run.go @@ -315,7 +315,7 @@ func runCmd(cmd *cobra.Command, args []string) { if firstReg { n.SetInitStage(node.Stage_Register, "[ok] Registering...") stakingAcc := n.GetStakingAcc() - if stakingAcc != "" { + if stakingAcc != "" && stakingAcc != n.GetSignatureAcc() { out.Ok(fmt.Sprintf("Specify staking account: %s", stakingAcc)) txhash, err := n.RegisterSminerAssignStaking(n.GetEarningsAcc(), n.GetPeerPublickey(), stakingAcc, spaceTiB) if err != nil { @@ -661,13 +661,59 @@ func runCmd(cmd *cobra.Command, args []string) { if n.GetPodr2Key().Spk == nil || n.GetPodr2Key().Spk.N == nil { buf, err := os.ReadFile(n.DataDir.Podr2PubkeyFile) if err != nil { - out.Err(fmt.Sprintf("[ReadFile Podr2PubkeyFile] %v", err)) - os.Exit(1) - } - err = n.SetPublickey(buf) - if err != nil { - out.Err("invalid podr2 public key in the file") - os.Exit(1) + var podr2PubkeyResponse *pb.Podr2PubkeyResponse + for i := 0; i < len(teeEndPointList); i++ { + delay = 30 + suc = false + out.Tip(fmt.Sprintf("Will request master public key to %v", teeEndPointList[i])) + for tryCount := uint8(0); tryCount <= 3; tryCount++ { + if !strings.Contains(teeEndPointList[i], "443") { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } else { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} + } + podr2PubkeyResponse, err = n.GetPodr2Pubkey( + teeEndPointList[i], + &pb.Request{StorageMinerAccountId: n.GetSignatureAccPulickey()}, + time.Duration(time.Second*delay), + dialOptions, + nil, + ) + if err != nil { + if strings.Contains(err.Error(), configs.Err_ctx_exceeded) { + delay += 30 + continue + } + if strings.Contains(err.Error(), configs.Err_tee_Busy) { + delay += 10 + continue + } + out.Err(fmt.Sprintf("[GetMasterPubkey] %v", err)) + } else { + suc = true + } + break + } + if suc { + break + } + } + if suc && podr2PubkeyResponse != nil { + err = n.SetPublickey(podr2PubkeyResponse.Pubkey) + if err != nil { + out.Err("invalid podr2 public key from tee") + os.Exit(1) + } + } else { + out.Err("Unable to get podr2 public key from all tees, program exits.") + os.Exit(1) + } + } else { + err = n.SetPublickey(buf) + if err != nil { + out.Err("invalid podr2 public key in the file") + os.Exit(1) + } } } diff --git a/configs/system.go b/configs/system.go index c9ced37..c7bda94 100644 --- a/configs/system.go +++ b/configs/system.go @@ -22,7 +22,7 @@ const ( // Name is the name of the program Name = "bucket" // version - Version = "v0.7.9" + Version = "v0.7.10" // Description is the description of the program Description = "Storage node implementation in CESS networks" // NameSpace is the cached namespace diff --git a/examples/calctag/calctag.go b/examples/calctag/calctag.go index 66a2941..6c75a23 100644 --- a/examples/calctag/calctag.go +++ b/examples/calctag/calctag.go @@ -6,16 +6,11 @@ import ( "fmt" "log" "os" - "strings" - "time" "github.com/CESSProject/cess-bucket/configs" "github.com/CESSProject/cess-bucket/node" sutils "github.com/CESSProject/cess-go-sdk/utils" p2pgo "github.com/CESSProject/p2p-go" - "github.com/CESSProject/p2p-go/pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) const helpInfo string = `help information: @@ -88,23 +83,24 @@ func main() { log.Println("[CalcSHA256] ", err) os.Exit(1) } - var requestGenTag = &pb.RequestGenTag{ - FragmentData: buf, - FragmentName: "", - CustomData: hash, - FileName: "", - } - var dialOptions []grpc.DialOption - if !strings.Contains(tee, "443") { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - } else { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} - } - _, err = n.RequestGenTag(tee, requestGenTag, time.Duration(time.Minute*10), dialOptions, nil) - if err != nil { - log.Println("[RequestGenTag] ", err) - os.Exit(1) - } + _ = hash + // var requestGenTag = &pb.RequestGenTag{ + // FragmentData: buf, + // FragmentName: "", + // CustomData: hash, + // FileName: "", + // } + // var dialOptions []grpc.DialOption + // if !strings.Contains(tee, "443") { + // dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + // } else { + // dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} + // } + // _, err = n.RequestGenTag(tee, requestGenTag, time.Duration(time.Minute*10), dialOptions, nil) + // if err != nil { + // log.Println("[RequestGenTag] ", err) + // os.Exit(1) + // } log.Println("[RequestGenTag] suc") } diff --git a/go.mod b/go.mod index 3e07f00..073aa3e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/AstaFrode/go-libp2p v0.26.4-0.20231113143058-912296254d44 github.com/CESSProject/cess-go-sdk v0.5.0 github.com/CESSProject/cess_pois v0.4.22 - github.com/CESSProject/p2p-go v0.3.0 + github.com/CESSProject/p2p-go v0.3.2 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1 github.com/gin-contrib/cors v1.5.0 diff --git a/go.sum b/go.sum index 441ba92..95804f6 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,8 @@ github.com/CESSProject/cess_pois v0.4.22 h1:RxQRg8A3z79mt19Jb5yd39lIjBSWKBih1KLI github.com/CESSProject/cess_pois v0.4.22/go.mod h1:rztEZjjG+MbKzVgh5WtQcZc/7ZDkBvDNABc7Em8BKPc= github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde h1:5MDRjjtg6PEhqyVjupwaapN96cOZiddOGAYwKQeaTu0= github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde/go.mod h1:RUXBd3ROP98MYepEEa0Y0l/T0vQlIKqFJxI/ocdnRLM= -github.com/CESSProject/p2p-go v0.3.0 h1:PdwbLBri3qYwIHxaG4BHHS3ETH1BlVRCnBG87jr5cQg= -github.com/CESSProject/p2p-go v0.3.0/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI= +github.com/CESSProject/p2p-go v0.3.2 h1:l0x/TneDZfirKqFZev+9zRFFQ29RHgbc4GNX2VpLWwM= +github.com/CESSProject/p2p-go v0.3.2/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI= github.com/ChainSafe/go-schnorrkel v1.0.0 h1:3aDA67lAykLaG1y3AOjs88dMxC88PgUuHRrLeDnvGIM= github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z7vcchBSbMBEhCw4= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= diff --git a/node/calc_tag.go b/node/calc_tag.go index 9036bc4..ff789e3 100644 --- a/node/calc_tag.go +++ b/node/calc_tag.go @@ -8,6 +8,7 @@ package node import ( + "context" "encoding/json" "errors" "fmt" @@ -65,11 +66,8 @@ func (n *Node) calcTag(ch chan<- bool) { n.Stag("info", fmt.Sprintf("[roothashs] %v", roothashs)) - teeEndPoints := n.GetPriorityTeeList() - teeEndPoints = append(teeEndPoints, n.GetAllMarkerTeeEndpoint()...) - for _, fileDir := range roothashs { - err = n.calcFileTag(fileDir, teeEndPoints) + err = n.calcFileTag(fileDir) if err != nil { n.Stag("err", fmt.Sprintf("[%s] [calcFileTag] %v", filepath.Base(fileDir), roothashs)) } @@ -77,16 +75,13 @@ func (n *Node) calcTag(ch chan<- bool) { } } -func (n *Node) calcFileTag(file string, teeEndPoints []string) error { +func (n *Node) calcFileTag(file string) error { var ok bool var isReportTag bool var err error - var fragmentHash string var tagPath string var fragments, tags []string - var dialOptions []grpc.DialOption - var teeSign pattern.TeeSig - var genTag *pb.ResponseGenTag + fid := filepath.Base(file) n.Stag("info", fmt.Sprintf("[%s] Start calc file tag", fid)) @@ -120,19 +115,18 @@ func (n *Node) calcFileTag(file string, teeEndPoints []string) error { n.Stag("err", fmt.Sprintf("[getFragmentTags(%s)] %v", fid, err)) return nil } - //n.Stag("info", fmt.Sprintf("[%s] The file have tag: %v", fid, tags)) latestSig, digest, maxIndex, err := n.calcRequestDigest(filepath.Base(fragments[i]), tags) if err != nil { n.Stag("err", fmt.Sprintf("[calcRequestDigest(%s)] %v", fid, err)) return nil } - buf, err := os.ReadFile(fragments[i]) + _, err = os.Stat(fragments[i]) if err != nil { - n.Stag("err", fmt.Sprintf("[%s] [ReadFile(%s)] %v", fid, fragments[i], err)) + n.Stag("err", fmt.Sprintf("[%s] [os.Stat(%s)] %v", fid, fragments[i], err)) return nil } - fragmentHash = filepath.Base(fragments[i]) + tagPath = (fragments[i] + ".tag") n.Stag("info", fmt.Sprintf("[%s] Check this file tag: %v", fid, tagPath)) fstat, err := os.Stat(tagPath) @@ -149,95 +143,13 @@ func (n *Node) calcFileTag(file string, teeEndPoints []string) error { n.Stag("info", fmt.Sprintf("[%s] The file's tag stat err: %v", fid, err)) } - var requestGenTag = &pb.RequestGenTag{ - FragmentData: buf, - FragmentName: fragmentHash, - CustomData: "", - FileName: fid, - MinerId: n.GetSignatureAccPulickey(), - TeeDigestList: digest, - LastTeeSignature: latestSig, + isreport, err := n.calcTheFragmentTag(fid, fragments[i], maxIndex, latestSig, digest) + if err != nil { + n.Stag("err", fmt.Sprintf("[%s] [calcFragmentTag] %v", fid, err)) + return nil } - for j := 0; j < len(teeEndPoints); j++ { - teePubkey, err := n.GetTeeWorkAccount(teeEndPoints[j]) - if err != nil { - n.Stag("err", fmt.Sprintf("[GetTeeWorkAccount(%s)] %v", teeEndPoints[j], err)) - continue - } - n.Stag("info", fmt.Sprintf("[%s] Will calc file tag: %v", fid, fragmentHash)) - n.Stag("info", fmt.Sprintf("[%s] Will use tee: %v", fid, teeEndPoints[j])) - if !strings.Contains(teeEndPoints[j], "443") { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - } else { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} - } - genTag = nil - for k := 0; k < 3; k++ { - genTag, err = n.RequestGenTag( - teeEndPoints[j], - requestGenTag, - time.Duration(time.Minute*20), - dialOptions, - nil, - ) - if err != nil { - if strings.Contains(err.Error(), "no permits available") { - time.Sleep(time.Minute * 3) - continue - } - n.Stag("err", fmt.Sprintf("[RequestGenTag] %v", err)) - break - } - } - if genTag == nil { - continue - } - - if len(genTag.USig) != pattern.TeeSignatureLen { - n.Stag("err", fmt.Sprintf("[RequestGenTag] invalid USig length: %d", len(genTag.USig))) - continue - } - - if len(genTag.Signature) != pattern.TeeSigLen { - n.Stag("err", fmt.Sprintf("[RequestGenTag] invalid TagSigInfo length: %d", len(genTag.Signature))) - continue - } - for k := 0; k < pattern.TeeSigLen; k++ { - teeSign[k] = types.U8(genTag.Signature[k]) - } - - var tfile = &TagfileType{ - Tag: genTag.Tag, - USig: genTag.USig, - Signature: genTag.Signature, - FragmentName: []byte(fragmentHash), - TeeAccountId: []byte(string(teePubkey[:])), - Index: (maxIndex + 1), - } - buf, err = json.Marshal(tfile) - if err != nil { - n.Stag("err", fmt.Sprintf("[json.Marshal] err: %s", err)) - continue - } - - // ok, err := n.GetPodr2Key().VerifyAttest(genTag.Tag.T.Name, genTag.Tag.T.U, genTag.Tag.PhiHash, genTag.Tag.Attest, "") - // if err != nil { - // n.Stag("err", fmt.Sprintf("[VerifyAttest] err: %s", err)) - // continue - // } - // if !ok { - // n.Stag("err", "VerifyAttest is false") - // continue - // } - - err = sutils.WriteBufToFile(buf, fmt.Sprintf("%s.tag", fragments[i])) - if err != nil { - n.Stag("err", fmt.Sprintf("[WriteBufToFile] err: %s", err)) - continue - } - isReportTag = true - n.Stag("info", fmt.Sprintf("Calc a service tag: %s", fmt.Sprintf("%s.tag", fragments[i]))) - break + if isreport { + isReportTag = isreport } } @@ -280,6 +192,9 @@ func (n *Node) calcFileTag(file string, teeEndPoints []string) error { txhash, err := n.reportFileTag(fid, tags) if err != nil { + for k := 0; k < len(tags); k++ { + os.Remove(tags[k]) + } n.Stag("err", fmt.Sprintf("[%s] [reportFileTag] %v", fid, err)) } else { n.Cache.Put([]byte(Cach_prefix_Tag+fid), nil) @@ -288,6 +203,172 @@ func (n *Node) calcFileTag(file string, teeEndPoints []string) error { return nil } +func (n *Node) calcTheFragmentTag(fid, fragmentFile string, maxIndex uint16, lastSign []byte, digest []*pb.DigestInfo) (bool, error) { + var err error + var isReportTag bool + var teeSign pattern.TeeSig + var genTag pb.GenTagMsg + var teePubkey string + var fragmentHash = filepath.Base(fragmentFile) + + genTag, teePubkey, err = n.requestTeeTag(fid, fragmentFile, lastSign, digest) + if err != nil { + return false, fmt.Errorf("requestTeeTag: %v", err) + } + + if len(genTag.USig) != pattern.TeeSignatureLen { + return false, fmt.Errorf("invalid USig length: %d", len(genTag.USig)) + } + + if len(genTag.Signature) != pattern.TeeSigLen { + return false, fmt.Errorf("invalid Tag.Signature length: %d", len(genTag.Signature)) + } + for k := 0; k < pattern.TeeSigLen; k++ { + teeSign[k] = types.U8(genTag.Signature[k]) + } + + var tfile = &TagfileType{ + Tag: genTag.Tag, + USig: genTag.USig, + Signature: genTag.Signature, + FragmentName: []byte(fragmentHash), + TeeAccountId: []byte(teePubkey), + Index: (maxIndex + 1), + } + buf, err := json.Marshal(tfile) + if err != nil { + return false, fmt.Errorf("json.Marshal: %v", err) + } + + // ok, err := n.GetPodr2Key().VerifyAttest(genTag.Tag.T.Name, genTag.Tag.T.U, genTag.Tag.PhiHash, genTag.Tag.Attest, "") + // if err != nil { + // n.Stag("err", fmt.Sprintf("[VerifyAttest] err: %s", err)) + // continue + // } + // if !ok { + // n.Stag("err", "VerifyAttest is false") + // continue + // } + + err = sutils.WriteBufToFile(buf, fmt.Sprintf("%s.tag", fragmentFile)) + if err != nil { + return false, fmt.Errorf("WriteBufToFile: %v", err) + } + isReportTag = true + n.Stag("info", fmt.Sprintf("Calc a service tag: %s", fmt.Sprintf("%s.tag", fragmentFile))) + return isReportTag, nil +} + +func (n *Node) requestTeeTag(fid, fragmentFile string, lastSign []byte, digest []*pb.DigestInfo) (pb.GenTagMsg, string, error) { + var err error + var teePubkey string + var tagInfo pb.GenTagMsg + teeEndPoints := n.GetPriorityTeeList() + teeEndPoints = append(teeEndPoints, n.GetAllMarkerTeeEndpoint()...) + + n.Stag("info", fmt.Sprintf("[%s] To calc the fragment tag: %v", fid, filepath.Base(fragmentFile))) + for j := 0; j < len(teeEndPoints); j++ { + n.Stag("info", fmt.Sprintf("[%s] Will use tee: %v", fid, teeEndPoints[j])) + teePubkey, err = n.GetTeeWorkAccount(teeEndPoints[j]) + if err != nil { + n.Stag("err", fmt.Sprintf("[GetTeeWorkAccount(%s)] %v", teeEndPoints[j], err)) + continue + } + tagInfo, err = n.callTeeTag(teeEndPoints[j], fid, fragmentFile, lastSign, digest) + if err != nil { + n.Stag("err", fmt.Sprintf("[callTeeTag(%s)] %v", teeEndPoints[j], err)) + continue + } + return tagInfo, teePubkey, nil + } + return tagInfo, teePubkey, err +} + +func (n *Node) callTeeTag(teeEndPoint, fid, fragmentFile string, lastSign []byte, digest []*pb.DigestInfo) (pb.GenTagMsg, error) { + var dialOptions []grpc.DialOption + if !strings.Contains(teeEndPoint, "443") { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } else { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} + } + conn, err := grpc.Dial(teeEndPoint, dialOptions...) + if err != nil { + return pb.GenTagMsg{}, fmt.Errorf("grpc.Dial(%s): %v", teeEndPoint, err) + } + defer conn.Close() + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Minute*20)) + defer cancel() + stream, err := pb.NewPodr2ApiClient(conn).RequestGenTag(ctx) + if err != nil { + return pb.GenTagMsg{}, fmt.Errorf("RequestGenTag: %v", err) + } + fragmentHash := filepath.Base(fragmentFile) + buf, err := os.ReadFile(fragmentFile) + if err != nil { + return pb.GenTagMsg{}, fmt.Errorf("ReadFile: %v", err) + } + n.Stag("info", fmt.Sprintf("Will request first to %s", teeEndPoint)) + err = stream.Send(&pb.RequestGenTag{ + FragmentData: make([]byte, 0), + FragmentName: fragmentHash, + CustomData: "", + FileName: fid, + MinerId: n.GetSignatureAccPulickey(), + TeeDigestList: make([]*pb.DigestInfo, 0), + LastTeeSignature: make([]byte, 0)}) + if err != nil { + return pb.GenTagMsg{}, fmt.Errorf("first send: %v", err) + } + n.Stag("info", fmt.Sprintf("Will recv first result from %s", teeEndPoint)) + ok, err := reciv_signal(stream) + if err != nil { + return pb.GenTagMsg{}, err + } + n.Stag("info", fmt.Sprintf("Recv first result is: %v", ok)) + if !ok { + return pb.GenTagMsg{}, errors.New("reciv_signal: false") + } + n.Stag("info", fmt.Sprintf("Will request second to %s", teeEndPoint)) + err = stream.Send(&pb.RequestGenTag{ + FragmentData: buf, + FragmentName: fragmentHash, + CustomData: "", + FileName: fid, + MinerId: n.GetSignatureAccPulickey(), + TeeDigestList: digest, + LastTeeSignature: lastSign, + }) + if err != nil { + return pb.GenTagMsg{}, fmt.Errorf("second send: %v", err) + } + n.Stag("info", fmt.Sprintf("Will recv second result from %s", teeEndPoint)) + tag, err := reciv_tag(stream) + if err != nil { + return pb.GenTagMsg{}, err + } + n.Stag("info", "Recv second result suc") + err = stream.CloseSend() + if err != nil { + n.Stag("err", fmt.Sprintf(" stream.Close: %v", err)) + } + return tag, nil +} + +func reciv_signal(stream pb.Podr2Api_RequestGenTagClient) (bool, error) { + req, err := stream.Recv() + if err != nil { + return false, err + } + return req.GetProcessing(), nil +} +func reciv_tag(stream pb.Podr2Api_RequestGenTagClient) (pb.GenTagMsg, error) { + req, err := stream.Recv() + if err != nil { + return pb.GenTagMsg{}, err + } + return *req.GetMsg(), nil +} + func getAllFragment(path string) ([]string, error) { st, err := os.Stat(path) if err != nil { diff --git a/node/chall_service.go b/node/chall_service.go index 2bdcb18..11aec73 100644 --- a/node/chall_service.go +++ b/node/chall_service.go @@ -336,6 +336,7 @@ func (n *Node) calcSigma( } } if !isChall { + os.Remove(fragments[j]) continue } n.Schal("info", fmt.Sprintf("chall go on: %s.%s", roothash, fragmentHash)) @@ -362,7 +363,8 @@ func (n *Node) calcSigma( err = n.calcFragmentTag(roothash, fragments[j]) if err != nil { n.Schal("err", fmt.Sprintf("calcFragmentTag %v err: %v", fragments[j], err)) - return names, us, mus, sigma, usig, err + n.GenerateRestoralOrder(roothash, fragmentHash) + continue } } n.Schal("info", fmt.Sprintf("[%s] Read tag file: %s", roothash, serviceTagPath)) @@ -371,7 +373,8 @@ func (n *Node) calcSigma( if err != nil { n.Schal("err", fmt.Sprintf("Unmarshal %v err: %v", serviceTagPath, err)) os.Remove(serviceTagPath) - return names, us, mus, sigma, usig, err + n.GenerateRestoralOrder(roothash, fragmentHash) + continue } _, err = os.Stat(fragments[j]) if err != nil { diff --git a/node/common.go b/node/common.go index cd8f008..c96963c 100644 --- a/node/common.go +++ b/node/common.go @@ -11,14 +11,20 @@ import ( "fmt" "os" "runtime" + "strings" "time" "github.com/AstaFrode/go-libp2p/core/peer" + "github.com/CESSProject/cess-bucket/configs" "github.com/CESSProject/cess-bucket/pkg/utils" "github.com/CESSProject/cess-go-sdk/core/pattern" + sutils "github.com/CESSProject/cess-go-sdk/utils" "github.com/CESSProject/p2p-go/core" "github.com/CESSProject/p2p-go/out" + "github.com/CESSProject/p2p-go/pb" ma "github.com/multiformats/go-multiaddr" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type DataDir struct { @@ -133,6 +139,8 @@ func (n *Node) syncChainStatus(ch chan<- bool) { n.Pnc(utils.RecoverError(err)) } }() + var dialOptions []grpc.DialOption + var chainPublickey = make([]byte, pattern.WorkerPublicKeyLen) teelist, err := n.QueryAllTeeWorkerMap() if err != nil { n.Log("err", err.Error()) @@ -143,6 +151,44 @@ func (n *Node) syncChainStatus(ch chan<- bool) { n.Log("err", err.Error()) continue } + endpoint = processEndpoint(endpoint) + + if !strings.Contains(endpoint, "443") { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } else { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} + } + + // verify identity public key + identityPubkeyResponse, err := n.GetIdentityPubkey(endpoint, + &pb.Request{ + StorageMinerAccountId: n.GetSignatureAccPulickey(), + }, + time.Duration(time.Minute), + dialOptions, + nil, + ) + if err != nil { + n.Log("err", err.Error()) + continue + } + n.Log("info", fmt.Sprintf("get identityPubkeyResponse: %v", identityPubkeyResponse.Pubkey)) + if len(identityPubkeyResponse.Pubkey) != pattern.WorkerPublicKeyLen { + n.DeleteTee(string(teelist[i].Pubkey[:])) + n.Log("err", fmt.Sprintf("identityPubkeyResponse.Pubkey length err: %d", len(identityPubkeyResponse.Pubkey))) + continue + } + + for j := 0; j < pattern.WorkerPublicKeyLen; j++ { + chainPublickey[j] = byte(teelist[i].Pubkey[j]) + } + if !sutils.CompareSlice(identityPubkeyResponse.Pubkey, chainPublickey) { + n.DeleteTee(string(teelist[i].Pubkey[:])) + n.Log("err", "identityPubkeyResponse.Pubkey err: not qual to chain") + continue + } + + n.Log("info", fmt.Sprintf("Save a tee: %s %d", endpoint, teelist[i].Role)) err = n.SaveTee(string(teelist[i].Pubkey[:]), endpoint, uint8(teelist[i].Role)) if err != nil { n.Log("err", err.Error()) @@ -172,19 +218,40 @@ func (n *Node) syncChainStatus(ch chan<- bool) { } } -func (n *Node) watchMem() { +func (n *Node) WatchMem() { memSt := &runtime.MemStats{} tikProgram := time.NewTicker(time.Second * 3) defer tikProgram.Stop() - for { - select { - case <-tikProgram.C: - runtime.ReadMemStats(memSt) - if memSt.HeapSys >= pattern.SIZE_1GiB*8 { - n.Log("err", fmt.Sprintf("Mem heigh: %d", memSt.HeapSys)) - os.Exit(1) - } + for range tikProgram.C { + runtime.ReadMemStats(memSt) + if memSt.HeapSys >= pattern.SIZE_1GiB*8 { + n.Log("err", fmt.Sprintf("Mem heigh: %d", memSt.HeapSys)) + os.Exit(1) + } + } +} + +func processEndpoint(endPoint string) string { + var teeEndPoint string + if strings.HasPrefix(endPoint, "http://") { + teeEndPoint = strings.TrimPrefix(endPoint, "http://") + teeEndPoint = strings.TrimSuffix(teeEndPoint, "/") + if !strings.Contains(teeEndPoint, ":") { + teeEndPoint = teeEndPoint + ":80" + } + } else if strings.HasPrefix(endPoint, "https://") { + teeEndPoint = strings.TrimPrefix(endPoint, "https://") + teeEndPoint = strings.TrimSuffix(teeEndPoint, "/") + if !strings.Contains(teeEndPoint, ":") { + teeEndPoint = teeEndPoint + ":443" + } + } else { + if !strings.Contains(endPoint, ":") { + teeEndPoint = endPoint + ":80" + } else { + teeEndPoint = endPoint } } + return teeEndPoint } diff --git a/node/node.go b/node/node.go index a676453..80c6dd3 100644 --- a/node/node.go +++ b/node/node.go @@ -93,24 +93,6 @@ func (n *Node) Run() { ch_GenIdleFile <- true ch_restoreMgt <- true - // for { - // out.Tip("QueryMasterPublicKey") - // pubkey, err := n.QueryMasterPublicKey() - // if err != nil { - // out.Err(err.Error()) - // time.Sleep(pattern.BlockInterval) - // continue - // } - // out.Err("SetPublickey") - // err = n.SetPublickey(pubkey) - // if err != nil { - // time.Sleep(pattern.BlockInterval) - // continue - // } - // n.Schal("info", "Initialize key successfully") - // break - // } - task_10S := time.NewTicker(time.Duration(time.Second * 10)) defer task_10S.Stop() @@ -120,6 +102,9 @@ func (n *Node) Run() { task_Minute := time.NewTicker(time.Minute) defer task_Minute.Stop() + task_5_Minute := time.NewTicker(time.Minute * 5) + defer task_5_Minute.Stop() + task_Hour := time.NewTicker(time.Hour) defer task_Hour.Stop() @@ -153,10 +138,6 @@ func (n *Node) Run() { case <-task_30S.C: n.SetTaskPeriod("30s") - if len(ch_connectBoot) > 0 { - <-ch_connectBoot - go n.connectBoot(ch_connectBoot) - } if len(ch_reportfiles) > 0 { <-ch_reportfiles go n.reportFiles(ch_reportfiles) @@ -169,20 +150,13 @@ func (n *Node) Run() { case <-task_Minute.C: n.SetTaskPeriod("1m") - if len(ch_syncChainStatus) > 0 { - <-ch_syncChainStatus - go n.syncChainStatus(ch_syncChainStatus) - } - if len(ch_idlechallenge) > 0 || len(ch_servicechallenge) > 0 { go n.challengeMgt(ch_idlechallenge, ch_servicechallenge) } - if len(ch_findPeers) > 0 { <-ch_findPeers //go n.findPeers(ch_findPeers) } - if len(ch_recvPeers) > 0 { <-ch_recvPeers go n.recvPeers(ch_recvPeers) @@ -209,6 +183,16 @@ func (n *Node) Run() { } n.SetTaskPeriod("1m-end") + case <-task_5_Minute.C: + if len(ch_syncChainStatus) > 0 { + <-ch_syncChainStatus + go n.syncChainStatus(ch_syncChainStatus) + } + if len(ch_connectBoot) > 0 { + <-ch_connectBoot + go n.connectBoot(ch_connectBoot) + } + case <-task_Hour.C: n.SetTaskPeriod("1h") go n.reportLogsMgt(ch_reportLogs) @@ -228,6 +212,7 @@ func (n *Node) SetPublickey(pubkey []byte) error { if err != nil { return err } + if n.RSAKeyPair == nil { n.RSAKeyPair = proof.NewKey() } diff --git a/node/pois.go b/node/pois.go index b140c3f..1c68ecc 100644 --- a/node/pois.go +++ b/node/pois.go @@ -100,7 +100,10 @@ func (n *Node) InitPois(firstflag bool, front, rear, freeSpace, count int64, key ChallAccPath: n.DataDir.AccDir, MaxProofThread: n.GetCpuCores(), } - + fmt.Println("freeSpace:", freeSpace) + fmt.Println("int64(n.ExpendersInfo.K):", int64(n.ExpendersInfo.K)) + fmt.Println("int64(n.ExpendersInfo.N):", int64(n.ExpendersInfo.N)) + fmt.Println("int64(n.ExpendersInfo.D):", int64(n.ExpendersInfo.D)) // k,n,d and key are params that needs to be negotiated with the verifier in advance. // minerID is storage node's account ID, and space is the amount of physical space available(MiB) n.Prover, err = pois.NewProver( diff --git a/node/restore.go b/node/restore.go index b099a1c..eb7e4e1 100644 --- a/node/restore.go +++ b/node/restore.go @@ -23,12 +23,8 @@ import ( "github.com/CESSProject/cess-go-sdk/core/pattern" sutils "github.com/CESSProject/cess-go-sdk/utils" "github.com/CESSProject/p2p-go/core" - "github.com/CESSProject/p2p-go/pb" - "github.com/centrifuge/go-substrate-rpc-client/v4/types" "github.com/mr-tron/base58" "github.com/pkg/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) func (n *Node) restoreMgt(ch chan bool) { @@ -637,86 +633,47 @@ func (n *Node) calcFragmentTag(fid, fragment string) error { return errors.New("invalid fragment size") } fragmentHash := filepath.Base(fragment) - teeEndPoints := n.GetPriorityTeeList() - teeEndPoints = append(teeEndPoints, n.GetAllMarkerTeeEndpoint()...) - requestGenTag := &pb.RequestGenTag{ - FragmentData: buf[:pattern.FragmentSize], - FragmentName: fragmentHash, - CustomData: "", - FileName: fid, - MinerId: n.GetSignatureAccPulickey(), - } - var dialOptions []grpc.DialOption - var teeSign pattern.TeeSig - for i := 0; i < len(teeEndPoints); i++ { - teePubkey, err := n.GetTeeWorkAccount(teeEndPoints[i]) - if err != nil { - n.Restore("info", fmt.Sprintf("[GetTee(%s)] %v", teeEndPoints[i], err)) - continue - } - n.Restore("info", fmt.Sprintf("[%s] Will calc file tag: %v", fid, fragmentHash)) - n.Restore("info", fmt.Sprintf("[%s] Will use tee: %v", fid, teeEndPoints[i])) - if !strings.Contains(teeEndPoints[i], "443") { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - } else { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} - } - genTag, err := n.RequestGenTag( - teeEndPoints[i], - requestGenTag, - time.Duration(time.Minute*20), - dialOptions, - nil, - ) - if err != nil { - n.Restore("err", fmt.Sprintf("[RequestGenTag] %v", err)) - continue - } - if len(genTag.USig) != pattern.TeeSignatureLen { - n.Restore("err", fmt.Sprintf("[RequestGenTag] invalid USig length: %d", len(genTag.USig))) - continue - } + genTag, teePubkey, err := n.requestTeeTag(fid, fragment, nil, nil) + if err != nil { + return err + } - if len(genTag.Signature) != pattern.TeeSigLen { - n.Restore("err", fmt.Sprintf("[RequestGenTag] invalid TagSigInfo length: %d", len(genTag.Signature))) - continue - } - for j := 0; j < pattern.TeeSigLen; j++ { - teeSign[j] = types.U8(genTag.Signature[j]) - } + if len(genTag.USig) != pattern.TeeSignatureLen { + return fmt.Errorf("invalid USig length: %d", len(genTag.USig)) + } - index := getTagsNumber(filepath.Join(n.GetDirs().FileDir, fid)) + if len(genTag.Signature) != pattern.TeeSigLen { + return fmt.Errorf("invalid genTag.Signature length: %d", len(genTag.Signature)) + } - var tfile = &TagfileType{ - Tag: genTag.Tag, - USig: genTag.USig, - Signature: genTag.Signature, - FragmentName: []byte(fragmentHash), - TeeAccountId: []byte(teePubkey), - Index: uint16(index + 1), - } - buf, err = json.Marshal(tfile) - if err != nil { - n.Restore("err", fmt.Sprintf("[json.Marshal] err: %s", err)) - continue - } - ok, err := n.GetPodr2Key().VerifyAttest(genTag.Tag.T.Name, genTag.Tag.T.U, genTag.Tag.PhiHash, genTag.Tag.Attest, "") - if err != nil { - n.Restore("err", fmt.Sprintf("[VerifyAttest] err: %s", err)) - continue - } - if !ok { - n.Restore("err", "VerifyAttest is false") - continue - } - err = sutils.WriteBufToFile(buf, fmt.Sprintf("%s.tag", fragment)) - if err != nil { - n.Restore("err", fmt.Sprintf("[WriteBufToFile] err: %s", err)) - continue - } - n.Restore("info", fmt.Sprintf("Calc a service tag: %s", fmt.Sprintf("%s.tag", fragment))) - break + index := getTagsNumber(filepath.Join(n.GetDirs().FileDir, fid)) + + var tfile = &TagfileType{ + Tag: genTag.Tag, + USig: genTag.USig, + Signature: genTag.Signature, + FragmentName: []byte(fragmentHash), + TeeAccountId: []byte(teePubkey), + Index: uint16(index + 1), + } + buf, err = json.Marshal(tfile) + if err != nil { + return fmt.Errorf("json.Marshal: %v", err) + } + // ok, err := n.GetPodr2Key().VerifyAttest(genTag.Tag.T.Name, genTag.Tag.T.U, genTag.Tag.PhiHash, genTag.Tag.Attest, "") + // if err != nil { + // n.Restore("err", fmt.Sprintf("[VerifyAttest] err: %s", err)) + // continue + // } + // if !ok { + // n.Restore("err", "VerifyAttest is false") + // continue + // } + err = sutils.WriteBufToFile(buf, fmt.Sprintf("%s.tag", fragment)) + if err != nil { + return fmt.Errorf("WriteBufToFile: %v", err) } + n.Restore("info", fmt.Sprintf("Calc a service tag: %s", fmt.Sprintf("%s.tag", fragment))) return nil } diff --git a/node/runningState.go b/node/runningState.go index 21595a0..cb3a41b 100644 --- a/node/runningState.go +++ b/node/runningState.go @@ -67,7 +67,6 @@ type RunningRecordType struct { taskPeriod string cpuCores int pid int32 - workStage uint8 lastReconnectRpcTime string calcTagFlag bool reportFileFlag bool diff --git a/pkg/confile/confile.go b/pkg/confile/confile.go index 1e36089..aa5f478 100644 --- a/pkg/confile/confile.go +++ b/pkg/confile/confile.go @@ -23,7 +23,6 @@ import ( const DefaultProfile = "conf.yaml" const TempleteProfile = `# The rpc endpoint of the chain node Rpc: - - "ws://127.0.0.1:9948/" - "wss://testnet-rpc0.cess.cloud/ws/" - "wss://testnet-rpc1.cess.cloud/ws/" - "wss://testnet-rpc2.cess.cloud/ws/" @@ -47,9 +46,7 @@ UseSpace: 2000 # Number of cpu's used, 0 means use all UseCpu: 0 # Priority tee list address -TeeList: - - "127.0.0.1:8080" - - "127.0.0.1:8081"` +TeeList:` type Confile interface { Parse(fpath string, port int) error