diff --git a/cmake/dmitigr_pgfe.cmake b/cmake/dmitigr_pgfe.cmake index 5aabd5a..792f8cb 100644 --- a/cmake/dmitigr_pgfe.cmake +++ b/cmake/dmitigr_pgfe.cmake @@ -57,6 +57,7 @@ set(dmitigr_pgfe_headers pq.hpp prepared_statement.hpp problem.hpp + ready_for_query.hpp response.hpp row.hpp row_info.hpp @@ -79,6 +80,7 @@ set(dmitigr_pgfe_implementations misc.cpp prepared_statement.cpp problem.cpp + ready_for_query.cpp row_info.cpp sql_string.cpp sql_vector.cpp @@ -130,6 +132,7 @@ if(DMITIGR_CPPLIPA_TESTS) copier data hello_world + pipeline pq_vs_pgfe ps row diff --git a/src/pgfe/basics.hpp b/src/pgfe/basics.hpp index eb2932c..3bf120c 100644 --- a/src/pgfe/basics.hpp +++ b/src/pgfe/basics.hpp @@ -190,6 +190,8 @@ enum class Connection_status { connected = 400 }; +// ============================================================================= + /** * @ingroup main * @@ -238,6 +240,24 @@ enum class Transaction_status { // ============================================================================= +/** + * @ingroup main + * + * @brief A pipeline status. + */ +enum class Pipeline_status { + /// Pipeline is disabled. + disabled = 0, + + /// Pipeline is enabled. + enabled = 100, + + /// Error occurred while processing the pipeline. + aborted = 200 +}; + +// ============================================================================= + /** * @ingroup main * diff --git a/src/pgfe/connection.cpp b/src/pgfe/connection.cpp index 3b8e1a4..9ab264a 100644 --- a/src/pgfe/connection.cpp +++ b/src/pgfe/connection.cpp @@ -20,11 +20,13 @@ // Dmitry Igrishin // dmitigr@gmail.com +#include "../base/assert.hpp" #include "../net/socket.hpp" #include "connection.hpp" #include "copier.hpp" #include "exceptions.hpp" #include "large_object.hpp" +#include "ready_for_query.hpp" #include "sql_string.hpp" namespace dmitigr::pgfe { @@ -269,18 +271,18 @@ DMITIGR_PGFE_INLINE Response_status Connection::handle_input(const bool wait_res { assert(is_connected()); - const auto check_state = [this] + const auto check_state = [this]() noexcept { assert(response_status_ == Response_status::ready); assert(response_.status() == PGRES_SINGLE_TUPLE); assert(!requests_.empty()); - assert(requests_.front() == Request_id::execute); + assert(requests_.front().id_ == Request::Id::execute); }; const auto dismiss_request = [this]() noexcept { if (!requests_.empty()) { - last_processed_request_id_ = requests_.front(); + last_processed_request_ = std::move(requests_.front()); requests_.pop(); } }; @@ -288,12 +290,26 @@ DMITIGR_PGFE_INLINE Response_status Connection::handle_input(const bool wait_res static const auto is_completion_status = [](const auto status) noexcept { return status == PGRES_FATAL_ERROR || + status == PGRES_PIPELINE_ABORTED || + status == PGRES_PIPELINE_SYNC || status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK || status == PGRES_EMPTY_QUERY || status == PGRES_BAD_RESPONSE; }; + /* + * According to https://www.postgresql.org/docs/current/libpq-pipeline-mode.html, + * "To enter single-row mode, call PQsetSingleRowMode() before retrieving results + * with PQgetResult(). This mode selection is effective only for the query currently + * being processed." Therefore, set_single_row_mode_enabled() is called once for + * each query in a pipeline. + */ + if ((pipeline_status() == Pipeline_status::enabled) && + !is_single_row_mode_enabled_ && !requests_.empty() && + requests_.front().id_ == Request::Id::execute) + set_single_row_mode_enabled(); + if (wait_response) { if (response_status_ == Response_status::unready) { complete_response: @@ -361,33 +377,38 @@ DMITIGR_PGFE_INLINE Response_status Connection::handle_input(const bool wait_res assert(rstatus != PGRES_NONFATAL_ERROR); assert(rstatus != PGRES_SINGLE_TUPLE); if (rstatus == PGRES_TUPLES_OK) { - assert(last_processed_request_id_ == Request_id::execute); + assert(last_processed_request_.id_ == Request::Id::execute); + is_single_row_mode_enabled_ = false; } else if (rstatus == PGRES_COPY_OUT || rstatus == PGRES_COPY_IN) { is_copy_in_progress_ = true; } else if (rstatus == PGRES_FATAL_ERROR) { is_copy_in_progress_ = false; - request_prepared_statement_ = {}; - request_prepared_statement_name_.reset(); + is_single_row_mode_enabled_ = false; } else if (rstatus == PGRES_COMMAND_OK) { - assert(last_processed_request_id_ != Request_id::prepare || request_prepared_statement_); - assert(last_processed_request_id_ != Request_id::describe || request_prepared_statement_name_); - assert(last_processed_request_id_ != Request_id::unprepare || request_prepared_statement_name_); - if (last_processed_request_id_ == Request_id::prepare) { - last_prepared_statement_ = register_ps(std::move(request_prepared_statement_)); - assert(!request_prepared_statement_); - } else if (last_processed_request_id_ == Request_id::describe) { - last_prepared_statement_ = ps(*request_prepared_statement_name_); + auto& lpr = last_processed_request_; + assert(lpr.id_ != Request::Id::prepare || lpr.prepared_statement_); + assert(lpr.id_ != Request::Id::describe || lpr.prepared_statement_name_); + assert(lpr.id_ != Request::Id::unprepare || lpr.prepared_statement_name_); + if (lpr.id_ == Request::Id::prepare) { + last_prepared_statement_ = register_ps(std::move(lpr.prepared_statement_)); + assert(!lpr.prepared_statement_); + } else if (lpr.id_ == Request::Id::describe) { + last_prepared_statement_ = ps(*lpr.prepared_statement_name_); if (!last_prepared_statement_) - last_prepared_statement_ = register_ps(Prepared_statement{std::move(*request_prepared_statement_name_), - this, static_cast(response_.field_count())}); + last_prepared_statement_ = register_ps(Prepared_statement{ + std::move(*lpr.prepared_statement_name_), + this, + static_cast(response_.field_count())}); last_prepared_statement_->set_description(std::move(response_)); - request_prepared_statement_name_.reset(); - } else if (last_processed_request_id_ == Request_id::unprepare) { - assert(request_prepared_statement_name_ && !std::strcmp(response_.command_tag(), "DEALLOCATE")); - unregister_ps(*request_prepared_statement_name_); - request_prepared_statement_name_.reset(); + lpr.prepared_statement_name_.reset(); + } else if (lpr.id_ == Request::Id::unprepare) { + assert(lpr.prepared_statement_name_ && + !std::strcmp(response_.command_tag(), "DEALLOCATE")); + unregister_ps(*lpr.prepared_statement_name_); + lpr.prepared_statement_name_.reset(); } is_copy_in_progress_ = false; + is_single_row_mode_enabled_ = false; } } else if (response_status_ == Response_status::empty) dismiss_request(); // just in case @@ -504,17 +525,17 @@ DMITIGR_PGFE_INLINE Completion Connection::completion() noexcept return result; } case PGRES_COMMAND_OK: - switch (last_processed_request_id_) { - case Request_id::execute: { + switch (last_processed_request_.id_) { + case Request::Id::execute: { Completion result{response_.command_tag()}; response_.reset(); return result; } - case Request_id::prepare: + case Request::Id::prepare: [[fallthrough]]; - case Request_id::describe: + case Request::Id::describe: return {}; - case Request_id::unprepare: { + case Request::Id::unprepare: { Completion result{"unprepare"}; response_.reset(); return result; @@ -532,6 +553,17 @@ DMITIGR_PGFE_INLINE Completion Connection::completion() noexcept } } +DMITIGR_PGFE_INLINE Ready_for_query Connection::ready_for_query() +{ + return (response_.status() == PGRES_PIPELINE_SYNC) + ? Ready_for_query{std::move(response_)} : Ready_for_query{}; +} + +DMITIGR_PGFE_INLINE std::size_t Connection::request_queue_size() const +{ + return requests_.size(); +} + DMITIGR_PGFE_INLINE void Connection::prepare_nio(const Sql_string& statement, const std::string& name) { @@ -550,15 +582,12 @@ Connection::prepare(const Sql_string& statement, const std::string& name) DMITIGR_PGFE_INLINE void Connection::describe_nio(const std::string& name) { assert(is_ready_for_nio_request()); - assert(!request_prepared_statement_name_); - requests_.push(Request_id::describe); // can throw + requests_.emplace(Request::Id::describe, name); // can throw try { - auto name_copy = name; const int send_ok = ::PQsendDescribePrepared(conn(), name.c_str()); if (!send_ok) throw std::runtime_error{error_message()}; - request_prepared_statement_name_ = std::move(name_copy); // cannot throw } catch (...) { requests_.pop(); // rollback throw; @@ -570,18 +599,52 @@ DMITIGR_PGFE_INLINE void Connection::describe_nio(const std::string& name) DMITIGR_PGFE_INLINE void Connection::unprepare_nio(const std::string& name) { assert(!name.empty()); - assert(!request_prepared_statement_name_); auto name_copy = name; // can throw const auto query = "DEALLOCATE " + to_quoted_identifier(name); // can throw execute_nio(query); // can throw - assert(requests_.front() == Request_id::execute); - requests_.front() = Request_id::unprepare; // cannot throw - request_prepared_statement_name_ = std::move(name_copy); // cannot throw + assert(requests_.front().id_ == Request::Id::execute); + requests_.front().id_ = Request::Id::unprepare; // cannot throw + requests_.front().prepared_statement_name_ = std::move(name_copy); // cannot throw assert(is_invariant_ok()); } +DMITIGR_PGFE_INLINE void Connection::set_pipeline_enabled(const bool value) +{ + if (value) { + if (!PQenterPipelineMode(conn())) + throw Client_exception{"cannot enable pipeline on connection"}; + } else { + if (!PQexitPipelineMode(conn())) + throw Client_exception{error_message()}; + } +} + +DMITIGR_PGFE_INLINE Pipeline_status Connection::pipeline_status() const +{ + switch (PQpipelineStatus(conn())) { + case PQ_PIPELINE_OFF: return Pipeline_status::disabled; + case PQ_PIPELINE_ON: return Pipeline_status::enabled; + case PQ_PIPELINE_ABORTED: return Pipeline_status::aborted; + default: break; + } + DMITIGR_ASSERT(false); +} + +DMITIGR_PGFE_INLINE void Connection::send_sync() +{ + if (!PQpipelineSync(conn())) + throw Client_exception{"cannot send sync message to the server"}; + requests_.emplace(Request::Id::sync); +} + +DMITIGR_PGFE_INLINE void Connection::send_flush() +{ + if (!PQsendFlushRequest(conn())) + throw Client_exception{"cannot send flush message to the server"}; +} + DMITIGR_PGFE_INLINE Oid Connection::create_large_object(const Oid oid) noexcept { assert(is_ready_for_request()); @@ -641,9 +704,7 @@ DMITIGR_PGFE_INLINE bool Connection::is_invariant_ok() const noexcept (response_status_ == Response_status::empty) && named_prepared_statements_.empty() && !unnamed_prepared_statement_ && - requests_.empty() && - !request_prepared_statement_ && - !request_prepared_statement_name_; + requests_.empty(); const bool session_data_ok = session_data_empty || (status() == Status::failure) || (status() == Status::connected); const bool trans_ok = !is_connected() || transaction_status(); const bool sess_time_ok = !is_connected() || session_start_time(); @@ -685,8 +746,13 @@ DMITIGR_PGFE_INLINE void Connection::reset_session() noexcept unnamed_prepared_statement_ = {}; requests_ = {}; - request_prepared_statement_ = {}; - request_prepared_statement_name_.reset(); +} + +DMITIGR_PGFE_INLINE void Connection::set_single_row_mode_enabled() +{ + const auto set_ok = ::PQsetSingleRowMode(conn()); + DMITIGR_ASSERT(set_ok); + is_single_row_mode_enabled_ = true; } DMITIGR_PGFE_INLINE void Connection::notice_receiver(void* const arg, const ::PGresult* const r) noexcept @@ -716,17 +782,15 @@ Connection::prepare_nio__(const char* const query, const char* const name, const assert(query); assert(name); assert(is_ready_for_nio_request()); - assert(!request_prepared_statement_); - requests_.push(Request_id::prepare); // can throw + requests_.emplace(Request::Id::prepare); try { - Prepared_statement ps{name, this, preparsed}; - constexpr int n_params{0}; + requests_.front().prepared_statement_ = {name, this, preparsed}; + constexpr int n_params{}; constexpr const ::Oid* const param_types{}; const int send_ok = ::PQsendPrepare(conn(), name, query, n_params, param_types); if (!send_ok) throw std::runtime_error{error_message()}; - request_prepared_statement_ = std::move(ps); // cannot throw } catch (...) { requests_.pop(); // rollback throw; diff --git a/src/pgfe/connection.hpp b/src/pgfe/connection.hpp index 6a91eb7..0db623b 100644 --- a/src/pgfe/connection.hpp +++ b/src/pgfe/connection.hpp @@ -124,13 +124,11 @@ class Connection final { swap(session_start_time_, rhs.session_start_time_); swap(response_, rhs.response_); swap(response_status_, rhs.response_status_); - swap(last_processed_request_id_, rhs.last_processed_request_id_); + swap(last_processed_request_, rhs.last_processed_request_); swap(last_prepared_statement_, rhs.last_prepared_statement_); swap(named_prepared_statements_, rhs.named_prepared_statements_); unnamed_prepared_statement_.swap(rhs.unnamed_prepared_statement_); swap(requests_, rhs.requests_); - request_prepared_statement_.swap(rhs.request_prepared_statement_); - swap(request_prepared_statement_name_, rhs.request_prepared_statement_name_); } /// @name General observers @@ -165,6 +163,13 @@ class Connection final { return status() == Status::connected; } + /// @returns `true` if the connection is open and no operation in progress. + bool is_connected_and_idle() const + { + const auto ts = transaction_status(); + return ts && ts != Transaction_status::active; + } + /** * @returns The transaction status. * @@ -382,9 +387,7 @@ class Connection final { // ----------------------------------------------------------------------------- - /** - * @name Signals - */ + /// @name Signals /// @{ /// @returns The valid released instance if available. @@ -450,12 +453,6 @@ class Connection final { /// @name Responses /// @{ - /// @returns `true` if there is uncompleted request. - bool has_uncompleted_request() const noexcept - { - return !requests_.empty(); - } - /// @returns `true` if there is ready response available. bool has_response() const noexcept { @@ -718,6 +715,19 @@ class Connection final { return ps(name); } + /** + * @returns The released instance if available. + * + * @par Exception safety guarantee + * Strong. + * + * @remarks At present, this method is only useful to get the synchronization + * point in a pipeline sent by send_sync(). + * + * @see send_sync(), wait_response(). + */ + DMITIGR_PGFE_API Ready_for_query ready_for_query(); + ///@} // --------------------------------------------------------------------------- @@ -725,6 +735,12 @@ class Connection final { /// @name Requests /// @{ + /// @returns `true` if `COPY` command in progress. + bool is_copy_in_progress() const noexcept + { + return is_copy_in_progress_; + } + /** * @returns `true` if the connection is ready for requesting a server in a * non-blocking manner. @@ -733,18 +749,35 @@ class Connection final { */ bool is_ready_for_nio_request() const noexcept { - const auto ts = transaction_status(); - return ts && ts != Transaction_status::active; + return (pipeline_status() == Pipeline_status::disabled) + ? is_ready_for_request() : is_connected(); } /** - * @returns `true` if the connection is ready for requesting a server. + * @returns The request queue size. + * + * @remarks The request queue size can be greater than 1 only if pipeline + * is enabled. + * + * @see set_pipeline_enabled(). + */ + DMITIGR_PGFE_API std::size_t request_queue_size() const; + + /// @returns `true` if there is uncompleted request. + bool has_uncompleted_request() const noexcept + { + return !requests_.empty(); + } + + /** + * @returns `true` if the connection is ready for requesting a server, + * i.e. if the connection is open and no command in progress. * * @see is_ready_for_nio_request(). */ bool is_ready_for_request() const noexcept { - return !is_copy_in_progress_ && is_ready_for_nio_request(); + return (pipeline_status() == Pipeline_status::disabled) && is_connected_and_idle(); } /** @@ -900,9 +933,6 @@ class Connection final { * @par Responses * Similar to Prepared_statement::execute(). * - * @param queries A string, containing the SQL query(-es). Adjacent - * queries must be separated by a semicolon. - * * @par Effects * `has_uncompleted_request()`. * @@ -948,6 +978,7 @@ class Connection final { std::enable_if_t::is_valid, Completion> execute(F&& callback, const Sql_string& statement, Types&& ... parameters) { + assert(is_ready_for_request()); execute_nio(statement, std::forward(parameters)...); return process_responses(std::forward(callback)); } @@ -1087,6 +1118,59 @@ class Connection final { return call([](auto&&){}, procedure, std::forward(arguments)...); } + /** + * @briefs Enables or disables the pipeline on this instance. + * + * @details When pipeline is enabled, only asynchronous operations are + * permitted. After enabling the pipeline, the application queues requests + * using prepare_nio(), unprepare_nio(), prepare_nio_as_is(), describe_nio() + * or execute_nio(). These requests are flushed to the server when send_sync() + * is called to establish a synchronization point in the pipeline, or when + * flush_output() is called. + * The server executes commands in the pipeline as they are received, and + * sends the results of executed statements according to the queue. The + * results are buffered on the server side until the buffer is flushed, when + * either Sync or Flush messages, issued on client side with send_sync() or + * send_flush() accordingly, will be processed by the server. If any operation + * fails, the server aborts the current transaction and sends the Error + * response. After receiving the Error response, the application must skip the + * results of subsequent commands with either wait_response() or + * wait_response_throw() until the Ready_for_query response is received. + * + * @par Effects + * - if `value`, then `!is_ready_for_request()` and + * `is_ready_for_nio_request()` and `pipeline_status() == Pipeline_status::enabled`; + * - if `!value`, then `is_ready_for_request()` and + * `is_ready_for_nio_request()` and `pipeline_status() == Pipeline_status::disabled`. + * + * @see pipeline_status(), send_sync(), send_flush(). + */ + DMITIGR_PGFE_API void set_pipeline_enabled(bool value); + + /// @returns The status of pipeline. + DMITIGR_PGFE_API Pipeline_status pipeline_status() const; + + /** + * @brief Sends a Sync message to the server. + * + * @remarks At present, this method is only useful to mark a synchronization + * point in a pipeline and to cause the server to flush its output buffer. + * + * @see send_flush(), set_pipeline_enabled(). + */ + DMITIGR_PGFE_API void send_sync(); + + /** + * @brief Sends a Flush message to the server. + * + * @remarks At present, this method is only useful to flush the server's + * point in a pipeline and to cause the server to flush its output buffer + * without establishing a synchronization point (as with send_sync().) + * + * @see send_sync(), set_pipeline_enabled(). + */ + DMITIGR_PGFE_API void send_flush(); + /** * @brief Sets the default data format of statements execution results. * @@ -1304,31 +1388,53 @@ class Connection final { ::PGconn* conn() const noexcept { return conn_.get(); } // --------------------------------------------------------------------------- - // Session data / requests data + // Session data / requests // --------------------------------------------------------------------------- - enum class Request_id { - execute = 1, - prepare, - describe, - unprepare + struct Request final { + enum class Id { + execute = 1, + prepare, + describe, + unprepare, + sync + }; + + Request() = default; + + explicit Request(const Id id) + : id_{id} + {} + + Request(const Id id, Prepared_statement prepared_statement) + : id_{id} + , prepared_statement_{std::move(prepared_statement)} + {} + + Request(const Id id, std::string prepared_statement_name) + : id_{id} + , prepared_statement_name_{std::move(prepared_statement_name)} + {} + + Id id_{}; + Prepared_statement prepared_statement_; + std::optional prepared_statement_name_; }; std::optional session_start_time_; detail::pq::Result response_; // allowed to not match to response_status_ Response_status response_status_{}; // status last assigned by handle_input() - Request_id last_processed_request_id_{}; // type last assigned by handle_input() Prepared_statement* last_prepared_statement_{}; bool is_output_flushed_{true}; bool is_copy_in_progress_{}; + bool is_single_row_mode_enabled_{}; mutable std::list named_prepared_statements_; mutable Prepared_statement unnamed_prepared_statement_; - std::queue requests_; // for batch mode - Prepared_statement request_prepared_statement_; - std::optional request_prepared_statement_name_; + std::queue requests_; + Request last_processed_request_; bool is_invariant_ok() const noexcept; @@ -1337,6 +1443,7 @@ class Connection final { // --------------------------------------------------------------------------- void reset_session() noexcept; + void set_single_row_mode_enabled(); // --------------------------------------------------------------------------- // Handlers @@ -1401,7 +1508,7 @@ class Connection final { std::string error_message() const; - bool is_out_of_memory() const + bool is_out_of_memory() const noexcept { constexpr char msg[] = "out of memory"; return !std::strncmp(::PQerrorMessage(conn()), msg, sizeof(msg) - 1); diff --git a/src/pgfe/copier.cpp b/src/pgfe/copier.cpp index a53630b..407f61e 100644 --- a/src/pgfe/copier.cpp +++ b/src/pgfe/copier.cpp @@ -90,7 +90,7 @@ DMITIGR_PGFE_INLINE bool Copier::send(const std::string_view data) const if (r == 0 || r == 1) return r; else if (r == -1) - throw Client_exception{PQerrorMessage(connection().conn())}; + throw Client_exception{connection().error_message()}; DMITIGR_ASSERT(false); } @@ -104,7 +104,7 @@ DMITIGR_PGFE_INLINE bool Copier::end(const std::string& error_message) const if (r == 0 || r == 1) return r; else if (r == -1) - throw Client_exception{PQerrorMessage(connection().conn())}; + throw Client_exception{connection().error_message()}; DMITIGR_ASSERT(false); } @@ -127,7 +127,7 @@ DMITIGR_PGFE_INLINE Data_view Copier::receive(const bool wait) const else if (size > 0) return Data_view{buffer_.get(), static_cast(size), data_format(0)}; else if (size == -2) - throw Client_exception{PQerrorMessage(connection().conn())}; + throw Client_exception{connection().error_message()}; DMITIGR_ASSERT(false); } diff --git a/src/pgfe/copier.hpp b/src/pgfe/copier.hpp index e8f049d..75f4032 100644 --- a/src/pgfe/copier.hpp +++ b/src/pgfe/copier.hpp @@ -25,6 +25,7 @@ #include "data.hpp" #include "dll.hpp" +#include "pq.hpp" #include "response.hpp" #include diff --git a/src/pgfe/pgfe.hpp b/src/pgfe/pgfe.hpp index a2432ac..49c0a36 100644 --- a/src/pgfe/pgfe.hpp +++ b/src/pgfe/pgfe.hpp @@ -47,6 +47,7 @@ #include "notification.hpp" #include "parameterizable.hpp" #include "problem.hpp" +#include "ready_for_query.hpp" #include "response.hpp" #include "row.hpp" #include "row_info.hpp" diff --git a/src/pgfe/prepared_statement.cpp b/src/pgfe/prepared_statement.cpp index 6c0a60b..51cd0ef 100644 --- a/src/pgfe/prepared_statement.cpp +++ b/src/pgfe/prepared_statement.cpp @@ -120,7 +120,7 @@ DMITIGR_PGFE_INLINE void Prepared_statement::execute_nio__(const Sql_string* con std::vector lengths(static_cast(param_count), 0); std::vector formats(static_cast(param_count), 0); - connection_->requests_.push(Connection::Request_id::execute); // can throw + connection_->requests_.emplace(Connection::Request::Id::execute); // can throw try { // Prepare the input for libpq. for (unsigned i{}; i < static_cast(param_count); ++i) { @@ -147,9 +147,8 @@ DMITIGR_PGFE_INLINE void Prepared_statement::execute_nio__(const Sql_string* con if (!send_ok) throw std::runtime_error{connection_->error_message()}; - const auto set_ok = ::PQsetSingleRowMode(connection_->conn()); - if (!set_ok) - throw std::runtime_error{"cannot switch to single-row mode"}; + if (connection_->pipeline_status() == Pipeline_status::disabled) + connection_->set_single_row_mode_enabled(); } catch (...) { connection_->requests_.pop(); // rollback throw; diff --git a/src/pgfe/ready_for_query.cpp b/src/pgfe/ready_for_query.cpp new file mode 100644 index 0000000..bf3e7b8 --- /dev/null +++ b/src/pgfe/ready_for_query.cpp @@ -0,0 +1,57 @@ +// -*- C++ -*- +// Copyright (C) 2022 Dmitry Igrishin +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgment in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. +// +// Dmitry Igrishin +// dmitigr@gmail.com + +#include "../base/assert.hpp" +#include "ready_for_query.hpp" + +namespace dmitigr::pgfe { + +DMITIGR_PGFE_INLINE +Ready_for_query::Ready_for_query(detail::pq::Result&& pq_result) noexcept + : pq_result_{std::move(pq_result)} +{} + +DMITIGR_PGFE_INLINE +Ready_for_query::Ready_for_query(Ready_for_query&& rhs) noexcept + : pq_result_{std::move(rhs.pq_result_)} +{} + +DMITIGR_PGFE_INLINE Ready_for_query& +Ready_for_query::operator=(Ready_for_query&& rhs) noexcept +{ + Ready_for_query tmp{std::move(rhs)}; + swap(tmp); + return *this; +} + +DMITIGR_PGFE_INLINE void Ready_for_query::swap(Ready_for_query& rhs) noexcept +{ + using std::swap; + swap(pq_result_, rhs.pq_result_); +} + +DMITIGR_PGFE_INLINE bool Ready_for_query::is_valid() const noexcept +{ + return static_cast(pq_result_); +} + +} // namespace dmitigr::pgfe diff --git a/src/pgfe/ready_for_query.hpp b/src/pgfe/ready_for_query.hpp new file mode 100644 index 0000000..66daaa0 --- /dev/null +++ b/src/pgfe/ready_for_query.hpp @@ -0,0 +1,85 @@ +// -*- C++ -*- +// Copyright (C) 2022 Dmitry Igrishin +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgment in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. +// +// Dmitry Igrishin +// dmitigr@gmail.com + +#ifndef DMITIGR_PGFE_READY_FOR_QUERY_HPP +#define DMITIGR_PGFE_READY_FOR_QUERY_HPP + +#include "dll.hpp" +#include "pq.hpp" +#include "response.hpp" + +namespace dmitigr::pgfe { + +/** + * @ingroup main + * + * @brief An indicator of the server readiness for new requests. + * + * @remarks This response can be only received in pipeline mode. + * + * @see Connection::set_pipeline_enabled(). + */ +class Ready_for_query final : public Response { +public: + /// Constructs invalid instance. + Ready_for_query() noexcept = default; + + /// Non copy-constructible. + Ready_for_query(const Ready_for_query&) = delete; + + /// Move-constructible. + DMITIGR_PGFE_API Ready_for_query(Ready_for_query&& rhs) noexcept; + + /// Non copy-assignable. + Ready_for_query& operator=(const Ready_for_query&) = delete; + + /// Move-assignable. + DMITIGR_PGFE_API Ready_for_query& operator=(Ready_for_query&& rhs) noexcept; + + /// Swaps this instance with `rhs`. + DMITIGR_PGFE_API void swap(Ready_for_query& rhs) noexcept; + + /// @returns `true` if this instance is correctly initialized. + DMITIGR_PGFE_API bool is_valid() const noexcept override; + +private: + friend Connection; + + detail::pq::Result pq_result_; + + /// The constructor. + explicit DMITIGR_PGFE_API Ready_for_query(detail::pq::Result&& pq_result) noexcept; +}; + +/// Ready_for_query is swappable. +inline void swap(Ready_for_query& lhs, Ready_for_query& rhs) noexcept +{ + lhs.swap(rhs); +} + +} // namespace dmitigr::pgfe + +#ifndef DMITIGR_PGFE_NOT_HEADER_ONLY +#include "ready_for_query.cpp" +#endif + +#endif // DMITIGR_PGFE_READY_FOR_QUERY_HPP diff --git a/src/pgfe/response.hpp b/src/pgfe/response.hpp index 7146f55..ef9fcd6 100644 --- a/src/pgfe/response.hpp +++ b/src/pgfe/response.hpp @@ -40,6 +40,7 @@ class Response : public Message { friend Completion; friend Error; friend Prepared_statement; + friend Ready_for_query; friend Row; Response() = default; diff --git a/src/pgfe/types_fwd.hpp b/src/pgfe/types_fwd.hpp index 501fd21..b46f0db 100644 --- a/src/pgfe/types_fwd.hpp +++ b/src/pgfe/types_fwd.hpp @@ -75,6 +75,7 @@ class Notification; class Parameterizable; class Prepared_statement; class Problem; +class Ready_for_query; class Response; class Row; class Row_info; diff --git a/test/pgfe/pgfe-unit-copier.cpp b/test/pgfe/pgfe-unit-copier.cpp index 57c2888..a4f8688 100644 --- a/test/pgfe/pgfe-unit-copier.cpp +++ b/test/pgfe/pgfe-unit-copier.cpp @@ -41,6 +41,7 @@ try { // Test send. conn->execute("copy num from stdin (format csv)"); ASSERT(!conn->is_ready_for_request()); + ASSERT(conn->is_copy_in_progress()); auto copier = conn->copier(); ASSERT(copier); ASSERT(!conn->copier()); @@ -54,10 +55,12 @@ try { conn->wait_response_throw(); ASSERT(conn->completion().operation_name() == "COPY"); ASSERT(conn->is_ready_for_request()); + ASSERT(!conn->is_copy_in_progress()); // Test receive. conn->execute("copy num to stdout (format csv)"); ASSERT(!conn->is_ready_for_request()); + ASSERT(conn->is_copy_in_progress()); copier = conn->copier(); ASSERT(copier); ASSERT(!conn->copier()); @@ -74,6 +77,7 @@ try { conn->wait_response_throw(); ASSERT(conn->completion().operation_name() == "COPY"); ASSERT(conn->is_ready_for_request()); + ASSERT(!conn->is_copy_in_progress()); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; return 1; diff --git a/test/pgfe/pgfe-unit-pipeline.cpp b/test/pgfe/pgfe-unit-pipeline.cpp new file mode 100644 index 0000000..b261859 --- /dev/null +++ b/test/pgfe/pgfe-unit-pipeline.cpp @@ -0,0 +1,202 @@ +// -*- C++ -*- +// Copyright (C) 2022 Dmitry Igrishin +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgment in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. +// +// Dmitry Igrishin +// dmitigr@gmail.com + +#include "pgfe-unit.hpp" + +#define ASSERT DMITIGR_ASSERT + +int main() +try { + namespace pgfe = dmitigr::pgfe; + using pgfe::Pipeline_status; + using pgfe::to; + + // Prepare. + auto conn = pgfe::test::make_connection(); + conn->connect(); + ASSERT(conn->pipeline_status() == Pipeline_status::disabled); + conn->set_pipeline_enabled(true); + ASSERT(conn->pipeline_status() == Pipeline_status::enabled); + ASSERT(!conn->is_ready_for_request()); + + /* + * Test case 1. + */ + { + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("create temp table num(id integer not null, str text)"); + // + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("insert into num select 1, 'one'"); + // + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("insert into num select 2, 'two'"); + // + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("insert into num select 3, 'three'"); + // + conn->send_sync(); + ASSERT(conn->has_uncompleted_request()); + ASSERT(conn->request_queue_size() == 5); + // Process responses. + conn->wait_response(); + ASSERT(conn->completion().operation_name() == "CREATE TABLE"); + // + conn->wait_response(); + ASSERT(conn->completion().operation_name() == "INSERT"); + // + conn->wait_response(); + ASSERT(conn->completion().operation_name() == "INSERT"); + // + conn->wait_response(); + ASSERT(conn->completion().operation_name() == "INSERT"); + // Wait synchronization point. + conn->wait_response(); + ASSERT(conn->ready_for_query()); + ASSERT(!conn->has_uncompleted_request()); + ASSERT(conn->request_queue_size() == 0); + } + + /* + * Test case 2. + */ + { + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("select * from num"); + // + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("select * from num"); + // + ASSERT(conn->is_ready_for_nio_request()); + conn->execute_nio("select * from num"); + // + conn->send_sync(); + ASSERT(conn->request_queue_size() == 4); + // Process responses. + for (int i{}; i < 3; ++i) { + conn->wait_response(); + auto row = conn->row(); + ASSERT(row); + ASSERT(to(row["id"]) == 1); + ASSERT(to(row["str"]) == "one"); + // + conn->wait_response(); + row = conn->row(); + ASSERT(to(row["id"]) == 2); + ASSERT(to(row["str"]) == "two"); + // + conn->wait_response(); + row = conn->row(); + ASSERT(to(row["id"]) == 3); + ASSERT(to(row["str"]) == "three"); + // + conn->wait_response(); + auto completion = conn->completion(); + ASSERT(completion); + ASSERT(completion.operation_name() == "SELECT"); + } + // Wait synchronization point. + conn->wait_response(); + ASSERT(conn->has_response()); + ASSERT(conn->ready_for_query()); + ASSERT(conn->request_queue_size() == 0); + } + + // Re-enable pipeline. + conn->set_pipeline_enabled(false); + ASSERT(conn->is_ready_for_request()); + ASSERT(conn->is_ready_for_nio_request()); + conn->set_pipeline_enabled(true); + ASSERT(!conn->is_ready_for_request()); + ASSERT(conn->is_ready_for_nio_request()); + + /* + * Test case 3. + */ + { + conn->execute_nio("select 1 id"); + conn->execute_nio("syntax error"); + conn->execute_nio("syntax error"); + conn->execute_nio("syntax error"); + conn->execute_nio("select 3 id"); + ASSERT(!conn->is_ready_for_request()); + ASSERT(conn->is_ready_for_nio_request()); + ASSERT(conn->request_queue_size() == 5); + conn->send_sync(); + // Process responses. + conn->wait_response(); + auto row = conn->row(); + ASSERT(row); + ASSERT(to(row["id"]) == 1); + conn->wait_response(); + auto completion = conn->completion(); + ASSERT(completion); + ASSERT(completion.operation_name() == "SELECT"); + // + conn->wait_response(); + auto err = conn->error(); + ASSERT(err); + ASSERT(conn->pipeline_status() == pgfe::Pipeline_status::aborted); + // + conn->wait_response(); + ASSERT(!conn->ready_for_query()); + // + conn->wait_response(); + ASSERT(!conn->ready_for_query()); + // + conn->wait_response(); + ASSERT(!conn->ready_for_query()); + // + conn->wait_response(); + ASSERT(conn->ready_for_query()); + ASSERT(conn->request_queue_size() == 0); + } + + ASSERT(conn->pipeline_status() == pgfe::Pipeline_status::enabled); + + /* + * Test case 4. + */ + { + conn->execute_nio("select 1 id"); + ASSERT(conn->request_queue_size() == 1); + conn->send_flush(); + // Process responses. + conn->wait_response(); + auto row = conn->row(); + ASSERT(to(row["id"]) == 1); + conn->wait_response(); + auto completion = conn->completion(); + ASSERT(completion); + ASSERT(completion.operation_name() == "SELECT"); + } + + conn->set_pipeline_enabled(false); + ASSERT(conn->is_ready_for_request()); + ASSERT(conn->is_ready_for_nio_request()); +} catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + return 1; +} catch (...) { + std::cerr << "unknown error" << std::endl; + return 2; +}