Skip to content

Commit

Permalink
[Chromecast] Receive socket descriptors in AudioSocketService.
Browse files Browse the repository at this point in the history
Bug: b:199219433
Test: On device, cast_media_unittests
Change-Id: Id59a53345ee0923875a7e95325b38ea11517556a
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3215007
Reviewed-by: Ryan Hamilton <rch@chromium.org>
Reviewed-by: Kenneth MacKay <kmackay@chromium.org>
Reviewed-by: Yuchen Liu <yucliu@chromium.org>
Commit-Queue: Junbo Ke <juke@chromium.org>
Cr-Commit-Position: refs/heads/main@{#931274}
  • Loading branch information
Junbo Ke authored and Chromium LUCI CQ committed Oct 13, 2021
1 parent 2872a03 commit 9298abf
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 10 deletions.
4 changes: 4 additions & 0 deletions chromecast/media/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ test("cast_media_unittests") {
"//testing/gtest",
]

if (use_unix_sockets) {
deps += [ "//chromecast/media/audio/net:audio_socket_service_unittests" ]
}

if (is_android) {
deps += [
"//chromecast/media/cma/backend/android:audio_track_java",
Expand Down
16 changes: 16 additions & 0 deletions chromecast/media/audio/net/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,30 @@ cast_source_set("audio_socket_service") {
"audio_socket_service.h",
]

deps = [
"//base",
"//net",
]

if (use_unix_sockets) {
sources += [ "audio_socket_service_uds.cc" ]
deps += [ "//chromecast/net:socket_util" ]
} else {
sources += [ "audio_socket_service_tcp.cc" ]
}
}

cast_source_set("audio_socket_service_unittests") {
testonly = true

sources = [ "audio_socket_service_uds_unittest.cc" ]
deps = [
":audio_socket_service",
"//base",
"//base/test:test_support",
"//chromecast/net:socket_util",
"//net",
"//testing/gmock",
"//testing/gtest",
]
}
1 change: 1 addition & 0 deletions chromecast/media/audio/net/DEPS
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ include_rules = [
"+net/base/ip_endpoint.h",
"+net/log/net_log_source.h",
"+net/socket/server_socket.h",
"+net/socket/socket_descriptor.h",
"+net/socket/tcp_client_socket.h",
"+net/socket/tcp_server_socket.h",
"+net/socket/unix_domain_client_socket_posix.h",
Expand Down
10 changes: 4 additions & 6 deletions chromecast/media/audio/net/audio_socket_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,18 @@ void AudioSocketService::Accept() {
}

for (int i = 0; i < max_accept_loop_; ++i) {
int result = listen_socket_->Accept(
&accepted_socket_, base::BindRepeating(&AudioSocketService::OnAccept,
base::Unretained(this)));
int result = AcceptOne();
// If the result is ERR_IO_PENDING, OnAccept() will eventually be
// called; it will resume the accept loop.
if (result == net::ERR_IO_PENDING || !HandleAcceptResult(result)) {
return;
}
}
task_runner_->PostTask(FROM_HERE, base::BindOnce(&AudioSocketService::Accept,
base::Unretained(this)));
weak_factory_.GetWeakPtr()));
}

void AudioSocketService::OnAccept(int result) {
void AudioSocketService::OnAsyncAcceptComplete(int result) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (HandleAcceptResult(result)) {
Accept();
Expand All @@ -51,7 +49,7 @@ bool AudioSocketService::HandleAcceptResult(int result) {
LOG(ERROR) << "Accept failed: " << net::ErrorToString(result);
return false;
}
delegate_->HandleAcceptedSocket(std::move(accepted_socket_));
OnAcceptSuccess();
return true;
}

Expand Down
27 changes: 25 additions & 2 deletions chromecast/media/audio/net/audio_socket_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
#include <memory>
#include <string>

#include "base/containers/flat_map.h"
#include "base/files/file_descriptor_watcher_posix.h"
#include "base/files/scoped_file.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "net/socket/socket_descriptor.h"

namespace base {
class SequencedTaskRunner;
Expand Down Expand Up @@ -37,10 +42,17 @@ class AudioSocketService {
virtual ~Delegate() = default;
};

// When |use_socket_descriptor| is true, AudioSocketService will receive a
// socket descriptor from the connection created through |listen_socket_|, and
// then use the received socket descriptor to create the real socket for
// transferring data. This is useful when the actual client of the service is
// not able to create a socket themselves but instead needs a brokered socket
// descriptor (created with socketpair()) to connect.
AudioSocketService(const std::string& endpoint,
int port,
int max_accept_loop,
Delegate* delegate);
Delegate* delegate,
bool use_socket_descriptor = false);
AudioSocketService(const AudioSocketService&) = delete;
AudioSocketService& operator=(const AudioSocketService&) = delete;
~AudioSocketService();
Expand All @@ -55,16 +67,27 @@ class AudioSocketService {
int port);

private:
void OnAccept(int result);
void OnAsyncAcceptComplete(int result);
bool HandleAcceptResult(int result);

// The following methods are implemented in audio_socket_service_{uds|tcp}.cc.
int AcceptOne();
void OnAcceptSuccess();
void ReceiveFdFromSocket(int socket_fd);

const int max_accept_loop_;
const bool use_socket_descriptor_;
Delegate* const delegate_; // Not owned.

scoped_refptr<base::SequencedTaskRunner> task_runner_;

std::unique_ptr<net::ServerSocket> listen_socket_;
std::unique_ptr<net::StreamSocket> accepted_socket_;
net::SocketDescriptor accepted_descriptor_ = net::kInvalidSocket;
base::flat_map<int /* fd */,
std::unique_ptr<base::FileDescriptorWatcher::Controller>>
fd_watcher_controllers_;
base::WeakPtrFactory<AudioSocketService> weak_factory_{this};
};

} // namespace media
Expand Down
21 changes: 20 additions & 1 deletion chromecast/media/audio/net/audio_socket_service_tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "chromecast/media/audio/net/audio_socket_service.h"

