Skip to content

Commit

Permalink
Provide split key in tablet reader (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo authored Oct 9, 2023
1 parent 2452c4a commit 28583d7
Show file tree
Hide file tree
Showing 17 changed files with 537 additions and 57 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy_components/mock-engine-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ rand = "0.8"
resolved_ts = { workspace = true }
resource_control = { workspace = true }
resource_metering = { workspace = true }
serde_json = "1.0"
service = { workspace = true }
security = { workspace = true, default-features = false }
server = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ impl ClusterExt {
cluster_ext_ptr: isize,
mock_cfg: MockConfig,
pd_client: Option<Arc<dyn pd_client::PdClient>>,
proxy_cfg: &ProxyConfig,
) -> (FFIHelperSet, TikvConfig) {
// We must allocate on heap to avoid move.

let proxy_config_str = serde_json::to_string(proxy_cfg).unwrap_or_default();
let proxy = Box::new(engine_store_ffi::ffi::RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
key_mgr.clone(),
Expand All @@ -96,6 +99,7 @@ impl ClusterExt {
},
None,
pd_client,
proxy_config_str,
));

let proxy_ref = proxy.as_ref();
Expand Down Expand Up @@ -163,6 +167,7 @@ impl ClusterExt {
cluster_ext_ptr,
mock_cfg,
pd_client,
proxy_cfg,
);

// We can not use moved or cloned engines any more.
Expand Down
2 changes: 2 additions & 0 deletions proxy_components/proxy_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ lazy_static = "1.3"
protobuf = { version = "2.8", features = ["bytes"] }
pd_client = { workspace = true }
raftstore = { workspace = true, default-features = false }
serde_json = "1.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
tikv_util = { workspace = true, default-features = false }
Expand All @@ -46,6 +47,7 @@ tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hot
tracker = { workspace = true, default-features = false }
reqwest = { version = "0.11", features = ["blocking"] }
url = "2.4.0"
collections = { workspace = true }

[dependencies.rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
Expand Down
169 changes: 167 additions & 2 deletions proxy_components/proxy_ffi/src/domain_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use super::{
interfaces_ffi,
interfaces_ffi::{
BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, SSTView, SSTViewVec,
WriteCmdType, WriteCmdsView,
BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, RustStrWithView,
RustStrWithViewVec, SSTView, SSTViewVec, WriteCmdType, WriteCmdsView,
},
read_index_helper, utils,
};
Expand Down Expand Up @@ -131,6 +131,8 @@ pub enum RawRustPtrType {
ReadIndexTask = 1,
ArcFutureWaker = 2,
TimerTask = 3,
String = 4,
VecOfString = 5,
}

impl From<u32> for RawRustPtrType {
Expand Down Expand Up @@ -162,6 +164,12 @@ pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRu
RawRustPtrType::TimerTask => unsafe {
drop(Box::from_raw(data as *mut utils::TimerTask));
},
RawRustPtrType::String => unsafe {
drop(Box::from_raw(data as *mut RustStrWithViewInner));
},
RawRustPtrType::VecOfString => unsafe {
drop(Box::from_raw(data as *mut RustStrWithViewVecInner));
},
_ => unreachable!(),
}
}
Expand All @@ -180,3 +188,160 @@ impl RawRustPtr {
self.ptr.is_null()
}
}

#[derive(Default)]
pub struct TestGcObjectMonitor {
rust: std::sync::Mutex<collections::HashMap<interfaces_ffi::RawRustPtrType, isize>>,
}

impl TestGcObjectMonitor {
pub fn add_rust(&self, t: &interfaces_ffi::RawRustPtrType, x: isize) {
use std::collections::hash_map::Entry;
let data = &mut *self.rust.lock().unwrap();
match data.entry(*t) {
Entry::Occupied(mut v) => {
*v.get_mut() += x;
}
Entry::Vacant(v) => {
v.insert(x);
}
}
}
pub fn valid_clean_rust(&self) -> bool {
let data = &*self.rust.lock().unwrap();
for (k, v) in data {
if *v != 0 {
tikv_util::error!(
"TestGcObjectMonitor::valid_clean failed at type {} refcount {}",
k,
v
);
return false;
}
}
true
}
pub fn is_empty_rust(&self) -> bool {
let data = &*self.rust.lock().unwrap();
data.is_empty()
}
}

#[cfg(any(test, feature = "testexport"))]
lazy_static::lazy_static! {
pub static ref TEST_GC_OBJ_MONITOR: TestGcObjectMonitor = TestGcObjectMonitor::default();
}

impl Default for RustStrWithViewVec {
fn default() -> Self {
RustStrWithViewVec {
buffs: std::ptr::null_mut(),
len: 0,
inner: RawRustPtr::default(),
}
}
}

struct RustStrWithViewVecInner {
// Hold the Vec of String.
#[allow(clippy::box_collection)]
_data: Pin<Box<Vec<Vec<u8>>>>,
// Hold the BaseBuffView array.
#[allow(clippy::box_collection)]
buff_view_vec: Pin<Box<Vec<BaseBuffView>>>,
}

impl Drop for RustStrWithViewVecInner {
fn drop(&mut self) {
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::VecOfString.into(), -1);
}
}
}

