Skip to content

Commit

Permalink
[CWR] Add Async Wait to StreamingReceiverSessionClient
Browse files Browse the repository at this point in the history
This CL adds an asynchronous wait to the existing
StreamingReceiverSessionClient class.

Currently, the StreamingReceiverSessionClient class expects that the
AV Settings query will return prior to the initialization of streaming.
This has the potential to create a race condition, so this CL instead
adds a wait (up to 5 seconds) for the AV Settings to return following
the initial call, after which time it is treated as an error.

Bug: b/182426595
Change-Id: I56602241a8e64e492a90d8e5a47ebe9f034e56c3
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3120331
Commit-Queue: Ryan Keane <rwkeane@google.com>
Reviewed-by: Yuchen Liu <yucliu@chromium.org>
Cr-Commit-Position: refs/heads/main@{#919544}
  • Loading branch information
Ryan Keane authored and Chromium LUCI CQ committed Sep 9, 2021
1 parent b2ddcbb commit 93f4ba2
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 36 deletions.
1 change: 1 addition & 0 deletions chromecast/cast_core/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ test("cast_cast_core_unittests") {
"//chromecast/base/metrics",
"//chromecast/browser:simple_client",
"//chromecast/browser:simple_main_parts",
"//chromecast/browser:test_support",
"//chromecast/shared:platform_info_serializer",
"//components/cast_streaming/browser",
"//components/cast_streaming/public/mojom",
Expand Down
92 changes: 77 additions & 15 deletions chromecast/cast_core/streaming_receiver_session_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <utility>

#include "base/logging.h"
#include "base/sequenced_task_runner.h"
#include "base/strings/string_util.h"
#include "chromecast/shared/platform_info_serializer.h"
#include "components/cast/message_port/cast_core/create_message_port_core.h"
Expand Down Expand Up @@ -162,23 +163,32 @@ std::unique_ptr<cast_streaming::ReceiverSession> CreateReceiverSession(

} // namespace

constexpr base::TimeDelta
StreamingReceiverSessionClient::kMaxAVSettingsWaitTime;

StreamingReceiverSessionClient::StreamingReceiverSessionClient(
scoped_refptr<base::SequencedTaskRunner> task_runner,
cast_streaming::NetworkContextGetter network_context_getter,
cast_streaming::ReceiverSession::MessagePortProvider message_port_provider,
Handler* handler)
: StreamingReceiverSessionClient(
std::move(task_runner),
std::move(network_context_getter),
base::BindOnce(&CreateReceiverSession,
std::move(message_port_provider)),
handler) {}

StreamingReceiverSessionClient::StreamingReceiverSessionClient(
scoped_refptr<base::SequencedTaskRunner> task_runner,
cast_streaming::NetworkContextGetter network_context_getter,
ReceiverSessionFactory receiver_session_factory,
Handler* handler)
: handler_(handler),
receiver_session_factory_(std::move(receiver_session_factory)) {
task_runner_(std::move(task_runner)),
receiver_session_factory_(std::move(receiver_session_factory)),
weak_factory_(this) {
DCHECK(handler_);
DCHECK(task_runner_);
DCHECK(receiver_session_factory_);
DCHECK(!network_context_getter.is_null());

Expand All @@ -191,6 +201,12 @@ StreamingReceiverSessionClient::StreamingReceiverSessionClient(
message_port_->SetReceiver(this);

handler_->StartAvSettingsQuery(std::move(server));

task_runner_->PostDelayedTask(
FROM_HERE,
base::BindOnce(&StreamingReceiverSessionClient::VerifyAVSettingsReceived,
weak_factory_.GetWeakPtr()),
kMaxAVSettingsWaitTime);
}

StreamingReceiverSessionClient::~StreamingReceiverSessionClient() {
Expand All @@ -199,62 +215,99 @@ StreamingReceiverSessionClient::~StreamingReceiverSessionClient() {

StreamingReceiverSessionClient::Handler::~Handler() = default;

void StreamingReceiverSessionClient::LaunchStreamingReceiver(
void StreamingReceiverSessionClient::LaunchStreamingReceiverAsync(
CastWebContents* cast_web_contents) {
DCHECK(av_constraints_);
DCHECK(receiver_session_factory_);
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(cast_web_contents);
DCHECK(!is_streaming_launch_pending());

receiver_session_ =
std::move(receiver_session_factory_).Run(av_constraints_.value());
streaming_state_ |= LaunchState::kLaunchCalled;
Observe(cast_web_contents);
}

void StreamingReceiverSessionClient::MainFrameReadyToCommitNavigation(
content::NavigationHandle* navigation_handle) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(is_streaming_launch_pending());
DCHECK(navigation_handle);

if (has_streaming_started_ || !receiver_session_ ||
if ((streaming_state_ & LaunchState::kMojoHandleAcquired) ||
!cast_streaming::IsCastStreamingMediaSourceUrl(
navigation_handle->GetURL())) {
return;
}

mojo::AssociatedRemote<::mojom::CastStreamingReceiver>
cast_streaming_receiver;
navigation_handle->GetRenderFrameHost()
->GetRemoteAssociatedInterfaces()
->GetInterface(&cast_streaming_receiver);
->GetInterface(&cast_streaming_receiver_);
streaming_state_ |= LaunchState::kMojoHandleAcquired;

if (!TryStartStreamingSession()) {
DCHECK(!has_received_av_settings());
DLOG(INFO) << "AV Settings not yet received. Waiting...";
}
}

bool StreamingReceiverSessionClient::TryStartStreamingSession() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!has_streaming_launched());
if (streaming_state_ != LaunchState::kReady) {
return false;
}

DCHECK(av_constraints_);
DCHECK(receiver_session_factory_);
receiver_session_ =
std::move(receiver_session_factory_).Run(av_constraints_.value());
DCHECK(receiver_session_);
receiver_session_->SetCastStreamingReceiver(
std::move(cast_streaming_receiver));
std::move(cast_streaming_receiver_));

has_streaming_started_ = true;
streaming_state_ = LaunchState::kLaunched;
handler_->OnStreamingSessionStarted();
return true;
}

void StreamingReceiverSessionClient::VerifyAVSettingsReceived() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!has_received_av_settings()) {
LOG(ERROR) << "AV Settings never received";
TriggerError();
}
}

bool StreamingReceiverSessionClient::OnMessage(
base::StringPiece message,
std::vector<std::unique_ptr<cast_api_bindings::MessagePort>> ports) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(ports.empty());

if (streaming_state_ == LaunchState::kError) {
LOG(WARNING) << "AV Settings received after an error: " << message;
return false;
}

absl::optional<PlatformInfoSerializer> deserializer =
PlatformInfoSerializer::TryParse(message);
if (!deserializer) {
handler_->OnError();
LOG(ERROR) << "AV Settings with invalid JSON received: " << message;
TriggerError();
return false;
}

auto constraints = CreateConstraints(deserializer.value());
if (!has_streaming_started_) {
streaming_state_ |= LaunchState::kAVSettingsReceived;
if (!has_streaming_launched()) {
av_constraints_ = std::move(constraints);
TryStartStreamingSession();
return true;
}

DCHECK(av_constraints_);
if (!constraints.IsSupersetOf(av_constraints_.value())) {
handler_->OnError();
LOG(WARNING) << "Device no longer supports capabilities used for "
<< "cast streaming session negotiation: " << message;
TriggerError();
return false;
}

Expand All @@ -263,6 +316,15 @@ bool StreamingReceiverSessionClient::OnMessage(

void StreamingReceiverSessionClient::OnPipeError() {
DLOG(WARNING) << "Pipe disconnected.";
TriggerError();
}

void StreamingReceiverSessionClient::TriggerError() {
if (streaming_state_ == LaunchState::kError) {
return;
}

streaming_state_ = LaunchState::kError;
handler_->OnError();
}

Expand Down
119 changes: 107 additions & 12 deletions chromecast/cast_core/streaming_receiver_session_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@

#include <memory>

#include "base/memory/scoped_refptr.h"
#include "base/sequence_checker.h"
#include "base/time/time.h"
#include "chromecast/browser/cast_web_contents.h"
#include "components/cast/message_port/message_port.h"
#include "components/cast_streaming/browser/public/network_context_getter.h"
#include "components/cast_streaming/browser/public/receiver_session.h"

namespace base {
class SequencedTaskRunner;
} // namespace base

namespace content {
class NavigationHandle;
} // namespace content
Expand All @@ -21,7 +28,7 @@ namespace chromecast {
// This class wraps all //components/cast_streaming functionality, only
// expecting the caller to supply a MessagePortFactory. Internally, it
// manages the lifetimes of cast streaming objects, and informs the caller
// of important events.
// of important events. Methods in this class may not be called in parallel.
class StreamingReceiverSessionClient
: public CastWebContents::Observer,
public cast_api_bindings::MessagePort::Receiver {
Expand All @@ -35,31 +42,63 @@ class StreamingReceiverSessionClient
// URL.
virtual void OnStreamingSessionStarted() = 0;

// Called when a fatal error occurs.
// Called when a nonrecoverable error occurs. Following this call, the
// associated StreamingReceiverSessionClient instance will be placed in an
// undefined state.
virtual void OnError() = 0;

// Called when an AV settings query must be started for |message_port|.
virtual void StartAvSettingsQuery(
std::unique_ptr<cast_api_bindings::MessagePort> message_port) = 0;
};

// Max time for which streaming may wait for AV Settings receipt before being
// treated as a failure.
static constexpr base::TimeDelta kMaxAVSettingsWaitTime =
base::TimeDelta::FromSeconds(5);

// Creates a new instance of this class. |handler| must persist for the
// lifetime of this instance.
StreamingReceiverSessionClient(
scoped_refptr<base::SequencedTaskRunner> task_runner,
cast_streaming::NetworkContextGetter network_context_getter,
cast_streaming::ReceiverSession::MessagePortProvider
message_port_provider,
Handler* handler);
~StreamingReceiverSessionClient() override;

// Starts the Streaming Receiver owned by this instance. May only be called
// once. At time of calling, this instance will be set as the observer of
// |cast_web_contents|, for which streaming will be started upon navigation to
// an associated URL. Following this call, the supported AV Settings are
// expected to remain constant.
void LaunchStreamingReceiver(CastWebContents* cast_web_contents);

bool has_streaming_started() const { return has_streaming_started_; }
// Schedules starting the Streaming Receiver owned by this instance. May only
// be called once. At time of calling, this instance will be set as the
// observer of |cast_web_contents|, for which streaming will be started
// following the latter of:
// - Navigation to an associated URL by |cast_web_contents|.
// - Receipt of supported AV Settings.
// Following this call, the supported AV Settings are expected to remain
// constant. If valid AV Settings have not been received within
// |kMaxAVSettingsWaitTime| of this function call, it will be treated as an
// unrecoverable error, and this instance will be placed in an undefined
// state.
void LaunchStreamingReceiverAsync(CastWebContents* cast_web_contents);

bool has_streaming_launched() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return streaming_state_ == LaunchState::kLaunched;
}

bool is_streaming_launch_pending() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return streaming_state_ & LaunchState::kLaunchCalled;
}

bool has_received_av_settings() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return streaming_state_ & LaunchState::kAVSettingsReceived;
}

bool is_healthy() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return streaming_state_ != LaunchState::kError;
}

private:
friend class StreamingReceiverSessionClientTest;
Expand All @@ -68,12 +107,55 @@ class StreamingReceiverSessionClient
base::OnceCallback<std::unique_ptr<cast_streaming::ReceiverSession>(
cast_streaming::ReceiverSession::AVConstraints)>;

enum LaunchState : int32_t {
kStopped = 0x00,

// The three conditions which must be met for streaming to run.
kLaunchCalled = 0x01 << 0,
kAVSettingsReceived = 0x01 << 1,
kMojoHandleAcquired = 0x01 << 2,

// Signifies that the above conditions have all been met.
kReady = kAVSettingsReceived | kLaunchCalled | kMojoHandleAcquired,

// Signifies that streaming has started.
kLaunched = 0xFF,

// Error state set after a Handler::OnError() call.
kError = 0x100
};

// This second ctor is required for Unit Testing.
StreamingReceiverSessionClient(
scoped_refptr<base::SequencedTaskRunner> task_runner,
cast_streaming::NetworkContextGetter network_context_getter,
ReceiverSessionFactory factory,
Handler* handler);

friend inline LaunchState operator&(LaunchState first, LaunchState second) {
return static_cast<LaunchState>(static_cast<int32_t>(first) &
static_cast<int32_t>(second));
}

friend inline LaunchState operator|(LaunchState first, LaunchState second) {
return static_cast<LaunchState>(static_cast<int32_t>(first) |
static_cast<int32_t>(second));
}

friend inline LaunchState& operator|=(LaunchState& first,
LaunchState second) {
return first = first | second;
}

friend inline LaunchState& operator&=(LaunchState& first,
LaunchState second) {
return first = first & second;
}

bool TryStartStreamingSession();
void VerifyAVSettingsReceived();
void TriggerError();

// CastWebContents::Observer overrides.
void MainFrameReadyToCommitNavigation(
content::NavigationHandle* navigation_handle) override;
Expand All @@ -87,20 +169,33 @@ class StreamingReceiverSessionClient
// Handler for callbacks associated with this class. May be empty.
Handler* const handler_;

// Task runner on which waiting for the result of an AV Settings query should
// occur.
scoped_refptr<base::SequencedTaskRunner> const task_runner_;
SEQUENCE_CHECKER(sequence_checker_);

// Most recently received AV Constraints, from bindings.
absl::optional<cast_streaming::ReceiverSession::AVConstraints>
av_constraints_;

// The AssociatedRemote that must be provided when starting the
// |receiver_session_|.
mojo::AssociatedRemote<::mojom::CastStreamingReceiver>
cast_streaming_receiver_;

// Responsible for managing the streaming session.
std::unique_ptr<cast_streaming::ReceiverSession> receiver_session_;

// MessagePort responsible for receiving AV Settings Bindings Messages
// MessagePort responsible for receiving AV Settings Bindings Messages.
std::unique_ptr<cast_api_bindings::MessagePort> message_port_;

// Factory method used to create a receiver session.
ReceiverSessionFactory receiver_session_factory_;

bool has_streaming_started_ = false;
// Current state in initialization of |receiver_session_|.
LaunchState streaming_state_ = LaunchState::kStopped;

base::WeakPtrFactory<StreamingReceiverSessionClient> weak_factory_;
};

} // namespace chromecast
Expand Down
Loading

0 comments on commit 93f4ba2

Please sign in to comment.