#include "base/logging.h"
#include "base/notreached.h"
#include "base/task/sequenced_task_runner_forward.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "net/base/address_list.h"
Expand Down Expand Up @@ -36,8 +37,10 @@ std::unique_ptr<net::StreamSocket> AudioSocketService::Connect(
AudioSocketService::AudioSocketService(const std::string& endpoint,
int port,
int max_accept_loop,
Delegate* delegate)
Delegate* delegate,
bool /* use_socket_descriptor */)
: max_accept_loop_(max_accept_loop),
use_socket_descriptor_(false),
delegate_(delegate),
task_runner_(base::SequencedTaskRunnerHandle::Get()) {
DCHECK_GT(max_accept_loop_, 0);
Expand All @@ -56,5 +59,21 @@ AudioSocketService::AudioSocketService(const std::string& endpoint,
}
}

int AudioSocketService::AcceptOne() {
DCHECK(listen_socket_);
return listen_socket_->Accept(
&accepted_socket_,
base::BindOnce(&AudioSocketService::OnAsyncAcceptComplete,
base::Unretained(this)));
}

void AudioSocketService::OnAcceptSuccess() {
delegate_->HandleAcceptedSocket(std::move(accepted_socket_));
}

void AudioSocketService::ReceiveFdFromSocket(int socket_fd) {
NOTIMPLEMENTED();
}

} // namespace media
} // namespace chromecast
80 changes: 79 additions & 1 deletion chromecast/media/audio/net/audio_socket_service_uds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,21 @@

#include "chromecast/media/audio/net/audio_socket_service.h"

#include <unistd.h>

#include <cstring>
#include <utility>
#include <vector>

#include "base/bind.h"
#include "base/check.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/posix/safe_strerror.h"
#include "base/posix/unix_domain_socket.h"
#include "base/task/sequenced_task_runner_forward.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "chromecast/net/socket_util.h"
#include "net/base/net_errors.h"
#include "net/socket/stream_socket.h"
#include "net/socket/unix_domain_client_socket_posix.h"
Expand All @@ -20,6 +29,13 @@ namespace media {

namespace {
constexpr int kListenBacklog = 10;
constexpr char kSocketMsg[] = "socket-handle";

void CloseSocket(int fd) {
int rv = IGNORE_EINTR(close(fd));
DCHECK_EQ(rv, 0) << "Error closing socket: " << base::safe_strerror(errno);
}

} // namespace

// static
Expand All @@ -33,8 +49,10 @@ std::unique_ptr<net::StreamSocket> AudioSocketService::Connect(
AudioSocketService::AudioSocketService(const std::string& endpoint,
int port,
int max_accept_loop,
Delegate* delegate)
Delegate* delegate,
bool use_socket_descriptor)
: max_accept_loop_(max_accept_loop),
use_socket_descriptor_(use_socket_descriptor),
delegate_(delegate),
task_runner_(base::SequencedTaskRunnerHandle::Get()) {
DCHECK_GT(max_accept_loop_, 0);
Expand All @@ -57,5 +75,65 @@ AudioSocketService::AudioSocketService(const std::string& endpoint,
}
}

int AudioSocketService::AcceptOne() {
DCHECK(listen_socket_);

if (use_socket_descriptor_) {
return static_cast<net::UnixDomainServerSocket*>(listen_socket_.get())
->AcceptSocketDescriptor(
&accepted_descriptor_,
base::BindOnce(&AudioSocketService::OnAsyncAcceptComplete,
base::Unretained(this)));
}
return listen_socket_->Accept(
&accepted_socket_,
base::BindOnce(&AudioSocketService::OnAsyncAcceptComplete,
base::Unretained(this)));
}

void AudioSocketService::OnAcceptSuccess() {
if (!use_socket_descriptor_) {
delegate_->HandleAcceptedSocket(std::move(accepted_socket_));
return;
}

if (accepted_descriptor_ == net::kInvalidSocket) {
LOG(ERROR) << "Accepted socket descriptor is invalid.";
return;
}
fd_watcher_controllers_.emplace(
accepted_descriptor_,
base::FileDescriptorWatcher::WatchReadable(
accepted_descriptor_,
base::BindRepeating(&AudioSocketService::ReceiveFdFromSocket,
base::Unretained(this), accepted_descriptor_)));
accepted_descriptor_ = net::kInvalidSocket;
}

void AudioSocketService::ReceiveFdFromSocket(int socket_fd) {
fd_watcher_controllers_.erase(socket_fd);

char buffer[sizeof(kSocketMsg)];
std::vector<base::ScopedFD> fds;
ssize_t res =
base::UnixDomainSocket::RecvMsg(socket_fd, buffer, sizeof(buffer), &fds);
CloseSocket(socket_fd);
if (res != sizeof(kSocketMsg)) {
LOG(ERROR) << "Failed to receive message from the descriptor " << socket_fd;
return;
}
if (memcmp(buffer, kSocketMsg, sizeof(kSocketMsg)) != 0) {
LOG(ERROR) << "Received invalid message.";
return;
}
if (fds.empty()) {
LOG(ERROR) << "No socket descriptors received.";
return;
}
for (auto& fd : fds) {
delegate_->HandleAcceptedSocket(AdoptUnnamedSocketHandle(std::move(fd)));
}
}

} // namespace media
} // namespace chromecast
Loading

0 comments on commit 9298abf

Please sign in to comment.