Skip to content

Commit

Permalink
QuicChromiumClientSession::StreamRequest to be created by the session
Browse files Browse the repository at this point in the history
on demand to the client. Make the request hold the stream until the
caller asks for it, and make explicit cancellation unnecessary.

This is in preparation for subsquent cleanups of the session.

Review-Url: https://codereview.chromium.org/2844493002
Cr-Commit-Position: refs/heads/master@{#467247}
  • Loading branch information
rch-chromium-org authored and Commit bot committed Apr 26, 2017
1 parent ea39d1d commit 1baa747
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 43 deletions.
9 changes: 5 additions & 4 deletions net/quic/chromium/bidirectional_stream_quic_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ void BidirectionalStreamQuicImpl::Start(
delegate_ = delegate;
request_info_ = request_info;

int rv = stream_request_.StartRequest(
session_, &stream_,
base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady,
weak_factory_.GetWeakPtr()));
stream_request_ = session_->CreateStreamRequest();
int rv = stream_request_->StartRequest(base::Bind(
&BidirectionalStreamQuicImpl::OnStreamReady, weak_factory_.GetWeakPtr()));
if (rv == OK) {
OnStreamReady(rv);
} else if (!was_handshake_confirmed_) {
Expand Down Expand Up @@ -306,6 +305,8 @@ void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
DCHECK_NE(ERR_IO_PENDING, rv);
DCHECK(rv == OK || !stream_);
if (rv == OK) {
stream_ = stream_request_->ReleaseStream();
stream_request_.reset();
stream_->SetDelegate(this);
if (!was_handshake_confirmed_ && request_info_->method == "POST") {
waiting_for_confirmation_ = true;
Expand Down
2 changes: 1 addition & 1 deletion net/quic/chromium/bidirectional_stream_quic_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class NET_EXPORT_PRIVATE BidirectionalStreamQuicImpl

base::WeakPtr<QuicChromiumClientSession> session_;
bool was_handshake_confirmed_; // True if the crypto handshake succeeded.
QuicChromiumClientSession::StreamRequest stream_request_;
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request_;
QuicChromiumClientStream* stream_; // Non-owning.

const BidirectionalStreamRequestInfo* request_info_;
Expand Down
46 changes: 29 additions & 17 deletions net/quic/chromium/quic_chromium_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,37 +186,43 @@ class QuicServerPushHelper : public ServerPushDelegate::ServerPushHelper {

} // namespace

QuicChromiumClientSession::StreamRequest::StreamRequest() : stream_(nullptr) {}
QuicChromiumClientSession::StreamRequest::StreamRequest(
const base::WeakPtr<QuicChromiumClientSession>& session)
: session_(session), stream_(nullptr) {}

QuicChromiumClientSession::StreamRequest::~StreamRequest() {
CancelRequest();
if (stream_)
stream_->Reset(QUIC_STREAM_CANCELLED);

if (session_)
session_->CancelRequest(this);
}

int QuicChromiumClientSession::StreamRequest::StartRequest(
const base::WeakPtr<QuicChromiumClientSession>& session,
QuicChromiumClientStream** stream,
const CompletionCallback& callback) {
session_ = session;
stream_ = stream;
int rv = session_->TryCreateStream(this, stream_);
DCHECK(session_);
int rv = session_->TryCreateStream(this);
if (rv == ERR_IO_PENDING) {
callback_ = callback;
} else {
session_.reset();
}

return rv;
}

void QuicChromiumClientSession::StreamRequest::CancelRequest() {
if (session_)
session_->CancelRequest(this);
session_.reset();
callback_.Reset();
QuicChromiumClientStream*
QuicChromiumClientSession::StreamRequest::ReleaseStream() {
DCHECK(stream_);
QuicChromiumClientStream* stream = stream_;
stream_ = nullptr;
return stream;
}

void QuicChromiumClientSession::StreamRequest::OnRequestCompleteSuccess(
QuicChromiumClientStream* stream) {
session_.reset();
*stream_ = stream;
stream_ = stream;
base::ResetAndReturn(&callback_).Run(OK);
}

Expand Down Expand Up @@ -456,9 +462,15 @@ void QuicChromiumClientSession::RemoveObserver(Observer* observer) {
observers_.erase(observer);
}

int QuicChromiumClientSession::TryCreateStream(
StreamRequest* request,
QuicChromiumClientStream** stream) {
std::unique_ptr<QuicChromiumClientSession::StreamRequest>
QuicChromiumClientSession::CreateStreamRequest() {
// base::MakeUnique does not work because the StreamRequest constructor
// is private.
return std::unique_ptr<StreamRequest>(
new StreamRequest(weak_factory_.GetWeakPtr()));
}

int QuicChromiumClientSession::TryCreateStream(StreamRequest* request) {
if (goaway_received()) {
DVLOG(1) << "Going away.";
return ERR_CONNECTION_CLOSED;
Expand All @@ -475,7 +487,7 @@ int QuicChromiumClientSession::TryCreateStream(
}

if (GetNumOpenOutgoingStreams() < max_open_outgoing_streams()) {
*stream = CreateOutgoingReliableStreamImpl();
request->stream_ = CreateOutgoingReliableStreamImpl();
return OK;
}

Expand Down
23 changes: 12 additions & 11 deletions net/quic/chromium/quic_chromium_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,24 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession
// A helper class used to manage a request to create a stream.
class NET_EXPORT_PRIVATE StreamRequest {
public:
StreamRequest();
// Cancels any pending stream creation request and resets |stream_| if
// it has not yet been released.
~StreamRequest();

// Starts a request to create a stream. If OK is returned, then
// |stream| will be updated with the newly created stream. If
// |stream_| will be updated with the newly created stream. If
// ERR_IO_PENDING is returned, then when the request is eventuallly
// complete |callback| will be called.
int StartRequest(const base::WeakPtr<QuicChromiumClientSession>& session,
QuicChromiumClientStream** stream,
const CompletionCallback& callback);
int StartRequest(const CompletionCallback& callback);

// Cancels any pending stream creation request. May be called
// repeatedly.
void CancelRequest();
// Releases |stream_| to the caller
QuicChromiumClientStream* ReleaseStream();

private:
friend class QuicChromiumClientSession;

StreamRequest(const base::WeakPtr<QuicChromiumClientSession>& session);

// Called by |session_| for an asynchronous request when the stream
// request has finished successfully.
void OnRequestCompleteSuccess(QuicChromiumClientStream* stream);
Expand All @@ -107,7 +107,7 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession

base::WeakPtr<QuicChromiumClientSession> session_;
CompletionCallback callback_;
QuicChromiumClientStream** stream_;
QuicChromiumClientStream* stream_;
// For tracking how much time pending stream requests wait.
base::TimeTicks pending_start_time_;

Expand Down Expand Up @@ -147,15 +147,16 @@ class NET_EXPORT_PRIVATE QuicChromiumClientSession
void AddObserver(Observer* observer);
void RemoveObserver(Observer* observer);

std::unique_ptr<StreamRequest> CreateStreamRequest();

// Attempts to create a new stream. If the stream can be
// created immediately, returns OK. If the open stream limit
// has been reached, returns ERR_IO_PENDING, and |request|
// will be added to the stream requets queue and will
// be completed asynchronously.
// TODO(rch): remove |stream| from this and use setter on |request|
// and fix in spdy too.
int TryCreateStream(StreamRequest* request,
QuicChromiumClientStream** stream);
int TryCreateStream(StreamRequest* request);

// Cancels the pending stream creation request.
void CancelRequest(StreamRequest* request);
Expand Down
137 changes: 130 additions & 7 deletions net/quic/chromium/quic_chromium_client_session_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "net/log/test_net_log.h"
#include "net/quic/chromium/crypto/proof_verifier_chromium.h"
#include "net/quic/chromium/mock_crypto_client_stream_factory.h"
#include "net/quic/chromium/mock_quic_data.h"
#include "net/quic/chromium/quic_chromium_alarm_factory.h"
#include "net/quic/chromium/quic_chromium_client_session_peer.h"
#include "net/quic/chromium/quic_chromium_connection_helper.h"
Expand Down Expand Up @@ -103,7 +104,8 @@ class QuicChromiumClientSessionTest
}

void Initialize() {
socket_factory_.AddSocketDataProvider(socket_data_.get());
if (socket_data_)
socket_factory_.AddSocketDataProvider(socket_data_.get());
std::unique_ptr<DatagramClientSocket> socket =
socket_factory_.CreateDatagramClientSocket(DatagramSocket::DEFAULT_BIND,
base::Bind(&base::RandInt),
Expand Down Expand Up @@ -193,6 +195,129 @@ TEST_P(QuicChromiumClientSessionTest, CryptoConnect) {
CompleteCryptoHandshake();
}

TEST_P(QuicChromiumClientSessionTest, StreamRequest) {
MockQuicData quic_data;
quic_data.AddWrite(client_maker_.MakeInitialSettingsPacket(1, nullptr));
quic_data.AddRead(ASYNC, ERR_IO_PENDING);
quic_data.AddRead(ASYNC, OK); // EOF
quic_data.AddSocketDataToFactory(&socket_factory_);

Initialize();
CompleteCryptoHandshake();

// Request a stream and verify that a stream was created.
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request =
session_->CreateStreamRequest();
TestCompletionCallback callback;
ASSERT_EQ(OK, stream_request->StartRequest(callback.callback()));
EXPECT_TRUE(stream_request->ReleaseStream() != nullptr);

quic_data.Resume();
EXPECT_TRUE(quic_data.AllReadDataConsumed());
EXPECT_TRUE(quic_data.AllWriteDataConsumed());
}

TEST_P(QuicChromiumClientSessionTest, CancelStreamRequestBeforeRelease) {
MockQuicData quic_data;
quic_data.AddWrite(client_maker_.MakeInitialSettingsPacket(1, nullptr));
quic_data.AddWrite(client_maker_.MakeRstPacket(2, true, kClientDataStreamId1,
QUIC_STREAM_CANCELLED));
quic_data.AddRead(ASYNC, ERR_IO_PENDING);
quic_data.AddRead(ASYNC, OK); // EOF
quic_data.AddSocketDataToFactory(&socket_factory_);

Initialize();
CompleteCryptoHandshake();

// Request a stream and cancel it without releasing the stream.
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request =
session_->CreateStreamRequest();
TestCompletionCallback callback;
ASSERT_EQ(OK, stream_request->StartRequest(callback.callback()));
stream_request.reset();

quic_data.Resume();
EXPECT_TRUE(quic_data.AllReadDataConsumed());
EXPECT_TRUE(quic_data.AllWriteDataConsumed());
}

TEST_P(QuicChromiumClientSessionTest, AsyncStreamRequest) {
MockQuicData quic_data;
quic_data.AddWrite(client_maker_.MakeInitialSettingsPacket(1, nullptr));
quic_data.AddWrite(client_maker_.MakeRstPacket(2, true, kClientDataStreamId1,
QUIC_RST_ACKNOWLEDGEMENT));
quic_data.AddRead(ASYNC, ERR_IO_PENDING);
quic_data.AddRead(ASYNC, OK); // EOF
quic_data.AddSocketDataToFactory(&socket_factory_);

Initialize();
CompleteCryptoHandshake();

// Open the maximum number of streams so that a subsequent request
// can not proceed immediately.
const size_t kMaxOpenStreams = session_->max_open_outgoing_streams();
for (size_t i = 0; i < kMaxOpenStreams; i++) {
session_->CreateOutgoingDynamicStream(kDefaultPriority);
}
EXPECT_EQ(kMaxOpenStreams, session_->GetNumOpenOutgoingStreams());

// Request a stream and verify that it's pending.
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request =
session_->CreateStreamRequest();
TestCompletionCallback callback;
ASSERT_EQ(ERR_IO_PENDING, stream_request->StartRequest(callback.callback()));

// Close a stream and ensure the stream request completes.
QuicRstStreamFrame rst(kClientDataStreamId1, QUIC_STREAM_CANCELLED, 0);
session_->OnRstStream(rst);
ASSERT_TRUE(callback.have_result());
EXPECT_THAT(callback.WaitForResult(), IsOk());
EXPECT_TRUE(stream_request->ReleaseStream() != nullptr);

quic_data.Resume();
EXPECT_TRUE(quic_data.AllReadDataConsumed());
EXPECT_TRUE(quic_data.AllWriteDataConsumed());
}

TEST_P(QuicChromiumClientSessionTest, CancelPendingStreamRequest) {
MockQuicData quic_data;
quic_data.AddWrite(client_maker_.MakeInitialSettingsPacket(1, nullptr));
quic_data.AddWrite(client_maker_.MakeRstPacket(2, true, kClientDataStreamId1,
QUIC_RST_ACKNOWLEDGEMENT));
quic_data.AddRead(ASYNC, ERR_IO_PENDING);
quic_data.AddRead(ASYNC, OK); // EOF
quic_data.AddSocketDataToFactory(&socket_factory_);

Initialize();
CompleteCryptoHandshake();

// Open the maximum number of streams so that a subsequent request
// can not proceed immediately.
const size_t kMaxOpenStreams = session_->max_open_outgoing_streams();
for (size_t i = 0; i < kMaxOpenStreams; i++) {
session_->CreateOutgoingDynamicStream(kDefaultPriority);
}
EXPECT_EQ(kMaxOpenStreams, session_->GetNumOpenOutgoingStreams());

// Request a stream and verify that it's pending.
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request =
session_->CreateStreamRequest();
TestCompletionCallback callback;
ASSERT_EQ(ERR_IO_PENDING, stream_request->StartRequest(callback.callback()));

// Cancel the pending stream request.
stream_request.reset();

// Close a stream and ensure that no new stream is created.
QuicRstStreamFrame rst(kClientDataStreamId1, QUIC_STREAM_CANCELLED, 0);
session_->OnRstStream(rst);
EXPECT_EQ(kMaxOpenStreams - 1, session_->GetNumOpenOutgoingStreams());

quic_data.Resume();
EXPECT_TRUE(quic_data.AllReadDataConsumed());
EXPECT_TRUE(quic_data.AllWriteDataConsumed());
}

TEST_P(QuicChromiumClientSessionTest, MaxNumStreams) {
MockRead reads[] = {MockRead(SYNCHRONOUS, ERR_IO_PENDING, 0)};
std::unique_ptr<QuicEncryptedPacket> settings_packet(
Expand Down Expand Up @@ -544,12 +669,10 @@ TEST_P(QuicChromiumClientSessionTest, MaxNumStreamsViaRequest) {
streams.push_back(stream);
}

QuicChromiumClientStream* stream;
QuicChromiumClientSession::StreamRequest stream_request;
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request =
session_->CreateStreamRequest();
TestCompletionCallback callback;
ASSERT_EQ(ERR_IO_PENDING,
stream_request.StartRequest(session_->GetWeakPtr(), &stream,
callback.callback()));
ASSERT_EQ(ERR_IO_PENDING, stream_request->StartRequest(callback.callback()));

// Close a stream and ensure I can now open a new one.
QuicStreamId stream_id = streams[0]->id();
Expand All @@ -558,7 +681,7 @@ TEST_P(QuicChromiumClientSessionTest, MaxNumStreamsViaRequest) {
session_->OnRstStream(rst1);
ASSERT_TRUE(callback.have_result());
EXPECT_THAT(callback.WaitForResult(), IsOk());
EXPECT_TRUE(stream != nullptr);
EXPECT_TRUE(stream_request->ReleaseStream() != nullptr);
}

TEST_P(QuicChromiumClientSessionTest, GoAwayReceived) {
Expand Down
6 changes: 4 additions & 2 deletions net/quic/chromium/quic_http_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ int QuicHttpStream::DoLoop(int rv) {

int QuicHttpStream::DoRequestStream() {
next_state_ = STATE_REQUEST_STREAM_COMPLETE;
return stream_request_.StartRequest(
session_, &stream_,
stream_request_ = session_->CreateStreamRequest();
return stream_request_->StartRequest(
base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr()));
}

Expand All @@ -641,6 +641,8 @@ int QuicHttpStream::DoRequestStreamComplete(int rv) {
return GetResponseStatus();
}

stream_ = stream_request_->ReleaseStream();
stream_request_.reset();
stream_->SetDelegate(this);
if (request_info_->load_flags & LOAD_DISABLE_CONNECTION_MIGRATION) {
stream_->DisableConnectionMigration();
Expand Down
2 changes: 1 addition & 1 deletion net/quic/chromium/quic_http_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class NET_EXPORT_PRIVATE QuicHttpStream
QuicVersion quic_version_;
int session_error_; // Error code from the connection shutdown.
bool was_handshake_confirmed_; // True if the crypto handshake succeeded.
QuicChromiumClientSession::StreamRequest stream_request_;
std::unique_ptr<QuicChromiumClientSession::StreamRequest> stream_request_;
QuicChromiumClientStream* stream_; // Non-owning.

// The following three fields are all owned by the caller and must
Expand Down

0 comments on commit 1baa747

Please sign in to comment.