Skip to content

Commit

Permalink
Merge pull request ceph#54525 from myoungwon/wip-delta-overwrite-inpl…
Browse files Browse the repository at this point in the history
…ace-replay

crimson/os/seastore:  introduce inplace rewrite for RBM

Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
  • Loading branch information
cyx1231st authored Jan 8, 2024
2 parents 930ed08 + 72c9d6d commit 60c4b64
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/crimson/os/seastore/async_cleaner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ JournalTrimmerImpl::config_t::get_test(
max_journal_bytes = 4 * roll_size;
} else {
assert(type == journal_type_t::RANDOM_BLOCK);
target_dirty_bytes = roll_size / 4;
target_dirty_bytes = roll_size / 36;
target_alloc_bytes = roll_size / 4;
max_journal_bytes = roll_size / 2;
}
Expand Down
77 changes: 63 additions & 14 deletions src/crimson/os/seastore/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,20 @@ record_t Cache::prepare_record(
if (!i->is_exist_mutation_pending()) {
DEBUGT("commit replace extent ... -- {}, prior={}",
t, *i, *i->prior_instance);
// If inplace rewrite occurs during mutation, prev->version will
// be zero. Although this results in the version mismatch here, we can
// correct this by changing version to 1. This is because the inplace rewrite
// does not introduce any actual modification that could negatively
// impact system reliability
if (i->prior_instance->version == 0 && i->version > 1) {
assert(can_inplace_rewrite(i->get_type()));
assert(can_inplace_rewrite(i->prior_instance->get_type()));
assert(i->prior_instance->dirty_from_or_retired_at == JOURNAL_SEQ_MIN);
assert(i->prior_instance->state == CachedExtent::extent_state_t::CLEAN);
assert(i->prior_instance->get_paddr().get_addr_type() ==
paddr_types_t::RANDOM_BLOCK);
i->version = 1;
}
// extent with EXIST_MUTATION_PENDING doesn't have
// prior_instance field so skip these extents.
// the existing extents should be added into Cache
Expand Down Expand Up @@ -1261,6 +1275,24 @@ record_t Cache::prepare_record(
}
}

for (auto &i: t.written_inplace_ool_block_list) {
if (!i->is_valid()) {
continue;
}
assert(i->state == CachedExtent::extent_state_t::DIRTY);
assert(i->version > 0);
remove_from_dirty(i);
// set the version to zero because the extent state is now clean
// in order to handle this transparently
i->version = 0;
i->dirty_from_or_retired_at = JOURNAL_SEQ_MIN;
i->state = CachedExtent::extent_state_t::CLEAN;
assert(i->is_logical());
i->clear_modified_region();
touch_extent(*i);
DEBUGT("inplace rewrite ool block is commmitted -- {}", t, *i);
}

for (auto &i: t.existing_block_list) {
if (i->is_valid()) {
alloc_delta.alloc_blk_ranges.emplace_back(
Expand Down Expand Up @@ -1330,7 +1362,8 @@ record_t Cache::prepare_record(
t.num_allocated_invalid_extents);

auto& ool_stats = t.get_ool_write_stats();
ceph_assert(ool_stats.extents.num == t.written_ool_block_list.size());
ceph_assert(ool_stats.extents.num == t.written_ool_block_list.size() +
t.written_inplace_ool_block_list.size());

if (record.is_empty()) {
SUBINFOT(seastore_t,
Expand Down Expand Up @@ -1699,22 +1732,25 @@ Cache::replay_delta(
segment_seq_printer_t{delta_paddr_segment_seq},
delta_paddr_segment_type,
delta);
return replay_delta_ertr::make_ready_future<bool>(false);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(false, nullptr));
}
}
}

if (delta.type == extent_types_t::JOURNAL_TAIL) {
// this delta should have been dealt with during segment cleaner mounting
return replay_delta_ertr::make_ready_future<bool>(false);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(false, nullptr));
}

// replay alloc
if (delta.type == extent_types_t::ALLOC_INFO) {
if (journal_seq < alloc_tail) {
DEBUG("journal_seq {} < alloc_tail {}, don't replay {}",
journal_seq, alloc_tail, delta);
return replay_delta_ertr::make_ready_future<bool>(false);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(false, nullptr));
}

alloc_delta_t alloc_delta;
Expand All @@ -1738,14 +1774,16 @@ Cache::replay_delta(
if (!backref_list.empty()) {
backref_batch_update(std::move(backref_list), journal_seq);
}
return replay_delta_ertr::make_ready_future<bool>(true);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(true, nullptr));
}

// replay dirty
if (journal_seq < dirty_tail) {
DEBUG("journal_seq {} < dirty_tail {}, don't replay {}",
journal_seq, dirty_tail, delta);
return replay_delta_ertr::make_ready_future<bool>(false);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(false, nullptr));
}

