-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
codec_helper.h
145 lines (122 loc) · 4.32 KB
/
codec_helper.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#pragma once
#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/http/codec.h"
#include "source/common/common/assert.h"
#include "absl/container/inlined_vector.h"
namespace Envoy {
namespace Http {
class StreamCallbackHelper {
public:
void runLowWatermarkCallbacks() {
if (reset_callbacks_started_ || local_end_stream_) {
return;
}
ASSERT(high_watermark_callbacks_ > 0);
--high_watermark_callbacks_;
for (StreamCallbacks* callbacks : callbacks_) {
if (callbacks) {
callbacks->onBelowWriteBufferLowWatermark();
}
}
}
void runHighWatermarkCallbacks() {
if (reset_callbacks_started_ || local_end_stream_) {
return;
}
++high_watermark_callbacks_;
for (StreamCallbacks* callbacks : callbacks_) {
if (callbacks) {
callbacks->onAboveWriteBufferHighWatermark();
}
}
}
void runResetCallbacks(StreamResetReason reason, absl::string_view details) {
// Reset callbacks are a special case, and the only StreamCallbacks allowed
// to run after local_end_stream_.
if (reset_callbacks_started_) {
return;
}
reset_callbacks_started_ = true;
for (StreamCallbacks* callbacks : callbacks_) {
if (callbacks) {
callbacks->onResetStream(reason, details);
}
}
}
bool local_end_stream_{};
protected:
void addCallbacksHelper(StreamCallbacks& callbacks) {
ASSERT(!reset_callbacks_started_ && !local_end_stream_);
callbacks_.push_back(&callbacks);
for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
callbacks.onAboveWriteBufferHighWatermark();
}
}
void removeCallbacksHelper(StreamCallbacks& callbacks) {
// For performance reasons we just clear the callback and do not resize the vector.
// Reset callbacks scale with the number of filters per request and do not get added and
// removed multiple times.
// The vector may not be safely resized without making sure the run.*Callbacks() helper
// functions above still handle removeCallbacksHelper() calls mid-loop.
for (auto& callback : callbacks_) {
if (callback == &callbacks) {
callback = nullptr;
return;
}
}
}
private:
absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
bool reset_callbacks_started_{};
uint32_t high_watermark_callbacks_{};
};
// A base class shared between the Http2 codec and the Http3 codec. It arms a timeout for locally
// ended streams that still have buffered data and registers the stream adapter
// (CodecEventCallbacks).
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
public:
  // Explicit: a Dispatcher reference should never be usable as an implicit conversion to a
  // stream. `dispatcher` is stored by reference and used to create the pending flush timer.
  explicit MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}

  // The pending flush timer must already have been released (see destroy() and
  // disarmStreamIdleTimer()) by the time the stream is destructed.
  ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); }

  // TODO(mattklein123): Optimally this would be done in the destructor but there are currently
  // deferred delete lifetime issues that need sorting out if the destructor of the stream is
  // going to be able to refer to the parent connection.
  virtual void destroy() { disarmStreamIdleTimer(); }

  // To be called once local_end_stream_ has been set. If hasPendingData() reports data still
  // pending, starts the pending flush timer (subject to the configured flush timeout).
  void onLocalEndStream() {
    ASSERT(local_end_stream_);
    if (hasPendingData()) {
      createPendingFlushTimer();
    }
  }

  // Disables and releases the pending flush timer, if one exists. Safe to call when no timer is
  // armed.
  void disarmStreamIdleTimer() {
    if (stream_idle_timer_ != nullptr) {
      // To ease testing and the destructor assertion.
      stream_idle_timer_->disableTimer();
      stream_idle_timer_.reset();
    }
  }

  // Installs `codec_callbacks` as the current codec event callbacks and returns the previously
  // installed pointer (nullptr if none had been registered).
  CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
    std::swap(codec_callbacks, codec_callbacks_);
    return codec_callbacks;
  }

protected:
  // Stores the timeout that createPendingFlushTimer() arms the timer with. A timer that is
  // already armed is not affected.
  void setFlushTimeout(std::chrono::milliseconds timeout) override {
    stream_idle_timeout_ = timeout;
  }

  // Creates and arms the pending flush timer. No timer is created when the configured timeout is
  // not positive. Must not be called while a timer already exists.
  void createPendingFlushTimer() {
    ASSERT(stream_idle_timer_ == nullptr);
    if (stream_idle_timeout_.count() > 0) {
      stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); });
      stream_idle_timer_->enableTimer(stream_idle_timeout_);
    }
  }

  // Invoked when the pending flush timer fires. The base implementation only releases the timer;
  // being virtual, derived classes can layer their own handling on top.
  virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); }

  // Whether the stream still has data pending; consulted by onLocalEndStream() to decide if the
  // flush timer is needed. Supplied by derived classes.
  virtual bool hasPendingData() PURE;

  // Currently registered codec event callbacks; nullptr until registerCodecEventCallbacks() is
  // called with a non-null pointer.
  CodecEventCallbacks* codec_callbacks_{nullptr};

private:
  // Used to create the pending flush timer.
  Event::Dispatcher& dispatcher_;
  // See HttpConnectionManager.stream_idle_timeout.
  std::chrono::milliseconds stream_idle_timeout_{};
  // Non-null only while a pending flush timer exists.
  Event::TimerPtr stream_idle_timer_;
};
} // namespace Http
} // namespace Envoy