diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index b0fd90b7002..45fb761e3aa 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -162,8 +162,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener util::UniqueFunction m_progress_handler; util::UniqueFunction m_connection_state_change_listener; - std::function m_debug_hook; - bool m_in_debug_hook = false; + // This gets passed to the SessionImpl constructor and owned by the SessionImpl after + // actualization so that it can outlive the SessionWrapper. + std::function m_debug_hook_for_sess_impl; SessionReason m_session_reason; @@ -984,19 +985,16 @@ void SessionImpl::on_flx_sync_version_complete(int64_t version) SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data) { - // Should never be called if session is not active - REALM_ASSERT_EX(m_state == State::Active, m_state); - // Make sure we don't call the debug hook recursively. - if (m_wrapper.m_in_debug_hook) { + if (m_in_debug_hook) { return SyncClientHookAction::NoAction; } - m_wrapper.m_in_debug_hook = true; + m_in_debug_hook = true; auto in_hook_guard = util::make_scope_exit([&]() noexcept { - m_wrapper.m_in_debug_hook = false; + m_in_debug_hook = false; }); - auto action = m_wrapper.m_debug_hook(data); + auto action = m_debug_hook(data); switch (action) { case realm::SyncClientHookAction::SuspendWithRetryableError: { SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false}); @@ -1019,13 +1017,9 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con int64_t query_version, DownloadBatchState batch_state, size_t num_changesets) { - if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { - return SyncClientHookAction::NoAction; - } - if (REALM_UNLIKELY(m_state != State::Active)) { + if (REALM_LIKELY(!m_debug_hook)) { return SyncClientHookAction::NoAction; } - SyncClientHookData data; data.event = event; data.batch_state = batch_state; @@ -1038,13 +1032,11 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info) { - if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { - return SyncClientHookAction::NoAction; - } - if (REALM_UNLIKELY(m_state != State::Active)) { + if (REALM_LIKELY(!m_debug_hook)) { return SyncClientHookAction::NoAction; } + SyncClientHookData data; data.event = event; data.batch_state = DownloadBatchState::SteadyState; @@ -1138,7 +1130,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr sess = std::make_unique(*this, conn); // Throws + conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws + std::unique_ptr sess = std::make_unique( + *this, conn, m_client.get_next_session_ident(), std::move(m_debug_hook_for_sess_impl)); // Throws if (sync_mode == SyncServerMode::FLX) { m_flx_pending_bootstrap_store = std::make_unique(m_db, sess->logger); } diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp index 1ac0c374468..75371cc90ff 100644 --- a/src/realm/sync/config.hpp +++ b/src/realm/sync/config.hpp @@ -131,6 +131,8 @@ enum class SyncClientHookEvent { ErrorMessageReceived, SessionSuspended, BindMessageSent, + IdentMessageSent, + ClientErrorMessageSent, BootstrapBatchAboutToProcess, }; diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 5f7c03c8b33..2879811ec99 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -350,10 +350,6 @@ void Connection::initiate_session_deactivation(Session* sess) if (sess->m_state == Session::Deactivated) { finish_session_deactivation(sess); } - if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { - if (m_activated && m_state == ConnectionState::disconnected) - m_on_idle->trigger(); - } } @@ -403,6 +399,11 @@ void ClientImpl::Connection::finish_session_deactivation(Session* sess) auto ident = sess->m_ident; m_sessions.erase(ident); m_session_history.erase(ident); + + if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { + if (m_activated && m_state == ConnectionState::disconnected) + m_on_idle->trigger(); + } } void Connection::force_close() @@ -424,20 +425,13 @@ void Connection::force_close() m_disconnect_delay_in_progress = false; } - // We must copy any session pointers we want to close to a vector because force_closing - // the session may remove it from m_sessions and invalidate the iterator uses to loop - // through the map. By copying to a separate vector we ensure our iterators remain valid. - std::vector to_close; - for (auto& session_pair : m_sessions) { - if (session_pair.second->m_state == Session::State::Active) { - to_close.push_back(session_pair.second.get()); + for (auto it = m_sessions.begin(); it != m_sessions.end();) { + auto cur_sess_it = it++; + if (cur_sess_it->second->m_state == Session::Active) { + cur_sess_it->second->force_close(); } } - for (auto& sess : to_close) { - sess->force_close(); - } - logger.debug("Force closed idle connection"); } @@ -832,9 +826,9 @@ void Connection::handle_connection_established() fast_reconnect = true; } - for (auto& p : m_sessions) { - Session& sess = *p.second; - sess.connection_established(fast_reconnect); // Throws + for (auto it = m_sessions.begin(); it != m_sessions.end();) { + auto cur_sess_it = it++; + cur_sess_it->second->connection_established(fast_reconnect); } report_connection_state_change(ConnectionState::connected); // Throws @@ -1174,8 +1168,11 @@ void Connection::disconnect(const SessionErrorInfo& info) auto j = i++; Session& sess = *j->second; sess.connection_lost(); // Throws - if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated) + if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated) { m_sessions.erase(j); + REALM_ASSERT(m_num_active_sessions); + --m_num_active_sessions; + } } } @@ -1592,7 +1589,7 @@ void Session::on_integration_failure(const IntegrationException& error) // Since the deactivation process has not been initiated, the UNBIND // message cannot have been sent unless an ERROR message was received. REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent); - if (m_ident_message_sent && !m_error_message_received && !m_suspended) { + if (m_bind_message_sent && !m_error_message_received && !m_suspended) { ensure_enlisted_to_send(); // Throws } } @@ -1671,7 +1668,7 @@ void Session::activate() m_download_progress = m_progress.download; REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version); - logger.debug("last_version_available = %1", m_last_version_available); // Throws + logger.debug("last_version_available = %1", m_last_version_available); // Throws logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws logger.debug("progress_download_client_version = %1", m_progress.download.last_integrated_client_version); // Throws @@ -1710,9 +1707,6 @@ void Session::initiate_deactivation() m_state = Deactivating; - if (!m_suspended) - m_conn.one_less_active_unsuspended_session(); // Throws - if (m_enlisted_to_send) { REALM_ASSERT(!unbind_process_complete()); return; @@ -1721,14 +1715,17 @@ void Session::initiate_deactivation() // Deactivate immediately if the BIND message has not yet been sent and the // session is not enlisted to send, or if the unbinding process has already // completed. - if (!m_bind_message_sent || unbind_process_complete()) { + if ((!m_bind_message_sent || unbind_process_complete()) && !pending_client_error()) { complete_deactivation(); // Throws // Life cycle state is now Deactivated return; } - // Ready to send the UNBIND message, if it has not already been sent - if (!m_unbind_message_sent) { + // Ready to send the UNBIND message, if it has not already been sent, unless we've + // never sent the BIND message but still have an error message to send. In that case + // when the connection becomes connected we'll send the error message and immediately + // complete de-activation. + if (!m_unbind_message_sent && m_bind_message_sent) { enlist_to_send(); // Throws return; } @@ -1739,8 +1736,9 @@ void Session::complete_deactivation() { REALM_ASSERT_EX(m_state == Deactivating, m_state); m_state = Deactivated; - - logger.debug("Deactivation completed"); // Throws + if (!m_suspended) + m_conn.one_less_active_unsuspended_session(); // Throws + logger.debug("Deactivation completed"); // Throws } @@ -1754,11 +1752,17 @@ void Session::send_message() REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); REALM_ASSERT(m_enlisted_to_send); m_enlisted_to_send = false; + + if (m_error_to_send) { + send_json_error_message(); // Throws + return; + } + if (m_state == Deactivating || m_error_message_received || m_suspended) { // Deactivation has been initiated. If the UNBIND message has not been // sent yet, there is no point in sending it. Instead, we can let the // deactivation process complete. - if (!m_bind_message_sent) { + if (!m_bind_message_sent && !pending_client_error()) { return complete_deactivation(); // Throws // Life cycle state is now Deactivated } @@ -1777,6 +1781,12 @@ void Session::send_message() if (!m_bind_message_sent) return send_bind_message(); // Throws + + // Stop sending upload, mark and query messages when the client detects an error. + if (m_error_message_sent) { + return; + } + if (!m_ident_message_sent) { if (have_client_file_ident()) send_ident_message(); // Throws @@ -1791,13 +1801,6 @@ void Session::send_message() return send_test_command_message(); } - if (m_error_to_send) - return send_json_error_message(); // Throws - - // Stop sending upload, mark and query messages when the client detects an error. - if (m_client_error) { - return; - } if (m_target_download_mark > m_last_download_mark_sent) return send_mark_message(); // Throws @@ -1943,7 +1946,8 @@ void Session::send_ident_message() m_conn.initiate_write_message(out, this); // Throws m_ident_message_sent = true; - + call_debug_hook(SyncClientHookEvent::IdentMessageSent, m_progress, m_last_sent_flx_query_version, + DownloadBatchState::SteadyState, 0); // Other messages may be waiting to be sent enlist_to_send(); // Throws } @@ -2153,29 +2157,35 @@ void Session::send_unbind_message() void Session::send_json_error_message() { - REALM_ASSERT_EX(m_state == Active, m_state); - REALM_ASSERT(m_ident_message_sent); + REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); REALM_ASSERT(!m_unbind_message_sent); REALM_ASSERT(m_error_to_send); REALM_ASSERT(m_client_error); + auto client_error = std::move(m_client_error); ClientProtocol& protocol = m_conn.get_client_protocol(); OutputBuffer& out = m_conn.get_output_buffer(); session_ident_type session_ident = get_ident(); - auto protocol_error = m_client_error->error_for_server; + auto protocol_error = static_cast(client_error->error_for_server); - auto message = util::format("%1", m_client_error->to_status()); - logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast(protocol_error), + auto message = util::format("%1", client_error->to_status()); + logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, protocol_error, session_ident); // Throws nlohmann::json error_body_json; error_body_json["message"] = std::move(message); - protocol.make_json_error_message(out, session_ident, static_cast(protocol_error), + protocol.make_json_error_message(out, session_ident, protocol_error, error_body_json.dump()); // Throws m_conn.initiate_write_message(out, this); // Throws - m_error_to_send = false; - enlist_to_send(); // Throws + m_error_message_sent = true; + + call_debug_hook(SyncClientHookEvent::ClientErrorMessageSent, + ProtocolErrorInfo(protocol_error, message, IsFatal{false})); + + if (m_state == Active && m_bind_message_sent) { + enlist_to_send(); // Throws + } } @@ -2346,7 +2356,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint // Ignore download messages when the client detects an error. This is to prevent transforming the same bad // changeset over and over again. - if (m_client_error) { + if (m_error_message_sent) { logger.debug("Ignoring download message because the client detected an integration error"); return Status::OK(); } diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index 6f67fc2b1aa..ad41b65bb66 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -866,7 +866,8 @@ class ClientImpl::Session { /// The specified transaction reporter (via the config object) is guaranteed /// to not be called before activation, and also not after initiation of /// deactivation. - Session(SessionWrapper&, ClientImpl::Connection&); + Session(SessionWrapper&, ClientImpl::Connection&, session_ident_type, + std::function); ~Session(); void force_close(); @@ -1040,6 +1041,8 @@ class ClientImpl::Session { bool m_error_message_received; // Session specific ERROR message received bool m_unbound_message_received; // UNBOUND message received bool m_error_to_send; + bool m_error_message_sent; + bool m_error_message_send_complete; // True when there is a new FLX sync query we need to send to the server. util::Optional m_pending_flx_sub_set; @@ -1136,13 +1139,14 @@ class ClientImpl::Session { SessionWrapper& m_wrapper; + std::function m_debug_hook; + bool m_in_debug_hook = false; + request_ident_type m_last_pending_test_command_ident = 0; std::list m_pending_test_commands; static std::string make_logger_prefix(session_ident_type); - Session(SessionWrapper& wrapper, Connection&, session_ident_type); - bool do_recognize_sync_version(version_type) noexcept; bool have_client_file_ident() const noexcept; @@ -1159,6 +1163,7 @@ class ClientImpl::Session { // session is in the Active state, and the unbinding process has completed // (unbind_process_complete()). bool unbind_process_complete() const noexcept; + bool pending_client_error() const noexcept; void activate(); void initiate_deactivation(); @@ -1417,12 +1422,8 @@ inline void ClientImpl::Session::request_download_completion_notification() ensure_enlisted_to_send(); // Throws } -inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn) - : Session{wrapper, conn, conn.get_client().get_next_session_ident()} // Throws -{ -} - -inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, session_ident_type ident) +inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, session_ident_type ident, + std::function debug_hook) : logger_ptr{std::make_shared(make_logger_prefix(ident), conn.logger_ptr)} // Throws , logger{*logger_ptr} , m_conn{conn} @@ -1431,6 +1432,7 @@ inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, s , m_is_flx_sync_session(conn.is_flx_sync_connection()) , m_fix_up_object_ids(get_client().m_fix_up_object_ids) , m_wrapper{wrapper} + , m_debug_hook(std::move(debug_hook)) { if (get_client().m_disable_upload_activation_delay) m_allow_upload = true; @@ -1455,9 +1457,14 @@ inline bool ClientImpl::Session::unbind_process_complete() const noexcept return (m_unbind_message_send_complete && (m_error_message_received || m_unbound_message_received)); } +inline bool ClientImpl::Session::pending_client_error() const noexcept +{ + return m_error_to_send || (m_error_message_sent && !m_error_message_send_complete); +} + inline void ClientImpl::Session::connection_established(bool fast_reconnect) { - REALM_ASSERT(m_state == Active); + REALM_ASSERT(m_state == Active || (m_state == Deactivating && m_error_to_send)); if (!fast_reconnect && !get_client().m_disable_upload_activation_delay) { // Disallow immediate activation of the upload process, even if download @@ -1504,6 +1511,14 @@ inline void ClientImpl::Session::message_sent() // No message will be sent after the UNBIND message REALM_ASSERT(!m_unbind_message_send_complete); + if (m_error_message_sent) { + m_error_message_send_complete = true; + if (m_state == Deactivating && !m_bind_message_sent) { + complete_deactivation(); + return; + } + } + if (m_unbind_message_sent) { REALM_ASSERT(!m_enlisted_to_send); @@ -1550,6 +1565,9 @@ inline void ClientImpl::Session::reset_protocol_state() noexcept m_enlisted_to_send = false; m_bind_message_sent = false; m_error_to_send = false; + m_error_message_sent = false; + m_error_message_send_complete = false; + m_ident_message_sent = false; m_unbind_message_sent = false; m_unbind_message_send_complete = false; diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 467d29e318a..7f9478ca649 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2665,20 +2665,32 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot SyncConfig::FLXSyncEnabled{}); interrupted_realm_config.cache = false; - auto check_interrupted_state = [&](const DBRef& realm) { - auto tr = realm->start_read(); - auto top_level = tr->get_table("class_TopLevel"); - REQUIRE(top_level); - REQUIRE(top_level->is_empty()); - - auto sub_store = sync::SubscriptionStore::create(realm); - auto version_info = sub_store->get_version_info(); - REQUIRE(version_info.latest == 1); - REQUIRE(version_info.active == 0); - auto latest_subs = sub_store->get_latest(); - REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping); - REQUIRE(latest_subs.size() == 1); - REQUIRE(latest_subs.at(0).object_class_name == "TopLevel"); + auto check_interrupted_state = [&](bool bootstrap_fully_received) { + _impl::RealmCoordinator::assert_no_open_realms(); + DBOptions options; + options.encryption_key = test_util::crypt_key(); + auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); + auto logger = util::Logger::get_default_logger(); + sync::PendingBootstrapStore bootstrap_store(realm, *logger); + REQUIRE(bootstrap_store.has_pending()); + auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); + REQUIRE(pending_batch.query_version == 1); + REQUIRE(pending_batch.progress.has_value() == bootstrap_fully_received); + { + auto tr = realm->start_read(); + auto top_level = tr->get_table("class_TopLevel"); + REQUIRE(top_level); + REQUIRE(top_level->is_empty()); + + auto sub_store = sync::SubscriptionStore::create(realm); + auto version_info = sub_store->get_version_info(); + REQUIRE(version_info.latest == 1); + REQUIRE(version_info.active == 0); + auto latest_subs = sub_store->get_latest(); + REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping); + REQUIRE(latest_subs.size() == 1); + REQUIRE(latest_subs.at(0).object_class_name == "TopLevel"); + } }; auto mutate_realm = [&] { @@ -2690,7 +2702,9 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot }); }; - SECTION("unknown exception occurs during bootstrap application on session startup") { + SECTION("exception occurs during bootstrap application on session startup") { + enum ExceptionType { Novel, BadChangeset }; + auto exception_to_throw = GENERATE(ExceptionType::Novel, ExceptionType::BadChangeset); { auto [interrupted_promise, interrupted] = util::make_promise_future(); Realm::Config config = interrupted_realm_config; @@ -2727,53 +2741,94 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - auto logger = util::Logger::get_default_logger(); - sync::PendingBootstrapStore bootstrap_store(realm, *logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(pending_batch.progress); - - check_interrupted_state(realm); - } - - auto error_pf = util::make_promise_future(); - interrupted_realm_config.sync_config->error_handler = - [promise = std::make_shared>(std::move(error_pf.promise))]( - std::shared_ptr, SyncError error) { - promise->emplace_value(std::move(error)); - }; + check_interrupted_state(true); + enum class State { + Interrupted, + ExceptionThrown, + ClientErrorPropagatedToHandler, + RealmDestroyed, + ClientErrorMessageSentToServer, + }; + TestingStateMachine state(State::Interrupted); + + std::optional propagated_error; + interrupted_realm_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { + propagated_error = std::move(error); + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::ExceptionThrown); + return State::ClientErrorPropagatedToHandler; + }); + }; interrupted_realm_config.sync_config->on_sync_client_event_hook = - [&, download_message_received = false](std::weak_ptr, - const SyncClientHookData& data) mutable { + [&, download_message_received = false, + ident_message_sent = false](std::weak_ptr, const SyncClientHookData& data) mutable { if (data.event == SyncClientHookEvent::DownloadMessageReceived) { download_message_received = true; } - if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { - return SyncClientHookAction::NoAction; + else if (data.event == SyncClientHookEvent::IdentMessageSent) { + ident_message_sent = true; + } + else if (data.event == SyncClientHookEvent::BootstrapBatchAboutToProcess) { + REQUIRE(!ident_message_sent); + REQUIRE(!download_message_received); + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::Interrupted); + return State::ExceptionThrown; + }); + switch (exception_to_throw) { + case ExceptionType::Novel: + throw NovelException{}; + case ExceptionType::BadChangeset: + throw sync::IntegrationException(ErrorCodes::BadChangeset, "simulated failure"); + } + } + else if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { + REQUIRE(!ident_message_sent); + state.wait_for(State::RealmDestroyed); + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::RealmDestroyed); + return State::ClientErrorMessageSentToServer; + }); } - REQUIRE(!download_message_received); - throw NovelException{}; return SyncClientHookAction::NoAction; }; - auto realm = Realm::get_shared_realm(interrupted_realm_config); - const auto& error = error_pf.future.get(); - REQUIRE(!error.is_fatal); - REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning); - REQUIRE(error.status == ErrorCodes::UnknownError); - REQUIRE_THAT(error.status.reason(), - Catch::Matchers::ContainsSubstring("Oh no, a really weird exception happened!")); + { + auto realm = Realm::get_shared_realm(interrupted_realm_config); + state.wait_for(State::ClientErrorPropagatedToHandler); + } + + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::ClientErrorPropagatedToHandler); + return State::RealmDestroyed; + }); + state.wait_for(State::ClientErrorMessageSentToServer); + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = harness.session().app_session().admin_api.get_errors( + harness.session().app_session().server_app_id, + {{"user_id", harness.app()->current_user()->identity()}}); + return !server_errors.empty(); + }); + + std::pair expected_error = [&] { + switch (exception_to_throw) { + case ExceptionType::Novel: + return std::make_pair(ErrorCodes::UnknownError, NovelException{}.what()); + case ExceptionType::BadChangeset: + return std::make_pair(ErrorCodes::BadChangeset, "simulated failure"); + } + }(); + REQUIRE(propagated_error); + REQUIRE(!propagated_error->is_fatal); + REQUIRE(propagated_error->server_requests_action == sync::ProtocolErrorInfo::Action::Warning); + REQUIRE(propagated_error->status == expected_error.first); + REQUIRE_THAT(propagated_error->status.reason(), Catch::Matchers::ContainsSubstring(expected_error.second)); + + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(expected_error.second))); } SECTION("exception occurs during bootstrap application") { @@ -2820,23 +2875,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - util::StderrLogger logger; - sync::PendingBootstrapStore bootstrap_store(realm, logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(pending_batch.progress); - - check_interrupted_state(realm); - } + check_interrupted_state(true); auto realm = Realm::get_shared_realm(interrupted_realm_config); auto table = realm->read_group().get_table("class_TopLevel"); @@ -2850,6 +2889,15 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot for (auto& id : obj_ids_at_end) { REQUIRE(table->find_primary_key(Mixed{id})); } + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = harness.session().app_session().admin_api.get_errors( + harness.session().app_session().server_app_id, + {{"user_id", harness.app()->current_user()->identity()}}); + return !server_errors.empty(); + }); + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(error_status.reason()))); } SECTION("interrupted before final bootstrap message") { @@ -2889,25 +2937,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - auto logger = util::Logger::get_default_logger(); - sync::PendingBootstrapStore bootstrap_store(realm, *logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(!pending_batch.progress); - REQUIRE(pending_batch.remaining_changesets == 0); - REQUIRE(pending_batch.changesets.size() == 1); - - check_interrupted_state(realm); - } + check_interrupted_state(false); // Now we'll open a different realm and make some changes that would leave orphan objects on the client // if the bootstrap batches weren't being cached until lastInBatch were true. @@ -2966,25 +2996,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - auto logger = util::Logger::get_default_logger(); - sync::PendingBootstrapStore bootstrap_store(realm, *logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(static_cast(pending_batch.progress)); - REQUIRE(pending_batch.remaining_changesets == 0); - REQUIRE(pending_batch.changesets.size() == 6); - - check_interrupted_state(realm); - } + check_interrupted_state(true); // Now we'll open a different realm and make some changes that would leave orphan objects on the client // if the bootstrap batches weren't being cached until lastInBatch were true. @@ -3250,6 +3262,17 @@ TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") { auto err = error_future.get(); CHECK(error_count == 2); + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = harness->session().app_session().admin_api.get_errors( + harness->session().app_session().server_app_id, + {{"user_id", harness->app()->current_user()->identity()}}); + return !server_errors.empty(); + }); + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( + "BadChangeset: Failed to transform received changeset: Schema mismatch: " + "'Asymmetric' is asymmetric on one side, but not on the other."s))); }); } @@ -3537,34 +3560,80 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { // An integration error is simulated while bootstrapping. // This results in the client sending an error message to the server. SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); - config.sync_config->simulate_integration_error = true; + SECTION("immediately close session after bad changeset") { + enum class State { Initial, AboutToThrow, RealmDestroyed, Thrown, ClientErrorMessageSent }; + TestingStateMachine state(State::Initial); + config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr, + const SyncClientHookData& data) mutable { + if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::Thrown); + return State::ClientErrorMessageSent; + }); + return SyncClientHookAction::NoAction; + } + if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { + return SyncClientHookAction::NoAction; + } - auto [error_promise, error_future] = util::make_promise_future(); - auto error_count = 0; - auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)), - &error_count](std::shared_ptr, SyncError err) mutable { - ++error_count; - if (error_count == 1) { - // Bad changeset detected by the client. - CHECK(err.status == ErrorCodes::BadChangeset); - } - else if (error_count == 2) { - // Server asking for a client reset. - CHECK(err.status == ErrorCodes::SyncClientResetRequired); - CHECK(err.is_client_reset_requested()); - promise.get_promise().emplace_value(std::move(err)); + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::Initial); + return State::AboutToThrow; + }); + state.wait_for(State::RealmDestroyed); + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::RealmDestroyed); + return State::Thrown; + }); + throw sync::IntegrationException(ErrorCodes::BadChangeset, "simulated failure"); + }; + + { + auto realm = Realm::get_shared_realm(config); + state.wait_for(State::AboutToThrow); } - }; + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::AboutToThrow); + return State::RealmDestroyed; + }); + state.wait_for(State::ClientErrorMessageSent); + } - config.sync_config->error_handler = err_handler; - auto realm = Realm::get_shared_realm(config); - auto table = realm->read_group().get_table("class_TopLevel"); - auto new_query = realm->get_latest_subscription_set().make_mutable_copy(); - new_query.insert_or_assign(Query(table)); - new_query.commit(); + SECTION("check for client reset error") { + config.sync_config->simulate_integration_error = true; + auto [error_promise, error_future] = util::make_promise_future(); + auto error_count = 0; + auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)), + &error_count](std::shared_ptr, SyncError err) mutable { + ++error_count; + if (error_count == 1) { + // Bad changeset detected by the client. + CHECK(err.status == ErrorCodes::BadChangeset); + } + else if (error_count == 2) { + // Server asking for a client reset. + CHECK(err.status == ErrorCodes::SyncClientResetRequired); + CHECK(err.is_client_reset_requested()); + promise.get_promise().emplace_value(std::move(err)); + } + }; + + config.sync_config->error_handler = err_handler; - auto err = error_future.get(); - CHECK(error_count == 2); + auto realm = Realm::get_shared_realm(config); + + auto err = error_future.get(); + CHECK(error_count == 2); + } + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = harness.session().app_session().admin_api.get_errors( + harness.session().app_session().server_app_id, {{"user_id", harness.app()->current_user()->identity()}}); + return !server_errors.empty(); + }); + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( + "BadChangeset: simulated failure (ProtocolErrorCode=201)"s))); } TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]") { diff --git a/test/object-store/util/sync/baas_admin_api.cpp b/test/object-store/util/sync/baas_admin_api.cpp index 0b8a0074bfb..46885f0e656 100644 --- a/test/object-store/util/sync/baas_admin_api.cpp +++ b/test/object-store/util/sync/baas_admin_api.cpp @@ -556,10 +556,16 @@ std::vector AdminAPISession::get_services(const std::s } -std::vector AdminAPISession::get_errors(const std::string& app_id) const +std::vector AdminAPISession::get_errors(const std::string& app_id, + std::vector> filters) const { auto endpoint = apps()[app_id]["logs"]; - auto response = endpoint.get_json({{"errors_only", "true"}}); + if (!std::any_of(filters.begin(), filters.end(), [](auto& pair) { + return pair.first == "errors_only"; + })) { + filters.push_back({"errors_only", "true"}); + } + auto response = endpoint.get_json(std::move(filters)); std::vector errors; const auto& logs = response["logs"]; std::transform(logs.begin(), logs.end(), std::back_inserter(errors), [](const auto& err) { diff --git a/test/object-store/util/sync/baas_admin_api.hpp b/test/object-store/util/sync/baas_admin_api.hpp index fcf3d5e3ab4..550e3f03b69 100644 --- a/test/object-store/util/sync/baas_admin_api.hpp +++ b/test/object-store/util/sync/baas_admin_api.hpp @@ -111,7 +111,8 @@ class AdminAPISession { }; std::vector get_services(const std::string& app_id) const; - std::vector get_errors(const std::string& app_id) const; + std::vector get_errors(const std::string& app_id, + std::vector> filters = {}) const; Service get_sync_service(const std::string& app_id) const; ServiceConfig get_config(const std::string& app_id, const Service& service) const; ServiceConfig disable_sync(const std::string& app_id, const std::string& service_id, diff --git a/test/object-store/util/sync/flx_sync_harness.hpp b/test/object-store/util/sync/flx_sync_harness.hpp index 5c4b0701268..bdcb61541e2 100644 --- a/test/object-store/util/sync/flx_sync_harness.hpp +++ b/test/object-store/util/sync/flx_sync_harness.hpp @@ -22,6 +22,7 @@ #include #include +#include #include diff --git a/test/object-store/util/test_utils.hpp b/test/object-store/util/test_utils.hpp index 7ae8cb73c00..9d090f93933 100644 --- a/test/object-store/util/test_utils.hpp +++ b/test/object-store/util/test_utils.hpp @@ -66,6 +66,37 @@ class TestingStateMachine { E m_cur_state; }; +template +class VectorElemMatchesMatcher final : public Catch::Matchers::MatcherGenericBase { +public: + explicit VectorElemMatchesMatcher(Matcher&& matcher) + : m_matcher(std::move(matcher)) + { + } + + template + bool match(Container const& vec) const + { + return std::any_of(vec.begin(), vec.end(), [&](const auto& e) { + return m_matcher.match(e); + }); + } + + std::string describe() const override + { + return util::format("VectorElemMatches(%1)", m_matcher.toString()); + } + +private: + Matcher m_matcher; +}; + +template +inline auto VectorElemMatches(Matcher&& matcher) +{ + return VectorElemMatchesMatcher(std::move(matcher)); +} + template class ExceptionMatcher final : public Catch::Matchers::MatcherBase { public: