Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Context propagation #297

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 43 additions & 35 deletions api/distributedcontext/map.go → api/distributedcontext/baggage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,77 +18,85 @@ import (
"go.opentelemetry.io/otel/api/core"
)

type entry struct {
value core.Value
type Baggage struct {
m map[core.Key]core.Value
}

type rawMap map[core.Key]entry
type BaggageUpdate struct {
DropSingleK core.Key
krnowak marked this conversation as resolved.
Show resolved Hide resolved
DropMultiK []core.Key

type Map struct {
m rawMap
}

type MapUpdate struct {
SingleKV core.KeyValue
MultiKV []core.KeyValue
Map Baggage
}

func newMap(raw rawMap) Map {
return Map{
m: raw,
func NewBaggage() Baggage {
return Baggage{
m: nil,
}
}

func NewEmptyMap() Map {
return newMap(nil)
}

func NewMap(update MapUpdate) Map {
return NewEmptyMap().Apply(update)
}

func (m Map) Apply(update MapUpdate) Map {
r := make(rawMap, len(m.m)+len(update.MultiKV))
func (m Baggage) Apply(update BaggageUpdate) Baggage {
r := make(map[core.Key]core.Value, len(m.m)+len(update.MultiKV)+update.Map.Len())
for k, v := range m.m {
r[k] = v
}
if update.DropSingleK.Defined() {
delete(r, update.DropSingleK)
}
for _, k := range update.DropMultiK {
delete(r, k)
}
if update.SingleKV.Key.Defined() {
r[update.SingleKV.Key] = entry{
value: update.SingleKV.Value,
}
r[update.SingleKV.Key] = update.SingleKV.Value
}
for _, kv := range update.MultiKV {
r[kv.Key] = entry{
value: kv.Value,
}
r[kv.Key] = kv.Value
}
for k, v := range update.Map.m {
r[k] = v
}
if len(r) == 0 {
r = nil
}
return newMap(r)
return Baggage{
m: r,
}
}

func (m Map) Value(k core.Key) (core.Value, bool) {
entry, ok := m.m[k]
return entry.value, ok
func (m Baggage) Value(k core.Key) (core.Value, bool) {
v, ok := m.m[k]
return v, ok
}

func (m Map) HasValue(k core.Key) bool {
func (m Baggage) HasValue(k core.Key) bool {
_, has := m.Value(k)
return has
}

func (m Map) Len() int {
func (m Baggage) Len() int {
return len(m.m)
}

func (m Map) Foreach(f func(kv core.KeyValue) bool) {
func (m Baggage) Foreach(f func(kv core.KeyValue) bool) {
for k, v := range m.m {
if !f(core.KeyValue{
Key: k,
Value: v.value,
Value: v,
}) {
return
}
}
}

func (m Baggage) KeyValues() []core.KeyValue {
a := make([]core.KeyValue, 0, len(m.m))
for k, v := range m.m {
a = append(a, core.KeyValue{
Key: k,
Value: v,
})
}
return a
}
49 changes: 34 additions & 15 deletions api/distributedcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,52 @@ package distributedcontext

import (
"context"
"runtime/pprof"

"go.opentelemetry.io/otel/api/core"
)

type ctxEntriesType struct{}
type ctxKey int

var (
ctxEntriesKey = &ctxEntriesType{}
const (
ctxCorrelationsKey ctxKey = iota
ctxBaggageKey
)

func WithMap(ctx context.Context, m Map) context.Context {
return context.WithValue(ctx, ctxEntriesKey, m)
func NewCorrelationsContextKV(ctx context.Context, correlations ...Correlation) context.Context {
return withCorrelationsMapUpdate(ctx, CorrelationsUpdate{
MultiKV: correlations,
})
}

func NewCorrelationsContextMap(ctx context.Context, correlations Correlations) context.Context {
return withCorrelationsMapUpdate(ctx, CorrelationsUpdate{
Map: correlations,
})
}

func CorrelationsFromContext(ctx context.Context) Correlations {
if m, ok := ctx.Value(ctxCorrelationsKey).(Correlations); ok {
return m
}
return NewCorrelations()
}

func NewContext(ctx context.Context, keyvalues ...core.KeyValue) context.Context {
return WithMap(ctx, FromContext(ctx).Apply(MapUpdate{
MultiKV: keyvalues,
}))
func NewBaggageContext(ctx context.Context, baggage Baggage) context.Context {
return context.WithValue(ctx, ctxBaggageKey, baggage)
}

func FromContext(ctx context.Context) Map {
if m, ok := ctx.Value(ctxEntriesKey).(Map); ok {
func BaggageFromContext(ctx context.Context) Baggage {
if m, ok := ctx.Value(ctxBaggageKey).(Baggage); ok {
return m
}
return NewEmptyMap()
return NewBaggage()
}

func withCorrelationsMapUpdate(ctx context.Context, update CorrelationsUpdate) context.Context {
return context.WithValue(ctx, ctxCorrelationsKey, CorrelationsFromContext(ctx).Apply(update))
}

/*
// TODO(krnowak): I don't know what's the point of this function…

// Note: the golang pprof.Do API forces this memory allocation, we
// should file an issue about that. (There's a TODO in the source.)
func Do(ctx context.Context, f func(ctx context.Context)) {
Expand All @@ -54,3 +72,4 @@ func Do(ctx context.Context, f func(ctx context.Context)) {
}
pprof.Do(ctx, pprof.Labels(keyvals...), f)
}
*/
123 changes: 123 additions & 0 deletions api/distributedcontext/correlations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package distributedcontext

import (
"go.opentelemetry.io/otel/api/core"
)

type HopLimit int

const (
NoPropagation HopLimit = iota
UnlimitedPropagation
)

type CorrelationValue struct {
core.Value
HopLimit HopLimit
}

type Correlation struct {
core.KeyValue
HopLimit HopLimit
}

func (c *Correlation) CorrelationValue() CorrelationValue {
return CorrelationValue{
Value: c.Value,
HopLimit: c.HopLimit,
}
}

type Correlations struct {
m map[core.Key]CorrelationValue
}

type CorrelationsUpdate struct {
SingleKV Correlation
MultiKV []Correlation
Map Correlations
}

func NewCorrelations() Correlations {
return Correlations{
m: nil,
}
}

func (m Correlations) Apply(update CorrelationsUpdate) Correlations {
r := make(map[core.Key]CorrelationValue, len(m.m)+len(update.MultiKV)+update.Map.Len())
for k, v := range m.m {
r[k] = v
}
if update.SingleKV.Key.Defined() {
r[update.SingleKV.Key] = update.SingleKV.CorrelationValue()
}
for _, kv := range update.MultiKV {
r[kv.Key] = kv.CorrelationValue()
}
for k, v := range update.Map.m {
r[k] = v
}
if len(r) == 0 {
r = nil
}
return Correlations{
m: r,
}
}

func (m Correlations) Value(k core.Key) (CorrelationValue, bool) {
v, ok := m.m[k]
return v, ok
}

func (m Correlations) HasValue(k core.Key) bool {
_, has := m.Value(k)
return has
}

func (m Correlations) Len() int {
return len(m.m)
}

func (m Correlations) Foreach(f func(kv Correlation) bool) {
for k, v := range m.m {
if !f(Correlation{
KeyValue: core.KeyValue{
Key: k,
Value: v.Value,
},
HopLimit: v.HopLimit,
}) {
return
}
}
}

func (m Correlations) Correlations() []Correlation {
a := make([]Correlation, 0, len(m.m))
for k, v := range m.m {
a = append(a, Correlation{
KeyValue: core.KeyValue{
Key: k,
Value: v.Value,
},
HopLimit: v.HopLimit,
})
}
return a
}
2 changes: 2 additions & 0 deletions api/distributedcontext/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build ignore

package distributedcontext

import (
Expand Down
Loading