From 088581cbb9316333fa3ea07af969bc24de739283 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 17 Apr 2024 12:45:49 +0800 Subject: [PATCH 1/7] test Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/peer.rs | 17 + components/raftstore/src/store/peer.rs | 2 + .../src/mock_cluster/v1/cluster.rs | 4 + .../mock_store/mock_engine_store_server.rs | 1 + .../proxy_ffi/src/read_index_helper.rs | 3 +- proxy_tests/proxy/shared/replica_read.rs | 313 ++++++++++++++++++ src/server/raftkv/mod.rs | 6 + 7 files changed, 345 insertions(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 7c33bf66b87..27359ae397f 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -2547,12 +2547,15 @@ where return Ok(()); } + debug!("!!!!!! handle 3"); + if msg.get_is_tombstone() { // we receive a message tells us to remove ourself. self.handle_gc_peer_msg(&msg); return Ok(()); } + debug!("!!!!!! handle 4"); if msg.has_merge_target() { fail_point!("on_has_merge_target", |_| Ok(())); if self.need_gc_merge(&msg)? { @@ -2561,10 +2564,12 @@ where return Ok(()); } + debug!("!!!!!! handle 5"); if self.check_msg(&msg) { return Ok(()); } + debug!("!!!!!! handle 6"); if msg.has_extra_msg() { self.on_extra_message(msg); return Ok(()); @@ -2572,6 +2577,7 @@ where let is_snapshot = msg.get_message().has_snapshot(); + debug!("!!!!!! handle 7"); // TODO: spin off the I/O code (delete_snapshot) let regions_to_destroy = match self.check_snapshot(&msg)? { Either::Left(key) => { @@ -2588,6 +2594,7 @@ where Either::Right(v) => v, }; + debug!("!!!!!! handle 8"); if util::is_vote_msg(msg.get_message()) || msg_type == MessageType::MsgTimeoutNow { if self.fsm.hibernate_state.group_state() != GroupState::Chaos { self.fsm.reset_hibernate_state(GroupState::Chaos); @@ -2600,10 +2607,12 @@ where let from_peer_id = msg.get_from_peer().get_id(); self.fsm.peer.insert_peer_cache(msg.take_from_peer()); + debug!("!!!!!! handle 8.1"); let result = if msg_type == MessageType::MsgTransferLeader { self.on_transfer_leader_msg(msg.get_message(), peer_disk_usage); Ok(()) } else { + debug!("!!!!!! handle 8.2"); // This can be a message that sent when it's still a follower. Nevertheleast, // it's meaningless to continue to handle the request as callbacks are cleared. if msg.get_message().get_msg_type() == MessageType::MsgReadIndex @@ -2611,14 +2620,22 @@ where && (msg.get_message().get_from() == raft::INVALID_ID || msg.get_message().get_from() == self.fsm.peer_id()) { + debug!( + "!!!!!! handle 8.2.2 {} {} {}", + self.fsm.peer.is_leader(), + msg.get_message().get_from(), + msg.get_message().get_from() + ); self.ctx.raft_metrics.message_dropped.stale_msg.inc(); return Ok(()); } + debug!("!!!!!! handle 8.3"); self.fsm.peer.step(self.ctx, msg.take_message()) }; stepped.set(result.is_ok()); + debug!("!!!!!! handle 9"); if is_snapshot { if !self.fsm.peer.has_pending_snapshot() { // This snapshot is rejected by raft-rs. diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 904d35fec2f..2b2bfaab944 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1704,6 +1704,7 @@ where "msg_size" => msg.get_message().compute_size(), "to" => to_peer_id, "disk_usage" => ?msg.get_disk_usage(), + "!!!!msg" => ?msg ); for (term, index) in msg @@ -1776,6 +1777,7 @@ where ctx: &mut PollContext, mut m: eraftpb::Message, ) -> Result<()> { + info!("!!!!!! 
raft step {:?}", m); fail_point!( "step_message_3_1", self.peer.get_store_id() == 3 && self.region_id == 1, diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs index e2d62d4a9c6..62e03331a82 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs @@ -447,6 +447,10 @@ impl> Cluster { } } + pub fn get_router(&self, node_id: u64) -> Option> { + self.sim.rl().get_router(node_id) + } + fn valid_leader_id(&self, region_id: u64, leader_id: u64) -> bool { let store_ids = match self.voter_store_ids_of_region(region_id) { None => return false, diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index 969c28af033..1f769af8d17 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -359,6 +359,7 @@ unsafe extern "C" fn ffi_release_pre_handled_snapshot( pub fn gen_engine_store_server_helper( wrap: Pin<&EngineStoreServerWrap>, ) -> EngineStoreServerHelper { + info!("mock gen_engine_store_server_helper"); EngineStoreServerHelper { magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER, version: interfaces_ffi::RAFT_STORE_PROXY_VERSION, diff --git a/proxy_components/proxy_ffi/src/read_index_helper.rs b/proxy_components/proxy_ffi/src/read_index_helper.rs index 604345c9bd4..471a912f635 100644 --- a/proxy_components/proxy_ffi/src/read_index_helper.rs +++ b/proxy_components/proxy_ffi/src/read_index_helper.rs @@ -82,7 +82,7 @@ fn into_read_index_response( resp } -fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest { +pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest { let region_id = req.get_context().get_region_id(); let mut cmd = RaftCmdRequest::default(); { @@ -91,6 +91,7 @@ fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest { inner_req.set_cmd_type(CmdType::ReadIndex); inner_req.mut_read_index().set_start_ts(req.get_start_ts()); if !req.get_ranges().is_empty() { + tikv_util::info!("!!!!!! 
not empty"); let r = &mut req.mut_ranges()[0]; let mut range = kvproto::kvrpcpb::KeyRange::default(); range.set_start_key(r.take_start_key()); diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index 1f991a6cf82..e636bc9365f 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -356,3 +356,316 @@ fn test_util() { } assert!(GC_MONITOR.valid_clean()); } + +use kvproto::{ + kvrpcpb::{Context, DiskFullOpt, KeyRange}, + raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest}, + raft_serverpb::RaftMessage, +}; +use raftstore::{ + router::RaftStoreRouter, + store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext}, +}; +use tokio::sync::oneshot; +use txn_types::{Key, Lock, LockType, TimeStamp}; +use uuid::Uuid; + +use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient}; + +// https://github.com/tikv/tikv/issues/16823 +#[test] +fn test_raft_cmd_request_cant_advanve_max_ts() { + use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; + + let mut cluster = new_server_cluster(0, 1); + cluster.run(); + + let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); + let keys: Vec<_> = vec![b"k", b"l"] + .into_iter() + .map(|k| Key::from_raw(k)) + .collect(); + let guards = block_on(cm.lock_keys(keys.iter())); + let lock = Lock::new( + LockType::Put, + b"k".to_vec(), + 1.into(), + 20000, + None, + 1.into(), + 1, + 2.into(), + false, + ); + guards[0].with_lock(|l| *l = Some(lock.clone())); + + let region = cluster.get_region(b""); + let leader = region.get_peers()[0].clone(); + let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned(); + + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + let mut ctx = Context::default(); + let region_id = leader.get_id(); + ctx.set_region_id(leader.get_id()); + ctx.set_region_epoch(region.get_region_epoch().clone()); + ctx.set_peer(leader); + + let read_index = |ranges: &[(&[u8], &[u8])]| { + let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + + let mut cmd = RaftCmdRequest::default(); + { + let mut header = RaftRequestHeader::default(); + let mut inner_req = RaftRequest::default(); + inner_req.set_cmd_type(CmdType::ReadIndex); + inner_req + .mut_read_index() + .set_start_ts(start_ts.into_inner()); + + let mut req = ReadIndexRequest::default(); + let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + req.set_context(ctx.clone()); + req.set_start_ts(start_ts.into_inner()); + for &(start_key, end_key) in ranges { + let mut range = KeyRange::default(); + range.set_start_key(start_key.to_vec()); + range.set_end_key(end_key.to_vec()); + req.mut_ranges().push(range); + } + + header.set_region_id(region_id); + header.set_peer(req.get_context().get_peer().clone()); + header.set_region_epoch(req.get_context().get_region_epoch().clone()); + cmd.set_header(header); + cmd.set_requests(vec![inner_req].into()); + } + + let (result_tx, result_rx) = oneshot::channel(); + let router = cluster.get_router(1).unwrap(); + if let Err(e) = router.send_command( + cmd, + Callback::read(Box::new(move |resp| { + result_tx.send(resp.response).unwrap(); + })), + RaftCmdExtraOpts { + deadline: None, + disk_full_opt: DiskFullOpt::AllowedOnAlmostFull, + }, + ) { + panic!("router send msg failed, error: {}", e); + } + + let resp = block_on(result_rx).unwrap(); + (resp.get_responses()[0].get_read_index().clone(), 
start_ts) + }; + + // wait a while until the node updates its own max ts + std::thread::sleep(Duration::from_millis(300)); + + let prev_cm_max_ts = cm.max_ts(); + let (resp, start_ts) = read_index(&[(b"l", b"yz")]); + assert!(!resp.has_locked()); + // Actually not changed + assert_eq!(cm.max_ts(), prev_cm_max_ts); + assert_ne!(cm.max_ts(), start_ts); +} + +#[test] +fn test_raft_cmd_request_learner_advanve_max_ts() { + use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; + + let mut cluster = new_server_cluster(0, 2); + cluster.pd_client.disable_default_operator(); + let region_id = cluster.run_conf_change(); + let region = cluster.get_region(b""); + assert_eq!(region_id, 1); + assert_eq!(region.get_id(), 1); + info!("!!!!! region {:?}", region); + let leader = region.get_peers()[0].clone(); + + fail::cfg("on_pre_write_apply_state", "return(true)").unwrap(); + let learner = new_learner_peer(2, 2); + cluster.pd_client.must_add_peer(1, learner.clone()); + + let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); + let keys: Vec<_> = vec![b"k", b"l"] + .into_iter() + .map(|k| Key::from_raw(k)) + .collect(); + let guards = block_on(cm.lock_keys(keys.iter())); + let lock = Lock::new( + LockType::Put, + b"k".to_vec(), + 1.into(), + 20000, + None, + 1.into(), + 1, + 2.into(), + false, + ); + guards[0].with_lock(|l| *l = Some(lock.clone())); + + let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned(); + + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + // cluster.must_put(b"k", b"v"); + + let read_index = |ranges: &[(&[u8], &[u8])]| { + let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + let mut ctx = Context::default(); + let learner = learner.clone(); + ctx.set_region_id(region_id); + ctx.set_region_epoch(region.get_region_epoch().clone()); + ctx.set_peer(learner); + let mut read_index_request = ReadIndexRequest::default(); + read_index_request.set_context(ctx); + read_index_request.set_start_ts(start_ts.into_inner()); + for (s, e) in ranges { + let mut r = KeyRange::new(); + r.set_start_key(s.to_vec()); + r.set_end_key(e.to_vec()); + read_index_request.mut_ranges().push(r); + } + let mut cmd = + proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request); + + let (result_tx, result_rx) = oneshot::channel(); + let router = cluster.get_router(2).unwrap(); + if let Err(e) = router.send_command( + cmd, + Callback::read(Box::new(move |resp| { + result_tx.send(resp.response).unwrap(); + })), + RaftCmdExtraOpts { + deadline: None, + disk_full_opt: DiskFullOpt::AllowedOnAlmostFull, + }, + ) { + panic!("router send msg failed, error: {}", e); + } + + let resp = block_on(result_rx).unwrap(); + info!("!!!!!! 
XZZZDD {:?}", resp); + (resp.get_responses()[0].get_read_index().clone(), start_ts) + }; + + // wait a while until the node updates its own max ts + std::thread::sleep(Duration::from_millis(3000)); + + must_wait_until_cond_node( + &cluster.cluster_ext, + region_id, + None, + &|states: &States| -> bool { + states.in_disk_region_state.get_region().get_peers().len() == 2 + }, + ); + + let prev_cm_max_ts = cm.max_ts(); + let (resp, start_ts) = read_index(&[(b"l", b"yz")]); + assert!(!resp.has_locked()); + // Actually not changed + assert_ne!(cm.max_ts(), prev_cm_max_ts); + assert_eq!(cm.max_ts(), start_ts); + + // `gen_read_index_raft_cmd_req` can only handle one key-range + let (resp, start_ts) = read_index(&[(b"j", b"k0")]); + assert_eq!(resp.get_locked(), &lock.into_lock_info(b"k".to_vec())); + assert_eq!(cm.max_ts(), start_ts); + + drop(guards); + + let (resp, start_ts) = read_index(&[(b"a", b"z")]); + assert!(!resp.has_locked()); + assert_eq!(cm.max_ts(), start_ts); +} + +#[test] +fn test_raft_message_can_advanve_max_ts() { + use kvproto::raft_cmdpb::{ReadIndexRequest, ReadIndexResponse}; + let mut cluster = new_server_cluster(0, 1); + cluster.run(); + + let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); + let keys: Vec<_> = vec![b"k", b"l"] + .into_iter() + .map(|k| Key::from_raw(k)) + .collect(); + let guards = block_on(cm.lock_keys(keys.iter())); + let lock = Lock::new( + LockType::Put, + b"k".to_vec(), + 1.into(), + 20000, + None, + 1.into(), + 1, + 2.into(), + false, + ); + guards[0].with_lock(|l| *l = Some(lock.clone())); + + let region = cluster.get_region(b""); + let leader = region.get_peers()[0].clone(); + let follower = new_learner_peer(2, 2); + let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned(); + + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + let mut ctx = Context::default(); + let region_id = leader.get_id(); + + let read_index = |ranges: &[(&[u8], &[u8])]| { + let mut m = raft::eraftpb::Message::default(); + m.set_msg_type(MessageType::MsgReadIndex); + let mut read_index_req = ReadIndexRequest::default(); + let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + read_index_req.set_start_ts(start_ts.into_inner()); + for &(start_key, end_key) in ranges { + let mut range = KeyRange::default(); + range.set_start_key(start_key.to_vec()); + range.set_end_key(end_key.to_vec()); + read_index_req.mut_key_ranges().push(range); + } + + let rctx = ReadIndexContext { + id: Uuid::new_v4(), + request: Some(read_index_req), + locked: None, + }; + let mut e = raft::eraftpb::Entry::default(); + e.set_data(rctx.to_bytes().into()); + m.mut_entries().push(e); + m.set_from(2); + + let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default(); + raft_msg.set_region_id(region.get_id()); + raft_msg.set_from_peer(follower); + raft_msg.set_to_peer(leader); + raft_msg.set_region_epoch(region.get_region_epoch().clone()); + raft_msg.set_message(m); + cluster.send_raft_msg(raft_msg).unwrap(); + + (ReadIndexResponse::default(), start_ts) + }; + + // wait a while until the node updates its own max ts + + let prev_cm_max_ts = cm.max_ts(); + let (resp, start_ts) = read_index(&[(b"l", b"yz")]); + cluster.must_put(b"a", b"b"); + std::thread::sleep(Duration::from_millis(2000)); + // assert!(!resp.has_locked()); + // Actually not changed + assert_ne!(cm.max_ts(), prev_cm_max_ts); + assert_eq!(cm.max_ts(), start_ts); +} diff --git a/src/server/raftkv/mod.rs 
b/src/server/raftkv/mod.rs index 9f42925b6d4..8a44fb7aa2b 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -779,6 +779,7 @@ pub struct ReplicaReadLockChecker { impl ReplicaReadLockChecker { pub fn new(concurrency_manager: ConcurrencyManager) -> Self { + info!("!!!!!! ReplicaReadLockChecker::new"); ReplicaReadLockChecker { concurrency_manager, } @@ -797,6 +798,10 @@ impl ReadIndexObserver for ReplicaReadLockChecker { // Only check and return result if the current peer is a leader. // If it's not a leader, the read index request will be redirected to the leader // later. + info!( + "!!!!!! ReplicaReadLockChecker::on_step {:?} {:?}", + msg, role + ); if msg.get_msg_type() != MessageType::MsgReadIndex || role != StateRole::Leader { return; } @@ -808,6 +813,7 @@ impl ReadIndexObserver for ReplicaReadLockChecker { let start_ts = request.get_start_ts().into(); self.concurrency_manager.update_max_ts(start_ts); for range in request.mut_key_ranges().iter_mut() { + info!("!!!!!! ReplicaReadLockChecker::range {:?}", range); let key_bound = |key: Vec| { if key.is_empty() { None From 0546c4c486c54f6d4be5632372c37745427d3398 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 17 Apr 2024 14:36:25 +0800 Subject: [PATCH 2/7] removed logs Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/peer.rs | 17 ----------------- components/raftstore/src/store/peer.rs | 1 - .../proxy_ffi/src/read_index_helper.rs | 1 - proxy_tests/proxy/shared/replica_read.rs | 2 -- src/server/raftkv/mod.rs | 6 ------ 5 files changed, 27 deletions(-) diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 27359ae397f..7c33bf66b87 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -2547,15 +2547,12 @@ where return Ok(()); } - debug!("!!!!!! handle 3"); - if msg.get_is_tombstone() { // we receive a message tells us to remove ourself. self.handle_gc_peer_msg(&msg); return Ok(()); } - debug!("!!!!!! handle 4"); if msg.has_merge_target() { fail_point!("on_has_merge_target", |_| Ok(())); if self.need_gc_merge(&msg)? { @@ -2564,12 +2561,10 @@ where return Ok(()); } - debug!("!!!!!! handle 5"); if self.check_msg(&msg) { return Ok(()); } - debug!("!!!!!! handle 6"); if msg.has_extra_msg() { self.on_extra_message(msg); return Ok(()); @@ -2577,7 +2572,6 @@ where let is_snapshot = msg.get_message().has_snapshot(); - debug!("!!!!!! handle 7"); // TODO: spin off the I/O code (delete_snapshot) let regions_to_destroy = match self.check_snapshot(&msg)? { Either::Left(key) => { @@ -2594,7 +2588,6 @@ where Either::Right(v) => v, }; - debug!("!!!!!! handle 8"); if util::is_vote_msg(msg.get_message()) || msg_type == MessageType::MsgTimeoutNow { if self.fsm.hibernate_state.group_state() != GroupState::Chaos { self.fsm.reset_hibernate_state(GroupState::Chaos); @@ -2607,12 +2600,10 @@ where let from_peer_id = msg.get_from_peer().get_id(); self.fsm.peer.insert_peer_cache(msg.take_from_peer()); - debug!("!!!!!! handle 8.1"); let result = if msg_type == MessageType::MsgTransferLeader { self.on_transfer_leader_msg(msg.get_message(), peer_disk_usage); Ok(()) } else { - debug!("!!!!!! handle 8.2"); // This can be a message that sent when it's still a follower. Nevertheleast, // it's meaningless to continue to handle the request as callbacks are cleared. 
if msg.get_message().get_msg_type() == MessageType::MsgReadIndex @@ -2620,22 +2611,14 @@ where && (msg.get_message().get_from() == raft::INVALID_ID || msg.get_message().get_from() == self.fsm.peer_id()) { - debug!( - "!!!!!! handle 8.2.2 {} {} {}", - self.fsm.peer.is_leader(), - msg.get_message().get_from(), - msg.get_message().get_from() - ); self.ctx.raft_metrics.message_dropped.stale_msg.inc(); return Ok(()); } - debug!("!!!!!! handle 8.3"); self.fsm.peer.step(self.ctx, msg.take_message()) }; stepped.set(result.is_ok()); - debug!("!!!!!! handle 9"); if is_snapshot { if !self.fsm.peer.has_pending_snapshot() { // This snapshot is rejected by raft-rs. diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 2b2bfaab944..c0277cbc5a6 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1777,7 +1777,6 @@ where ctx: &mut PollContext, mut m: eraftpb::Message, ) -> Result<()> { - info!("!!!!!! raft step {:?}", m); fail_point!( "step_message_3_1", self.peer.get_store_id() == 3 && self.region_id == 1, diff --git a/proxy_components/proxy_ffi/src/read_index_helper.rs b/proxy_components/proxy_ffi/src/read_index_helper.rs index 471a912f635..02b2666c67d 100644 --- a/proxy_components/proxy_ffi/src/read_index_helper.rs +++ b/proxy_components/proxy_ffi/src/read_index_helper.rs @@ -91,7 +91,6 @@ pub fn gen_read_index_raft_cmd_req(req: &mut ReadIndexRequest) -> RaftCmdRequest inner_req.set_cmd_type(CmdType::ReadIndex); inner_req.mut_read_index().set_start_ts(req.get_start_ts()); if !req.get_ranges().is_empty() { - tikv_util::info!("!!!!!! not empty"); let r = &mut req.mut_ranges()[0]; let mut range = kvproto::kvrpcpb::KeyRange::default(); range.set_start_key(r.take_start_key()); diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index e636bc9365f..ee75bc0e0e4 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -483,7 +483,6 @@ fn test_raft_cmd_request_learner_advanve_max_ts() { let region = cluster.get_region(b""); assert_eq!(region_id, 1); assert_eq!(region.get_id(), 1); - info!("!!!!! region {:?}", region); let leader = region.get_peers()[0].clone(); fail::cfg("on_pre_write_apply_state", "return(true)").unwrap(); @@ -552,7 +551,6 @@ fn test_raft_cmd_request_learner_advanve_max_ts() { } let resp = block_on(result_rx).unwrap(); - info!("!!!!!! XZZZDD {:?}", resp); (resp.get_responses()[0].get_read_index().clone(), start_ts) }; diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index 8a44fb7aa2b..9f42925b6d4 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -779,7 +779,6 @@ pub struct ReplicaReadLockChecker { impl ReplicaReadLockChecker { pub fn new(concurrency_manager: ConcurrencyManager) -> Self { - info!("!!!!!! ReplicaReadLockChecker::new"); ReplicaReadLockChecker { concurrency_manager, } @@ -798,10 +797,6 @@ impl ReadIndexObserver for ReplicaReadLockChecker { // Only check and return result if the current peer is a leader. // If it's not a leader, the read index request will be redirected to the leader // later. - info!( - "!!!!!! 
ReplicaReadLockChecker::on_step {:?} {:?}", - msg, role - ); if msg.get_msg_type() != MessageType::MsgReadIndex || role != StateRole::Leader { return; } @@ -813,7 +808,6 @@ impl ReadIndexObserver for ReplicaReadLockChecker { let start_ts = request.get_start_ts().into(); self.concurrency_manager.update_max_ts(start_ts); for range in request.mut_key_ranges().iter_mut() { - info!("!!!!!! ReplicaReadLockChecker::range {:?}", range); let key_bound = |key: Vec| { if key.is_empty() { None From 7913d31cfa46f9aaf7b171b00e8937eb8cd9bf3c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 18 Apr 2024 13:25:55 +0800 Subject: [PATCH 3/7] enrich Signed-off-by: CalvinNeo --- proxy_tests/proxy/shared/replica_read.rs | 44 +++++------------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index ee75bc0e0e4..803acffc8da 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -381,23 +381,6 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { cluster.run(); let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); - let keys: Vec<_> = vec![b"k", b"l"] - .into_iter() - .map(|k| Key::from_raw(k)) - .collect(); - let guards = block_on(cm.lock_keys(keys.iter())); - let lock = Lock::new( - LockType::Put, - b"k".to_vec(), - 1.into(), - 20000, - None, - 1.into(), - 1, - 2.into(), - false, - ); - guards[0].with_lock(|l| *l = Some(lock.clone())); let region = cluster.get_region(b""); let leader = region.get_peers()[0].clone(); @@ -471,6 +454,8 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { // Actually not changed assert_eq!(cm.max_ts(), prev_cm_max_ts); assert_ne!(cm.max_ts(), start_ts); + cluster.shutdown(); + fail::remove("on_pre_write_apply_state") } #[test] @@ -518,6 +503,8 @@ fn test_raft_cmd_request_learner_advanve_max_ts() { let read_index = |ranges: &[(&[u8], &[u8])]| { let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + + // https://github.com/pingcap/tiflash/blob/14a127820d0530e496af624bb5b69acd48caf747/dbms/src/Storages/KVStore/Read/ReadIndex.cpp#L39 let mut ctx = Context::default(); let learner = learner.clone(); ctx.set_region_id(region_id); @@ -583,6 +570,8 @@ fn test_raft_cmd_request_learner_advanve_max_ts() { let (resp, start_ts) = read_index(&[(b"a", b"z")]); assert!(!resp.has_locked()); assert_eq!(cm.max_ts(), start_ts); + cluster.shutdown(); + fail::remove("on_pre_write_apply_state") } #[test] @@ -592,23 +581,6 @@ fn test_raft_message_can_advanve_max_ts() { cluster.run(); let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); - let keys: Vec<_> = vec![b"k", b"l"] - .into_iter() - .map(|k| Key::from_raw(k)) - .collect(); - let guards = block_on(cm.lock_keys(keys.iter())); - let lock = Lock::new( - LockType::Put, - b"k".to_vec(), - 1.into(), - 20000, - None, - 1.into(), - 1, - 2.into(), - false, - ); - guards[0].with_lock(|l| *l = Some(lock.clone())); let region = cluster.get_region(b""); let leader = region.get_peers()[0].clone(); @@ -619,7 +591,6 @@ fn test_raft_message_can_advanve_max_ts() { let channel = ChannelBuilder::new(env).connect(&addr); let client = TikvClient::new(channel); - let mut ctx = Context::default(); let region_id = leader.get_id(); let read_index = |ranges: &[(&[u8], &[u8])]| { @@ -657,7 +628,6 @@ fn test_raft_message_can_advanve_max_ts() { }; // wait a while until the node updates its own max ts - let prev_cm_max_ts = cm.max_ts(); let (resp, start_ts) = read_index(&[(b"l", b"yz")]); 
cluster.must_put(b"a", b"b"); @@ -666,4 +636,6 @@ fn test_raft_message_can_advanve_max_ts() { // Actually not changed assert_ne!(cm.max_ts(), prev_cm_max_ts); assert_eq!(cm.max_ts(), start_ts); + cluster.shutdown(); + fail::remove("on_pre_write_apply_state") } From 15712b87b64e51df93208f202a51ec0657a29590 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 19 Apr 2024 12:44:15 +0800 Subject: [PATCH 4/7] add test for async commit Signed-off-by: CalvinNeo --- proxy_tests/proxy/shared/replica_read.rs | 107 +++++++++++++++++++++++ src/server/raftkv/mod.rs | 2 + src/storage/mod.rs | 1 + 3 files changed, 110 insertions(+) diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index 803acffc8da..0ef3c00e153 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -458,6 +458,7 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { fail::remove("on_pre_write_apply_state") } +// https://github.com/tikv/tikv/pull/8669/files #[test] fn test_raft_cmd_request_learner_advanve_max_ts() { use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; @@ -639,3 +640,109 @@ fn test_raft_message_can_advanve_max_ts() { cluster.shutdown(); fail::remove("on_pre_write_apply_state") } + +#[test] +fn test_concurrent_update_maxts_and_commit() { + use kvproto::{ + kvrpcpb::{Mutation, Op}, + raft_cmdpb::{ReadIndexRequest, ReadIndexResponse}, + }; + use test_raftstore::{ + must_kv_commit, must_kv_prewrite, must_kv_prewrite_with, must_kv_read_equal, new_mutation, + }; + let mut cluster = new_server_cluster(0, 1); + cluster.run(); + + let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); + + let region = cluster.get_region(b""); + let leader = region.get_peers()[0].clone(); + let follower = new_learner_peer(2, 2); + let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned(); + + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + let region_id = leader.get_id(); + + let mut ctx = Context::default(); + ctx.set_region_id(region_id); + ctx.set_peer(leader.clone()); + ctx.set_region_epoch(region.get_region_epoch().clone()); + + let read_index = |ranges: &[(&[u8], &[u8])], start_ts: u64| { + let mut m = raft::eraftpb::Message::default(); + m.set_msg_type(MessageType::MsgReadIndex); + let mut read_index_req = ReadIndexRequest::default(); + read_index_req.set_start_ts(start_ts); + for &(start_key, end_key) in ranges { + let mut range = KeyRange::default(); + range.set_start_key(start_key.to_vec()); + range.set_end_key(end_key.to_vec()); + read_index_req.mut_key_ranges().push(range); + } + + let rctx = ReadIndexContext { + id: Uuid::new_v4(), + request: Some(read_index_req), + locked: None, + }; + let mut e = raft::eraftpb::Entry::default(); + e.set_data(rctx.to_bytes().into()); + m.mut_entries().push(e); + m.set_from(2); + + let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default(); + raft_msg.set_region_id(region.get_id()); + raft_msg.set_from_peer(follower); + raft_msg.set_to_peer(leader); + raft_msg.set_region_epoch(region.get_region_epoch().clone()); + raft_msg.set_message(m); + cluster.send_raft_msg(raft_msg).unwrap(); + + (ReadIndexResponse::default(), start_ts) + }; + + // let (k, v) = (b"k1".to_vec(), b"k2".to_vec()); + // let mut mutation = Mutation::default(); + // mutation.set_op(Op::Put); + // mutation.set_key(k.clone()); + // mutation.set_value(v); + // must_kv_prewrite(&client, ctx.clone(), vec![mutation], 
k.clone(), 10); + + // let block_duration = Duration::from_millis(300); + // let client_clone = client.clone(); + // let ctx_clone = ctx.clone(); + // let k_clone = k.clone(); + // let handle = std::thread::spawn(move || { + // std::thread::sleep(block_duration); + // info!("!!!!!! ZZZZ must commit"); + // must_kv_commit(&client_clone, ctx_clone, vec![k_clone], 10, 30, 100); + // }); + let cli = { + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + TikvClient::new(channel) + }; + + must_kv_prewrite_with( + &cli, + ctx.clone(), + vec![new_mutation(Op::Put, &b"key2"[..], &b"value1"[..])], + vec![], + b"key2".to_vec(), + 10, + 0, + true, + false, + ); + + let (resp, start_ts) = read_index(&[(b"a", b"z")], 100); + + std::thread::sleep(std::time::Duration::from_millis(10000)); + + // must_kv_commit(&cli, ctx.clone(), vec![b"key2".to_vec()], 10, 30, 100); + must_kv_read_equal(&cli, ctx.clone(), b"key2".to_vec(), b"value1".to_vec(), 100); + // handle.join().unwrap(); +} diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index 9f42925b6d4..fd1291c493c 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -801,11 +801,13 @@ impl ReadIndexObserver for ReplicaReadLockChecker { return; } assert_eq!(msg.get_entries().len(), 1); + info!("!!!!!! ZZZZ update max_ts"); let mut rctx = ReadIndexContext::parse(msg.get_entries()[0].get_data()).unwrap(); if let Some(mut request) = rctx.request.take() { let begin_instant = Instant::now(); let start_ts = request.get_start_ts().into(); + info!("!!!!!! ZZZZ update max_ts to {}", start_ts); self.concurrency_manager.update_max_ts(start_ts); for range in request.mut_key_ranges().iter_mut() { let key_bound = |key: Vec| { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 13d868849f4..b1d3910f49e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1543,6 +1543,7 @@ impl Storage { match &cmd { Command::Prewrite(Prewrite { mutations, .. }) => { + info!("!!!!!! prewrite {:?}", mutations); let keys = mutations.iter().map(|m| m.key().as_encoded()); Self::check_api_version( self.api_version, From dfd40f228fd12d33c3aec52de746f8ea0fcf8a83 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 19 Apr 2024 16:31:17 +0800 Subject: [PATCH 5/7] Revert "add test for async commit" This reverts commit 15712b87b64e51df93208f202a51ec0657a29590. 
--- proxy_tests/proxy/shared/replica_read.rs | 107 ----------------------- src/server/raftkv/mod.rs | 2 - src/storage/mod.rs | 1 - 3 files changed, 110 deletions(-) diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index 0ef3c00e153..803acffc8da 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -458,7 +458,6 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { fail::remove("on_pre_write_apply_state") } -// https://github.com/tikv/tikv/pull/8669/files #[test] fn test_raft_cmd_request_learner_advanve_max_ts() { use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; @@ -640,109 +639,3 @@ fn test_raft_message_can_advanve_max_ts() { cluster.shutdown(); fail::remove("on_pre_write_apply_state") } - -#[test] -fn test_concurrent_update_maxts_and_commit() { - use kvproto::{ - kvrpcpb::{Mutation, Op}, - raft_cmdpb::{ReadIndexRequest, ReadIndexResponse}, - }; - use test_raftstore::{ - must_kv_commit, must_kv_prewrite, must_kv_prewrite_with, must_kv_read_equal, new_mutation, - }; - let mut cluster = new_server_cluster(0, 1); - cluster.run(); - - let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); - - let region = cluster.get_region(b""); - let leader = region.get_peers()[0].clone(); - let follower = new_learner_peer(2, 2); - let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned(); - - let env = Arc::new(Environment::new(1)); - let channel = ChannelBuilder::new(env).connect(&addr); - let client = TikvClient::new(channel); - - let region_id = leader.get_id(); - - let mut ctx = Context::default(); - ctx.set_region_id(region_id); - ctx.set_peer(leader.clone()); - ctx.set_region_epoch(region.get_region_epoch().clone()); - - let read_index = |ranges: &[(&[u8], &[u8])], start_ts: u64| { - let mut m = raft::eraftpb::Message::default(); - m.set_msg_type(MessageType::MsgReadIndex); - let mut read_index_req = ReadIndexRequest::default(); - read_index_req.set_start_ts(start_ts); - for &(start_key, end_key) in ranges { - let mut range = KeyRange::default(); - range.set_start_key(start_key.to_vec()); - range.set_end_key(end_key.to_vec()); - read_index_req.mut_key_ranges().push(range); - } - - let rctx = ReadIndexContext { - id: Uuid::new_v4(), - request: Some(read_index_req), - locked: None, - }; - let mut e = raft::eraftpb::Entry::default(); - e.set_data(rctx.to_bytes().into()); - m.mut_entries().push(e); - m.set_from(2); - - let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default(); - raft_msg.set_region_id(region.get_id()); - raft_msg.set_from_peer(follower); - raft_msg.set_to_peer(leader); - raft_msg.set_region_epoch(region.get_region_epoch().clone()); - raft_msg.set_message(m); - cluster.send_raft_msg(raft_msg).unwrap(); - - (ReadIndexResponse::default(), start_ts) - }; - - // let (k, v) = (b"k1".to_vec(), b"k2".to_vec()); - // let mut mutation = Mutation::default(); - // mutation.set_op(Op::Put); - // mutation.set_key(k.clone()); - // mutation.set_value(v); - // must_kv_prewrite(&client, ctx.clone(), vec![mutation], k.clone(), 10); - - // let block_duration = Duration::from_millis(300); - // let client_clone = client.clone(); - // let ctx_clone = ctx.clone(); - // let k_clone = k.clone(); - // let handle = std::thread::spawn(move || { - // std::thread::sleep(block_duration); - // info!("!!!!!! 
ZZZZ must commit"); - // must_kv_commit(&client_clone, ctx_clone, vec![k_clone], 10, 30, 100); - // }); - let cli = { - let env = Arc::new(Environment::new(1)); - let channel = ChannelBuilder::new(env).connect(&addr); - TikvClient::new(channel) - }; - - must_kv_prewrite_with( - &cli, - ctx.clone(), - vec![new_mutation(Op::Put, &b"key2"[..], &b"value1"[..])], - vec![], - b"key2".to_vec(), - 10, - 0, - true, - false, - ); - - let (resp, start_ts) = read_index(&[(b"a", b"z")], 100); - - std::thread::sleep(std::time::Duration::from_millis(10000)); - - // must_kv_commit(&cli, ctx.clone(), vec![b"key2".to_vec()], 10, 30, 100); - must_kv_read_equal(&cli, ctx.clone(), b"key2".to_vec(), b"value1".to_vec(), 100); - // handle.join().unwrap(); -} diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index fd1291c493c..9f42925b6d4 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -801,13 +801,11 @@ impl ReadIndexObserver for ReplicaReadLockChecker { return; } assert_eq!(msg.get_entries().len(), 1); - info!("!!!!!! ZZZZ update max_ts"); let mut rctx = ReadIndexContext::parse(msg.get_entries()[0].get_data()).unwrap(); if let Some(mut request) = rctx.request.take() { let begin_instant = Instant::now(); let start_ts = request.get_start_ts().into(); - info!("!!!!!! ZZZZ update max_ts to {}", start_ts); self.concurrency_manager.update_max_ts(start_ts); for range in request.mut_key_ranges().iter_mut() { let key_bound = |key: Vec| { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b1d3910f49e..13d868849f4 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1543,7 +1543,6 @@ impl Storage { match &cmd { Command::Prewrite(Prewrite { mutations, .. }) => { - info!("!!!!!! prewrite {:?}", mutations); let keys = mutations.iter().map(|m| m.key().as_encoded()); Self::check_api_version( self.api_version, From 7e2db9decd2e0f4e8e5d7ad3dc1295443dee11b9 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 19 Apr 2024 16:32:17 +0800 Subject: [PATCH 6/7] recoment Signed-off-by: CalvinNeo --- proxy_tests/proxy/shared/replica_read.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index 803acffc8da..40c59967fbb 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -458,6 +458,7 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { fail::remove("on_pre_write_apply_state") } +// https://github.com/tikv/tikv/pull/8669/files #[test] fn test_raft_cmd_request_learner_advanve_max_ts() { use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; From 157c55e6f4f362fd53a6e44513d164cdc5af8fb1 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 19 Apr 2024 17:14:39 +0800 Subject: [PATCH 7/7] recoment Signed-off-by: CalvinNeo --- components/raftstore/src/store/peer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index c0277cbc5a6..904d35fec2f 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1704,7 +1704,6 @@ where "msg_size" => msg.get_message().compute_size(), "to" => to_peer_id, "disk_usage" => ?msg.get_disk_usage(), - "!!!!msg" => ?msg ); for (term, index) in msg