From f2b1f1bf819196b659bab66fbdac6d2b354f147b Mon Sep 17 00:00:00 2001 From: shollyman Date: Mon, 12 Dec 2022 21:38:43 -0800 Subject: [PATCH] chore(bigquery/storage/managedwriter): internal refactor (flow controller, ids) (#7104) This PR adds a new internal mechanism to simplify duplicating flow controllers, and does some preliminary work to wire in a UUID-based ID for managed stream instances. Neither is used elsewhere at this time, but both lay the foundation for further refactoring. Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103 --- bigquery/storage/managedwriter/client.go | 10 +++++ .../storage/managedwriter/flow_controller.go | 11 ++++++ .../managedwriter/flow_controller_test.go | 37 +++++++++++++++++++ .../storage/managedwriter/integration_test.go | 3 ++ .../storage/managedwriter/managed_stream.go | 3 ++ 5 files changed, 64 insertions(+) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 514dd65ec3de..e26c4e93688e 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -24,6 +24,7 @@ import ( storage "cloud.google.com/go/bigquery/storage/apiv1" "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "cloud.google.com/go/internal/detect" + "github.com/google/uuid" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" "google.golang.org/grpc" @@ -38,6 +39,8 @@ import ( // does not have the project ID encoded. const DetectProjectID = "*detect-project-id*" +const managedstreamIDPrefix = "managedstream" + // Client is a managed BigQuery Storage write client scoped to a single project. 
type Client struct { rawClient *storage.BigQueryWriteClient @@ -106,6 +109,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient ctx, cancel := context.WithCancel(ctx) ms := &ManagedStream{ + id: newUUID(managedstreamIDPrefix), streamSettings: defaultStreamSettings(), c: c, ctx: ctx, @@ -232,3 +236,9 @@ func TableParentFromStreamName(streamName string) string { func TableParentFromParts(projectID, datasetID, tableID string) string { return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID) } + +// newUUID returns an ID of the form "<prefix>_<uuid>", using a freshly generated UUID, for internal resources. +func newUUID(prefix string) string { + id := uuid.New() + return fmt.Sprintf("%s_%s", prefix, id.String()) +} diff --git a/bigquery/storage/managedwriter/flow_controller.go b/bigquery/storage/managedwriter/flow_controller.go index 3177ecfa38f3..5a212ac2fa29 100644 --- a/bigquery/storage/managedwriter/flow_controller.go +++ b/bigquery/storage/managedwriter/flow_controller.go @@ -50,6 +50,17 @@ func newFlowController(maxInserts, maxInsertBytes int) *flowController { return fc } +// copyFlowController returns a new flow controller built from the limits of +// in (zero limits when in is nil). It does not copy flow state. +func copyFlowController(in *flowController) *flowController { + var maxInserts, maxBytes int + if in != nil { + maxInserts = in.maxInsertCount + maxBytes = in.maxInsertBytes + } + return newFlowController(maxInserts, maxBytes) +} + // acquire blocks until one insert of size bytes can proceed or ctx is done. // It returns nil in the first case, or ctx.Err() in the second. 
// diff --git a/bigquery/storage/managedwriter/flow_controller_test.go b/bigquery/storage/managedwriter/flow_controller_test.go index ff880639af03..f633a95157d3 100644 --- a/bigquery/storage/managedwriter/flow_controller_test.go +++ b/bigquery/storage/managedwriter/flow_controller_test.go @@ -257,3 +257,40 @@ func TestFlowControllerUnboundedBytes(t *testing.T) { t.Error("got true, wanted false") } } + +func TestCopyFlowController(t *testing.T) { + testcases := []struct { + description string + in *flowController + wantMaxRequests int + wantMaxBytes int + }{ + { + description: "nil source", + wantMaxRequests: 0, + wantMaxBytes: 0, + }, + { + description: "no limit", + in: newFlowController(0, 0), + wantMaxRequests: 0, + wantMaxBytes: 0, + }, + { + description: "bounded", + in: newFlowController(10, 1024), + wantMaxRequests: 10, + wantMaxBytes: 1024, + }, + } + + for _, tc := range testcases { + fc := copyFlowController(tc.in) + if fc.maxInsertBytes != tc.wantMaxBytes { + t.Errorf("%s: max bytes mismatch, got %d want %d ", tc.description, fc.maxInsertBytes, tc.wantMaxBytes) + } + if fc.maxInsertCount != tc.wantMaxRequests { + t.Errorf("%s: max requests mismatch, got %d want %d ", tc.description, fc.maxInsertCount, tc.wantMaxRequests) + } + } +} diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index fe811edba18d..871f35120e9b 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -187,6 +187,9 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl if err != nil { t.Fatalf("NewManagedStream: %v", err) } + if ms.id == "" { + t.Errorf("managed stream is missing ID") + } validateTableConstraints(ctx, t, bqClient, testTable, "before send", withExactRowCount(0)) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 2819d5c57cac..f1a4656ff2e3 100644 
--- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -72,6 +72,9 @@ func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type { // ManagedStream is the abstraction over a single write stream. type ManagedStream struct { + // Unique id for the managedstream instance. + id string + streamSettings *streamSettings schemaDescriptor *descriptorpb.DescriptorProto destinationTable string