Skip to content

Commit

Permalink
[coordinator] [query] Add readiness probe for probing current consist…
Browse files Browse the repository at this point in the history
…ency level achievability (#2976)
  • Loading branch information
robskillington authored Dec 4, 2020
1 parent 421a988 commit b6dfbaa
Show file tree
Hide file tree
Showing 11 changed files with 662 additions and 31 deletions.
10 changes: 10 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ setup_single_m3db_node
echo "Start Prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d prometheus01

function test_readiness {
# Check readiness probe eventually succeeds
echo "Check readiness probe eventually succeeds"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl --write-out "%{http_code}" --silent --output /dev/null 0.0.0.0:7201/ready) -eq "200" ]]'
}

function test_prometheus_remote_read {
# Ensure Prometheus can proxy a Prometheus query
echo "Wait until the remote write endpoint generates and allows for data to be queried"
Expand Down Expand Up @@ -384,6 +391,9 @@ function test_series {
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

echo "Running readiness test"
test_readiness

echo "Running prometheus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
Expand Down
90 changes: 90 additions & 0 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/dbnode/client/replicated_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ func (s replicatedSession) replicate(params replicatedParams) error {
return s.session.Write(params.namespace, params.id, params.t, params.value, params.unit, params.annotation)
}

func (s *replicatedSession) ReadClusterAvailability() (bool, error) {
return s.session.ReadClusterAvailability()
}

func (s *replicatedSession) WriteClusterAvailability() (bool, error) {
return s.session.WriteClusterAvailability()
}

// Write value to the database for an ID.
func (s replicatedSession) Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error {
return s.replicate(replicatedParams{
Expand Down
135 changes: 104 additions & 31 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,6 @@ func (s *session) hostQueues(
newQueues = append(newQueues, newQueue)
}

shards := topoMap.ShardSet().AllIDs()
minConnectionCount := s.opts.MinConnectionCount()
replicas := topoMap.Replicas()
majority := topoMap.MajorityReplicas()

Expand Down Expand Up @@ -731,35 +729,25 @@ func (s *session) hostQueues(
return nil, 0, 0, ErrClusterConnectTimeout
}
}
// Be optimistic
clusterAvailable := true
for _, shardID := range shards {
shardReplicasAvailable := 0
routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) {
if queues[idx].ConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
})
if routeErr != nil {
return nil, 0, 0, routeErr
}
var clusterAvailableForShard bool
switch connectConsistencyLevel {
case topology.ConnectConsistencyLevelAll:
clusterAvailableForShard = shardReplicasAvailable == replicas
case topology.ConnectConsistencyLevelMajority:
clusterAvailableForShard = shardReplicasAvailable >= majority
case topology.ConnectConsistencyLevelOne:
clusterAvailableForShard = shardReplicasAvailable > 0
default:
return nil, 0, 0, errSessionInvalidConnectClusterConnectConsistencyLevel
}
if !clusterAvailableForShard {
clusterAvailable = false
break
}

var level topology.ConsistencyLevel
switch connectConsistencyLevel {
case topology.ConnectConsistencyLevelAll:
level = topology.ConsistencyLevelAll
case topology.ConnectConsistencyLevelMajority:
level = topology.ConsistencyLevelMajority
case topology.ConnectConsistencyLevelOne:
level = topology.ConsistencyLevelOne
default:
return nil, 0, 0, errSessionInvalidConnectClusterConnectConsistencyLevel
}
if clusterAvailable { // All done
clusterAvailable, err := s.clusterAvailabilityWithQueuesAndMap(level,
queues, topoMap)
if err != nil {
return nil, 0, 0, err
}
if clusterAvailable {
// All done
break
}
time.Sleep(clusterConnectWaitInterval)
Expand All @@ -769,6 +757,86 @@ func (s *session) hostQueues(
return queues, replicas, majority, nil
}

func (s *session) WriteClusterAvailability() (bool, error) {
level := s.opts.WriteConsistencyLevel()
return s.clusterAvailability(level)
}

func (s *session) ReadClusterAvailability() (bool, error) {
var convertedConsistencyLevel topology.ConsistencyLevel
level := s.opts.ReadConsistencyLevel()
switch level {
case topology.ReadConsistencyLevelNone:
// Already ready.
return true, nil
case topology.ReadConsistencyLevelOne:
convertedConsistencyLevel = topology.ConsistencyLevelOne
case topology.ReadConsistencyLevelUnstrictMajority:
convertedConsistencyLevel = topology.ConsistencyLevelOne
case topology.ReadConsistencyLevelMajority:
convertedConsistencyLevel = topology.ConsistencyLevelMajority
case topology.ReadConsistencyLevelUnstrictAll:
convertedConsistencyLevel = topology.ConsistencyLevelOne
case topology.ReadConsistencyLevelAll:
convertedConsistencyLevel = topology.ConsistencyLevelAll
default:
return false, fmt.Errorf("unknown consistency level: %d", level)
}
return s.clusterAvailability(convertedConsistencyLevel)
}

func (s *session) clusterAvailability(
level topology.ConsistencyLevel,
) (bool, error) {
s.state.RLock()
queues := s.state.queues
topoMap, err := s.topologyMapWithStateRLock()
s.state.RUnlock()
if err != nil {
return false, err
}
return s.clusterAvailabilityWithQueuesAndMap(level, queues, topoMap)
}

func (s *session) clusterAvailabilityWithQueuesAndMap(
level topology.ConsistencyLevel,
queues []hostQueue,
topoMap topology.Map,
) (bool, error) {
shards := topoMap.ShardSet().AllIDs()
minConnectionCount := s.opts.MinConnectionCount()
replicas := topoMap.Replicas()
majority := topoMap.MajorityReplicas()

for _, shardID := range shards {
shardReplicasAvailable := 0
routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) {
if queues[idx].ConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
})
if routeErr != nil {
return false, routeErr
}
var clusterAvailableForShard bool
switch level {
case topology.ConsistencyLevelAll:
clusterAvailableForShard = shardReplicasAvailable == replicas
case topology.ConsistencyLevelMajority:
clusterAvailableForShard = shardReplicasAvailable >= majority
case topology.ConsistencyLevelOne:
clusterAvailableForShard = shardReplicasAvailable > 0
default:
return false, fmt.Errorf("unknown consistency level: %d", level)
}
if !clusterAvailableForShard {
return false, nil
}
}

return true, nil
}

func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue, replicas, majority int) {
prevQueues := s.state.queues

Expand Down Expand Up @@ -1879,9 +1947,14 @@ func (s *session) Replicas() int {

func (s *session) TopologyMap() (topology.Map, error) {
s.state.RLock()
topoMap, err := s.topologyMapWithStateRLock()
s.state.RUnlock()
return topoMap, err
}

func (s *session) topologyMapWithStateRLock() (topology.Map, error) {
status := s.state.status
topoMap := s.state.topoMap
s.state.RUnlock()

// Make sure the session is open, as thats what sets the initial topology.
if status != statusOpen {
Expand Down
6 changes: 6 additions & 0 deletions src/dbnode/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type Client interface {

// Session can write and read to a cluster.
type Session interface {
// WriteClusterAvailability returns whether cluster is available for writes.
WriteClusterAvailability() (bool, error)

// ReadClusterAvailability returns whether cluster is available for reads.
ReadClusterAvailability() (bool, error)

// Write value to the database for an ID.
Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error

Expand Down
Loading

0 comments on commit b6dfbaa

Please sign in to comment.