
adaptive concurrency: Gradient algorithm implementation #7908

Merged · 33 commits · Sep 9, 2019
Changes from 27 commits
@@ -11,5 +11,6 @@ api_proto_library_internal(
srcs = ["adaptive_concurrency.proto"],
deps = [
"//envoy/api/v2/core:base",
"//envoy/type:percent",
],
)
@@ -6,5 +6,59 @@ option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurren
option java_outer_classname = "AdaptiveConcurrencyProto";
option java_multiple_files = true;

import "envoy/type/percent.proto";

import "google/protobuf/duration.proto";
import "google/api/annotations.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";

// Configuration parameters for the gradient controller.
message GradientControllerConfig {
// The percentile to use when summarizing aggregated samples. Defaults to p50.
envoy.type.Percent sample_aggregate_percentile = 1;

// Parameters controlling the periodic recalculation of the concurrency limit from sampled request
// latencies.
message ConcurrencyLimitCalculationParams {
// The maximum value the gradient is allowed to take. This influences how aggressively the
// concurrency limit can increase. Defaults to 2.0.
google.protobuf.DoubleValue max_gradient = 1 [(validate.rules).double.gt = 1.0];

// The allowed upper-bound on the calculated concurrency limit. Defaults to 1000.
google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32.gt = 0];

// The period of time over which latency samples are collected before recalculating the
// concurrency limit.
google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];
}
ConcurrencyLimitCalculationParams concurrency_limit_params = 2
[(validate.rules).message.required = true];

// Parameters controlling the periodic minRTT recalculation.
message MinimumRTTCalculationParams {
// The time interval between recalculating the minimum request round-trip time.
google.protobuf.Duration interval = 1 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];

// The number of requests to aggregate/sample during the minRTT recalculation window before
// updating. Defaults to 50.
google.protobuf.UInt32Value request_count = 2 [(validate.rules).uint32.gt = 0];
};
MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message.required = true];
}

message AdaptiveConcurrency {
oneof concurrency_controller_config {
option (validate.required) = true;

// Gradient concurrency control will be used.
GradientControllerConfig gradient_controller_config = 1
[(validate.rules).message.required = true];
}
}
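For reference, a minimal sketch (not part of this diff) of populating these fields through the generated C++ protobuf API. The wrapper and Duration setters are standard protobuf; the two interval values are illustrative, since both Duration fields are required and carry no defaults:

#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h"

// Hypothetical helper: builds a config with the documented defaults spelled out.
envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig
makeExampleConfig() {
  envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig config;
  config.mutable_sample_aggregate_percentile()->set_value(50.0); // The p50 default.
  auto* limit_params = config.mutable_concurrency_limit_params();
  limit_params->mutable_max_gradient()->set_value(2.0);           // Default; must be > 1.0.
  limit_params->mutable_max_concurrency_limit()->set_value(1000); // Default; must be > 0.
  limit_params->mutable_concurrency_update_interval()->set_seconds(1); // Required; illustrative.
  auto* min_rtt_params = config.mutable_min_rtt_calc_params();
  min_rtt_params->mutable_interval()->set_seconds(30);            // Required; illustrative.
  min_rtt_params->mutable_request_count()->set_value(50);         // Default; must be > 0.
  return config;
}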
@@ -32,13 +32,23 @@ Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderM
return Http::FilterHeadersStatus::StopIteration;
}

rq_start_time_ = config_->timeSource().monotonicTime();
// When the deferred_sample_task_ object is destroyed, the time difference between its destruction
// and the request start time is measured as the request latency. This value is sampled by the
// concurrency controller either when encoding is complete or during destruction of this filter
// object.
const auto rq_start_time = config_->timeSource().monotonicTime();
deferred_sample_task_ = std::make_unique<Cleanup>([this, rq_start_time]() {
Review comment (Member):

In the reset case, I don't think you want to actually record a latency sample, you just want to decrement the active request count, so I think you need to account for that.

Also, I would do explicit cleanup on onDestroy() to avoid any issues around deferred deletion. Basically, in the onDestroy() case if there is still an active count, you should just decrement the count and not store a sample.

IMO it's cleaner for the controller to return an interface object that hides all of this. It would have a single method to record a latency sample, and on destruction would decrement the count. But this can work too, with all callers having to deal with this accounting.

const auto now = config_->timeSource().monotonicTime();
const std::chrono::nanoseconds rq_latency = now - rq_start_time;
controller_->recordLatencySample(rq_latency);
});

return Http::FilterHeadersStatus::Continue;
}

void AdaptiveConcurrencyFilter::encodeComplete() {
const auto rq_latency = config_->timeSource().monotonicTime() - rq_start_time_;
controller_->recordLatencySample(rq_latency);
ASSERT(deferred_sample_task_);
deferred_sample_task_.reset();
}

} // namespace AdaptiveConcurrency
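The deferred-sampling idiom above relies on Envoy's Cleanup helper from common/common/cleanup.h, which runs a callback when destroyed. A self-contained sketch of the pattern, using a simplified stand-in for the real class:

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>

// Simplified stand-in for Envoy's Cleanup: invokes its callback on destruction.
class Cleanup {
public:
  explicit Cleanup(std::function<void()> f) : f_(std::move(f)) {}
  ~Cleanup() { f_(); }

private:
  std::function<void()> f_;
};

int main() {
  const auto rq_start_time = std::chrono::steady_clock::now();

  // The latency sample is taken whenever the guard dies: explicitly via reset()
  // in encodeComplete(), or implicitly when the filter is destroyed first
  // (e.g. the stream is reset mid-request).
  auto deferred_sample_task = std::make_unique<Cleanup>([rq_start_time]() {
    const auto rq_latency = std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::steady_clock::now() - rq_start_time);
    std::cout << "recorded latency: " << rq_latency.count() << "ns\n";
  });

  deferred_sample_task.reset(); // encodeComplete(): the sample is recorded here.
}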
@@ -11,6 +11,8 @@
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/cleanup.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"
#include "extensions/filters/http/common/pass_through_filter.h"

@@ -61,8 +63,7 @@ class AdaptiveConcurrencyFilter : public Http::PassThroughFilter,
private:
AdaptiveConcurrencyFilterConfigSharedPtr config_;
const ConcurrencyControllerSharedPtr controller_;
MonotonicTime rq_start_time_;
std::unique_ptr<ConcurrencyController::RequestForwardingAction> forwarding_action_;
std::unique_ptr<Cleanup> deferred_sample_task_;
};

} // namespace AdaptiveConcurrency
@@ -14,10 +14,20 @@ envoy_package()

envoy_cc_library(
name = "concurrency_controller_lib",
srcs = [],
srcs = ["gradient_controller.cc"],
hdrs = [
"concurrency_controller.h",
"gradient_controller.h",
],
external_deps = [
"libcircllhist",
],
deps = [
"//source/common/event:dispatcher_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_lib",
"//source/common/stats:isolated_store_lib",
"//source/common/stats:stats_lib",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc",
],
)
@@ -43,7 +43,12 @@ class ConcurrencyController {
*
* @param rq_latency is the clocked round-trip time for the request.
*/
virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE;
virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) PURE;

/**
* Returns the current concurrency limit.
*/
virtual uint32_t concurrencyLimit() const PURE;
};

} // namespace ConcurrencyController
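The interface implies an accounting contract the filter must uphold (the review thread above calls this out): every request admitted by the controller must be balanced by exactly one recordLatencySample() call, since that call is what decrements the outstanding-request count. A sketch, under the assumption that forwardingDecision(), shown later in this diff on GradientController, is also declared on this interface:

#include <chrono>
#include <cstdint>

// Reduced stand-ins mirroring the PR's types; illustrative only.
enum class RequestForwardingAction { Block, Forward };

class ConcurrencyController {
public:
  virtual ~ConcurrencyController() = default;
  virtual RequestForwardingAction forwardingDecision() = 0;
  virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) = 0;
  virtual uint32_t concurrencyLimit() const = 0;
};

void handleRequest(ConcurrencyController& controller, std::chrono::nanoseconds rq_latency) {
  if (controller.forwardingDecision() == RequestForwardingAction::Block) {
    return; // Never admitted: recording a sample here would corrupt the count.
  }
  // ... request is forwarded and completes ...
  controller.recordLatencySample(rq_latency); // Balances the admission above.
}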
@@ -0,0 +1,181 @@
#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h"

#include <atomic>
#include <chrono>

#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/stats.h"

#include "common/common/cleanup.h"
#include "common/protobuf/protobuf.h"
#include "common/protobuf/utility.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"

#include "absl/synchronization/mutex.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {
namespace ConcurrencyController {

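// The percentile arrives as an envoy.type.Percent; below it is converted to a rounded integer
// out of 1000 (500, i.e. p50, when unset) and then divided by 1000.0, so it is stored as a
// fraction in [0, 1].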
GradientControllerConfig::GradientControllerConfig(
const envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig&
proto_config)
: min_rtt_calc_interval_(std::chrono::milliseconds(
DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().interval()))),
sample_rtt_calc_interval_(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
proto_config.concurrency_limit_params().concurrency_update_interval()))),
max_concurrency_limit_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
proto_config.concurrency_limit_params(), max_concurrency_limit, 1000)),
min_rtt_aggregate_request_count_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), request_count, 50)),
max_gradient_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.concurrency_limit_params(),
max_gradient, 2.0)),
sample_aggregate_percentile_(PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
proto_config, sample_aggregate_percentile, 1000, 500) /
1000.0) {}

GradientController::GradientController(GradientControllerConfigSharedPtr config,
Event::Dispatcher& dispatcher, Runtime::Loader&,
const std::string& stats_prefix, Stats::Scope& scope)
: config_(std::move(config)), dispatcher_(dispatcher), scope_(scope),
stats_(generateStats(scope_, stats_prefix)), deferred_limit_value_(1), num_rq_outstanding_(0),
concurrency_limit_(1), latency_sample_hist_(hist_fast_alloc(), hist_free) {
min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void {
absl::MutexLock ml(&sample_mutation_mtx_);
enterMinRTTSamplingWindow();
});

sample_reset_timer_ = dispatcher_.createTimer([this]() -> void {
if (inMinRTTSamplingWindow()) {
// Since the minRTT value is being calculated, let's give up on this timer. At the end of the
// minRTT calculation, this timer will be enabled again.
return;
}

{
absl::MutexLock ml(&sample_mutation_mtx_);
resetSampleWindow();
}

sample_reset_timer_->enableTimer(config_->sampleRTTCalcInterval());
});

sample_reset_timer_->enableTimer(config_->sampleRTTCalcInterval());
stats_.concurrency_limit_.set(concurrency_limit_.load());
}

GradientControllerStats GradientController::generateStats(Stats::Scope& scope,
const std::string& stats_prefix) {
return {ALL_GRADIENT_CONTROLLER_STATS(POOL_GAUGE_PREFIX(scope, stats_prefix))};
}

void GradientController::enterMinRTTSamplingWindow() {
// Set the minRTT flag to indicate we're gathering samples to update the value. This will
// prevent the sample window from resetting until enough requests are gathered to complete the
// recalculation.
deferred_limit_value_.store(concurrencyLimit());
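// Pinning the limit to 1 for the duration of the window keeps sampled requests (nearly)
// uncontended, so the aggregated latency approximates the true minimum round-trip time.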
updateConcurrencyLimit(1);

// Throw away any latency samples from before the recalculation window, as they may not
// represent the minRTT.
hist_clear(latency_sample_hist_.get());
}

void GradientController::updateMinRTT() {
ASSERT(inMinRTTSamplingWindow());

{
absl::MutexLock ml(&sample_mutation_mtx_);
min_rtt_ = processLatencySamplesAndClear();
stats_.min_rtt_msecs_.set(
std::chrono::duration_cast<std::chrono::milliseconds>(min_rtt_).count());
updateConcurrencyLimit(deferred_limit_value_.load());
deferred_limit_value_.store(0);
}

min_rtt_calc_timer_->enableTimer(config_->minRTTCalcInterval());
}

void GradientController::resetSampleWindow() {
// The sampling window must not be reset while sampling for the new minRTT value.
ASSERT(!inMinRTTSamplingWindow());

if (hist_sample_count(latency_sample_hist_.get()) == 0) {
return;
}

sample_rtt_ = processLatencySamplesAndClear();
updateConcurrencyLimit(calculateNewLimit());
}

std::chrono::microseconds GradientController::processLatencySamplesAndClear() {
const std::array<double, 1> quantile{config_->sampleAggregatePercentile()};
std::array<double, 1> calculated_quantile;
hist_approx_quantile(latency_sample_hist_.get(), quantile.data(), 1, calculated_quantile.data());
hist_clear(latency_sample_hist_.get());
return std::chrono::microseconds(static_cast<int>(calculated_quantile[0]));
}

uint32_t GradientController::calculateNewLimit() {
// Calculate the gradient value, clamping it so that it does not exceed the configured maximum.
ASSERT(sample_rtt_.count() > 0);
const double raw_gradient = static_cast<double>(min_rtt_.count()) / sample_rtt_.count();
const double gradient = std::min(config_->maxGradient(), raw_gradient);
stats_.gradient_.set(gradient);

const double limit = concurrencyLimit() * gradient;
const double burst_headroom = sqrt(limit);
stats_.burst_queue_size_.set(burst_headroom);

// The final concurrency value factors in the burst headroom and must be clamped to keep the value
// in the range [1, configured_max].
const auto clamp = [](int min, int max, int val) { return std::max(min, std::min(max, val)); };
const uint32_t new_limit = limit + burst_headroom;
return clamp(1, config_->maxConcurrencyLimit(), new_limit);
}
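To make the arithmetic concrete, a standalone sketch with hypothetical numbers: a minRTT of 50ms against a sampled aggregate of 100ms halves a limit of 400, and the square-root burst headroom brings the result to 214:

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>

int main() {
  const double min_rtt_usec = 50000.0;     // Measured minRTT: 50ms.
  const double sample_rtt_usec = 100000.0; // Sampled aggregate latency: 100ms.
  const double max_gradient = 2.0;
  const uint32_t current_limit = 400;
  const uint32_t max_concurrency_limit = 1000;

  const double gradient = std::min(max_gradient, min_rtt_usec / sample_rtt_usec); // 0.5
  const double limit = current_limit * gradient;                                  // 200.0
  const double burst_headroom = std::sqrt(limit);                                 // ~14.14

  // Clamp to [1, max_concurrency_limit]; the sum truncates to 214.
  const uint32_t new_limit = std::max<uint32_t>(
      1, std::min<uint32_t>(max_concurrency_limit,
                            static_cast<uint32_t>(limit + burst_headroom)));
  std::cout << "new limit: " << new_limit << "\n"; // Prints: new limit: 214
}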

RequestForwardingAction GradientController::forwardingDecision() {
// Note that a race condition exists here which could allow the number of outstanding requests to
// exceed the concurrency limit, bounded by the number of worker threads. After loading
// num_rq_outstanding_ and before loading concurrency_limit_, another thread could potentially
// swoop in and modify num_rq_outstanding_, causing us to move forward with stale values and
// increment num_rq_outstanding_.
//
// TODO (tonya11en): Reconsider using a CAS loop here.
if (num_rq_outstanding_.load() < concurrencyLimit()) {
++num_rq_outstanding_;
return RequestForwardingAction::Forward;
}
return RequestForwardingAction::Block;
}
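A sketch of the CAS loop the TODO mentions; under these assumptions it closes the check-then-increment window, at the cost of retries under contention:

#include <atomic>
#include <cstdint>

// Hypothetical helper, not part of this PR: atomically reserve a slot iff the
// outstanding count is below the limit.
bool tryAcquireSlot(std::atomic<uint32_t>& outstanding, uint32_t limit) {
  uint32_t current = outstanding.load();
  do {
    if (current >= limit) {
      return false; // Equivalent to RequestForwardingAction::Block.
    }
    // On failure, compare_exchange_weak reloads `current`, and the loop re-checks.
  } while (!outstanding.compare_exchange_weak(current, current + 1));
  return true; // Equivalent to RequestForwardingAction::Forward.
}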

void GradientController::recordLatencySample(std::chrono::nanoseconds rq_latency) {
const uint32_t latency_usec =
std::chrono::duration_cast<std::chrono::microseconds>(rq_latency).count();
ASSERT(num_rq_outstanding_.load() > 0);
--num_rq_outstanding_;

uint32_t sample_count;
{
absl::MutexLock ml(&sample_mutation_mtx_);
hist_insert(latency_sample_hist_.get(), latency_usec, 1);
sample_count = hist_sample_count(latency_sample_hist_.get());
}

if (inMinRTTSamplingWindow() && sample_count >= config_->minRTTAggregateRequestCount()) {
// This sample has pushed the request count over the threshold required for the minRTT
// recalculation, so the recalculation can now be completed.
updateMinRTT();
}
}

} // namespace ConcurrencyController
} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy