Skip to content

Commit

Permalink
[remoting][FTL] Add SendMessage functionality
Browse files Browse the repository at this point in the history
This CL adds SendMessage functionality to FtlMessagingClient.

Bug: 927962
Change-Id: Iea54671d0c23a9c09a49d41754ffe725d07d6f00
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1526674
Commit-Queue: Yuwei Huang <yuweih@chromium.org>
Reviewed-by: Joe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#643551}
  • Loading branch information
ywh233 authored and Commit Bot committed Mar 22, 2019
1 parent 91ddfe1 commit 4de6009
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 1 deletion.
16 changes: 16 additions & 0 deletions remoting/signaling/ftl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ message InboxMessage {
CHROMOTING_MESSAGE = 29;
}

enum MessageClass { USER = 0; }

string message_id = 1;
MessageType message_type = 2;
MessageClass message_class = 5;
Id sender_id = 8;
Id receiver_id = 9;
bytes message = 12;
Expand Down Expand Up @@ -216,3 +219,16 @@ message ReceiveMessagesResponse {
RefreshResult refresh_result = 6;
}
}

message InboxSendRequest {
Id dest_id = 1;
repeated bytes dest_registration_ids = 9;
InboxMessage message = 2;
RequestHeader header = 3;
int64 time_to_live = 5;
}

message InboxSendResponse {
ResponseHeader header = 1;
int64 timestamp = 2;
}
10 changes: 10 additions & 0 deletions remoting/signaling/ftl_grpc_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ std::string FtlGrpcContext::GetChromotingAppIdentifier() {
return kChromotingAppIdentifier;
}

// static
ftl::Id FtlGrpcContext::BuildIdFromString(const std::string& ftl_id) {
ftl::Id id;
id.set_id(ftl_id);
id.set_app(GetChromotingAppIdentifier());
// TODO(yuweih): Migrate to IdType.Type.CHROMOTING_ID.
id.set_type(ftl::IdType_Type_EMAIL);
return id;
}

FtlGrpcContext::FtlGrpcContext(OAuthTokenGetter* token_getter)
: weak_factory_(this) {
DCHECK(token_getter);
Expand Down
1 change: 1 addition & 0 deletions remoting/signaling/ftl_grpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class FtlGrpcContext final {
base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>;

static std::string GetChromotingAppIdentifier();
static ftl::Id BuildIdFromString(const std::string& ftl_id);

explicit FtlGrpcContext(OAuthTokenGetter* token_getter);
~FtlGrpcContext();
Expand Down
45 changes: 45 additions & 0 deletions remoting/signaling/ftl_messaging_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/guid.h"
#include "base/logging.h"
#include "base/time/time.h"
#include "remoting/signaling/ftl_grpc_context.h"
#include "remoting/signaling/ftl_message_reception_channel.h"

Expand All @@ -24,6 +26,8 @@ void AddMessageToAckRequest(const ftl::InboxMessage& message,
new ftl::Id(message.receiver_id()));
}

constexpr base::TimeDelta kInboxMessageTtl = base::TimeDelta::FromMinutes(1);

} // namespace

FtlMessagingClient::FtlMessagingClient(FtlGrpcContext* context)
Expand Down Expand Up @@ -54,6 +58,40 @@ void FtlMessagingClient::PullMessages(DoneCallback on_done) {
weak_factory_.GetWeakPtr(), std::move(on_done)));
}

void FtlMessagingClient::SendMessage(
const std::string& destination,
const std::string& destination_registration_id,
const std::string& message_text,
DoneCallback on_done) {
ftl::InboxSendRequest request;
request.set_time_to_live(kInboxMessageTtl.InMicroseconds());
// TODO(yuweih): See if we need to set requester_id
(*request.mutable_dest_id()) = FtlGrpcContext::BuildIdFromString(destination);

ftl::ChromotingMessage crd_message;
crd_message.set_message(message_text);
std::string serialized_message;
bool succeeded = crd_message.SerializeToString(&serialized_message);
DCHECK(succeeded);

request.mutable_message()->set_message(serialized_message);
request.mutable_message()->set_message_id(base::GenerateGUID());
request.mutable_message()->set_message_type(
ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE);
request.mutable_message()->set_message_class(
ftl::InboxMessage_MessageClass_USER);
if (!destination_registration_id.empty()) {
request.add_dest_registration_ids(destination_registration_id);
}

context_->ExecuteRpc(
base::BindOnce(&Messaging::Stub::AsyncSendMessage,
base::Unretained(messaging_stub_.get())),
request,
base::BindOnce(&FtlMessagingClient::OnSendMessageResponse,
weak_factory_.GetWeakPtr(), std::move(on_done)));
}

