profiling: add internal changes to support profiling of gRPC #3158

Merged
merged 19 commits into from
Dec 11, 2019
272 changes: 272 additions & 0 deletions internal/profiling/buffer/buffer.go
@@ -0,0 +1,272 @@
// +build !appengine

/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package buffer

import (
"errors"
"math/bits"
"runtime"
"sync"
"sync/atomic"
"unsafe"
)

type queue struct {
// An array of pointers as references to the items stored in this queue.
arr []unsafe.Pointer
// The maximum number of elements this queue may store before it wraps around
// and overwrites older values. Must be a power of two.
size uint32
// Always size - 1. The Push operation applies a bitwise AND with this mask in
// place of a modulo operation.
mask uint32
// Each Push operation into this queue increments the acquired counter before
// proceeding with the actual write to arr. This counter is also used by the
// Drain operation's drainWait subroutine to wait for all in-flight pushes to
// complete.
acquired uint32
// After the completion of a Push operation, the written counter is
// incremented. Also used by drainWait to wait for all pushes to complete.
written uint32
}

// Allocates and returns a new *queue. size must be a power of two.
func newQueue(size uint32) *queue {
return &queue{
arr: make([]unsafe.Pointer, size),
size: size,
mask: size - 1,
}
}

// drainWait blocks the caller until all Pushes on this queue are complete.
func (q *queue) drainWait() {
for atomic.LoadUint32(&q.acquired) != atomic.LoadUint32(&q.written) {
runtime.Gosched()
}
}

// A queuePair has two queues. At any given time, Pushes go into the queue
// referenced by queuePair.q. The active queue gets switched when there's a
// drain operation on the circular buffer.
type queuePair struct {
q0 unsafe.Pointer
q1 unsafe.Pointer
q unsafe.Pointer
}

// Allocates and returns a new *queuePair with its internal queues allocated.
func newQueuePair(size uint32) *queuePair {
qp := &queuePair{}
qp.q0 = unsafe.Pointer(newQueue(size))
qp.q1 = unsafe.Pointer(newQueue(size))
qp.q = qp.q0
return qp
}

// Switches the current queue for future Pushes to proceed to the other queue
// so that there's no blocking in Push. Returns a pointer to the old queue that
// was in place before the switch.
func (qp *queuePair) switchQueues() *queue {
// Even though we have mutual exclusion across drainers (thanks to
// drainMutex.Lock in Drain), Push operations may access qp.q while we're
// writing to it.
if atomic.CompareAndSwapPointer(&qp.q, qp.q0, qp.q1) {
return (*queue)(qp.q0)
}

atomic.CompareAndSwapPointer(&qp.q, qp.q1, qp.q0)
return (*queue)(qp.q1)
}

// To avoid expensive modulo operations, we require the maximum number of
// elements in the circular buffer (N) to be a power of two so that a bitwise
// AND mask can be used instead. Since a CircularBuffer is a collection of
// queuePairs (see below), N must be divided evenly among them; because powers
// of two are only evenly divisible by other powers of two, we use
// floorCPUCount() queuePairs within each CircularBuffer.
//
// The floor of the number of CPUs (rather than the ceiling) was found to be
// the optimal number through experiments.
func floorCPUCount() uint32 {
floorExponent := bits.Len32(uint32(runtime.NumCPU())) - 1
if floorExponent < 0 {
floorExponent = 0
}
return 1 << uint32(floorExponent)
}
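// As a worked illustration (not from the original change): on a machine where
// runtime.NumCPU() reports 6, bits.Len32(6) is 3, so floorExponent is 2 and
// floorCPUCount returns 1 << 2 = 4 queuePairs; with 8 CPUs it returns 8, and
// on a single-CPU machine it returns 1.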

var numCircularBufferPairs = floorCPUCount()

// CircularBuffer is a lock-free data structure that supports Push and Drain
// operations.
//
// Note that CircularBuffer is built for performance more than reliability.
// That is, some Push operations may fail without retries in some situations
// (such as during a Drain operation). Order of pushes is not maintained
// either; that is, if A was pushed before B, the Drain operation may return an
// array with B before A. These restrictions are acceptable within gRPC's
// profiling, but if your use-case does not permit these relaxed constraints
// or if performance is not a primary concern, you should probably use a
// lock-based data structure such as internal/buffer.UnboundedBuffer.
type CircularBuffer struct {
drainMutex sync.Mutex
qp []*queuePair
// qpn is a monotonically increasing counter used to determine which
// queuePair a Push operation should write to. This approach was found to
// perform better than writing to a randomly chosen queue.
qpn uint32
qpMask uint32
}

var errInvalidCircularBufferSize = errors.New("buffer size is not an exponent of two")

// NewCircularBuffer allocates a circular buffer of size size and returns a
// reference to the struct. Only sizes that are a power of two (2^k) are
// allowed; this saves us from having to do expensive modulo operations.
func NewCircularBuffer(size uint32) (*CircularBuffer, error) {
if size&(size-1) != 0 {
return nil, errInvalidCircularBufferSize
}

n := numCircularBufferPairs
if size/numCircularBufferPairs < 8 {
// If each queuePair is going to hold fewer than a very small number of
// items (say, 8), using multiple queuePairs is very likely wasteful.
// Instead, fall back to a single queuePair holding everything.
n = 1
}

cb := &CircularBuffer{
qp: make([]*queuePair, n),
qpMask: n - 1,
}

for i := uint32(0); i < n; i++ {
cb.qp[i] = newQueuePair(size / n)
}

return cb, nil
}
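
// usageSketch is an illustrative example (not part of the original change)
// showing how a caller might use CircularBuffer: the size passed to
// NewCircularBuffer must be a power of two, Push accepts any value, and Drain
// returns everything pushed so far in no particular order. The stat type used
// here is hypothetical.
func usageSketch() {
	cb, err := NewCircularBuffer(1024) // 1024 is a power of two
	if err != nil {
		return // size was not a power of two
	}

	type stat struct{ name string }
	cb.Push(&stat{name: "example"}) // safe to call from many goroutines

	for _, item := range cb.Drain() {
		_ = item.(*stat) // items come back as interface{} values
	}
}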

// Push pushes an element into the circular buffer. It is guaranteed to
// complete in a finite number of steps (it is also lock-free). It does not
// guarantee that push order will be retained, nor that the operation will
// succeed if a Drain operation begins execution concurrently.
func (cb *CircularBuffer) Push(x interface{}) {
n := atomic.AddUint32(&cb.qpn, 1) & cb.qpMask
qptr := atomic.LoadPointer(&cb.qp[n].q)
q := (*queue)(qptr)

acquired := atomic.AddUint32(&q.acquired, 1) - 1

// If the load below still observes qptr, it means that we incremented
// acquired before any queuePair was switched, and therefore before any
// drainWait completion, so it is safe to proceed with the Push operation on
// this queue. Otherwise, a Drain operation has begun execution, but we don't
// know how far along it is; if it is past the drainWait check, it is not safe
// to proceed with the Push operation. We choose to drop this sample entirely
// instead of retrying, as retrying may send the Push operation into a spin
// loop (we want to guarantee completion of the Push operation within a finite
// number of steps). Before exiting, we increment written so that any pending
// drainWaits can proceed.
if atomic.LoadPointer(&cb.qp[n].q) != qptr {
atomic.AddUint32(&q.written, 1)
return
}

// At this point, we're definitely writing to the right queue. That is, one
// of the following is true:
// 1. No drainer is in execution on this queue.
// 2. A drainer is in execution on this queue and it is waiting at the
// acquired == written barrier.
//
// Let's say two Pushes A and B happen on the same queue. Say A and B are
// q.size apart; i.e. they get the same index. That is,
//
// index_A = index_B
// acquired_A + q.size = acquired_B
//
// We say "B has wrapped around A" when this happens. In this case, since A
// occurred before B, B's Push should be the final value. However, we
// accommodate A being the final value because wrap-arounds are extremely
// rare and accounting for them requires an additional counter and a
// significant performance penalty. Note that the below approach never leads
// to any data corruption.
index := acquired & q.mask
atomic.StorePointer(&q.arr[index], unsafe.Pointer(&x))

// Allows any drainWait checks to proceed.
atomic.AddUint32(&q.written, 1)
}

// Dereferences non-nil pointers from arr and appends them to result. The
// range of elements copied from arr is [from, to). Assumes that the result
// slice is already allocated and is large enough to hold all the elements
// that might be copied. Also assumes mutual exclusion on the array of
// pointers.
func dereferenceAppend(result []interface{}, arr []unsafe.Pointer, from, to uint32) []interface{} {
for i := from; i < to; i++ {
// We have mutual exclusion on arr, so there's no need for atomics.
x := (*interface{})(arr[i])
if x != nil {
result = append(result, *x)
}
}
return result
}

// Drain allocates and returns an array of things Pushed into the circular
// buffer. Push order is not maintained; that is, if B was Pushed after A,
// Drain may return B at a lower index than A in the returned array.
func (cb *CircularBuffer) Drain() []interface{} {
cb.drainMutex.Lock()

qs := make([]*queue, len(cb.qp))
for i := 0; i < len(cb.qp); i++ {
qs[i] = cb.qp[i].switchQueues()
}

var wg sync.WaitGroup
wg.Add(len(qs))
for i := 0; i < len(qs); i++ {
go func(qi int) {
qs[qi].drainWait()
wg.Done()
}(i)
}
wg.Wait()

result := make([]interface{}, 0)
for i := 0; i < len(qs); i++ {
if qs[i].acquired < qs[i].size {
result = dereferenceAppend(result, qs[i].arr, 0, qs[i].acquired)
} else {
result = dereferenceAppend(result, qs[i].arr, 0, qs[i].size)
}
}

for i := 0; i < len(qs); i++ {
atomic.StoreUint32(&qs[i].acquired, 0)
atomic.StoreUint32(&qs[i].written, 0)
}

cb.drainMutex.Unlock()
return result
}
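
// concurrencySketch is an illustrative example (not part of the original
// change) of the intended usage pattern: many goroutines Push concurrently
// while a single goroutine periodically Drains. Pushes that overlap a Drain
// may be dropped, which is acceptable for profiling data.
func concurrencySketch() {
	cb, _ := NewCircularBuffer(1 << 12)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				cb.Push(id) // lock-free; never blocks on a concurrent Drain
			}
		}(i)
	}
	wg.Wait()

	collected := cb.Drain() // no ordering guarantees across goroutines
	_ = collected
}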
38 changes: 38 additions & 0 deletions internal/profiling/buffer/buffer_appengine.go
@@ -0,0 +1,38 @@
// +build appengine

/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Appengine does not support profiling stats because it lacks support for
// unsafe pointers, which are necessary to efficiently store and retrieve
// things into and from a circular buffer. As a result, Push does not do
// anything and Drain returns a nil slice.
package buffer

// CircularBuffer is a no-op implementation on appengine.
type CircularBuffer struct{}

// NewCircularBuffer returns a nil *CircularBuffer and a nil error on
// appengine.
func NewCircularBuffer(size uint32) (*CircularBuffer, error) {
return nil, nil
}

// Push is a no-op on appengine.
func (cb *CircularBuffer) Push(x interface{}) {
}

// Drain returns nil on appengine.
func (cb *CircularBuffer) Drain() []interface{} {
return nil
}