diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/fap_snapshot.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/fap_snapshot.rs index 172e819e333..22724589004 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/fap_snapshot.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/fap_snapshot.rs @@ -78,20 +78,21 @@ impl ProxyForwarder { maybe_snap: Option<&store::Snapshot>, ) -> bool { let region_id = ob_region.get_id(); - let try_apply_fap_snapshot = |c: Arc, restarted: bool| { - debug!("fast path: start applying first snapshot {}:{} {}", self.store_id, region_id, peer_id; + let try_apply_fap_snapshot = |c: Arc| { + let already_existed = self + .engine_store_server_helper + .kvstore_region_exist(region_id); + if already_existed { + debug!("fast path: skip apply snapshot because not first {}:{} {}", self.store_id, region_id, peer_id; + "snap_key" => ?snap_key, + "region_id" => region_id, + ); + return false; + } + info!("fast path: start applying first fap snapshot {}:{} {}", self.store_id, region_id, peer_id; "snap_key" => ?snap_key, "region_id" => region_id, ); - let snap = match maybe_snap { - Some(s) => s, - None => { - c.snapshot_inflight.store(0, Ordering::SeqCst); - c.fast_add_peer_start.store(0, Ordering::SeqCst); - c.inited_or_fallback.store(true, Ordering::SeqCst); - return false; - } - }; // Even if the feature is not enabled, the snapshot could still be a previously // generated fap snapshot. So we have to also handle this snapshot, // to prevent error data. @@ -102,19 +103,17 @@ impl ProxyForwarder { .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); - let expected_snapshot_type = Self::deduce_snapshot_type(peer_id, snap); - let assert_exist = match expected_snapshot_type { - SnapshotDeducedType::Uncertain => { - if !restarted { - snapshot_sent_time != 0 - } else { - false - } + let snap = match maybe_snap { + Some(s) => s, + None => { + return false; } - SnapshotDeducedType::Fap => true, - SnapshotDeducedType::Regular => false, }; + // We can't rely on `snapshot_inflight`, because it will be undetermined ZERO + // after restart. + let expected_snapshot_type = Self::deduce_snapshot_type(peer_id, snap); + let quit_apply_fap = |tag: &str| { info!("fast path: fap snapshot mismatch/nonexist {}:{} {}", self.store_id, region_id, peer_id; "snap_key" => ?snap_key, @@ -122,7 +121,6 @@ impl ProxyForwarder { "cost_snapshot" => current.as_millis() - snapshot_sent_time, "cost_total" => current.as_millis() - fap_start_time, "current_enabled" => current_enabled, - "from_restart" => restarted, "tag" => tag ); if expected_snapshot_type == SnapshotDeducedType::Fap { @@ -176,7 +174,6 @@ impl ProxyForwarder { "cost_snapshot" => current.as_millis() - snapshot_sent_time, "cost_total" => current.as_millis() - fap_start_time, "current_enabled" => current_enabled, - "from_restart" => restarted, "replacement_of_regular" => expected_snapshot_type == SnapshotDeducedType::Regular ); c.snapshot_inflight.store(0, Ordering::SeqCst); @@ -200,26 +197,13 @@ impl ProxyForwarder { let mut applied_fap = false; #[allow(clippy::collapsible_if)] if should_check_fap_snapshot { - let mut maybe_cached_info: Option> = None; if self .get_cached_manager() .access_cached_region_info_mut( region_id, |info: MapEntry>| match info { MapEntry::Occupied(o) => { - maybe_cached_info = Some(o.get().clone()); - let already_existed = self.engine_store_server_helper.kvstore_region_exist(region_id); - debug!("fast path: check should apply fap snapshot {}:{} {}", self.store_id, region_id, peer_id; - "snap_key" => ?snap_key, - "region_id" => region_id, - "inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst), - "snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst), - "already_existed" => already_existed, - ); - if !already_existed { - // May be a fap snapshot, try to apply. - applied_fap = try_apply_fap_snapshot(o.get().clone(), false); - } + applied_fap = try_apply_fap_snapshot(o.get().clone()); } MapEntry::Vacant(_) => { // It won't go here because cached region info is inited after restart and on the first fap message. @@ -229,7 +213,7 @@ impl ProxyForwarder { ); assert!(self.is_initialized(region_id)); let o = Arc::new(CachedRegionInfo::default()); - applied_fap = try_apply_fap_snapshot(o, true); + applied_fap = try_apply_fap_snapshot(o); } }, ) diff --git a/proxy_tests/proxy/shared/fast_add_peer/fp.rs b/proxy_tests/proxy/shared/fast_add_peer/fp.rs index e92909a3bcc..b6329e7a516 100644 --- a/proxy_tests/proxy/shared/fast_add_peer/fp.rs +++ b/proxy_tests/proxy/shared/fast_add_peer/fp.rs @@ -61,6 +61,7 @@ fn test_disable_fap() { restart_tiflash_node(&mut cluster, 2); pd_client.must_add_peer(1, new_learner_peer(2, 2)); + // The original fap snapshot not match check_key(&cluster, b"k0", b"v0", Some(true), None, Some(vec![2])); check_key(&cluster, b"k10", b"v10", Some(true), None, Some(vec![2]));