Skip to content

Commit

Permalink
Implement pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitigr committed Mar 18, 2022
1 parent 0557f4a commit 6b31ae2
Show file tree
Hide file tree
Showing 14 changed files with 626 additions and 81 deletions.
3 changes: 3 additions & 0 deletions cmake/dmitigr_pgfe.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ set(dmitigr_pgfe_headers
pq.hpp
prepared_statement.hpp
problem.hpp
ready_for_query.hpp
response.hpp
row.hpp
row_info.hpp
Expand All @@ -79,6 +80,7 @@ set(dmitigr_pgfe_implementations
misc.cpp
prepared_statement.cpp
problem.cpp
ready_for_query.cpp
row_info.cpp
sql_string.cpp
sql_vector.cpp
Expand Down Expand Up @@ -130,6 +132,7 @@ if(DMITIGR_CPPLIPA_TESTS)
copier
data
hello_world
pipeline
pq_vs_pgfe
ps
row
Expand Down
20 changes: 20 additions & 0 deletions src/pgfe/basics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ enum class Connection_status {
connected = 400
};

// =============================================================================

/**
* @ingroup main
*
Expand Down Expand Up @@ -238,6 +240,24 @@ enum class Transaction_status {

// =============================================================================

/**
* @ingroup main
*
* @brief A pipeline status.
*/
enum class Pipeline_status {
/// Pipeline is disabled.
disabled = 0,

/// Pipeline is enabled.
enabled = 100,

/// Error occurred while processing the pipeline.
aborted = 200
};

// =============================================================================

/**
* @ingroup main
*
Expand Down
152 changes: 108 additions & 44 deletions src/pgfe/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
// Dmitry Igrishin
// dmitigr@gmail.com

#include "../base/assert.hpp"
#include "../net/socket.hpp"
#include "connection.hpp"
#include "copier.hpp"
#include "exceptions.hpp"
#include "large_object.hpp"
#include "ready_for_query.hpp"
#include "sql_string.hpp"

namespace dmitigr::pgfe {
Expand Down Expand Up @@ -269,31 +271,45 @@ DMITIGR_PGFE_INLINE Response_status Connection::handle_input(const bool wait_res
{
assert(is_connected());

const auto check_state = [this]
const auto check_state = [this]() noexcept
{
assert(response_status_ == Response_status::ready);
assert(response_.status() == PGRES_SINGLE_TUPLE);
assert(!requests_.empty());
assert(requests_.front() == Request_id::execute);
assert(requests_.front().id_ == Request::Id::execute);
};

const auto dismiss_request = [this]() noexcept
{
if (!requests_.empty()) {
last_processed_request_id_ = requests_.front();
last_processed_request_ = std::move(requests_.front());
requests_.pop();
}
};

static const auto is_completion_status = [](const auto status) noexcept
{
return status == PGRES_FATAL_ERROR ||
status == PGRES_PIPELINE_ABORTED ||
status == PGRES_PIPELINE_SYNC ||
status == PGRES_COMMAND_OK ||
status == PGRES_TUPLES_OK ||
status == PGRES_EMPTY_QUERY ||
status == PGRES_BAD_RESPONSE;
};

/*
* According to https://www.postgresql.org/docs/current/libpq-pipeline-mode.html,
* "To enter single-row mode, call PQsetSingleRowMode() before retrieving results
* with PQgetResult(). This mode selection is effective only for the query currently
* being processed." Therefore, set_single_row_mode_enabled() is called once for
* each query in a pipeline.
*/
if ((pipeline_status() == Pipeline_status::enabled) &&
!is_single_row_mode_enabled_ && !requests_.empty() &&
requests_.front().id_ == Request::Id::execute)
set_single_row_mode_enabled();

if (wait_response) {
if (response_status_ == Response_status::unready) {
complete_response:
Expand Down Expand Up @@ -361,33 +377,38 @@ DMITIGR_PGFE_INLINE Response_status Connection::handle_input(const bool wait_res
assert(rstatus != PGRES_NONFATAL_ERROR);
assert(rstatus != PGRES_SINGLE_TUPLE);
if (rstatus == PGRES_TUPLES_OK) {
assert(last_processed_request_id_ == Request_id::execute);
assert(last_processed_request_.id_ == Request::Id::execute);
is_single_row_mode_enabled_ = false;
} else if (rstatus == PGRES_COPY_OUT || rstatus == PGRES_COPY_IN) {
is_copy_in_progress_ = true;
} else if (rstatus == PGRES_FATAL_ERROR) {
is_copy_in_progress_ = false;
request_prepared_statement_ = {};
request_prepared_statement_name_.reset();
is_single_row_mode_enabled_ = false;
} else if (rstatus == PGRES_COMMAND_OK) {
assert(last_processed_request_id_ != Request_id::prepare || request_prepared_statement_);
assert(last_processed_request_id_ != Request_id::describe || request_prepared_statement_name_);
assert(last_processed_request_id_ != Request_id::unprepare || request_prepared_statement_name_);
if (last_processed_request_id_ == Request_id::prepare) {
last_prepared_statement_ = register_ps(std::move(request_prepared_statement_));
assert(!request_prepared_statement_);
} else if (last_processed_request_id_ == Request_id::describe) {
last_prepared_statement_ = ps(*request_prepared_statement_name_);
auto& lpr = last_processed_request_;
assert(lpr.id_ != Request::Id::prepare || lpr.prepared_statement_);
assert(lpr.id_ != Request::Id::describe || lpr.prepared_statement_name_);
assert(lpr.id_ != Request::Id::unprepare || lpr.prepared_statement_name_);
if (lpr.id_ == Request::Id::prepare) {
last_prepared_statement_ = register_ps(std::move(lpr.prepared_statement_));
assert(!lpr.prepared_statement_);
} else if (lpr.id_ == Request::Id::describe) {
last_prepared_statement_ = ps(*lpr.prepared_statement_name_);
if (!last_prepared_statement_)
last_prepared_statement_ = register_ps(Prepared_statement{std::move(*request_prepared_statement_name_),
this, static_cast<std::size_t>(response_.field_count())});
last_prepared_statement_ = register_ps(Prepared_statement{
std::move(*lpr.prepared_statement_name_),
this,
static_cast<std::size_t>(response_.field_count())});
last_prepared_statement_->set_description(std::move(response_));
request_prepared_statement_name_.reset();
} else if (last_processed_request_id_ == Request_id::unprepare) {
assert(request_prepared_statement_name_ && !std::strcmp(response_.command_tag(), "DEALLOCATE"));
unregister_ps(*request_prepared_statement_name_);
request_prepared_statement_name_.reset();
lpr.prepared_statement_name_.reset();
} else if (lpr.id_ == Request::Id::unprepare) {
assert(lpr.prepared_statement_name_ &&
!std::strcmp(response_.command_tag(), "DEALLOCATE"));
unregister_ps(*lpr.prepared_statement_name_);
lpr.prepared_statement_name_.reset();
}
is_copy_in_progress_ = false;
is_single_row_mode_enabled_ = false;
}
} else if (response_status_ == Response_status::empty)
dismiss_request(); // just in case
Expand Down Expand Up @@ -504,17 +525,17 @@ DMITIGR_PGFE_INLINE Completion Connection::completion() noexcept
return result;
}
case PGRES_COMMAND_OK:
switch (last_processed_request_id_) {
case Request_id::execute: {
switch (last_processed_request_.id_) {
case Request::Id::execute: {
Completion result{response_.command_tag()};
response_.reset();
return result;
}
case Request_id::prepare:
case Request::Id::prepare:
[[fallthrough]];
case Request_id::describe:
case Request::Id::describe:
return {};
case Request_id::unprepare: {
case Request::Id::unprepare: {
Completion result{"unprepare"};
response_.reset();
return result;
Expand All @@ -532,6 +553,17 @@ DMITIGR_PGFE_INLINE Completion Connection::completion() noexcept
}
}

DMITIGR_PGFE_INLINE Ready_for_query Connection::ready_for_query()
{
return (response_.status() == PGRES_PIPELINE_SYNC)
? Ready_for_query{std::move(response_)} : Ready_for_query{};
}

DMITIGR_PGFE_INLINE std::size_t Connection::request_queue_size() const
{
return requests_.size();
}

DMITIGR_PGFE_INLINE void
Connection::prepare_nio(const Sql_string& statement, const std::string& name)
{
Expand All @@ -550,15 +582,12 @@ Connection::prepare(const Sql_string& statement, const std::string& name)
DMITIGR_PGFE_INLINE void Connection::describe_nio(const std::string& name)
{
assert(is_ready_for_nio_request());
assert(!request_prepared_statement_name_);

requests_.push(Request_id::describe); // can throw
requests_.emplace(Request::Id::describe, name); // can throw
try {
auto name_copy = name;
const int send_ok = ::PQsendDescribePrepared(conn(), name.c_str());
if (!send_ok)
throw std::runtime_error{error_message()};
request_prepared_statement_name_ = std::move(name_copy); // cannot throw
} catch (...) {
requests_.pop(); // rollback
throw;
Expand All @@ -570,18 +599,52 @@ DMITIGR_PGFE_INLINE void Connection::describe_nio(const std::string& name)
DMITIGR_PGFE_INLINE void Connection::unprepare_nio(const std::string& name)
{
assert(!name.empty());
assert(!request_prepared_statement_name_);

auto name_copy = name; // can throw
const auto query = "DEALLOCATE " + to_quoted_identifier(name); // can throw
execute_nio(query); // can throw
assert(requests_.front() == Request_id::execute);
requests_.front() = Request_id::unprepare; // cannot throw
request_prepared_statement_name_ = std::move(name_copy); // cannot throw
assert(requests_.front().id_ == Request::Id::execute);
requests_.front().id_ = Request::Id::unprepare; // cannot throw
requests_.front().prepared_statement_name_ = std::move(name_copy); // cannot throw

assert(is_invariant_ok());
}

DMITIGR_PGFE_INLINE void Connection::set_pipeline_enabled(const bool value)
{
if (value) {
if (!PQenterPipelineMode(conn()))
throw Client_exception{"cannot enable pipeline on connection"};
} else {
if (!PQexitPipelineMode(conn()))
throw Client_exception{error_message()};
}
}

DMITIGR_PGFE_INLINE Pipeline_status Connection::pipeline_status() const
{
switch (PQpipelineStatus(conn())) {
case PQ_PIPELINE_OFF: return Pipeline_status::disabled;
case PQ_PIPELINE_ON: return Pipeline_status::enabled;
case PQ_PIPELINE_ABORTED: return Pipeline_status::aborted;
default: break;
}
DMITIGR_ASSERT(false);
}

DMITIGR_PGFE_INLINE void Connection::send_sync()
{
if (!PQpipelineSync(conn()))
throw Client_exception{"cannot send sync message to the server"};
requests_.emplace(Request::Id::sync);
}

DMITIGR_PGFE_INLINE void Connection::send_flush()
{
if (!PQsendFlushRequest(conn()))
throw Client_exception{"cannot send flush message to the server"};
}

DMITIGR_PGFE_INLINE Oid Connection::create_large_object(const Oid oid) noexcept
{
assert(is_ready_for_request());
Expand Down Expand Up @@ -641,9 +704,7 @@ DMITIGR_PGFE_INLINE bool Connection::is_invariant_ok() const noexcept
(response_status_ == Response_status::empty) &&
named_prepared_statements_.empty() &&
!unnamed_prepared_statement_ &&
requests_.empty() &&
!request_prepared_statement_ &&
!request_prepared_statement_name_;
requests_.empty();
const bool session_data_ok = session_data_empty || (status() == Status::failure) || (status() == Status::connected);
const bool trans_ok = !is_connected() || transaction_status();
const bool sess_time_ok = !is_connected() || session_start_time();
Expand Down Expand Up @@ -685,8 +746,13 @@ DMITIGR_PGFE_INLINE void Connection::reset_session() noexcept
unnamed_prepared_statement_ = {};

requests_ = {};
request_prepared_statement_ = {};
request_prepared_statement_name_.reset();
}

DMITIGR_PGFE_INLINE void Connection::set_single_row_mode_enabled()
{
const auto set_ok = ::PQsetSingleRowMode(conn());
DMITIGR_ASSERT(set_ok);
is_single_row_mode_enabled_ = true;
}

DMITIGR_PGFE_INLINE void Connection::notice_receiver(void* const arg, const ::PGresult* const r) noexcept
Expand Down Expand Up @@ -716,17 +782,15 @@ Connection::prepare_nio__(const char* const query, const char* const name, const
assert(query);
assert(name);
assert(is_ready_for_nio_request());
assert(!request_prepared_statement_);

requests_.push(Request_id::prepare); // can throw
requests_.emplace(Request::Id::prepare);
try {
Prepared_statement ps{name, this, preparsed};
constexpr int n_params{0};
requests_.front().prepared_statement_ = {name, this, preparsed};
constexpr int n_params{};
constexpr const ::Oid* const param_types{};
const int send_ok = ::PQsendPrepare(conn(), name, query, n_params, param_types);
if (!send_ok)
throw std::runtime_error{error_message()};
request_prepared_statement_ = std::move(ps); // cannot throw
} catch (...) {
requests_.pop(); // rollback
throw;
Expand Down
Loading

0 comments on commit 6b31ae2

Please sign in to comment.