if (delta.type == extent_types_t::ROOT) {
Expand All @@ -1759,7 +1797,8 @@ Cache::replay_delta(
journal_seq, record_base, delta, *root);
root->set_modify_time(modify_time);
add_extent(root);
return replay_delta_ertr::make_ready_future<bool>(true);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(true, root));
} else {
auto _get_extent_if_cached = [this](paddr_t addr)
-> get_extent_ertr::future<CachedExtentRef> {
Expand Down Expand Up @@ -1799,17 +1838,26 @@ Cache::replay_delta(
DEBUG("replay extent is not present, so delta is obsolete at {} {} -- {}",
journal_seq, record_base, delta);
assert(delta.pversion > 0);
return replay_delta_ertr::make_ready_future<bool>(true);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(false, nullptr));
}

DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}",
journal_seq, record_base, delta, *extent);

assert(extent->last_committed_crc == delta.prev_crc);
assert(extent->version == delta.pversion);
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
extent->set_modify_time(modify_time);
assert(extent->last_committed_crc == delta.final_crc);
if (delta.paddr.get_addr_type() == paddr_types_t::SEGMENT ||
!can_inplace_rewrite(delta.type)) {
ceph_assert_always(extent->last_committed_crc == delta.prev_crc);
assert(extent->version == delta.pversion);
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
extent->set_modify_time(modify_time);
ceph_assert_always(extent->last_committed_crc == delta.final_crc);
} else {
assert(delta.paddr.get_addr_type() == paddr_types_t::RANDOM_BLOCK);
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
extent->set_modify_time(modify_time);
// crc will be checked after journal replay is done
}

extent->version++;
if (extent->version == 1) {
Expand All @@ -1821,7 +1869,8 @@ Cache::replay_delta(
journal_seq, record_base, delta, *extent);
}
mark_dirty(extent);
return replay_delta_ertr::make_ready_future<bool>(true);
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
std::make_pair(true, extent));
});
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/crimson/os/seastore/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,8 @@ class Cache {
*/
using replay_delta_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using replay_delta_ret = replay_delta_ertr::future<bool>;
using replay_delta_ret = replay_delta_ertr::future<
std::pair<bool, CachedExtentRef>>;
replay_delta_ret replay_delta(
journal_seq_t seq,
paddr_t record_block_base,
Expand Down
19 changes: 19 additions & 0 deletions src/crimson/os/seastore/cached_extent.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ class CachedExtent
rewrite_generation = gen;
}

void set_inplace_rewrite_generation() {
user_hint = placement_hint_t::REWRITE;
rewrite_generation = OOL_GENERATION;
}

bool is_inline() const {
return poffset.is_relative();
}
Expand All @@ -606,6 +611,10 @@ class CachedExtent
return prior_instance;
}

uint32_t get_last_committed_crc() const {
return last_committed_crc;
}

