diff --git a/src/binder/copy.rs b/src/binder/copy.rs index 0d17f0eef..d574ac87a 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -62,7 +62,7 @@ impl Binder { } => (table_name, columns), CopySource::Query(_) => return Err(BindError::Todo("copy from query".into())), }; - let (table, is_internal) = self.bind_table_id(&table_name)?; + let (table, _) = self.bind_table_id(&table_name)?; let cols = self.bind_table_columns(&table_name, &columns)?; @@ -76,18 +76,11 @@ impl Binder { let copy = if to { // COPY TO - let scan = if is_internal { - self.egraph.add(Node::Internal([table, cols])) - } else { - let true_ = self.egraph.add(Node::true_()); - self.egraph.add(Node::Scan([table, cols, true_])) - }; + let true_ = self.egraph.add(Node::true_()); + let scan = self.egraph.add(Node::Scan([table, cols, true_])); self.egraph.add(Node::CopyTo([ext_source, scan])) } else { // COPY FROM - if is_internal { - return Err(BindError::NotSupportedOnInternalTable); - } let types = self.type_(cols)?; let types = self.egraph.add(Node::Type(types)); let copy = self.egraph.add(Node::CopyFrom([ext_source, types])); diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 0ed9cacef..fb1897786 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -10,7 +10,7 @@ use itertools::Itertools; use crate::array; use crate::catalog::function::FunctionCatalog; -use crate::catalog::{RootCatalog, RootCatalogRef, TableRefId, DEFAULT_SCHEMA_NAME}; +use crate::catalog::{RootCatalog, RootCatalogRef, TableRefId}; use crate::parser::*; use crate::planner::{Expr as Node, RecExpr, TypeError, TypeSchemaAnalysis}; @@ -370,7 +370,7 @@ impl Binder { /// Split an object name into `(schema name, table name)`. fn split_name(name: &ObjectName) -> Result<(&str, &str)> { Ok(match name.0.as_slice() { - [table] => (DEFAULT_SCHEMA_NAME, &table.value), + [table] => (RootCatalog::DEFAULT_SCHEMA_NAME, &table.value), [schema, table] => (&schema.value, &table.value), _ => return Err(BindError::InvalidTableName(name.0.clone())), }) diff --git a/src/binder/table.rs b/src/binder/table.rs index 45699fa20..f80219eb4 100644 --- a/src/binder/table.rs +++ b/src/binder/table.rs @@ -3,7 +3,7 @@ use std::vec::Vec; use super::*; -use crate::catalog::{ColumnRefId, INTERNAL_SCHEMA_NAME}; +use crate::catalog::{ColumnRefId, RootCatalog}; impl Binder { /// Binds the FROM clause. Returns a nested [`Join`](Node::Join) plan of tables. @@ -60,14 +60,10 @@ impl Binder { fn bind_table_factor(&mut self, table: TableFactor) -> Result { match table { TableFactor::Table { name, alias, .. 
} => {
-                let (table_id, is_internal) = self.bind_table_id(&name)?;
+                let (table_id, _) = self.bind_table_id(&name)?;
                 let cols = self.bind_table_def(&name, alias, false)?;
-                let id = if is_internal {
-                    self.egraph.add(Node::Internal([table_id, cols]))
-                } else {
-                    let null = self.egraph.add(Node::null());
-                    self.egraph.add(Node::Scan([table_id, cols, null]))
-                };
+                let null = self.egraph.add(Node::null());
+                let id = self.egraph.add(Node::Scan([table_id, cols, null]));
                 Ok(id)
             }
             TableFactor::Derived {
@@ -252,7 +248,7 @@ impl Binder {
             .get_table_id_by_name(schema_name, table_name)
             .ok_or_else(|| BindError::InvalidTable(table_name.into()))?;
         let id = self.egraph.add(Node::Table(table_ref_id));
-        Ok((id, schema_name == INTERNAL_SCHEMA_NAME))
+        Ok((id, table_ref_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID))
     }
 }

@@ -270,7 +266,7 @@ mod tests {
         let catalog = Arc::new(RootCatalog::new());
         let col_catalog = ColumnCatalog::new(0, ColumnDesc::new("a", DataType::Int32, false));
         catalog
-            .add_table(0, "t".into(), vec![col_catalog], false, vec![])
+            .add_table(1, "t".into(), vec![col_catalog], false, vec![])
             .unwrap();

         let stmts = parse("select x.b from (select a as b from t) as x").unwrap();
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index 43861e2b6..abae2945f 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -11,12 +11,6 @@ pub use self::schema::*;
 pub use self::table::*;
 use crate::types::*;

-pub static DEFAULT_SCHEMA_NAME: &str = "postgres";
-pub static INTERNAL_SCHEMA_NAME: &str = "pg_catalog";
-
-static CONTRIBUTORS_TABLE_NAME: &str = "contributors";
-pub const CONTRIBUTORS_TABLE_ID: TableId = 0;
-
 mod column;
 pub mod function;
 mod root;
diff --git a/src/catalog/root.rs b/src/catalog/root.rs
index 7612ecddb..50950c035 100644
--- a/src/catalog/root.rs
+++ b/src/catalog/root.rs
@@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};

 use super::function::FunctionCatalog;
 use super::*;
+use crate::parser;

 /// The root of all catalogs.
 pub struct RootCatalog {
@@ -27,8 +28,8 @@ impl Default for RootCatalog {
 impl RootCatalog {
     pub fn new() -> RootCatalog {
         let mut inner = Inner::default();
-        inner.add_schema(DEFAULT_SCHEMA_NAME.into()).unwrap();
-        inner.add_internals();
+        inner.add_system_schema();
+        inner.add_schema(Self::DEFAULT_SCHEMA_NAME.into()).unwrap();
         RootCatalog {
             inner: Mutex::new(inner),
         }
@@ -126,6 +127,10 @@ impl RootCatalog {
         let schema = inner.schemas.get_mut(&schema_idx).unwrap();
         schema.create_function(name, arg_types, arg_names, return_type, language, body);
     }
+
+    pub const DEFAULT_SCHEMA_NAME: &'static str = "postgres";
+    pub const SYSTEM_SCHEMA_NAME: &'static str = "pg_catalog";
+    pub const SYSTEM_SCHEMA_ID: SchemaId = 0;
 }

 impl Inner {
@@ -141,34 +146,73 @@ impl Inner {
         Ok(schema_id)
     }

-    fn add_internals(&mut self) {
-        let schema_id = self.add_schema(INTERNAL_SCHEMA_NAME.into()).unwrap();
-        let table_id = self
-            .schemas
-            .get_mut(&schema_id)
-            .unwrap()
-            .add_table(
-                CONTRIBUTORS_TABLE_NAME.to_string(),
-                vec![ColumnCatalog::new(
-                    0,
-                    ColumnDesc::new("github_id", DataType::String, false),
-                )],
-                false,
-                vec![],
-            )
+    fn add_system_schema(&mut self) {
+        let schema_id = self
+            .add_schema(RootCatalog::SYSTEM_SCHEMA_NAME.into())
             .unwrap();
-        assert_eq!(table_id, CONTRIBUTORS_TABLE_ID);
+        let system_schema = self.schemas.get_mut(&schema_id).unwrap();
+        assert_eq!(schema_id, RootCatalog::SYSTEM_SCHEMA_ID);
+
+        let stmts = parser::parse(CREATE_SYSTEM_TABLE_SQL).unwrap();
+        for stmt in stmts {
+            let parser::Statement::CreateTable { name, columns, ..
} = stmt else {
+                panic!("invalid system table sql: {stmt}");
+            };
+            system_schema
+                .add_table(
+                    name.to_string(),
+                    columns
+                        .into_iter()
+                        .enumerate()
+                        .map(|(cid, col)| {
+                            let mut column = ColumnCatalog::from(&col);
+                            column.set_id(cid as u32);
+                            column
+                        })
+                        .collect(),
+                    false,
+                    vec![],
+                )
+                .expect("failed to add system table");
+        }
     }
 }

 fn split_name(name: &str) -> Option<(&str, &str)> {
     match name.split('.').collect::<Vec<&str>>()[..] {
-        [table] => Some((DEFAULT_SCHEMA_NAME, table)),
+        [table] => Some((RootCatalog::DEFAULT_SCHEMA_NAME, table)),
         [schema, table] => Some((schema, table)),
         _ => None,
     }
 }

+const CREATE_SYSTEM_TABLE_SQL: &str = "
+    create table contributors (
+        github_id string not null
+    );
+    create table pg_tables (
+        schema_id int not null,
+        schema_name string not null,
+        table_id int not null,
+        table_name string not null
+    );
+    create table pg_attribute (
+        schema_name string not null,
+        table_name string not null,
+        column_id int not null,
+        column_name string not null,
+        column_type string not null,
+        column_not_null boolean not null
+    );
+    create table pg_stat (
+        schema_name string not null,
+        table_name string not null,
+        column_name string not null,
+        n_row int,
+        n_distinct int
+    );
+";
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -178,17 +222,21 @@ mod tests {
     #[test]
     fn test_root_catalog() {
         let catalog = Arc::new(RootCatalog::new());
-        let schema_catalog1 = catalog.get_schema_by_id(0).unwrap();
+        let schema_catalog1 = catalog
+            .get_schema_by_id(RootCatalog::SYSTEM_SCHEMA_ID)
+            .unwrap();
         assert_eq!(schema_catalog1.id(), 0);
-        assert_eq!(schema_catalog1.name(), DEFAULT_SCHEMA_NAME);
+        assert_eq!(schema_catalog1.name(), RootCatalog::SYSTEM_SCHEMA_NAME);

-        let schema_catalog2 = catalog.get_schema_by_name(DEFAULT_SCHEMA_NAME).unwrap();
-        assert_eq!(schema_catalog1.id(), schema_catalog2.id());
-        assert_eq!(schema_catalog1.name(), schema_catalog2.name());
+        let schema_catalog2 = catalog
+            .get_schema_by_name(RootCatalog::DEFAULT_SCHEMA_NAME)
+            .unwrap();
+        assert_eq!(schema_catalog2.id(), 1);
+        assert_eq!(schema_catalog2.name(), RootCatalog::DEFAULT_SCHEMA_NAME);

         let col = ColumnCatalog::new(0, ColumnDesc::new("a", DataType::Int32, false));
         let table_id = catalog
-            .add_table(0, "t".into(), vec![col], false, vec![])
+            .add_table(1, "t".into(), vec![col], false, vec![])
             .unwrap();
         assert_eq!(table_id, 0);
     }
diff --git a/src/catalog/schema.rs b/src/catalog/schema.rs
index d9b97d4cf..8a01815a1 100644
--- a/src/catalog/schema.rs
+++ b/src/catalog/schema.rs
@@ -56,7 +56,7 @@ impl SchemaCatalog {
     pub(super) fn delete_table(&mut self, id: TableId) {
         let catalog = self.tables.remove(&id).unwrap();
-        self.table_idxs.remove(&catalog.name()).unwrap();
+        self.table_idxs.remove(catalog.name()).unwrap();
     }

     pub fn all_tables(&self) -> HashMap<TableId, Arc<TableCatalog>> {
diff --git a/src/catalog/table.rs b/src/catalog/table.rs
index 41b546d32..1d33af41b 100644
--- a/src/catalog/table.rs
+++ b/src/catalog/table.rs
@@ -93,8 +93,8 @@ impl TableCatalog {
             .cloned()
     }

-    pub fn name(&self) -> String {
-        self.name.clone()
+    pub fn name(&self) -> &str {
+        &self.name
     }

     pub fn id(&self) -> TableId {
diff --git a/src/db.rs b/src/db.rs
index 4ba8ad6f9..d21c232a7 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -5,11 +5,9 @@ use std::sync::{Arc, Mutex};

 use futures::TryStreamExt;
 use risinglight_proto::rowset::block_statistics::BlockStatisticsType;

-use crate::array::{
-    ArrayBuilder, ArrayBuilderImpl, Chunk, DataChunk, I32ArrayBuilder, StringArrayBuilder,
-};
+use crate::array::Chunk;
 use crate::binder::bind_header;
-use crate::catalog::{RootCatalogRef, TableRefId, INTERNAL_SCHEMA_NAME};
+use crate::catalog::{RootCatalog, RootCatalogRef, TableRefId};
 use crate::parser::{parse, ParserError, Statement};
 use crate::planner::Statistics;
 use crate::storage::{
@@ -53,148 +51,30 @@ impl Database {
         Ok(())
     }

-    fn run_desc(&self, table_name: &str) -> Result<Vec<Chunk>, Error> {
-        let mut column_id = I32ArrayBuilder::new();
-        let mut column_name = StringArrayBuilder::new();
-        let mut column_type = StringArrayBuilder::new();
-        let mut column_is_null = StringArrayBuilder::new();
-        let mut column_is_primary = StringArrayBuilder::new();
-        let table_catalog = self.catalog.get_table_by_name(table_name).unwrap();
-
-        let all_columns = table_catalog.all_columns();
-        for (id, column) in &all_columns {
-            let name = column.name();
-            let data_type = column.data_type().to_string().to_ascii_lowercase();
-            let is_null = column.is_nullable();
-            let is_primary = column.is_primary();
-
-            column_id.push(Some(&(*id as i32)));
-            column_name.push(Some(name));
-            column_type.push(Some(&data_type));
-            if is_null {
-                column_is_null.push(Some("nullable"));
-            } else {
-                column_is_null.push(Some("not null"));
-            }
-
-            if is_primary {
-                column_is_primary.push(Some("primary"));
-            } else {
-                column_is_primary.push(None);
-            }
-        }
-        let vecs: Vec<ArrayBuilderImpl> = vec![
-            column_id.into(),
-            column_name.into(),
-            column_type.into(),
-            column_is_null.into(),
-            column_is_primary.into(),
-        ];
-        Ok(vec![Chunk::new(vec![DataChunk::from_iter(
-            vecs.into_iter(),
-        )])])
-    }
-
-    fn run_dt(&self) -> Result<Vec<Chunk>, Error> {
-        let mut schema_id_vec = I32ArrayBuilder::new();
-        let mut schema_vec = StringArrayBuilder::new();
-        let mut table_id_vec = I32ArrayBuilder::new();
-        let mut table_vec = StringArrayBuilder::new();
-        for (_, schema) in self.catalog.all_schemas() {
-            for (_, table) in schema.all_tables() {
-                schema_id_vec.push(Some(&(schema.id() as i32)));
-                schema_vec.push(Some(&schema.name()));
-                table_id_vec.push(Some(&(table.id() as i32)));
-                table_vec.push(Some(&table.name()));
-            }
-        }
-        let vecs: Vec<ArrayBuilderImpl> = vec![
-            schema_id_vec.into(),
-            schema_vec.into(),
-            table_id_vec.into(),
-            table_vec.into(),
-        ];
-        Ok(vec![Chunk::new(vec![DataChunk::from_iter(
-            vecs.into_iter(),
-        )])])
-    }
-
-    async fn run_internal(&self, cmd: &str) -> Result<Vec<Chunk>, Error> {
-        if let Some((cmd, arg)) = cmd.split_once(' ') {
-            if cmd == "stat" {
-                if let StorageImpl::SecondaryStorage(ref storage) = self.storage {
-                    let (table, col) = arg.split_once(' ').expect("failed to parse command");
-                    let table_id = self
-                        .catalog
-                        .get_table_id_by_name("postgres", table)
-                        .expect("table not found");
-                    let col_id = self
-                        .catalog
-                        .get_table(&table_id)
-                        .unwrap()
-                        .get_column_id_by_name(col)
-                        .expect("column not found");
-                    let table = storage.get_table(table_id)?;
-                    let txn = table.read().await?;
-                    let row_count = txn.aggreagate_block_stat(&[
-                        (
-                            BlockStatisticsType::RowCount,
-                            // Note that `col_id` is the column catalog id instead of storage
-                            // column id. This should be fixed in the
-                            // future.
-                            StorageColumnRef::Idx(col_id),
-                        ),
-                        (
-                            BlockStatisticsType::DistinctValue,
-                            StorageColumnRef::Idx(col_id),
-                        ),
-                    ]);
-                    let mut stat_name = StringArrayBuilder::with_capacity(2);
-                    let mut stat_value = StringArrayBuilder::with_capacity(2);
-                    stat_name.push(Some("RowCount"));
-                    stat_value.push(Some(
-                        row_count[0]
-                            .as_usize()
-                            .unwrap()
-                            .unwrap()
-                            .to_string()
-                            .as_str(),
-                    ));
-                    stat_name.push(Some("DistinctValue"));
-                    stat_value.push(Some(
-                        row_count[1]
-                            .as_usize()
-                            .unwrap()
-                            .unwrap()
-                            .to_string()
-                            .as_str(),
-                    ));
-                    Ok(vec![Chunk::new(vec![DataChunk::from_iter([
-                        ArrayBuilderImpl::from(stat_name),
-                        ArrayBuilderImpl::from(stat_value),
-                    ])])])
-                } else {
-                    Err(Error::Internal(
-                        "this storage engine doesn't support statistics".to_string(),
-                    ))
-                }
-            } else if cmd == "d" {
-                self.run_desc(arg)
-            } else {
-                Err(Error::Internal("unsupported command".to_string()))
-            }
-        } else if cmd == "dt" {
-            self.run_dt()
-        } else {
-            Err(Error::Internal("unsupported command".to_string()))
-        }
+    /// Convert a command to SQL.
+    fn command_to_sql(&self, cmd: &str) -> Result<String, Error> {
+        let tokens = cmd.split_whitespace().collect::<Vec<_>>();
+        Ok(match tokens.as_slice() {
+            ["dt"] => "SELECT * FROM pg_catalog.pg_tables".to_string(),
+            ["d", table] => format!(
+                "SELECT * FROM pg_catalog.pg_attribute WHERE table_name = '{table}'",
+            ),
+            ["stat"] => "SELECT * FROM pg_catalog.pg_stat".to_string(),
+            ["stat", table] => format!("SELECT * FROM pg_catalog.pg_stat WHERE table_name = '{table}'"),
+            ["stat", table, column] => format!(
+                "SELECT * FROM pg_catalog.pg_stat WHERE table_name = '{table}' AND column_name = '{column}'",
+            ),
+            _ => return Err(Error::Internal("invalid command".into())),
+        })
     }

     /// Run SQL queries and return the outputs.
     pub async fn run(&self, sql: &str) -> Result<Vec<Chunk>, Error> {
-        if let Some(cmdline) = sql.trim().strip_prefix('\\') {
-            return self.run_internal(cmdline).await;
-        }
+        let sql = if let Some(cmd) = sql.trim().strip_prefix('\\') {
+            self.command_to_sql(cmd)?
+        } else {
+            sql.to_string()
+        };

         let optimizer = crate::planner::Optimizer::new(
             self.catalog.clone(),
@@ -205,7 +85,7 @@
             },
         );

-        let stmts = parse(sql)?;
+        let stmts = parse(&sql)?;
         let mut outputs: Vec<Chunk> = vec![];
         for stmt in stmts {
             if self.handle_set(&stmt)? {
@@ -242,7 +122,7 @@
         };
         for schema in self.catalog.all_schemas().values() {
             // skip internal schema
-            if schema.name() == INTERNAL_SCHEMA_NAME {
+            if schema.name() == RootCatalog::SYSTEM_SCHEMA_NAME {
                 continue;
             }
             for table in schema.all_tables().values() {
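The command rewrite above means psql-style backslash commands now go through the ordinary parse/plan/execute pipeline instead of a hand-rolled side channel. A minimal sketch of what that buys, assuming the public `Database` API re-exported from `lib.rs` (`new_in_memory()` and `run()` exist today; the test body itself is illustrative, not part of this patch):

    // Illustrative only: `\dt` and its SQL expansion take the same code path.
    use risinglight::Database;

    #[tokio::main]
    async fn main() {
        let db = Database::new_in_memory();
        db.run("create table t(v1 int)").await.unwrap();

        // command_to_sql() rewrites `\dt` into the query below, so both
        // calls hit the normal binder, planner, and executor.
        let via_command = db.run("\\dt").await.unwrap();
        let via_sql = db.run("SELECT * FROM pg_catalog.pg_tables").await.unwrap();
        assert_eq!(via_command.len(), via_sql.len());
    }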
diff --git a/src/executor/insert.rs b/src/executor/insert.rs
index dcc331259..da6acd29d 100644
--- a/src/executor/insert.rs
+++ b/src/executor/insert.rs
@@ -66,7 +66,7 @@ mod tests {
     async fn simple() {
         let storage = create_table().await;
         let executor = InsertExecutor {
-            table_id: TableRefId::new(0, 0),
+            table_id: TableRefId::new(1, 0),
             column_ids: vec![0, 1],
             storage: storage.as_in_memory_storage(),
         };
@@ -87,7 +87,7 @@ mod tests {
         storage
             .as_in_memory_storage()
             .create_table(
-                0,
+                1,
                 "t",
                 &[
                     ColumnCatalog::new(0, ColumnDesc::new("v1", DataType::Int32, false)),
diff --git a/src/executor/internal.rs b/src/executor/internal.rs
deleted file mode 100644
index 5f82268c0..000000000
--- a/src/executor/internal.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
-
-use super::*;
-use crate::array::{ArrayImpl, StringArray};
-use crate::catalog::{TableRefId, CONTRIBUTORS_TABLE_ID};
-
-/// The executor of internal tables.
-pub struct InternalTableExecutor { - pub table_id: TableRefId, -} - -impl InternalTableExecutor { - #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] - pub async fn execute(self) { - match self.table_id.table_id { - CONTRIBUTORS_TABLE_ID => { - yield contributors(); - } - _ => { - panic!( - "InternalTableExecutor::execute: unknown table ref id: {}", - self.table_id - ); - } - } - } -} - -// TODO: find a better way to maintain the contributors list instead of hard-coding, and get total -// contributors when contributors is more than 100. (per_page max is 100) -// update this funciton with `curl https://api.github.com/repos/risinglightdb/risinglight/contributors?per_page=100 | jq ".[].login"` -fn contributors() -> DataChunk { - let contributors = vec![ - "skyzh", - "wangrunji0408", - "MingjiHan99", - "pleiadesian", - "TennyZhuang", - "xxchan", - "st1page", - "Fedomn", - "arkbriar", - "likg227", - "lokax", - "BaymaxHWY", - "zzl200012", - "alissa-tung", - "ludics", - "shmiwy", - "yinfredyue", - "unconsolable", - "xiaoyong-z", - "Kikkon", - "eliasyaoyc", - "GoGim1", - "kwannoel", - "D2Lark", - "tabVersion", - "WindowsXp-Beta", - "chaixuqing", - "yingjunwu", - "adlternative", - "wangqiim", - "yeya24", - "PsiACE", - "JayiceZ", - "chowc", - "noneback", - "RinChanNOWWW", - "SkyFan2002", - "Y7n05h", - "Ted-Jiang", - "LiuYuHui", - "rapiz1", - "zehaowei", - "Gun9niR", - "cadl", - "nanderstabel", - "sundy-li", - "xinchengxx", - "yuzi-neko", - "XieJiann", - ]; - [ArrayImpl::new_string(StringArray::from_iter( - contributors.iter().map(|s| Some(*s)).sorted(), - ))] - .into_iter() - .collect() -} diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 21f3d106a..6a502b71b 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -31,7 +31,6 @@ use self::filter::*; use self::hash_agg::*; use self::hash_join::*; use self::insert::*; -use self::internal::*; use self::limit::*; use self::merge_join::*; use self::nested_loop_join::*; @@ -41,11 +40,13 @@ use self::order::*; use self::projection::*; use self::simple_agg::*; use self::sort_agg::*; +use self::system_table_scan::*; use self::table_scan::*; use self::top_n::TopNExecutor; use self::values::*; use self::window::*; use crate::array::DataChunk; +use crate::catalog::RootCatalog; use crate::executor::create_function::CreateFunctionExecutor; use crate::planner::{Expr, ExprAnalysis, Optimizer, RecExpr, TypeSchemaAnalysis}; use crate::storage::{Storage, TracedStorageError}; @@ -63,10 +64,10 @@ mod filter; mod hash_agg; mod hash_join; mod insert; -mod internal; mod limit; mod nested_loop_join; mod order; +mod system_table_scan; // mod perfect_hash_agg; mod merge_join; mod projection; @@ -185,20 +186,35 @@ impl Builder { fn build_id(&self, id: Id) -> BoxedExecutor { use Expr::*; let stream = match self.node(id).clone() { - Scan([table, list, filter]) => TableScanExecutor { - table_id: self.node(table).as_table(), - columns: (self.node(list).as_list().iter()) + Scan([table, list, filter]) => { + let table_id = self.node(table).as_table(); + let columns = (self.node(list).as_list().iter()) .map(|id| self.node(*id).as_column()) - .collect(), - filter: { - // analyze range for the filter - let mut egraph = egg::EGraph::new(ExprAnalysis::default()); - let root = egraph.add_expr(&self.recexpr(filter)); - egraph[root].data.range.clone().map(|(_, r)| r) - }, - storage: self.storage.clone(), + .collect(); + + if table_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID { + SystemTableScan { + catalog: self.optimizer.catalog().clone(), + storage: 
self.storage.clone(),
+                        table_id,
+                        columns,
+                    }
+                    .execute()
+                } else {
+                    TableScanExecutor {
+                        table_id,
+                        columns,
+                        filter: {
+                            // analyze range for the filter
+                            let mut egraph = egg::EGraph::new(ExprAnalysis::default());
+                            let root = egraph.add_expr(&self.recexpr(filter));
+                            egraph[root].data.range.clone().map(|(_, r)| r)
+                        },
+                        storage: self.storage.clone(),
+                    }
+                    .execute()
+                }
+            }
-            .execute(),

             Values(rows) => ValuesExecutor {
                 column_types: self.plan_types(id).to_vec(),
@@ -214,11 +230,6 @@ impl Builder {
             }
             .execute(),

-            Internal([table, _]) => InternalTableExecutor {
-                table_id: self.node(table).as_table(),
-            }
-            .execute(),
-
             Proj([projs, child]) => ProjectionExecutor {
                 projs: self.resolve_column_index(projs, child),
             }
diff --git a/src/executor/system_table_scan.rs b/src/executor/system_table_scan.rs
new file mode 100644
index 000000000..c9b1c74c2
--- /dev/null
+++ b/src/executor/system_table_scan.rs
@@ -0,0 +1,227 @@
+// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
+
+use risinglight_proto::rowset::block_statistics::BlockStatisticsType;
+
+use super::*;
+use crate::array::*;
+use crate::catalog::{ColumnRefId, RootCatalogRef, TableRefId};
+use crate::storage::{Storage, StorageColumnRef, Table};
+
+/// Scan a system table.
+pub struct SystemTableScan<S: Storage> {
+    pub catalog: RootCatalogRef,
+    pub storage: Arc<S>,
+    pub table_id: TableRefId,
+    pub columns: Vec<ColumnRefId>,
+}
+
+impl<S: Storage> SystemTableScan<S> {
+    #[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
+    pub async fn execute(self) {
+        let table = self
+            .catalog
+            .get_table(&self.table_id)
+            .expect("table not found");
+        assert_eq!(self.columns.len(), table.all_columns().len());
+
+        yield match table.name() {
+            "contributors" => contributors(),
+            "pg_tables" => pg_tables(self.catalog),
+            "pg_attribute" => pg_attribute(self.catalog),
+            "pg_stat" => pg_stat(self.catalog, &*self.storage).await?,
+            name => panic!("unknown system table: {:?}", name),
+        };
+    }
+}
+
+// TODO: find a better way to maintain the contributors list instead of hard-coding, and get the
+// total number of contributors when there are more than 100 (per_page max is 100).
+// Update this function with `curl https://api.github.com/repos/risinglightdb/risinglight/contributors?per_page=100 | jq ".[].login"`.
+fn contributors() -> DataChunk {
+    let contributors = vec![
+        "skyzh",
+        "wangrunji0408",
+        "MingjiHan99",
+        "pleiadesian",
+        "TennyZhuang",
+        "xxchan",
+        "st1page",
+        "caicancai",
+        "Fedomn",
+        "arkbriar",
+        "likg227",
+        "lokax",
+        "zzl200012",
+        "unconsolable",
+        "BaymaxHWY",
+        "alissa-tung",
+        "ludics",
+        "Sunt-ing",
+        "yinfredyue",
+        "xiaoyong-z",
+        "Kikkon",
+        "D2Lark",
+        "xzhseh",
+        "ice1000",
+        "kwannoel",
+        "GoGim1",
+        "eliasyaoyc",
+        "wangqiim",
+        "silver-ymz",
+        "adlternative",
+        "yingjunwu",
+        "chaixuqing",
+        "WindowsXp-Beta",
+        "tabVersion",
+        "SkyFan2002",
+        "FANNG1",
+        "XieJiann",
+        "yuzi-neko",
+        "xinchengxx",
+        "sundy-li",
+        "nanderstabel",
+        "jetjinser",
+        "cadl",
+        "Gun9niR",
+        "zehaowei",
+        "rapiz1",
+        "LiuYuHui",
+        "Ted-Jiang",
+        "Y7n05h",
+        "RinChanNOWWW",
+        "noneback",
+        "chowc",
+        "xiaguan",
+        "JayiceZ",
+        "danipozo",
+        "PsiACE",
+        "yeya24",
+    ];
+    [ArrayImpl::new_string(StringArray::from_iter(
+        contributors.iter().map(|s| Some(*s)).sorted(),
+    ))]
+    .into_iter()
+    .collect()
+}
+
+/// Returns `pg_tables` table.
+fn pg_tables(catalog: RootCatalogRef) -> DataChunk { + let mut schema_id = I32ArrayBuilder::new(); + let mut table_id = I32ArrayBuilder::new(); + let mut schema_name = StringArrayBuilder::new(); + let mut table_name = StringArrayBuilder::new(); + + for (_, schema) in catalog.all_schemas() { + for (_, table) in schema.all_tables() { + schema_id.push(Some(&(schema.id() as i32))); + table_id.push(Some(&(table.id() as i32))); + schema_name.push(Some(&schema.name())); + table_name.push(Some(table.name())); + } + } + [ + ArrayBuilderImpl::from(schema_id), + schema_name.into(), + table_id.into(), + table_name.into(), + ] + .into_iter() + .collect() +} + +/// Returns `pg_attribute` table. +fn pg_attribute(catalog: RootCatalogRef) -> DataChunk { + // let mut schema_id = I32ArrayBuilder::new(); + // let mut table_id = I32ArrayBuilder::new(); + let mut schema_name = StringArrayBuilder::new(); + let mut table_name = StringArrayBuilder::new(); + let mut column_id = I32ArrayBuilder::new(); + let mut column_name = StringArrayBuilder::new(); + let mut column_type = StringArrayBuilder::new(); + let mut column_not_null = BoolArrayBuilder::new(); + + for (_, schema) in catalog.all_schemas() { + for (_, table) in schema.all_tables() { + for (_, column) in table.all_columns() { + let name = column.name(); + let data_type = column.data_type().to_string().to_ascii_lowercase(); + let not_null = !column.is_nullable(); + + // schema_id.push(Some(&(sid as i32))); + // table_id.push(Some(&(tid as i32))); + schema_name.push(Some(&schema.name())); + table_name.push(Some(table.name())); + column_id.push(Some(&(column.id() as i32))); + column_name.push(Some(name)); + column_type.push(Some(&data_type)); + column_not_null.push(Some(¬_null)); + } + } + } + + [ + ArrayBuilderImpl::from(schema_name), + table_name.into(), + column_id.into(), + column_name.into(), + column_type.into(), + column_not_null.into(), + ] + .into_iter() + .collect() +} + +/// Returns `pg_stat` table. 
+async fn pg_stat(
+    catalog: RootCatalogRef,
+    storage: &impl Storage,
+) -> Result<DataChunk, ExecutorError> {
+    // let mut schema_id = I32ArrayBuilder::new();
+    // let mut table_id = I32ArrayBuilder::new();
+    // let mut column_id = I32ArrayBuilder::new();
+    let mut schema_name = StringArrayBuilder::new();
+    let mut table_name = StringArrayBuilder::new();
+    let mut column_name = StringArrayBuilder::new();
+    let mut n_row = I32ArrayBuilder::new();
+    let mut n_distinct = I32ArrayBuilder::new();
+
+    if let Some(storage) = storage.as_disk() {
+        for (sid, schema) in catalog.all_schemas() {
+            if sid == RootCatalog::SYSTEM_SCHEMA_ID {
+                continue;
+            }
+            for (tid, table) in schema.all_tables() {
+                let stable = storage.get_table(TableRefId::new(sid, tid))?;
+
+                for (cid, column) in table.all_columns() {
+                    let txn = stable.read().await?;
+                    let values = txn.aggreagate_block_stat(&[
+                        (BlockStatisticsType::RowCount, StorageColumnRef::Idx(cid)),
+                        (
+                            BlockStatisticsType::DistinctValue,
+                            StorageColumnRef::Idx(cid),
+                        ),
+                    ]);
+                    let row = values[0].as_usize().unwrap().unwrap() as i32;
+                    let distinct = values[1].as_usize().unwrap().unwrap() as i32;
+
+                    // schema_id.push(Some(&(sid as i32)));
+                    // table_id.push(Some(&(tid as i32)));
+                    // column_id.push(Some(&(cid as i32)));
+                    schema_name.push(Some(&schema.name()));
+                    table_name.push(Some(table.name()));
+                    column_name.push(Some(column.name()));
+                    n_row.push(Some(&row));
+                    n_distinct.push(Some(&distinct));
+                }
+            }
+        }
+    }
+    Ok(DataChunk::from_iter([
+        ArrayBuilderImpl::from(schema_name),
+        table_name.into(),
+        column_name.into(),
+        n_row.into(),
+        n_distinct.into(),
+    ]))
+}
diff --git a/src/planner/explain.rs b/src/planner/explain.rs
index b9dc879fe..c75d57ba2 100644
--- a/src/planner/explain.rs
+++ b/src/planner/explain.rs
@@ -113,7 +113,12 @@ impl<'a> Explain<'a> {
             Type(t) => Pretty::display(t),
             Table(i) => {
                 if let Some(catalog) = self.catalog {
-                    catalog.get_table(i).expect("no table").name().into()
+                    catalog
+                        .get_table(i)
+                        .expect("no table")
+                        .name()
+                        .to_string()
+                        .into()
                 } else {
                     Pretty::display(i)
                 }
@@ -232,14 +237,6 @@ impl<'a> Explain<'a> {
                 ]
                 .with(cost, rows),
             ),
-            Internal([table, list]) => Pretty::childless_record(
-                "Internal",
-                vec![
-                    ("table", self.expr(table).pretty()),
-                    ("list", self.expr(list).pretty()),
-                ]
-                .with(cost, rows),
-            ),
             Values(values) => Pretty::simple_record(
                 "Values",
                 vec![("rows", Pretty::display(&values.len()))].with(cost, rows),
diff --git a/src/planner/mod.rs b/src/planner/mod.rs
index cc55cf94a..1ca63f8d2 100644
--- a/src/planner/mod.rs
+++ b/src/planner/mod.rs
@@ -93,7 +93,6 @@ define_language! {
         // plans
         "scan" = Scan([Id; 3]),                 // (scan table [column..] filter)
-        "internal" = Internal([Id; 2]),         // (internal table [column..])
         "values" = Values(Box<[Id]>),           // (values [expr..]..)
         "proj" = Proj([Id; 2]),                 // (proj [expr..] child)
         "filter" = Filter([Id; 2]),             // (filter expr child)
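With the dedicated `internal` node removed from the plan language, a catalog query binds to an ordinary `(scan table [column..] filter)` with a null filter, exactly like a user table; the system/user split happens at executor-build time instead. A one-line sketch of that dispatch (simplified from `Builder::build_id` above, not a verbatim excerpt):

    // Both user and system tables arrive at build_id as a Scan plan node;
    // only the executor constructed for them differs.
    fn is_system_scan(table_id: &TableRefId) -> bool {
        table_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID
    }

One visible consequence in the next hunk: analysis rules such as `analyze_schema` keep a single `Scan` arm rather than matching two node kinds.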
diff --git a/src/planner/rules/schema.rs b/src/planner/rules/schema.rs
index c5d51317c..0215cc59d 100644
--- a/src/planner/rules/schema.rs
+++ b/src/planner/rules/schema.rs
@@ -24,7 +24,7 @@ pub fn analyze_schema(enode: &Expr, x: impl Fn(&Id) -> Schema) -> Schema {
         List(ids) => ids.to_vec(),

         // plans that change schema
-        Scan([_, columns, _]) | Internal([_, columns]) => x(columns),
+        Scan([_, columns, _]) => x(columns),
         Values(vs) => x(&vs[0]),
         Proj([exprs, _]) | Agg([exprs, _]) => x(exprs),
         Window([exprs, child]) => concat(x(child), x(exprs)),
diff --git a/src/storage/memory/mod.rs b/src/storage/memory/mod.rs
index 4447a25e1..5a741a819 100644
--- a/src/storage/memory/mod.rs
+++ b/src/storage/memory/mod.rs
@@ -118,4 +118,8 @@ impl Storage for InMemoryStorage {
         self.catalog.drop_table(table_id);
         Ok(())
     }
+
+    fn as_disk(&self) -> Option<&super::SecondaryStorage> {
+        None
+    }
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 80a819368..c51c774e9 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -81,6 +81,9 @@ pub trait Storage: Sync + Send + 'static {
     fn get_table(&self, table_id: TableRefId) -> StorageResult<Self::TableType>;

     fn drop_table(&self, table_id: TableRefId) -> impl Future<Output = StorageResult<()>> + Send;
+
+    // XXX: remove this
+    fn as_disk(&self) -> Option<&SecondaryStorage>;
 }

 /// A table in the storage engine. [`Table`] is by default a reference to a table,
diff --git a/src/storage/secondary/mod.rs b/src/storage/secondary/mod.rs
index d7d014f2d..b9a5cb013 100644
--- a/src/storage/secondary/mod.rs
+++ b/src/storage/secondary/mod.rs
@@ -183,4 +183,8 @@ impl Storage for SecondaryStorage {
     async fn drop_table(&self, table_id: TableRefId) -> StorageResult<()> {
         self.drop_table_inner(table_id).await
     }
+
+    fn as_disk(&self) -> Option<&SecondaryStorage> {
+        Some(self)
+    }
 }
diff --git a/tests/sql/catalog.slt b/tests/sql/catalog.slt
index 1f0a3d7a0..7db803bb8 100644
--- a/tests/sql/catalog.slt
+++ b/tests/sql/catalog.slt
@@ -1,8 +1,11 @@
 statement ok
 create table t(v1 int, v2 int, v3 int)

-query IIIIII rowsort
+query ITIT rowsort
 \dt
 ----
-0 postgres 0 t
-1 pg_catalog 0 contributors
+0 pg_catalog 0 contributors
+0 pg_catalog 1 pg_tables
+0 pg_catalog 2 pg_attribute
+0 pg_catalog 3 pg_stat
+1 postgres 0 t
diff --git a/tests/sql/internal.slt b/tests/sql/internal.slt
index 926ef5667..c27589546 100644
--- a/tests/sql/internal.slt
+++ b/tests/sql/internal.slt
@@ -1,61 +1,9 @@
-query I
-select * from pg_catalog.contributors;
-----
-BaymaxHWY
-D2Lark
-Fedomn
-GoGim1
-Gun9niR
-JayiceZ
-Kikkon
-LiuYuHui
-MingjiHan99
-PsiACE
-RinChanNOWWW
-SkyFan2002
-Ted-Jiang
-TennyZhuang
-WindowsXp-Beta
-XieJiann
-Y7n05h
-adlternative
-alissa-tung
-arkbriar
-cadl
-chaixuqing
-chowc
-eliasyaoyc
-kwannoel
-likg227
-lokax
-ludics
-nanderstabel
-noneback
-pleiadesian
-rapiz1
-shmiwy
-skyzh
-st1page
-sundy-li
-tabVersion
-unconsolable
-wangqiim
-wangrunji0408
-xiaoyong-z
-xinchengxx
-xxchan
-yeya24
-yinfredyue
-yingjunwu
-yuzi-neko
-zehaowei
-zzl200012
-
 query I
 select github_id from pg_catalog.contributors;
 ----
 BaymaxHWY
 D2Lark
+FANNG1
 Fedomn
 GoGim1
 Gun9niR
 JayiceZ
 Kikkon
 LiuYuHui
 MingjiHan99
 PsiACE
 RinChanNOWWW
 SkyFan2002
+Sunt-ing
 Ted-Jiang
 TennyZhuang
 WindowsXp-Beta
 XieJiann
 Y7n05h
 adlternative
 alissa-tung
 arkbriar
 cadl
+caicancai
 chaixuqing
 chowc
+danipozo
 eliasyaoyc
+ice1000
+jetjinser
 kwannoel
 likg227
 lokax
 ludics
 nanderstabel
 noneback
 pleiadesian
 rapiz1
-shmiwy
+silver-ymz
 skyzh
 st1page
 sundy-li
 tabVersion
 unconsolable
 wangqiim
 wangrunji0408
+xiaguan xiaoyong-z xinchengxx xxchan +xzhseh yeya24 yinfredyue yingjunwu diff --git a/tests/sql/statistics.slt b/tests/sql/statistics.slt index 90e5ad55d..0b64512ca 100644 --- a/tests/sql/statistics.slt +++ b/tests/sql/statistics.slt @@ -7,17 +7,14 @@ insert into t values (1,10,100), (2,20,100), (3,10,100) query II \stat t v1 ---- -RowCount 3 -DistinctValue 3 +postgres t v1 3 3 query II \stat t v2 ---- -RowCount 3 -DistinctValue 2 +postgres t v2 3 2 query II \stat t v3 ---- -RowCount 3 -DistinctValue 1 +postgres t v3 3 1
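With this structure, adding a system table is a local, two-step change: declare it in `CREATE_SYSTEM_TABLE_SQL` and give `SystemTableScan::execute` a match arm that builds the chunk. As an illustration only (this table is not part of the patch), a hypothetical `pg_schemas` would mirror `pg_tables()`:

    // Hypothetical pg_schemas system table -- not in this diff.
    //
    // In CREATE_SYSTEM_TABLE_SQL (src/catalog/root.rs):
    //     create table pg_schemas (
    //         schema_id int not null,
    //         schema_name string not null
    //     );
    //
    // In SystemTableScan::execute (src/executor/system_table_scan.rs):
    //     "pg_schemas" => pg_schemas(self.catalog),

    fn pg_schemas(catalog: RootCatalogRef) -> DataChunk {
        let mut schema_id = I32ArrayBuilder::new();
        let mut schema_name = StringArrayBuilder::new();
        for (sid, schema) in catalog.all_schemas() {
            schema_id.push(Some(&(sid as i32)));
            schema_name.push(Some(&schema.name()));
        }
        [ArrayBuilderImpl::from(schema_id), schema_name.into()]
            .into_iter()
            .collect()
    }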
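On the `// XXX: remove this` next to `Storage::as_disk()`: `pg_stat()` needs block statistics, which only the secondary (disk) engine can answer, so the trait gains a concrete downcast hook and the in-memory engine returns `None`. One possible cleanup, sketched here as an assumption rather than anything this patch does, is to route the downcast through `std::any::Any` so the base trait never names `SecondaryStorage`:

    // Self-contained sketch of an Any-based alternative to as_disk().
    use std::any::Any;

    trait Storage {
        fn as_any(&self) -> &dyn Any;
    }

    struct SecondaryStorage;

    impl Storage for SecondaryStorage {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    fn collect_stats(storage: &dyn Storage) {
        // Only the disk engine serves block statistics; other engines are
        // skipped, matching how pg_stat() yields an empty chunk in memory.
        if let Some(_disk) = storage.as_any().downcast_ref::<SecondaryStorage>() {
            // disk-only statistics APIs would be called here
        }
    }

    fn main() {
        collect_stats(&SecondaryStorage);
    }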