Skip to content

Commit

Permalink
Connection Migration V2: split the QuicChromiumPacketWriter's write b…
Browse files Browse the repository at this point in the history
…locked status into two separate invariants.

- |write_in_progress_| is tracking writer status, and is set when an asynchronous write doesn't complete, or HandleWriteError/MaybeRetryAfterWriteError is in progress.

- |force_write_blocked_| is tracking whether writer should be forced blocked regardless of |write_in_progress_|.

QuicChromiumClientSession will set force_write_blocked to true when no alternate network is available or connection migration is in progress to complete to ensure no packets can be written until migration completes. QuicChromiumClientSession::WriteToNewPacket will undo force writer blocked. If packet writer becomes unblocked immediately or after the in-flight write completes, QuicChromiumClientSession::OnWriteUnblocked will be invoked to send a packet after migration. The packet can be a cached packet if |packet_| is set. If there's no cached packet, a queued packet will be sent if there's any, and a ping packet will be sent if writer is still unblocked.

See http://crbug.com/859674 for detailed motivations.

Bug: 859674
Change-Id: Icdbbb5e14f8439c6601e2c19d9a03f05f17d4bc0
Reviewed-on: https://chromium-review.googlesource.com/1125286
Commit-Queue: Zhongyi Shi <zhongyi@chromium.org>
Reviewed-by: Ryan Hamilton <rch@chromium.org>
Cr-Commit-Position: refs/heads/master@{#573869}
  • Loading branch information
zyshi authored and Commit Bot committed Jul 10, 2018
1 parent 9e85e67 commit b3bc982
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 64 deletions.
66 changes: 37 additions & 29 deletions net/quic/chromium/quic_chromium_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ QuicChromiumClientSession::QuicChromiumClientSession(
probing_manager_(this, task_runner_),
retry_migrate_back_count_(0),
current_connection_migration_cause_(UNKNOWN),
send_packet_after_migration_(false),
migration_pending_(false),
headers_include_h2_stream_dependency_(
headers_include_h2_stream_dependency &&
Expand Down Expand Up @@ -1589,7 +1590,8 @@ void QuicChromiumClientSession::OnSuccessfulVersionNegotiation(
void QuicChromiumClientSession::OnConnectivityProbeReceived(
const quic::QuicSocketAddress& self_address,
const quic::QuicSocketAddress& peer_address) {
DVLOG(1) << "Probing response from ip:port: " << peer_address.ToString()
DVLOG(1) << "Speculative probing response from ip:port: "
<< peer_address.ToString()
<< " to ip:port: " << self_address.ToString() << " is received";
// Notify the probing manager that a connectivity probing packet is received.
probing_manager_.OnConnectivityProbingReceived(self_address, peer_address);
Expand Down Expand Up @@ -1702,9 +1704,11 @@ void QuicChromiumClientSession::MigrateSessionOnWriteError(int error_code) {
void QuicChromiumClientSession::OnNoNewNetwork() {
migration_pending_ = true;

// Block the packet writer to avoid any writes while migration is in progress.
DVLOG(1) << "Force blocking the packet writer";
// Force blocking the packet writer to avoid any writes since there is no
// alternate network available.
static_cast<QuicChromiumPacketWriter*>(connection()->writer())
->set_write_blocked(true);
->set_force_write_blocked(true);

// Post a task to maybe close the session if the alarm fires.
task_runner_->PostDelayedTask(
Expand All @@ -1716,30 +1720,15 @@ void QuicChromiumClientSession::OnNoNewNetwork() {
void QuicChromiumClientSession::WriteToNewSocket() {
// Prevent any pending migration from executing.
migration_pending_ = false;
static_cast<QuicChromiumPacketWriter*>(connection()->writer())
->set_write_blocked(false);
if (packet_ == nullptr) {
// Unblock the connection before sending a PING packet, since it
// may have been blocked before the migration started.
connection()->OnCanWrite();
SendPing();
return;
}
// Set |send_packet_after_migration_| to true so that a packet will be
// sent when the writer becomes unblocked.
send_packet_after_migration_ = true;

// The connection is waiting for the original write to complete
// asynchronously. The new writer will notify the connection if the
// write below completes asynchronously, but a synchronous competion
// must be propagated back to the connection here.
quic::WriteResult result =
static_cast<QuicChromiumPacketWriter*>(connection()->writer())
->WritePacketToSocket(std::move(packet_));
if (result.error_code == ERR_IO_PENDING)
return;

// All write errors should be mapped into ERR_IO_PENDING by
// HandleWriteError.
DCHECK_LT(0, result.error_code);
connection()->OnCanWrite();
DVLOG(1) << "Cancel force blocking the packet writer";
// Notify writer that it is no longer forced blocked, which may call
// OnWriteUnblocked() if the writer has no write in progress.
static_cast<QuicChromiumPacketWriter*>(connection()->writer())
->set_force_write_blocked(false);
}

void QuicChromiumClientSession::OnMigrationTimeout(size_t num_sockets) {
Expand Down Expand Up @@ -1974,7 +1963,25 @@ void QuicChromiumClientSession::OnWriteError(int error_code) {
}

void QuicChromiumClientSession::OnWriteUnblocked() {
DCHECK(!connection()->writer()->IsWriteBlocked());

if (packet_) {
DCHECK(send_packet_after_migration_);
send_packet_after_migration_ = false;
static_cast<QuicChromiumPacketWriter*>(connection()->writer())
->WritePacketToSocket(std::move(packet_));
return;
}

// Unblock the connection, which may send queued packets.
connection()->OnCanWrite();
if (send_packet_after_migration_) {
send_packet_after_migration_ = false;
if (!connection()->writer()->IsWriteBlocked()) {
SendPing();
}
}
return;
}

void QuicChromiumClientSession::OnPathDegrading() {
Expand Down Expand Up @@ -2655,9 +2662,10 @@ bool QuicChromiumClientSession::MigrateToSocket(
packet_readers_.push_back(std::move(reader));
sockets_.push_back(std::move(socket));
StartReading();
// Block the writer to prevent it being used until WriteToNewSocket
// completes.
writer->set_write_blocked(true);
// Froce the writer to be blocked to prevent it being used until
// WriteToNewSocket completes.
DVLOG(1) << "Force blocking the packet writer";
writer->set_force_write_blocked(true);
connection()->SetQuicPacketWriter(writer.release(), /*owns_writer=*/true);

// Post task to write the pending packet or a PING packet to the new
Expand Down
19 changes: 14 additions & 5 deletions net/quic/chromium/quic_chromium_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession
scoped_refptr<QuicChromiumPacketWriter::ReusableIOBuffer>
last_packet) override;
void OnWriteError(int error_code) override;
// Called when the associated writer is unblocked. Write the cached |packet_|
// if |packet_| is set. May send a PING packet if
// |send_packet_after_migration_| is set and writer is not blocked after
// writing queued packets.
void OnWriteUnblocked() override;

// QuicConnectivityProbingManager::Delegate override.
Expand Down Expand Up @@ -551,9 +555,9 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession
// Attempts to migrate session when a write error is encountered.
void MigrateSessionOnWriteError(int error_code);

// Helper method that writes a packet on the new socket after
// migration completes. If not null, the packet_ member is written,
// otherwise a PING packet is written.
// Helper method that completes connection/server migration.
// Unblocks packet writer on network level. If the writer becomes unblocked
// then, OnWriteUnblocked() will be invoked to send packet after migration.
void WriteToNewSocket();

// Migrates session over to use |peer_address| and |network|.
Expand Down Expand Up @@ -765,15 +769,20 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession
int streams_pushed_and_claimed_count_;
uint64_t bytes_pushed_count_;
uint64_t bytes_pushed_and_unclaimed_count_;
// Stores packet that witnesses socket write error. This packet is
// written to a new socket after migration completes.
// Stores the packet that witnesses socket write error. This packet will be
// written to an alternate socket when the migration completes and the
// alternate socket is unblocked.
scoped_refptr<QuicChromiumPacketWriter::ReusableIOBuffer> packet_;
// Stores the latest default network platform marks.
NetworkChangeNotifier::NetworkHandle default_network_;
QuicConnectivityProbingManager probing_manager_;
int retry_migrate_back_count_;
base::OneShotTimer migrate_back_to_default_timer_;
ConnectionMigrationCause current_connection_migration_cause_;
// True if a packet needs to be sent when packet writer is unblocked to
// complete connection migration. The packet can be a cached packet if
// |packet_| is set, a queued packet, or a PING packet.
bool send_packet_after_migration_;
// TODO(jri): Replace use of migration_pending_ sockets_.size().
// When a task is posted for MigrateSessionOnError, pass in
// sockets_.size(). Then in MigrateSessionOnError, check to see if
Expand Down
35 changes: 24 additions & 11 deletions net/quic/chromium/quic_chromium_packet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ QuicChromiumPacketWriter::QuicChromiumPacketWriter(
: socket_(socket),
delegate_(nullptr),
packet_(new ReusableIOBuffer(quic::kMaxPacketSize)),
write_blocked_(false),
write_in_progress_(false),
force_write_blocked_(false),
retry_count_(0),
weak_factory_(this) {
retry_timer_.SetTaskRunner(task_runner);
Expand All @@ -98,6 +99,13 @@ QuicChromiumPacketWriter::QuicChromiumPacketWriter(

QuicChromiumPacketWriter::~QuicChromiumPacketWriter() {}

void QuicChromiumPacketWriter::set_force_write_blocked(
bool force_write_blocked) {
force_write_blocked_ = force_write_blocked;
if (!IsWriteBlocked() && delegate_ != nullptr)
delegate_->OnWriteUnblocked();
}

void QuicChromiumPacketWriter::SetPacket(const char* buffer, size_t buf_len) {
if (UNLIKELY(!packet_)) {
packet_ = new ReusableIOBuffer(
Expand Down Expand Up @@ -127,10 +135,13 @@ quic::WriteResult QuicChromiumPacketWriter::WritePacket(
return WritePacketToSocketImpl();
}

quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocket(
void QuicChromiumPacketWriter::WritePacketToSocket(
scoped_refptr<ReusableIOBuffer> packet) {
DCHECK(!force_write_blocked_);
packet_ = std::move(packet);
return QuicChromiumPacketWriter::WritePacketToSocketImpl();
quic::WriteResult result = WritePacketToSocketImpl();
if (result.error_code != ERR_IO_PENDING)
OnWriteComplete(result.error_code);
}

quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocketImpl() {
Expand All @@ -156,7 +167,7 @@ quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocketImpl() {
status = quic::WRITE_STATUS_ERROR;
} else {
status = quic::WRITE_STATUS_BLOCKED;
write_blocked_ = true;
write_in_progress_ = true;
}
}

Expand Down Expand Up @@ -184,17 +195,17 @@ bool QuicChromiumPacketWriter::IsWriteBlockedDataBuffered() const {
}

bool QuicChromiumPacketWriter::IsWriteBlocked() const {
return write_blocked_;
return (force_write_blocked_ || write_in_progress_);
}

void QuicChromiumPacketWriter::SetWritable() {
write_blocked_ = false;
write_in_progress_ = false;
}

void QuicChromiumPacketWriter::OnWriteComplete(int rv) {
DCHECK_NE(rv, ERR_IO_PENDING);
DCHECK(delegate_) << "Uninitialized delegate.";
write_blocked_ = false;
write_in_progress_ = false;
if (rv < 0) {
if (MaybeRetryAfterWriteError(rv))
return;
Expand All @@ -205,8 +216,10 @@ void QuicChromiumPacketWriter::OnWriteComplete(int rv) {
rv = delegate_->HandleWriteError(rv, std::move(packet_));
DCHECK(packet_ == nullptr);
if (rv == ERR_IO_PENDING) {
// Set write blocked back as HandleWriteError hasn't complete migration.
write_blocked_ = true;
// Set write blocked back as write error is encountered in this writer,
// delegate may be able to handle write error but this writer will never
// be used to write any new data.
write_in_progress_ = true;
return;
}
}
Expand All @@ -217,7 +230,7 @@ void QuicChromiumPacketWriter::OnWriteComplete(int rv) {

if (rv < 0)
delegate_->OnWriteError(rv);
else
else if (!force_write_blocked_)
delegate_->OnWriteUnblocked();
}

Expand All @@ -235,7 +248,7 @@ bool QuicChromiumPacketWriter::MaybeRetryAfterWriteError(int rv) {
base::Bind(&QuicChromiumPacketWriter::RetryPacketAfterNoBuffers,
weak_factory_.GetWeakPtr()));
retry_count_++;
write_blocked_ = true;
write_in_progress_ = true;
return true;
}

Expand Down
19 changes: 14 additions & 5 deletions net/quic/chromium/quic_chromium_packet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketWriter
// |delegate| must outlive writer.
void set_delegate(Delegate* delegate) { delegate_ = delegate; }

void set_write_blocked(bool write_blocked) { write_blocked_ = write_blocked; }
// This method may unblock the packet writer if |force_write_blocked| is
// false.
void set_force_write_blocked(bool force_write_blocked);

// Writes |packet| to the socket and returns the error code from the write.
quic::WriteResult WritePacketToSocket(scoped_refptr<ReusableIOBuffer> packet);
// Writes |packet| to the socket and handles write result if the write
// completes synchronously.
void WritePacketToSocket(scoped_refptr<ReusableIOBuffer> packet);

// quic::QuicPacketWriter
quic::WriteResult WritePacket(const char* buffer,
Expand Down Expand Up @@ -108,8 +111,14 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketWriter
// moved to the delegate in the case of a write error.
scoped_refptr<ReusableIOBuffer> packet_;

// Whether a write is currently in flight.
bool write_blocked_;
// Whether a write is currently in progress: true if an asynchronous write is
// in flight, or a retry of a previous write is in progress, or session is
// handling write error of a previous write.
bool write_in_progress_;

// If ture, IsWriteBlocked() will return true regardless of
// |write_in_progress_|.
bool force_write_blocked_;

int retry_count_;
// Timer set when a packet should be retried after ENOBUFS.
Expand Down
Loading

0 comments on commit b3bc982

Please sign in to comment.