From 528d9c81c9a8de123afc2ade9d3cf1f8a32ccc6f Mon Sep 17 00:00:00 2001 From: jmacd Date: Wed, 17 Jul 2019 13:05:54 -0700 Subject: [PATCH 1/6] New metric types --- api/metric/additive.go | 25 +++++++++++++++++++++++++ api/metric/api.go | 20 ++++++++++++++++++-- api/metric/cumulative.go | 25 +++++++++++++++++++++++++ api/metric/measure.go | 25 +++++++++++++++++++++++++ api/metric/noop_meter.go | 24 ++++++++++++++++++++++++ 5 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 api/metric/additive.go create mode 100644 api/metric/cumulative.go create mode 100644 api/metric/measure.go diff --git a/api/metric/additive.go b/api/metric/additive.go new file mode 100644 index 00000000000..8a560cc5537 --- /dev/null +++ b/api/metric/additive.go @@ -0,0 +1,25 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +type Float64AdditiveHandle struct { + Handle +} + +func NewFloat64Additive(name string, mos ...Option) *Float64AdditiveHandle { + g := &Float64AdditiveHandle{} + registerMetric(name, Additive, mos, &g.Handle) + return g +} diff --git a/api/metric/api.go b/api/metric/api.go index c96a9d0e972..6fa44488e28 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -27,18 +27,34 @@ type MetricType int const ( Invalid MetricType = iota Gauge // Supports Set() - Cumulative // Supports Inc() + Cumulative // Supports Inc(): only positive values + Additive // Supports Add(): positive or negative + Measure // Supports Record() ) type Meter interface { - // TODO more Metric types GetFloat64Gauge(ctx context.Context, gauge *Float64GaugeHandle, labels ...core.KeyValue) Float64Gauge + GetFloat64Cumulative(ctx context.Context, gauge *Float64CumulativeHandle, labels ...core.KeyValue) Float64Cumulative + GetFloat64Additive(ctx context.Context, gauge *Float64AdditiveHandle, labels ...core.KeyValue) Float64Additive + GetFloat64Measure(ctx context.Context, gauge *Float64MeasureHandle, labels ...core.KeyValue) Float64Measure } type Float64Gauge interface { Set(ctx context.Context, value float64, labels ...core.KeyValue) } +type Float64Cumulative interface { + Inc(ctx context.Context, value float64, labels ...core.KeyValue) +} + +type Float64Additive interface { + Add(ctx context.Context, value float64, labels ...core.KeyValue) +} + +type Float64Measure interface { + Record(ctx context.Context, value float64, labels ...core.KeyValue) +} + type Handle struct { Variable registry.Variable diff --git a/api/metric/cumulative.go b/api/metric/cumulative.go new file mode 100644 index 00000000000..139ab8821d8 --- /dev/null +++ b/api/metric/cumulative.go @@ -0,0 +1,25 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +type Float64CumulativeHandle struct { + Handle +} + +func NewFloat64Cumulative(name string, mos ...Option) *Float64CumulativeHandle { + g := &Float64CumulativeHandle{} + registerMetric(name, Cumulative, mos, &g.Handle) + return g +} diff --git a/api/metric/measure.go b/api/metric/measure.go new file mode 100644 index 00000000000..b247207656b --- /dev/null +++ b/api/metric/measure.go @@ -0,0 +1,25 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +type Float64MeasureHandle struct { + Handle +} + +func NewFloat64Measure(name string, mos ...Option) *Float64MeasureHandle { + g := &Float64MeasureHandle{} + registerMetric(name, Measure, mos, &g.Handle) + return g +} diff --git a/api/metric/noop_meter.go b/api/metric/noop_meter.go index b5480bcb392..b27c1850ab4 100644 --- a/api/metric/noop_meter.go +++ b/api/metric/noop_meter.go @@ -13,10 +13,34 @@ type noopMetric struct{} var _ Meter = noopMeter{} var _ Float64Gauge = noopMetric{} +var _ Float64Cumulative = noopMetric{} +var _ Float64Additive = noopMetric{} +var _ Float64Measure = noopMetric{} func (noopMeter) GetFloat64Gauge(ctx context.Context, gauge *Float64GaugeHandle, labels ...core.KeyValue) Float64Gauge { return noopMetric{} } +func (noopMeter) GetFloat64Cumulative(ctx context.Context, gauge *Float64CumulativeHandle, labels ...core.KeyValue) Float64Cumulative { + return noopMetric{} +} + +func (noopMeter) GetFloat64Additive(ctx context.Context, gauge *Float64AdditiveHandle, labels ...core.KeyValue) Float64Additive { + return noopMetric{} +} + +func (noopMeter) GetFloat64Measure(ctx context.Context, gauge *Float64MeasureHandle, labels ...core.KeyValue) Float64Measure { + return noopMetric{} +} + func (noopMetric) Set(ctx context.Context, value float64, labels ...core.KeyValue) { } + +func (noopMetric) Inc(ctx context.Context, value float64, labels ...core.KeyValue) { +} + +func (noopMetric) Add(ctx context.Context, value float64, labels ...core.KeyValue) { +} + +func (noopMetric) Record(ctx context.Context, value float64, labels ...core.KeyValue) { +} From d17e9aefc3f4ce5b1d20202869d76fa38c43e5c4 Mon Sep 17 00:00:00 2001 From: jmacd Date: Wed, 17 Jul 2019 17:03:40 -0700 Subject: [PATCH 2/6] Implement the new metric types in experimental/streaming --- api/metric/aggregation/descriptor.go | 76 +++++++++++ api/metric/api.go | 30 +++-- api/stats/stats.go | 118 ------------------ api/trace/current.go | 6 + example/basic/main.go | 10 +- .../exporter/observer/eventtype_string.go | 31 ++--- .../streaming/exporter/observer/observer.go | 28 +++-- .../exporter/reader/format/format.go | 65 +++++----- .../streaming/exporter/reader/reader.go | 108 ++++++++-------- .../streaming/exporter/spandata/spandata.go | 4 +- experimental/streaming/sdk/metric.go | 101 +++++++++++++++ experimental/streaming/sdk/package.go | 17 ++- experimental/streaming/sdk/span.go | 4 +- experimental/streaming/sdk/trace.go | 45 +++---- 14 files changed, 363 insertions(+), 280 deletions(-) create mode 100644 api/metric/aggregation/descriptor.go delete mode 100644 api/stats/stats.go create mode 100644 experimental/streaming/sdk/metric.go diff --git a/api/metric/aggregation/descriptor.go b/api/metric/aggregation/descriptor.go new file mode 100644 index 00000000000..5cb88931f32 --- /dev/null +++ b/api/metric/aggregation/descriptor.go @@ -0,0 +1,76 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregation + +import "go.opentelemetry.io/api/core" + +type Operator int + +const ( + None Operator = iota + SUM + COUNT + MIN + MAX + LAST_VALUE + DISTRIBUTION +) + +type Descriptor struct { + Operator Operator + Keys []core.Key +} + +func Sum(keys ...core.Key) Descriptor { + return Descriptor{ + Operator: SUM, + Keys: keys, + } +} + +func Count(keys ...core.Key) Descriptor { + return Descriptor{ + Operator: COUNT, + Keys: keys, + } +} + +func Min(keys ...core.Key) Descriptor { + return Descriptor{ + Operator: MIN, + Keys: keys, + } +} + +func Max(keys ...core.Key) Descriptor { + return Descriptor{ + Operator: MAX, + Keys: keys, + } +} + +func LastValue(keys ...core.Key) Descriptor { + return Descriptor{ + Operator: LAST_VALUE, + Keys: keys, + } +} + +func Distribution(keys ...core.Key) Descriptor { + return Descriptor{ + Operator: SUM, + Keys: keys, + } +} diff --git a/api/metric/api.go b/api/metric/api.go index 6fa44488e28..99e7230df55 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/api/core" + "go.opentelemetry.io/api/metric/aggregation" "go.opentelemetry.io/api/registry" "go.opentelemetry.io/api/unit" ) @@ -33,10 +34,10 @@ const ( ) type Meter interface { - GetFloat64Gauge(ctx context.Context, gauge *Float64GaugeHandle, labels ...core.KeyValue) Float64Gauge - GetFloat64Cumulative(ctx context.Context, gauge *Float64CumulativeHandle, labels ...core.KeyValue) Float64Cumulative - GetFloat64Additive(ctx context.Context, gauge *Float64AdditiveHandle, labels ...core.KeyValue) Float64Additive - GetFloat64Measure(ctx context.Context, gauge *Float64MeasureHandle, labels ...core.KeyValue) Float64Measure + GetFloat64Gauge(context.Context, *Float64GaugeHandle, ...core.KeyValue) Float64Gauge + GetFloat64Cumulative(context.Context, *Float64CumulativeHandle, ...core.KeyValue) Float64Cumulative + GetFloat64Additive(context.Context, *Float64AdditiveHandle, ...core.KeyValue) Float64Additive + GetFloat64Measure(context.Context, *Float64MeasureHandle, ...core.KeyValue) Float64Measure } type Float64Gauge interface { @@ -58,8 +59,9 @@ type Float64Measure interface { type Handle struct { Variable registry.Variable - Type MetricType - Keys []core.Key + Type MetricType + Keys []core.Key + Aggregations []aggregation.Descriptor } type Option func(*Handle, *[]registry.Option) @@ -78,10 +80,18 @@ func WithUnit(unit unit.Unit) Option { } } -// WithKeys applies the provided dimension keys. +// WithKeys applies recommended dimension keys. Multiple `WithKeys` +// options accumulate. func WithKeys(keys ...core.Key) Option { return func(m *Handle, _ *[]registry.Option) { - m.Keys = keys + m.Keys = append(m.Keys, keys...) + } +} + +// WithAggregation applies user-recommended aggregations to this metric. +func WithAggregations(aggrs ...aggregation.Descriptor) Option { + return func(m *Handle, _ *[]registry.Option) { + m.Aggregations = append(m.Aggregations, aggrs...) } } @@ -95,3 +105,7 @@ func (mtype MetricType) String() string { return "unknown" } } + +func (h Handle) Defined() bool { + return h.Variable.Defined() +} diff --git a/api/stats/stats.go b/api/stats/stats.go deleted file mode 100644 index ac295ac871e..00000000000 --- a/api/stats/stats.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package stats - -import ( - "context" - "sync/atomic" - - "go.opentelemetry.io/api/core" - "go.opentelemetry.io/api/registry" -) - -type MeasureHandle struct { - Variable registry.Variable -} - -type Measure interface { - V() registry.Variable - M(value float64) Measurement -} - -type Measurement struct { - Measure Measure - Value float64 -} - -type Recorder interface { - // TODO: Note as in rfc 0001, allow raw Measures to have pre-defined labels: - GetMeasure(ctx context.Context, measure *MeasureHandle, labels ...core.KeyValue) Measure - - Record(ctx context.Context, m ...Measurement) - RecordSingle(ctx context.Context, m Measurement) -} - -type noopRecorder struct{} -type noopMeasure struct{} - -var global atomic.Value - -// GlobalRecorder return meter registered with global registry. -// If no meter is registered then an instance of noop Recorder is returned. -func GlobalRecorder() Recorder { - if t := global.Load(); t != nil { - return t.(Recorder) - } - return noopRecorder{} -} - -// SetGlobalRecorder sets provided meter as a global meter. -func SetGlobalRecorder(t Recorder) { - global.Store(t) -} - -func Record(ctx context.Context, m ...Measurement) { - GlobalRecorder().Record(ctx, m...) -} - -func RecordSingle(ctx context.Context, m Measurement) { - GlobalRecorder().RecordSingle(ctx, m) -} - -type AnyStatistic struct{} - -func (AnyStatistic) String() string { - return "AnyStatistic" -} - -var ( - WithDescription = registry.WithDescription - WithUnit = registry.WithUnit -) - -func NewMeasure(name string, opts ...registry.Option) *MeasureHandle { - return &MeasureHandle{ - Variable: registry.Register(name, AnyStatistic{}, opts...), - } -} - -func (m *MeasureHandle) M(value float64) Measurement { - return Measurement{ - Measure: m, - Value: value, - } -} - -func (m *MeasureHandle) V() registry.Variable { - return m.Variable -} - -func (noopRecorder) Record(ctx context.Context, m ...Measurement) { -} - -func (noopRecorder) RecordSingle(ctx context.Context, m Measurement) { -} - -func (noopRecorder) GetMeasure(ctx context.Context, handle *MeasureHandle, labels ...core.KeyValue) Measure { - return noopMeasure{} -} - -func (noopMeasure) M(float64) Measurement { - return Measurement{} -} - -func (noopMeasure) V() registry.Variable { - return registry.Variable{} -} diff --git a/api/trace/current.go b/api/trace/current.go index 5eca012c779..b96ecf0ebd5 100644 --- a/api/trace/current.go +++ b/api/trace/current.go @@ -25,10 +25,16 @@ var ( ) func SetCurrentSpan(ctx context.Context, span Span) context.Context { + if ctx == nil { + return ctx + } return context.WithValue(ctx, currentSpanKey, span) } func CurrentSpan(ctx context.Context) Span { + if ctx == nil { + return noopSpan{} + } if span, has := ctx.Value(currentSpanKey).(Span); has { return span } diff --git a/example/basic/main.go b/example/basic/main.go index 9a3421b40bf..bd6b1ffec76 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -20,9 +20,11 @@ import ( "go.opentelemetry.io/api/key" "go.opentelemetry.io/api/metric" "go.opentelemetry.io/api/registry" - "go.opentelemetry.io/api/stats" "go.opentelemetry.io/api/tag" "go.opentelemetry.io/api/trace" + + _ "go.opentelemetry.io/experimental/streaming/exporter/spanlog/install" + _ "go.opentelemetry.io/experimental/streaming/sdk" ) var ( @@ -44,7 +46,7 @@ var ( metric.WithDescription("A gauge set to 1.0"), ) - measureTwo = stats.NewMeasure("ex.com/two") + measureTwo = metric.NewFloat64Measure("ex.com/two") ) func main() { @@ -61,6 +63,8 @@ func main() { lemonsKey.Int(10), ) + measure := meter.GetFloat64Measure(ctx, measureTwo, lemonsKey.Int(10)) + err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error { trace.CurrentSpan(ctx).Event(ctx, "Nice operation!", key.New("bogons").Int(100)) @@ -77,7 +81,7 @@ func main() { trace.CurrentSpan(ctx).Event(ctx, "Sub span event") - stats.Record(ctx, measureTwo.M(1.3)) + measure.Record(ctx, 1.3) return nil }, diff --git a/experimental/streaming/exporter/observer/eventtype_string.go b/experimental/streaming/exporter/observer/eventtype_string.go index 2cb7fa713ad..d27a3d45e28 100644 --- a/experimental/streaming/exporter/observer/eventtype_string.go +++ b/experimental/streaming/exporter/observer/eventtype_string.go @@ -1,17 +1,3 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - // Code generated by "stringer -type=EventType"; DO NOT EDIT. package observer @@ -26,17 +12,18 @@ func _() { _ = x[START_SPAN-1] _ = x[FINISH_SPAN-2] _ = x[ADD_EVENT-3] - _ = x[ADD_EVENTF-4] - _ = x[NEW_SCOPE-5] - _ = x[NEW_MEASURE-6] - _ = x[NEW_METRIC-7] - _ = x[MODIFY_ATTR-8] - _ = x[RECORD_STATS-9] + _ = x[NEW_SCOPE-4] + _ = x[MODIFY_ATTR-5] + _ = x[SET_STATUS-6] + _ = x[GAUGE_SET-7] + _ = x[CUMULATIVE_INC-8] + _ = x[ADDITIVE_ADD-9] + _ = x[MEASURE_RECORD-10] } -const _EventType_name = "INVALIDSTART_SPANFINISH_SPANLOG_EVENTLOGF_EVENTNEW_SCOPENEW_MEASURENEW_METRICMODIFY_ATTRRECORD_STATS" +const _EventType_name = "INVALIDSTART_SPANFINISH_SPANADD_EVENTNEW_SCOPEMODIFY_ATTRSET_STATUSGAUGE_SETCUMULATIVE_INCADDITIVE_ADDMEASURE_RECORD" -var _EventType_index = [...]uint8{0, 7, 17, 28, 37, 47, 56, 67, 77, 88, 100} +var _EventType_index = [...]uint8{0, 7, 17, 28, 37, 46, 57, 67, 76, 90, 102, 116} func (i EventType) String() string { if i < 0 || i >= EventType(len(_EventType_index)-1) { diff --git a/experimental/streaming/exporter/observer/observer.go b/experimental/streaming/exporter/observer/observer.go index 26248656e93..69a390cd7ff 100644 --- a/experimental/streaming/exporter/observer/observer.go +++ b/experimental/streaming/exporter/observer/observer.go @@ -23,7 +23,7 @@ import ( "google.golang.org/grpc/codes" "go.opentelemetry.io/api/core" - "go.opentelemetry.io/api/stats" + "go.opentelemetry.io/api/metric" "go.opentelemetry.io/api/tag" ) @@ -36,6 +36,12 @@ type ScopeID struct { core.SpanContext } +type Measurement struct { + Metric metric.Handle + Value float64 + Scope ScopeID +} + // TODO: this Event is confusing with event.Event. type Event struct { // Automatic fields @@ -56,11 +62,9 @@ type Event struct { Status codes.Code // SET_STATUS // Values - String string // START_SPAN, EVENT, ... - Float64 float64 - Parent ScopeID // START_SPAN - Stats []stats.Measurement - Stat stats.Measurement + String string // START_SPAN, EVENT, ... + Parent ScopeID // START_SPAN + Measurement Measurement } type Observer interface { @@ -76,13 +80,14 @@ const ( START_SPAN FINISH_SPAN ADD_EVENT - ADD_EVENTF NEW_SCOPE - NEW_MEASURE - NEW_METRIC MODIFY_ATTR - RECORD_STATS SET_STATUS + + GAUGE_SET + CUMULATIVE_INC + ADDITIVE_ADD + MEASURE_RECORD ) var ( @@ -151,6 +156,9 @@ func Foreach(f func(Observer)) { } func NewScope(parent ScopeID, attributes ...core.KeyValue) ScopeID { + if len(attributes) == 0 { + return parent + } eventID := Record(Event{ Type: NEW_SCOPE, Scope: parent, diff --git a/experimental/streaming/exporter/reader/format/format.go b/experimental/streaming/exporter/reader/format/format.go index a8534d84c91..7671e4636c0 100644 --- a/experimental/streaming/exporter/reader/format/format.go +++ b/experimental/streaming/exporter/reader/format/format.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/api/core" "go.opentelemetry.io/api/key" + "go.opentelemetry.io/experimental/streaming/exporter/observer" "go.opentelemetry.io/experimental/streaming/exporter/reader" // TODO this should not be an SDK dependency; move conventional tags into the API. @@ -37,7 +38,10 @@ func AppendEvent(buf *strings.Builder, data reader.Event) { if skipIf && data.Attributes.HasValue(kv.Key) { return true } - buf.WriteString(" " + kv.Key.Variable.Name + "=" + kv.Value.Emit()) + buf.WriteString(" ") + buf.WriteString(kv.Key.Variable.Name) + buf.WriteString("=") + buf.WriteString(kv.Value.Emit()) return true } } @@ -46,7 +50,7 @@ func AppendEvent(buf *strings.Builder, data reader.Event) { buf.WriteString(" ") switch data.Type { - case reader.START_SPAN: + case observer.START_SPAN: buf.WriteString("start ") buf.WriteString(data.Name) @@ -63,7 +67,7 @@ func AppendEvent(buf *strings.Builder, data reader.Event) { buf.WriteString(" >") } - case reader.FINISH_SPAN: + case observer.FINISH_SPAN: buf.WriteString("finish ") buf.WriteString(data.Name) @@ -71,40 +75,41 @@ func AppendEvent(buf *strings.Builder, data reader.Event) { buf.WriteString(data.Duration.String()) buf.WriteString(")") - case reader.ADD_EVENT: + case observer.ADD_EVENT: buf.WriteString("event: ") buf.WriteString(data.Event.Message()) buf.WriteString(" (") for _, kv := range data.Event.Attributes() { - buf.WriteString(" " + kv.Key.Variable.Name + "=" + kv.Value.Emit()) + buf.WriteString(" ") + buf.WriteString(kv.Key.Variable.Name) + buf.WriteString("=") + buf.WriteString(kv.Value.Emit()) } buf.WriteString(")") - case reader.MODIFY_ATTR: - buf.WriteString("modify attr") - case reader.RECORD_STATS: - buf.WriteString("record") - - for _, s := range data.Stats { - f(false)(core.Key{ - Variable: s.Measure.V(), - }.Float64(s.Value)) - - buf.WriteString(" {") - i := 0 - s.Tags.Foreach(func(kv core.KeyValue) bool { - if i != 0 { - buf.WriteString(",") - } - i++ - buf.WriteString(kv.Key.Variable.Name) - buf.WriteString("=") - buf.WriteString(kv.Value.Emit()) - return true - }) - buf.WriteString("}") - } - case reader.SET_STATUS: + case observer.MODIFY_ATTR: + buf.WriteString(data.Type.String()) + + case observer.GAUGE_SET, observer.CUMULATIVE_INC, observer.ADDITIVE_ADD, observer.MEASURE_RECORD: + buf.WriteString(data.Type.String()) + buf.WriteString(" ") + buf.WriteString(data.Measurement.Metric.Variable.Name) + buf.WriteString("=") + buf.WriteString(fmt.Sprint(data.Measurement.Value)) + buf.WriteString(" {") + i := 0 + data.Tags.Foreach(func(kv core.KeyValue) bool { + if i != 0 { + buf.WriteString(",") + } + i++ + buf.WriteString(kv.Key.Variable.Name) + buf.WriteString("=") + buf.WriteString(kv.Value.Emit()) + return true + }) + buf.WriteString("}") + case observer.SET_STATUS: buf.WriteString("set status ") buf.WriteString(data.Status.String()) diff --git a/experimental/streaming/exporter/reader/reader.go b/experimental/streaming/exporter/reader/reader.go index 1f05d0a3628..6231b6dcd3a 100644 --- a/experimental/streaming/exporter/reader/reader.go +++ b/experimental/streaming/exporter/reader/reader.go @@ -23,8 +23,9 @@ import ( "go.opentelemetry.io/api/core" "go.opentelemetry.io/api/event" - "go.opentelemetry.io/api/stats" + "go.opentelemetry.io/api/metric" "go.opentelemetry.io/api/tag" + "go.opentelemetry.io/api/trace" "go.opentelemetry.io/experimental/streaming/exporter/observer" ) @@ -32,17 +33,15 @@ type Reader interface { Read(Event) } -type EventType int - type Event struct { - Type EventType + Type observer.EventType Time time.Time Sequence observer.EventID SpanContext core.SpanContext Tags tag.Map Attributes tag.Map Event event.Event - Stats []Measurement + Measurement Measurement Parent core.SpanContext ParentAttributes tag.Map @@ -54,9 +53,9 @@ type Event struct { } type Measurement struct { - Measure stats.Measure - Value float64 - Tags tag.Map + Metric metric.Handle + Value float64 + Tags tag.Map } type readerObserver struct { @@ -96,16 +95,6 @@ type readerScope struct { attributes tag.Map } -const ( - INVALID EventType = iota - START_SPAN - FINISH_SPAN - ADD_EVENT - MODIFY_ATTR - RECORD_STATS - SET_STATUS -) - // NewReaderObserver returns an implementation that computes the // necessary state needed by a reader to process events in memory. // Practically, this means tracking live metric handles and scope @@ -150,7 +139,7 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { span.readerScope.attributes = rattrs read.Name = span.name - read.Type = START_SPAN + read.Type = observer.START_SPAN read.SpanContext = span.spanContext read.Attributes = rattrs @@ -178,7 +167,7 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { } read.Name = span.name - read.Type = FINISH_SPAN + read.Type = observer.FINISH_SPAN read.Attributes = attrs read.Duration = event.Time.Sub(span.start) @@ -228,7 +217,7 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { return } - read.Type = MODIFY_ATTR + read.Type = observer.MODIFY_ATTR read.Attributes = sc.attributes if span != nil { @@ -236,26 +225,26 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { read.Tags = span.startTags } - case observer.NEW_MEASURE: - measure := &readerMeasure{ - name: event.String, - } - ro.measures.Store(event.Sequence, measure) - return - - case observer.NEW_METRIC: - measureI, has := ro.measures.Load(event.Scope.EventID) - if !has { - panic("metric measure not found") - } - metric := &readerMetric{ - readerMeasure: measureI.(*readerMeasure), - } - ro.metrics.Store(event.Sequence, metric) - return + // case observer.NEW_MEASURE: + // measure := &readerMeasure{ + // name: event.String, + // } + // ro.measures.Store(event.Sequence, measure) + // return + + // case observer.NEW_METRIC: + // measureI, has := ro.measures.Load(event.Scope.EventID) + // if !has { + // panic("metric measure not found") + // } + // metric := &readerMetric{ + // readerMeasure: measureI.(*readerMeasure), + // } + // ro.metrics.Store(event.Sequence, metric) + // return case observer.ADD_EVENT: - read.Type = ADD_EVENT + read.Type = observer.ADD_EVENT read.Message = event.String attrs, span := ro.readScope(event.Scope) @@ -266,22 +255,23 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { read.SpanContext = span.spanContext } - case observer.RECORD_STATS: - read.Type = RECORD_STATS + case observer.GAUGE_SET, + observer.CUMULATIVE_INC, + observer.ADDITIVE_ADD, + observer.MEASURE_RECORD: - _, span := ro.readScope(event.Scope) - if span != nil { - read.SpanContext = span.spanContext - } - for _, es := range event.Stats { - ro.addMeasurement(&read, es) - } - if event.Stat.Measure != nil { - ro.addMeasurement(&read, event.Stat) + read.Type = event.Type + + if event.Context != nil { + span := trace.CurrentSpan(event.Context) + if span != nil { + read.SpanContext = span.SpanContext() + } } + ro.addMeasurement(&read, event.Measurement) case observer.SET_STATUS: - read.Type = SET_STATUS + read.Type = observer.SET_STATUS read.Status = event.Status _, span := ro.readScope(event.Scope) if span != nil { @@ -302,16 +292,16 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { } } -func (ro *readerObserver) addMeasurement(e *Event, m stats.Measurement) { - attrs, _ := ro.readMeasureScope(m.Measure) - e.Stats = append(e.Stats, Measurement{ - Measure: m.Measure, - Value: m.Value, - Tags: attrs, - }) +func (ro *readerObserver) addMeasurement(e *Event, m observer.Measurement) { + attrs, _ := ro.readMeasureScope(m) + e.Measurement = Measurement{ + Metric: m.Metric, + Value: m.Value, + Tags: attrs, + } } -func (ro *readerObserver) readMeasureScope(m stats.Measure) (tag.Map, *readerSpan) { +func (ro *readerObserver) readMeasureScope(m observer.Measurement) (tag.Map, *readerSpan) { // TODO return nil, nil } diff --git a/experimental/streaming/exporter/spandata/spandata.go b/experimental/streaming/exporter/spandata/spandata.go index 323e5133210..8d4bb443755 100644 --- a/experimental/streaming/exporter/spandata/spandata.go +++ b/experimental/streaming/exporter/spandata/spandata.go @@ -46,7 +46,7 @@ func (s *spanReader) Read(data reader.Event) { return } var span *Span - if data.Type == reader.START_SPAN { + if data.Type == observer.START_SPAN { span = &Span{Events: make([]reader.Event, 0, 4)} s.spans[data.SpanContext] = span } else { @@ -59,7 +59,7 @@ func (s *spanReader) Read(data reader.Event) { span.Events = append(span.Events, data) - if data.Type == reader.FINISH_SPAN { + if data.Type == observer.FINISH_SPAN { for _, r := range s.readers { r.Read(span) } diff --git a/experimental/streaming/sdk/metric.go b/experimental/streaming/sdk/metric.go new file mode 100644 index 00000000000..1400c7edbc0 --- /dev/null +++ b/experimental/streaming/sdk/metric.go @@ -0,0 +1,101 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdk + +import ( + "context" + + "go.opentelemetry.io/api/core" + "go.opentelemetry.io/api/metric" + "go.opentelemetry.io/experimental/streaming/exporter/observer" +) + +type float64Gauge struct { + handle *metric.Float64GaugeHandle + scope observer.ScopeID +} + +type float64Cumulative struct { + handle *metric.Float64CumulativeHandle + scope observer.ScopeID +} + +type float64Additive struct { + handle *metric.Float64AdditiveHandle + scope observer.ScopeID +} + +type float64Measure struct { + handle *metric.Float64MeasureHandle + scope observer.ScopeID +} + +func (s *sdk) GetFloat64Gauge(ctx context.Context, handle *metric.Float64GaugeHandle, labels ...core.KeyValue) metric.Float64Gauge { + return &float64Gauge{ + handle: handle, + scope: observer.NewScope(observer.ScopeID{}, labels...), + } +} + +func (s *sdk) GetFloat64Cumulative(ctx context.Context, handle *metric.Float64CumulativeHandle, labels ...core.KeyValue) metric.Float64Cumulative { + return &float64Cumulative{ + handle: handle, + scope: observer.NewScope(observer.ScopeID{}, labels...), + } +} + +func (s *sdk) GetFloat64Additive(ctx context.Context, handle *metric.Float64AdditiveHandle, labels ...core.KeyValue) metric.Float64Additive { + return &float64Additive{ + handle: handle, + scope: observer.NewScope(observer.ScopeID{}, labels...), + } +} + +func (s *sdk) GetFloat64Measure(ctx context.Context, handle *metric.Float64MeasureHandle, labels ...core.KeyValue) metric.Float64Measure { + return &float64Measure{ + handle: handle, + scope: observer.NewScope(observer.ScopeID{}, labels...), + } +} + +func record(ctx context.Context, handle metric.Handle, etype observer.EventType, value float64, scope observer.ScopeID, labels []core.KeyValue) { + observer.Record(observer.Event{ + Type: etype, + Context: ctx, + Measurement: observer.Measurement{ + Metric: handle, + Value: value, + Scope: scope, + }, + Attributes: labels, + }) + +} + +func (g *float64Gauge) Set(ctx context.Context, value float64, labels ...core.KeyValue) { + record(ctx, g.handle.Handle, observer.GAUGE_SET, value, g.scope, labels) +} + +func (c *float64Cumulative) Inc(ctx context.Context, value float64, labels ...core.KeyValue) { + record(ctx, c.handle.Handle, observer.CUMULATIVE_INC, value, c.scope, labels) +} + +func (a *float64Additive) Add(ctx context.Context, value float64, labels ...core.KeyValue) { + record(ctx, a.handle.Handle, observer.ADDITIVE_ADD, value, a.scope, labels) +} + +func (m *float64Measure) Record(ctx context.Context, value float64, labels ...core.KeyValue) { + record(ctx, m.handle.Handle, observer.MEASURE_RECORD, value, m.scope, labels) +} diff --git a/experimental/streaming/sdk/package.go b/experimental/streaming/sdk/package.go index b21c00b4dfc..d9140db7295 100644 --- a/experimental/streaming/sdk/package.go +++ b/experimental/streaming/sdk/package.go @@ -1,9 +1,24 @@ package sdk import ( + "go.opentelemetry.io/api/metric" "go.opentelemetry.io/api/trace" + "go.opentelemetry.io/experimental/streaming/exporter/observer" ) +type sdk struct { + resources observer.EventID +} + +type SDK interface { + trace.Tracer + metric.Meter +} + +var _ SDK = (*sdk)(nil) + func init() { - trace.SetGlobalTracer(New()) + instance := New() + trace.SetGlobalTracer(instance) + metric.SetGlobalMeter(instance) } diff --git a/experimental/streaming/sdk/span.go b/experimental/streaming/sdk/span.go index 9dc1551150b..44fd8ea2e6f 100644 --- a/experimental/streaming/sdk/span.go +++ b/experimental/streaming/sdk/span.go @@ -27,7 +27,7 @@ import ( ) type span struct { - tracer *tracer + sdk *sdk initial observer.ScopeID } @@ -100,7 +100,7 @@ func (sp *span) Finish() { } func (sp *span) Tracer() apitrace.Tracer { - return sp.tracer + return sp.sdk } func (sp *span) AddEvent(ctx context.Context, event event.Event) { diff --git a/experimental/streaming/sdk/trace.go b/experimental/streaming/sdk/trace.go index c765e7c2f93..840bd527115 100644 --- a/experimental/streaming/sdk/trace.go +++ b/experimental/streaming/sdk/trace.go @@ -22,14 +22,9 @@ import ( "go.opentelemetry.io/api/key" "go.opentelemetry.io/api/tag" "go.opentelemetry.io/api/trace" - apitrace "go.opentelemetry.io/api/trace" "go.opentelemetry.io/experimental/streaming/exporter/observer" ) -type tracer struct { - resources observer.EventID -} - var ( // TODO These should move somewhere in the api, right? ServiceKey = key.New("service") @@ -42,31 +37,31 @@ var ( ) ) -func New() trace.Tracer { - return &tracer{} +func New() SDK { + return &sdk{} } -func (t *tracer) WithResources(attributes ...core.KeyValue) apitrace.Tracer { - s := observer.NewScope(observer.ScopeID{ - EventID: t.resources, +func (s *sdk) WithResources(attributes ...core.KeyValue) trace.Tracer { + scope := observer.NewScope(observer.ScopeID{ + EventID: s.resources, }, attributes...) - return &tracer{ - resources: s.EventID, + return &sdk{ + resources: scope.EventID, } } -func (t *tracer) WithComponent(name string) apitrace.Tracer { - return t.WithResources(ComponentKey.String(name)) +func (s *sdk) WithComponent(name string) trace.Tracer { + return s.WithResources(ComponentKey.String(name)) } -func (t *tracer) WithService(name string) apitrace.Tracer { - return t.WithResources(ServiceKey.String(name)) +func (s *sdk) WithService(name string) trace.Tracer { + return s.WithResources(ServiceKey.String(name)) } -func (t *tracer) WithSpan(ctx context.Context, name string, body func(context.Context) error) error { - // TODO: use runtime/trace.WithRegion for execution tracer support +func (s *sdk) WithSpan(ctx context.Context, name string, body func(context.Context) error) error { + // TODO: use runtime/trace.WithRegion for execution sdk support // TODO: use runtime/pprof.Do for profile tags support - ctx, span := t.Start(ctx, name) + ctx, span := s.Start(ctx, name) defer span.Finish() if err := body(ctx); err != nil { @@ -77,12 +72,12 @@ func (t *tracer) WithSpan(ctx context.Context, name string, body func(context.Co return nil } -func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOption) (context.Context, apitrace.Span) { +func (s *sdk) Start(ctx context.Context, name string, opts ...trace.SpanOption) (context.Context, trace.Span) { var child core.SpanContext child.SpanID = rand.Uint64() - o := &apitrace.SpanOptions{} + o := &trace.SpanOptions{} for _, opt := range opts { opt(o) @@ -93,7 +88,7 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp if o.Reference.HasTraceID() { parentScope.SpanContext = o.Reference.SpanContext } else { - parentScope.SpanContext = apitrace.CurrentSpan(ctx).SpanContext() + parentScope.SpanContext = trace.CurrentSpan(ctx).SpanContext() } if parentScope.HasTraceID() { @@ -107,11 +102,11 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp childScope := observer.ScopeID{ SpanContext: child, - EventID: t.resources, + EventID: s.resources, } span := &span{ - tracer: t, + sdk: s, initial: observer.ScopeID{ SpanContext: child, EventID: observer.Record(observer.Event{ @@ -128,6 +123,6 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp return trace.SetCurrentSpan(ctx, span), span } -func (t *tracer) Inject(ctx context.Context, span apitrace.Span, injector apitrace.Injector) { +func (s *sdk) Inject(ctx context.Context, span trace.Span, injector trace.Injector) { injector.Inject(span.SpanContext(), tag.FromContext(ctx)) } From 1db27c8447f8d607140dc27b75110e76dbc3867b Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 19 Jul 2019 10:06:31 -0700 Subject: [PATCH 3/6] New file --- example/grpc/metrics.go | 42 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 example/grpc/metrics.go diff --git a/example/grpc/metrics.go b/example/grpc/metrics.go new file mode 100644 index 00000000000..dc99888f554 --- /dev/null +++ b/example/grpc/metrics.go @@ -0,0 +1,42 @@ +package grpc + +import ( + "go.opentelemetry.io/api/key" + "go.opentelemetry.io/api/metric/aggregation" + "go.opentelemetry.io/api/unit" +) + +// Discussion +// https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md + +var ( + // Client metrics + clientMethodKey = key.New("grpc_client_method", + key.WithDescription("Full gRPC method name, including package, service and method, e.g. google.bigtable.v2.Bigtable/CheckAndMutateRow"), + ) + + clientStatusKey = key.New("grpc_client_method", + key.WithDescription("gRPC server status code received, e.g. OK, CANCELLED, DEADLINE_EXCEEDED"), + ) + + sentMessagesPerRPC = metric.NewFloat64Measure( + "grpc.io/client/sent_messages_per_rpc", + metric.WithUnit(unit.Bytes), + metric.WithDescription("Number of messages sent per RPC, equals 1 for unary RPCs"), + metric.WithAggregation(aggregation.Distribution(clientMethodKey)), + ) + + receivedMessagesPerRPC = metric.NewFloat64Measure( + "grpc.io/client/received_messages_per_rpc", + metric.WithUnit(unit.Bytes), + metric.WithDescription("Number of messages received per RPC, equals 1 for unary RPCs"), + metric.WithAggregation(aggregation.Distribution(clientMethodKey)), + ) + + receivedMessagesPerRPC = metric.NewFloat64Measure( + "grpc.io/client/received_messages_per_rpc", + metric.WithUnit(unit.Bytes), + metric.WithDescription("Number of messages received per RPC, equals 1 for unary RPCs"), + metric.WithAggregation(aggregation.Distribution(clientMethodKey)), + ) +) From d88ee56a194242acfa7961877843c2910ac3b651 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 19 Jul 2019 16:25:39 -0700 Subject: [PATCH 4/6] Firm up the example --- api/metric/aggregation/descriptor.go | 10 +++- api/metric/api.go | 9 +++- example/grpc/metrics.go | 81 ++++++++++++++++++++++++---- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/api/metric/aggregation/descriptor.go b/api/metric/aggregation/descriptor.go index 5cb88931f32..d1215b90fb4 100644 --- a/api/metric/aggregation/descriptor.go +++ b/api/metric/aggregation/descriptor.go @@ -19,7 +19,7 @@ import "go.opentelemetry.io/api/core" type Operator int const ( - None Operator = iota + NONE Operator = iota SUM COUNT MIN @@ -33,6 +33,12 @@ type Descriptor struct { Keys []core.Key } +func None() Descriptor { + return Descriptor{ + Operator: NONE, + } +} + func Sum(keys ...core.Key) Descriptor { return Descriptor{ Operator: SUM, @@ -70,7 +76,7 @@ func LastValue(keys ...core.Key) Descriptor { func Distribution(keys ...core.Key) Descriptor { return Descriptor{ - Operator: SUM, + Operator: DISTRIBUTION, Keys: keys, } } diff --git a/api/metric/api.go b/api/metric/api.go index 99e7230df55..069506e42db 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -81,14 +81,19 @@ func WithUnit(unit unit.Unit) Option { } // WithKeys applies recommended dimension keys. Multiple `WithKeys` -// options accumulate. +// options accumulate. The keys specified in this way are taken as +// the recommended aggregation keys for Gauge, Cumulative, and +// Additive metrics. For Measure metrics, the keys recommended here +// are taken as the default for aggregations. func WithKeys(keys ...core.Key) Option { return func(m *Handle, _ *[]registry.Option) { m.Keys = append(m.Keys, keys...) } } -// WithAggregation applies user-recommended aggregations to this metric. +// WithAggregation applies user-recommended aggregations to this +// metric. This is useful to declare the non-default aggregations, +// particularly for Measure type metrics. func WithAggregations(aggrs ...aggregation.Descriptor) Option { return func(m *Handle, _ *[]registry.Option) { m.Aggregations = append(m.Aggregations, aggrs...) diff --git a/example/grpc/metrics.go b/example/grpc/metrics.go index dc99888f554..853dd4465d5 100644 --- a/example/grpc/metrics.go +++ b/example/grpc/metrics.go @@ -2,15 +2,16 @@ package grpc import ( "go.opentelemetry.io/api/key" + "go.opentelemetry.io/api/metric" "go.opentelemetry.io/api/metric/aggregation" "go.opentelemetry.io/api/unit" ) -// Discussion +// Implements the default metrics export configuration declared here: // https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md var ( - // Client metrics + // Client metric tags clientMethodKey = key.New("grpc_client_method", key.WithDescription("Full gRPC method name, including package, service and method, e.g. google.bigtable.v2.Bigtable/CheckAndMutateRow"), ) @@ -19,24 +20,84 @@ var ( key.WithDescription("gRPC server status code received, e.g. OK, CANCELLED, DEADLINE_EXCEEDED"), ) + // The default client metrics: + // + // grpc.io/client/sent_bytes_per_rpc distribution grpc_client_method + // grpc.io/client/received_bytes_per_rpc distribution grpc_client_method + // grpc.io/client/roundtrip_latency distribution grpc_client_method + // grpc.io/client/completed_rpcs count grpc_client_method, grpc_client_status + // grpc.io/client/started_rpcs count grpc_client_method + + sentBytesPerRPC = metric.NewFloat64Measure( + "grpc.io/client/sent_bytes_per_rpc", + metric.WithUnit(unit.Bytes), + metric.WithDescription("Number of bytes sent per RPC"), + metric.WithAggregations(aggregation.Distribution(clientMethodKey)), + ) + + receivedBytesPerRPC = metric.NewFloat64Measure( + "grpc.io/client/received_bytes_per_rpc", + metric.WithUnit(unit.Bytes), + metric.WithDescription("Number of bytes received per RPC"), + metric.WithAggregations(aggregation.Distribution(clientMethodKey)), + ) + + roundtripLatency = metric.NewFloat64Measure( + "grpc.io/client/roundtrip_latency", + metric.WithUnit(unit.Milliseconds), + metric.WithDescription("Roundtrip request latency"), + metric.WithAggregations(aggregation.Distribution(clientMethodKey)), + ) + + // Note: the specification says to use "Count" aggregation by + // default, which appears to assume that the number of + // completed_rpcs is always 1. Shouldn't this use a Sum + // aggregation? + // + // Question: how is this different than the following? + // NewFloat64Cumulative( + // "grpc.io/client/completed_rpcs", + // metric.WithDescription(...), + // metric.WithKeys(clientMethodKey, clientStatusKey), + // ) + // Answer: these are the effectively the same, i.e., default equivalent default views. + // The only difference is the method name used to add a measurement. + completedRPCs = metric.NewFloat64Measure( + "grpc.io/client/completed_rpcs", + metric.WithDescription("Count of completed RPCs"), + metric.WithAggregations(aggregation.Sum(clientMethodKey, clientStatusKey)), + ) + + startedRPCs = metric.NewFloat64Measure( + "grpc.io/client/started_rpcs", + metric.WithDescription("Count of started RPCs"), + metric.WithAggregations(aggregation.Sum(clientMethodKey)), + ) + + // Extra client metrics + // grpc.io/client/sent_messages_per_rpc distribution grpc_client_method + // grpc.io/client/received_messages_per_rpc distribution grpc_client_method + // grpc.io/client/server_latency distribution grpc_client_method + // grpc.io/client/sent_messages_per_method count grpc_client_method + // grpc.io/client/received_messages_per_method count grpc_client_method + // grpc.io/client/sent_bytes_per_method sum grpc_client_method + // grpc.io/client/received_bytes_per_method sum grpc_client_method + sentMessagesPerRPC = metric.NewFloat64Measure( "grpc.io/client/sent_messages_per_rpc", metric.WithUnit(unit.Bytes), metric.WithDescription("Number of messages sent per RPC, equals 1 for unary RPCs"), - metric.WithAggregation(aggregation.Distribution(clientMethodKey)), + metric.WithKeys(clientMethodKey), + metric.WithAggregations(aggregation.None()), ) receivedMessagesPerRPC = metric.NewFloat64Measure( "grpc.io/client/received_messages_per_rpc", metric.WithUnit(unit.Bytes), metric.WithDescription("Number of messages received per RPC, equals 1 for unary RPCs"), - metric.WithAggregation(aggregation.Distribution(clientMethodKey)), + metric.WithKeys(clientMethodKey), + metric.WithAggregations(aggregation.None()), ) - receivedMessagesPerRPC = metric.NewFloat64Measure( - "grpc.io/client/received_messages_per_rpc", - metric.WithUnit(unit.Bytes), - metric.WithDescription("Number of messages received per RPC, equals 1 for unary RPCs"), - metric.WithAggregation(aggregation.Distribution(clientMethodKey)), - ) + // ... and so on ) From 8ef1a272575dba923a6e7ccd4bd421c32c89a6f8 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 19 Jul 2019 16:36:35 -0700 Subject: [PATCH 5/6] Reduce number of event types; use a stringer for MetricType --- api/metric/api.go | 12 +-------- api/metric/metrictype_string.go | 27 +++++++++++++++++++ .../exporter/observer/eventtype_string.go | 9 +++---- .../streaming/exporter/observer/observer.go | 7 +---- .../exporter/reader/format/format.go | 4 +-- .../streaming/exporter/reader/reader.go | 5 +--- .../streaming/exporter/spandata/spandata.go | 10 ++++++- experimental/streaming/sdk/metric.go | 8 +++--- 8 files changed, 48 insertions(+), 34 deletions(-) create mode 100644 api/metric/metrictype_string.go diff --git a/api/metric/api.go b/api/metric/api.go index 069506e42db..281b65e5f8d 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -25,6 +25,7 @@ import ( type MetricType int +//go:generate stringer -type=MetricType const ( Invalid MetricType = iota Gauge // Supports Set() @@ -100,17 +101,6 @@ func WithAggregations(aggrs ...aggregation.Descriptor) Option { } } -func (mtype MetricType) String() string { - switch mtype { - case Gauge: - return "gauge" - case Cumulative: - return "cumulative" - default: - return "unknown" - } -} - func (h Handle) Defined() bool { return h.Variable.Defined() } diff --git a/api/metric/metrictype_string.go b/api/metric/metrictype_string.go new file mode 100644 index 00000000000..200564c3b82 --- /dev/null +++ b/api/metric/metrictype_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer -type=MetricType"; DO NOT EDIT. + +package metric + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Invalid-0] + _ = x[Gauge-1] + _ = x[Cumulative-2] + _ = x[Additive-3] + _ = x[Measure-4] +} + +const _MetricType_name = "InvalidGaugeCumulativeAdditiveMeasure" + +var _MetricType_index = [...]uint8{0, 7, 12, 22, 30, 37} + +func (i MetricType) String() string { + if i < 0 || i >= MetricType(len(_MetricType_index)-1) { + return "MetricType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _MetricType_name[_MetricType_index[i]:_MetricType_index[i+1]] +} diff --git a/experimental/streaming/exporter/observer/eventtype_string.go b/experimental/streaming/exporter/observer/eventtype_string.go index d27a3d45e28..03b8c25e15f 100644 --- a/experimental/streaming/exporter/observer/eventtype_string.go +++ b/experimental/streaming/exporter/observer/eventtype_string.go @@ -15,15 +15,12 @@ func _() { _ = x[NEW_SCOPE-4] _ = x[MODIFY_ATTR-5] _ = x[SET_STATUS-6] - _ = x[GAUGE_SET-7] - _ = x[CUMULATIVE_INC-8] - _ = x[ADDITIVE_ADD-9] - _ = x[MEASURE_RECORD-10] + _ = x[UPDATE_METRIC-7] } -const _EventType_name = "INVALIDSTART_SPANFINISH_SPANADD_EVENTNEW_SCOPEMODIFY_ATTRSET_STATUSGAUGE_SETCUMULATIVE_INCADDITIVE_ADDMEASURE_RECORD" +const _EventType_name = "INVALIDSTART_SPANFINISH_SPANADD_EVENTNEW_SCOPEMODIFY_ATTRSET_STATUSUPDATE_METRIC" -var _EventType_index = [...]uint8{0, 7, 17, 28, 37, 46, 57, 67, 76, 90, 102, 116} +var _EventType_index = [...]uint8{0, 7, 17, 28, 37, 46, 57, 67, 80} func (i EventType) String() string { if i < 0 || i >= EventType(len(_EventType_index)-1) { diff --git a/experimental/streaming/exporter/observer/observer.go b/experimental/streaming/exporter/observer/observer.go index 69a390cd7ff..f910b59a801 100644 --- a/experimental/streaming/exporter/observer/observer.go +++ b/experimental/streaming/exporter/observer/observer.go @@ -75,7 +75,6 @@ type observersMap map[Observer]struct{} //go:generate stringer -type=EventType const ( - // TODO: rename these NOUN_VERB INVALID EventType = iota START_SPAN FINISH_SPAN @@ -83,11 +82,7 @@ const ( NEW_SCOPE MODIFY_ATTR SET_STATUS - - GAUGE_SET - CUMULATIVE_INC - ADDITIVE_ADD - MEASURE_RECORD + UPDATE_METRIC ) var ( diff --git a/experimental/streaming/exporter/reader/format/format.go b/experimental/streaming/exporter/reader/format/format.go index 7671e4636c0..10acdd0b367 100644 --- a/experimental/streaming/exporter/reader/format/format.go +++ b/experimental/streaming/exporter/reader/format/format.go @@ -90,8 +90,8 @@ func AppendEvent(buf *strings.Builder, data reader.Event) { case observer.MODIFY_ATTR: buf.WriteString(data.Type.String()) - case observer.GAUGE_SET, observer.CUMULATIVE_INC, observer.ADDITIVE_ADD, observer.MEASURE_RECORD: - buf.WriteString(data.Type.String()) + case observer.UPDATE_METRIC: + buf.WriteString(data.Measurement.Metric.Type.String()) buf.WriteString(" ") buf.WriteString(data.Measurement.Metric.Variable.Name) buf.WriteString("=") diff --git a/experimental/streaming/exporter/reader/reader.go b/experimental/streaming/exporter/reader/reader.go index 6231b6dcd3a..1d08f542086 100644 --- a/experimental/streaming/exporter/reader/reader.go +++ b/experimental/streaming/exporter/reader/reader.go @@ -255,10 +255,7 @@ func (ro *readerObserver) orderedObserve(event observer.Event) { read.SpanContext = span.spanContext } - case observer.GAUGE_SET, - observer.CUMULATIVE_INC, - observer.ADDITIVE_ADD, - observer.MEASURE_RECORD: + case observer.UPDATE_METRIC: read.Type = event.Type diff --git a/experimental/streaming/exporter/spandata/spandata.go b/experimental/streaming/exporter/spandata/spandata.go index 8d4bb443755..7bff30209ea 100644 --- a/experimental/streaming/exporter/spandata/spandata.go +++ b/experimental/streaming/exporter/spandata/spandata.go @@ -42,7 +42,7 @@ func NewReaderObserver(readers ...Reader) observer.Observer { func (s *spanReader) Read(data reader.Event) { if !data.SpanContext.HasSpanID() { - // @@@ This is happening, somehow span context is busted. + panic("How is this?") return } var span *Span @@ -57,6 +57,14 @@ func (s *spanReader) Read(data reader.Event) { } } + switch data.Type { + // GAUGE_SET + // CUMULATIVE_INC + // ADDITIVE_ADD + // MEASURE_RECORD + // @@@ + } + span.Events = append(span.Events, data) if data.Type == observer.FINISH_SPAN { diff --git a/experimental/streaming/sdk/metric.go b/experimental/streaming/sdk/metric.go index 1400c7edbc0..07694b1bf8b 100644 --- a/experimental/streaming/sdk/metric.go +++ b/experimental/streaming/sdk/metric.go @@ -85,17 +85,17 @@ func record(ctx context.Context, handle metric.Handle, etype observer.EventType, } func (g *float64Gauge) Set(ctx context.Context, value float64, labels ...core.KeyValue) { - record(ctx, g.handle.Handle, observer.GAUGE_SET, value, g.scope, labels) + record(ctx, g.handle.Handle, observer.UPDATE_METRIC, value, g.scope, labels) } func (c *float64Cumulative) Inc(ctx context.Context, value float64, labels ...core.KeyValue) { - record(ctx, c.handle.Handle, observer.CUMULATIVE_INC, value, c.scope, labels) + record(ctx, c.handle.Handle, observer.UPDATE_METRIC, value, c.scope, labels) } func (a *float64Additive) Add(ctx context.Context, value float64, labels ...core.KeyValue) { - record(ctx, a.handle.Handle, observer.ADDITIVE_ADD, value, a.scope, labels) + record(ctx, a.handle.Handle, observer.UPDATE_METRIC, value, a.scope, labels) } func (m *float64Measure) Record(ctx context.Context, value float64, labels ...core.KeyValue) { - record(ctx, m.handle.Handle, observer.MEASURE_RECORD, value, m.scope, labels) + record(ctx, m.handle.Handle, observer.UPDATE_METRIC, value, m.scope, labels) } From 48032d7b8931f1734b023411aafdca99d076aaac Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 19 Jul 2019 16:58:25 -0700 Subject: [PATCH 6/6] Tidy --- api/metric/aggregation/descriptor.go | 56 +------------------ api/metric/aggregation/operator_string.go | 29 ++++++++++ api/metric/api.go | 29 +++++----- api/metric/common.go | 2 +- api/metric/metrictype_string.go | 27 --------- api/metric/type_string.go | 27 +++++++++ example/grpc/metrics.go | 19 ++++--- .../streaming/exporter/spandata/spandata.go | 16 +++--- 8 files changed, 93 insertions(+), 112 deletions(-) create mode 100644 api/metric/aggregation/operator_string.go delete mode 100644 api/metric/metrictype_string.go create mode 100644 api/metric/type_string.go diff --git a/api/metric/aggregation/descriptor.go b/api/metric/aggregation/descriptor.go index d1215b90fb4..e723ab84fe9 100644 --- a/api/metric/aggregation/descriptor.go +++ b/api/metric/aggregation/descriptor.go @@ -14,8 +14,7 @@ package aggregation -import "go.opentelemetry.io/api/core" - +//go:generate stringer -type=Operator type Operator int const ( @@ -27,56 +26,3 @@ const ( LAST_VALUE DISTRIBUTION ) - -type Descriptor struct { - Operator Operator - Keys []core.Key -} - -func None() Descriptor { - return Descriptor{ - Operator: NONE, - } -} - -func Sum(keys ...core.Key) Descriptor { - return Descriptor{ - Operator: SUM, - Keys: keys, - } -} - -func Count(keys ...core.Key) Descriptor { - return Descriptor{ - Operator: COUNT, - Keys: keys, - } -} - -func Min(keys ...core.Key) Descriptor { - return Descriptor{ - Operator: MIN, - Keys: keys, - } -} - -func Max(keys ...core.Key) Descriptor { - return Descriptor{ - Operator: MAX, - Keys: keys, - } -} - -func LastValue(keys ...core.Key) Descriptor { - return Descriptor{ - Operator: LAST_VALUE, - Keys: keys, - } -} - -func Distribution(keys ...core.Key) Descriptor { - return Descriptor{ - Operator: DISTRIBUTION, - Keys: keys, - } -} diff --git a/api/metric/aggregation/operator_string.go b/api/metric/aggregation/operator_string.go new file mode 100644 index 00000000000..6606baeadb3 --- /dev/null +++ b/api/metric/aggregation/operator_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type=Operator"; DO NOT EDIT. + +package aggregation + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[NONE-0] + _ = x[SUM-1] + _ = x[COUNT-2] + _ = x[MIN-3] + _ = x[MAX-4] + _ = x[LAST_VALUE-5] + _ = x[DISTRIBUTION-6] +} + +const _Operator_name = "NONESUMCOUNTMINMAXLAST_VALUEDISTRIBUTION" + +var _Operator_index = [...]uint8{0, 4, 7, 12, 15, 18, 28, 40} + +func (i Operator) String() string { + if i < 0 || i >= Operator(len(_Operator_index)-1) { + return "Operator(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Operator_name[_Operator_index[i]:_Operator_index[i+1]] +} diff --git a/api/metric/api.go b/api/metric/api.go index 281b65e5f8d..d6c8528d85f 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -23,15 +23,15 @@ import ( "go.opentelemetry.io/api/unit" ) -type MetricType int +type Type int -//go:generate stringer -type=MetricType +//go:generate stringer -type=Type const ( - Invalid MetricType = iota - Gauge // Supports Set() - Cumulative // Supports Inc(): only positive values - Additive // Supports Add(): positive or negative - Measure // Supports Record() + Invalid Type = iota + Gauge // Supports Set() + Cumulative // Supports Inc(): only positive values + Additive // Supports Add(): positive or negative + Measure // Supports Record() ) type Meter interface { @@ -60,9 +60,9 @@ type Float64Measure interface { type Handle struct { Variable registry.Variable - Type MetricType - Keys []core.Key - Aggregations []aggregation.Descriptor + Type Type + Keys []core.Key + Aggregation aggregation.Operator } type Option func(*Handle, *[]registry.Option) @@ -92,12 +92,11 @@ func WithKeys(keys ...core.Key) Option { } } -// WithAggregation applies user-recommended aggregations to this -// metric. This is useful to declare the non-default aggregations, -// particularly for Measure type metrics. -func WithAggregations(aggrs ...aggregation.Descriptor) Option { +// WithAggregation applies a user-recommended aggregation to this +// metric, useful particularly for Measure type metrics. +func WithAggregation(aggr aggregation.Operator) Option { return func(m *Handle, _ *[]registry.Option) { - m.Aggregations = append(m.Aggregations, aggrs...) + m.Aggregation = aggr } } diff --git a/api/metric/common.go b/api/metric/common.go index 0ccad41e0b6..cc210e6b9de 100644 --- a/api/metric/common.go +++ b/api/metric/common.go @@ -18,7 +18,7 @@ import ( "go.opentelemetry.io/api/registry" ) -func registerMetric(name string, mtype MetricType, opts []Option, metric *Handle) { +func registerMetric(name string, mtype Type, opts []Option, metric *Handle) { var varOpts []registry.Option for _, opt := range opts { diff --git a/api/metric/metrictype_string.go b/api/metric/metrictype_string.go deleted file mode 100644 index 200564c3b82..00000000000 --- a/api/metric/metrictype_string.go +++ /dev/null @@ -1,27 +0,0 @@ -// Code generated by "stringer -type=MetricType"; DO NOT EDIT. - -package metric - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[Invalid-0] - _ = x[Gauge-1] - _ = x[Cumulative-2] - _ = x[Additive-3] - _ = x[Measure-4] -} - -const _MetricType_name = "InvalidGaugeCumulativeAdditiveMeasure" - -var _MetricType_index = [...]uint8{0, 7, 12, 22, 30, 37} - -func (i MetricType) String() string { - if i < 0 || i >= MetricType(len(_MetricType_index)-1) { - return "MetricType(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _MetricType_name[_MetricType_index[i]:_MetricType_index[i+1]] -} diff --git a/api/metric/type_string.go b/api/metric/type_string.go new file mode 100644 index 00000000000..61f630239bf --- /dev/null +++ b/api/metric/type_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer -type=Type"; DO NOT EDIT. + +package metric + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Invalid-0] + _ = x[Gauge-1] + _ = x[Cumulative-2] + _ = x[Additive-3] + _ = x[Measure-4] +} + +const _Type_name = "InvalidGaugeCumulativeAdditiveMeasure" + +var _Type_index = [...]uint8{0, 7, 12, 22, 30, 37} + +func (i Type) String() string { + if i < 0 || i >= Type(len(_Type_index)-1) { + return "Type(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Type_name[_Type_index[i]:_Type_index[i+1]] +} diff --git a/example/grpc/metrics.go b/example/grpc/metrics.go index 853dd4465d5..01123ad5c91 100644 --- a/example/grpc/metrics.go +++ b/example/grpc/metrics.go @@ -32,21 +32,24 @@ var ( "grpc.io/client/sent_bytes_per_rpc", metric.WithUnit(unit.Bytes), metric.WithDescription("Number of bytes sent per RPC"), - metric.WithAggregations(aggregation.Distribution(clientMethodKey)), + metric.WithKeys(clientMethodKey), + metric.WithAggregation(aggregation.DISTRIBUTION), ) receivedBytesPerRPC = metric.NewFloat64Measure( "grpc.io/client/received_bytes_per_rpc", metric.WithUnit(unit.Bytes), metric.WithDescription("Number of bytes received per RPC"), - metric.WithAggregations(aggregation.Distribution(clientMethodKey)), + metric.WithKeys(clientMethodKey), + metric.WithAggregation(aggregation.DISTRIBUTION), ) roundtripLatency = metric.NewFloat64Measure( "grpc.io/client/roundtrip_latency", metric.WithUnit(unit.Milliseconds), metric.WithDescription("Roundtrip request latency"), - metric.WithAggregations(aggregation.Distribution(clientMethodKey)), + metric.WithKeys(clientMethodKey), + metric.WithAggregation(aggregation.DISTRIBUTION), ) // Note: the specification says to use "Count" aggregation by @@ -65,13 +68,15 @@ var ( completedRPCs = metric.NewFloat64Measure( "grpc.io/client/completed_rpcs", metric.WithDescription("Count of completed RPCs"), - metric.WithAggregations(aggregation.Sum(clientMethodKey, clientStatusKey)), + metric.WithKeys(clientMethodKey, clientStatusKey), + metric.WithAggregation(aggregation.SUM), ) startedRPCs = metric.NewFloat64Measure( "grpc.io/client/started_rpcs", metric.WithDescription("Count of started RPCs"), - metric.WithAggregations(aggregation.Sum(clientMethodKey)), + metric.WithKeys(clientMethodKey), + metric.WithAggregation(aggregation.SUM), ) // Extra client metrics @@ -88,7 +93,7 @@ var ( metric.WithUnit(unit.Bytes), metric.WithDescription("Number of messages sent per RPC, equals 1 for unary RPCs"), metric.WithKeys(clientMethodKey), - metric.WithAggregations(aggregation.None()), + metric.WithAggregation(aggregation.NONE), ) receivedMessagesPerRPC = metric.NewFloat64Measure( @@ -96,7 +101,7 @@ var ( metric.WithUnit(unit.Bytes), metric.WithDescription("Number of messages received per RPC, equals 1 for unary RPCs"), metric.WithKeys(clientMethodKey), - metric.WithAggregations(aggregation.None()), + metric.WithAggregation(aggregation.NONE), ) // ... and so on diff --git a/experimental/streaming/exporter/spandata/spandata.go b/experimental/streaming/exporter/spandata/spandata.go index 7bff30209ea..292a6cca596 100644 --- a/experimental/streaming/exporter/spandata/spandata.go +++ b/experimental/streaming/exporter/spandata/spandata.go @@ -25,7 +25,8 @@ type Reader interface { } type Span struct { - Events []reader.Event + Events []reader.Event + Aggregates map[string]float64 } type spanReader struct { @@ -57,12 +58,9 @@ func (s *spanReader) Read(data reader.Event) { } } - switch data.Type { - // GAUGE_SET - // CUMULATIVE_INC - // ADDITIVE_ADD - // MEASURE_RECORD - // @@@ + if data.Type == observer.UPDATE_METRIC { + s.updateMetric(data) + return } span.Events = append(span.Events, data) @@ -74,3 +72,7 @@ func (s *spanReader) Read(data reader.Event) { delete(s.spans, data.SpanContext) } } + +func (s *spanReader) updateMetric(data reader.Event) { + // TODO aggregate +}