-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(storage): add benchmarking script (#5856)
- Loading branch information
Showing
9 changed files
with
2,066 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# go-bench-gcs | ||
**This is not an officially supported Google product** | ||
|
||
## Run example: | ||
This runs 1000 iterations on 512kib to 2Gib files in the background, sending output to `out.log`: | ||
|
||
`go run main -p {PROJECT_ID} -t 72h -max_samples 1000 -o {RESULTS_FILE_NAME}.csv &> out.log &` | ||
|
||
|
||
## CLI parameters | ||
|
||
| Parameter | Description | Possible values | Default | | ||
| --------- | ----------- | --------------- |:-------:| | ||
| -p | projectID | a project ID | * | | ||
| -creds | path to credentials file | any path | from environment | | ||
| -o | file to output results to <br> if empty, will output to stdout | any file path | stdout | | ||
| -output_type | output results as csv records or cloud monitoring | `csv`, `cloud-monitoring` | `cloud-monitoring` | | ||
| -api | which API to use | `JSON`: use JSON to upload and XML to download <br> `XML`: use JSON to upload and XML to download <br> `GRPC`: use GRPC <br> `MIXED`: select an API at random for each upload/download <br> `DirectPath`: use GRPC with direct path | `MIXED` | | ||
| -r | bucket region for benchmarks | any GCS region | `US-WEST1` | | ||
| -workers | number of goroutines to run at once; set to 1 for no concurrency | any positive integer | `16` | | ||
| -t | timeout (maximum time running benchmarks) <br> the program may run for longer while it finishes running processes | any [time.Duration](https://pkg.go.dev/time#Duration) | `1h` | | ||
| -min_samples | minimum number of objects to upload | any positive integer | `10` | | ||
| -max_samples | maximum number of objects to upload | any positive integer | `10 000` | | ||
| -gc_f | whether to force garbage collection <br> before every write or read benchmark | `true` or `false` (present/not present) | `false` | | ||
| -min_size | minimum object size in bytes | any positive integer | `512` | | ||
| -max_size | maximum object size in bytes | any positive integer | `2 097 152` (2 GiB) | | ||
| -defaults | use default settings for the client <br> (conn_pool, read, write and chunk size parameters will be ignored) | `true` or `false` | `false` | ||
| -conn_pool | GRPC connection pool size | any positive integer | 4 | | ||
| -min_cs | minimum ChunkSize in bytes | any positive integer | `16 384` (16 MiB) | | ||
| -max_cs | maximum ChunkSize in bytes | any positive integer | `16 384` (16 MiB) | | ||
| -q_read | download quantum | any positive integer | 1 | | ||
| -q_write | upload quantum | any positive integer | 1 | | ||
| -min_r_size | minimum read size in bytes | any positive integer | `4000` | | ||
| -max_r_size | maximum read size in bytes | any positive integer | `4000` | | ||
| -min_w_size | minimum write size in bytes | any positive integer | `4000` | | ||
| -max_w_size | maximum write size in bytes | any positive integer | `4000` | | ||
| -labels | labels added to cloud monitoring output (ignored when outputting as csv) | any string; should be in the format: <br> `stringKey=\"value\",intKey=3,boolKey=true` | empty | | ||
|
||
\* required values | ||
|
||
Note: while the default read/write size for HTTP clients is 4Kb | ||
(the default for this benchmarking), the default for GRPC is 32Kb. | ||
If you want to capture performance using the defaults for GRPC run the script | ||
separately setting the read and write sizes to 32Kb, or run with the `defaults` | ||
parameter set. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
// Copyright 2022 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"net" | ||
"net/http" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"cloud.google.com/go/storage" | ||
"golang.org/x/net/http2" | ||
"google.golang.org/api/option" | ||
htransport "google.golang.org/api/transport/http" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
// clientPool functions much like a sync Pool (https://pkg.go.dev/sync#Pool), | ||
// except it does not automatically remove items stored in the clientPool. | ||
// Re-using the clients rather than creating a new one each time reduces overhead | ||
// (such as re-creating the underlying HTTP client and opening credential files), | ||
// and is the intended way to use Storage clients. | ||
// | ||
// There is no limit to how many clients will be created, but it should be around | ||
// the order of 5 * min(workers, max_samples). | ||
type clientPool struct { | ||
New func() *storage.Client | ||
clients []*storage.Client | ||
} | ||
|
||
func (p *clientPool) Get() *storage.Client { | ||
// Create the slice if not already created | ||
if p.clients == nil { | ||
p.clients = make([]*storage.Client, 0) | ||
} | ||
|
||
// If there is an unused client, return it | ||
if len(p.clients) > 0 { | ||
c := p.clients[0] | ||
p.clients = p.clients[1:] | ||
return c | ||
} | ||
|
||
// Otherwise, create a new client and return it | ||
return p.New() | ||
} | ||
|
||
func (p *clientPool) Put(c *storage.Client) { | ||
p.clients = append(p.clients, c) | ||
} | ||
|
||
// we can share clients as long as the app buffer sizes are constant | ||
var httpClients, gRPCClients *clientPool | ||
|
||
var nonBenchmarkingClients = clientPool{ | ||
New: func() *storage.Client { | ||
// For debuggability's sake, these are HTTP | ||
clientMu.Lock() | ||
client, err := storage.NewClient(context.Background()) | ||
clientMu.Unlock() | ||
if err != nil { | ||
log.Fatalf("storage.NewClient: %v", err) | ||
} | ||
|
||
return client | ||
}, | ||
} | ||
|
||
func initializeClientPools(opts *benchmarkOptions) func() { | ||
httpClients = &clientPool{ | ||
New: func() *storage.Client { | ||
client, err := initializeHTTPClient(context.Background(), opts.minWriteSize, opts.maxReadSize, opts.useDefaults) | ||
if err != nil { | ||
log.Fatalf("initializeHTTPClient: %v", err) | ||
} | ||
|
||
return client | ||
}, | ||
} | ||
|
||
gRPCClients = &clientPool{ | ||
New: func() *storage.Client { | ||
client, err := initializeGRPCClient(context.Background(), opts.minWriteSize, opts.maxReadSize, opts.connPoolSize, opts.useDefaults) | ||
if err != nil { | ||
log.Fatalf("initializeGRPCClient: %v", err) | ||
} | ||
return client | ||
}, | ||
} | ||
|
||
return func() { | ||
for _, c := range httpClients.clients { | ||
c.Close() | ||
} | ||
for _, c := range gRPCClients.clients { | ||
c.Close() | ||
} | ||
} | ||
} | ||
|
||
// We can't pool storage clients if we need to change parameters at the HTTP or GRPC client level, | ||
// since we can't access those after creation as it is set up now. | ||
// If we are using defaults (ie. not creating an underlying HTTP client ourselves), or if | ||
// we are only interested in one app buffer size at a time, we don't need to change anything on the underlying | ||
// client and can re-use it (and therefore the storage client) for other benchmark runs. | ||
func canUseClientPool(opts *benchmarkOptions) bool { | ||
return opts.useDefaults || (opts.maxReadSize == opts.minReadSize && opts.maxWriteSize == opts.minWriteSize) | ||
} | ||
|
||
func getClient(ctx context.Context, opts *benchmarkOptions, br benchmarkResult) (*storage.Client, func() error, error) { | ||
noOp := func() error { return nil } | ||
grpc := br.params.api == grpcAPI || br.params.api == directPath | ||
if canUseClientPool(opts) { | ||
if grpc { | ||
c := gRPCClients.Get() | ||
return c, func() error { gRPCClients.Put(c); return nil }, nil | ||
} | ||
c := httpClients.Get() | ||
return c, func() error { httpClients.Put(c); return nil }, nil | ||
} | ||
|
||
// if necessary, create a client | ||
if grpc { | ||
c, err := initializeGRPCClient(ctx, br.params.appBufferSize, br.params.appBufferSize, opts.connPoolSize, false) | ||
if err != nil { | ||
return nil, noOp, fmt.Errorf("initializeGRPCClient: %w", err) | ||
} | ||
return c, c.Close, nil | ||
} | ||
c, err := initializeHTTPClient(ctx, br.params.appBufferSize, br.params.appBufferSize, false) | ||
if err != nil { | ||
return nil, noOp, fmt.Errorf("initializeHTTPClient: %w", err) | ||
} | ||
return c, c.Close, nil | ||
} | ||
|
||
// mutex on starting a client so that we can set an env variable for GRPC clients | ||
var clientMu sync.Mutex | ||
|
||
func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, useDefaults bool) (*storage.Client, error) { | ||
if useDefaults { | ||
clientMu.Lock() | ||
c, err := storage.NewClient(ctx, option.WithCredentialsFile(credentialsFile)) | ||
clientMu.Unlock() | ||
return c, err | ||
} | ||
|
||
dialer := &net.Dialer{ | ||
Timeout: 30 * time.Second, | ||
KeepAlive: 30 * time.Second, | ||
} | ||
|
||
// These are the default parameters with write and read buffer sizes modified | ||
base := &http.Transport{ | ||
Proxy: http.ProxyFromEnvironment, | ||
DialContext: dialer.DialContext, | ||
ForceAttemptHTTP2: true, | ||
MaxIdleConns: 100, | ||
IdleConnTimeout: 90 * time.Second, | ||
TLSHandshakeTimeout: 10 * time.Second, | ||
ExpectContinueTimeout: 1 * time.Second, | ||
WriteBufferSize: writeBufferSize, | ||
ReadBufferSize: readBufferSize, | ||
} | ||
|
||
http2Trans, err := http2.ConfigureTransports(base) | ||
if err == nil { | ||
http2Trans.ReadIdleTimeout = time.Second * 31 | ||
} | ||
|
||
trans, err := htransport.NewTransport(ctx, base, | ||
option.WithScopes("https://www.googleapis.com/auth/devstorage.full_control"), | ||
option.WithCredentialsFile(credentialsFile)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
clientMu.Lock() | ||
client, err := storage.NewClient(ctx, option.WithHTTPClient(&http.Client{Transport: trans})) | ||
clientMu.Unlock() | ||
|
||
return client, err | ||
} | ||
|
||
func initializeGRPCClient(ctx context.Context, writeBufferSize, readBufferSize int, connPoolSize int, useDefaults bool) (*storage.Client, error) { | ||
if useDefaults { | ||
clientMu.Lock() | ||
os.Setenv("STORAGE_USE_GRPC", "true") | ||
c, err := storage.NewClient(ctx, option.WithCredentialsFile(credentialsFile)) | ||
os.Unsetenv("STORAGE_USE_GRPC") | ||
clientMu.Unlock() | ||
return c, err | ||
} | ||
|
||
clientMu.Lock() | ||
os.Setenv("STORAGE_USE_GRPC", "true") | ||
client, err := storage.NewClient(ctx, option.WithCredentialsFile(credentialsFile), | ||
option.WithGRPCDialOption(grpc.WithReadBufferSize(readBufferSize)), | ||
option.WithGRPCDialOption(grpc.WithWriteBufferSize(writeBufferSize)), | ||
option.WithGRPCConnectionPool(connPoolSize)) | ||
os.Unsetenv("STORAGE_USE_GRPC") | ||
clientMu.Unlock() | ||
|
||
return client, err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// Copyright 2022 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"os" | ||
"time" | ||
|
||
"cloud.google.com/go/storage" | ||
) | ||
|
||
type downloadOpts struct { | ||
client *storage.Client | ||
objectSize int64 | ||
bucket string | ||
object string | ||
} | ||
|
||
func downloadBenchmark(ctx context.Context, dopts downloadOpts) (elapsedTime time.Duration, rerr error) { | ||
// Set timer | ||
start := time.Now() | ||
// Multiple defer statements execute in LIFO order, so this will be the last | ||
// thing executed. We use named return parameters so that we can set it directly | ||
// and defer the statement so that the time includes typical cleanup steps and | ||
// gets set regardless of errors. | ||
defer func() { elapsedTime = time.Since(start) }() | ||
|
||
// Set additional timeout | ||
ctx, cancel := context.WithTimeout(ctx, time.Minute) | ||
defer cancel() | ||
|
||
// Create file to download to | ||
f, err := os.CreateTemp("", objectPrefix) | ||
if err != nil { | ||
rerr = fmt.Errorf("os.Create: %w", err) | ||
return | ||
} | ||
defer func() { | ||
closeErr := f.Close() | ||
removeErr := os.Remove(f.Name()) | ||
// if we don't have another error to return, return error for closing file | ||
// if that error is also nil, return removeErr | ||
if rerr == nil { | ||
rerr = removeErr | ||
if closeErr != nil { | ||
rerr = closeErr | ||
} | ||
} | ||
}() | ||
|
||
// Get reader from object | ||
o := dopts.client.Bucket(dopts.bucket).Object(dopts.object) | ||
objectReader, err := o.NewReader(ctx) | ||
if err != nil { | ||
rerr = fmt.Errorf("Object(%q).NewReader: %w", o.ObjectName(), err) | ||
return | ||
} | ||
defer func() { | ||
err := objectReader.Close() | ||
if rerr == nil { | ||
rerr = err | ||
} | ||
}() | ||
|
||
// Download | ||
written, err := io.Copy(f, objectReader) | ||
if err != nil { | ||
rerr = fmt.Errorf("io.Copy: %w", err) | ||
return | ||
} | ||
|
||
if written != dopts.objectSize { | ||
rerr = fmt.Errorf("did not read all bytes; read: %d, expected to read: %d", written, dopts.objectSize) | ||
return | ||
} | ||
|
||
return | ||
} |
Oops, something went wrong.