Merge branch 'master' into linasn/series-resolver
* master: (22 commits)
  Remove deprecated fields (#3327)
  Add quotas to Permits (#3333)
  [aggregator] Drop messages that have a drop policy applied (#3341)
  Fix NPE due to race with a closing series (#3056)
  [coordinator] Apply auto-mapping rules if-and-only-if no drop policies are in effect (#3339)
  [aggregator] Add validation in AddTimedWithStagedMetadatas (#3338)
  [coordinator] Fix panic in Ready endpoint for admin coordinator (#3335)
  [instrument] Config option to emit detailed Go runtime metrics only (#3332)
  [aggregator] Sort heap in one go, instead of iterating one-by-one (#3331)
  [pool] Add support for dynamic, sync.Pool backed, object pools (#3334)
  Enable PANIC_ON_INVARIANT_VIOLATED for tests (#3326)
  [aggregator] CanLead for unflushed window takes BufferPast into account (#3328)
  Optimize StagedMetadatas conversion (#3330)
  [m3msg] Improve message scan performance (#3319)
  [dbnode] Add reason tag to bootstrap retries metric (#3317)
  [coordinator] Enable rule filtering on prom metric type (#3325)
  Update m3dbnode-all-config.yml (#3204)
  [coordinator] Include Type in RollupOp.Equal (#3322)
  [coordinator] Simplify iteration logic of matchRollupTarget (#3321)
  [coordinator] Add rollup type to remove specific dimensions (#3318)
  ...
soundvibe committed Mar 9, 2021
2 parents 28caa33 + 5875a94 commit a892422
Showing 134 changed files with 4,757 additions and 1,950 deletions.
2 changes: 1 addition & 1 deletion .ci
5 changes: 0 additions & 5 deletions kube/bundle.yaml

Some generated files are not rendered by default.

5 changes: 0 additions & 5 deletions kube/m3dbnode-configmap.yaml
@@ -7,11 +7,6 @@ data:
   m3dbnode.yml: |+
     coordinator:
       listenAddress: 0.0.0.0:7201
-      local:
-        namespaces:
-          - namespace: default
-            type: unaggregated
-            retention: 48h
       logging:
         level: info
       metrics:
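
The deleted local namespace block lines up with the deprecated-field cleanup merged above (#3327): statically declared namespaces in the embedded coordinator config give way to namespaces managed through the coordinator API. A hedged sketch of the equivalent dynamic setup (endpoint and payload as documented for local M3 deployments; host and port assume this configmap's defaults):

    curl -X POST http://localhost:7201/api/v1/database/create -d '{
      "type": "local",
      "namespaceName": "default",
      "retentionTime": "48h"
    }'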
2 changes: 1 addition & 1 deletion kube/terraform/main.tf
@@ -133,7 +133,7 @@ resource "kubernetes_config_map" "m3dbnode_config" {
namespace = "m3db"
}
data {
-    m3dbnode.yml = "coordinator:\n listenAddress: \"0.0.0.0:7201\"\n local:\n namespaces:\n - namespace: default\n type: unaggregated\n retention: 48h\n metrics:\n scope:\n prefix: \"coordinator\"\n prometheus:\n handlerPath: /metrics\n listenAddress: 0.0.0.0:7203\n sanitization: prometheus\n samplingRate: 1.0\n extended: none\n tagOptions:\n idScheme: quoted\n\ndb:\n logging:\n level: info\n\n metrics:\n prometheus:\n handlerPath: /metrics\n sanitization: prometheus\n samplingRate: 1.0\n extended: detailed\n\n listenAddress: 0.0.0.0:9000\n clusterListenAddress: 0.0.0.0:9001\n httpNodeListenAddress: 0.0.0.0:9002\n httpClusterListenAddress: 0.0.0.0:9003\n debugListenAddress: 0.0.0.0:9004\n\n hostID:\n resolver: hostname\n\n client:\n writeConsistencyLevel: majority\n readConsistencyLevel: unstrict_majority\n\n gcPercentage: 100\n\n writeNewSeriesAsync: true\n writeNewSeriesBackoffDuration: 2ms\n\n commitlog:\n flushMaxBytes: 524288\n flushEvery: 1s\n queue:\n calculationType: fixed\n size: 2097152\n\n filesystem:\n filePathPrefix: /var/lib/m3db\n\n config:\n service:\n env: default_env\n zone: embedded\n service: m3db\n cacheDir: /var/lib/m3kv\n etcdClusters:\n - zone: embedded\n endpoints:\n - http://etcd-0.etcd:2379\n - http://etcd-1.etcd:2379\n - http://etcd-2.etcd:2379\n"
+    m3dbnode.yml = "coordinator:\n listenAddress: \"0.0.0.0:7201\"\n metrics:\n scope:\n prefix: \"coordinator\"\n prometheus:\n handlerPath: /metrics\n listenAddress: 0.0.0.0:7203\n sanitization: prometheus\n samplingRate: 1.0\n extended: none\n tagOptions:\n idScheme: quoted\n\ndb:\n logging:\n level: info\n\n metrics:\n prometheus:\n handlerPath: /metrics\n sanitization: prometheus\n samplingRate: 1.0\n extended: detailed\n\n listenAddress: 0.0.0.0:9000\n clusterListenAddress: 0.0.0.0:9001\n httpNodeListenAddress: 0.0.0.0:9002\n httpClusterListenAddress: 0.0.0.0:9003\n debugListenAddress: 0.0.0.0:9004\n\n hostID:\n resolver: hostname\n\n client:\n writeConsistencyLevel: majority\n readConsistencyLevel: unstrict_majority\n\n gcPercentage: 100\n\n writeNewSeriesAsync: true\n writeNewSeriesBackoffDuration: 2ms\n\n commitlog:\n flushMaxBytes: 524288\n flushEvery: 1s\n queue:\n calculationType: fixed\n size: 2097152\n\n filesystem:\n filePathPrefix: /var/lib/m3db\n\n config:\n service:\n env: default_env\n zone: embedded\n service: m3db\n cacheDir: /var/lib/m3kv\n etcdClusters:\n - zone: embedded\n endpoints:\n - http://etcd-0.etcd:2379\n - http://etcd-1.etcd:2379\n - http://etcd-2.etcd:2379\n"
}
}

2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/m3aggregator.Dockerfile
@@ -8,5 +8,7 @@ ADD ./m3aggregator.yml /etc/m3aggregator/m3aggregator.yml

 EXPOSE 6000-6001/tcp

+ENV PANIC_ON_INVARIANT_VIOLATED=true
+
 ENTRYPOINT [ "/bin/m3aggregator" ]
 CMD [ "-f", "/etc/m3aggregator/m3aggregator.yml" ]
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/m3coordinator.Dockerfile
@@ -8,5 +8,7 @@ ADD ./m3coordinator-local-etcd.yml /etc/m3coordinator/m3coordinator.yml

 EXPOSE 7201/tcp 7203/tcp

+ENV PANIC_ON_INVARIANT_VIOLATED=true
+
 ENTRYPOINT [ "/bin/m3coordinator" ]
 CMD [ "-f", "/etc/m3coordinator/m3coordinator.yml" ]
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/m3dbnode.Dockerfile
@@ -8,5 +8,7 @@ ADD ./m3dbnode-local-etcd.yml /etc/m3dbnode/m3dbnode.yml

 EXPOSE 2379/tcp 2380/tcp 7201/tcp 7203/tcp 9000-9004/tcp

+ENV PANIC_ON_INVARIANT_VIOLATED=true
+
 ENTRYPOINT [ "/bin/m3dbnode" ]
 CMD [ "-f", "/etc/m3dbnode/m3dbnode.yml" ]
4 changes: 4 additions & 0 deletions scripts/dtest/docker-compose.yml
@@ -10,6 +10,8 @@ services:
       - "0.0.0.0:9000:9000"
     volumes:
       - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
+    env:
+      - PANIC_ON_INVARIANT_VIOLATED=true
   coord01:
     networks:
       - dtest
@@ -20,5 +22,7 @@ services:
       - "0.0.0.0:7204:7204"
     volumes:
       - "./m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
+    env:
+      - PANIC_ON_INVARIANT_VIOLATED=true
 networks:
   dtest:
6 changes: 0 additions & 6 deletions scripts/dtest/m3dbnode.yml
@@ -1,12 +1,6 @@
 coordinator:
   listenAddress: 0.0.0.0:7201

-  local:
-    namespaces:
-      - namespace: default
-        type: unaggregated
-        retention: 48h
-
   logging:
     level: info

59 changes: 46 additions & 13 deletions src/aggregator/aggregation/quantile/cm/heap.go
@@ -43,27 +43,16 @@ func (h *minHeap) Push(value float64) {
 	heap := *h
 	n := len(heap)
 	i := n - 1
-	for {
+	for i < n && i >= 0 {
 		parent := (i - 1) / 2
-		if parent == i || heap[parent] <= heap[i] {
+		if parent == i || parent >= n || parent < 0 || heap[parent] <= heap[i] {
 			break
 		}
 		heap[parent], heap[i] = heap[i], heap[parent]
 		i = parent
 	}
 }

-func (h *minHeap) ensureSize() {
-	heap := *h
-	targetCap := cap(heap) * 2
-	newHeap := sharedHeapPool.Get(targetCap)
-	(*newHeap) = append(*newHeap, heap...)
-	if cap(heap) >= _initialHeapBucketSize {
-		sharedHeapPool.Put(heap)
-	}
-	(*h) = *newHeap
-}
-
 func (h *minHeap) Reset() {
 	if heap := *h; cap(heap) >= _initialHeapBucketSize {
 		sharedHeapPool.Put(heap)
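
The extra comparisons added to Push look redundant (i starts at n - 1, and parent is always in range once i >= 1), but they let the Go compiler prove that every heap[i] and heap[parent] access is in bounds and elide the runtime bounds checks, and their panic paths, from this hot loop. One way to confirm the elision, assuming a reasonably recent toolchain, run from the repo root:

    go build -gcflags='-d=ssa/check_bce/debug=1' ./src/aggregator/aggregation/quantile/cm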
@@ -99,3 +88,47 @@ func (h *minHeap) Pop() float64 {
 	*h = old[0:n]
 	return val
 }
+
+func (h minHeap) SortDesc() {
+	heap := h
+	// this is equivalent to Pop() in a loop (heapsort)
+	// all the redundant-looking conditions are there to eliminate bounds-checks
+	for n := len(heap) - 1; n > 0 && n < len(heap); n = len(heap) - 1 {
+		var (
+			i        int
+			smallest int
+		)
+		heap[0], heap[n] = heap[n], heap[0]
+		for smallest >= 0 && smallest <= n {
+			var (
+				left  = smallest*2 + 1
+				right = left + 1
+			)
+			if left < n && left >= 0 && heap[left] < heap[smallest] {
+				smallest = left
+			}
+			if right < n && right >= 0 && heap[right] < heap[smallest] {
+				smallest = right
+			}
+			if smallest == i {
+				break
+			}
+			heap[i], heap[smallest] = heap[smallest], heap[i]
+			i = smallest
+		}
+		heap = heap[0:n]
+	}
+}
+
+func (h *minHeap) ensureSize() {
+	var (
+		heap      = *h
+		targetCap = cap(heap) * 2
+		newHeap   = sharedHeapPool.Get(targetCap)
+	)
+	(*newHeap) = append(*newHeap, heap...)
+	if cap(heap) >= _initialHeapBucketSize {
+		sharedHeapPool.Put(heap)
+	}
+	(*h) = *newHeap
+}
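
SortDesc is an in-place heapsort specialized to the min-heap: each pass swaps the root, the current minimum, to the end of the active region, shrinks the region by one, and sifts the new root down, so the slice ends up in descending order. It deliberately destroys the heap invariant, which is safe because the caller consumes the whole buffer and then truncates it. A self-contained sketch of the same idea on a plain slice (illustrative, not M3 code):

    package main

    import "fmt"

    // sortDesc sorts a min-heap-ordered slice into descending order in place:
    // swap the root (the minimum) to the end of the active region, then sift
    // the new root down within the shrunken region.
    func sortDesc(heap []float64) {
        for n := len(heap) - 1; n > 0; n-- {
            heap[0], heap[n] = heap[n], heap[0]
            i := 0
            for {
                smallest := i
                if l := 2*i + 1; l < n && heap[l] < heap[smallest] {
                    smallest = l
                }
                if r := 2*i + 2; r < n && heap[r] < heap[smallest] {
                    smallest = r
                }
                if smallest == i {
                    break
                }
                heap[i], heap[smallest] = heap[smallest], heap[i]
                i = smallest
            }
        }
    }

    func main() {
        h := []float64{1, 2, 5, 3, 4} // a valid min-heap
        sortDesc(h)
        fmt.Println(h) // [5 4 3 2 1]
    }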
27 changes: 27 additions & 0 deletions src/aggregator/aggregation/quantile/cm/heap_test.go
@@ -21,6 +21,8 @@
 package cm

 import (
+	"math/rand"
+	"sort"
 	"testing"

 	"github.com/stretchr/testify/require"
@@ -33,6 +35,7 @@ func TestMinHeapPushInDecreasingOrder(t *testing.T) {
 		h.Push(float64(i))
 		require.Equal(t, iter-i, h.Len())
 	}
+	validateSort(t, *h)
 	for i := 0; i < iter; i++ {
 		require.Equal(t, float64(i), h.Min())
 		require.Equal(t, float64(i), h.Pop())
@@ -47,13 +50,37 @@ func TestMinHeapPushInIncreasingOrder(t *testing.T) {
 		h.Push(float64(i))
 		require.Equal(t, i+1, h.Len())
 	}
+	validateSort(t, *h)
 	for i := 0; i < iter; i++ {
 		require.Equal(t, float64(i), h.Min())
 		require.Equal(t, float64(i), h.Pop())
 		validateInvariant(t, *h, 0)
 	}
 }

+func TestMinHeapPushInRandomOrderAndSort(t *testing.T) {
+	h := &minHeap{}
+	iter := 42
+	for i := 0; i < iter; i++ {
+		h.Push(rand.ExpFloat64())
+	}
+	validateSort(t, *h)
+}
+
+func validateSort(t *testing.T, h minHeap) {
+	t.Helper()
+	// copy heap before applying reference sort and minheap-sort
+	a := make([]float64, h.Len())
+	b := make([]float64, h.Len())
+	for i := 0; i < len(h); i++ {
+		a[i], b[i] = h[i], h[i]
+	}
+	sort.Sort(sort.Reverse(sort.Float64Slice(a)))
+	heap := (*minHeap)(&b)
+	heap.SortDesc()
+	require.Equal(t, a, b)
+}
+
 func validateInvariant(t *testing.T, h minHeap, i int) {
 	var (
 		n = h.Len()
91 changes: 59 additions & 32 deletions src/aggregator/aggregation/quantile/cm/stream.go
@@ -253,10 +253,7 @@ func (s *Stream) calcQuantiles() {
 		)
 	}

-	for curr != nil {
-		if idx == len(s.quantiles) {
-			break
-		}
+	for curr != nil && idx < len(s.computedQuantiles) {
 		maxRank = minRank + curr.numRanks + curr.delta
 		rank, threshold := s.thresholdBuf[idx].rank, s.thresholdBuf[idx].threshold

@@ -271,7 +268,7 @@ func (s *Stream) calcQuantiles() {
 	}

 	// check if the last sample value should satisfy unprocessed quantiles
-	for i := idx; i < len(s.quantiles); i++ {
+	for i := idx; i < len(s.thresholdBuf); i++ {
 		rank, threshold := s.thresholdBuf[i].rank, s.thresholdBuf[i].threshold
 		if maxRank >= rank+threshold || minRank > rank {
 			s.computedQuantiles[i] = prev.value
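
Note the tightened loop bounds: iterating to len(s.computedQuantiles) and len(s.thresholdBuf), the slices actually indexed inside the loops, instead of len(s.quantiles) lets the compiler elide bounds checks, in the same spirit as the heap.go changes; all three slices are presumably kept at equal length, so behavior should be unchanged.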
@@ -284,6 +281,7 @@ func (s *Stream) insert() {
 	var (
 		compCur          = s.compressCursor
 		compValue        = math.NaN()
+		samples          = &s.samples
 		insertPointValue float64
 		sample           *Sample
 	)
@@ -292,15 +290,22 @@ func (s *Stream) insert() {
 		compValue = compCur.value
 	}

-	samples := &s.samples
+	// break heap invariant and just sort all the times, as we'll consume all of them in one go
+	s.bufMore.SortDesc()

-	for s.insertCursor != nil {
+	var (
+		vals = []float64(s.bufMore)
+		idx  = len(vals) - 1
+	)
+
+	for s.insertCursor != nil && idx < len(vals) {
 		curr := s.insertCursor
 		insertPointValue = curr.value

-		for s.bufMore.Len() > 0 && s.bufMore.Min() <= insertPointValue {
+		for idx >= 0 && vals[idx] <= insertPointValue {
+			val := vals[idx]
+			idx--
 			sample = s.samples.Acquire()
-			val := s.bufMore.Pop()
 			sample.value = val
 			sample.numRanks = 1
 			sample.delta = curr.numRanks + curr.delta - 1
@@ -316,19 +321,20 @@ func (s *Stream) insert() {
 		s.insertCursor = s.insertCursor.next
 	}

-	if s.insertCursor != nil {
-		return
-	}
-
-	for s.bufMore.Len() > 0 && s.bufMore.Min() >= samples.Back().value {
-		sample = s.samples.Acquire()
-		sample.value = s.bufMore.Pop()
-		sample.numRanks = 1
-		sample.delta = 0
-		samples.PushBack(sample)
-		s.numValues++
+	if s.insertCursor == nil && idx < len(vals) {
+		for idx >= 0 && vals[idx] >= samples.Back().value {
+			val := vals[idx]
+			idx--
+			sample = s.samples.Acquire()
+			sample.value = val
+			sample.numRanks = 1
+			sample.delta = 0
+			samples.PushBack(sample)
+			s.numValues++
+		}
 	}

+	s.bufMore = s.bufMore[:0]
 	s.resetInsertCursor()
 }
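
The restructured insert() trades repeated bufMore.Min()/Pop() calls, each paying O(log n) to restore the heap invariant, for a single SortDesc followed by an index that walks the raw slice from the back, yielding the same ascending order with plain reads; the explicit s.bufMore = s.bufMore[:0] then replaces the draining that Pop() used to perform. A trivial illustration of the consumption order (illustrative, not M3 code):

    package main

    import "fmt"

    func main() {
        // After SortDesc the buffer is descending; walking it from the back
        // yields ascending values, the same order the old code obtained from
        // repeated Min()/Pop() calls on the heap.
        vals := []float64{5, 4, 3, 2, 1}
        for idx := len(vals) - 1; idx >= 0; idx-- {
            fmt.Println(vals[idx]) // prints 1 2 3 4 5
        }
    }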

@@ -345,27 +351,48 @@ func (s *Stream) compress() {
 		s.compressCursor = s.compressCursor.prev
 	}

+	var (
+		numVals = s.numValues
+		eps     = 2.0 * s.eps
+	)
+
 	for s.compressCursor != s.samples.Front() {
-		next := s.compressCursor.next
-		maxRank := s.compressMinRank + s.compressCursor.numRanks + s.compressCursor.delta
-		threshold := s.threshold(maxRank)
-		s.compressMinRank -= s.compressCursor.numRanks
-		testVal := s.compressCursor.numRanks + next.numRanks + next.delta
+		var (
+			curr = s.compressCursor
+			next = curr.next
+			prev = curr.prev
+
+			maxRank = s.compressMinRank + curr.numRanks + curr.delta
+
+			threshold   = int64(math.MaxInt64)
+			quantileMin int64
+		)
+
+		for i := range s.quantiles {
+			if maxRank >= int64(s.quantiles[i]*float64(numVals)) {
+				quantileMin = int64(eps * float64(maxRank) / s.quantiles[i])
+			} else {
+				quantileMin = int64(eps * float64(numVals-maxRank) / (1.0 - s.quantiles[i]))
+			}
+			if quantileMin < threshold {
+				threshold = quantileMin
+			}
+		}
+
+		s.compressMinRank -= curr.numRanks
+		testVal := curr.numRanks + next.numRanks + next.delta

 		if testVal <= threshold {
-			if s.insertCursor == s.compressCursor {
+			if s.insertCursor == curr {
 				s.insertCursor = next
 			}

-			next.numRanks += s.compressCursor.numRanks
+			next.numRanks += curr.numRanks

-			prev := s.compressCursor.prev
 			// no need to release sample here
-			s.samples.Remove(s.compressCursor)
-			s.compressCursor = prev
-		} else {
-			s.compressCursor = s.compressCursor.prev
+			s.samples.Remove(curr)
 		}
+		s.compressCursor = prev
 	}

 	if s.compressCursor == s.samples.Front() {
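
compress() now inlines what the removed s.threshold(maxRank) helper computed and hoists numVals and eps = 2 * s.eps out of the per-sample loop. For a sample of rank r among n values, the CKMS-style invariant takes, over each tracked quantile q, the value 2*eps*r/q when r >= q*n and 2*eps*(n-r)/(1-q) otherwise, and keeps the minimum. A standalone sketch of that computation, assuming it matches the removed helper:

    package cmsketch

    import "math"

    // compressionThreshold mirrors the loop inlined into compress(): the
    // minimum over tracked quantiles q of
    //   2*eps*r/q           if r >= q*n
    //   2*eps*(n-r)/(1-q)   otherwise
    // for a sample of rank r among n values.
    func compressionThreshold(r, n int64, quantiles []float64, eps float64) int64 {
        threshold := int64(math.MaxInt64)
        for _, q := range quantiles {
            var m int64
            if float64(r) >= q*float64(n) {
                m = int64(2 * eps * float64(r) / q)
            } else {
                m = int64(2 * eps * float64(n-r) / (1 - q))
            }
            if m < threshold {
                threshold = m
            }
        }
        return threshold
    }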
