raftstore: wake up hibernated leader after failed read index (tikv#6648)
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
5kbpers authored and c1ay committed May 9, 2020
1 parent 2467150 commit 38f9649
Showing 3 changed files with 101 additions and 4 deletions.
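A quick sketch of the flow this commit adds, using hypothetical stand-in types (`Follower`, `Leader`, `WakeUpMsg`) rather than the actual raftstore API: a follower whose read index fails for lack of a leader broadcasts a rate-limited MsgRegionWakeUp carrying its committed index, and a hibernated leader that receives one from a lagging peer leaves hibernation. The `MIN_BCAST_WAKE_UP_INTERVAL` constant mirrors the one introduced in peer.rs below.

```rust
use std::time::{Duration, Instant};

const MIN_BCAST_WAKE_UP_INTERVAL: Duration = Duration::from_millis(1_000);

struct WakeUpMsg {
    // The sender's committed index, piggybacked on the message.
    committed_index: u64,
}

struct Follower {
    committed_index: u64,
    last_bcast: Option<Instant>,
}

impl Follower {
    /// Called when a read index fails with "no leader". Returns the message
    /// to broadcast, or None if we already broadcast less than a second ago.
    fn on_read_index_no_leader(&mut self) -> Option<WakeUpMsg> {
        match self.last_bcast {
            Some(t) if t.elapsed() < MIN_BCAST_WAKE_UP_INTERVAL => None,
            _ => {
                self.last_bcast = Some(Instant::now());
                Some(WakeUpMsg { committed_index: self.committed_index })
            }
        }
    }
}

struct Leader {
    committed_index: u64,
    hibernated: bool,
}

impl Leader {
    /// Called when a MsgRegionWakeUp arrives. Only wake up if the sender
    /// lags behind us, i.e. it still has entries to catch up on.
    fn on_wake_up(&mut self, msg: &WakeUpMsg) {
        if msg.committed_index < self.committed_index {
            self.hibernated = false; // reset_raft_tick(GroupState::Ordered)
        }
    }
}

fn main() {
    let mut follower = Follower { committed_index: 5, last_bcast: None };
    let mut leader = Leader { committed_index: 8, hibernated: true };
    if let Some(msg) = follower.on_read_index_no_leader() {
        leader.on_wake_up(&msg);
    }
    assert!(!leader.hibernated);
    // A second failure within 1s is rate-limited: no broadcast.
    assert!(follower.on_read_index_no_leader().is_none());
}
```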
22 changes: 20 additions & 2 deletions components/raftstore/src/store/fsm/peer.rs
@@ -22,7 +22,8 @@ use kvproto::raft_cmdpb::{
StatusResponse,
};
use kvproto::raft_serverpb::{
MergeState, PeerState, RaftMessage, RaftSnapshotData, RaftTruncatedState, RegionLocalState,
ExtraMessageType, MergeState, PeerState, RaftMessage, RaftSnapshotData, RaftTruncatedState,
RegionLocalState,
};
use pd_client::PdClient;
use protobuf::Message;
@@ -392,6 +393,12 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.register_split_region_check_tick();
self.register_check_peer_stale_state_tick();
self.on_check_merge();
// Apply committed entries as soon as possible: if the committed index
// is ahead of the applied index, mark the peer ready so the pending
// entries are applied on this tick rather than waiting for new input.
if self.fsm.peer.raft_group.get_store().committed_index()
> self.fsm.peer.raft_group.get_store().applied_index()
{
self.fsm.has_ready = true;
}
}

fn on_gc_snap(&mut self, snaps: Vec<(SnapKey, bool)>) {
@@ -809,7 +816,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}

if msg.has_extra_msg() {
// now noop
self.on_extra_message(&msg);
return Ok(());
}

@@ -855,6 +862,17 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
Ok(())
}

fn on_extra_message(&mut self, msg: &RaftMessage) {
let extra_msg = msg.get_extra_msg();
match extra_msg.get_type() {
ExtraMessageType::MsgRegionWakeUp => {
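// The message's index is the sender's committed index; only leave
// hibernation if the sender lags behind this peer and still has
// entries to catch up on.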
if msg.get_message().get_index() < self.fsm.peer.get_store().committed_index() {
self.reset_raft_tick(GroupState::Ordered);
}
}
}
}

fn reset_raft_tick(&mut self, state: GroupState) {
self.fsm.group_state = state;
self.fsm.missing_ticks = 0;
23 changes: 21 additions & 2 deletions components/raftstore/src/store/peer.rs
@@ -43,6 +43,7 @@ use crate::{Error, Result};
use keys::{enc_end_key, enc_start_key};
use pd_client::INVALID_ID;
use tikv_util::collections::HashMap;
use tikv_util::time::Instant as UtilInstant;
use tikv_util::time::{duration_to_sec, monotonic_raw_now};
use tikv_util::worker::Scheduler;

@@ -56,6 +57,7 @@ use super::util::{self, check_region_epoch, is_initial_msg, Lease, LeaseState};
use super::DestroyPeerJob;

const SHRINK_CACHE_CAPACITY: usize = 64;
const MIN_BCAST_WAKE_UP_INTERVAL: u64 = 1_000; // 1s

/// The returned states of the peer after checking whether it is stale
#[derive(Debug, PartialEq, Eq)]
@@ -238,6 +240,9 @@ pub struct Peer {

/// Write Statistics for PD to schedule hot spot.
pub peer_stat: PeerStat,

/// Time of the last attempt to wake up the inactive leader.
pub bcast_wake_up_time: Option<UtilInstant>,
}

impl Peer {
@@ -315,6 +320,7 @@ impl Peer {
pending_messages: vec![],
peer_stat: PeerStat::default(),
catch_up_logs: None,
bcast_wake_up_time: None,
};

// If this region has only one peer and I am the one, campaign directly.
@@ -1625,7 +1631,7 @@ impl Peer {
/// Propose a request.
///
/// Returns true if the request was proposed successfully.
pub fn propose<T, C>(
pub fn propose<T: Transport, C>(
&mut self,
ctx: &mut PollContext<T, C>,
cb: Callback<RocksEngine>,
@@ -1968,7 +1974,7 @@ impl Peer {
// 1. The region is merging or splitting;
// 2. The message is stale and dropped by the Raft group internally;
// 3. There is already a read request proposed in the current lease;
fn read_index<T, C>(
fn read_index<T: Transport, C>(
&mut self,
poll_ctx: &mut PollContext<T, C>,
req: RaftCmdRequest,
@@ -2021,6 +2027,15 @@ impl Peer {
box_err!("{} can not read index due to no leader", self.tag),
);
poll_ctx.raft_metrics.invalid_proposal.read_index_no_leader += 1;
// The leader may be hibernating; try to wake it up, but broadcast
// at most once per MIN_BCAST_WAKE_UP_INTERVAL.
if poll_ctx.cfg.hibernate_regions
&& (self.bcast_wake_up_time.is_none()
|| self.bcast_wake_up_time.as_ref().unwrap().elapsed()
>= Duration::from_millis(MIN_BCAST_WAKE_UP_INTERVAL))
{
self.bcast_wake_up_message(&mut poll_ctx.trans);
self.bcast_wake_up_time = Some(UtilInstant::now_coarse());
}
cb.invoke_with_response(err_resp);
return false;
}
@@ -2031,6 +2046,7 @@ impl Peer {

poll_ctx.raft_metrics.propose.read_index += 1;

self.bcast_wake_up_time = None;
let id = Uuid::new_v4();
self.raft_group.read_index(id.as_bytes().to_vec());

@@ -2567,6 +2583,9 @@ impl Peer {
send_msg.set_from_peer(self.peer.clone());
send_msg.set_region_epoch(self.region().get_region_epoch().clone());
send_msg.set_to_peer(peer.clone());
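// Piggyback this peer's committed index so receivers can tell
// whether the sender lags behind them and needs the group awake.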
send_msg
.mut_message()
.set_index(self.raft_group.get_store().committed_index());
let extra_msg = send_msg.mut_extra_msg();
extra_msg.set_type(ExtraMessageType::MsgRegionWakeUp);
if let Err(e) = trans.send(send_msg) {
60 changes: 60 additions & 0 deletions tests/integrations/raftstore/test_replica_read.rs
@@ -5,6 +5,7 @@ use std::mem;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use kvproto::raft_serverpb::RaftMessage;
@@ -148,6 +149,65 @@ fn test_replica_read_on_hibernate() {
}
}

#[test]
fn test_read_hibernated_region() {
let mut cluster = new_node_cluster(0, 3);
// Initialize the cluster.
configure_for_lease_read(&mut cluster, Some(100), Some(8));
cluster.cfg.raft_store.raft_store_max_leader_lease = ReadableDuration(Duration::from_millis(1));
cluster.pd_client.disable_default_operator();
let r1 = cluster.run_conf_change();
let p2 = new_peer(2, 2);
cluster.pd_client.must_add_peer(r1, p2.clone());
let p3 = new_peer(3, 3);
cluster.pd_client.must_add_peer(r1, p3.clone());
cluster.must_put(b"k0", b"v0");
let region = cluster.get_region(b"k0");
cluster.must_transfer_leader(region.get_id(), p3);
// Make sure the new leader has applied the data.
must_get_equal(&cluster.get_engine(3), b"k0", b"v0");
// Wait for the region to hibernate.
thread::sleep(Duration::from_secs(1));
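// Restart the follower so it forgets the current leader; with the
// leader hibernated, its next read index will fail with "no leader".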
cluster.stop_node(2);
cluster.run_node(2).unwrap();

let dropped_msgs = Arc::new(Mutex::new(Vec::new()));
let (tx, rx) = mpsc::sync_channel(1);
let filter = Box::new(
RegionPacketFilter::new(1, 3)
.direction(Direction::Recv)
.reserve_dropped(Arc::clone(&dropped_msgs))
.set_msg_callback(Arc::new(move |msg: &RaftMessage| {
if msg.has_extra_msg() {
tx.send(msg.clone()).unwrap();
}
})),
);
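// The filter drops messages arriving at store 3 (the leader) while
// keeping copies, and forwards any extra message (the wake-up) to the
// test through `tx` for manual delivery later.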
cluster.sim.wl().add_recv_filter(3, filter);
// This request will fail because the restarted peer does not know about a valid leader.
let resp1_ch = async_read_on_peer(&mut cluster, p2.clone(), region.clone(), b"k1", true, true);
let resp1 = resp1_ch.recv_timeout(Duration::from_secs(5)).unwrap();
assert!(
resp1
.get_header()
.get_error()
.get_message()
.contains("can not read index due to no leader"),
"{:?}",
resp1.get_header()
);
// Wait until the wake-up message is received.
let wake_up_msg = rx.recv_timeout(Duration::from_secs(5)).unwrap();
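// Stop filtering and hand the captured wake-up message to the leader directly.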
cluster.sim.wl().clear_recv_filters(3);
let router = cluster.sim.wl().get_router(3).unwrap();
router.send_raft_message(wake_up_msg).unwrap();
// Wait for the leader to wake up.
thread::sleep(Duration::from_millis(500));
let resp2_ch = async_read_on_peer(&mut cluster, p2, region, b"k1", true, true);
let resp2 = resp2_ch.recv_timeout(Duration::from_secs(5)).unwrap();
assert!(!resp2.get_header().has_error(), "{:?}", resp2);
}

#[derive(Default)]
struct CommitToFilter {
// map[peer_id] -> committed index.
