diff --git a/internal/profiling/buffer/buffer.go b/internal/profiling/buffer/buffer.go new file mode 100644 index 000000000000..db41712a872f --- /dev/null +++ b/internal/profiling/buffer/buffer.go @@ -0,0 +1,272 @@ +// +build !appengine + +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package buffer + +import ( + "errors" + "math/bits" + "runtime" + "sync" + "sync/atomic" + "unsafe" +) + +type queue struct { + // An array of pointers as references to the items stored in this queue. + arr []unsafe.Pointer + // The maximum number of elements this queue may store before it wraps around + // and overwrites older values. Must be an exponent of 2. + size uint32 + // Always size - 1. A bitwise AND is performed with this mask in place of a + // modulo operation by the Push operation. + mask uint32 + // Each Push operation into this queue increments the acquired counter before + // proceeding forwarding with the actual write to arr. This counter is also + // used by the Drain operation's drainWait subroutine to wait for all pushes + // to complete. + acquired uint32 + // After the completion of a Push operation, the written counter is + // incremented. Also used by drainWait to wait for all pushes to complete. + written uint32 +} + +// Allocates and returns a new *queue. size needs to be a exponent of two. +func newQueue(size uint32) *queue { + return &queue{ + arr: make([]unsafe.Pointer, size), + size: size, + mask: size - 1, + } +} + +// drainWait blocks the caller until all Pushes on this queue are complete. +func (q *queue) drainWait() { + for atomic.LoadUint32(&q.acquired) != atomic.LoadUint32(&q.written) { + runtime.Gosched() + } +} + +// A queuePair has two queues. At any given time, Pushes go into the queue +// referenced by queuePair.q. The active queue gets switched when there's a +// drain operation on the circular buffer. +type queuePair struct { + q0 unsafe.Pointer + q1 unsafe.Pointer + q unsafe.Pointer +} + +// Allocates and returns a new *queuePair with its internal queues allocated. +func newQueuePair(size uint32) *queuePair { + qp := &queuePair{} + qp.q0 = unsafe.Pointer(newQueue(size)) + qp.q1 = unsafe.Pointer(newQueue(size)) + qp.q = qp.q0 + return qp +} + +// Switches the current queue for future Pushes to proceed to the other queue +// so that there's no blocking in Push. Returns a pointer to the old queue that +// was in place before the switch. +func (qp *queuePair) switchQueues() *queue { + // Even though we have mutual exclusion across drainers (thanks to mu.Lock in + // drain), Push operations may access qp.q whilst we're writing to it. + if atomic.CompareAndSwapPointer(&qp.q, qp.q0, qp.q1) { + return (*queue)(qp.q0) + } + + atomic.CompareAndSwapPointer(&qp.q, qp.q1, qp.q0) + return (*queue)(qp.q1) +} + +// In order to not have expensive modulo operations, we require the maximum +// number of elements in the circular buffer (N) to be an exponent of two to +// use a bitwise AND mask. 
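+// (As a concrete illustration of the mask trick: with size == 8 and therefore
+// mask == 7, an acquired count of 13 maps to index 13 & 7 == 5, which is
+// exactly 13 % 8 without the division.)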
Since a CircularBuffer is a collection of queuePairs +// (see below), we need to divide N; since exponents of two are only divisible +// by other exponents of two, we use floorCPUCount number of queuePairs within +// each CircularBuffer. +// +// Floor of the number of CPUs (and not the ceiling) was found to the be the +// optimal number through experiments. +func floorCPUCount() uint32 { + floorExponent := bits.Len32(uint32(runtime.NumCPU())) - 1 + if floorExponent < 0 { + floorExponent = 0 + } + return 1 << uint32(floorExponent) +} + +var numCircularBufferPairs = floorCPUCount() + +// CircularBuffer is a lock-free data structure that supports Push and Drain +// operations. +// +// Note that CircularBuffer is built for performance more than reliability. +// That is, some Push operations may fail without retries in some situations +// (such as during a Drain operation). Order of pushes is not maintained +// either; that is, if A was pushed before B, the Drain operation may return an +// array with B before A. These restrictions are acceptable within gRPC's +// profiling, but if your use-case does not permit these relaxed constraints +// or if performance is not a primary concern, you should probably use a +// lock-based data structure such as internal/buffer.UnboundedBuffer. +type CircularBuffer struct { + drainMutex sync.Mutex + qp []*queuePair + // qpn is an monotonically incrementing counter that's used to determine + // which queuePair a Push operation should write to. This approach's + // performance was found to be better than writing to a random queue. + qpn uint32 + qpMask uint32 +} + +var errInvalidCircularBufferSize = errors.New("buffer size is not an exponent of two") + +// NewCircularBuffer allocates a circular buffer of size size and returns a +// reference to the struct. Only circular buffers of size 2^k are allowed +// (saves us from having to do expensive modulo operations). +func NewCircularBuffer(size uint32) (*CircularBuffer, error) { + if size&(size-1) != 0 { + return nil, errInvalidCircularBufferSize + } + + n := numCircularBufferPairs + if size/numCircularBufferPairs < 8 { + // If each circular buffer is going to hold less than a very small number + // of items (let's say 8), using multiple circular buffers is very likely + // wasteful. Instead, fallback to one circular buffer holding everything. + n = 1 + } + + cb := &CircularBuffer{ + qp: make([]*queuePair, n), + qpMask: n - 1, + } + + for i := uint32(0); i < n; i++ { + cb.qp[i] = newQueuePair(size / n) + } + + return cb, nil +} + +// Push pushes an element in to the circular buffer. Guaranteed to complete in +// a finite number of steps (also lock-free). Does not guarantee that push +// order will be retained. Does not guarantee that the operation will succeed +// if a Drain operation concurrently begins execution. +func (cb *CircularBuffer) Push(x interface{}) { + n := atomic.AddUint32(&cb.qpn, 1) & cb.qpMask + qptr := atomic.LoadPointer(&cb.qp[n].q) + q := (*queue)(qptr) + + acquired := atomic.AddUint32(&q.acquired, 1) - 1 + + // If true, it means that we have incremented acquired before any queuePair + // was switched, and therefore before any drainWait completion. Therefore, it + // is safe to proceed with the Push operation on this queue. Otherwise, it + // means that a Drain operation has begun execution, but we don't know how + // far along the process it is. If it is past the drainWait check, it is not + // safe to proceed with the Push operation. 
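+	// (It is unsafe because the drainer may already be past its drainWait
+	// barrier and reading arr; dereferenceAppend below assumes exclusive access
+	// to arr, so a write here could race with those reads.)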
We choose to drop this sample + // entirely instead of retrying, as retrying may potentially send the Push + // operation into a spin loop (we want to guarantee completion of the Push + // operation within a finite time). Before exiting, we increment written so + // that any existing drainWaits can proceed. + if atomic.LoadPointer(&cb.qp[n].q) != qptr { + atomic.AddUint32(&q.written, 1) + return + } + + // At this point, we're definitely writing to the right queue. That is, one + // of the following is true: + // 1. No drainer is in execution on this queue. + // 2. A drainer is in execution on this queue and it is waiting at the + // acquired == written barrier. + // + // Let's say two Pushes A and B happen on the same queue. Say A and B are + // q.size apart; i.e. they get the same index. That is, + // + // index_A = index_B + // acquired_A + q.size = acquired_B + // + // We say "B has wrapped around A" when this happens. In this case, since A + // occurred before B, B's Push should be the final value. However, we + // accommodate A being the final value because wrap-arounds are extremely + // rare and accounting for them requires an additional counter and a + // significant performance penalty. Note that the below approach never leads + // to any data corruption. + index := acquired & q.mask + atomic.StorePointer(&q.arr[index], unsafe.Pointer(&x)) + + // Allows any drainWait checks to proceed. + atomic.AddUint32(&q.written, 1) +} + +// Dereferences non-nil pointers from arr into result. Range of elements from +// arr that are copied is [from, to). Assumes that the result slice is already +// allocated and is large enough to hold all the elements that might be copied. +// Also assumes mutual exclusion on the array of pointers. +func dereferenceAppend(result []interface{}, arr []unsafe.Pointer, from, to uint32) []interface{} { + for i := from; i < to; i++ { + // We have mutual exclusion on arr, there's no need for atomics. + x := (*interface{})(arr[i]) + if x != nil { + result = append(result, *x) + } + } + return result +} + +// Drain allocates and returns an array of things Pushed in to the circular +// buffer. Push order is not maintained; that is, if B was Pushed after A, +// drain may return B at a lower index than A in the returned array. +func (cb *CircularBuffer) Drain() []interface{} { + cb.drainMutex.Lock() + + qs := make([]*queue, len(cb.qp)) + for i := 0; i < len(cb.qp); i++ { + qs[i] = cb.qp[i].switchQueues() + } + + var wg sync.WaitGroup + wg.Add(int(len(qs))) + for i := 0; i < len(qs); i++ { + go func(qi int) { + qs[qi].drainWait() + wg.Done() + }(i) + } + wg.Wait() + + result := make([]interface{}, 0) + for i := 0; i < len(qs); i++ { + if qs[i].acquired < qs[i].size { + result = dereferenceAppend(result, qs[i].arr, 0, qs[i].acquired) + } else { + result = dereferenceAppend(result, qs[i].arr, 0, qs[i].size) + } + } + + for i := 0; i < len(qs); i++ { + atomic.StoreUint32(&qs[i].acquired, 0) + atomic.StoreUint32(&qs[i].written, 0) + } + + cb.drainMutex.Unlock() + return result +} diff --git a/internal/profiling/buffer/buffer_appengine.go b/internal/profiling/buffer/buffer_appengine.go new file mode 100644 index 000000000000..b38a3da5af59 --- /dev/null +++ b/internal/profiling/buffer/buffer_appengine.go @@ -0,0 +1,38 @@ +// +build appengine + +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Appengine does not support stats because of lack of support for unsafe +// pointers, which are necessary to efficiently store and retrieve things into +// and from a circular buffer. As a result, Push does not do anything and Drain +// returns an empty slice. +package buffer + +type CircularBuffer struct{} + +func NewCircularBuffer(size uint32) (*CircularBuffer, error) { + return nil, nil +} + +func (cb *CircularBuffer) Push(x interface{}) { +} + +func (cb *CircularBuffer) Drain() []interface{} { + return nil +} diff --git a/internal/profiling/buffer/buffer_test.go b/internal/profiling/buffer/buffer_test.go new file mode 100644 index 000000000000..6698098f32f9 --- /dev/null +++ b/internal/profiling/buffer/buffer_test.go @@ -0,0 +1,178 @@ +// +build !appengine + +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package buffer + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestCircularBufferSerial(t *testing.T) { + var size, i uint32 + var result []interface{} + + size = 1 << 15 + cb, err := NewCircularBuffer(size) + if err != nil { + t.Fatalf("error allocating CircularBuffer: %v", err) + } + + for i = 0; i < size/2; i++ { + cb.Push(i) + } + + result = cb.Drain() + if uint32(len(result)) != size/2 { + t.Fatalf("len(result) = %d; want %d", len(result), size/2) + } + + // The returned result isn't necessarily sorted. 
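+	// Instead of comparing elements positionally, record every drained value in
+	// a set and then check that each pushed value is present in it.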
+ seen := make(map[uint32]bool) + for _, r := range result { + seen[r.(uint32)] = true + } + + for i = 0; i < uint32(len(result)); i++ { + if !seen[i] { + t.Fatalf("seen[%d] = false; want true", i) + } + } + + for i = 0; i < size; i++ { + cb.Push(i) + } + + result = cb.Drain() + if uint32(len(result)) != size { + t.Fatalf("len(result) = %d; want %d", len(result), size/2) + } +} + +func TestCircularBufferOverflow(t *testing.T) { + var size, i uint32 + var result []interface{} + + size = 1 << 10 + cb, err := NewCircularBuffer(size) + if err != nil { + t.Fatalf("error allocating CircularBuffer: %v", err) + } + + for i = 0; i < 10*size; i++ { + cb.Push(i) + } + + result = cb.Drain() + + if uint32(len(result)) != size { + t.Fatalf("len(result) = %d; want %d", len(result), size) + } + + for idx, x := range result { + if x.(uint32) < size { + t.Fatalf("result[%d] = %d; want it to be >= %d", idx, x, size) + } + } +} + +func TestCircularBufferConcurrent(t *testing.T) { + for tn := 0; tn < 2; tn++ { + var size uint32 + var result []interface{} + + size = 1 << 6 + cb, err := NewCircularBuffer(size) + if err != nil { + t.Fatalf("error allocating CircularBuffer: %v", err) + } + + type item struct { + R uint32 + N uint32 + T time.Time + } + + var wg sync.WaitGroup + for r := uint32(0); r < 1024; r++ { + wg.Add(1) + go func(r uint32) { + for n := uint32(0); n < size; n++ { + cb.Push(item{R: r, N: n, T: time.Now()}) + } + wg.Done() + }(r) + } + + // Wait for all goroutines to finish only in one test. Draining + // concurrently while Pushes are still happening will test for races in the + // Draining lock. + if tn == 0 { + wg.Wait() + } + + result = cb.Drain() + + // Can't expect the buffer to be full if the Pushes aren't necessarily done. + if tn == 0 { + if uint32(len(result)) != size { + t.Fatalf("len(result) = %d; want %d", len(result), size) + } + } + + // There can be absolutely no expectation on the order of the data returned + // by Drain because: (a) everything is happening concurrently (b) a + // round-robin is used to write to different queues (and therefore + // different cachelines) for less write contention. + + // Wait for all goroutines to complete before moving on to other tests. If + // the benchmarks run after this, it might affect performance unfairly. + wg.Wait() + } +} + +func BenchmarkCircularBuffer(b *testing.B) { + x := 1 + for size := 1 << 16; size <= 1<<20; size <<= 1 { + for routines := 1; routines <= 1<<8; routines <<= 1 { + b.Run(fmt.Sprintf("goroutines:%d/size:%d", routines, size), func(b *testing.B) { + cb, err := NewCircularBuffer(uint32(size)) + if err != nil { + b.Fatalf("error allocating CircularBuffer: %v", err) + } + + perRoutine := b.N / routines + var wg sync.WaitGroup + for r := 0; r < routines; r++ { + wg.Add(1) + go func() { + for i := 0; i < perRoutine; i++ { + cb.Push(&x) + } + wg.Done() + }() + } + wg.Wait() + }) + } + } +} diff --git a/internal/profiling/goid_modified.go b/internal/profiling/goid_modified.go new file mode 100644 index 000000000000..b186499cd0d1 --- /dev/null +++ b/internal/profiling/goid_modified.go @@ -0,0 +1,81 @@ +// +build grpcgoid + +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package profiling + +import ( + "runtime" +) + +// This stubbed function usually returns zero (see goid_regular.go); however, +// if grpc is built with `-tags 'grpcgoid'`, a runtime.Goid function, which +// does not exist in the Go standard library, is expected. While not necessary, +// sometimes, visualising grpc profiling data in trace-viewer is much nicer +// with goroutines separated from each other. +// +// Several other approaches were considered before arriving at this: +// +// 1. Using a CGO module: CGO usually has access to some things that regular +// Go does not. Till go1.4, CGO used to have access to the goroutine struct +// because the Go runtime was written in C. However, 1.5+ uses a native Go +// runtime; as a result, CGO does not have access to the goroutine structure +// anymore in modern Go. Besides, CGO interop wasn't fast enough (estimated +// to be ~170ns/op). This would also make building grpc require a C +// compiler, which isn't a requirement currently, breaking a lot of stuff. +// +// 2. Using runtime.Stack stacktrace: While this would remove the need for a +// modified Go runtime, this is ridiculously slow, thanks to the all the +// string processing shenanigans required to extract the goroutine ID (about +// ~2000ns/op). +// +// 3. Using Go version-specific build tags: For any given Go version, the +// goroutine struct has a fixed structure. As a result, the goroutine ID +// could be extracted if we know the offset using some assembly. This would +// be faster then #1 and #2, but is harder to maintain. This would require +// special Go code that's both architecture-specific and go version-specific +// (a quadratic number of variants to maintain). +// +// 4. This approach, which requires a simple modification [1] to the Go runtime +// to expose the current goroutine's ID. This is the chosen approach and it +// takes about ~2 ns/op, which is negligible in the face of the tens of +// microseconds that grpc takes to complete a RPC request. +// +// [1] To make the goroutine ID visible to Go programs apply the following +// change to the runtime2.go file in your Go runtime installation: +// +// diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go +// --- a/src/runtime/runtime2.go +// +++ b/src/runtime/runtime2.go +// @@ -392,6 +392,10 @@ type stack struct { +// hi uintptr +// } +// +// +func Goid() int64 { +// + return getg().goid +// +} +// + +// type g struct { +// // Stack parameters. +// // stack describes the actual stack memory: [stack.lo, stack.hi). +// +// The exposed runtime.Goid() function will return a int64 goroutine ID. +func goid() int64 { + return runtime.Goid() +} diff --git a/internal/profiling/goid_regular.go b/internal/profiling/goid_regular.go new file mode 100644 index 000000000000..891c2e98f9db --- /dev/null +++ b/internal/profiling/goid_regular.go @@ -0,0 +1,29 @@ +// +build !grpcgoid + +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package profiling + +// This dummy function always returns 0. In some modified dev environments, +// this may be replaced with a call to a function in a modified Go runtime that +// retrieves the goroutine ID efficiently. See goid_modified.go for a different +// version of goId that requires a grpcgoid build tag to compile. +func goid() int64 { + return 0 +} diff --git a/internal/profiling/profiling.go b/internal/profiling/profiling.go new file mode 100644 index 000000000000..11363f73e4d7 --- /dev/null +++ b/internal/profiling/profiling.go @@ -0,0 +1,221 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// The profiling package contains two logical components: buffer.go and +// profiling.go. The former implements a circular buffer (a.k.a. ring buffer) +// in a lock-free manner using atomics. This ring buffer is used by +// profiling.go to store various statistics. For example, StreamStats is a +// circular buffer of Stat objects, each of which is comprised of Timers. +// +// This abstraction is designed to accommodate more stats in the future; for +// example, if one wants to profile the load balancing layer, which is +// independent of RPC queries, a separate CircularBuffer can be used. +// +// Note that the circular buffer simply takes any interface{}. In the future, +// more types of measurements (such as the number of memory allocations) could +// be measured, which might require a different type of object being pushed +// into the circular buffer. + +package profiling + +import ( + "sync" + "sync/atomic" + "time" + + "google.golang.org/grpc/internal/profiling/buffer" +) + +// 0 or 1 representing profiling off and on, respectively. Use IsEnabled and +// Enable to get and set this in a safe manner. +var profilingEnabled uint32 + +// IsEnabled returns whether or not profiling is enabled. +func IsEnabled() bool { + return atomic.LoadUint32(&profilingEnabled) > 0 +} + +// Enable turns profiling on and off. +// +// Note that it is impossible to enable profiling for one server and leave it +// turned off for another. This is intentional and by design -- if the status +// of profiling was server-specific, clients wouldn't be able to profile +// themselves. As a result, Enable turns profiling on and off for all servers +// and clients in the binary. Each stat will be, however, tagged with whether +// it's a client stat or a server stat; so you should be able to filter for the +// right type of stats in post-processing. 
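+//
+// A rough usage sketch (assuming the caller can import this internal package):
+//
+//	profiling.Enable(true)        // subsequent IsEnabled calls return true
+//	defer profiling.Enable(false) // turn collection back off when done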
+func Enable(enabled bool) { + if enabled { + atomic.StoreUint32(&profilingEnabled, 1) + } else { + atomic.StoreUint32(&profilingEnabled, 0) + } +} + +// A Timer represents the wall-clock beginning and ending of a logical +// operation. +type Timer struct { + // Tags is a comma-separated list of strings (usually forward-slash-separated + // hierarchical strings) used to categorize a Timer. + Tags string + // Begin marks the beginning of this timer. The timezone is unspecified, but + // must use the same timezone as End; this is so shave off the small, but + // non-zero time required to convert to a standard timezone such as UTC. + Begin time.Time + // End marks the end of a timer. + End time.Time + // Each Timer must be started and ended within the same goroutine; GoID + // captures this goroutine ID. The Go runtime does not typically expose this + // information, so this is set to zero in the typical case. However, a + // trivial patch to the runtime package can make this field useful. See + // goid_modified.go in this package for more details. + GoID int64 +} + +// NewTimer creates and returns a new Timer object. This is useful when you +// don't already have a Stat object to associate this Timer with; for example, +// before the context of a new RPC query is created, a Timer may be needed to +// measure transport-related operations. +// +// Use AppendTimer to append the returned Timer to a Stat. +func NewTimer(tags string) *Timer { + return &Timer{ + Tags: tags, + Begin: time.Now(), + GoID: goid(), + } +} + +// Egress sets the End field of a timer to the current time. +func (timer *Timer) Egress() { + if timer == nil { + return + } + + timer.End = time.Now() +} + +// A Stat is a collection of Timers that represent timing information for +// different components within this Stat. For example, a Stat may be used to +// reference the entire lifetime of an RPC request, with Timers within it +// representing different components such as encoding, compression, and +// transport. +// +// The user is expected to use the included helper functions to do operations +// on the Stat such as creating or appending a new timer. Direct operations on +// the Stat's exported fields (which are exported for encoding reasons) may +// lead to data races. +type Stat struct { + // Tags is a comma-separated list of strings used to categorize a Stat. + Tags string + // Stats may also need to store other unstructured information specific to + // this stat. For example, a StreamStat will use these bytes to encode the + // connection ID and stream ID for each RPC to uniquely identify it. The + // encoding that must be used is unspecified. + Metadata []byte + // A collection of *Timers and a mutex for append operations on the slice. + mu sync.Mutex + Timers []*Timer +} + +// A power of two that's large enough to hold all timers within an average RPC +// request (defined to be a unary request) without any reallocation. A typical +// unary RPC creates 80-100 timers for various things. While this number is +// purely anecdotal and may change in the future as the resolution of profiling +// increases or decreases, it serves as a good estimate for what the initial +// allocation size should be. +const defaultStatAllocatedTimers int32 = 128 + +// NewStat creates and returns a new Stat object. +func NewStat(tags string) *Stat { + return &Stat{ + Tags: tags, + Timers: make([]*Timer, 0, defaultStatAllocatedTimers), + } +} + +// NewTimer creates a Timer object within the given stat if stat is non-nil. 
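+// (Both this method and Timer.Egress tolerate nil receivers, so the pattern
+// used in the tests, defer stat.NewTimer(tags).Egress(), is safe even when
+// stat is nil.)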
+// The value passed in tags will be attached to the newly created Timer.
+// NewTimer also automatically sets the Begin value of the Timer to the current
+// time. The user is expected to call Egress on the returned Timer to mark the
+// end.
+func (stat *Stat) NewTimer(tags string) *Timer {
+	if stat == nil {
+		return nil
+	}
+
+	timer := &Timer{
+		Tags:  tags,
+		GoID:  goid(),
+		Begin: time.Now(),
+	}
+	stat.mu.Lock()
+	stat.Timers = append(stat.Timers, timer)
+	stat.mu.Unlock()
+	return timer
+}
+
+// AppendTimer appends a given Timer object to the internal slice of timers.
+// The stat retains the provided pointer rather than making a copy, so updates
+// made through it (such as a later call to Egress) are reflected in the stat.
+func (stat *Stat) AppendTimer(timer *Timer) {
+	if stat == nil || timer == nil {
+		return
+	}
+
+	stat.mu.Lock()
+	stat.Timers = append(stat.Timers, timer)
+	stat.mu.Unlock()
+}
+
+// statsInitialized is 0 before InitStats has been called. Changed to 1 by
+// exactly one call to InitStats.
+var statsInitialized int32
+
+// Stats for the last defaultStreamStatsSize RPCs will be stored in memory.
+// This can be configured by the registering server at profiling service
+// initialization with google.golang.org/grpc/profiling/service.ProfilingConfig.
+const defaultStreamStatsSize uint32 = 16 << 10
+
+// StreamStats is a CircularBuffer containing data from the last N RPC calls
+// served, where N is set by the user. This will contain both server stats and
+// client stats (but each stat will be tagged with whether it's a server or a
+// client in its Tags).
+var StreamStats *buffer.CircularBuffer
+
+// InitStats initializes all the relevant Stat objects. Must be called exactly
+// once per lifetime of a process; calls after the first one are ignored.
+func InitStats(streamStatsSize uint32) error {
+	var err error
+	if !atomic.CompareAndSwapInt32(&statsInitialized, 0, 1) {
+		// If initialized, do nothing.
+		return nil
+	}
+
+	if streamStatsSize == 0 {
+		streamStatsSize = defaultStreamStatsSize
+	}
+
+	StreamStats, err = buffer.NewCircularBuffer(streamStatsSize)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/profiling/profiling_test.go b/internal/profiling/profiling_test.go
new file mode 100644
index 000000000000..257997b63beb
--- /dev/null
+++ b/internal/profiling/profiling_test.go
@@ -0,0 +1,147 @@
+// +build !appengine
+
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * + */ + +package profiling + +import ( + "fmt" + "strconv" + "sync" + "testing" + "time" + + "google.golang.org/grpc/internal/profiling/buffer" +) + +func TestProfiling(t *testing.T) { + cb, err := buffer.NewCircularBuffer(128) + if err != nil { + t.Fatalf("error creating circular buffer: %v", err) + } + + stat := NewStat("foo") + cb.Push(stat) + bar := func(n int) { + if n%2 == 0 { + defer stat.NewTimer(strconv.Itoa(n)).Egress() + } else { + timer := NewTimer(strconv.Itoa(n)) + stat.AppendTimer(timer) + defer timer.Egress() + } + time.Sleep(1 * time.Microsecond) + } + + numTimers := int(8 * defaultStatAllocatedTimers) + for i := 0; i < numTimers; i++ { + bar(i) + } + + results := cb.Drain() + if len(results) != 1 { + t.Fatalf("len(results) = %d; want 1", len(results)) + } + + statReturned := results[0].(*Stat) + if stat.Tags != "foo" { + t.Fatalf("stat.Tags = %s; want foo", stat.Tags) + } + + if len(stat.Timers) != numTimers { + t.Fatalf("len(stat.Timers) = %d; want %d", len(stat.Timers), numTimers) + } + + lastIdx := 0 + for i, timer := range statReturned.Timers { + // Check that they're in the order of append. + if n, err := strconv.Atoi(timer.Tags); err != nil && n != lastIdx { + t.Fatalf("stat.Timers[%d].Tags = %s; wanted %d", i, timer.Tags, lastIdx) + } + + // Check that the timestamps are consistent. + if diff := timer.End.Sub(timer.Begin); diff.Nanoseconds() < 1000 { + t.Fatalf("stat.Timers[%d].End - stat.Timers[%d].Begin = %v; want >= 1000ns", i, i, diff) + } + + lastIdx++ + } +} + +func TestProfilingRace(t *testing.T) { + stat := NewStat("foo") + + var wg sync.WaitGroup + numTimers := int(8 * defaultStatAllocatedTimers) // also tests the slice growth code path + wg.Add(numTimers) + for i := 0; i < numTimers; i++ { + go func(n int) { + defer wg.Done() + if n%2 == 0 { + defer stat.NewTimer(strconv.Itoa(n)).Egress() + } else { + timer := NewTimer(strconv.Itoa(n)) + stat.AppendTimer(timer) + defer timer.Egress() + } + }(i) + } + wg.Wait() + + if len(stat.Timers) != numTimers { + t.Fatalf("len(stat.Timers) = %d; want %d", len(stat.Timers), numTimers) + } + + // The timers need not be ordered, so we can't expect them to be consecutive + // like above. + seen := make(map[int]bool) + for i, timer := range stat.Timers { + n, err := strconv.Atoi(timer.Tags) + if err != nil { + t.Fatalf("stat.Timers[%d].Tags = %s; wanted integer", i, timer.Tags) + } + seen[n] = true + } + + for i := 0; i < numTimers; i++ { + if _, ok := seen[i]; !ok { + t.Fatalf("seen[%d] = false or does not exist; want it to be true", i) + } + } +} + +func BenchmarkProfiling(b *testing.B) { + for routines := 1; routines <= 1<<8; routines <<= 1 { + b.Run(fmt.Sprintf("goroutines:%d", routines), func(b *testing.B) { + perRoutine := b.N / routines + stat := NewStat("foo") + var wg sync.WaitGroup + wg.Add(routines) + for r := 0; r < routines; r++ { + go func() { + for i := 0; i < perRoutine; i++ { + stat.NewTimer("bar").Egress() + } + wg.Done() + }() + } + wg.Wait() + }) + } +} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c26e71cc2254..e18935653c16 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -45,6 +45,11 @@ import ( "google.golang.org/grpc/status" ) +// clientConnectionCounter counts the number of connections a client has +// initiated (equal to the number of http2Clients created). Must be accessed +// atomically. +var clientConnectionCounter uint64 + // http2Client implements the ClientTransport interface with HTTP2. 
type http2Client struct { lastRead int64 // Keep this field 64-bit aligned. Accessed atomically. @@ -126,6 +131,8 @@ type http2Client struct { onClose func() bufferPool *bufferPool + + connectionID uint64 } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -329,6 +336,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne } } + t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1) + if err := t.framer.writer.Flush(); err != nil { return nil, err } diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 1c5ce276948d..8b04b0392a0a 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -62,6 +62,10 @@ var ( statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status) ) +// serverConnectionCounter counts the number of connections a server has seen +// (equal to the number of http2Servers created). Must be accessed atomically. +var serverConnectionCounter uint64 + // http2Server implements the ServerTransport interface with HTTP2. type http2Server struct { lastRead int64 // Keep this field 64-bit aligned. Accessed atomically. @@ -121,6 +125,8 @@ type http2Server struct { channelzID int64 // channelz unique identification number czData *channelzData bufferPool *bufferPool + + connectionID uint64 } // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is @@ -250,6 +256,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err if channelz.IsOn() { t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) } + + t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1) + t.framer.writer.Flush() defer func() {