private:
template <typename T>
friend class read_set_item_t;
Expand Down Expand Up @@ -1237,6 +1246,16 @@ class LogicalCachedExtent : public ChildableCachedExtent {

void on_replace_prior(Transaction &t) final;

struct modified_region_t {
extent_len_t offset;
extent_len_t len;
};
virtual std::optional<modified_region_t> get_modified_region() {
return std::nullopt;
}

virtual void clear_modified_region() {}

virtual ~LogicalCachedExtent();
protected:

Expand Down
36 changes: 28 additions & 8 deletions src/crimson/os/seastore/extent_placement_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -790,16 +790,36 @@ RandomBlockOolWriter::do_write(
stats.num_records += 1;

ex->prepare_write();
return rbm->write(paddr,
ex->get_bptr()
).handle_error(
alloc_write_iertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error when writing record"}
).safe_then([&t, &ex, paddr, FNAME]() {
extent_len_t offset = 0;
bufferptr bp;
if (can_inplace_rewrite(t, ex)) {
auto r = ex->get_modified_region();
ceph_assert(r.has_value());
offset = p2align(r->offset, rbm->get_block_size());
extent_len_t len =
p2roundup(r->offset + r->len, rbm->get_block_size()) - offset;
bp = ceph::bufferptr(ex->get_bptr(), offset, len);
} else {
bp = ex->get_bptr();
}
return trans_intr::make_interruptible(
rbm->write(paddr + offset,
bp
).handle_error(
alloc_write_iertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error when writing record"}
)
).si_then([this, &t, &ex, paddr, FNAME] {
TRACET("ool extent written at {} -- {}",
t, paddr, *ex);
t.mark_allocated_extent_ool(ex);
if (ex->is_initial_pending()) {
t.mark_allocated_extent_ool(ex);
} else if (can_inplace_rewrite(t, ex)) {
t.mark_inplace_rewrite_extent_ool(ex);
} else {
ceph_assert("impossible");
}
return alloc_write_iertr::now();
});
});
Expand Down
27 changes: 27 additions & 0 deletions src/crimson/os/seastore/extent_placement_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class ExtentOolWriter {

using close_ertr = base_ertr;
virtual close_ertr::future<> close() = 0;

virtual bool can_inplace_rewrite(Transaction& t,
CachedExtentRef extent) = 0;
};
using ExtentOolWriterRef = std::unique_ptr<ExtentOolWriter>;

Expand Down Expand Up @@ -79,6 +82,11 @@ class SegmentedOolWriter : public ExtentOolWriter {
return make_delayed_temp_paddr(0);
}

bool can_inplace_rewrite(Transaction& t,
CachedExtentRef extent) final {
return false;
}

private:
alloc_write_iertr::future<> do_write(
Transaction& t,
Expand Down Expand Up @@ -122,6 +130,17 @@ class RandomBlockOolWriter : public ExtentOolWriter {
return rb_cleaner->alloc_paddr(length);
}

bool can_inplace_rewrite(Transaction& t,
CachedExtentRef extent) final {
if (!extent->is_dirty()) {
return false;
}
assert(t.get_src() == transaction_type_t::TRIM_DIRTY);
ceph_assert_always(extent->get_type() == extent_types_t::ROOT ||
extent->get_paddr().is_absolute());
return crimson::os::seastore::can_inplace_rewrite(extent->get_type());
}

private:
alloc_write_iertr::future<> do_write(
Transaction& t,
Expand Down Expand Up @@ -199,6 +218,14 @@ class ExtentPlacementManager {
background_process.set_extent_callback(cb);
}

bool can_inplace_rewrite(Transaction& t, CachedExtentRef extent) {
auto writer = get_writer(placement_hint_t::REWRITE,
get_extent_category(extent->get_type()),
OOL_GENERATION);
ceph_assert(writer);
return writer->can_inplace_rewrite(t, extent);
}

journal_type_t get_journal_type() const {
return background_process.get_journal_type();
}
Expand Down
3 changes: 2 additions & 1 deletion src/crimson/os/seastore/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/segment_seq_allocator.h"
#include "crimson/os/seastore/cached_extent.h"

namespace crimson::os::seastore {

Expand Down Expand Up @@ -88,7 +89,7 @@ class Journal {
crimson::ct_error::erange>;
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
replay_ertr::future<bool>(
replay_ertr::future<std::pair<bool, CachedExtentRef>>(
const record_locator_t&,
const delta_info_t&,
const journal_seq_t&, // dirty_tail
Expand Down
26 changes: 21 additions & 5 deletions src/crimson/os/seastore/journal/circular_bounded_journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ Journal::replay_ret CircularBoundedJournal::replay(
return seastar::do_with(
std::move(delta_handler),
std::map<paddr_t, journal_seq_t>(),
[this](auto &d_handler, auto &map) {
std::map<paddr_t, std::pair<CachedExtentRef, uint32_t>>(),
[this](auto &d_handler, auto &map, auto &crc_info) {
auto build_paddr_seq_map = [&map](
const auto &offsets,
const auto &e,
Expand All @@ -339,8 +340,8 @@ Journal::replay_ret CircularBoundedJournal::replay(
// The first pass to build the paddr->journal_seq_t map
// from extent allocations
return scan_valid_record_delta(std::move(build_paddr_seq_map), tail
).safe_then([this, &map, &d_handler, tail]() {
auto call_d_handler_if_valid = [this, &map, &d_handler](
).safe_then([this, &map, &d_handler, tail, &crc_info]() {
auto call_d_handler_if_valid = [this, &map, &d_handler, &crc_info](
const auto &offsets,
const auto &e,
sea_time_point modify_time)
Expand All @@ -353,12 +354,27 @@ Journal::replay_ret CircularBoundedJournal::replay(
get_dirty_tail(),
get_alloc_tail(),
modify_time
);
).safe_then([&e, &crc_info](auto ret) {
auto [applied, ext] = ret;
if (applied && ext && can_inplace_rewrite(
ext->get_type())) {
crc_info[ext->get_paddr()] =
std::make_pair(ext, e.final_crc);
}
return replay_ertr::make_ready_future<bool>(applied);
});
}
return replay_ertr::make_ready_future<bool>(true);
};
// The second pass to replay deltas
return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail
).safe_then([&crc_info]() {
for (auto p : crc_info) {
ceph_assert_always(p.second.first->get_last_committed_crc() == p.second.second);
}
crc_info.clear();
return replay_ertr::now();
});
});
}).safe_then([this]() {
// make sure that committed_to is JOURNAL_SEQ_NULL if jounal is the initial state
Expand Down
3 changes: 2 additions & 1 deletion src/crimson/os/seastore/journal/segmented_journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ SegmentedJournal::replay_segment(
trimmer.get_dirty_tail(),
trimmer.get_alloc_tail(),
modify_time
).safe_then([&stats, delta_type=delta.type](bool is_applied) {
).safe_then([&stats, delta_type=delta.type](auto ret) {
auto [is_applied, ext] = ret;
if (is_applied) {
// see Cache::replay_delta()
assert(delta_type != extent_types_t::JOURNAL_TAIL);
Expand Down
Loading

0 comments on commit 60c4b64

Please sign in to comment.