
Commit

Draft of 'DatasetActionAuthorizer::get_readable_datasets' query that does not scan the dataset repository

DatasetRegistry => DatasetUrlResolver
Decomposed entries iteration and ReBAC
Fixed query service issues with spawned threads
Fixed missing transaction in smart protocol authorization check
Use transactions for all CLI commands except API/UI server
Separated compacting service and compacting use case. Commands use the use case; TaskExecutor uses the service directly.
Reset service decoupled from authorizer
SetWatermarkUseCase
Temporarily disabled authorization checks in ingest/transform/sync services. To be addressed later.
Early beginning of `DatasetRegistry` interface.
Basic bridge to `DatasetRepository` for testing.
Basic real implementation via `DatasetEntryService`.
Authorizers now only filter a given list of datasets by allowed action; the initial list of handles comes from outside (see the sketch after this list)
Using only `DatasetRegistry` in `QueryService` and `KamuSchema`
Workaround for a failing e2e test: wait until the outbox is flushed after creating a dataset
Registry used in OData
Unified the code that streams datasets page by page
Authorization checks in OData
DatasetRegistry used in all commands, use cases and GraphQL queries/mutations
`CompactionService` decoupled from `DatasetRepository`
Decoupled `ResetService` from `DatasetRepository`
Extracted use case from `VerificationService`. Decoupled it from `DatasetRepository` and `DatasetActionAuthorizer`.
No more `DatasetRepository` in `RemoteAliasesRegistry`, `DatasetChangesService`, `DatasetOwnershipService`, `ProvenanceService`
PushIngestService decoupled
`PollingIngestService` decoupled from `DatasetRepository`
Decoupled engines from DatasetRepository
API cleanup and simplifications
Decoupling `TransformService` from `DatasetRepository` - part 1
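
The central design shift behind these changes is that `DatasetActionAuthorizer` no longer enumerates datasets itself: callers obtain candidate handles from the registry/entry side and pass them in, and the authorizer merely filters them by the requested action. The sketch below is a minimal, self-contained illustration of that contract; the types and the toy allow-rule are stand-ins for illustration, not the real kamu-core signatures.

```rust
// Minimal stand-ins for the kamu-core types; only the shape of the contract
// is meant to mirror the commit, not the real API.
#[derive(Clone, Debug)]
struct DatasetHandle {
    alias: String,
    is_public: bool, // toy attribute standing in for the real ReBAC properties
}

#[derive(Clone, Copy)]
enum DatasetAction {
    Read,
    Write,
}

/// The authorizer never lists datasets itself: it only filters what it is given.
fn filter_datasets_allowing(handles: Vec<DatasetHandle>, action: DatasetAction) -> Vec<DatasetHandle> {
    handles
        .into_iter()
        .filter(|h| match action {
            DatasetAction::Read => h.is_public, // stand-in for the Oso `is_allowed` check
            DatasetAction::Write => false,
        })
        .collect()
}

fn main() {
    // The initial list comes externally, e.g. from the registry / entry service side.
    let from_registry = vec![
        DatasetHandle { alias: "alice/orders".into(), is_public: true },
        DatasetHandle { alias: "alice/drafts".into(), is_public: false },
    ];
    let readable = filter_datasets_allowing(from_registry, DatasetAction::Read);
    println!("readable: {readable:?}");
}
```

Keeping the listing and the permission check separate is what lets the draft `get_readable_datasets` page through `DatasetEntryService` instead of scanning the dataset repository.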
zaychenko-sergei committed Oct 15, 2024
1 parent f060d0a commit b740221
Showing 188 changed files with 3,778 additions and 2,189 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

65 changes: 64 additions & 1 deletion src/adapter/auth-oso/src/oso_dataset_authorizer.rs
@@ -12,7 +12,7 @@ use std::str::FromStr;
use std::sync::Arc;

use dill::*;
use internal_error::ErrorIntoInternal;
use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};
use kamu_accounts::{CurrentAccountSubject, DEFAULT_ACCOUNT_NAME_STR};
use kamu_core::auth::*;
use kamu_core::AccessError;
@@ -122,6 +122,69 @@ impl DatasetActionAuthorizer for OsoDatasetAuthorizer {

allowed_actions
}

async fn filter_datasets_allowing(
    &self,
    dataset_handles: Vec<DatasetHandle>,
    action: DatasetAction,
) -> Result<Vec<DatasetHandle>, InternalError> {
    let mut matched_dataset_handles = Vec::new();
    for hdl in dataset_handles {
        let is_allowed = self
            .oso
            .is_allowed(
                self.actor(),
                action.to_string(),
                self.dataset_resource(&hdl),
            )
            .int_err()?;
        if is_allowed {
            matched_dataset_handles.push(hdl);
        }
    }

    Ok(matched_dataset_handles)
}

/*
fn get_readable_datasets(&self) -> DatasetHandleStream<'_> {
    Box::pin(async_stream::try_stream! {
        // Tracking pagination progress
        let mut offset = 0;
        let limit = 100;
        loop {
            // Load a page of dataset entries
            let entries_page = self
                .dataset_entry_service
                .list_all_entries(PaginationOpts { limit, offset })
                .await
                .int_err()?;
            // Count the entries actually loaded in this page
            let loaded_entries_count = entries_page.list.len();
            // Convert entries to handles
            let handles = self
                .dataset_entry_service
                .entries_as_handles(entries_page.list)
                .await
                .int_err()?;
            // Stream only the handles that pass the ReBAC check
            for hdl in handles {
                if self
                    .oso
                    .is_allowed(
                        self.actor(),
                        DatasetAction::Read.to_string(),
                        self.dataset_resource(&hdl),
                    )
                    .int_err()?
                {
                    yield hdl;
                }
            }
            // Next page
            offset += loaded_entries_count;
            if offset >= entries_page.total_count {
                break;
            }
        }
    })
}
*/
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
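
For reference, the commented-out `get_readable_datasets` draft above pages through the entry service, applies the per-handle ReBAC check, and yields matching handles as a stream. Below is a minimal, self-contained sketch of that paginate-filter-yield pattern, assuming `tokio`, `async-stream`, and `futures` as dependencies; the page source and the allow-check are simplified stand-ins, not the kamu API.

```rust
use futures::StreamExt;

// Stand-in for one page of dataset entries from the entry service.
fn list_page(offset: usize, limit: usize) -> (Vec<u32>, usize) {
    let all: Vec<u32> = (0..250).collect();
    let total = all.len();
    (all.iter().copied().skip(offset).take(limit).collect(), total)
}

// Stand-in for the per-handle Oso / ReBAC decision.
fn is_allowed(id: u32) -> bool {
    id % 2 == 0
}

// Yields only the allowed ids, loading entries page by page.
fn readable_ids() -> impl futures::Stream<Item = u32> {
    async_stream::stream! {
        let (mut offset, limit) = (0usize, 100usize);
        loop {
            let (page, total) = list_page(offset, limit);
            let loaded = page.len();
            for id in page {
                if is_allowed(id) {
                    yield id;
                }
            }
            offset += loaded;
            if loaded == 0 || offset >= total {
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let stream = readable_ids();
    futures::pin_mut!(stream);
    let mut count = 0;
    while let Some(_id) = stream.next().await {
        count += 1;
    }
    println!("readable ids: {count}");
}
```

Yielding per handle keeps memory bounded to a single page at a time, which matters once the registry holds far more datasets than one page.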
4 changes: 2 additions & 2 deletions src/adapter/graphql/src/mutations/dataset_metadata_mut.rs
@@ -35,8 +35,8 @@ impl DatasetMetadataMut {
#[graphql(skip)]
fn get_dataset(&self, ctx: &Context<'_>) -> std::sync::Arc<dyn domain::Dataset> {
// TODO: cut off this dependency - extract a higher level use case
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
dataset_repo.get_dataset_by_handle(&self.dataset_handle)
let dataset_registry = from_catalog::<dyn domain::DatasetRegistry>(ctx).unwrap();
dataset_registry.get_dataset_by_handle(&self.dataset_handle)
}

/// Access to the mutable metadata chain of the dataset
8 changes: 4 additions & 4 deletions src/adapter/graphql/src/mutations/dataset_mut.rs
@@ -9,7 +9,7 @@

use chrono::{DateTime, Utc};
use domain::{DeleteDatasetError, RenameDatasetError};
use kamu_core::{self as domain};
use kamu_core::{self as domain, SetWatermarkUseCase};
use opendatafabric as odf;

use super::{DatasetEnvVarsMut, DatasetFlowsMut, DatasetMetadataMut};
@@ -124,9 +124,9 @@ impl DatasetMut {
ctx: &Context<'_>,
watermark: DateTime<Utc>,
) -> Result<SetWatermarkResult> {
let pull_svc = from_catalog::<dyn domain::PullService>(ctx).unwrap();
match pull_svc
.set_watermark(&self.dataset_handle.as_local_ref(), watermark)
let set_watermark_use_case = from_catalog::<dyn SetWatermarkUseCase>(ctx).unwrap();
match set_watermark_use_case
.execute(&self.dataset_handle, watermark)
.await
{
Ok(domain::PullResult::UpToDate(_)) => {
8 changes: 4 additions & 4 deletions src/adapter/graphql/src/mutations/datasets_mut.rs
@@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use kamu_core::{self as domain, CreateDatasetUseCaseOptions, DatasetRepositoryExt};
use kamu_core::{self as domain, CreateDatasetUseCaseOptions, DatasetRegistryExt};
use opendatafabric as odf;

use crate::mutations::DatasetMut;
@@ -25,9 +25,9 @@ pub struct DatasetsMut;
impl DatasetsMut {
/// Returns a mutable dataset by its ID
async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result<Option<DatasetMut>> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let hdl = dataset_repo
.try_resolve_dataset_ref(&dataset_id.as_local_ref())
let dataset_registry = from_catalog::<dyn domain::DatasetRegistry>(ctx).unwrap();
let hdl = dataset_registry
.try_resolve_dataset_handle_by_ref(&dataset_id.as_local_ref())
.await?;
Ok(hdl.map(DatasetMut::new))
}
25 changes: 10 additions & 15 deletions src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs
@@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use kamu_core::{GetSummaryOpts, MetadataChainExt};
use kamu_core::{DatasetRegistry, DatasetRegistryExt, GetSummaryOpts, MetadataChainExt};
use odf::DatasetHandle;
use {kamu_flow_system as fs, opendatafabric as odf};

@@ -107,12 +107,16 @@ pub(crate) async fn ensure_flow_preconditions(
dataset_flow_type: DatasetFlowType,
flow_run_configuration: Option<&FlowRunConfiguration>,
) -> Result<Option<FlowPreconditionsNotMet>> {
let dataset_registry = from_catalog::<dyn DatasetRegistry>(ctx).unwrap();
let target = dataset_registry.get_resolved_dataset_by_handle(dataset_handle);

match dataset_flow_type {
DatasetFlowType::Ingest => {
let polling_ingest_svc =
from_catalog::<dyn kamu_core::PollingIngestService>(ctx).unwrap();

let source_res = polling_ingest_svc
.get_active_polling_source(&dataset_handle.as_local_ref())
.get_active_polling_source(target)
.await
.int_err()?;
if source_res.is_none() {
@@ -124,11 +128,7 @@ DatasetFlowType::ExecuteTransform => {
DatasetFlowType::ExecuteTransform => {
let transform_svc = from_catalog::<dyn kamu_core::TransformService>(ctx).unwrap();

let source_res = transform_svc
.get_active_transform(&dataset_handle.as_local_ref())
.await
.int_err()?;

let source_res = transform_svc.get_active_transform(target).await.int_err()?;
if source_res.is_none() {
return Ok(Some(FlowPreconditionsNotMet {
preconditions: "No SetTransform event defined".to_string(),
@@ -141,12 +141,8 @@
&& let FlowRunConfiguration::Reset(reset_configuration) = flow_configuration
{
if let Some(new_head_hash) = &reset_configuration.new_head_hash() {
let dataset_repo =
from_catalog::<dyn kamu_core::DatasetRepository>(ctx).unwrap();

let dataset = dataset_repo.get_dataset_by_handle(dataset_handle);
let current_head_hash_maybe = dataset
.as_metadata_chain()
let metadata_chain = target.dataset.as_metadata_chain();
let current_head_hash_maybe = metadata_chain
.try_get_ref(&kamu_core::BlockRef::Head)
.await
.int_err()?;
@@ -155,8 +151,7 @@
preconditions: "Dataset does not contain any blocks".to_string(),
}));
}
if !dataset
.as_metadata_chain()
if !metadata_chain
.contains_block(new_head_hash)
.await
.int_err()?
6 changes: 3 additions & 3 deletions src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
@@ -13,7 +13,7 @@ use database_common::PaginationOpts;
use futures::TryStreamExt;
use kamu::utils::datasets_filtering::filter_datasets_by_local_pattern;
use kamu_accounts::Account as AccountEntity;
use kamu_core::DatasetRepository;
use kamu_core::DatasetRegistry;
use kamu_flow_system as fs;
use opendatafabric::DatasetRefPattern;

@@ -111,15 +111,15 @@ impl AccountFlowRuns {
.try_collect()
.await?;

let dataset_repo = from_catalog::<dyn DatasetRepository>(ctx).unwrap();
let dataset_registry = from_catalog::<dyn DatasetRegistry>(ctx).unwrap();

let account = Account::new(
self.account.id.clone().into(),
self.account.account_name.clone().into(),
);

let matched_datasets: Vec<_> =
filter_datasets_by_local_pattern(dataset_repo.as_ref(), datasets_with_flows)
filter_datasets_by_local_pattern(dataset_registry.as_ref(), datasets_with_flows)
.map_ok(|dataset_handle| Dataset::new(account.clone(), dataset_handle))
.try_collect()
.await
10 changes: 5 additions & 5 deletions src/adapter/graphql/src/queries/datasets/dataset.rs
@@ -33,12 +33,12 @@ impl Dataset {

#[graphql(skip)]
pub async fn from_ref(ctx: &Context<'_>, dataset_ref: &odf::DatasetRef) -> Result<Dataset> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dataset_registry = from_catalog::<dyn domain::DatasetRegistry>(ctx).unwrap();

// TODO: Should we resolve reference at this point or allow unresolved and fail
// later?
let hdl = dataset_repo
.resolve_dataset_ref(dataset_ref)
let hdl = dataset_registry
.resolve_dataset_handle_by_ref(dataset_ref)
.await
.int_err()?;
let account = Account::from_dataset_alias(ctx, &hdl.alias)
@@ -49,8 +49,8 @@

#[graphql(skip)]
fn get_dataset(&self, ctx: &Context<'_>) -> std::sync::Arc<dyn domain::Dataset> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
dataset_repo.get_dataset_by_handle(&self.dataset_handle)
let dataset_registry = from_catalog::<dyn domain::DatasetRegistry>(ctx).unwrap();
dataset_registry.get_dataset_by_handle(&self.dataset_handle)
}

/// Unique identifier of the dataset
8 changes: 4 additions & 4 deletions src/adapter/graphql/src/queries/datasets/dataset_data.rs
@@ -28,8 +28,8 @@ impl DatasetData {
/// Total number of records in this dataset
#[tracing::instrument(level = "info", skip_all)]
async fn num_records_total(&self, ctx: &Context<'_>) -> Result<u64> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dataset = dataset_repo.get_dataset_by_handle(&self.dataset_handle);
let dataset_registry = from_catalog::<dyn domain::DatasetRegistry>(ctx).unwrap();
let dataset = dataset_registry.get_dataset_by_handle(&self.dataset_handle);
let summary = dataset
.get_summary(GetSummaryOpts::default())
.await
@@ -41,8 +41,8 @@
/// caching
#[tracing::instrument(level = "info", skip_all)]
async fn estimated_size(&self, ctx: &Context<'_>) -> Result<u64> {
let dataset_repo = from_catalog::<dyn domain::DatasetRepository>(ctx).unwrap();
let dataset = dataset_repo.get_dataset_by_handle(&self.dataset_handle);
let dataset_registry = from_catalog::<dyn domain::DatasetRegistry>(ctx).unwrap();
let dataset = dataset_registry.get_dataset_by_handle(&self.dataset_handle);
let summary = dataset
.get_summary(GetSummaryOpts::default())
.await