void FtlMessagingClient::StartReceivingMessages(DoneCallback on_done) {
reception_channel_->StartReceivingMessages(std::move(on_done));
}
Expand Down Expand Up @@ -101,6 +139,13 @@ void FtlMessagingClient::OnPullMessagesResponse(
AckMessages(ack_request, std::move(on_done));
}

void FtlMessagingClient::OnSendMessageResponse(
DoneCallback on_done,
const grpc::Status& status,
const ftl::InboxSendResponse& response) {
std::move(on_done).Run(status);
}

void FtlMessagingClient::AckMessages(const ftl::AckMessagesRequest& request,
DoneCallback on_done) {
context_->ExecuteRpc(
Expand Down
8 changes: 8 additions & 0 deletions remoting/signaling/ftl_messaging_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class FtlMessagingClient final {
// |on_done| is called once the messages have been received and acked on the
// server's inbox.
void PullMessages(DoneCallback on_done);
void SendMessage(const std::string& destination,
const std::string& destination_registration_id,
const std::string& message_text,
DoneCallback on_done);

// Opens a stream to continuously receive new messages from the server and
// calls the registered MessageCallback once a new message is received.
Expand All @@ -66,6 +70,10 @@ class FtlMessagingClient final {
const grpc::Status& status,
const ftl::PullMessagesResponse& response);

void OnSendMessageResponse(DoneCallback on_done,
const grpc::Status& status,
const ftl::InboxSendResponse& response);

void AckMessages(const ftl::AckMessagesRequest& request,
DoneCallback on_done);

Expand Down
85 changes: 85 additions & 0 deletions remoting/signaling/ftl_messaging_client_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ using AckMessagesResponder =

constexpr char kFakeSenderId[] = "fake_sender@gmail.com";
constexpr char kFakeReceiverId[] = "fake_receiver@gmail.com";
constexpr char kFakeRegistrationId[] = "fake_reg_id";
constexpr char kMessage1Id[] = "msg_1";
constexpr char kMessage2Id[] = "msg_1";
constexpr char kMessage1Text[] = "Message 1";
Expand All @@ -63,6 +64,14 @@ ftl::InboxMessage CreateMessage(const std::string& message_id,
return message;
}

std::string GetChromotingMessageText(const ftl::InboxMessage& message) {
EXPECT_EQ(ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE,
message.message_type());
ftl::ChromotingMessage chromoting_message;
chromoting_message.ParseFromString(message.message());
return chromoting_message.message();
}

class MockMessageReceptionChannel : public MessageReceptionChannel {
public:
MockMessageReceptionChannel() = default;
Expand Down Expand Up @@ -104,6 +113,9 @@ class FtlMessagingClientTest : public testing::Test {
void ServerWaitAndRespondToPullMessagesRequest(
const ftl::PullMessagesResponse& response,
const grpc::Status& status);
void ServerWaitAndRespondToInboxSendRequest(
base::OnceCallback<grpc::Status(const ftl::InboxSendRequest&)> handler,
base::OnceClosure on_done);
void ServerWaitAndRespondToAckMessagesRequest(
base::OnceCallback<grpc::Status(const ftl::AckMessagesRequest&)> handler,
base::OnceClosure on_done);
Expand Down Expand Up @@ -153,6 +165,25 @@ void FtlMessagingClientTest::ServerWaitAndRespondToPullMessagesRequest(
response, status, server_.get()));
}

void FtlMessagingClientTest::ServerWaitAndRespondToInboxSendRequest(
base::OnceCallback<grpc::Status(const ftl::InboxSendRequest&)> handler,
base::OnceClosure on_done) {
server_task_runner_->PostTaskAndReply(
FROM_HERE,
base::BindOnce(
[](base::OnceCallback<grpc::Status(const ftl::InboxSendRequest&)>
handler,
test::GrpcAsyncTestServer* server) {
ftl::InboxSendRequest request;
auto responder = server->HandleRequest(
&Messaging::AsyncService::RequestSendMessage, &request);
grpc::Status status = std::move(handler).Run(request);
responder->Respond(ftl::InboxSendResponse(), status);
},
std::move(handler), server_.get()),
std::move(on_done));
}

void FtlMessagingClientTest::ServerWaitAndRespondToAckMessagesRequest(
base::OnceCallback<grpc::Status(const ftl::AckMessagesRequest&)> handler,
base::OnceClosure on_done) {
Expand Down Expand Up @@ -265,6 +296,60 @@ TEST_F(FtlMessagingClientTest, TestPullMessages_ReturnsAndAcksTwoMessages) {
run_loop.Run();
}

TEST_F(FtlMessagingClientTest, TestSendMessage_Unauthenticated) {
base::RunLoop run_loop;
messaging_client_->SendMessage(
kFakeReceiverId, kFakeRegistrationId, kMessage1Text,
test::CheckStatusThenQuitRunLoopCallback(
FROM_HERE, grpc::StatusCode::UNAUTHENTICATED, &run_loop));
ServerWaitAndRespondToInboxSendRequest(
base::BindOnce([](const ftl::InboxSendRequest&) {
return grpc::Status(grpc::StatusCode::UNAUTHENTICATED,
"Unauthenticated");
}),
base::DoNothing());
run_loop.Run();
}

TEST_F(FtlMessagingClientTest, TestSendMessage_SendOneMessageWithoutRegId) {
base::RunLoop run_loop;
messaging_client_->SendMessage(
kFakeReceiverId, "", kMessage1Text,
test::CheckStatusThenQuitRunLoopCallback(FROM_HERE, grpc::StatusCode::OK,
&run_loop));
ServerWaitAndRespondToInboxSendRequest(
base::BindOnce([](const ftl::InboxSendRequest& request) {
EXPECT_EQ(0, request.dest_registration_ids_size());
EXPECT_LT(0, request.time_to_live());
EXPECT_EQ(kFakeReceiverId, request.dest_id().id());
EXPECT_FALSE(request.message().message_id().empty());
EXPECT_EQ(kMessage1Text, GetChromotingMessageText(request.message()));
return grpc::Status::OK;
}),
base::DoNothing());
run_loop.Run();
}

TEST_F(FtlMessagingClientTest, TestSendMessage_SendOneMessageWithRegId) {
base::RunLoop run_loop;
messaging_client_->SendMessage(
kFakeReceiverId, kFakeRegistrationId, kMessage1Text,
test::CheckStatusThenQuitRunLoopCallback(FROM_HERE, grpc::StatusCode::OK,
&run_loop));
ServerWaitAndRespondToInboxSendRequest(
base::BindOnce([](const ftl::InboxSendRequest& request) {
EXPECT_EQ(1, request.dest_registration_ids_size());
EXPECT_EQ(kFakeRegistrationId, request.dest_registration_ids(0));
EXPECT_LT(0, request.time_to_live());
EXPECT_EQ(kFakeReceiverId, request.dest_id().id());
EXPECT_FALSE(request.message().message_id().empty());
EXPECT_EQ(kMessage1Text, GetChromotingMessageText(request.message()));
return grpc::Status::OK;
}),
base::DoNothing());
run_loop.Run();
}

TEST_F(FtlMessagingClientTest,
TestStartReceivingMessages_DoneCallbackForwarded) {
base::RunLoop run_loop;
Expand Down
2 changes: 2 additions & 0 deletions remoting/signaling/ftl_services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ service Messaging {
returns (remoting.ftl.AckMessagesResponse) {}
rpc ReceiveMessages(remoting.ftl.ReceiveMessagesRequest)
returns (stream remoting.ftl.ReceiveMessagesResponse) {}
rpc SendMessage(remoting.ftl.InboxSendRequest)
returns (remoting.ftl.InboxSendResponse) {}
}
64 changes: 63 additions & 1 deletion remoting/test/ftl_signaling_playground.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ void FtlSignalingPlayground::StartLoop() {
" 2. GetIceServer\n"
" 3. PullMessages\n"
" 4. ReceiveMessages\n"
" 5. Quit\n\n"
" 5. SendMessage\n"
" 6. Quit\n\n"
"Your choice [number]: ");
int choice = 0;
base::StringToInt(ReadString(), &choice);
Expand All @@ -169,6 +170,9 @@ void FtlSignalingPlayground::StartLoop() {
StartReceivingMessages(run_loop.QuitWhenIdleClosure());
break;
case 5:
SendMessage(run_loop.QuitClosure());
break;
case 6:
return;
default:
fprintf(stderr, "Unknown option\n");
Expand Down Expand Up @@ -358,6 +362,64 @@ void FtlSignalingPlayground::OnPullMessagesResponse(
std::move(on_done).Run();
}

void FtlSignalingPlayground::SendMessage(base::OnceClosure on_done) {
DCHECK(messaging_client_);
VLOG(0) << "Running SendMessage...";

printf("Receiver ID: ");
std::string receiver_id = ReadString();

printf("Receiver registration ID (base64, optional): ");
std::string registration_id_base64 = ReadString();

std::string registration_id;
bool success = base::Base64Decode(registration_id_base64, &registration_id);
if (!success) {
fprintf(stderr, "Your input can't be base64 decoded.\n");
std::move(on_done).Run();
return;
}
DoSendMessage(receiver_id, registration_id, std::move(on_done), true);
}

void FtlSignalingPlayground::DoSendMessage(const std::string& receiver_id,
const std::string& registration_id,
base::OnceClosure on_done,
bool should_keep_running) {
if (!should_keep_running) {
std::move(on_done).Run();
return;
}

printf("Message (enter nothing to quit): ");
std::string message = ReadString();

if (message.empty()) {
std::move(on_done).Run();
return;
}

auto on_continue = base::BindOnce(&FtlSignalingPlayground::DoSendMessage,
weak_factory_.GetWeakPtr(), receiver_id,
registration_id, std::move(on_done));

messaging_client_->SendMessage(
receiver_id, registration_id, message,
base::BindOnce(&FtlSignalingPlayground::OnSendMessageResponse,
weak_factory_.GetWeakPtr(), std::move(on_continue)));
}

void FtlSignalingPlayground::OnSendMessageResponse(
base::OnceCallback<void(bool)> on_continue,
const grpc::Status& status) {
if (!status.ok()) {
PrintGrpcStatusError(status);
} else {
printf("Message successfully sent.\n");
}
std::move(on_continue).Run(status.ok());
}

void FtlSignalingPlayground::StartReceivingMessages(base::OnceClosure on_done) {
VLOG(0) << "Running StartReceivingMessages...";
messaging_client_->StartReceivingMessages(
Expand Down
7 changes: 7 additions & 0 deletions remoting/test/ftl_signaling_playground.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ class FtlSignalingPlayground {
void PullMessages(base::OnceClosure on_done);
void OnPullMessagesResponse(base::OnceClosure on_done,
const grpc::Status& status);
void SendMessage(base::OnceClosure on_done);
void DoSendMessage(const std::string& receiver_id,
const std::string& registration_id,
base::OnceClosure on_done,
bool should_keep_running);
void OnSendMessageResponse(base::OnceCallback<void(bool)> on_continue,
const grpc::Status& status);
void StartReceivingMessages(base::OnceClosure on_done);
void StopReceivingMessages(base::OnceClosure on_done);
void OnMessageReceived(const std::string& sender_id,
Expand Down

0 comments on commit 4de6009

Please sign in to comment.