From a6002191fd93b8a37be58067def24a0138e1276a Mon Sep 17 00:00:00 2001 From: AstaFrode Date: Wed, 31 Jan 2024 19:50:34 +0800 Subject: [PATCH] Adapt to CESS 0.7.6 --- cmd/console/reward.go | 12 -- configs/config.go | 4 - configs/system.go | 2 +- go.mod | 4 +- go.sum | 8 +- node/calc_tag.go | 381 ++++++++++++++++++++++++------------------ node/chall_idle.go | 30 ++-- node/chall_service.go | 24 +-- node/challenge.go | 8 +- node/discover.go | 3 +- node/node.go | 1 - node/report_file.go | 287 +++++++++++++++++-------------- 12 files changed, 421 insertions(+), 343 deletions(-) diff --git a/cmd/console/reward.go b/cmd/console/reward.go index b3095854..b73f7b5b 100644 --- a/cmd/console/reward.go +++ b/cmd/console/reward.go @@ -73,7 +73,6 @@ func Command_Reward_Runfunc(cmd *cobra.Command, args []string) { } var total string var claimed string - var available string var sep uint8 = 0 for i := len(rewardInfo.Total) - 1; i >= 0; i-- { total = fmt.Sprintf("%c%s", rewardInfo.Total[i], total) @@ -94,20 +93,9 @@ func Command_Reward_Runfunc(cmd *cobra.Command, args []string) { } claimed = strings.TrimPrefix(claimed, "_") - sep = 0 - for i := len(rewardInfo.Available) - 1; i >= 0; i-- { - available = fmt.Sprintf("%c%s", rewardInfo.Available[i], available) - sep++ - if sep%3 == 0 { - available = fmt.Sprintf("_%s", available) - } - } - available = strings.TrimPrefix(available, "_") - var tableRows = []table.Row{ {"total reward", total}, {"claimed reward", claimed}, - {"available reward", available}, } tw := table.NewWriter() tw.AppendRows(tableRows) diff --git a/configs/config.go b/configs/config.go index 5da17ab5..fa2e9fab 100644 --- a/configs/config.go +++ b/configs/config.go @@ -31,10 +31,6 @@ const ( MinMTagFileSize = 600000 ) -const ( - OrserState_CalcTag uint8 = 2 -) - const ( Err_tee_Busy = "is being fully calculated" Err_ctx_exceeded = "context deadline exceeded" diff --git a/configs/system.go b/configs/system.go index 8670e95a..c9ced37b 100644 --- a/configs/system.go +++ b/configs/system.go @@ -22,7 +22,7 @@ const ( // Name is the name of the program Name = "bucket" // version - Version = "v0.7.9 pre-release" + Version = "v0.7.9" // Description is the description of the program Description = "Storage node implementation in CESS networks" // NameSpace is the cached namespace diff --git a/go.mod b/go.mod index daeae2b4..3e07f00f 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,9 @@ go 1.20 require ( github.com/AstaFrode/go-libp2p v0.26.4-0.20231113143058-912296254d44 - github.com/CESSProject/cess-go-sdk v0.4.32 + github.com/CESSProject/cess-go-sdk v0.5.0 github.com/CESSProject/cess_pois v0.4.22 - github.com/CESSProject/p2p-go v0.2.13 + github.com/CESSProject/p2p-go v0.3.0 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1 github.com/gin-contrib/cors v1.5.0 diff --git a/go.sum b/go.sum index f5a7347c..441ba921 100644 --- a/go.sum +++ b/go.sum @@ -21,14 +21,14 @@ github.com/AstaFrode/go-peertaskqueue v0.8.2-0.20231108073729-990e433425a4 h1:fu github.com/AstaFrode/go-peertaskqueue v0.8.2-0.20231108073729-990e433425a4/go.mod h1:0YcQDsyTRKBTK9yE22DQgiwQndGFvenqMvD6Spkxl28= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/CESSProject/cess-go-sdk v0.4.32 h1:/VcKShWBPJdJkthlnLzLYbiQgRhKzp7GG1A2dd+XjHM= -github.com/CESSProject/cess-go-sdk v0.4.32/go.mod h1:IamF02Ng+FQvThFbQRtpEpPzmlPOXqQzgaF9YDE0YhY= +github.com/CESSProject/cess-go-sdk v0.5.0 h1:N87Trs+Btt3mftpRxMt7ZeI7va4v/SaT4OiZxi00KA4= +github.com/CESSProject/cess-go-sdk v0.5.0/go.mod h1:IamF02Ng+FQvThFbQRtpEpPzmlPOXqQzgaF9YDE0YhY= github.com/CESSProject/cess_pois v0.4.22 h1:RxQRg8A3z79mt19Jb5yd39lIjBSWKBih1KLIqN1WnIY= github.com/CESSProject/cess_pois v0.4.22/go.mod h1:rztEZjjG+MbKzVgh5WtQcZc/7ZDkBvDNABc7Em8BKPc= github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde h1:5MDRjjtg6PEhqyVjupwaapN96cOZiddOGAYwKQeaTu0= github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde/go.mod h1:RUXBd3ROP98MYepEEa0Y0l/T0vQlIKqFJxI/ocdnRLM= -github.com/CESSProject/p2p-go v0.2.13 h1:JWETd5xqvHUBkE86EV4IFXluhdaAnqXA1YLcQlLhvzo= -github.com/CESSProject/p2p-go v0.2.13/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI= +github.com/CESSProject/p2p-go v0.3.0 h1:PdwbLBri3qYwIHxaG4BHHS3ETH1BlVRCnBG87jr5cQg= +github.com/CESSProject/p2p-go v0.3.0/go.mod h1:SMwJt5Zpk98k+d2J5gsN+0Forr7MbqOYWbHLd3mBLrI= github.com/ChainSafe/go-schnorrkel v1.0.0 h1:3aDA67lAykLaG1y3AOjs88dMxC88PgUuHRrLeDnvGIM= github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z7vcchBSbMBEhCw4= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= diff --git a/node/calc_tag.go b/node/calc_tag.go index 63295855..9036bc4d 100644 --- a/node/calc_tag.go +++ b/node/calc_tag.go @@ -54,12 +54,6 @@ func (n *Node) calcTag(ch chan<- bool) { return } - var ok bool - var fid string - var fragmentHash string - var dialOptions []grpc.DialOption - var teeSign pattern.TeeSig - n.SetCalcTagFlag(true) defer n.SetCalcTagFlag(false) @@ -69,97 +63,117 @@ func (n *Node) calcTag(ch chan<- bool) { return } + n.Stag("info", fmt.Sprintf("[roothashs] %v", roothashs)) + teeEndPoints := n.GetPriorityTeeList() teeEndPoints = append(teeEndPoints, n.GetAllMarkerTeeEndpoint()...) for _, fileDir := range roothashs { - fid = filepath.Base(fileDir) - // ok, _ = n.Cache.Has([]byte(Cach_prefix_Tag + fid)) - // if ok { - // continue - // } - ok, err = n.Has([]byte(Cach_prefix_File + fid)) - if err == nil { - if !ok { - continue - } - } else { - n.Report("err", err.Error()) - time.Sleep(time.Second) - continue + err = n.calcFileTag(fileDir, teeEndPoints) + if err != nil { + n.Stag("err", fmt.Sprintf("[%s] [calcFileTag] %v", filepath.Base(fileDir), roothashs)) } + time.Sleep(time.Second) + } +} + +func (n *Node) calcFileTag(file string, teeEndPoints []string) error { + var ok bool + var isReportTag bool + var err error + var fragmentHash string + var tagPath string + var fragments, tags []string + var dialOptions []grpc.DialOption + var teeSign pattern.TeeSig + var genTag *pb.ResponseGenTag + fid := filepath.Base(file) + n.Stag("info", fmt.Sprintf("[%s] Start calc file tag", fid)) + + ok, _ = n.Has([]byte(Cach_prefix_File + fid)) + if !ok { + n.Stag("info", fmt.Sprintf("[%s] file not report", fid)) + return nil + } + + ok, _ = n.Has([]byte(Cach_prefix_Tag + fid)) + if ok { + n.Stag("info", fmt.Sprintf("[%s] the file's tag already report", fid)) + return nil + } + + fragments, err = getAllFragment(file) + if err != nil { + n.Stag("err", fmt.Sprintf("[getAllFragment(%s)] %v", fid, err)) + return nil + } + //n.Stag("info", fmt.Sprintf("[%s] The file have fragments: %v", fid, fragments)) + + if err = checkFragmentsSize(fragments); err != nil { + n.Stag("err", fmt.Sprintf("[checkFragmentsSize(%s)] %v", fid, err)) + return nil + } - fragments, tags, err := getFragmentAndTag(fileDir) + for i := 0; i < len(fragments); i++ { + tags, err = getFragmentTags(file) if err != nil { - n.Stag("err", fmt.Sprintf("[getFragmentAndTag(%s)] %v", fid, err)) - time.Sleep(time.Second) - continue + n.Stag("err", fmt.Sprintf("[getFragmentTags(%s)] %v", fid, err)) + return nil } + //n.Stag("info", fmt.Sprintf("[%s] The file have tag: %v", fid, tags)) - if err = checkFragmentsSize(fragments); err != nil { - n.Stag("err", fmt.Sprintf("[checkFragmentsSize(%s)] %v", fid, err)) - time.Sleep(time.Second) - continue + latestSig, digest, maxIndex, err := n.calcRequestDigest(filepath.Base(fragments[i]), tags) + if err != nil { + n.Stag("err", fmt.Sprintf("[calcRequestDigest(%s)] %v", fid, err)) + return nil } - - for i := 0; i < len(fragments); i++ { - tags, err = getFragmentTags(fileDir) - if err != nil { - n.Stag("err", fmt.Sprintf("[getFragmentTags(%s)] %v", fid, err)) + buf, err := os.ReadFile(fragments[i]) + if err != nil { + n.Stag("err", fmt.Sprintf("[%s] [ReadFile(%s)] %v", fid, fragments[i], err)) + return nil + } + fragmentHash = filepath.Base(fragments[i]) + tagPath = (fragments[i] + ".tag") + n.Stag("info", fmt.Sprintf("[%s] Check this file tag: %v", fid, tagPath)) + fstat, err := os.Stat(tagPath) + if err == nil { + if fstat.Size() < configs.MinMTagFileSize { + n.Stag("err", fmt.Sprintf("[%s] The file's tag size: %d < %d", fid, fstat.Size(), configs.MinMTagFileSize)) + os.Remove(tagPath) + } else { + n.Stag("info", fmt.Sprintf("[%s] The file's tag already calced", fid)) time.Sleep(time.Second) continue } + } else { + n.Stag("info", fmt.Sprintf("[%s] The file's tag stat err: %v", fid, err)) + } - if _, err = os.Stat(fragments[i] + ".tag"); err == nil { - continue - } - latestSig, digest, maxIndex, err := n.calcRequestDigest(filepath.Base(fragments[i]), tags) - if err != nil { - break - } - n.Stag("info", fmt.Sprintf("[checkFragments] latestSig: %v", latestSig)) - n.Stag("info", fmt.Sprintf("[checkFragments] len(digest): %d", len(digest))) - n.Stag("info", fmt.Sprintf("[checkFragments] maxIndex: %d", maxIndex)) - - buf, err := os.ReadFile(fragments[i]) + var requestGenTag = &pb.RequestGenTag{ + FragmentData: buf, + FragmentName: fragmentHash, + CustomData: "", + FileName: fid, + MinerId: n.GetSignatureAccPulickey(), + TeeDigestList: digest, + LastTeeSignature: latestSig, + } + for j := 0; j < len(teeEndPoints); j++ { + teePubkey, err := n.GetTeeWorkAccount(teeEndPoints[j]) if err != nil { - break - } - fragmentHash = filepath.Base(fragments[i]) - - fstat, err := os.Stat(fragments[i] + ".tag") - if err == nil { - if fstat.Size() < configs.MinMTagFileSize { - os.Remove(fragments[i] + ".tag") - } else { - time.Sleep(time.Second) - continue - } + n.Stag("err", fmt.Sprintf("[GetTeeWorkAccount(%s)] %v", teeEndPoints[j], err)) + continue } - - var requestGenTag = &pb.RequestGenTag{ - FragmentData: buf, - FragmentName: fragmentHash, - CustomData: "", - FileName: fid, - MinerId: n.GetSignatureAccPulickey(), - TeeDigestList: digest, - LastTeeSignature: latestSig, + n.Stag("info", fmt.Sprintf("[%s] Will calc file tag: %v", fid, fragmentHash)) + n.Stag("info", fmt.Sprintf("[%s] Will use tee: %v", fid, teeEndPoints[j])) + if !strings.Contains(teeEndPoints[j], "443") { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } else { + dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} } - for j := 0; j < len(teeEndPoints); j++ { - teePubkey, err := n.GetTeeWorkAccount(teeEndPoints[j]) - if err != nil { - n.Stag("err", fmt.Sprintf("[GetTeeWorkAccount(%s)] %v", teeEndPoints[j], err)) - continue - } - n.Stag("info", fmt.Sprintf("[%s] Will calc file tag: %v", fid, fragmentHash)) - n.Stag("info", fmt.Sprintf("[%s] Will use tee: %v", fid, teeEndPoints[j])) - if !strings.Contains(teeEndPoints[j], "443") { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - } else { - dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(configs.GetCert())} - } - genTag, err := n.RequestGenTag( + genTag = nil + for k := 0; k < 3; k++ { + genTag, err = n.RequestGenTag( teeEndPoints[j], requestGenTag, time.Duration(time.Minute*20), @@ -167,101 +181,132 @@ func (n *Node) calcTag(ch chan<- bool) { nil, ) if err != nil { + if strings.Contains(err.Error(), "no permits available") { + time.Sleep(time.Minute * 3) + continue + } n.Stag("err", fmt.Sprintf("[RequestGenTag] %v", err)) - continue + break } + } + if genTag == nil { + continue + } - if len(genTag.USig) != pattern.TeeSignatureLen { - n.Stag("err", fmt.Sprintf("[RequestGenTag] invalid USig length: %d", len(genTag.USig))) - continue - } + if len(genTag.USig) != pattern.TeeSignatureLen { + n.Stag("err", fmt.Sprintf("[RequestGenTag] invalid USig length: %d", len(genTag.USig))) + continue + } - if len(genTag.Signature) != pattern.TeeSigLen { - n.Stag("err", fmt.Sprintf("[RequestGenTag] invalid TagSigInfo length: %d", len(genTag.Signature))) - continue - } - for k := 0; k < pattern.TeeSigLen; k++ { - teeSign[k] = types.U8(genTag.Signature[k]) - } + if len(genTag.Signature) != pattern.TeeSigLen { + n.Stag("err", fmt.Sprintf("[RequestGenTag] invalid TagSigInfo length: %d", len(genTag.Signature))) + continue + } + for k := 0; k < pattern.TeeSigLen; k++ { + teeSign[k] = types.U8(genTag.Signature[k]) + } - var tfile = &TagfileType{ - Tag: genTag.Tag, - USig: genTag.USig, - Signature: genTag.Signature, - FragmentName: []byte(fragmentHash), - TeeAccountId: []byte(string(teePubkey[:])), - Index: (maxIndex + 1), - } - buf, err = json.Marshal(tfile) - if err != nil { - n.Stag("err", fmt.Sprintf("[json.Marshal] err: %s", err)) - continue - } - // ok, err := n.GetPodr2Key().VerifyAttest(genTag.Tag.T.Name, genTag.Tag.T.U, genTag.Tag.PhiHash, genTag.Tag.Attest, "") - // if err != nil { - // n.Stag("err", fmt.Sprintf("[VerifyAttest] err: %s", err)) - // continue - // } - // if !ok { - // n.Stag("err", "VerifyAttest is false") - // continue - // } - err = sutils.WriteBufToFile(buf, fmt.Sprintf("%s.tag", fragments[i])) - if err != nil { - n.Stag("err", fmt.Sprintf("[WriteBufToFile] err: %s", err)) - continue - } - n.Stag("info", fmt.Sprintf("Calc a service tag: %s", fmt.Sprintf("%s.tag", fragments[i]))) - break + var tfile = &TagfileType{ + Tag: genTag.Tag, + USig: genTag.USig, + Signature: genTag.Signature, + FragmentName: []byte(fragmentHash), + TeeAccountId: []byte(string(teePubkey[:])), + Index: (maxIndex + 1), + } + buf, err = json.Marshal(tfile) + if err != nil { + n.Stag("err", fmt.Sprintf("[json.Marshal] err: %s", err)) + continue } - } - if !n.checkAllFragmentTag(fragments) { - n.Stag("err", fmt.Sprintf("[%s] [checkAllFragmentTag] failed", fid)) - continue + // ok, err := n.GetPodr2Key().VerifyAttest(genTag.Tag.T.Name, genTag.Tag.T.U, genTag.Tag.PhiHash, genTag.Tag.Attest, "") + // if err != nil { + // n.Stag("err", fmt.Sprintf("[VerifyAttest] err: %s", err)) + // continue + // } + // if !ok { + // n.Stag("err", "VerifyAttest is false") + // continue + // } + + err = sutils.WriteBufToFile(buf, fmt.Sprintf("%s.tag", fragments[i])) + if err != nil { + n.Stag("err", fmt.Sprintf("[WriteBufToFile] err: %s", err)) + continue + } + isReportTag = true + n.Stag("info", fmt.Sprintf("Calc a service tag: %s", fmt.Sprintf("%s.tag", fragments[i]))) + break } + } - tags, err = getFragmentTags(fileDir) + if !isReportTag { + fmeta, err := n.QueryFileMetadata(fid) if err != nil { - n.Stag("err", fmt.Sprintf("[getFragmentTags(%s)] %v", fid, err)) - time.Sleep(time.Second) - continue + n.Stag("err", fmt.Sprintf("[%s] [QueryFileMetadata] %v", fid, err)) + return nil } - - txhash, err := n.reportFileTag(fid, tags) - if err != nil { - n.Stag("err", fmt.Sprintf("[%s] [reportFileTag] %v", fid, err)) - } else { - n.Cache.Put([]byte(Cach_prefix_Tag+fid), nil) - n.Stag("info", fmt.Sprintf("[%s] [reportFileTag] %v", fid, txhash)) + for _, segment := range fmeta.SegmentList { + for _, fragment := range segment.FragmentList { + if sutils.CompareSlice(fragment.Miner[:], n.GetSignatureAccPulickey()) { + if fragment.Tag.HasValue() { + err = n.Cache.Put([]byte(Cach_prefix_Tag+fid), nil) + if err != nil { + n.Stag("err", fmt.Sprintf("[%s] [Cache.Put] %v", fid, err)) + } + return nil + } + isReportTag = true + break + } + } + if isReportTag { + break + } } } + + if !n.checkAllFragmentTag(fragments) { + n.Stag("err", fmt.Sprintf("[%s] [checkAllFragmentTag] failed", fid)) + return nil + } + + tags, err = getFragmentTags(file) + if err != nil { + n.Stag("err", fmt.Sprintf("[getFragmentTags(%s)] %v", fid, err)) + return nil + } + + txhash, err := n.reportFileTag(fid, tags) + if err != nil { + n.Stag("err", fmt.Sprintf("[%s] [reportFileTag] %v", fid, err)) + } else { + n.Cache.Put([]byte(Cach_prefix_Tag+fid), nil) + n.Stag("info", fmt.Sprintf("[%s] [reportFileTag] %v", fid, txhash)) + } + return nil } -func getFragmentAndTag(path string) ([]string, []string, error) { +func getAllFragment(path string) ([]string, error) { st, err := os.Stat(path) if err != nil { - return nil, nil, err + return nil, err } if !st.IsDir() { - return nil, nil, errors.New("not dir") + return nil, errors.New("not dir") } files, err := utils.DirFiles(path, 0) if err != nil { - return nil, nil, err + return nil, err } var fragments []string - var tags []string for i := 0; i < len(files); i++ { - if strings.Contains(files[i], ".tag") { - tags = append(tags, files[i]) - continue - } if len(filepath.Base(files[i])) == pattern.FileHashLen { fragments = append(fragments, files[i]) } } - return fragments, tags, nil + return fragments, nil } func getFragmentTags(path string) ([]string, error) { @@ -316,11 +361,11 @@ func (n *Node) calcRequestDigest(fragment string, tags []string) ([]byte, []*pb. } var maxIndex uint16 var latestSig []byte - var digest []*pb.DigestInfo - n.Stag("Info", fmt.Sprintf("will check fragment tag: %s", fragment)) - n.Stag("Info", fmt.Sprintf("can check fragment tags: %v", tags)) + var digest = make([]*pb.DigestInfo, len(tags)) + n.Stag("info", fmt.Sprintf("will check fragment tag: %s", fragment)) + n.Stag("info", fmt.Sprintf("can check fragment tags: %v", tags)) for _, v := range tags { - n.Stag("Info", fmt.Sprintf("check fragment tag: %v", v)) + n.Stag("info", fmt.Sprintf("check fragment tag: %v", v)) if strings.Contains(v, fragment) { continue } @@ -334,20 +379,26 @@ func (n *Node) calcRequestDigest(fragment string, tags []string) ([]byte, []*pb. os.Remove(v) return nil, nil, 0, err } - n.Stag("Info", fmt.Sprintf("tag index: %d", tag.Index)) + n.Stag("info", fmt.Sprintf("tag index: %d", tag.Index)) if tag.Index == 0 { - if maxIndex == 0 { - latestSig = tag.Signature - } - } else if tag.Index > maxIndex { + msg := fmt.Sprintf("[%s] invalid tag.Index: %d ", fragment, tag.Index) + return nil, nil, 0, errors.New(msg) + } + if tag.Index > maxIndex { maxIndex = tag.Index + n.Stag("info", fmt.Sprintf("lastest tag sin: %d", tag.Index)) latestSig = tag.Signature } var dig = &pb.DigestInfo{ FragmentName: tag.FragmentName, TeeAccountId: tag.TeeAccountId, } - digest = append(digest, dig) + digest[tag.Index-1] = dig + } + if len(tags) == 0 { + digest = nil + latestSig = nil + maxIndex = 0 } return latestSig, digest, maxIndex, nil } @@ -376,7 +427,6 @@ func getTagsNumber(path string) int { func (n *Node) reportFileTag(fid string, tags []string) (string, error) { var onChainFlag bool var err error - var maxIndex uint16 var blocknumber uint32 var txhash string var tagSigInfo pattern.TagSigInfo @@ -385,7 +435,7 @@ func (n *Node) reportFileTag(fid string, tags []string) (string, error) { for j := 0; j < pattern.FileHashLen; j++ { tagSigInfo.Filehash[j] = types.U8(fid[j]) } - var digest []*pb.DigestInfo + var digest = make([]*pb.DigestInfo, len(tags)) for _, v := range tags { buf, err := os.ReadFile(v) if err != nil { @@ -397,15 +447,22 @@ func (n *Node) reportFileTag(fid string, tags []string) (string, error) { os.Remove(v) return txhash, err } - if tag.Index > maxIndex { - maxIndex = tag.Index + if tag.Index == 0 { + msg := fmt.Sprintf("[%s] invalid tag.Index: %d ", fid, tag.Index) + return "", errors.New(msg) + } + if int(tag.Index) == len(tags) { latestSig = tag.Signature } + if int(tag.Index) > len(tags) { + msg := fmt.Sprintf("[%s] invalid tag.Index: %d maxIndex: %d", fid, tag.Index, len(tags)) + return "", errors.New(msg) + } var dig = &pb.DigestInfo{ FragmentName: tag.FragmentName, TeeAccountId: tag.TeeAccountId, } - digest = append(digest, dig) + digest[tag.Index-1] = dig } tagSigInfo.Miner = types.AccountID(n.GetSignatureAccPulickey()) @@ -419,7 +476,7 @@ func (n *Node) reportFileTag(fid string, tags []string) (string, error) { txhash, err = n.ReportTagCalculated(latestSig, tagSigInfo) if err != nil || txhash == "" { n.Stag("err", fmt.Sprintf("[%s] ReportTagCalculated: %s %v", fid, txhash, err)) - time.Sleep(pattern.BlockInterval) + time.Sleep(pattern.BlockInterval * 2) fmeta, err = n.QueryFileMetadata(fid) if err == nil { for _, segment := range fmeta.SegmentList { diff --git a/node/chall_idle.go b/node/chall_idle.go index 462f3e25..39265d68 100644 --- a/node/chall_idle.go +++ b/node/chall_idle.go @@ -706,21 +706,25 @@ func (n *Node) checkIdleProofRecord( for j := 0; j < len(teeSig); j++ { teeSignBytes[j] = byte(teeSig[j]) } - txHash, err := n.SubmitIdleProofResult( - idleProve, - types.U64(minerChallFront), - types.U64(minerChallRear), - minerAccumulator, - types.Bool(spaceProofVerifyTotal.IdleResult), - teeSignBytes, - idleProofRecord.AllocatedTeeWorkpuk, - ) - if err != nil { - n.Ichal("err", fmt.Sprintf("[SubmitIdleProofResult] hash: %s, err: %v", txHash, err)) - time.Sleep(time.Minute) + var txHash string + for j := 2; j < 10; j++ { + txHash, err = n.SubmitIdleProofResult( + idleProve, + types.U64(minerChallFront), + types.U64(minerChallRear), + minerAccumulator, + types.Bool(spaceProofVerifyTotal.IdleResult), + teeSignBytes, + idleProofRecord.AllocatedTeeWorkpuk, + ) + if err != nil { + n.Ichal("err", fmt.Sprintf("[SubmitIdleProofResult] hash: %s, err: %v", txHash, err)) + time.Sleep(time.Minute * time.Duration(j)) + continue + } + n.Ichal("info", fmt.Sprintf("submit idle proof result suc: %s", txHash)) return nil } - n.Ichal("info", fmt.Sprintf("submit idle proof result suc: %s", txHash)) return nil } diff --git a/node/chall_service.go b/node/chall_service.go index ed3056ca..2bdcb188 100644 --- a/node/chall_service.go +++ b/node/chall_service.go @@ -180,17 +180,21 @@ func (n *Node) serviceChallenge( for j := 0; j < len(signature); j++ { teeSignBytes[j] = byte(signature[j]) } - txhash, err = n.SubmitServiceProofResult( - types.Bool(serviceProofRecord.ServiceResult), - teeSignBytes, - bloomFilter, - serviceProofRecord.AllocatedTeeWorkpuk, - ) - if err != nil { - n.Schal("err", fmt.Sprintf("[SubmitServiceProofResult] hash: %s, err: %v", txhash, err)) + for i := 2; i < 10; i++ { + txhash, err = n.SubmitServiceProofResult( + types.Bool(serviceProofRecord.ServiceResult), + teeSignBytes, + bloomFilter, + serviceProofRecord.AllocatedTeeWorkpuk, + ) + if err != nil { + n.Schal("err", fmt.Sprintf("[SubmitServiceProofResult] hash: %s, err: %v", txhash, err)) + time.Sleep(time.Minute * time.Duration(i)) + continue + } + n.Schal("info", fmt.Sprintf("submit service aggr proof result suc: %s", txhash)) return } - n.Schal("info", fmt.Sprintf("submit service aggr proof result suc: %s", txhash)) } // save challenge random number @@ -348,6 +352,7 @@ func (n *Node) calcSigma( return names, us, mus, sigma, usig, err } if blocknumber > uint64(challStart) { + n.Schal("info", fmt.Sprintf("Not at chall: %d > %d", blocknumber, challStart)) continue } } @@ -413,7 +418,6 @@ func (n *Node) calcSigma( us = append(us, tag.Tag.T.U) mus = append(mus, proveResponse.MU) usig = append(usig, tag.USig) - break } } return names, us, mus, sigma, usig, nil diff --git a/node/challenge.go b/node/challenge.go index 1cb330d7..0db5a64a 100644 --- a/node/challenge.go +++ b/node/challenge.go @@ -60,7 +60,7 @@ func (n *Node) challengeMgt(idleChallTaskCh, serviceChallTaskCh chan bool) { n.Ichal("err", fmt.Sprintf("idle data challenge verification expired: %v < %v", uint32(challenge.ChallengeElement.VerifySlip), latestBlock)) } else { if len(idleChallTaskCh) > 0 { - _ = <-idleChallTaskCh + <-idleChallTaskCh go n.idleChallenge( idleChallTaskCh, true, @@ -82,7 +82,7 @@ func (n *Node) challengeMgt(idleChallTaskCh, serviceChallTaskCh chan bool) { n.Ichal("err", fmt.Sprintf("idle data challenge has expired: %v < %v", uint32(challenge.ChallengeElement.IdleSlip), latestBlock)) } else { if len(idleChallTaskCh) > 0 { - _ = <-idleChallTaskCh + <-idleChallTaskCh go n.idleChallenge( idleChallTaskCh, false, @@ -107,7 +107,7 @@ func (n *Node) challengeMgt(idleChallTaskCh, serviceChallTaskCh chan bool) { n.Schal("err", fmt.Sprintf("service data challenge verification expired: %v < %v", uint32(challenge.ChallengeElement.VerifySlip), latestBlock)) } else { if len(serviceChallTaskCh) > 0 { - _ = <-serviceChallTaskCh + <-serviceChallTaskCh go n.serviceChallenge( serviceChallTaskCh, true, @@ -126,7 +126,7 @@ func (n *Node) challengeMgt(idleChallTaskCh, serviceChallTaskCh chan bool) { n.Schal("err", fmt.Sprintf("service challenge has expired: %v < %v", uint32(challenge.ChallengeElement.ServiceSlip), latestBlock)) } else { if len(serviceChallTaskCh) > 0 { - _ = <-serviceChallTaskCh + <-serviceChallTaskCh go n.serviceChallenge( serviceChallTaskCh, false, diff --git a/node/discover.go b/node/discover.go index fc5d0eb3..47643b1f 100644 --- a/node/discover.go +++ b/node/discover.go @@ -125,7 +125,7 @@ func (n *Node) reportLogsMgt(reportTaskCh chan bool) { } if len(reportTaskCh) > 0 { - _ = <-reportTaskCh + <-reportTaskCh defer func() { reportTaskCh <- true if err := recover(); err != nil { @@ -143,6 +143,7 @@ func (n *Node) reportLogsMgt(reportTaskCh chan bool) { n.ReportLogs(filepath.Join(n.DataDir.LogDir, "panic.log")) time.Sleep(time.Second * time.Duration(rand.Intn(120))) n.ReportLogs(filepath.Join(n.DataDir.LogDir, "log.log")) + n.ReportLogs(filepath.Join(n.DataDir.LogDir, "report.log")) } } diff --git a/node/node.go b/node/node.go index ee45d35e..a6764537 100644 --- a/node/node.go +++ b/node/node.go @@ -211,7 +211,6 @@ func (n *Node) Run() { case <-task_Hour.C: n.SetTaskPeriod("1h") - // go n.UpdatePeers() go n.reportLogsMgt(ch_reportLogs) n.SetTaskPeriod("1h-end") default: diff --git a/node/report_file.go b/node/report_file.go index 171ffa36..688fb5cb 100644 --- a/node/report_file.go +++ b/node/report_file.go @@ -11,10 +11,8 @@ import ( "fmt" "os" "path/filepath" - "strings" "time" - "github.com/CESSProject/cess-bucket/configs" "github.com/CESSProject/cess-bucket/pkg/utils" "github.com/CESSProject/cess-go-sdk/core/pattern" sutils "github.com/CESSProject/cess-go-sdk/utils" @@ -39,15 +37,6 @@ func (n *Node) reportFiles(ch chan<- bool) { return } - var ( - ok bool - reReport bool - roothash string - txhash string - metadata pattern.FileMetadata - storageorder pattern.StorageOrder - ) - n.SetReportFileFlag(true) defer n.SetReportFileFlag(false) @@ -56,163 +45,203 @@ func (n *Node) reportFiles(ch chan<- bool) { n.Report("err", fmt.Sprintf("[Dirs(TmpDir)] %v", err)) return } - for _, v := range roothashs { - roothash = filepath.Base(v) - n.Report("info", fmt.Sprintf("fid: %v", roothash)) - ok, err = n.Has([]byte(Cach_prefix_File + roothash)) - if err == nil { - if ok { - n.Report("info", fmt.Sprintf("Cach.Has: %v", roothash)) - continue - } - } else { - n.Report("err", err.Error()) + for _, file := range roothashs { + err = n.reportFile(file) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] [reportFile] %v", filepath.Base(file), err)) } + time.Sleep(time.Second) + } +} - metadata, err = n.QueryFileMetadata(roothash) - if err != nil { - n.Report("err", fmt.Sprintf("QueryFileMetadata: %v", err)) - if err.Error() != pattern.ERR_Empty { - n.Report("err", err.Error()) - time.Sleep(pattern.BlockInterval) - continue - } +func (n *Node) reportFile(file string) error { + var ( + ok bool + err error + reReport bool + txhash string + queryFileMeta bool + metadata pattern.FileMetadata + storageorder pattern.StorageOrder + ) + queryFileMeta = true + fid := filepath.Base(file) + n.Report("info", fmt.Sprintf("[%s] will report file", fid)) + + ok, _ = n.Has([]byte(Cach_prefix_File + fid)) + if ok { + n.Report("info", fmt.Sprintf("[%s] already reported file", fid)) + if _, err = os.Stat(filepath.Join(n.GetDirs().FileDir, fid)); err == nil { + return nil + } + metadata, err = n.QueryFileMetadata(fid) + if err == nil { + queryFileMeta = false } else { - var deletedFrgmentList []string - var savedFrgment []string + return err + } + } else { + metadata, err = n.QueryFileMetadata(fid) + if err == nil { + queryFileMeta = false for _, segment := range metadata.SegmentList { for _, fragment := range segment.FragmentList { - if !sutils.CompareSlice(fragment.Miner[:], n.GetSignatureAccPulickey()) { - deletedFrgmentList = append(deletedFrgmentList, string(fragment.Hash[:])) - continue - } - savedFrgment = append(savedFrgment, string(fragment.Hash[:])) - } - } - - if len(savedFrgment) == 0 { - for _, d := range deletedFrgmentList { - _, err = os.Stat(filepath.Join(n.GetDirs().TmpDir, roothash, d)) - if err != nil { - continue - } - err = os.Remove(filepath.Join(n.GetDirs().TmpDir, roothash, d)) - if err != nil { - if !strings.Contains(err.Error(), configs.Err_file_not_fount) { - n.Report("err", fmt.Sprintf("[Delete TmpFile (%s.%s)] %v", roothash, d, err)) + if sutils.CompareSlice(fragment.Miner[:], n.GetSignatureAccPulickey()) { + err = n.Put([]byte(Cach_prefix_File+fid), nil) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] Cach.Put: %v", fid, err)) } } } - continue } - - if _, err = os.Stat(filepath.Join(n.GetDirs().FileDir, roothash)); err != nil { - err = os.Mkdir(filepath.Join(n.GetDirs().FileDir, roothash), os.ModeDir) - if err != nil { - n.Report("err", fmt.Sprintf("[Mkdir.FileDir(%s)] %v", roothash, err)) - continue + } + } + var deletedFrgmentList []string + var savedFrgment []string + if queryFileMeta { + metadata, err = n.QueryFileMetadata(fid) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] QueryFileMetadata: %v", fid, err)) + if err.Error() != pattern.ERR_Empty { + time.Sleep(pattern.BlockInterval) + return nil + } + } + } else { + for _, segment := range metadata.SegmentList { + for _, fragment := range segment.FragmentList { + if sutils.CompareSlice(fragment.Miner[:], n.GetSignatureAccPulickey()) { + n.Report("info", fmt.Sprintf("[%s] fragment should be save: %s", fid, string(fragment.Hash[:]))) + savedFrgment = append(savedFrgment, string(fragment.Hash[:])) + } else { + n.Report("info", fmt.Sprintf("[%s] fragment should be delete: %s", fid, string(fragment.Hash[:]))) + deletedFrgmentList = append(deletedFrgmentList, string(fragment.Hash[:])) } } - for i := 0; i < len(savedFrgment); i++ { - _, err = os.Stat(filepath.Join(n.GetDirs().TmpDir, roothash, savedFrgment[i])) + } + + if len(savedFrgment) == 0 { + for _, d := range deletedFrgmentList { + _, err = os.Stat(filepath.Join(n.GetDirs().TmpDir, fid, d)) if err != nil { - n.Report("err", fmt.Sprintf("[os.Stat(%s)] %v", roothash, err)) + n.Report("info", fmt.Sprintf("[%s] delete the fragment [%s] failed: %v", fid, d, err)) continue } - err = os.Rename(filepath.Join(n.GetDirs().TmpDir, roothash, savedFrgment[i]), - filepath.Join(n.GetDirs().FileDir, roothash, savedFrgment[i])) + err = os.Remove(filepath.Join(n.GetDirs().TmpDir, fid, d)) if err != nil { - n.Report("err", fmt.Sprintf("[Rename TmpDir to FileDir (%s.%s)] %v", roothash, savedFrgment[i], err)) + n.Report("err", fmt.Sprintf("[%s] delete the fragment [%s] failed: %v", fid, d, err)) continue } + n.Report("info", fmt.Sprintf("[%s] deleted the fragment: %s", fid, d)) } + return nil + } - err = n.Put([]byte(Cach_prefix_File+roothash), nil) + if _, err = os.Stat(filepath.Join(n.GetDirs().FileDir, fid)); err != nil { + err = os.Mkdir(filepath.Join(n.GetDirs().FileDir, fid), os.ModeDir) if err != nil { - n.Report("err", fmt.Sprintf("[Cach.Put(%s.%s)] %v", roothash, savedFrgment, err)) + n.Report("err", fmt.Sprintf("[%s] Mkdir: %v", fid, err)) + return nil } + } - for _, d := range deletedFrgmentList { - err = os.Remove(filepath.Join(n.GetDirs().TmpDir, roothash, d)) - if err != nil { - if !strings.Contains(err.Error(), configs.Err_file_not_fount) { - n.Report("err", fmt.Sprintf("[Delete TmpFile (%s.%s)] %v", roothash, d, err)) - } - } + for i := 0; i < len(savedFrgment); i++ { + _, err = os.Stat(filepath.Join(n.GetDirs().TmpDir, fid, savedFrgment[i])) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] os.Stat(%s): %v", fid, savedFrgment[i], err)) + return nil } - - continue + err = os.Rename(filepath.Join(n.GetDirs().TmpDir, fid, savedFrgment[i]), + filepath.Join(n.GetDirs().FileDir, fid, savedFrgment[i])) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] move [%s] to filedir: %v", fid, savedFrgment[i], err)) + return nil + } + n.Report("info", fmt.Sprintf("[%s] move [%s] to filedir", fid, savedFrgment[i])) } - storageorder, err = n.QueryStorageOrder(roothash) + err = n.Put([]byte(Cach_prefix_File+fid), nil) if err != nil { - if err.Error() != pattern.ERR_Empty { - n.Report("err", err.Error()) - } - continue + n.Report("err", fmt.Sprintf("[%s] Cach.Put: %v", fid, err)) } - reReport = true - for _, completeMiner := range storageorder.CompleteList { - if sutils.CompareSlice(completeMiner.Miner[:], n.GetSignatureAccPulickey()) { - reReport = false + for _, d := range deletedFrgmentList { + err = os.Remove(filepath.Join(n.GetDirs().TmpDir, fid, d)) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] delete the fragment [%s] failed: %v", fid, d, err)) + continue } + n.Report("info", fmt.Sprintf("[%s] deleted the fragment: %s", fid, d)) } + return nil + } - if !reReport { - continue + storageorder, err = n.QueryStorageOrder(fid) + if err != nil { + n.Report("err", err.Error()) + time.Sleep(pattern.BlockInterval) + return nil + } + + reReport = true + for _, completeMiner := range storageorder.CompleteList { + if sutils.CompareSlice(completeMiner.Miner[:], n.GetSignatureAccPulickey()) { + reReport = false + break } - var sucCount uint8 - - var sucIndex = make([]uint8, 0) - for idx := uint8(0); idx < uint8(pattern.DataShards+pattern.ParShards); idx++ { - sucCount = 0 - for i := 0; i < len(storageorder.SegmentList); i++ { - for j := 0; j < len(storageorder.SegmentList[i].FragmentHash); j++ { - if j == int(idx) { - fstat, err := os.Stat( - filepath.Join( - n.GetDirs().TmpDir, roothash, - string(storageorder.SegmentList[i].FragmentHash[j][:]), - ), - ) - if err != nil { - break - } - if fstat.Size() != pattern.FragmentSize { - break - } - sucCount++ - break - } + } + + if !reReport { + n.Report("info", fmt.Sprintf("[%s] already report", fid)) + return nil + } + + var sucCount int + var sucIndex = make([]uint8, 0) + for idx := uint8(0); idx < uint8(pattern.DataShards+pattern.ParShards); idx++ { + sucCount = 0 + n.Report("info", fmt.Sprintf("[%s] check the %d batch fragments", fid, idx)) + for i := 0; i < len(storageorder.SegmentList); i++ { + fstat, err := os.Stat( + filepath.Join(n.GetDirs().TmpDir, fid, string(storageorder.SegmentList[i].FragmentHash[idx][:])), + ) + if err != nil { + break + } + if fstat.Size() != pattern.FragmentSize { + break + } + sucCount++ + n.Report("info", fmt.Sprintf("[%s] the %d segment's %d fragment saved", fid, i, idx)) + } + if sucCount == len(storageorder.SegmentList) { + for _, v := range storageorder.CompleteList { + if uint8(v.Index) == uint8(idx+1) { + sucCount = 0 + break } } if sucCount > 0 { - for _, v := range storageorder.CompleteList { - if uint8(v.Index) == uint8(idx+1) { - sucCount = 0 - break - } - } - if sucCount > 0 { - sucIndex = append(sucIndex, idx+1) - } + sucIndex = append(sucIndex, (idx + 1)) } } + } - if len(sucIndex) == 0 { - continue - } + n.Report("info", fmt.Sprintf("[%s] successfully stored index: %v", fid, sucIndex)) - n.Report("info", fmt.Sprintf("Will report %s", roothash)) - for _, v := range sucIndex { - txhash, err = n.ReportFile(v, roothash) - if err != nil { - n.Report("err", fmt.Sprintf("[%s] File reporting failed: [%s] %v", roothash, txhash, err)) - continue - } - n.Report("info", fmt.Sprintf("[%s] File reported successfully: %s", roothash, txhash)) - break + if len(sucIndex) == 0 { + return nil + } + + for _, v := range sucIndex { + n.Report("info", fmt.Sprintf("[%s] will report index: %d", fid, v)) + txhash, err = n.ReportFile(v, fid) + if err != nil { + n.Report("err", fmt.Sprintf("[%s] report failed: [%s] %v", fid, txhash, err)) + continue } + n.Report("info", fmt.Sprintf("[%s] reported successfully: %s", fid, txhash)) + return nil } + return nil }