Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Jan 26, 2024
1 parent e80af19 commit f1b7e33
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,21 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
maybe_snap: Option<&store::Snapshot>,
) -> bool {
let region_id = ob_region.get_id();
let try_apply_fap_snapshot = |c: Arc<CachedRegionInfo>, restarted: bool| {
debug!("fast path: start applying first snapshot {}:{} {}", self.store_id, region_id, peer_id;
let try_apply_fap_snapshot = |c: Arc<CachedRegionInfo>| {
let already_existed = self
.engine_store_server_helper
.kvstore_region_exist(region_id);
if already_existed {
debug!("fast path: skip apply snapshot because not first {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
return false;
}
info!("fast path: start applying first fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
let snap = match maybe_snap {
Some(s) => s,
None => {
c.snapshot_inflight.store(0, Ordering::SeqCst);
c.fast_add_peer_start.store(0, Ordering::SeqCst);
c.inited_or_fallback.store(true, Ordering::SeqCst);
return false;
}
};
// Even if the feature is not enabled, the snapshot could still be a previously
// generated fap snapshot. So we have to also handle this snapshot,
// to prevent error data.
Expand All @@ -102,27 +103,24 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

let expected_snapshot_type = Self::deduce_snapshot_type(peer_id, snap);
let assert_exist = match expected_snapshot_type {
SnapshotDeducedType::Uncertain => {
if !restarted {
snapshot_sent_time != 0
} else {
false
}
let snap = match maybe_snap {
Some(s) => s,
None => {
return false;
}
SnapshotDeducedType::Fap => true,
SnapshotDeducedType::Regular => false,
};

// We can't rely on `snapshot_inflight`, because it will be undetermined ZERO
// after restart.
let expected_snapshot_type = Self::deduce_snapshot_type(peer_id, snap);

let quit_apply_fap = |tag: &str| {
info!("fast path: fap snapshot mismatch/nonexist {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"cost_snapshot" => current.as_millis() - snapshot_sent_time,
"cost_total" => current.as_millis() - fap_start_time,
"current_enabled" => current_enabled,
"from_restart" => restarted,
"tag" => tag
);
if expected_snapshot_type == SnapshotDeducedType::Fap {
Expand Down Expand Up @@ -176,7 +174,6 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
"cost_snapshot" => current.as_millis() - snapshot_sent_time,
"cost_total" => current.as_millis() - fap_start_time,
"current_enabled" => current_enabled,
"from_restart" => restarted,
"replacement_of_regular" => expected_snapshot_type == SnapshotDeducedType::Regular
);
c.snapshot_inflight.store(0, Ordering::SeqCst);
Expand All @@ -200,26 +197,13 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
let mut applied_fap = false;
#[allow(clippy::collapsible_if)]
if should_check_fap_snapshot {
let mut maybe_cached_info: Option<Arc<CachedRegionInfo>> = None;
if self
.get_cached_manager()
.access_cached_region_info_mut(
region_id,
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(o) => {
maybe_cached_info = Some(o.get().clone());
let already_existed = self.engine_store_server_helper.kvstore_region_exist(region_id);
debug!("fast path: check should apply fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
"already_existed" => already_existed,
);
if !already_existed {
// May be a fap snapshot, try to apply.
applied_fap = try_apply_fap_snapshot(o.get().clone(), false);
}
applied_fap = try_apply_fap_snapshot(o.get().clone());
}
MapEntry::Vacant(_) => {
// It won't go here because cached region info is inited after restart and on the first fap message.
Expand All @@ -229,7 +213,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
);
assert!(self.is_initialized(region_id));
let o = Arc::new(CachedRegionInfo::default());
applied_fap = try_apply_fap_snapshot(o, true);
applied_fap = try_apply_fap_snapshot(o);
}
},
)
Expand Down
1 change: 1 addition & 0 deletions proxy_tests/proxy/shared/fast_add_peer/fp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn test_disable_fap() {

restart_tiflash_node(&mut cluster, 2);
pd_client.must_add_peer(1, new_learner_peer(2, 2));
// The original fap snapshot not match
check_key(&cluster, b"k0", b"v0", Some(true), None, Some(vec![2]));
check_key(&cluster, b"k10", b"v10", Some(true), None, Some(vec![2]));

Expand Down

0 comments on commit f1b7e33

Please sign in to comment.