Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-1915 Ensure sync client always sends error before UNBIND #7262

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
35 changes: 14 additions & 21 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
util::UniqueFunction<ProgressHandler> m_progress_handler;
util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;

std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
bool m_in_debug_hook = false;
// This gets passed to the SessionImpl constructor and owned by the SessionImpl after
// actualization so that it can outlive the SessionWrapper.
std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook_for_sess_impl;

SessionReason m_session_reason;

Expand Down Expand Up @@ -984,19 +985,16 @@ void SessionImpl::on_flx_sync_version_complete(int64_t version)

SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
// Should never be called if session is not active
REALM_ASSERT_EX(m_state == State::Active, m_state);

// Make sure we don't call the debug hook recursively.
if (m_wrapper.m_in_debug_hook) {
if (m_in_debug_hook) {
return SyncClientHookAction::NoAction;
}
m_wrapper.m_in_debug_hook = true;
m_in_debug_hook = true;
auto in_hook_guard = util::make_scope_exit([&]() noexcept {
m_wrapper.m_in_debug_hook = false;
m_in_debug_hook = false;
});

auto action = m_wrapper.m_debug_hook(data);
auto action = m_debug_hook(data);
switch (action) {
case realm::SyncClientHookAction::SuspendWithRetryableError: {
SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
Expand All @@ -1019,13 +1017,9 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
int64_t query_version, DownloadBatchState batch_state,
size_t num_changesets)
{
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
return SyncClientHookAction::NoAction;
}
if (REALM_UNLIKELY(m_state != State::Active)) {
if (REALM_LIKELY(!m_debug_hook)) {
return SyncClientHookAction::NoAction;
}

SyncClientHookData data;
data.event = event;
data.batch_state = batch_state;
Expand All @@ -1038,13 +1032,11 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con

SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
return SyncClientHookAction::NoAction;
}
if (REALM_UNLIKELY(m_state != State::Active)) {
if (REALM_LIKELY(!m_debug_hook)) {
return SyncClientHookAction::NoAction;
}


SyncClientHookData data;
data.event = event;
data.batch_state = DownloadBatchState::SteadyState;
Expand Down Expand Up @@ -1138,7 +1130,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_signed_access_token{std::move(config.signed_user_token)}
, m_client_reset_config{std::move(config.client_reset_config)}
, m_proxy_config{config.proxy_config} // Throws
, m_debug_hook(std::move(config.on_sync_client_event_hook))
, m_debug_hook_for_sess_impl(std::move(config.on_sync_client_event_hook))
, m_session_reason(config.session_reason)
, m_flx_subscription_store(std::move(flx_sub_store))
, m_migration_store(std::move(migration_store))
Expand Down Expand Up @@ -1519,8 +1511,9 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)
was_created); // Throws
try {
// FIXME: This only makes sense when each session uses a separate connection.
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(
*this, conn, m_client.get_next_session_ident(), std::move(m_debug_hook_for_sess_impl)); // Throws
if (sync_mode == SyncServerMode::FLX) {
m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
}
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ enum class SyncClientHookEvent {
ErrorMessageReceived,
SessionSuspended,
BindMessageSent,
IdentMessageSent,
ClientErrorMessageSent,
BootstrapBatchAboutToProcess,
};

Expand Down
104 changes: 57 additions & 47 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,6 @@ void Connection::initiate_session_deactivation(Session* sess)
if (sess->m_state == Session::Deactivated) {
finish_session_deactivation(sess);
}
if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
if (m_activated && m_state == ConnectionState::disconnected)
m_on_idle->trigger();
}
}


Expand Down Expand Up @@ -403,6 +399,11 @@ void ClientImpl::Connection::finish_session_deactivation(Session* sess)
auto ident = sess->m_ident;
m_sessions.erase(ident);
m_session_history.erase(ident);

if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
if (m_activated && m_state == ConnectionState::disconnected)
m_on_idle->trigger();
}
}

void Connection::force_close()
Expand All @@ -424,20 +425,13 @@ void Connection::force_close()
m_disconnect_delay_in_progress = false;
}

// We must copy any session pointers we want to close to a vector because force_closing
// the session may remove it from m_sessions and invalidate the iterator uses to loop
// through the map. By copying to a separate vector we ensure our iterators remain valid.
std::vector<Session*> to_close;
for (auto& session_pair : m_sessions) {
if (session_pair.second->m_state == Session::State::Active) {
to_close.push_back(session_pair.second.get());
for (auto it = m_sessions.begin(); it != m_sessions.end();) {
auto cur_sess_it = it++;
if (cur_sess_it->second->m_state == Session::Active) {
cur_sess_it->second->force_close();
}
}

for (auto& sess : to_close) {
sess->force_close();
}

logger.debug("Force closed idle connection");
}

Expand Down Expand Up @@ -832,9 +826,9 @@ void Connection::handle_connection_established()
fast_reconnect = true;
}

for (auto& p : m_sessions) {
Session& sess = *p.second;
sess.connection_established(fast_reconnect); // Throws
for (auto it = m_sessions.begin(); it != m_sessions.end();) {
auto cur_sess_it = it++;
cur_sess_it->second->connection_established(fast_reconnect);
}

report_connection_state_change(ConnectionState::connected); // Throws
Expand Down Expand Up @@ -1174,8 +1168,11 @@ void Connection::disconnect(const SessionErrorInfo& info)
auto j = i++;
Session& sess = *j->second;
sess.connection_lost(); // Throws
if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated) {
m_sessions.erase(j);
REALM_ASSERT(m_num_active_sessions);
--m_num_active_sessions;
}
}
}

