Skip to content

Commit

Permalink
crimson/osd: optimize crimson-osd's client requests process parallelism
Browse files Browse the repository at this point in the history
Make client requests go to the concurrent pipeline stage "wait_repop" once they
are "submitted" to the underlying objectstore, which means their on-disk order
is guaranteed, so that successive client requests can go into the "process"
pipeline stage.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
  • Loading branch information
xxhdx1985126 committed May 16, 2021
1 parent a0eaf67 commit f7181ab
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 116 deletions.
5 changes: 3 additions & 2 deletions src/crimson/osd/ec_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ECBackend::_read(const hobject_t& hoid,
return seastar::make_ready_future<bufferlist>();
}

ECBackend::interruptible_future<crimson::osd::acked_peers_t>
ECBackend::rep_op_fut_t
ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
Expand All @@ -31,5 +31,6 @@ ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
std::vector<pg_log_entry_t>&& log_entries)
{
// todo
return seastar::make_ready_future<crimson::osd::acked_peers_t>();
return {seastar::now(),
seastar::make_ready_future<crimson::osd::acked_peers_t>()};
}
2 changes: 1 addition & 1 deletion src/crimson/osd/ec_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ECBackend : public PGBackend
private:
ll_read_ierrorator::future<ceph::bufferlist>
_read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
interruptible_future<crimson::osd::acked_peers_t>
rep_op_fut_t
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
Expand Down
36 changes: 25 additions & 11 deletions src/crimson/osd/ops_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,12 @@ class OpsExecuter {
interruptible_errorated_future<osd_op_errorator>
execute_op(OSDOp& osd_op);

using rep_op_fut_tuple =
std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
using rep_op_fut_t =
interruptible_future<rep_op_fut_tuple>;
template <typename MutFunc>
osd_op_ierrorator::future<> flush_changes_n_do_ops_effects(
rep_op_fut_t flush_changes_n_do_ops_effects(
Ref<PG> pg,
MutFunc&& mut_func) &&;

Expand Down Expand Up @@ -325,32 +329,42 @@ auto OpsExecuter::with_effect_on_obc(
}

template <typename MutFunc>
OpsExecuter::osd_op_ierrorator::future<>
OpsExecuter::rep_op_fut_t
OpsExecuter::flush_changes_n_do_ops_effects(Ref<PG> pg, MutFunc&& mut_func) &&
{
const bool want_mutate = !txn.empty();
// osd_op_params are instantiated by every wr-like operation.
assert(osd_op_params || !want_mutate);
assert(obc);
auto maybe_mutated = interruptor::make_interruptible(osd_op_errorator::now());
rep_op_fut_t maybe_mutated =
interruptor::make_ready_future<rep_op_fut_tuple>(
seastar::now(),
interruptor::make_interruptible(osd_op_errorator::now()));
if (want_mutate) {
osd_op_params->req_id = msg->get_reqid();
osd_op_params->mtime = msg->get_mtime();
maybe_mutated = std::forward<MutFunc>(mut_func)(std::move(txn),
auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
std::move(obc),
std::move(*osd_op_params),
user_modify);
maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
std::move(submitted),
osd_op_ierrorator::future<>(std::move(all_completed)));
}
if (__builtin_expect(op_effects.empty(), true)) {
return maybe_mutated;
} else {
return maybe_mutated.safe_then_interruptible([pg=std::move(pg),
this] () mutable {
// let's do the cleaning of `op_effects` in destructor
return interruptor::do_for_each(op_effects,
[pg=std::move(pg)] (auto& op_effect) {
return op_effect->execute(pg);
});
return maybe_mutated.then_unpack_interruptible(
[this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable {
return interruptor::make_ready_future<rep_op_fut_tuple>(
std::move(submitted),
all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
// let's do the cleaning of `op_effects` in destructor
return interruptor::do_for_each(op_effects,
[pg=std::move(pg)](auto& op_effect) {
return op_effect->execute(pg);
});
}));
});
}
}
Expand Down
22 changes: 19 additions & 3 deletions src/crimson/osd/osd_operations/client_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,25 @@ ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
return conn->send(std::move(reply));
}
}
return pg->do_osd_ops(m, obc, op_info).safe_then_interruptible(
[this](Ref<MOSDOpReply> reply) -> interruptible_future<> {
return conn->send(std::move(reply));
return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
[this, pg](auto submitted, auto all_completed) mutable {
return submitted.then_interruptible(
[this, pg] {
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(*pg).wait_repop));
}).then_interruptible(
[this, pg, all_completed=std::move(all_completed)]() mutable {
return all_completed.safe_then_interruptible(
[this, pg](Ref<MOSDOpReply> reply) {
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(*pg).send_reply)).then_interruptible(
[this, reply=std::move(reply)] {
return conn->send(std::move(reply));
});
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
return process_op(pg);
}));
});
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
return process_op(pg);
}));
Expand Down
9 changes: 6 additions & 3 deletions src/crimson/osd/osd_operations/internal_client_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ seastar::future<> InternalClientRequest::start()
[] (const std::error_code& e) {
return PG::do_osd_ops_iertr::now();
}
).safe_then_interruptible(
[] {
return interruptor::now();
).safe_then_unpack_interruptible(
[](auto submitted, auto all_completed) {
return all_completed.handle_error_interruptible(
crimson::ct_error::eagain::handle([] {
return seastar::now();
}));
}, crimson::ct_error::eagain::handle([] {
return interruptor::now();
})
Expand Down
92 changes: 58 additions & 34 deletions src/crimson/osd/pg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -566,16 +566,19 @@ seastar::future<> PG::WaitForActiveBlocker::stop()
return seastar::now();
}

PG::interruptible_future<> PG::submit_transaction(
std::tuple<PG::interruptible_future<>,
PG::interruptible_future<>>
PG::submit_transaction(
const OpInfo& op_info,
const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
crimson::common::system_shutdown_exception());
return {seastar::make_exception_future<>(
crimson::common::system_shutdown_exception()),
seastar::now()};
}

epoch_t map_epoch = get_osdmap_epoch();
Expand Down Expand Up @@ -603,13 +606,15 @@ PG::interruptible_future<> PG::submit_transaction(
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
txn, true, false);

return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
std::move(obc),
std::move(txn),
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
map_epoch,
std::move(log_entries)).then_interruptible(
auto [submitted, all_completed] = backend->mutate_object(
peering_state.get_acting_recovery_backfill(),
std::move(obc),
std::move(txn),
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
map_epoch,
std::move(log_entries));
return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
[this, last_complete=peering_state.get_info().last_complete,
at_version=osd_op_p.at_version](auto acked) {
for (const auto& peer : acked) {
Expand All @@ -618,7 +623,7 @@ PG::interruptible_future<> PG::submit_transaction(
}
peering_state.complete_write(at_version, last_complete);
return seastar::now();
});
}));
}

