Skip to content

Commit

Permalink
Only require single place where to register table docs
Browse files Browse the repository at this point in the history
This commit changes the place where the table docs need to be registered
to be included in the automatic table docs generation to a single place.
  • Loading branch information
tillrohrmann committed Jun 20, 2024
1 parent 78e190f commit 03f93fb
Show file tree
Hide file tree
Showing 31 changed files with 237 additions and 155 deletions.
4 changes: 2 additions & 2 deletions crates/storage-query-datafusion/src/deployment/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::schema::DeploymentBuilder;
use super::schema::SysDeploymentBuilder;
use crate::table_util::format_using;
use restate_schema_api::deployment::{Deployment, DeploymentType};

#[inline]
pub(crate) fn append_deployment_row(
builder: &mut DeploymentBuilder,
builder: &mut SysDeploymentBuilder,
output: &mut String,
deployment: Deployment,
) {
Expand Down
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/src/deployment/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(deployment(
define_table!(sys_deployment(
/// The ID of the service deployment.
id: DataType::LargeUtf8,

Expand Down
8 changes: 4 additions & 4 deletions crates/storage-query-datafusion/src/deployment/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::sync::mpsc::Sender;
use restate_schema_api::deployment::{Deployment, DeploymentResolver};
use restate_types::identifiers::ServiceRevision;

use super::schema::DeploymentBuilder;
use super::schema::SysDeploymentBuilder;
use crate::context::QueryContext;
use crate::deployment::row::append_deployment_row;
use crate::table_providers::{GenericTableProvider, Scan};
Expand All @@ -32,7 +32,7 @@ pub(crate) fn register_self(
resolver: impl DeploymentResolver + Send + Sync + Debug + 'static,
) -> datafusion::common::Result<()> {
let deployment_table = GenericTableProvider::new(
DeploymentBuilder::schema(),
SysDeploymentBuilder::schema(),
Arc::new(DeploymentMetadataScanner(resolver)),
);

Expand Down Expand Up @@ -71,7 +71,7 @@ async fn for_each_state(
tx: Sender<datafusion::common::Result<RecordBatch>>,
rows: Vec<(Deployment, Vec<(String, ServiceRevision)>)>,
) {
let mut builder = DeploymentBuilder::new(schema.clone());
let mut builder = SysDeploymentBuilder::new(schema.clone());
let mut temp = String::new();
for (deployment, _) in rows {
append_deployment_row(&mut builder, &mut temp, deployment);
Expand All @@ -83,7 +83,7 @@ async fn for_each_state(
// we probably don't want to panic, is it will cause the entire process to exit
return;
}
builder = DeploymentBuilder::new(schema.clone());
builder = SysDeploymentBuilder::new(schema.clone());
}
}
if !builder.empty() {
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-query-datafusion/src/idempotency/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::schema::IdempotencyBuilder;
use super::schema::SysIdempotencyBuilder;

use crate::table_util::format_using;
use restate_storage_api::idempotency_table::IdempotencyMetadata;
use restate_types::identifiers::{IdempotencyId, WithPartitionKey};

#[inline]
pub(crate) fn append_idempotency_row(
builder: &mut IdempotencyBuilder,
builder: &mut SysIdempotencyBuilder,
output: &mut String,
idempotency_id: IdempotencyId,
idempotency_metadata: IdempotencyMetadata,
Expand Down
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/src/idempotency/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(idempotency(
define_table!(sys_idempotency(
/// Internal column that is used for partitioning the services invocations. Can be ignored.
partition_key: DataType::UInt64,

Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/idempotency/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use restate_storage_api::idempotency_table::{IdempotencyMetadata, ReadOnlyIdempo
use restate_types::identifiers::{IdempotencyId, PartitionKey};

use super::row::append_idempotency_row;
use super::schema::IdempotencyBuilder;
use super::schema::SysIdempotencyBuilder;
use crate::context::{QueryContext, SelectPartitions};
use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition};
use crate::table_providers::PartitionedTableProvider;
Expand All @@ -31,7 +31,7 @@ pub(crate) fn register_self(
) -> datafusion::common::Result<()> {
let table = PartitionedTableProvider::new(
partition_selector,
IdempotencyBuilder::schema(),
SysIdempotencyBuilder::schema(),
LocalPartitionsScanner::new(partition_store_manager, IdempotencyScanner),
);

Expand All @@ -44,7 +44,7 @@ pub(crate) fn register_self(
struct IdempotencyScanner;

impl ScanLocalPartition for IdempotencyScanner {
type Builder = IdempotencyBuilder;
type Builder = SysIdempotencyBuilder;
type Item = (IdempotencyId, IdempotencyMetadata);

fn scan_partition_store(
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-query-datafusion/src/inbox/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::schema::InboxBuilder;
use super::schema::SysInboxBuilder;
use crate::table_util::format_using;
use restate_storage_api::inbox_table::{InboxEntry, SequenceNumberInboxEntry};
use restate_types::identifiers::{TimestampAwareId, WithPartitionKey};

#[inline]
pub(crate) fn append_inbox_row(
builder: &mut InboxBuilder,
builder: &mut SysInboxBuilder,
output: &mut String,
inbox_entry: SequenceNumberInboxEntry,
) {
Expand Down
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/src/inbox/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(inbox(
define_table!(sys_inbox(
/// Internal column that is used for partitioning the services invocations. Can be ignored.
partition_key: DataType::UInt64,

Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/inbox/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use restate_types::identifiers::PartitionKey;

use crate::context::{QueryContext, SelectPartitions};
use crate::inbox::row::append_inbox_row;
use crate::inbox::schema::InboxBuilder;
use crate::inbox::schema::SysInboxBuilder;
use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition};
use crate::table_providers::PartitionedTableProvider;

Expand All @@ -31,7 +31,7 @@ pub(crate) fn register_self(
) -> datafusion::common::Result<()> {
let table = PartitionedTableProvider::new(
partition_selector,
InboxBuilder::schema(),
SysInboxBuilder::schema(),
LocalPartitionsScanner::new(partition_store_manager, InboxScanner),
);

Expand All @@ -44,7 +44,7 @@ pub(crate) fn register_self(
struct InboxScanner;

impl ScanLocalPartition for InboxScanner {
type Builder = InboxBuilder;
type Builder = SysInboxBuilder;
type Item = SequenceNumberInboxEntry;

fn scan_partition_store(
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/invocation_state/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::invocation_state::schema::StateBuilder;
use crate::invocation_state::schema::SysInvocationStateBuilder;
use crate::table_util::format_using;
use restate_invoker_api::InvocationStatusReport;
use restate_types::identifiers::WithPartitionKey;
use restate_types::time::MillisSinceEpoch;

#[inline]
pub(crate) fn append_state_row(
builder: &mut StateBuilder,
pub(crate) fn append_invocation_state_row(
builder: &mut SysInvocationStateBuilder,
output: &mut String,
status_row: InvocationStatusReport,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(state(
define_table!(sys_invocation_state(
/// Internal column that is used for partitioning the services invocations. Can be ignored.
partition_key: DataType::UInt64,

Expand Down
16 changes: 9 additions & 7 deletions crates/storage-query-datafusion/src/invocation_state/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ use restate_invoker_api::{InvocationStatusReport, StatusHandle};
use restate_types::identifiers::{PartitionKey, WithPartitionKey};

use crate::context::QueryContext;
use crate::invocation_state::row::append_state_row;
use crate::invocation_state::schema::StateBuilder;
use crate::invocation_state::row::append_invocation_state_row;
use crate::invocation_state::schema::SysInvocationStateBuilder;
use crate::table_providers::{GenericTableProvider, Scan};
use crate::table_util::Builder;

pub(crate) fn register_self(
ctx: &QueryContext,
status: impl StatusHandle + Send + Sync + Debug + Clone + 'static,
) -> datafusion::common::Result<()> {
let status_table =
GenericTableProvider::new(StateBuilder::schema(), Arc::new(StatusScanner(status)));
let status_table = GenericTableProvider::new(
SysInvocationStateBuilder::schema(),
Arc::new(StatusScanner(status)),
);

ctx.as_ref()
.register_table("sys_invocation_state", Arc::new(status_table))
Expand Down Expand Up @@ -71,13 +73,13 @@ async fn for_each_state<'a, I>(
) where
I: Iterator<Item = InvocationStatusReport> + 'a,
{
let mut builder = StateBuilder::new(schema.clone());
let mut builder = SysInvocationStateBuilder::new(schema.clone());
let mut temp = String::new();
let mut rows = rows.collect::<Vec<_>>();
// need to be ordered by partition key for symmetric joins
rows.sort_unstable_by_key(|row| row.invocation_id().partition_key());
for row in rows {
append_state_row(&mut builder, &mut temp, row);
append_invocation_state_row(&mut builder, &mut temp, row);
if builder.full() {
let batch = builder.finish();
if tx.send(batch).await.is_err() {
Expand All @@ -86,7 +88,7 @@ async fn for_each_state<'a, I>(
// we probably don't want to panic, is it will cause the entire process to exit
return;
}
builder = StateBuilder::new(schema.clone());
builder = SysInvocationStateBuilder::new(schema.clone());
}
}
if !builder.empty() {
Expand Down
12 changes: 6 additions & 6 deletions crates/storage-query-datafusion/src/invocation_status/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::invocation_status::schema::{InvocationStatusBuilder, InvocationStatusRowBuilder};
use crate::invocation_status::schema::{SysInvocationStatusBuilder, SysInvocationStatusRowBuilder};
use crate::table_util::format_using;
use restate_storage_api::invocation_status_table::{
InFlightInvocationMetadata, InvocationStatus, JournalMetadata, StatusTimestamps,
Expand All @@ -18,7 +18,7 @@ use restate_types::invocation::{ServiceType, Source, TraceId};

#[inline]
pub(crate) fn append_invocation_status_row(
builder: &mut InvocationStatusBuilder,
builder: &mut SysInvocationStatusBuilder,
output: &mut String,
invocation_id: InvocationId,
invocation_status: InvocationStatus,
Expand Down Expand Up @@ -82,7 +82,7 @@ pub(crate) fn append_invocation_status_row(
}

fn fill_in_flight_invocation_metadata(
row: &mut InvocationStatusRowBuilder,
row: &mut SysInvocationStatusRowBuilder,
output: &mut String,
meta: InFlightInvocationMetadata,
) {
Expand All @@ -94,7 +94,7 @@ fn fill_in_flight_invocation_metadata(
}

#[inline]
fn fill_invoked_by(row: &mut InvocationStatusRowBuilder, output: &mut String, source: Source) {
fn fill_invoked_by(row: &mut SysInvocationStatusRowBuilder, output: &mut String, source: Source) {
match source {
Source::Service(invocation_id, invocation_target) => {
row.invoked_by("service");
Expand All @@ -116,14 +116,14 @@ fn fill_invoked_by(row: &mut InvocationStatusRowBuilder, output: &mut String, so
}

#[inline]
fn fill_timestamps(row: &mut InvocationStatusRowBuilder, stat: &StatusTimestamps) {
fn fill_timestamps(row: &mut SysInvocationStatusRowBuilder, stat: &StatusTimestamps) {
row.created_at(stat.creation_time().as_u64() as i64);
row.modified_at(stat.modification_time().as_u64() as i64);
}

#[inline]
fn fill_journal_metadata(
row: &mut InvocationStatusRowBuilder,
row: &mut SysInvocationStatusRowBuilder,
output: &mut String,
journal_metadata: &JournalMetadata,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(invocation_status(
define_table!(sys_invocation_status(
/// Internal column that is used for partitioning the services invocations. Can be ignored.
partition_key: DataType::UInt64,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use restate_types::identifiers::{InvocationId, PartitionKey};

use crate::context::{QueryContext, SelectPartitions};
use crate::invocation_status::row::append_invocation_status_row;
use crate::invocation_status::schema::InvocationStatusBuilder;
use crate::invocation_status::schema::SysInvocationStatusBuilder;
use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition};
use crate::table_providers::PartitionedTableProvider;

Expand All @@ -33,7 +33,7 @@ pub(crate) fn register_self(
) -> datafusion::common::Result<()> {
let status_table = PartitionedTableProvider::new(
partition_selector,
InvocationStatusBuilder::schema(),
SysInvocationStatusBuilder::schema(),
LocalPartitionsScanner::new(partition_store_manager, StatusScanner),
);

Expand All @@ -46,7 +46,7 @@ pub(crate) fn register_self(
struct StatusScanner;

impl ScanLocalPartition for StatusScanner {
type Builder = InvocationStatusBuilder;
type Builder = SysInvocationStatusBuilder;
type Item = (InvocationId, InvocationStatus);

fn scan_partition_store(
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-query-datafusion/src/journal/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::journal::schema::JournalBuilder;
use crate::journal::schema::SysJournalBuilder;

use restate_service_protocol::codec::ProtobufRawEntryCodec;

Expand All @@ -21,7 +21,7 @@ use restate_types::journal::{Entry, SleepEntry};

#[inline]
pub(crate) fn append_journal_row(
builder: &mut JournalBuilder,
builder: &mut SysJournalBuilder,
output: &mut String,
journal_entry_id: JournalEntryId,
journal_entry: JournalEntry,
Expand Down
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/src/journal/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::table_macro::*;

use datafusion::arrow::datatypes::DataType;

define_table!(journal(
define_table!(sys_journal(
/// Internal column that is used for partitioning the services invocations. Can be ignored.
partition_key: DataType::UInt64,

Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/journal/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use restate_types::identifiers::{JournalEntryId, PartitionKey};

use crate::context::{QueryContext, SelectPartitions};
use crate::journal::row::append_journal_row;
use crate::journal::schema::JournalBuilder;
use crate::journal::schema::SysJournalBuilder;
use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition};
use crate::table_providers::PartitionedTableProvider;

Expand All @@ -30,7 +30,7 @@ pub(crate) fn register_self(
) -> datafusion::common::Result<()> {
let journal_table = PartitionedTableProvider::new(
partition_selector,
JournalBuilder::schema(),
SysJournalBuilder::schema(),
LocalPartitionsScanner::new(partition_store_manager, JournalScanner),
);

Expand All @@ -43,7 +43,7 @@ pub(crate) fn register_self(
struct JournalScanner;

impl ScanLocalPartition for JournalScanner {
type Builder = JournalBuilder;
type Builder = SysJournalBuilder;
type Item = (JournalEntryId, JournalEntry);

fn scan_partition_store(
Expand Down
Loading

0 comments on commit 03f93fb

Please sign in to comment.