Expand Down Expand Up @@ -1592,7 +1589,7 @@ void Session::on_integration_failure(const IntegrationException& error)
// Since the deactivation process has not been initiated, the UNBIND
// message cannot have been sent unless an ERROR message was received.
REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
if (m_bind_message_sent && !m_error_message_received && !m_suspended) {
ensure_enlisted_to_send(); // Throws
}
}
Expand Down Expand Up @@ -1671,7 +1668,7 @@ void Session::activate()
m_download_progress = m_progress.download;
REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);

logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
logger.debug("progress_download_client_version = %1",
m_progress.download.last_integrated_client_version); // Throws
Expand Down Expand Up @@ -1710,9 +1707,6 @@ void Session::initiate_deactivation()

m_state = Deactivating;

if (!m_suspended)
m_conn.one_less_active_unsuspended_session(); // Throws

if (m_enlisted_to_send) {
REALM_ASSERT(!unbind_process_complete());
return;
Expand All @@ -1721,14 +1715,17 @@ void Session::initiate_deactivation()
// Deactivate immediately if the BIND message has not yet been sent and the
// session is not enlisted to send, or if the unbinding process has already
// completed.
if (!m_bind_message_sent || unbind_process_complete()) {
if ((!m_bind_message_sent || unbind_process_complete()) && !pending_client_error()) {
complete_deactivation(); // Throws
// Life cycle state is now Deactivated
return;
}

// Ready to send the UNBIND message, if it has not already been sent
if (!m_unbind_message_sent) {
// Ready to send the UNBIND message, if it has not already been sent, unless we've
// never sent the BIND message but still have an error message to send. In that case
// when the connection becomes connected we'll send the error message and immediately
// complete de-activation.
if (!m_unbind_message_sent && m_bind_message_sent) {
enlist_to_send(); // Throws
return;
}
Expand All @@ -1739,8 +1736,9 @@ void Session::complete_deactivation()
{
REALM_ASSERT_EX(m_state == Deactivating, m_state);
m_state = Deactivated;

logger.debug("Deactivation completed"); // Throws
if (!m_suspended)
m_conn.one_less_active_unsuspended_session(); // Throws
logger.debug("Deactivation completed"); // Throws
}


Expand All @@ -1754,11 +1752,17 @@ void Session::send_message()
REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
REALM_ASSERT(m_enlisted_to_send);
m_enlisted_to_send = false;

if (m_error_to_send) {
send_json_error_message(); // Throws
return;
}

if (m_state == Deactivating || m_error_message_received || m_suspended) {
// Deactivation has been initiated. If the UNBIND message has not been
// sent yet, there is no point in sending it. Instead, we can let the
// deactivation process complete.
if (!m_bind_message_sent) {
if (!m_bind_message_sent && !pending_client_error()) {
return complete_deactivation(); // Throws
// Life cycle state is now Deactivated
}
Expand All @@ -1777,6 +1781,12 @@ void Session::send_message()
if (!m_bind_message_sent)
return send_bind_message(); // Throws


// Stop sending upload, mark and query messages when the client detects an error.
if (m_error_message_sent) {
return;
}

if (!m_ident_message_sent) {
if (have_client_file_ident())
send_ident_message(); // Throws
Expand All @@ -1791,13 +1801,6 @@ void Session::send_message()
return send_test_command_message();
}

if (m_error_to_send)
return send_json_error_message(); // Throws

// Stop sending upload, mark and query messages when the client detects an error.
if (m_client_error) {
return;
}

if (m_target_download_mark > m_last_download_mark_sent)
return send_mark_message(); // Throws
Expand Down Expand Up @@ -1943,7 +1946,8 @@ void Session::send_ident_message()
m_conn.initiate_write_message(out, this); // Throws

m_ident_message_sent = true;

call_debug_hook(SyncClientHookEvent::IdentMessageSent, m_progress, m_last_sent_flx_query_version,
DownloadBatchState::SteadyState, 0);
// Other messages may be waiting to be sent
enlist_to_send(); // Throws
}
Expand Down Expand Up @@ -2153,29 +2157,35 @@ void Session::send_unbind_message()

void Session::send_json_error_message()
{
REALM_ASSERT_EX(m_state == Active, m_state);
REALM_ASSERT(m_ident_message_sent);
REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
REALM_ASSERT(!m_unbind_message_sent);
REALM_ASSERT(m_error_to_send);
REALM_ASSERT(m_client_error);

auto client_error = std::move(m_client_error);
ClientProtocol& protocol = m_conn.get_client_protocol();
OutputBuffer& out = m_conn.get_output_buffer();
session_ident_type session_ident = get_ident();
auto protocol_error = m_client_error->error_for_server;
auto protocol_error = static_cast<int>(client_error->error_for_server);

auto message = util::format("%1", m_client_error->to_status());
logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
auto message = util::format("%1", client_error->to_status());
logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, protocol_error,
session_ident); // Throws

nlohmann::json error_body_json;
error_body_json["message"] = std::move(message);
protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
protocol.make_json_error_message(out, session_ident, protocol_error,
error_body_json.dump()); // Throws
m_conn.initiate_write_message(out, this); // Throws

m_error_to_send = false;
enlist_to_send(); // Throws
m_error_message_sent = true;

call_debug_hook(SyncClientHookEvent::ClientErrorMessageSent,
ProtocolErrorInfo(protocol_error, message, IsFatal{false}));

if (m_state == Active && m_bind_message_sent) {
enlist_to_send(); // Throws
}
}


Expand Down Expand Up @@ -2346,7 +2356,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint

// Ignore download messages when the client detects an error. This is to prevent transforming the same bad
// changeset over and over again.
if (m_client_error) {
if (m_error_message_sent) {
logger.debug("Ignoring download message because the client detected an integration error");
return Status::OK();
}
Expand Down
Loading
Loading