diff --git a/chromecast/cast_core/BUILD.gn b/chromecast/cast_core/BUILD.gn index eaa603e96725ce..db566835947f44 100644 --- a/chromecast/cast_core/BUILD.gn +++ b/chromecast/cast_core/BUILD.gn @@ -404,6 +404,7 @@ test("cast_cast_core_unittests") { "//chromecast/base/metrics", "//chromecast/browser:simple_client", "//chromecast/browser:simple_main_parts", + "//chromecast/browser:test_support", "//chromecast/shared:platform_info_serializer", "//components/cast_streaming/browser", "//components/cast_streaming/public/mojom", diff --git a/chromecast/cast_core/streaming_receiver_session_client.cc b/chromecast/cast_core/streaming_receiver_session_client.cc index de732a5eee4e39..dd74d5b8371781 100644 --- a/chromecast/cast_core/streaming_receiver_session_client.cc +++ b/chromecast/cast_core/streaming_receiver_session_client.cc @@ -7,6 +7,7 @@ #include #include "base/logging.h" +#include "base/sequenced_task_runner.h" #include "base/strings/string_util.h" #include "chromecast/shared/platform_info_serializer.h" #include "components/cast/message_port/cast_core/create_message_port_core.h" @@ -162,23 +163,32 @@ std::unique_ptr CreateReceiverSession( } // namespace +constexpr base::TimeDelta + StreamingReceiverSessionClient::kMaxAVSettingsWaitTime; + StreamingReceiverSessionClient::StreamingReceiverSessionClient( + scoped_refptr task_runner, cast_streaming::NetworkContextGetter network_context_getter, cast_streaming::ReceiverSession::MessagePortProvider message_port_provider, Handler* handler) : StreamingReceiverSessionClient( + std::move(task_runner), std::move(network_context_getter), base::BindOnce(&CreateReceiverSession, std::move(message_port_provider)), handler) {} StreamingReceiverSessionClient::StreamingReceiverSessionClient( + scoped_refptr task_runner, cast_streaming::NetworkContextGetter network_context_getter, ReceiverSessionFactory receiver_session_factory, Handler* handler) : handler_(handler), - receiver_session_factory_(std::move(receiver_session_factory)) { + task_runner_(std::move(task_runner)), + receiver_session_factory_(std::move(receiver_session_factory)), + weak_factory_(this) { DCHECK(handler_); + DCHECK(task_runner_); DCHECK(receiver_session_factory_); DCHECK(!network_context_getter.is_null()); @@ -191,6 +201,12 @@ StreamingReceiverSessionClient::StreamingReceiverSessionClient( message_port_->SetReceiver(this); handler_->StartAvSettingsQuery(std::move(server)); + + task_runner_->PostDelayedTask( + FROM_HERE, + base::BindOnce(&StreamingReceiverSessionClient::VerifyAVSettingsReceived, + weak_factory_.GetWeakPtr()), + kMaxAVSettingsWaitTime); } StreamingReceiverSessionClient::~StreamingReceiverSessionClient() { @@ -199,62 +215,99 @@ StreamingReceiverSessionClient::~StreamingReceiverSessionClient() { StreamingReceiverSessionClient::Handler::~Handler() = default; -void StreamingReceiverSessionClient::LaunchStreamingReceiver( +void StreamingReceiverSessionClient::LaunchStreamingReceiverAsync( CastWebContents* cast_web_contents) { - DCHECK(av_constraints_); - DCHECK(receiver_session_factory_); + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(cast_web_contents); + DCHECK(!is_streaming_launch_pending()); - receiver_session_ = - std::move(receiver_session_factory_).Run(av_constraints_.value()); + streaming_state_ |= LaunchState::kLaunchCalled; Observe(cast_web_contents); } void StreamingReceiverSessionClient::MainFrameReadyToCommitNavigation( content::NavigationHandle* navigation_handle) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(is_streaming_launch_pending()); DCHECK(navigation_handle); - if (has_streaming_started_ || !receiver_session_ || + if ((streaming_state_ & LaunchState::kMojoHandleAcquired) || !cast_streaming::IsCastStreamingMediaSourceUrl( navigation_handle->GetURL())) { return; } - mojo::AssociatedRemote<::mojom::CastStreamingReceiver> - cast_streaming_receiver; navigation_handle->GetRenderFrameHost() ->GetRemoteAssociatedInterfaces() - ->GetInterface(&cast_streaming_receiver); + ->GetInterface(&cast_streaming_receiver_); + streaming_state_ |= LaunchState::kMojoHandleAcquired; + + if (!TryStartStreamingSession()) { + DCHECK(!has_received_av_settings()); + DLOG(INFO) << "AV Settings not yet received. Waiting..."; + } +} + +bool StreamingReceiverSessionClient::TryStartStreamingSession() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + DCHECK(!has_streaming_launched()); + if (streaming_state_ != LaunchState::kReady) { + return false; + } + + DCHECK(av_constraints_); + DCHECK(receiver_session_factory_); + receiver_session_ = + std::move(receiver_session_factory_).Run(av_constraints_.value()); + DCHECK(receiver_session_); receiver_session_->SetCastStreamingReceiver( - std::move(cast_streaming_receiver)); + std::move(cast_streaming_receiver_)); - has_streaming_started_ = true; + streaming_state_ = LaunchState::kLaunched; handler_->OnStreamingSessionStarted(); + return true; +} + +void StreamingReceiverSessionClient::VerifyAVSettingsReceived() { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + if (!has_received_av_settings()) { + LOG(ERROR) << "AV Settings never received"; + TriggerError(); + } } bool StreamingReceiverSessionClient::OnMessage( base::StringPiece message, std::vector> ports) { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK(ports.empty()); + if (streaming_state_ == LaunchState::kError) { + LOG(WARNING) << "AV Settings received after an error: " << message; + return false; + } + absl::optional deserializer = PlatformInfoSerializer::TryParse(message); if (!deserializer) { - handler_->OnError(); LOG(ERROR) << "AV Settings with invalid JSON received: " << message; + TriggerError(); return false; } auto constraints = CreateConstraints(deserializer.value()); - if (!has_streaming_started_) { + streaming_state_ |= LaunchState::kAVSettingsReceived; + if (!has_streaming_launched()) { av_constraints_ = std::move(constraints); + TryStartStreamingSession(); return true; } DCHECK(av_constraints_); if (!constraints.IsSupersetOf(av_constraints_.value())) { - handler_->OnError(); LOG(WARNING) << "Device no longer supports capabilities used for " << "cast streaming session negotiation: " << message; + TriggerError(); return false; } @@ -263,6 +316,15 @@ bool StreamingReceiverSessionClient::OnMessage( void StreamingReceiverSessionClient::OnPipeError() { DLOG(WARNING) << "Pipe disconnected."; + TriggerError(); +} + +void StreamingReceiverSessionClient::TriggerError() { + if (streaming_state_ == LaunchState::kError) { + return; + } + + streaming_state_ = LaunchState::kError; handler_->OnError(); } diff --git a/chromecast/cast_core/streaming_receiver_session_client.h b/chromecast/cast_core/streaming_receiver_session_client.h index 607196a14c77e8..158b885f74b830 100644 --- a/chromecast/cast_core/streaming_receiver_session_client.h +++ b/chromecast/cast_core/streaming_receiver_session_client.h @@ -7,11 +7,18 @@ #include +#include "base/memory/scoped_refptr.h" +#include "base/sequence_checker.h" +#include "base/time/time.h" #include "chromecast/browser/cast_web_contents.h" #include "components/cast/message_port/message_port.h" #include "components/cast_streaming/browser/public/network_context_getter.h" #include "components/cast_streaming/browser/public/receiver_session.h" +namespace base { +class SequencedTaskRunner; +} // namespace base + namespace content { class NavigationHandle; } // namespace content @@ -21,7 +28,7 @@ namespace chromecast { // This class wraps all //components/cast_streaming functionality, only // expecting the caller to supply a MessagePortFactory. Internally, it // manages the lifetimes of cast streaming objects, and informs the caller -// of important events. +// of important events. Methods in this class may not be called in parallel. class StreamingReceiverSessionClient : public CastWebContents::Observer, public cast_api_bindings::MessagePort::Receiver { @@ -35,7 +42,9 @@ class StreamingReceiverSessionClient // URL. virtual void OnStreamingSessionStarted() = 0; - // Called when a fatal error occurs. + // Called when a nonrecoverable error occurs. Following this call, the + // associated StreamingReceiverSessionClient instance will be placed in an + // undefined state. virtual void OnError() = 0; // Called when an AV settings query must be started for |message_port|. @@ -43,23 +52,53 @@ class StreamingReceiverSessionClient std::unique_ptr message_port) = 0; }; + // Max time for which streaming may wait for AV Settings receipt before being + // treated as a failure. + static constexpr base::TimeDelta kMaxAVSettingsWaitTime = + base::TimeDelta::FromSeconds(5); + // Creates a new instance of this class. |handler| must persist for the // lifetime of this instance. StreamingReceiverSessionClient( + scoped_refptr task_runner, cast_streaming::NetworkContextGetter network_context_getter, cast_streaming::ReceiverSession::MessagePortProvider message_port_provider, Handler* handler); ~StreamingReceiverSessionClient() override; - // Starts the Streaming Receiver owned by this instance. May only be called - // once. At time of calling, this instance will be set as the observer of - // |cast_web_contents|, for which streaming will be started upon navigation to - // an associated URL. Following this call, the supported AV Settings are - // expected to remain constant. - void LaunchStreamingReceiver(CastWebContents* cast_web_contents); - - bool has_streaming_started() const { return has_streaming_started_; } + // Schedules starting the Streaming Receiver owned by this instance. May only + // be called once. At time of calling, this instance will be set as the + // observer of |cast_web_contents|, for which streaming will be started + // following the latter of: + // - Navigation to an associated URL by |cast_web_contents|. + // - Receipt of supported AV Settings. + // Following this call, the supported AV Settings are expected to remain + // constant. If valid AV Settings have not been received within + // |kMaxAVSettingsWaitTime| of this function call, it will be treated as an + // unrecoverable error, and this instance will be placed in an undefined + // state. + void LaunchStreamingReceiverAsync(CastWebContents* cast_web_contents); + + bool has_streaming_launched() const { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + return streaming_state_ == LaunchState::kLaunched; + } + + bool is_streaming_launch_pending() const { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + return streaming_state_ & LaunchState::kLaunchCalled; + } + + bool has_received_av_settings() const { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + return streaming_state_ & LaunchState::kAVSettingsReceived; + } + + bool is_healthy() const { + DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); + return streaming_state_ != LaunchState::kError; + } private: friend class StreamingReceiverSessionClientTest; @@ -68,12 +107,55 @@ class StreamingReceiverSessionClient base::OnceCallback( cast_streaming::ReceiverSession::AVConstraints)>; + enum LaunchState : int32_t { + kStopped = 0x00, + + // The three conditions which must be met for streaming to run. + kLaunchCalled = 0x01 << 0, + kAVSettingsReceived = 0x01 << 1, + kMojoHandleAcquired = 0x01 << 2, + + // Signifies that the above conditions have all been met. + kReady = kAVSettingsReceived | kLaunchCalled | kMojoHandleAcquired, + + // Signifies that streaming has started. + kLaunched = 0xFF, + + // Error state set after a Handler::OnError() call. + kError = 0x100 + }; + // This second ctor is required for Unit Testing. StreamingReceiverSessionClient( + scoped_refptr task_runner, cast_streaming::NetworkContextGetter network_context_getter, ReceiverSessionFactory factory, Handler* handler); + friend inline LaunchState operator&(LaunchState first, LaunchState second) { + return static_cast(static_cast(first) & + static_cast(second)); + } + + friend inline LaunchState operator|(LaunchState first, LaunchState second) { + return static_cast(static_cast(first) | + static_cast(second)); + } + + friend inline LaunchState& operator|=(LaunchState& first, + LaunchState second) { + return first = first | second; + } + + friend inline LaunchState& operator&=(LaunchState& first, + LaunchState second) { + return first = first & second; + } + + bool TryStartStreamingSession(); + void VerifyAVSettingsReceived(); + void TriggerError(); + // CastWebContents::Observer overrides. void MainFrameReadyToCommitNavigation( content::NavigationHandle* navigation_handle) override; @@ -87,20 +169,33 @@ class StreamingReceiverSessionClient // Handler for callbacks associated with this class. May be empty. Handler* const handler_; + // Task runner on which waiting for the result of an AV Settings query should + // occur. + scoped_refptr const task_runner_; + SEQUENCE_CHECKER(sequence_checker_); + // Most recently received AV Constraints, from bindings. absl::optional av_constraints_; + // The AssociatedRemote that must be provided when starting the + // |receiver_session_|. + mojo::AssociatedRemote<::mojom::CastStreamingReceiver> + cast_streaming_receiver_; + // Responsible for managing the streaming session. std::unique_ptr receiver_session_; - // MessagePort responsible for receiving AV Settings Bindings Messages + // MessagePort responsible for receiving AV Settings Bindings Messages. std::unique_ptr message_port_; // Factory method used to create a receiver session. ReceiverSessionFactory receiver_session_factory_; - bool has_streaming_started_ = false; + // Current state in initialization of |receiver_session_|. + LaunchState streaming_state_ = LaunchState::kStopped; + + base::WeakPtrFactory weak_factory_; }; } // namespace chromecast diff --git a/chromecast/cast_core/streaming_receiver_session_client_unittest.cc b/chromecast/cast_core/streaming_receiver_session_client_unittest.cc index da2ee227bb25c0..88d04709266d8e 100644 --- a/chromecast/cast_core/streaming_receiver_session_client_unittest.cc +++ b/chromecast/cast_core/streaming_receiver_session_client_unittest.cc @@ -5,6 +5,7 @@ #include "chromecast/cast_core/streaming_receiver_session_client.h" #include "base/test/task_environment.h" +#include "chromecast/browser/test/mock_cast_web_view.h" #include "chromecast/shared/platform_info_serializer.h" #include "components/cast_streaming/browser/public/receiver_session.h" #include "components/cast_streaming/public/mojom/cast_streaming_session.mojom.h" @@ -57,6 +58,7 @@ class StreamingReceiverSessionClientTest : public testing::Test { // Note: Can't use make_unique<> because the private ctor is needed. auto* client = new StreamingReceiverSessionClient( + task_environment_.GetMainThreadTaskRunner(), base::BindRepeating( []() -> network::mojom::NetworkContext* { return nullptr; }), base::BindOnce( @@ -66,18 +68,28 @@ class StreamingReceiverSessionClientTest : public testing::Test { receiver_session_client_.reset(client); } + ~StreamingReceiverSessionClientTest() { + ResetMessagePort(); + task_environment_.FastForwardBy( + StreamingReceiverSessionClient::kMaxAVSettingsWaitTime); + } + protected: - // Needed due to the complexity of mocking a CastWebContents to call - // LaunchStreamingReceiver directly. - void SetStarted() { - receiver_session_client_->has_streaming_started_ = true; - ASSERT_TRUE(receiver_session_client_->has_streaming_started()); + void SetMojoHandleAcquired() { + receiver_session_client_->streaming_state_ = + receiver_session_client_->streaming_state_ | + StreamingReceiverSessionClient::LaunchState::kMojoHandleAcquired; } bool PostMessage(base::StringPiece message) { return receiver_session_client_->OnMessage(message, {}); } + // When calling task_environment_.FastForwardBy(), OnPipeError() gets called. + // Resetting the pipe is cleaner than passing in a base::OnceCallback() to + // create the MessagePort pair. + void ResetMessagePort() { receiver_session_client_->message_port_.reset(); } + base::test::SingleThreadTaskEnvironment task_environment_{ base::test::TaskEnvironment::TimeSource::MOCK_TIME}; @@ -88,6 +100,8 @@ class StreamingReceiverSessionClientTest : public testing::Test { // Set when the session is launched. cast_streaming::ReceiverSession::AVConstraints session_constraints_; + MockCastWebView cast_web_view_; + private: std::unique_ptr CreateReceiverSession( std::unique_ptr ptr, @@ -98,9 +112,16 @@ class StreamingReceiverSessionClientTest : public testing::Test { }; TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageEmpty) { + receiver_session_client_->LaunchStreamingReceiverAsync( + cast_web_view_.cast_web_contents()); + SetMojoHandleAcquired(); + PlatformInfoSerializer serializer; + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + EXPECT_CALL(*receiver_session_, SetCastStreamingReceiver(_)); + EXPECT_CALL(handler_, OnStreamingSessionStarted()); EXPECT_TRUE(PostMessage(serializer.ToJson())); - receiver_session_client_->LaunchStreamingReceiver(nullptr); + EXPECT_TRUE(receiver_session_client_->has_received_av_settings()); cast_streaming::ReceiverSession::AVConstraints defaults; EXPECT_TRUE(defaults.IsSupersetOf(session_constraints_)); @@ -108,10 +129,17 @@ TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageEmpty) { } TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageNoCodecs) { + receiver_session_client_->LaunchStreamingReceiverAsync( + cast_web_view_.cast_web_contents()); + SetMojoHandleAcquired(); + PlatformInfoSerializer serializer; serializer.SetMaxChannels(9); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + EXPECT_CALL(*receiver_session_, SetCastStreamingReceiver(_)); + EXPECT_CALL(handler_, OnStreamingSessionStarted()); EXPECT_TRUE(PostMessage(serializer.ToJson())); - receiver_session_client_->LaunchStreamingReceiver(nullptr); + EXPECT_TRUE(receiver_session_client_->has_received_av_settings()); ASSERT_EQ(session_constraints_.audio_limits.size(), size_t{1}); auto& limit = session_constraints_.audio_limits.back(); @@ -120,6 +148,10 @@ TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageNoCodecs) { } TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageWithCodecs) { + receiver_session_client_->LaunchStreamingReceiverAsync( + cast_web_view_.cast_web_contents()); + SetMojoHandleAcquired(); + PlatformInfoSerializer serializer; std::vector audio_infos; audio_infos.push_back(PlatformInfoSerializer::AudioCodecInfo{ @@ -146,8 +178,11 @@ TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageWithCodecs) { serializer.SetSupportedAudioCodecs(std::move(audio_infos)); serializer.SetSupportedVideoCodecs(std::move(video_infos)); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + EXPECT_CALL(*receiver_session_, SetCastStreamingReceiver(_)); + EXPECT_CALL(handler_, OnStreamingSessionStarted()); EXPECT_TRUE(PostMessage(serializer.ToJson())); - receiver_session_client_->LaunchStreamingReceiver(nullptr); + EXPECT_TRUE(receiver_session_client_->has_received_av_settings()); ASSERT_GE(session_constraints_.audio_codecs.size(), size_t{1}); EXPECT_EQ(session_constraints_.audio_codecs.size(), size_t{1}); @@ -173,13 +208,58 @@ TEST_F(StreamingReceiverSessionClientTest, OnSingleValidMessageWithCodecs) { } TEST_F(StreamingReceiverSessionClientTest, OnCapabilitiesDecrease) { + receiver_session_client_->LaunchStreamingReceiverAsync( + cast_web_view_.cast_web_contents()); + SetMojoHandleAcquired(); + PlatformInfoSerializer serializer; serializer.SetMaxChannels(9); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + EXPECT_CALL(*receiver_session_, SetCastStreamingReceiver(_)); + EXPECT_CALL(handler_, OnStreamingSessionStarted()); EXPECT_TRUE(PostMessage(serializer.ToJson())); - SetStarted(); + EXPECT_TRUE(receiver_session_client_->has_received_av_settings()); + serializer.SetMaxChannels(8); EXPECT_CALL(handler_, OnError()); EXPECT_FALSE(PostMessage(serializer.ToJson())); } +TEST_F(StreamingReceiverSessionClientTest, FailureWhenNoAvSettingsAfterLaunch) { + EXPECT_FALSE(receiver_session_client_->is_streaming_launch_pending()); + EXPECT_FALSE(receiver_session_client_->has_streaming_launched()); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + receiver_session_client_->LaunchStreamingReceiverAsync( + cast_web_view_.cast_web_contents()); + EXPECT_TRUE(receiver_session_client_->is_streaming_launch_pending()); + EXPECT_FALSE(receiver_session_client_->has_streaming_launched()); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + + ResetMessagePort(); + EXPECT_CALL(handler_, OnError()); + task_environment_.FastForwardBy( + StreamingReceiverSessionClient::kMaxAVSettingsWaitTime); +} + +TEST_F(StreamingReceiverSessionClientTest, LaunchWhenAvSettingsReceived) { + EXPECT_CALL(handler_, OnStreamingSessionStarted()); + EXPECT_FALSE(receiver_session_client_->is_streaming_launch_pending()); + EXPECT_FALSE(receiver_session_client_->has_streaming_launched()); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + receiver_session_client_->LaunchStreamingReceiverAsync( + cast_web_view_.cast_web_contents()); + + EXPECT_TRUE(receiver_session_client_->is_streaming_launch_pending()); + EXPECT_FALSE(receiver_session_client_->has_streaming_launched()); + EXPECT_FALSE(receiver_session_client_->has_received_av_settings()); + SetMojoHandleAcquired(); + + EXPECT_CALL(*receiver_session_, SetCastStreamingReceiver(_)); + PlatformInfoSerializer serializer; + EXPECT_TRUE(PostMessage(serializer.ToJson())); + EXPECT_TRUE(receiver_session_client_->is_streaming_launch_pending()); + EXPECT_TRUE(receiver_session_client_->has_streaming_launched()); + EXPECT_TRUE(receiver_session_client_->has_received_av_settings()); +} + } // namespace chromecast