pub fn build_from_vec_string(s: Vec<Vec<u8>>) -> RustStrWithViewVec {
let vec_len = s.len();
let vec_len_64: u64 = vec_len as u64;
let inner_vec_of_string = Box::pin(s);
let mut buff_view_vec: Vec<BaseBuffView> = Vec::with_capacity(vec_len);
for i in 0..vec_len {
buff_view_vec.push(BaseBuffView {
data: inner_vec_of_string[i].as_ptr() as *const _,
len: inner_vec_of_string[i].len() as u64,
});
}
let inner = Box::new(RustStrWithViewVecInner {
_data: inner_vec_of_string,
buff_view_vec: Box::pin(buff_view_vec),
});
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::VecOfString.into(), 1);
}
let inner_wrapped = RawRustPtr {
ptr: inner.as_ref() as *const _ as RawVoidPtr,
type_: RawRustPtrType::VecOfString.into(),
};
let buff_view_vec_ptr = inner.buff_view_vec.as_ptr();
std::mem::forget(inner);

RustStrWithViewVec {
buffs: buff_view_vec_ptr,
len: vec_len_64,
inner: inner_wrapped,
}
}

impl Default for RustStrWithView {
fn default() -> Self {
RustStrWithView {
buff: BaseBuffView {
data: std::ptr::null(),
len: 0,
},
inner: RawRustPtr::default(),
}
}
}

struct RustStrWithViewInner {
// Hold the String.
#[allow(clippy::box_collection)]
_data: Pin<Box<Vec<u8>>>,
}

impl Drop for RustStrWithViewInner {
fn drop(&mut self) {
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::String.into(), -1);
}
}
}

pub fn build_from_string(s: Vec<u8>) -> RustStrWithView {
let str_len = s.len();
let inner_string = Box::pin(s);
let buff = BaseBuffView {
data: inner_string.as_ptr() as *const _,
len: str_len as u64,
};
let inner = Box::new(RustStrWithViewInner {
_data: inner_string,
});
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::String.into(), 1);
}
let inner_wrapped = RawRustPtr {
ptr: inner.as_ref() as *const _ as RawVoidPtr,
type_: RawRustPtrType::String.into(),
};
std::mem::forget(inner);

RustStrWithView {
buff,
inner: inner_wrapped,
}
}
33 changes: 32 additions & 1 deletion proxy_components/proxy_ffi/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ pub mod root {
}
#[repr(C)]
#[derive(Debug)]
pub struct RustStrWithView {
pub buff: root::DB::BaseBuffView,
pub inner: root::DB::RawRustPtr,
}
#[repr(C)]
#[derive(Debug)]
pub struct RustStrWithViewVec {
pub buffs: *const root::DB::BaseBuffView,
pub len: u64,
pub inner: root::DB::RawRustPtr,
}
#[repr(C)]
#[derive(Debug)]
pub struct SSTReaderInterfaces {
pub fn_get_sst_reader: ::std::option::Option<
unsafe extern "C" fn(
Expand Down Expand Up @@ -285,6 +298,18 @@ pub mod root {
arg4: root::DB::BaseBuffView,
),
>,
pub fn_approx_size: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::SSTReaderPtr,
arg2: root::DB::ColumnFamilyType,
) -> u64,
>,
pub fn_get_split_keys: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::SSTReaderPtr,
splits_count: u64,
) -> root::DB::RustStrWithViewVec,
>,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -434,6 +459,12 @@ pub mod root {
applied_index: u64,
),
>,
pub fn_get_config_json: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::RaftStoreProxyPtr,
kind: u64,
) -> root::DB::RustStrWithView,
>,
}
#[repr(C)]
#[derive(Debug)]
Expand Down Expand Up @@ -659,7 +690,7 @@ pub mod root {
arg3: root::DB::RawVoidPtr,
) -> u32;
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 3797917752479181299;
pub const RAFT_STORE_PROXY_VERSION: u64 = 5692329170612304456;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
8 changes: 8 additions & 0 deletions proxy_components/proxy_ffi/src/raftstore_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct RaftStoreProxy {
raftstore_proxy_engine: RwLock<Option<Eng>>,
pd_client: Option<Arc<dyn PdClient>>,
cluster_raftstore_ver: RwLock<RaftstoreVer>,
proxy_config_str: String,
}

impl RaftStoreProxy {
Expand All @@ -38,6 +39,7 @@ impl RaftStoreProxy {
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
raftstore_proxy_engine: Option<Eng>,
pd_client: Option<Arc<dyn PdClient>>,
proxy_config_str: String,
) -> Self {
RaftStoreProxy {
status,
Expand All @@ -46,6 +48,7 @@ impl RaftStoreProxy {
raftstore_proxy_engine: RwLock::new(raftstore_proxy_engine),
pd_client,
cluster_raftstore_ver: RwLock::new(RaftstoreVer::Uncertain),
proxy_config_str,
}
}
}
Expand Down Expand Up @@ -378,6 +381,11 @@ impl RaftStoreProxy {
unreachable!()
}
}

// TODO may be we can later move ProxyConfig to proxy_ffi.
pub fn get_proxy_config_str(&self) -> &String {
&self.proxy_config_str
}
}

pub trait RaftStoreProxyEngineTrait {
Expand Down
Loading

0 comments on commit 28583d7

Please sign in to comment.