Add log for current ReadIndex mechanism (#370)
CalvinNeo authored Apr 19, 2024
1 parent 79da4a9 commit 5eae36f
Showing 4 changed files with 290 additions and 1 deletion.
@@ -447,6 +447,10 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
}
}

pub fn get_router(&self, node_id: u64) -> Option<RaftRouter<TiFlashEngine, ProxyRaftEngine>> {
self.sim.rl().get_router(node_id)
}

fn valid_leader_id(&self, region_id: u64, leader_id: u64) -> bool {
let store_ids = match self.voter_store_ids_of_region(region_id) {
None => return false,
@@ -359,6 +359,7 @@ unsafe extern "C" fn ffi_release_pre_handled_snapshot(
pub fn gen_engine_store_server_helper(
wrap: Pin<&EngineStoreServerWrap>,
) -> EngineStoreServerHelper {
info!("mock gen_engine_store_server_helper");
EngineStoreServerHelper {
magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER,
version: interfaces_ffi::RAFT_STORE_PROXY_VERSION,
2 changes: 1 addition & 1 deletion proxy_components/proxy_ffi/src/read_index_helper.rs
@@ -82,7 +82,7 @@ fn into_read_index_response<S: engine_traits::Snapshot>(
resp
}

fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest {
let region_id = req.get_context().get_region_id();
let mut cmd = RaftCmdRequest::default();
{
284 changes: 284 additions & 0 deletions proxy_tests/proxy/shared/replica_read.rs
@@ -356,3 +356,287 @@ fn test_util() {
}
assert!(GC_MONITOR.valid_clean());
}

use kvproto::{
kvrpcpb::{Context, DiskFullOpt, KeyRange},
raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
raft_serverpb::RaftMessage,
};
use raftstore::{
router::RaftStoreRouter,
store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext},
};
use tokio::sync::oneshot;
use txn_types::{Key, Lock, LockType, TimeStamp};
use uuid::Uuid;

use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient};

// https://github.com/tikv/tikv/issues/16823
#[test]
fn test_raft_cmd_request_cant_advanve_max_ts() {
use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};

let mut cluster = new_server_cluster(0, 1);
cluster.run();

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);

let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

let mut ctx = Context::default();
let region_id = leader.get_id();
ctx.set_region_id(leader.get_id());
ctx.set_region_epoch(region.get_region_epoch().clone());
ctx.set_peer(leader);

let read_index = |ranges: &[(&[u8], &[u8])]| {
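// Hand-build a ReadIndex RaftCmdRequest: the inner request carries only the
// start_ts, while the peer and region epoch for the header are taken from a
// separately built kvrpcpb ReadIndexRequest; the key ranges are stored only in
// that local request and are never attached to the command on this path.
// Returns the ReadIndexResponse plus the start_ts used for the call.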
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();

let mut cmd = RaftCmdRequest::default();
{
let mut header = RaftRequestHeader::default();
let mut inner_req = RaftRequest::default();
inner_req.set_cmd_type(CmdType::ReadIndex);
inner_req
.mut_read_index()
.set_start_ts(start_ts.into_inner());

let mut req = ReadIndexRequest::default();
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
req.set_context(ctx.clone());
req.set_start_ts(start_ts.into_inner());
for &(start_key, end_key) in ranges {
let mut range = KeyRange::default();
range.set_start_key(start_key.to_vec());
range.set_end_key(end_key.to_vec());
req.mut_ranges().push(range);
}

header.set_region_id(region_id);
header.set_peer(req.get_context().get_peer().clone());
header.set_region_epoch(req.get_context().get_region_epoch().clone());
cmd.set_header(header);
cmd.set_requests(vec![inner_req].into());
}

let (result_tx, result_rx) = oneshot::channel();
let router = cluster.get_router(1).unwrap();
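// Dispatch the command through the raft router; the read callback forwards the
// RaftCmdResponse over the oneshot channel so the closure can block on it below.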
if let Err(e) = router.send_command(
cmd,
Callback::read(Box::new(move |resp| {
result_tx.send(resp.response).unwrap();
})),
RaftCmdExtraOpts {
deadline: None,
disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
},
) {
panic!("router send msg failed, error: {}", e);
}

let resp = block_on(result_rx).unwrap();
(resp.get_responses()[0].get_read_index().clone(), start_ts)
};

// wait a while until the node updates its own max ts
std::thread::sleep(Duration::from_millis(300));

let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
assert!(!resp.has_locked());
// Not changed: this RaftCmdRequest-based ReadIndex does not advance max_ts.
assert_eq!(cm.max_ts(), prev_cm_max_ts);
assert_ne!(cm.max_ts(), start_ts);
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}

// https://github.com/tikv/tikv/pull/8669/files
#[test]
fn test_raft_cmd_request_learner_advanve_max_ts() {
use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};

let mut cluster = new_server_cluster(0, 2);
cluster.pd_client.disable_default_operator();
let region_id = cluster.run_conf_change();
let region = cluster.get_region(b"");
assert_eq!(region_id, 1);
assert_eq!(region.get_id(), 1);
let leader = region.get_peers()[0].clone();

fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
let learner = new_learner_peer(2, 2);
cluster.pd_client.must_add_peer(1, learner.clone());

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
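// Install an in-memory lock on key b"k" through the concurrency manager, so a
// ReadIndex whose range covers that key should report the lock back.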
let keys: Vec<_> = vec![b"k", b"l"]
.into_iter()
.map(|k| Key::from_raw(k))
.collect();
let guards = block_on(cm.lock_keys(keys.iter()));
let lock = Lock::new(
LockType::Put,
b"k".to_vec(),
1.into(),
20000,
None,
1.into(),
1,
2.into(),
false,
);
guards[0].with_lock(|l| *l = Some(lock.clone()));

let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

// cluster.must_put(b"k", b"v");

let read_index = |ranges: &[(&[u8], &[u8])]| {
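// Build the kvrpcpb ReadIndexRequest the way TiFlash does (see the ReadIndex.cpp
// link below), convert it with the now-public gen_read_index_raft_cmd_req, and
// send the resulting RaftCmdRequest to the learner's router.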
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();

// https://github.com/pingcap/tiflash/blob/14a127820d0530e496af624bb5b69acd48caf747/dbms/src/Storages/KVStore/Read/ReadIndex.cpp#L39
let mut ctx = Context::default();
let learner = learner.clone();
ctx.set_region_id(region_id);
ctx.set_region_epoch(region.get_region_epoch().clone());
ctx.set_peer(learner);
let mut read_index_request = ReadIndexRequest::default();
read_index_request.set_context(ctx);
read_index_request.set_start_ts(start_ts.into_inner());
for (s, e) in ranges {
let mut r = KeyRange::new();
r.set_start_key(s.to_vec());
r.set_end_key(e.to_vec());
read_index_request.mut_ranges().push(r);
}
let mut cmd =
proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);

let (result_tx, result_rx) = oneshot::channel();
let router = cluster.get_router(2).unwrap();
if let Err(e) = router.send_command(
cmd,
Callback::read(Box::new(move |resp| {
result_tx.send(resp.response).unwrap();
})),
RaftCmdExtraOpts {
deadline: None,
disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
},
) {
panic!("router send msg failed, error: {}", e);
}

let resp = block_on(result_rx).unwrap();
(resp.get_responses()[0].get_read_index().clone(), start_ts)
};

// wait a while until the node updates its own max ts
std::thread::sleep(Duration::from_millis(3000));
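// Make sure the learner peer has been persisted into the on-disk region state
// (two peers) before issuing reads against it.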

must_wait_until_cond_node(
&cluster.cluster_ext,
region_id,
None,
&|states: &States| -> bool {
states.in_disk_region_state.get_region().get_peers().len() == 2
},
);

let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
assert!(!resp.has_locked());
// Changed: the learner ReadIndex built by gen_read_index_raft_cmd_req advances max_ts.
assert_ne!(cm.max_ts(), prev_cm_max_ts);
assert_eq!(cm.max_ts(), start_ts);

// `gen_read_index_raft_cmd_req` can only handle one key-range
let (resp, start_ts) = read_index(&[(b"j", b"k0")]);
assert_eq!(resp.get_locked(), &lock.into_lock_info(b"k".to_vec()));
assert_eq!(cm.max_ts(), start_ts);

drop(guards);

let (resp, start_ts) = read_index(&[(b"a", b"z")]);
assert!(!resp.has_locked());
assert_eq!(cm.max_ts(), start_ts);
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}

#[test]
fn test_raft_message_can_advanve_max_ts() {
use kvproto::raft_cmdpb::{ReadIndexRequest, ReadIndexResponse};
let mut cluster = new_server_cluster(0, 1);
cluster.run();

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);

let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let follower = new_learner_peer(2, 2);
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

let region_id = leader.get_id();

let read_index = |ranges: &[(&[u8], &[u8])]| {
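// Wrap the ReadIndexRequest in a ReadIndexContext, attach it as the entry data of
// a MsgReadIndex raft message, and send that message from the learner peer to the
// leader; nothing is routed back here, so a default ReadIndexResponse is returned.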
let mut m = raft::eraftpb::Message::default();
m.set_msg_type(MessageType::MsgReadIndex);
let mut read_index_req = ReadIndexRequest::default();
let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();
read_index_req.set_start_ts(start_ts.into_inner());
for &(start_key, end_key) in ranges {
let mut range = KeyRange::default();
range.set_start_key(start_key.to_vec());
range.set_end_key(end_key.to_vec());
read_index_req.mut_key_ranges().push(range);
}

let rctx = ReadIndexContext {
id: Uuid::new_v4(),
request: Some(read_index_req),
locked: None,
};
let mut e = raft::eraftpb::Entry::default();
e.set_data(rctx.to_bytes().into());
m.mut_entries().push(e);
m.set_from(2);

let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default();
raft_msg.set_region_id(region.get_id());
raft_msg.set_from_peer(follower);
raft_msg.set_to_peer(leader);
raft_msg.set_region_epoch(region.get_region_epoch().clone());
raft_msg.set_message(m);
cluster.send_raft_msg(raft_msg).unwrap();

(ReadIndexResponse::default(), start_ts)
};

// wait a while until the node updates its own max ts
let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
cluster.must_put(b"a", b"b");
std::thread::sleep(Duration::from_millis(2000));
// assert!(!resp.has_locked());
// Changed: the MsgReadIndex raft message advances max_ts to the request's start_ts.
assert_ne!(cm.max_ts(), prev_cm_max_ts);
assert_eq!(cm.max_ts(), start_ts);
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}
