Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Firestore and held-back dependencies #21190

Merged
merged 10 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,6 @@ updates:
day: "sunday"
time: "09:00" # 9am UTC
ignore:
# Deprecated APIs, requires manual changes.
# TODO(xacrimon): Update Firestore and solve deprecations.
- dependency-name: cloud.google.com/go/firestore
# Depends on newer Firestore version.
- dependency-name: cloud.google.com/go/iam
- dependency-name: cloud.google.com/go/kms
- dependency-name: github.com/fsouza/fake-gcs-server
- dependency-name: google.golang.org/api
- dependency-name: google.golang.org/genproto
- dependency-name: cloud.google.com/go/storage
- dependency-name: cloud.google.com/go/container
# Breaks backwards compatibility
- dependency-name: github.com/gravitational/ttlmap
# Breaks backwards compatibility
Expand Down
28 changes: 11 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ go 1.19

require (
cloud.google.com/go/container v1.10.0
cloud.google.com/go/firestore v1.9.0
cloud.google.com/go/iam v0.8.0
cloud.google.com/go/kms v1.6.0
cloud.google.com/go/storage v1.28.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.1
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v3 v3.0.1
Expand Down Expand Up @@ -42,6 +46,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.6.0
github.com/flynn/hid v0.0.0-20190502022136-f1b9b6cc019a
github.com/flynn/u2f v0.0.0-20180613185708-15554eb68e5d
github.com/fsouza/fake-gcs-server v1.42.2
github.com/fxamacker/cbor/v2 v2.4.0
github.com/ghodss/yaml v1.0.0
github.com/gizak/termui/v3 v3.1.0
Expand Down Expand Up @@ -130,6 +135,8 @@ require (
golang.org/x/sys v0.4.0
golang.org/x/term v0.4.0
golang.org/x/text v0.6.0
google.golang.org/api v0.109.0
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57
google.golang.org/grpc v1.52.3
google.golang.org/grpc/examples v0.0.0-20221010194801-c67245195065
google.golang.org/protobuf v1.28.1
Expand All @@ -153,19 +160,6 @@ require (
sigs.k8s.io/yaml v1.3.0
)

// DO NOT UPDATE any of the following dependencies until the
// deprecated Firestore Batch API is replaced. Updating causes
// the linter to fail due to deprecation warnings.
require (
cloud.google.com/go/firestore v1.6.1
cloud.google.com/go/iam v0.8.0
cloud.google.com/go/kms v1.6.0
cloud.google.com/go/storage v1.28.0
github.com/fsouza/fake-gcs-server v1.42.2
google.golang.org/api v0.103.0
google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update api/go.mod too.

)

// Indirect mailgun dependencies.
// Updating causes breaking changes.
require (
Expand All @@ -174,10 +168,10 @@ require (
)

require (
cloud.google.com/go v0.107.0 // indirect
cloud.google.com/go/compute v1.13.0 // indirect
cloud.google.com/go/compute/metadata v0.2.1 // indirect
cloud.google.com/go/longrunning v0.3.0 // indirect
cloud.google.com/go v0.109.0 // indirect
cloud.google.com/go/compute v1.18.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/longrunning v0.4.0 // indirect
cloud.google.com/go/pubsub v1.27.1 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
Expand Down
156 changes: 16 additions & 140 deletions go.sum

Large diffs are not rendered by default.

34 changes: 24 additions & 10 deletions lib/backend/firestore/firestorebk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (

"cloud.google.com/go/firestore"
apiv1 "cloud.google.com/go/firestore/apiv1/admin"
"cloud.google.com/go/firestore/apiv1/admin/adminpb"
"github.com/gravitational/trace"
"github.com/gravitational/trace/trail"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"google.golang.org/api/option"
adminpb "google.golang.org/genproto/googleapis/firestore/admin/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -200,8 +200,6 @@ const (
idDocProperty = "id"
// timeInBetweenIndexCreationStatusChecks
timeInBetweenIndexCreationStatusChecks = time.Second * 10
// commitLimit is the maximum number of writes per commit
commitLimit = 500
)

// GetName is a part of backend API and it returns Firestore backend type
Expand Down Expand Up @@ -709,19 +707,35 @@ func (b *Backend) purgeExpiredDocuments() error {
// deleteDocuments removes documents from firestore in batches to stay within the
// firestore write limits
func (b *Backend) deleteDocuments(docs []*firestore.DocumentSnapshot) error {
for i := 0; i < len(docs); i += commitLimit {
batch := b.svc.Batch()

for j := 0; j < commitLimit && i+j < len(docs); j++ {
batch.Delete(docs[i+j].Ref)
seen := make(map[string]struct{}, len(docs))
batch := b.svc.BulkWriter(b.clientContext)
jobs := make([]*firestore.BulkWriterJob, 0, len(docs))

for _, doc := range docs {
// Deduplicate documents. The Firestore SDK will error if duplicates are found,
// but existing callers of this function assume this is valid.
if _, ok := seen[doc.Ref.Path]; ok {
continue
}
seen[doc.Ref.Path] = struct{}{}

if _, err := batch.Commit(b.clientContext); err != nil {
job, err := batch.Delete(doc.Ref)
if err != nil {
return ConvertGRPCError(err)
tigrato marked this conversation as resolved.
Show resolved Hide resolved
}

jobs = append(jobs, job)
}

batch.End()
var errs []error
xacrimon marked this conversation as resolved.
Show resolved Hide resolved
for _, job := range jobs {
if _, err := job.Results(); err != nil {
errs = append(errs, ConvertGRPCError(err))
}
}

return nil
return trace.NewAggregate(errs...)
}

// ConvertGRPCError converts GRPC errors
Expand Down
56 changes: 30 additions & 26 deletions lib/backend/firestore/firestorebk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@ import (
"fmt"
"net"
"os"
"reflect"
"strings"
"testing"
"time"
"unsafe"

"cloud.google.com/go/firestore"
"cloud.google.com/go/firestore/apiv1/admin/adminpb"
"cloud.google.com/go/firestore/apiv1/firestorepb"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
adminpb "google.golang.org/genproto/googleapis/firestore/admin/v1"
firestorepb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -194,25 +196,29 @@ type mockFirestoreServer struct {
commitErr error
}

func (s *mockFirestoreServer) Commit(ctx context.Context, req *firestorepb.CommitRequest) (*firestorepb.CommitResponse, error) {
func (s *mockFirestoreServer) BatchWrite(ctx context.Context, req *firestorepb.BatchWriteRequest) (*firestorepb.BatchWriteResponse, error) {
md, _ := metadata.FromIncomingContext(ctx)
if xg := md["x-goog-api-client"]; len(xg) == 0 || !strings.Contains(xg[0], "gl-go/") {
return nil, fmt.Errorf("x-goog-api-client = %v, expected gl-go key", xg)
}

if len(req.Writes) > commitLimit {
return nil, status.Errorf(codes.InvalidArgument, "too many writes in a transaction")
}

s.reqs = append(s.reqs, req)
if s.commitErr != nil {
return nil, s.commitErr
}
return &firestorepb.CommitResponse{
WriteResults: []*firestorepb.WriteResult{{

resp := &firestorepb.BatchWriteResponse{}
for i := 0; i < len(req.Writes); i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: If you don't need the variable:

Suggested change
for i := 0; i < len(req.Writes); i++ {
for _ := range req.Writes {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for range req.Writes works too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even better ☝️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hold up, when did we get this syntax?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since forever, I think.

resp.Status = append(resp.Status, &status.Status{
Code: int32(code.Code_OK),
})

resp.WriteResults = append(resp.WriteResults, &firestorepb.WriteResult{
UpdateTime: timestamppb.Now(),
}},
}, nil
})
}

return resp, nil
}

func TestDeleteDocuments(t *testing.T) {
Expand All @@ -231,19 +237,9 @@ func TestDeleteDocuments(t *testing.T) {
documents: 1,
},
{
name: "commit less than limit",
name: "commit success",
assertion: require.NoError,
documents: commitLimit - 123,
},
{
name: "commit limit",
assertion: require.NoError,
documents: commitLimit,
},
{
name: "commit more than limit",
assertion: require.NoError,
documents: (commitLimit * 3) + 173,
documents: 1796,
},
}

Expand All @@ -261,6 +257,14 @@ func TestDeleteDocuments(t *testing.T) {
CreateTime: time.Now(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Use for-range above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable i is actually used in the loop at L252. See L255, always been that way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is still viable, although certainly not worth a PR.

UpdateTime: time.Now(),
})

// We really shouldn't need this, but the Firestore SDK made some unfortunate design
// decisions that make it impossible to set the field of a DocumentRef used for the seemingly
// useless deduplication in the BulkWriter API.
rs := reflect.ValueOf(docs[i].Ref).Elem()
rf := rs.FieldByName("shortPath")
rf = reflect.NewAt(rf.Type(), unsafe.Pointer(rf.UnsafeAddr())).Elem()
rf.SetString(docs[i].Ref.Path)
Comment on lines +261 to +267
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the consequence of not setting this? Could we get around the reflect block in some other way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already merged due to @rosstimothy adding it to the queue. I was confused by errors for some time in tests for quite some time that didn't crop up in real world testing. As it turns out, the firestore SDK BulkWriter API we now use assembles batches of changes internally and submits them. Each batch is deduplicated so you can't get two deletes or an edit after a delete in the same batch, so they always succeed. The consequence of this is that it's illegal to delete a document twice with the same BulkWriter.

This would be fine, the test generates an unique ref.Path right? Well, as it turns out, the SDK does deduplication on an internal hidden variable which represents the path in a smaller encoding. Why? I have no idea. But this is stored in a hidden field that we cannot set easily. If you remove this, you get deduplication errors in the tests, even if the ref.Path is unique.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not find an alternative way around this easily.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, thanks for the explanation.

}

mockFirestore := &mockFirestoreServer{
Expand Down Expand Up @@ -310,7 +314,7 @@ func TestDeleteDocuments(t *testing.T) {
var committed int
for _, req := range mockFirestore.reqs {
switch r := req.(type) {
case *firestorepb.CommitRequest:
case *firestorepb.BatchWriteRequest:
committed += len(r.Writes)
}
}
Expand Down
64 changes: 38 additions & 26 deletions lib/events/firestoreevents/firestoreevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (

"cloud.google.com/go/firestore"
apiv1 "cloud.google.com/go/firestore/apiv1/admin"
"cloud.google.com/go/firestore/apiv1/admin/adminpb"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/genproto/googleapis/firestore/admin/v1"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
Expand Down Expand Up @@ -537,27 +537,27 @@ func (l *Log) getIndexParent() string {
func (l *Log) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error {
tuples := firestorebk.IndexList{}
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventNamespaceDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(firestore.DocumentID, adminpb.Index_IndexField_ASCENDING),
)
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_DESCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventNamespaceDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, adminpb.Index_IndexField_DESCENDING),
firestorebk.Field(firestore.DocumentID, adminpb.Index_IndexField_ASCENDING),
)
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventTypeDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_DESCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventNamespaceDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(eventTypeDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, adminpb.Index_IndexField_DESCENDING),
firestorebk.Field(firestore.DocumentID, adminpb.Index_IndexField_ASCENDING),
)
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventTypeDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(sessionIDDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventNamespaceDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(eventTypeDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(sessionIDDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, adminpb.Index_IndexField_ASCENDING),
firestorebk.Field(firestore.DocumentID, adminpb.Index_IndexField_ASCENDING),
)
err := firestorebk.EnsureIndexes(l.svcContext, adminSvc, tuples, l.getIndexParent())
return trace.Wrap(err)
Expand Down Expand Up @@ -589,21 +589,33 @@ func (l *Log) purgeExpiredEvents() error {
if err != nil {
return firestorebk.ConvertGRPCError(err)
}
numDeleted := 0
batch := l.svc.Batch()
batch := l.svc.BulkWriter(l.svcContext)
jobs := make([]*firestore.BulkWriterJob, 0, len(docSnaps))
for _, docSnap := range docSnaps {
batch.Delete(docSnap.Ref)
numDeleted++
}
if numDeleted > 0 {
start = time.Now()
_, err := batch.Commit(l.svcContext)
batchWriteLatencies.Observe(time.Since(start).Seconds())
batchWriteRequests.Inc()
job, err := batch.Delete(docSnap.Ref)
if err != nil {
return firestorebk.ConvertGRPCError(err)
}

jobs = append(jobs, job)
}

if len(jobs) == 0 {
continue
}

start = time.Now()
var errs []error
batch.End()
for _, job := range jobs {
if _, err := job.Results(); err != nil {
errs = append(errs, firestorebk.ConvertGRPCError(err))
}
}

batchWriteLatencies.Observe(time.Since(start).Seconds())
batchWriteRequests.Inc()
return trace.NewAggregate(errs...)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions lib/events/firestoreevents/firestoreevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"cloud.google.com/go/firestore"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -86,12 +87,19 @@ func (tt *firestoreContext) setupTest(t *testing.T) {
if len(docSnaps) == 0 {
return
}
batch := tt.log.svc.Batch()
batch := tt.log.svc.BulkWriter(tt.log.svcContext)
jobs := make([]*firestore.BulkWriterJob, 0, len(docSnaps))
for _, docSnap := range docSnaps {
batch.Delete(docSnap.Ref)
job, err := batch.Delete(docSnap.Ref)
require.NoError(t, err)
jobs = append(jobs, job)
}

batch.End()
for _, job := range jobs {
_, err := job.Results()
require.NoError(t, err)
}
_, err = batch.Commit(ctx)
require.NoError(t, err)
}

func (tt *firestoreContext) Close(t *testing.T) {
Expand Down