From 435ecc63ba9e722170003aae866c2866a9ca3ace Mon Sep 17 00:00:00 2001 From: akshya96 <87045294+akshya96@users.noreply.github.com> Date: Thu, 17 Oct 2024 11:13:41 -0700 Subject: [PATCH] Add tokens to local path (#28722) --- vault/activity_log.go | 30 +++++++++++--------- vault/activity_log_test.go | 39 +++++++++++++++++--------- vault/activity_log_util_common.go | 2 +- vault/activity_log_util_common_test.go | 8 +++--- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/vault/activity_log.go b/vault/activity_log.go index 71df6654a16a..62678dbdd838 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -36,20 +36,22 @@ import ( const ( // activitySubPath is the directory under the system view where // the log will be stored. - activitySubPath = "counters/activity/" - activityEntityBasePath = "log/entity/" - activityTokenBasePath = "log/directtokens/" - activityQueryBasePath = "queries/" - activityConfigKey = "config" - activityIntentLogKey = "endofmonth" + activitySubPath = "counters/activity/" + activityEntityBasePath = "log/entity/" + activityTokenBasePath = "log/directtokens/" + activityTokenLocalBasePath = "local/" + activityTokenBasePath + activityQueryBasePath = "queries/" + activityConfigKey = "config" + activityIntentLogKey = "endofmonth" activityACMERegenerationKey = "acme-regeneration" // sketch for each month that stores hash of client ids distinctClientsBasePath = "log/distinctclients/" // for testing purposes (public as needed) - ActivityLogPrefix = "sys/counters/activity/log/" - ActivityPrefix = "sys/counters/activity/" + ActivityLogPrefix = "sys/counters/activity/log/" + ActivityLogLocalPrefix = "sys/counters/activity/local/log/" + ActivityPrefix = "sys/counters/activity/" // Time to wait on perf standby before sending fragment activityFragmentStandbyTime = 10 * time.Minute @@ -504,7 +506,7 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm return "", nil } // RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed - tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp) + tokenPath := fmt.Sprintf("%s%d/0", activityTokenLocalBasePath, currentSegment.startTimestamp) // We must still allow for the tokenCount of the current segment to // be written to storage, since if we remove this code we will incur // data loss for one segment's worth of TWEs. @@ -586,7 +588,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) { // sorted last to first func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) { paths := make([]string, 0) - for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} { + for _, basePath := range []string{activityEntityBasePath, activityTokenLocalBasePath} { p, err := a.view.List(ctx, basePath) if err != nil { return nil, err @@ -694,7 +696,7 @@ func (a *ActivityLog) WalkTokenSegments(ctx context.Context, startTime time.Time, walkFn func(*activity.TokenCount), ) error { - basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/" + basePath := activityTokenLocalBasePath + fmt.Sprint(startTime.Unix()) + "/" pathList, err := a.view.List(ctx, basePath) if err != nil { return err @@ -795,7 +797,7 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti // tokenCountExists checks if there's a token log for :startTime: // this function should be called with the lock held func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) { - p, err := a.view.List(ctx, activityTokenBasePath+fmt.Sprint(startTime.Unix())+"/") + p, err := a.view.List(ctx, activityTokenLocalBasePath+fmt.Sprint(startTime.Unix())+"/") if err != nil { return false, err } @@ -820,7 +822,7 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e return nil } - path := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/0" + path := activityTokenLocalBasePath + fmt.Sprint(startTime.Unix()) + "/0" data, err := a.view.Get(ctx, path) if err != nil { return err @@ -916,7 +918,7 @@ func (a *ActivityLog) resetCurrentLog() { func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp) - tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp) + tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp) entitySegments, err := a.view.List(ctx, entityPath) if err != nil { diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 81a691dadbed..d5e221f0197a 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -297,7 +297,7 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) { a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment nsIDs := [...]string{"ns1_id", "ns2_id", "ns3_id"} - path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp()) + path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp()) for i := 0; i < 3; i++ { a.AddTokenToFragment(nsIDs[0]) @@ -380,7 +380,7 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) { a.SetStandbyEnable(ctx, true) a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment - tokenPath := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp()) + tokenPath := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp()) clientPath := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", a.GetStartTimestamp()) // Create some entries without entityIDs tokenEntryOne := logical.TokenEntry{NamespaceID: namespace.RootNamespaceID, Policies: []string{"hi"}} @@ -637,13 +637,18 @@ func TestActivityLog_availableLogs(t *testing.T) { // set up a few files in storage core, _, _ := TestCoreUnsealed(t) a := core.activityLog - paths := [...]string{"entity/1111/1", "directtokens/1111/1", "directtokens/1000000/1", "entity/992/3", "directtokens/992/1"} + paths := [...]string{"entity/1111/1", "entity/992/3"} + tokenPaths := [...]string{"directtokens/1111/1", "directtokens/1000000/1", "directtokens/992/1"} expectedTimes := [...]time.Time{time.Unix(1000000, 0), time.Unix(1111, 0), time.Unix(992, 0)} for _, path := range paths { WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) } + for _, path := range tokenPaths { + WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) + } + // verify above files are there, and dates in correct order times, err := a.availableLogs(context.Background(), time.Now()) if err != nil { @@ -778,7 +783,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) { path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp) path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp) path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp) - tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp) + tokenPath := fmt.Sprintf("sys/counters/activity/local/log/directtokens/%d/0", startTimestamp) genID := func(i int) string { return fmt.Sprintf("11111111-1111-1111-1111-%012d", i) @@ -1140,7 +1145,7 @@ func TestActivityLog_tokenCountExists(t *testing.T) { a := core.activityLog paths := [...]string{"directtokens/992/0", "directtokens/1001/foo", "directtokens/1111/0", "directtokens/2222/1"} for _, path := range paths { - WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) + WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) } testCases := []struct { @@ -1507,7 +1512,7 @@ func TestActivityLog_loadTokenCount(t *testing.T) { ctx := context.Background() for _, tc := range testCases { - WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) + WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data) } for _, tc := range testCases { @@ -1651,7 +1656,7 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities t.Fatalf(err.Error()) } - WriteToStorage(t, core, ActivityLogPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData) + WriteToStorage(t, core, ActivityLogLocalPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData) } return a, entityRecords, tokenRecords @@ -1978,13 +1983,19 @@ func TestActivityLog_DeleteWorker(t *testing.T) { "entity/1111/2", "entity/1111/3", "entity/1112/1", - "directtokens/1111/1", - "directtokens/1112/1", } for _, path := range paths { WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) } + tokenPaths := []string{ + "directtokens/1111/1", + "directtokens/1112/1", + } + for _, path := range tokenPaths { + WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) + } + doneCh := make(chan struct{}) timeout := time.After(20 * time.Second) @@ -1998,13 +2009,13 @@ func TestActivityLog_DeleteWorker(t *testing.T) { // Check segments still present readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1") - readSegmentFromStorage(t, core, ActivityLogPrefix+"directtokens/1112/1") + readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"directtokens/1112/1") // Check other segments not present expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3") - expectMissingSegment(t, core, ActivityLogPrefix+"directtokens/1111/1") + expectMissingSegment(t, core, ActivityLogLocalPrefix+"directtokens/1111/1") } // checkAPIWarnings ensures there is a warning if switching from enabled -> disabled, @@ -2123,7 +2134,7 @@ func TestActivityLog_EnableDisable(t *testing.T) { path = fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, seg2) readSegmentFromStorage(t, core, path) - path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, seg2) + path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogLocalPrefix, seg2) } readSegmentFromStorage(t, core, path) } @@ -2371,7 +2382,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) { if err != nil { t.Fatal(err) } - tokenPath := fmt.Sprintf("%vdirecttokens/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment) + tokenPath := fmt.Sprintf("%vdirecttokens/%v/%v", ActivityLogLocalPrefix, segment.StartTime, segment.Segment) WriteToStorage(t, core, tokenPath, data) } @@ -3694,7 +3705,7 @@ func TestActivityLog_Deletion(t *testing.T) { paths[i] = append(paths[i], entityPath) WriteToStorage(t, core, entityPath, []byte("test")) } - tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, start.Unix()) + tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogLocalPrefix, start.Unix()) paths[i] = append(paths[i], tokenPath) WriteToStorage(t, core, tokenPath, []byte("test")) diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go index 3e71ee8cc0ac..4be1bfdcb4c8 100644 --- a/vault/activity_log_util_common.go +++ b/vault/activity_log_util_common.go @@ -439,7 +439,7 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T if err != nil { return nil, err } - tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenBasePath) + tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath) if err != nil { return nil, err } diff --git a/vault/activity_log_util_common_test.go b/vault/activity_log_util_common_test.go index 48a3e8dea43b..76001f35dc6d 100644 --- a/vault/activity_log_util_common_test.go +++ b/vault/activity_log_util_common_test.go @@ -1003,7 +1003,7 @@ func writeTokenSegment(t *testing.T, core *Core, ts time.Time, index int, item * t.Helper() protoItem, err := proto.Marshal(item) require.NoError(t, err) - WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, ts, index), protoItem) + WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, ts, index), protoItem) } // makeSegmentPath formats the path for a segment at a particular time and index @@ -1020,7 +1020,7 @@ func TestSegmentFileReader_BadData(t *testing.T) { now := time.Now() // write bad data that won't be able to be unmarshaled at index 0 - WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, now, 0), []byte("fake data")) + WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data")) // write entity at index 1 @@ -1063,7 +1063,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) { now := time.Now() // write entities and tokens at indexes 0, 1, 2 for i := 0; i < 3; i++ { - WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, now, i), []byte("fake data")) + WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, i), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, i), []byte("fake data")) } @@ -1084,7 +1084,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) { // delete the indexes 0, 1, 2 for i := 0; i < 3; i++ { - require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenBasePath, now, i))) + require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i))) require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i))) }