From 90f9ddf8d2ccb4d9638637233345998cb333cc29 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Thu, 6 Jul 2023 12:03:29 +0200 Subject: [PATCH] Add OTel tracing bucket Refactor client.NewBucket Add client.NewInstrumentedBucket Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 3 +- README.md | 24 ++- client/factory.go | 10 +- client/factory_test.go | 118 +++++++++++ client/testconf/blank-gcs.conf.yml | 1 - client/testconf/fake-gcs.conf.yml | 3 - client/testconf/filesystem.conf.yml | 3 + go.mod | 3 +- go.sum | 11 +- objstore_test.go | 22 -- tracing/opentelemetry/opentelemetry.go | 190 ++++++++++++++++++ .../opentracing/opentracing.go | 63 ++++-- tracing/opentracing/opentracing_test.go | 34 ++++ tracing/tracing.go | 35 +--- 14 files changed, 426 insertions(+), 94 deletions(-) create mode 100644 client/factory_test.go delete mode 100644 client/testconf/blank-gcs.conf.yml delete mode 100644 client/testconf/fake-gcs.conf.yml create mode 100644 client/testconf/filesystem.conf.yml create mode 100644 tracing/opentelemetry/opentelemetry.go rename tracing.go => tracing/opentracing/opentracing.go (55%) create mode 100644 tracing/opentracing/opentracing_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 270f6d4f..1c6c3296 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#43](https://github.com/thanos-io/objstore/pull/43) filesystem: abort filesystem bucket operations if the context has been cancelled - [#44](https://github.com/thanos-io/objstore/pull/44) Add new metric to count total number of fetched bytes from bucket - [#50](https://github.com/thanos-io/objstore/pull/50) Add Huawei Cloud OBS Object Storage Support -- [#59](https://github.com/thanos-io/objstore/pull/59) Adding method `IsCustomerManagedKeyError` on the bucket interface. +- [#59](https://github.com/thanos-io/objstore/pull/59) Adding method `IsCustomerManagedKeyError` on the bucket interface. +- [#61](https://github.com/thanos-io/objstore/pull/61) Add OpenTelemetry TracingBucket. ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. diff --git a/README.md b/README.md index 5cf090e3..89b31865 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ type Bucket interface { Upload(ctx context.Context, name string, r io.Reader) error // Delete removes the object with the given name. - // If object does not exists in the moment of deletion, Delete should throw error. + // If object does not exist in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error ``` @@ -88,7 +88,7 @@ type BucketReader interface { // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. IsObjNotFoundErr(err error) bool - // Attributes returns information about the specified object. + // IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. ``` Those interfaces represent the object storage operations your code can use from `objstore` clients. @@ -128,7 +128,7 @@ Current object storage client implementations: | [Baidu BOS](#baidu-bos) | Beta | Production Usage | no | @yahaa | | [Local Filesystem](#filesystem) | Stable | Testing and Demo only | yes | @bwplotka | | [Oracle Cloud Infrastructure Object Storage](#oracle-cloud-infrastructure-object-storage) | Beta | Production Usage | yes | @aarontams,@gaurav-05,@ericrrath | -| [HuaweiCloud OBS](#huaweicloud-obs) | Beta | Production Usage | no | @setoru | +| [HuaweiCloud OBS](#huaweicloud-obs) | Beta | Production Usage | no | @setoru | **Missing support to some object storage?** Check out [how to add your client section](#how-to-add-a-new-client-to-thanos) @@ -153,6 +153,7 @@ config: insecure: false signature_version2: false secret_key: "" + session_token: "" put_user_metadata: {} http_config: idle_conn_timeout: 1m30s @@ -643,18 +644,19 @@ You can also include any of the optional configuration just like the example in ##### HuaweiCloud OBS -To use HuaweiCloud OBS as an object store, you should apply for a HuaweiCloud Account to create an object storage bucket at first. -More details: [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) +To use HuaweiCloud OBS as an object store, you should apply for a HuaweiCloud Account to create an object storage bucket at first. More details: [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) To configure HuaweiCloud Account to use OBS as storage store you need to set these parameters in YAML format stored in a file: ```yaml mdox-exec="go run scripts/cfggen/main.go --name=cos.Config" -type: OBS -config: - bucket: "" - endpoint: "" - access_key: "" +type: COS +config: + bucket: "" + region: "" + app_id: "" + endpoint: "" secret_key: "" + secret_id: "" http_config: idle_conn_timeout: 1m30s response_header_timeout: 2m @@ -674,7 +676,7 @@ config: prefix: "" ``` -The `access_key` and `secret_key` field is required. The `http_config` field is optional for optimize HTTP transport settings. +The `access_key` and `secret_key` field is required. The `http_config` field is optional for optimize HTTP transport settings. #### How to add a new client to Thanos? diff --git a/client/factory.go b/client/factory.go index 12d61892..5a7e6fa1 100644 --- a/client/factory.go +++ b/client/factory.go @@ -50,7 +50,7 @@ type BucketConfig struct { // NewBucket initializes and returns new object storage clients. // NOTE: confContentYaml can contain secrets. -func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.InstrumentedBucket, error) { +func NewBucket(logger log.Logger, confContentYaml []byte, component string) (objstore.Bucket, error) { level.Info(logger).Log("msg", "loading bucket configuration") bucketConf := &BucketConfig{} if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil { @@ -91,5 +91,11 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe return nil, errors.Wrap(err, fmt.Sprintf("create %s client", bucketConf.Type)) } - return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), objstore.NewPrefixedBucket(bucket, bucketConf.Prefix), reg)), nil + return objstore.NewPrefixedBucket(bucket, bucketConf.Prefix), nil +} + +// NewInstrumentedBucket initializes and returns new object storage clients. +// NOTE: confContentYaml can contain secrets. +func NewInstrumentedBucket(reg prometheus.Registerer, bkt objstore.Bucket) objstore.InstrumentedBucket { + return objstore.BucketWithMetrics(bkt.Name(), bkt, reg) } diff --git a/client/factory_test.go b/client/factory_test.go new file mode 100644 index 00000000..afe8be50 --- /dev/null +++ b/client/factory_test.go @@ -0,0 +1,118 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package client + +import ( + "context" + "fmt" + "io/ioutil" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/trace" + + "github.com/thanos-io/objstore/tracing/opentelemetry" + "github.com/thanos-io/objstore/tracing/opentracing" +) + +func ExampleBucket() { + // Read the configuration file. + confContentYaml, err := ioutil.ReadFile("testconf/filesystem.conf.yml") + if err != nil { + panic(err) + } + + // Create a new bucket. + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + if err != nil { + panic(err) + } + + // Test it. + exists, err := bucket.Exists(context.Background(), "example") + if err != nil { + panic(err) + } + fmt.Println(exists) + // Output: + // false +} + +func ExampleInstrumentedBucket() { + // Read the configuration file. + confContentYaml, err := ioutil.ReadFile("testconf/filesystem.conf.yml") + if err != nil { + panic(err) + } + + // Create a new bucket. + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + if err != nil { + panic(err) + } + + // Wrap it with instrumentation. + bucket = NewInstrumentedBucket(prometheus.NewRegistry(), bucket) + + // Test it. + exists, err := bucket.Exists(context.Background(), "example") + if err != nil { + panic(err) + } + fmt.Println(exists) + // Output: + // false +} + +func ExampleTracingBucketUsingOpenTracing() { + // Read the configuration file. + confContentYaml, err := ioutil.ReadFile("testconf/filesystem.conf.yml") + if err != nil { + panic(err) + } + + // Create a new bucket. + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + if err != nil { + panic(err) + } + + // Wrap it with tracing. + bucket = opentracing.NewTracingBucket(bucket) + + // Test it. + exists, err := bucket.Exists(context.Background(), "example") + if err != nil { + panic(err) + } + fmt.Println(exists) + // Output: + // false +} + +func ExampleTracingBucketUsingOpenTelemetry() { + // Read the configuration file. + confContentYaml, err := ioutil.ReadFile("testconf/filesystem.conf.yml") + if err != nil { + panic(err) + } + + // Create a new bucket. + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + if err != nil { + panic(err) + } + + // Wrap it with tracing. + bucket = opentelemetry.NewTracingBucket(trace.NewNoopTracerProvider().Tracer("bucket"), bucket) + + // Test it. + exists, err := bucket.Exists(context.Background(), "example") + if err != nil { + panic(err) + } + fmt.Println(exists) + // Output: + // false +} diff --git a/client/testconf/blank-gcs.conf.yml b/client/testconf/blank-gcs.conf.yml deleted file mode 100644 index cb5ef588..00000000 --- a/client/testconf/blank-gcs.conf.yml +++ /dev/null @@ -1 +0,0 @@ -type: GCS \ No newline at end of file diff --git a/client/testconf/fake-gcs.conf.yml b/client/testconf/fake-gcs.conf.yml deleted file mode 100644 index 538c8327..00000000 --- a/client/testconf/fake-gcs.conf.yml +++ /dev/null @@ -1,3 +0,0 @@ -type: FAKE-GCS -config: - bucket: test-bucket \ No newline at end of file diff --git a/client/testconf/filesystem.conf.yml b/client/testconf/filesystem.conf.yml new file mode 100644 index 00000000..612644f6 --- /dev/null +++ b/client/testconf/filesystem.conf.yml @@ -0,0 +1,3 @@ +type: "FILESYSTEM" +config: + directory: "./data" diff --git a/go.mod b/go.mod index 8a04716c..7ae1f6d7 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,8 @@ require ( github.com/prometheus/client_golang v1.12.2 github.com/prometheus/common v0.36.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.40 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/atomic v1.9.0 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 @@ -92,7 +94,6 @@ require ( google.golang.org/protobuf v1.28.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/ini.v1 v1.66.6 // indirect - gopkg.in/yaml.v3 v3.0.0 // indirect ) require ( diff --git a/go.sum b/go.sum index d6b3a275..5b85eb43 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,8 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= -github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= @@ -385,7 +385,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.194/go.mod h1:7sCQWVkxcsR38nffDW057DRGk8mUjK1Ing/EFOK8s8Y= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.194/go.mod h1:yrBKWhChnDqNz1xuXdSbWXG56XawEq0G5j1lg4VwBD4= github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= @@ -403,6 +403,10 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -858,8 +862,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/objstore_test.go b/objstore_test.go index 0cbd0032..2a316896 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -72,27 +72,6 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload) } -func TestTracingReader(t *testing.T) { - r := bytes.NewReader([]byte("hello world")) - tr := newTracingReadCloser(NopCloserWithSize(r), nil) - - size, err := TryToGetSize(tr) - - testutil.Ok(t, err) - testutil.Equals(t, int64(11), size) - - smallBuf := make([]byte, 4) - n, err := io.ReadFull(tr, smallBuf) - testutil.Ok(t, err) - testutil.Equals(t, 4, n) - - // Verify that size is still the same, after reading 4 bytes. - size, err = TryToGetSize(tr) - - testutil.Ok(t, err) - testutil.Equals(t, int64(11), size) -} - func TestDownloadUploadDirConcurrency(t *testing.T) { r := prometheus.NewRegistry() m := BucketWithMetrics("", NewInMemBucket(), r) @@ -165,7 +144,6 @@ func TestTimingTracingReader(t *testing.T) { tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool { return false }, m.opsFetchedBytes) - tr = newTracingReadCloser(tr, nil) size, err := TryToGetSize(tr) diff --git a/tracing/opentelemetry/opentelemetry.go b/tracing/opentelemetry/opentelemetry.go new file mode 100644 index 00000000..669bd8f8 --- /dev/null +++ b/tracing/opentelemetry/opentelemetry.go @@ -0,0 +1,190 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package opentelemetry + +import ( + "context" + "io" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/thanos-io/objstore" +) + +// TracingBucket is a wrapper around objstore.Bucket that adds tracing to all operations using OpenTelemetry. +type TracingBucket struct { + tracer trace.Tracer + bkt objstore.Bucket +} + +func NewTracingBucket(tracer trace.Tracer, bkt objstore.Bucket) objstore.InstrumentedBucket { + return TracingBucket{tracer: tracer, bkt: bkt} +} + +func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) (err error) { + ctx, span := t.tracer.Start(ctx, "bucket_iter") + defer span.End() + span.SetAttributes(attribute.String("dir", dir)) + + defer func() { + if err != nil { + span.RecordError(err) + } + }() + return t.bkt.Iter(ctx, dir, f, options...) +} + +func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + ctx, span := t.tracer.Start(ctx, "bucket_get") + defer span.End() + span.SetAttributes(attribute.String("name", name)) + + r, err := t.bkt.Get(ctx, name) + if err != nil { + span.RecordError(err) + return nil, err + } + + return newTracingReadCloser(r, span), nil +} + +func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + ctx, span := t.tracer.Start(ctx, "bucket_getrange") + defer span.End() + span.SetAttributes(attribute.String("name", name), attribute.Int64("offset", off), attribute.Int64("length", length)) + + r, err := t.bkt.GetRange(ctx, name, off, length) + if err != nil { + span.RecordError(err) + return nil, err + } + + return newTracingReadCloser(r, span), nil +} + +func (t TracingBucket) Exists(ctx context.Context, name string) (_ bool, err error) { + ctx, span := t.tracer.Start(ctx, "bucket_exists") + defer span.End() + span.SetAttributes(attribute.String("name", name)) + + defer func() { + if err != nil { + span.RecordError(err) + } + }() + return t.bkt.Exists(ctx, name) +} + +func (t TracingBucket) Attributes(ctx context.Context, name string) (_ objstore.ObjectAttributes, err error) { + ctx, span := t.tracer.Start(ctx, "bucket_attributes") + defer span.End() + span.SetAttributes(attribute.String("name", name)) + + defer func() { + if err != nil { + span.RecordError(err) + } + }() + return t.bkt.Attributes(ctx, name) +} + +func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { + ctx, span := t.tracer.Start(ctx, "bucket_upload") + defer span.End() + span.SetAttributes(attribute.String("name", name)) + + defer func() { + if err != nil { + span.RecordError(err) + } + }() + return t.bkt.Upload(ctx, name, r) +} + +func (t TracingBucket) Delete(ctx context.Context, name string) (err error) { + ctx, span := t.tracer.Start(ctx, "bucket_delete") + defer span.End() + span.SetAttributes(attribute.String("name", name)) + + defer func() { + if err != nil { + span.RecordError(err) + } + }() + return t.bkt.Delete(ctx, name) +} + +func (t TracingBucket) Name() string { + return "tracing: " + t.bkt.Name() +} + +func (t TracingBucket) Close() error { + return t.bkt.Close() +} + +func (t TracingBucket) IsObjNotFoundErr(err error) bool { + return t.bkt.IsObjNotFoundErr(err) +} + +func (t TracingBucket) IsCustomerManagedKeyError(err error) bool { + return t.bkt.IsCustomerManagedKeyError(err) +} + +func (t TracingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ib, ok := t.bkt.(objstore.InstrumentedBucket); ok { + return TracingBucket{tracer: t.tracer, bkt: ib.WithExpectedErrs(expectedFunc)} + } + return t +} + +func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return t.WithExpectedErrs(expectedFunc) +} + +type tracingReadCloser struct { + r io.ReadCloser + s trace.Span + + objSize int64 + objSizeErr error + + read int +} + +func newTracingReadCloser(r io.ReadCloser, span trace.Span) io.ReadCloser { + // Since TryToGetSize can only reliably return size before doing any read calls, + // we call during "construction" and remember the results. + objSize, objSizeErr := objstore.TryToGetSize(r) + + return &tracingReadCloser{r: r, s: span, objSize: objSize, objSizeErr: objSizeErr} +} + +func (t *tracingReadCloser) ObjectSize() (int64, error) { + return t.objSize, t.objSizeErr +} + +func (t *tracingReadCloser) Read(p []byte) (int, error) { + n, err := t.r.Read(p) + if n > 0 { + t.read += n + } + if err != nil && err != io.EOF && t.s != nil { + t.s.RecordError(err) + } + return n, err +} + +func (t *tracingReadCloser) Close() error { + err := t.r.Close() + if t.s != nil { + t.s.SetAttributes(attribute.Int64("read", int64(t.read))) + if err != nil { + t.s.SetAttributes(attribute.String("close_err", err.Error())) + } + t.s.End() + t.s = nil + } + return err +} diff --git a/tracing.go b/tracing/opentracing/opentracing.go similarity index 55% rename from tracing.go rename to tracing/opentracing/opentracing.go index 9f09df66..4f396630 100644 --- a/tracing.go +++ b/tracing/opentracing/opentracing.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package objstore +package opentracing import ( "context" @@ -9,20 +9,21 @@ import ( "github.com/opentracing/opentracing-go" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/tracing" ) // TracingBucket includes bucket operations in the traces. type TracingBucket struct { - bkt Bucket + bkt objstore.Bucket } -func NewTracingBucket(bkt Bucket) InstrumentedBucket { +func NewTracingBucket(bkt objstore.Bucket) objstore.InstrumentedBucket { return TracingBucket{bkt: bkt} } -func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) (err error) { - tracing.DoWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) { +func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) (err error) { + doWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("dir", dir) err = t.bkt.Iter(spanCtx, dir, f, options...) }) @@ -30,7 +31,7 @@ func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) erro } func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - span, spanCtx := tracing.StartSpan(ctx, "bucket_get") + span, spanCtx := startSpan(ctx, "bucket_get") span.LogKV("name", name) r, err := t.bkt.Get(spanCtx, name) @@ -44,7 +45,7 @@ func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, err } func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - span, spanCtx := tracing.StartSpan(ctx, "bucket_getrange") + span, spanCtx := startSpan(ctx, "bucket_getrange") span.LogKV("name", name, "offset", off, "length", length) r, err := t.bkt.GetRange(spanCtx, name, off, length) @@ -58,15 +59,15 @@ func (t TracingBucket) GetRange(ctx context.Context, name string, off, length in } func (t TracingBucket) Exists(ctx context.Context, name string) (exists bool, err error) { - tracing.DoWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) { + doWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("name", name) exists, err = t.bkt.Exists(spanCtx, name) }) return } -func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs ObjectAttributes, err error) { - tracing.DoWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) { +func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs objstore.ObjectAttributes, err error) { + doWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("name", name) attrs, err = t.bkt.Attributes(spanCtx, name) }) @@ -74,7 +75,7 @@ func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs Objec } func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { - tracing.DoWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) { + doWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("name", name) err = t.bkt.Upload(spanCtx, name, r) }) @@ -82,7 +83,7 @@ func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (er } func (t TracingBucket) Delete(ctx context.Context, name string) (err error) { - tracing.DoWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) { + doWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("name", name) err = t.bkt.Delete(spanCtx, name) }) @@ -105,14 +106,14 @@ func (t TracingBucket) IsCustomerManagedKeyError(err error) bool { return t.bkt.IsCustomerManagedKeyError(err) } -func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { - if ib, ok := t.bkt.(InstrumentedBucket); ok { +func (t TracingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ib, ok := t.bkt.(objstore.InstrumentedBucket); ok { return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)} } return t } -func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) BucketReader { +func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { return t.WithExpectedErrs(expectedFunc) } @@ -129,7 +130,7 @@ type tracingReadCloser struct { func newTracingReadCloser(r io.ReadCloser, span opentracing.Span) io.ReadCloser { // Since TryToGetSize can only reliably return size before doing any read calls, // we call during "construction" and remember the results. - objSize, objSizeErr := TryToGetSize(r) + objSize, objSizeErr := objstore.TryToGetSize(r) return &tracingReadCloser{r: r, s: span, objSize: objSize, objSizeErr: objSizeErr} } @@ -161,3 +162,33 @@ func (t *tracingReadCloser) Close() error { } return err } + +// Aliases to avoid spreading opentracing package to Thanos code. +type Tag = opentracing.Tag +type Tags = opentracing.Tags +type Span = opentracing.Span + +// startSpan starts and returns span with `operationName` and hooking as child to a span found within given context if any. +// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer without notification. +func startSpan(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (Span, context.Context) { + tracer := tracing.TracerFromContext(ctx) + if tracer == nil { + // No tracing found, return noop span. + return opentracing.NoopTracer{}.StartSpan(operationName), ctx + } + + var span Span + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { + opts = append(opts, opentracing.ChildOf(parentSpan.Context())) + } + span = tracer.StartSpan(operationName, opts...) + return span, opentracing.ContextWithSpan(ctx, span) +} + +// doWithSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. +// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. +func doWithSpan(ctx context.Context, operationName string, doFn func(context.Context, Span), opts ...opentracing.StartSpanOption) { + span, newCtx := startSpan(ctx, operationName, opts...) + defer span.Finish() + doFn(newCtx, span) +} diff --git a/tracing/opentracing/opentracing_test.go b/tracing/opentracing/opentracing_test.go new file mode 100644 index 00000000..0884bb40 --- /dev/null +++ b/tracing/opentracing/opentracing_test.go @@ -0,0 +1,34 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package opentracing + +import ( + "bytes" + "io" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/thanos-io/objstore" +) + +func TestTracingReader(t *testing.T) { + r := bytes.NewReader([]byte("hello world")) + tr := newTracingReadCloser(objstore.NopCloserWithSize(r), nil) + + size, err := objstore.TryToGetSize(tr) + + testutil.Ok(t, err) + testutil.Equals(t, int64(11), size) + + smallBuf := make([]byte, 4) + n, err := io.ReadFull(tr, smallBuf) + testutil.Ok(t, err) + testutil.Equals(t, 4, n) + + // Verify that size is still the same, after reading 4 bytes. + size, err = objstore.TryToGetSize(tr) + + testutil.Ok(t, err) + testutil.Equals(t, int64(11), size) +} diff --git a/tracing/tracing.go b/tracing/tracing.go index 9beb2bd8..bc3edade 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -14,12 +14,6 @@ const ( ForceTracingBaggageKey = "X-Thanos-Force-Tracing" ) -// Aliases to avoid spreading opentracing package to Thanos code. - -type Tag = opentracing.Tag -type Tags = opentracing.Tags -type Span = opentracing.Span - type contextKey struct{} var tracerKey = contextKey{} @@ -34,36 +28,11 @@ func ContextWithTracer(ctx context.Context, tracer opentracing.Tracer) context.C return context.WithValue(ctx, tracerKey, tracer) } -// tracerFromContext extracts opentracing.Tracer from the given context. -func tracerFromContext(ctx context.Context) opentracing.Tracer { +// TracerFromContext extracts opentracing.Tracer from the given context. +func TracerFromContext(ctx context.Context) opentracing.Tracer { val := ctx.Value(tracerKey) if sp, ok := val.(opentracing.Tracer); ok { return sp } return nil } - -// StartSpan starts and returns span with `operationName` and hooking as child to a span found within given context if any. -// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer without notification. -func StartSpan(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (Span, context.Context) { - tracer := tracerFromContext(ctx) - if tracer == nil { - // No tracing found, return noop span. - return opentracing.NoopTracer{}.StartSpan(operationName), ctx - } - - var span Span - if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { - opts = append(opts, opentracing.ChildOf(parentSpan.Context())) - } - span = tracer.StartSpan(operationName, opts...) - return span, opentracing.ContextWithSpan(ctx, span) -} - -// DoWithSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. -// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. -func DoWithSpan(ctx context.Context, operationName string, doFn func(context.Context, Span), opts ...opentracing.StartSpanOption) { - span, newCtx := StartSpan(ctx, operationName, opts...) - defer span.Finish() - doFn(newCtx, span) -}