diff --git a/components/test_raftstore/src/transport_simulate.rs b/components/test_raftstore/src/transport_simulate.rs
index a745b401e0a8..076851be1e31 100644
--- a/components/test_raftstore/src/transport_simulate.rs
+++ b/components/test_raftstore/src/transport_simulate.rs
@@ -735,3 +735,21 @@ impl Filter for LeaseReadFilter {
         Ok(())
     }
 }
+
+#[derive(Clone)]
+pub struct DropMessageFilter {
+    ty: MessageType,
+}
+
+impl DropMessageFilter {
+    pub fn new(ty: MessageType) -> DropMessageFilter {
+        DropMessageFilter { ty }
+    }
+}
+
+impl Filter for DropMessageFilter {
+    fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
+        msgs.retain(|m| m.get_message().get_msg_type() != self.ty);
+        Ok(())
+    }
+}
diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs
index f9e8f037a84d..4fe1d065d5aa 100644
--- a/components/test_raftstore/src/util.rs
+++ b/components/test_raftstore/src/util.rs
@@ -577,6 +577,13 @@ pub fn create_test_engine(
     (engines, path)
 }
 
+pub fn configure_for_hibernate<T: Simulator>(cluster: &mut Cluster<T>) {
+    // Use long check intervals so the leader keeps sleeping during tests.
+    cluster.cfg.raft_store.abnormal_leader_missing_duration = ReadableDuration::secs(20);
+    cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(40);
+    cluster.cfg.raft_store.peer_stale_state_check_interval = ReadableDuration::secs(10);
+}
+
 pub fn configure_for_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
     // Truncate the log quickly so that we can force sending snapshot.
     cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
diff --git a/src/raftstore/store/fsm/peer.rs b/src/raftstore/store/fsm/peer.rs
index 5c0440d72fde..81878f851ec2 100644
--- a/src/raftstore/store/fsm/peer.rs
+++ b/src/raftstore/store/fsm/peer.rs
@@ -839,10 +839,13 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         let from_peer_id = msg.get_from_peer().get_id();
         self.fsm.peer.insert_peer_cache(msg.take_from_peer());
         self.fsm.peer.step(self.ctx, msg.take_message())?;
+        if self.fsm.peer.should_wake_up {
+            self.reset_raft_tick(GroupState::Ordered);
+        }
 
         if self.fsm.peer.any_new_peer_catch_up(from_peer_id) {
             self.fsm.peer.heartbeat_pd(self.ctx);
-            self.register_raft_base_tick();
+            self.reset_raft_tick(GroupState::Ordered);
         }
 
         self.fsm.has_ready = true;
@@ -863,6 +866,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     fn reset_raft_tick(&mut self, state: GroupState) {
         self.fsm.group_state = state;
         self.fsm.missing_ticks = 0;
+        self.fsm.peer.should_wake_up = false;
         self.register_raft_base_tick();
     }
 
@@ -2289,8 +2293,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         bind_term(&mut resp, term);
         if self.fsm.peer.propose(self.ctx, cb, msg, resp) {
             self.fsm.has_ready = true;
-            self.fsm.group_state = GroupState::Ordered;
-            self.register_raft_base_tick();
+        }
+
+        if self.fsm.peer.should_wake_up {
+            self.reset_raft_tick(GroupState::Ordered);
         }
 
         self.register_pd_heartbeat_tick();
diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs
index de5a9d047098..70a37a063c64 100644
--- a/src/raftstore/store/peer.rs
+++ b/src/raftstore/store/peer.rs
@@ -182,6 +182,8 @@ pub struct Peer {
     /// If it fails to send messages to leader.
     pub leader_unreachable: bool,
+    /// Indicates whether the peer should be woken up.
+    pub should_wake_up: bool,
     /// Whether this peer is destroyed asynchronously.
     /// If it's true when merging, its data in storeMeta will be removed early by the target peer
     pub pending_remove: bool,
@@ -291,6 +293,7 @@ impl Peer {
             compaction_declined_bytes: 0,
             leader_unreachable: false,
             pending_remove: false,
+            should_wake_up: false,
             pending_merge_state: None,
             last_committed_prepare_merge_idx: 0,
             leader_missing_time: Some(Instant::now()),
@@ -524,6 +527,12 @@ impl Peer {
                 return res;
            }
         }
+        if self.raft_group.raft.pending_read_count() > 0 {
+            return res;
+        }
+        if self.raft_group.raft.lead_transferee.is_some() {
+            return res;
+        }
         // Unapplied entries can change the configuration of the group.
         res.up_to_date = self.get_store().applied_index() == last_index;
         res
@@ -531,7 +540,7 @@ impl Peer {
 
     pub fn check_after_tick(&self, state: GroupState, res: CheckTickResult) -> bool {
         if res.leader {
-            res.up_to_date && self.is_leader() && self.raft_group.raft.pending_read_count() == 0
+            res.up_to_date && self.is_leader()
         } else {
             // If follower keeps receiving data from leader, then it's safe to stop
             // ticking, as leader will make sure it has the latest logs.
@@ -541,6 +550,8 @@ impl Peer {
             && self.raft_group.raft.leader_id != raft::INVALID_ID
             && self.raft_group.raft.raft_log.last_term() == self.raft_group.raft.term
             && !self.has_unresolved_reads()
+            // If it becomes leader, the stats are not valid anymore.
+            && !self.is_leader()
         }
     }
 
@@ -717,7 +728,8 @@ impl Peer {
         if msg_type == MessageType::MsgReadIndex && expected_term == self.raft_group.raft.term {
             // If the leader hasn't committed any entries in its term, it can't response read only
             // requests. Please also take a look at raft-rs.
-            if let LeaseState::Valid = self.inspect_lease() {
+            let state = self.inspect_lease();
+            if let LeaseState::Valid = state {
                 let mut resp = eraftpb::Message::default();
                 resp.set_msg_type(MessageType::MsgReadIndexResp);
                 resp.to = m.from;
@@ -727,6 +739,7 @@ impl Peer {
                 self.pending_messages.push(resp);
                 return Ok(());
             }
+            self.should_wake_up = state == LeaseState::Expired;
         }
 
         if msg_type == MessageType::MsgTransferLeader {
@@ -1605,6 +1618,7 @@ impl Peer {
             // possible.
             self.raft_group.skip_bcast_commit(false);
         }
+        self.should_wake_up = true;
         let meta = ProposalMeta {
             index: idx,
             term: self.term(),
@@ -1669,7 +1683,7 @@ impl Peer {
     /// need to be up to date for now. If 'allow_remove_leader' is false then
     /// the peer to be removed should not be the leader.
     fn check_conf_change<T, C>(
-        &self,
+        &mut self,
         ctx: &mut PollContext<T, C>,
         cmd: &RaftCmdRequest,
     ) -> Result<()> {
@@ -1744,6 +1758,8 @@ impl Peer {
             "healthy" => healthy,
             "quorum_after_change" => quorum_after_change,
         );
+        // Wake it up to replicate logs to the candidate.
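+        // Otherwise the lagging peers can never catch up while the region
+        // hibernates, and the conf change would keep being rejected.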
+        self.should_wake_up = true;
         Err(box_err!(
             "unsafe to perform conf change {:?}, total {}, healthy {}, quorum after \
              change {}",
@@ -1981,6 +1997,7 @@ impl Peer {
 
         let read = ReadIndexRequest::with_command(id, req, cb, renew_lease_time);
         self.pending_reads.push_back(read, self.is_leader());
+        self.should_wake_up = true;
 
         debug!(
             "request to get a read index";
@@ -2183,7 +2200,10 @@ impl Peer {
                     "last_index" => self.get_store().last_index(),
                 );
             }
-            None => self.transfer_leader(&from),
+            None => {
+                self.transfer_leader(&from);
+                self.should_wake_up = true;
+            }
         }
         return;
     }
diff --git a/tests/integrations/raftstore/mod.rs b/tests/integrations/raftstore/mod.rs
index f1557dfb2188..faebd009f7b2 100644
--- a/tests/integrations/raftstore/mod.rs
+++ b/tests/integrations/raftstore/mod.rs
@@ -6,6 +6,7 @@ mod test_compact_after_delete;
 mod test_compact_lock_cf;
 mod test_compact_log;
 mod test_conf_change;
+mod test_hibernate;
 mod test_lease_read;
 mod test_merge;
 mod test_multi;
diff --git a/tests/integrations/raftstore/test_hibernate.rs b/tests/integrations/raftstore/test_hibernate.rs
new file mode 100644
index 000000000000..ff44a926e591
--- /dev/null
+++ b/tests/integrations/raftstore/test_hibernate.rs
@@ -0,0 +1,237 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::*;
+use std::thread;
+use std::time::*;
+
+use futures::Future;
+use raft::eraftpb::{ConfChangeType, MessageType};
+use test_raftstore::*;
+use tikv::pd::PdClient;
+use tikv_util::HandyRwLock;
+
+#[test]
+fn test_proposal_prevent_sleep() {
+    let mut cluster = new_node_cluster(0, 3);
+    configure_for_hibernate(&mut cluster);
+    cluster.run();
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    // Wait till leader peer goes to sleep.
+    thread::sleep(
+        cluster.cfg.raft_store.raft_base_tick_interval.0
+            * 2
+            * cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
+    );
+
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 1).direction(Direction::Send),
+    ));
+    let region = cluster
+        .pd_client
+        .get_region_by_id(1)
+        .wait()
+        .unwrap()
+        .unwrap();
+
+    let put = new_put_cmd(b"k2", b"v2");
+    let mut req = new_request(1, region.get_region_epoch().clone(), vec![put], true);
+    req.mut_header().set_peer(new_peer(1, 1));
+    // Ignore the error: we just want to deliver this command to peer (1, 1).
+    // It can't be committed while the leader's outgoing messages are dropped,
+    // so the call is expected to return a timeout error.
+    let _ = cluster.call_command(req, Duration::from_millis(10));
+    cluster.clear_send_filters();
+    must_get_equal(&cluster.get_engine(3), b"k2", b"v2");
+    assert_eq!(cluster.leader_of_region(1), Some(new_peer(1, 1)));
+
+    // Wait till leader peer goes to sleep.
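+    // Then verify that a read index request also wakes the leader up.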
+    thread::sleep(
+        cluster.cfg.raft_store.raft_base_tick_interval.0
+            * 2
+            * cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
+    );
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 1).direction(Direction::Send),
+    ));
+    let mut request = new_request(
+        region.get_id(),
+        region.get_region_epoch().clone(),
+        vec![new_read_index_cmd()],
+        true,
+    );
+    request.mut_header().set_peer(new_peer(1, 1));
+    let (cb, rx) = make_cb(&request);
+    // Send the read index request to the sleeping leader peer (1, 1).
+    cluster
+        .sim
+        .rl()
+        .async_command_on_node(1, request, cb)
+        .unwrap();
+    thread::sleep(Duration::from_millis(10));
+    cluster.clear_send_filters();
+    let resp = rx.recv_timeout(Duration::from_secs(5)).unwrap();
+    assert!(
+        !resp.get_header().has_error(),
+        "{:?}",
+        resp.get_header().get_error()
+    );
+
+    // Wait till leader peer goes to sleep.
+    thread::sleep(
+        cluster.cfg.raft_store.raft_base_tick_interval.0
+            * 2
+            * cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
+    );
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 1).direction(Direction::Send),
+    ));
+    let conf_change = new_change_peer_request(ConfChangeType::RemoveNode, new_peer(3, 3));
+    let mut admin_req = new_admin_request(1, &region.get_region_epoch(), conf_change);
+    admin_req.mut_header().set_peer(new_peer(1, 1));
+    let (cb, _rx) = make_cb(&admin_req);
+    cluster
+        .sim
+        .rl()
+        .async_command_on_node(1, admin_req, cb)
+        .unwrap();
+    thread::sleep(Duration::from_millis(10));
+    cluster.clear_send_filters();
+    cluster.pd_client.must_none_peer(1, new_peer(3, 3));
+}
+
+/// Tests whether a single voter still replicates logs to the learner after restart.
+///
+/// A voter will become leader in a single tick. The case checks whether the
+/// role change is detected correctly.
+#[test]
+fn test_single_voter_restart() {
+    let mut cluster = new_server_cluster(0, 2);
+    configure_for_hibernate(&mut cluster);
+    cluster.pd_client.disable_default_operator();
+    cluster.run_conf_change();
+    cluster.pd_client.must_add_peer(1, new_learner_peer(2, 2));
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
+    cluster.stop_node(2);
+    cluster.must_put(b"k2", b"v2");
+    cluster.stop_node(1);
+    // Restart the learner first to avoid network influence.
+    cluster.run_node(2).unwrap();
+    cluster.run_node(1).unwrap();
+    must_get_equal(&cluster.get_engine(2), b"k2", b"v2");
+}
+
+/// Tests whether an isolated learner can be promoted to voter.
+#[test]
+fn test_prompt_learner() {
+    let mut cluster = new_server_cluster(0, 4);
+    configure_for_hibernate(&mut cluster);
+    cluster.cfg.raft_store.raft_log_gc_count_limit = 20;
+    cluster.pd_client.disable_default_operator();
+    cluster.run_conf_change();
+    cluster.pd_client.must_add_peer(1, new_peer(2, 2));
+    cluster.pd_client.must_add_peer(1, new_peer(3, 3));
+
+    cluster.pd_client.must_add_peer(1, new_learner_peer(4, 4));
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(4), b"k1", b"v1");
+
+    // Suppose there is only a one-way partition.
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 3).direction(Direction::Send),
+    ));
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 4).direction(Direction::Send),
+    ));
+    let idx = cluster.truncated_state(1, 1).get_index();
+    // Trigger a log compaction.
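+    // Writing more than raft_log_gc_count_limit entries should force the raft
+    // log to be compacted while peers 3 and 4 cannot acknowledge new entries.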
+    for i in 0..cluster.cfg.raft_store.raft_log_gc_count_limit * 2 {
+        cluster.must_put(format!("k{}", i).as_bytes(), format!("v{}", i).as_bytes());
+    }
+    let timer = Instant::now();
+    loop {
+        if cluster.truncated_state(1, 1).get_index() > idx {
+            break;
+        }
+        thread::sleep(Duration::from_millis(10));
+        if timer.elapsed() > Duration::from_secs(3) {
+            panic!("log is not compacted after 3 seconds");
+        }
+    }
+    // Wait till leader peer goes to sleep again.
+    thread::sleep(
+        cluster.cfg.raft_store.raft_base_tick_interval.0
+            * 2
+            * cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
+    );
+    cluster.clear_send_filters();
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 3).direction(Direction::Send),
+    ));
+    cluster.pd_client.must_add_peer(1, new_peer(4, 4));
+}
+
+/// Tests whether the leader resumes correctly when the pre-transfer-leader
+/// response is delayed for more than an election timeout.
+#[test]
+fn test_transfer_leader_delay() {
+    let mut cluster = new_node_cluster(0, 3);
+    configure_for_hibernate(&mut cluster);
+    cluster.run();
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    let messages = Arc::new(Mutex::new(vec![]));
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 3)
+            .direction(Direction::Send)
+            .msg_type(MessageType::MsgTransferLeader)
+            .reserve_dropped(messages.clone()),
+    ));
+    cluster.transfer_leader(1, new_peer(3, 3));
+    let timer = Instant::now();
+    while timer.elapsed() < Duration::from_secs(3) && messages.lock().unwrap().is_empty() {
+        thread::sleep(Duration::from_millis(10));
+    }
+    assert_eq!(messages.lock().unwrap().len(), 1);
+    // Wait till leader peer goes to sleep again.
+    thread::sleep(
+        cluster.cfg.raft_store.raft_base_tick_interval.0
+            * 2
+            * cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
+    );
+    cluster.clear_send_filters();
+    cluster.add_send_filter(CloneFilterFactory(DropMessageFilter::new(
+        MessageType::MsgTimeoutNow,
+    )));
+    let router = cluster.sim.wl().get_router(1).unwrap();
+    router
+        .send_raft_message(messages.lock().unwrap().pop().unwrap())
+        .unwrap();
+    let timer = Instant::now();
+    while timer.elapsed() < Duration::from_secs(3) {
+        let resp = cluster.request(
+            b"k2",
+            vec![new_put_cmd(b"k2", b"v2")],
+            false,
+            Duration::from_secs(5),
+        );
+        let header = resp.get_header();
+        if !header.has_error() {
+            return;
+        }
+        if !header
+            .get_error()
+            .get_message()
+            .contains("proposal dropped")
+        {
+            panic!("response {:?} has error", resp);
+        }
+        thread::sleep(Duration::from_millis(10));
+    }
+    panic!("failed to request after 3 seconds");
+}