void PG::fill_op_params_bump_pg_version(
Expand Down Expand Up @@ -697,7 +702,8 @@ PG::interruptible_future<> PG::repair_object(
}

template <class Ret, class SuccessFunc, class FailureFunc>
PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
PG::do_osd_ops_execute(
OpsExecuter&& ox,
std::vector<OSDOp> ops,
const OpInfo &op_info,
Expand All @@ -708,6 +714,7 @@ PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
return reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) {
logger().debug(
"do_osd_ops_execute: object {} - handling op {}",
Expand Down Expand Up @@ -735,31 +742,48 @@ PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
std::move(txn),
std::move(osd_op_p));
});
}).safe_then_interruptible_tuple([success_func=std::move(success_func)] {
return std::move(success_func)();
}, crimson::ct_error::object_corrupted::handle(
[rollbacker, this] (const std::error_code& e) mutable {
// this is a path for EIO. it's special because we want to fix the obejct
// and try again. that is, the layer above `PG::do_osd_ops` is supposed to
// restart the execution.
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
[obc=rollbacker.get_obc(), this] {
return repair_object(obc->obs.oi.soid,
obc->obs.oi.version).then_interruptible([] {
return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
});
});
}), OpsExecuter::osd_op_errorator::all_same_way(
[rollbacker, failure_func=std::move(failure_func)]
}).safe_then_unpack_interruptible(
[success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
(auto submitted_fut, auto all_completed_fut) mutable {
return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
std::move(submitted_fut),
all_completed_fut.safe_then_interruptible_tuple(
std::move(success_func),
crimson::ct_error::object_corrupted::handle(
[rollbacker, this] (const std::error_code& e) mutable {
// this is a path for EIO. it's special because we want to fix the obejct
// and try again. that is, the layer above `PG::do_osd_ops` is supposed to
// restart the execution.
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
[obc=rollbacker.get_obc(), this] {
return repair_object(obc->obs.oi.soid,
obc->obs.oi.version).then_interruptible([] {
return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
});
});
}), OpsExecuter::osd_op_errorator::all_same_way(
[rollbacker, failure_func_ptr]
(const std::error_code& e) mutable {
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
[&e, failure_func_ptr] {
return (*failure_func_ptr)(e);
});
})
)
);
}, OpsExecuter::osd_op_errorator::all_same_way(
[rollbacker, failure_func_ptr]
(const std::error_code& e) mutable {
return rollbacker.rollback_obc_if_modified(e).then_interruptible(
[&e, failure_func=std::move(failure_func)] {
return std::move(failure_func)(e);
});
return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
seastar::now(),
rollbacker.rollback_obc_if_modified(e).then_interruptible(
[&e, failure_func_ptr] {
return (*failure_func_ptr)(e);
}));
}));
}

PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ref<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
Expand Down Expand Up @@ -803,7 +827,7 @@ PG::do_osd_ops(
).finally([ox_deleter=std::move(ox)] {});
}

PG::do_osd_ops_iertr::future<>
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
PG::do_osd_ops(
ObjectContextRef obc,
std::vector<OSDOp> ops,
Expand Down
13 changes: 9 additions & 4 deletions src/crimson/osd/pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,11 @@ class PG : public boost::intrusive_ref_counter<
::crimson::interruptible::interruptible_errorator<
::crimson::osd::IOInterruptCondition,
::crimson::errorator<crimson::ct_error::eagain>>;
do_osd_ops_iertr::future<Ref<MOSDOpReply>> do_osd_ops(
template <typename Ret = void>
using pg_rep_op_fut_t =
std::tuple<interruptible_future<>,
do_osd_ops_iertr::future<Ret>>;
do_osd_ops_iertr::future<pg_rep_op_fut_t<Ref<MOSDOpReply>>> do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
const OpInfo &op_info);
Expand All @@ -582,22 +586,23 @@ class PG : public boost::intrusive_ref_counter<
using do_osd_ops_failure_func_t =
std::function<do_osd_ops_iertr::future<>(const std::error_code&)>;
struct do_osd_ops_params_t;
do_osd_ops_iertr::future<> do_osd_ops(
do_osd_ops_iertr::future<pg_rep_op_fut_t<>> do_osd_ops(
ObjectContextRef obc,
std::vector<OSDOp> ops,
const OpInfo &op_info,
const do_osd_ops_params_t& params,
do_osd_ops_success_func_t success_func,
do_osd_ops_failure_func_t failure_func);
template <class Ret, class SuccessFunc, class FailureFunc>
do_osd_ops_iertr::future<Ret> do_osd_ops_execute(
do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
OpsExecuter&& ox,
std::vector<OSDOp> ops,
const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func);
interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
interruptible_future<> submit_transaction(
std::tuple<interruptible_future<>, interruptible_future<>>
submit_transaction(
const OpInfo& op_info,
const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
Expand Down
2 changes: 1 addition & 1 deletion src/crimson/osd/pg_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ PGBackend::load_metadata(const hobject_t& oid)
}));
}

PGBackend::interruptible_future<crimson::osd::acked_peers_t>
PGBackend::rep_op_fut_t
PGBackend::mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
Expand Down
7 changes: 5 additions & 2 deletions src/crimson/osd/pg_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class PGBackend
using interruptible_future =
::crimson::interruptible::interruptible_future<
::crimson::osd::IOInterruptCondition, T>;
using rep_op_fut_t =
std::tuple<interruptible_future<>,
interruptible_future<crimson::osd::acked_peers_t>>;
PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store);
virtual ~PGBackend() = default;
static std::unique_ptr<PGBackend> create(pg_t pgid,
Expand Down Expand Up @@ -158,7 +161,7 @@ class PGBackend
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
interruptible_future<crimson::osd::acked_peers_t> mutate_object(
rep_op_fut_t mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
Expand Down Expand Up @@ -279,7 +282,7 @@ class PGBackend
uint32_t flags) = 0;

bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
virtual interruptible_future<crimson::osd::acked_peers_t>
virtual rep_op_fut_t
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
Expand Down
6 changes: 2 additions & 4 deletions src/crimson/osd/recovery_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ RecoveryBackend::handle_backfill_progress(
m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
t);
return shard_services.get_store().do_transaction(
pg.get_collection_ref(), std::move(t)
).or_terminate();
pg.get_collection_ref(), std::move(t)).or_terminate();
}

RecoveryBackend::interruptible_future<>
Expand Down Expand Up @@ -153,8 +152,7 @@ RecoveryBackend::handle_backfill_remove(
ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
}
return shard_services.get_store().do_transaction(
pg.get_collection_ref(), std::move(t)
).or_terminate();
pg.get_collection_ref(), std::move(t)).or_terminate();
}

RecoveryBackend::interruptible_future<BackfillInterval>
Expand Down
Loading

0 comments on commit f7181ab

Please sign in to comment.