diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index 6da8fce5cef..d8448c53ca9 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -22,7 +22,8 @@ use kvproto::raft_cmdpb::{
     StatusResponse,
 };
 use kvproto::raft_serverpb::{
-    MergeState, PeerState, RaftMessage, RaftSnapshotData, RaftTruncatedState, RegionLocalState,
+    ExtraMessageType, MergeState, PeerState, RaftMessage, RaftSnapshotData, RaftTruncatedState,
+    RegionLocalState,
 };
 use pd_client::PdClient;
 use protobuf::Message;
@@ -392,6 +393,12 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         self.register_split_region_check_tick();
         self.register_check_peer_stale_state_tick();
         self.on_check_merge();
+        // Apply committed entries more quickly.
+        if self.fsm.peer.raft_group.get_store().committed_index()
+            > self.fsm.peer.raft_group.get_store().applied_index()
+        {
+            self.fsm.has_ready = true;
+        }
     }
 
     fn on_gc_snap(&mut self, snaps: Vec<(SnapKey, bool)>) {
@@ -809,7 +816,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         }
 
         if msg.has_extra_msg() {
-            // now noop
+            self.on_extra_message(&msg);
             return Ok(());
         }
 
@@ -855,6 +862,17 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         Ok(())
     }
 
+    fn on_extra_message(&mut self, msg: &RaftMessage) {
+        let extra_msg = msg.get_extra_msg();
+        match extra_msg.get_type() {
+            ExtraMessageType::MsgRegionWakeUp => {
+                if msg.get_message().get_index() < self.fsm.peer.get_store().committed_index() {
+                    self.reset_raft_tick(GroupState::Ordered);
+                }
+            }
+        }
+    }
+
     fn reset_raft_tick(&mut self, state: GroupState) {
         self.fsm.group_state = state;
         self.fsm.missing_ticks = 0;
diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index c184a6aa18a..855f60e70e1 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -43,6 +43,7 @@ use crate::{Error, Result};
 use keys::{enc_end_key, enc_start_key};
 use pd_client::INVALID_ID;
 use tikv_util::collections::HashMap;
+use tikv_util::time::Instant as UtilInstant;
 use tikv_util::time::{duration_to_sec, monotonic_raw_now};
 use tikv_util::worker::Scheduler;
 
@@ -56,6 +57,7 @@ use super::util::{self, check_region_epoch, is_initial_msg, Lease, LeaseState};
 use super::DestroyPeerJob;
 
 const SHRINK_CACHE_CAPACITY: usize = 64;
+const MIN_BCAST_WAKE_UP_INTERVAL: u64 = 1_000; // 1s
 
 /// The returned states of the peer after checking whether it is stale
 #[derive(Debug, PartialEq, Eq)]
@@ -238,6 +240,9 @@ pub struct Peer {
 
     /// Write Statistics for PD to schedule hot spot.
     pub peer_stat: PeerStat,
+
+    /// Time of the last attempt to wake up an inactive leader.
+    pub bcast_wake_up_time: Option<UtilInstant>,
 }
 
 impl Peer {
@@ -315,6 +320,7 @@ impl Peer {
             pending_messages: vec![],
             peer_stat: PeerStat::default(),
             catch_up_logs: None,
+            bcast_wake_up_time: None,
        };
 
         // If this region has only one peer and I am the one, campaign directly.
@@ -1625,7 +1631,7 @@ impl Peer {
     /// Propose a request.
     ///
     /// Return true means the request has been proposed successfully.
-    pub fn propose<T, C>(
+    pub fn propose<T: Transport, C>(
         &mut self,
         ctx: &mut PollContext<T, C>,
         cb: Callback,
@@ -1968,7 +1974,7 @@ impl Peer {
     // 1. The region is in merging or splitting;
     // 2. The message is stale and dropped by the Raft group internally;
     // 3. There is already a read request proposed in the current lease;
-    fn read_index<T, C>(
+    fn read_index<T: Transport, C>(
         &mut self,
         poll_ctx: &mut PollContext<T, C>,
         req: RaftCmdRequest,
@@ -2021,6 +2027,15 @@ impl Peer {
                 box_err!("{} can not read index due to no leader", self.tag),
             );
             poll_ctx.raft_metrics.invalid_proposal.read_index_no_leader += 1;
+            // The leader may be hibernated; send a message to try to wake it up.
+            if poll_ctx.cfg.hibernate_regions
+                && (self.bcast_wake_up_time.is_none()
+                    || self.bcast_wake_up_time.as_ref().unwrap().elapsed()
+                        >= Duration::from_millis(MIN_BCAST_WAKE_UP_INTERVAL))
+            {
+                self.bcast_wake_up_message(&mut poll_ctx.trans);
+                self.bcast_wake_up_time = Some(UtilInstant::now_coarse());
+            }
             cb.invoke_with_response(err_resp);
             return false;
         }
@@ -2031,6 +2046,7 @@ impl Peer {
 
         poll_ctx.raft_metrics.propose.read_index += 1;
 
+        self.bcast_wake_up_time = None;
         let id = Uuid::new_v4();
         self.raft_group.read_index(id.as_bytes().to_vec());
 
@@ -2567,6 +2583,9 @@ impl Peer {
             send_msg.set_from_peer(self.peer.clone());
             send_msg.set_region_epoch(self.region().get_region_epoch().clone());
             send_msg.set_to_peer(peer.clone());
+            send_msg
+                .mut_message()
+                .set_index(self.raft_group.get_store().committed_index());
             let extra_msg = send_msg.mut_extra_msg();
             extra_msg.set_type(ExtraMessageType::MsgRegionWakeUp);
             if let Err(e) = trans.send(send_msg) {
diff --git a/tests/integrations/raftstore/test_replica_read.rs b/tests/integrations/raftstore/test_replica_read.rs
index 52a8e9a93f0..f611ae0076b 100644
--- a/tests/integrations/raftstore/test_replica_read.rs
+++ b/tests/integrations/raftstore/test_replica_read.rs
@@ -5,6 +5,7 @@ use std::mem;
 use std::sync::atomic::AtomicBool;
 use std::sync::mpsc::{self, RecvTimeoutError};
 use std::sync::{Arc, Mutex};
+use std::thread;
 use std::time::{Duration, Instant};
 
 use kvproto::raft_serverpb::RaftMessage;
@@ -148,6 +149,65 @@ fn test_replica_read_on_hibernate() {
     }
 }
 
+#[test]
+fn test_read_hibernated_region() {
+    let mut cluster = new_node_cluster(0, 3);
+    // Initialize the cluster.
+    configure_for_lease_read(&mut cluster, Some(100), Some(8));
+    cluster.cfg.raft_store.raft_store_max_leader_lease = ReadableDuration(Duration::from_millis(1));
+    cluster.pd_client.disable_default_operator();
+    let r1 = cluster.run_conf_change();
+    let p2 = new_peer(2, 2);
+    cluster.pd_client.must_add_peer(r1, p2.clone());
+    let p3 = new_peer(3, 3);
+    cluster.pd_client.must_add_peer(r1, p3.clone());
+    cluster.must_put(b"k0", b"v0");
+    let region = cluster.get_region(b"k0");
+    cluster.must_transfer_leader(region.get_id(), p3);
+    // Make sure the leader writes the data.
+    must_get_equal(&cluster.get_engine(3), b"k0", b"v0");
+    // Wait until the region is hibernated.
+    thread::sleep(Duration::from_secs(1));
+    cluster.stop_node(2);
+    cluster.run_node(2).unwrap();
+
+    let dropped_msgs = Arc::new(Mutex::new(Vec::new()));
+    let (tx, rx) = mpsc::sync_channel(1);
+    let filter = Box::new(
+        RegionPacketFilter::new(1, 3)
+            .direction(Direction::Recv)
+            .reserve_dropped(Arc::clone(&dropped_msgs))
+            .set_msg_callback(Arc::new(move |msg: &RaftMessage| {
+                if msg.has_extra_msg() {
+                    tx.send(msg.clone()).unwrap();
+                }
+            })),
+    );
+    cluster.sim.wl().add_recv_filter(3, filter);
+    // This request will fail because there is no valid leader.
+    let resp1_ch = async_read_on_peer(&mut cluster, p2.clone(), region.clone(), b"k1", true, true);
+    let resp1 = resp1_ch.recv_timeout(Duration::from_secs(5)).unwrap();
+    assert!(
+        resp1
+            .get_header()
+            .get_error()
+            .get_message()
+            .contains("can not read index due to no leader"),
+        "{:?}",
+        resp1.get_header()
+    );
+    // Wait until the wake-up message is received.
+    let wake_up_msg = rx.recv_timeout(Duration::from_secs(5)).unwrap();
+    cluster.sim.wl().clear_recv_filters(3);
+    let router = cluster.sim.wl().get_router(3).unwrap();
+    router.send_raft_message(wake_up_msg).unwrap();
+    // Wait until the leader is woken up.
+    thread::sleep(Duration::from_millis(500));
+    let resp2_ch = async_read_on_peer(&mut cluster, p2, region, b"k1", true, true);
+    let resp2 = resp2_ch.recv_timeout(Duration::from_secs(5)).unwrap();
+    assert!(!resp2.get_header().has_error(), "{:?}", resp2);
+}
+
 #[derive(Default)]
 struct CommitToFilter {
     // map[peer_id] -> committed index.