Skip to content

Commit

Permalink
reduce diff
Browse files Browse the repository at this point in the history
Signed-off-by: Ali Beyad <abeyad@google.com>
  • Loading branch information
abeyad committed Mar 7, 2024
1 parent 1f5dadf commit aa4381f
Showing 1 changed file with 93 additions and 124 deletions.
217 changes: 93 additions & 124 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,152 +598,121 @@ void passPayloadToProcessor(uint64_t bytes_read, Buffer::InstancePtr buffer,
std::move(buffer), receive_time);
}

namespace {

// This function is used by readFromSocket() when the UdpRecvMsgMethod is RecvMsg.
Api::IoCallUint64Result recvMsgFromSocket(IoHandle& handle, const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time, uint32_t* packets_dropped) {
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
IoHandle::RecvMsgOutput output(1, packets_dropped);

ENVOY_LOG_MISC(trace, "starting recvmsg with max={}", udp_packet_processor.maxDatagramSize());
Api::IoCallUint64Result result =
receiveMessage(udp_packet_processor.maxDatagramSize(), buffer, output, handle, local_address);
Api::IoCallUint64Result
Utility::readFromSocket(IoHandle& handle, const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time,
UdpRecvMsgMethod recv_msg_method, uint32_t* packets_dropped) {
if (recv_msg_method == UdpRecvMsgMethod::RecvMsgWithGro) {
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
IoHandle::RecvMsgOutput output(1, packets_dropped);

if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}
// TODO(yugant): Avoid allocating 24k for each read by getting memory from UdpPacketProcessor
const uint64_t max_rx_datagram_size_with_gro =
NUM_DATAGRAMS_PER_RECEIVE * udp_packet_processor.maxDatagramSize();
ENVOY_LOG_MISC(trace, "starting gro recvmsg with max={}", max_rx_datagram_size_with_gro);

ENVOY_LOG_MISC(trace, "recvmsg bytes {}", result.return_value_);
Api::IoCallUint64Result result =
receiveMessage(max_rx_datagram_size_with_gro, buffer, output, handle, local_address);

passPayloadToProcessor(
result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
return result;
}
if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}

// This function is used by readFromSocket() when the UdpRecvMsgMethod is RecvMsgWithGro.
Api::IoCallUint64Result recvMsgWithGroFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time,
uint32_t* packets_dropped) {
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
IoHandle::RecvMsgOutput output(1, packets_dropped);
const uint64_t gso_size = output.msg_[0].gso_size_;
ENVOY_LOG_MISC(trace, "gro recvmsg bytes {} with gso_size as {}", result.return_value_,
gso_size);

// TODO(yugant): Avoid allocating 24k for each read by getting memory from UdpPacketProcessor
const uint64_t max_rx_datagram_size_with_gro =
NUM_DATAGRAMS_PER_RECEIVE * udp_packet_processor.maxDatagramSize();
ENVOY_LOG_MISC(trace, "starting gro recvmsg with max={}", max_rx_datagram_size_with_gro);
// Skip gso segmentation and proceed as a single payload.
if (gso_size == 0u) {
passPayloadToProcessor(
result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
return result;
}

Api::IoCallUint64Result result =
receiveMessage(max_rx_datagram_size_with_gro, buffer, output, handle, local_address);
// Segment the buffer read by the recvmsg syscall into gso_sized sub buffers.
// TODO(mattklein123): The following code should be optimized to avoid buffer copies, either by
// switching to slices or by using a CoW buffer type.
while (buffer->length() > 0) {
const uint64_t bytes_to_copy = std::min(buffer->length(), gso_size);
Buffer::InstancePtr sub_buffer = std::make_unique<Buffer::OwnedImpl>();
sub_buffer->move(*buffer, bytes_to_copy);
passPayloadToProcessor(bytes_to_copy, std::move(sub_buffer), output.msg_[0].peer_address_,
output.msg_[0].local_address_, udp_packet_processor, receive_time);
}

if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}

const uint64_t gso_size = output.msg_[0].gso_size_;
ENVOY_LOG_MISC(trace, "gro recvmsg bytes {} with gso_size as {}", result.return_value_, gso_size);
if (recv_msg_method == UdpRecvMsgMethod::RecvMmsg) {
const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize();

// Buffer::ReservationSingleSlice is always passed by value, and can only be constructed
// by Buffer::Instance::reserve(), so this is needed to keep a fixed array
// in which all elements are legally constructed.
struct BufferAndReservation {
BufferAndReservation(uint64_t max_rx_datagram_size)
: buffer_(std::make_unique<Buffer::OwnedImpl>()),
reservation_(buffer_->reserveSingleSlice(max_rx_datagram_size, true)) {}

Buffer::InstancePtr buffer_;
Buffer::ReservationSingleSlice reservation_;
};
constexpr uint32_t num_slices_per_packet = 1u;
absl::InlinedVector<BufferAndReservation, NUM_DATAGRAMS_PER_RECEIVE> buffers;
RawSliceArrays slices(NUM_DATAGRAMS_PER_RECEIVE,
absl::FixedArray<Buffer::RawSlice>(num_slices_per_packet));
for (uint32_t i = 0; i < NUM_DATAGRAMS_PER_RECEIVE; i++) {
buffers.push_back(max_rx_datagram_size);
slices[i][0] = buffers[i].reservation_.slice();
}

// Skip gso segmentation and proceed as a single payload.
if (gso_size == 0u) {
passPayloadToProcessor(
result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
return result;
}
IoHandle::RecvMsgOutput output(NUM_DATAGRAMS_PER_RECEIVE, packets_dropped);
ENVOY_LOG_MISC(trace, "starting recvmmsg with packets={} max={}", NUM_DATAGRAMS_PER_RECEIVE,
max_rx_datagram_size);
Api::IoCallUint64Result result = handle.recvmmsg(slices, local_address.ip()->port(), output);
if (!result.ok()) {
return result;
}

// Segment the buffer read by the recvmsg syscall into gso_sized sub buffers.
// TODO(mattklein123): The following code should be optimized to avoid buffer copies, either by
// switching to slices or by using a CoW buffer type.
while (buffer->length() > 0) {
const uint64_t bytes_to_copy = std::min(buffer->length(), gso_size);
Buffer::InstancePtr sub_buffer = std::make_unique<Buffer::OwnedImpl>();
sub_buffer->move(*buffer, bytes_to_copy);
passPayloadToProcessor(bytes_to_copy, std::move(sub_buffer), output.msg_[0].peer_address_,
output.msg_[0].local_address_, udp_packet_processor, receive_time);
}
uint64_t packets_read = result.return_value_;
ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
for (uint64_t i = 0; i < packets_read; ++i) {
if (output.msg_[i].truncated_and_dropped_) {
continue;
}

return result;
}
Buffer::RawSlice* slice = slices[i].data();
const uint64_t msg_len = output.msg_[i].msg_len_;
ASSERT(msg_len <= slice->len_);
ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len,
output.msg_[i].peer_address_->asString());

// This function is used by readFromSocket() when the UdpRecvMsgMethod is RecvMmsg.
Api::IoCallUint64Result recvMmsgFromSocket(IoHandle& handle, const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time, uint32_t* packets_dropped) {
const auto max_rx_datagram_size = udp_packet_processor.maxDatagramSize();

// Buffer::ReservationSingleSlice is always passed by value, and can only be constructed
// by Buffer::Instance::reserve(), so this is needed to keep a fixed array
// in which all elements are legally constructed.
struct BufferAndReservation {
BufferAndReservation(uint64_t max_rx_datagram_size)
: buffer_(std::make_unique<Buffer::OwnedImpl>()),
reservation_(buffer_->reserveSingleSlice(max_rx_datagram_size, true)) {}

Buffer::InstancePtr buffer_;
Buffer::ReservationSingleSlice reservation_;
};

constexpr uint32_t num_slices_per_packet = 1u;
absl::InlinedVector<BufferAndReservation, NUM_DATAGRAMS_PER_RECEIVE> buffers;
RawSliceArrays slices(NUM_DATAGRAMS_PER_RECEIVE,
absl::FixedArray<Buffer::RawSlice>(num_slices_per_packet));
for (uint32_t i = 0; i < NUM_DATAGRAMS_PER_RECEIVE; i++) {
buffers.push_back(max_rx_datagram_size);
slices[i][0] = buffers[i].reservation_.slice();
}

IoHandle::RecvMsgOutput output(NUM_DATAGRAMS_PER_RECEIVE, packets_dropped);
ENVOY_LOG_MISC(trace, "starting recvmmsg with packets={} max={}", NUM_DATAGRAMS_PER_RECEIVE,
max_rx_datagram_size);
Api::IoCallUint64Result result = handle.recvmmsg(slices, local_address.ip()->port(), output);
if (!result.ok()) {
return result;
}
buffers[i].reservation_.commit(std::min(max_rx_datagram_size, msg_len));

uint64_t packets_read = result.return_value_;
ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
for (uint64_t i = 0; i < packets_read; ++i) {
if (output.msg_[i].truncated_and_dropped_) {
continue;
passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_,
output.msg_[i].local_address_, udp_packet_processor, receive_time);
}
return result;
}

Buffer::RawSlice* slice = slices[i].data();
const uint64_t msg_len = output.msg_[i].msg_len_;
ASSERT(msg_len <= slice->len_);
ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len,
output.msg_[i].peer_address_->asString());
Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
IoHandle::RecvMsgOutput output(1, packets_dropped);

buffers[i].reservation_.commit(std::min(max_rx_datagram_size, msg_len));
ENVOY_LOG_MISC(trace, "starting recvmsg with max={}", udp_packet_processor.maxDatagramSize());
Api::IoCallUint64Result result =
receiveMessage(udp_packet_processor.maxDatagramSize(), buffer, output, handle, local_address);

passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_,
output.msg_[i].local_address_, udp_packet_processor, receive_time);
if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}
return result;
}

} // namespace
ENVOY_LOG_MISC(trace, "recvmsg bytes {}", result.return_value_);

Api::IoCallUint64Result
Utility::readFromSocket(IoHandle& handle, const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time,
UdpRecvMsgMethod recv_msg_method, uint32_t* packets_dropped) {
switch (recv_msg_method) {
case UdpRecvMsgMethod::RecvMsgWithGro:
return recvMsgWithGroFromSocket(handle, local_address, udp_packet_processor, receive_time,
packets_dropped);
case UdpRecvMsgMethod::RecvMmsg:
return recvMmsgFromSocket(handle, local_address, udp_packet_processor, receive_time,
packets_dropped);
case UdpRecvMsgMethod::RecvMsg:
return recvMsgFromSocket(handle, local_address, udp_packet_processor, receive_time,
packets_dropped);
}
// Should never get here; need this to make gcc happy.
return Api::ioCallUint64ResultNoError();
passPayloadToProcessor(
result.return_value_, std::move(buffer), std::move(output.msg_[0].peer_address_),
std::move(output.msg_[0].local_address_), udp_packet_processor, receive_time);
return result;
}

Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
Expand Down

0 comments on commit aa4381f

Please sign in to comment.