diff --git a/src/app/cli/src/commands/verify_command.rs b/src/app/cli/src/commands/verify_command.rs index 433dcc395..ac3d25778 100644 --- a/src/app/cli/src/commands/verify_command.rs +++ b/src/app/cli/src/commands/verify_command.rs @@ -152,7 +152,6 @@ impl VerifyCommand { Ok(self .verification_svc - .clone() .verify_multi(filtered_requests, options, listener) .await) } @@ -193,14 +192,14 @@ impl VerifyCommand { let mut current_missed_dependencies = vec![]; - for dependecy in summary.dependencies { + for dependency in summary.dependencies { if self .dataset_repo - .resolve_dataset_ref(&DatasetRef::ID(dependecy.clone())) + .resolve_dataset_ref(&DatasetRef::ID(dependency.clone())) .await .is_err() { - current_missed_dependencies.push(dependecy.to_string()); + current_missed_dependencies.push(dependency.to_string()); } } if !current_missed_dependencies.is_empty() { diff --git a/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs b/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs index a8a53ac4e..ef4fc862e 100644 --- a/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs +++ b/src/e2e/app/cli/common/src/kamu_api_server_client_ext.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use lazy_static::lazy_static; -use opendatafabric::{DatasetAlias, DatasetName}; +use opendatafabric::{AccountName, DatasetAlias, DatasetName}; use reqwest::{Method, StatusCode}; use crate::{KamuApiServerClient, RequestBody}; @@ -134,6 +134,8 @@ pub const DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_3: &str = indoc::i "# ); +pub const E2E_USER_ACCOUNT_NAME_STR: &str = "e2e-user"; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub type AccessToken = String; @@ -153,8 +155,11 @@ pub trait KamuApiServerClientExt { async fn create_player_scores_dataset(&self, token: &AccessToken) -> DatasetId; - /// NOTE: only for single-tenant workspaces - async fn create_player_scores_dataset_with_data(&self, token: &AccessToken) -> DatasetId; + async fn create_player_scores_dataset_with_data( + &self, + token: &AccessToken, + account_name_maybe: Option, + ) -> DatasetId; async fn create_leaderboard(&self, token: &AccessToken) -> DatasetId; @@ -249,14 +254,19 @@ impl KamuApiServerClientExt for KamuApiServerClient { .await } - async fn create_player_scores_dataset_with_data(&self, token: &AccessToken) -> DatasetId { + async fn create_player_scores_dataset_with_data( + &self, + token: &AccessToken, + account_name_maybe: Option, + ) -> DatasetId { let dataset_id = self.create_player_scores_dataset(token).await; // TODO: Use the alias from the reply, after fixing the bug: // https://github.com/kamu-data/kamu-cli/issues/891 - - // At the moment, only single-tenant - let dataset_alias = DatasetAlias::new(None, DatasetName::new_unchecked("player-scores")); + let dataset_alias = DatasetAlias::new( + account_name_maybe, + DatasetName::new_unchecked("player-scores"), + ); self.ingest_data( &dataset_alias, diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs b/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs index 52308e3fa..406d408d3 100644 --- a/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs +++ b/src/e2e/app/cli/inmem/tests/tests/commands/mod.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. mod test_add_command; +mod test_compact_command; mod test_complete_command; mod test_config_command; mod test_delete_command; @@ -28,3 +29,4 @@ mod test_system_gc_command; mod test_system_generate_token_command; mod test_system_info_command; mod test_tail_command; +mod test_verify_command; diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_compact_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_compact_command.rs new file mode 100644 index 000000000..6ff90ee7b --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_compact_command.rs @@ -0,0 +1,36 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_compact_hard + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_compact_keep_metadata_only + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_compact_verify + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/commands/test_verify_command.rs b/src/e2e/app/cli/inmem/tests/tests/commands/test_verify_command.rs new file mode 100644 index 000000000..0a4b5598a --- /dev/null +++ b/src/e2e/app/cli/inmem/tests/tests/commands/test_verify_command.rs @@ -0,0 +1,36 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::prelude::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_verify_regular_dataset + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_verify_recursive + extra_test_groups = "containerized, engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_verify_integrity + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/inmem/tests/tests/test_smart_transfer_protocol.rs b/src/e2e/app/cli/inmem/tests/tests/test_smart_transfer_protocol.rs index dfa4094c2..abb5120d3 100644 --- a/src/e2e/app/cli/inmem/tests/tests/test_smart_transfer_protocol.rs +++ b/src/e2e/app/cli/inmem/tests/tests/test_smart_transfer_protocol.rs @@ -24,3 +24,104 @@ kamu_cli_run_api_server_e2e_test!( ); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_force_push_pull, + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_push_pull_add_alias, + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_pull_as, + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_push_pull_all, + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "containerized, engine, ingest, transform, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_push_pull_recursive, + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "containerized, engine, ingest, transform, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_pull_set_watermark, + options = Options::default().with_frozen_system_time(), +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_pull_reset_derivative, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "containerized, engine, ingest, transform, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_run_api_server_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_push_visibility, + options = Options::default() + .with_multi_tenant() + .with_today_as_frozen_system_time(), + extra_test_groups = "engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_push_pull_s3, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "containerized, engine, ingest, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +kamu_cli_execute_command_e2e_test!( + storage = inmem, + fixture = kamu_cli_e2e_repo_tests::test_smart_pull_derivative, + options = Options::default().with_frozen_system_time(), + extra_test_groups = "containerized, engine, ingest, transform, datafusion" +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/mod.rs b/src/e2e/app/cli/repo-tests/src/commands/mod.rs index 5125a93ce..ef14ffc19 100644 --- a/src/e2e/app/cli/repo-tests/src/commands/mod.rs +++ b/src/e2e/app/cli/repo-tests/src/commands/mod.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. mod test_add_command; +mod test_compact_command; mod test_complete_command; mod test_config_command; mod test_delete_command; @@ -28,8 +29,10 @@ mod test_system_generate_token_command; mod test_system_info_command; mod test_system_info_diagnose; mod test_tail_command; +mod test_verify_command; pub use test_add_command::*; +pub use test_compact_command::*; pub use test_complete_command::*; pub use test_config_command::*; pub use test_delete_command::*; @@ -50,3 +53,4 @@ pub use test_system_generate_token_command::*; pub use test_system_info_command::*; pub use test_system_info_diagnose::*; pub use test_tail_command::*; +pub use test_verify_command::*; diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_compact_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_compact_command.rs new file mode 100644 index 000000000..316359eb4 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_compact_command.rs @@ -0,0 +1,200 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::assert_matches::assert_matches; + +use kamu_cli_e2e_common::{ + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::{DatasetName, MetadataEvent}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_compact_hard(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await; + + let blocks_before_compacting = kamu.list_blocks(&dataset_name).await; + + let assert = kamu + .execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) were compacted + "# + )), + "Unexpected output:\n{stderr}", + ); + + let blocks_after_compacting = kamu.list_blocks(&dataset_name).await; + assert_eq!( + blocks_before_compacting.len() - 1, + blocks_after_compacting.len() + ); + assert_matches!( + blocks_after_compacting.first().unwrap().block.event, + MetadataEvent::AddData(_) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_compact_keep_metadata_only(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await; + + let blocks_before_compacting = kamu.list_blocks(&dataset_name).await; + + let assert = kamu + .execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + "--keep-metadata-only", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) were compacted + "# + )), + "Unexpected output:\n{stderr}", + ); + + let blocks_after_compacting = kamu.list_blocks(&dataset_name).await; + assert_eq!( + blocks_before_compacting.len() - 2, + blocks_after_compacting.len() + ); + assert_matches!( + blocks_after_compacting.first().unwrap().block.event, + MetadataEvent::SetDataSchema(_) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_compact_verify(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await; + + let blocks_before_compacting = kamu.list_blocks(&dataset_name).await; + + let assert = kamu + .execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + "--verify", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains( + indoc::indoc!( + r#" + verify with dataset_ref: player-scores + "# + ) + .trim() + ), + "Unexpected output:\n{stderr}", + ); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) were compacted + "# + )), + "Unexpected output:\n{stderr}", + ); + + let blocks_after_compacting = kamu.list_blocks(&dataset_name).await; + assert_eq!( + blocks_before_compacting.len() - 1, + blocks_after_compacting.len() + ); + assert_matches!( + blocks_after_compacting.first().unwrap().block.event, + MetadataEvent::AddData(_) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/commands/test_verify_command.rs b/src/e2e/app/cli/repo-tests/src/commands/test_verify_command.rs new file mode 100644 index 000000000..088c6e1b2 --- /dev/null +++ b/src/e2e/app/cli/repo-tests/src/commands/test_verify_command.rs @@ -0,0 +1,150 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_cli_e2e_common::{ + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, +}; +use kamu_cli_puppet::extensions::KamuCliPuppetExt; +use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::DatasetName; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_verify_regular_dataset(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let assert = kamu + .execute(["verify", dataset_name.as_str()]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_verify_recursive(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + let dataset_derivative_name = DatasetName::new_unchecked("leaderboard"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + kamu.execute(["pull", dataset_derivative_name.as_str()]) + .await + .success(); + + // Call verify without recursive flag + let assert = kamu + .execute(["verify", dataset_derivative_name.as_str()]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); + + // Call verify wit recursive flag + let assert = kamu + .execute(["verify", dataset_derivative_name.as_str(), "--recursive"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 2 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_verify_integrity(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let data = indoc::indoc!( + r#" + {"match_time": "2000-01-01", "match_id": 2, "player_id": "Bob", "score": 90} + "#, + ); + + kamu.ingest_data(&dataset_name, data).await; + + let assert = kamu + .execute(["verify", dataset_name.as_str(), "--integrity"]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!( + r#" + 1 dataset(s) are valid + "# + )), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/e2e/app/cli/repo-tests/src/test_flow.rs b/src/e2e/app/cli/repo-tests/src/test_flow.rs index aae45ade0..eb96c840a 100644 --- a/src/e2e/app/cli/repo-tests/src/test_flow.rs +++ b/src/e2e/app/cli/repo-tests/src/test_flow.rs @@ -17,7 +17,7 @@ pub async fn test_get_dataset_list_flows(kamu_api_server_client: KamuApiServerCl let token = kamu_api_server_client.login_as_kamu().await; let dataset_id = kamu_api_server_client - .create_player_scores_dataset_with_data(&token) + .create_player_scores_dataset_with_data(&token, None) .await; // The query is almost identical to kamu-web-ui, for ease of later edits. @@ -88,7 +88,7 @@ pub async fn test_dataset_all_flows_paused(kamu_api_server_client: KamuApiServer let token = kamu_api_server_client.login_as_kamu().await; let dataset_id = kamu_api_server_client - .create_player_scores_dataset_with_data(&token) + .create_player_scores_dataset_with_data(&token, None) .await; // The query is almost identical to kamu-web-ui, for ease of later edits. @@ -147,7 +147,7 @@ pub async fn test_dataset_flows_initiators(kamu_api_server_client: KamuApiServer let token = kamu_api_server_client.login_as_kamu().await; let dataset_id = kamu_api_server_client - .create_player_scores_dataset_with_data(&token) + .create_player_scores_dataset_with_data(&token, None) .await; // The query is almost identical to kamu-web-ui, for ease of later edits. @@ -228,7 +228,7 @@ pub async fn test_dataset_trigger_flow(kamu_api_server_client: KamuApiServerClie let token = kamu_api_server_client.login_as_kamu().await; let _root_dataset_id = kamu_api_server_client - .create_player_scores_dataset_with_data(&token) + .create_player_scores_dataset_with_data(&token, None) .await; let derivative_dataset_id = kamu_api_server_client.create_leaderboard(&token).await; diff --git a/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs b/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs index 4de9a1663..5deee2f3e 100644 --- a/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs +++ b/src/e2e/app/cli/repo-tests/src/test_smart_transfer_protocol.rs @@ -7,29 +7,110 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use kamu_cli_e2e_common::{KamuApiServerClient, KamuApiServerClientExt}; +use std::str::FromStr; + +use chrono::DateTime; +use kamu::testing::LocalS3Server; +use kamu_cli_e2e_common::{ + KamuApiServerClient, + KamuApiServerClientExt, + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR, + E2E_USER_ACCOUNT_NAME_STR, +}; +use kamu_cli_puppet::extensions::{KamuCliPuppetExt, RepoAlias}; use kamu_cli_puppet::KamuCliPuppet; +use opendatafabric::{AccountName, DatasetName}; use reqwest::Url; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub async fn test_smart_push_pull_sequence(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + // 1. Grub a token let token = kamu_api_server_client.login_as_e2e_user().await; - let kamu_api_server_dataset_endpoint = { - let base_url = kamu_api_server_client.get_base_url(); + let kamu_api_server_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + + // 2. Pushing the dataset to the API server + { + let kamu_in_push_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // 2.1. Add the dataset + { + kamu_in_push_workspace + .execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + } + + // 2.1. Ingest data to the dataset + { + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + } + + // 2.2. Login to the API server + kamu_in_push_workspace + .execute([ + "login", + kamu_api_server_client.get_base_url().as_str(), + "--access-token", + token.as_str(), + ]) + .await + .success(); + + // 2.3. Push the dataset to the API server + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + } + + // 3. Pulling the dataset from the API server + { + let kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_dataset_endpoint.as_str()], + "1 dataset(s) updated", + ) + .await; + } +} - let mut dataset_endpoint = Url::parse("odf+http://host").unwrap(); +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - dataset_endpoint.set_host(base_url.host_str()).unwrap(); - dataset_endpoint.set_port(base_url.port()).unwrap(); +pub async fn test_smart_force_push_pull(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + // 1. Grub a token + let token = kamu_api_server_client.login_as_e2e_user().await; - dataset_endpoint - .join("e2e-user/player-scores") - .unwrap() - .to_string() - }; + let kamu_api_server_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); // 2. Pushing the dataset to the API server { @@ -37,75 +118,166 @@ pub async fn test_smart_push_pull_sequence(kamu_api_server_client: KamuApiServer // 2.1. Add the dataset { - let dataset_path = kamu_in_push_workspace - .workspace_path() - .join("player-scores.yaml"); - - std::fs::write( - dataset_path.clone(), - indoc::indoc!( - r#" - kind: DatasetSnapshot - version: 1 - content: - name: player-scores - kind: Root - metadata: - - kind: AddPushSource - sourceName: default - read: - kind: NdJson - schema: - - "match_time TIMESTAMP" - - "match_id BIGINT" - - "player_id STRING" - - "score BIGINT" - merge: - kind: Ledger - primaryKey: - - match_id - - player_id - - kind: SetVocab - eventTimeColumn: match_time - "# - ), - ) - .unwrap(); - kamu_in_push_workspace - .execute(["add", dataset_path.to_str().unwrap()]) + .execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) .await .success(); } // 2.1. Ingest data to the dataset { - let dataset_data_path = kamu_in_push_workspace - .workspace_path() - .join("player-scores.data.ndjson"); - - std::fs::write( - dataset_data_path.clone(), - indoc::indoc!( - r#" - {"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 100} - {"match_time": "2000-01-01", "match_id": 1, "player_id": "Bob", "score": 80} - "#, - ), + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + } + + // 2.2. Login to the API server + kamu_in_push_workspace + .execute([ + "login", + kamu_api_server_client.get_base_url().as_str(), + "--access-token", + token.as_str(), + ]) + .await + .success(); + + // Initial dataset push + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + + // Hard compact dataset + kamu_in_push_workspace + .execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + "--keep-metadata-only", + ]) + .await + .success(); + + // Should fail without force flag + run_and_assert_command_failure( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_dataset_endpoint.as_str(), + ], + "Failed to push 1 dataset(s)", + ) + .await; + + // Should successfully push with force flag + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_dataset_endpoint.as_str(), + "--force", + ], + "1 dataset(s) pushed", + ) + .await; + } + + // 3. Pulling the dataset from the API server + { + let kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // Call with no-alias flag to avoid remote ingest checking in next step + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec![ + "pull", + kamu_api_server_dataset_endpoint.as_str(), + "--no-alias", + ], + "1 dataset(s) updated", + ) + .await; + + // Ingest data in pulled dataset + + kamu_in_pull_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, ) - .unwrap(); + .await; + + // Should fail without force flag + run_and_assert_command_failure( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_dataset_endpoint.as_str()], + "Failed to update 1 dataset(s)", + ) + .await; + + // Should successfully pull with force flag + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_dataset_endpoint.as_str(), "--force"], + "1 dataset(s) updated", + ) + .await; + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +pub async fn test_smart_push_pull_add_alias(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + // 1. Grub a token + let token = kamu_api_server_client.login_as_e2e_user().await; + + let kamu_api_server_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + + // 2. Push command + { + let kamu_in_push_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // Add the dataset + { kamu_in_push_workspace - .execute([ - "ingest", - "player-scores", - dataset_data_path.to_str().unwrap(), - ]) + .execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) .await .success(); } - // 2.2. Login to the API server + // Ingest data to the dataset + { + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + } + + // Login to the API server kamu_in_push_workspace .execute([ "login", @@ -116,27 +288,1025 @@ pub async fn test_smart_push_pull_sequence(kamu_api_server_client: KamuApiServer .await .success(); - // 2.3. Push the dataset to the API server - kamu_in_push_workspace - .execute([ + // Dataset push without storing alias + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_dataset_endpoint.as_str(), + "--no-alias", + ], + "1 dataset(s) pushed", + ) + .await; + + // Check alias should be empty + let aliases = kamu_in_push_workspace + .get_list_of_repo_aliases(&opendatafabric::DatasetRef::from(dataset_name.clone())) + .await; + assert!(aliases.is_empty()); + + // Dataset push with storing alias + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ "push", - "player-scores", + dataset_name.as_str(), "--to", kamu_api_server_dataset_endpoint.as_str(), + ], + "1 dataset(s) up-to-date", + ) + .await; + + let aliases = kamu_in_push_workspace + .get_list_of_repo_aliases(&opendatafabric::DatasetRef::from(dataset_name.clone())) + .await; + let expected_aliases = vec![RepoAlias { + dataset: dataset_name.clone(), + kind: "Push".to_string(), + alias: kamu_api_server_dataset_endpoint.clone(), + }]; + pretty_assertions::assert_eq!(aliases, expected_aliases); + } + + // 3. Pull command + { + let kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // Dataset pull without storing alias + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec![ + "pull", + kamu_api_server_dataset_endpoint.as_str(), + "--no-alias", + ], + "1 dataset(s) updated", + ) + .await; + + // Check alias should be empty + let aliases = kamu_in_pull_workspace + .get_list_of_repo_aliases(&opendatafabric::DatasetRef::from(dataset_name.clone())) + .await; + assert!(aliases.is_empty()); + + // Delete local dataset + kamu_in_pull_workspace + .execute(["--yes", "delete", dataset_name.as_str()]) + .await + .success(); + + // Dataset pull with storing alias + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_dataset_endpoint.as_str()], + "1 dataset(s) updated", + ) + .await; + + let aliases = kamu_in_pull_workspace + .get_list_of_repo_aliases(&opendatafabric::DatasetRef::from(dataset_name.clone())) + .await; + let expected_aliases = vec![RepoAlias { + dataset: dataset_name.clone(), + kind: "Pull".to_string(), + alias: kamu_api_server_dataset_endpoint.clone(), + }]; + pretty_assertions::assert_eq!(aliases, expected_aliases); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_pull_as(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + // 1. Grub a token + let token = kamu_api_server_client.login_as_e2e_user().await; + + kamu_api_server_client + .create_player_scores_dataset_with_data( + &token, + Some(AccountName::new_unchecked(E2E_USER_ACCOUNT_NAME_STR)), + ) + .await; + + let kamu_api_server_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + + { + let kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + + let new_dataset_name = DatasetName::new_unchecked("foo"); + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec![ + "pull", + kamu_api_server_dataset_endpoint.as_str(), + "--as", + new_dataset_name.as_str(), + ], + "1 dataset(s) updated", + ) + .await; + + let expected_dataset_list = kamu_in_pull_workspace + .list_datasets() + .await + .into_iter() + .map(|dataset| dataset.name) + .collect::>(); + + pretty_assertions::assert_eq!(vec![new_dataset_name], expected_dataset_list); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_push_pull_all(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + let dataset_derivative_name = DatasetName::new_unchecked("leaderboard"); + + // 1. Grub a token + let token = kamu_api_server_client.login_as_e2e_user().await; + + let kamu_api_server_root_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + let kamu_api_server_derivative_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_derivative_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + + let mut kamu_in_push_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // 2. Pushing datasets to the API server + { + kamu_in_push_workspace + .set_system_time(Some(DateTime::from_str("2050-01-02T03:04:05Z").unwrap())); + + // 2.1. Add datasets + { + kamu_in_push_workspace + .execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu_in_push_workspace + .execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + } + + // 2.1. Ingest data to the dataset + { + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + } + + // 2.2. Login to the API server + kamu_in_push_workspace + .execute([ + "login", + kamu_api_server_client.get_base_url().as_str(), + "--access-token", + token.as_str(), ]) .await .success(); + + // Push all datasets should fail + run_and_assert_command_failure( + &kamu_in_push_workspace, + vec!["push", "--all"], + "Pushing all datasets is not yet supported", + ) + .await; + + // Push datasets one by one + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_root_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + + kamu_in_push_workspace + .execute(["pull", dataset_derivative_name.as_str()]) + .await + .success(); + + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_derivative_name.as_str(), + "--to", + kamu_api_server_derivative_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; } - // 3. Pulling the dataset from the API server + // 3. Pulling datasets from the API server { let kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + // Pull datasets one by one and check data + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_root_dataset_endpoint.as_str()], + "1 dataset(s) updated", + ) + .await; + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_derivative_dataset_endpoint.as_str()], + "1 dataset(s) updated", + ) + .await; + + let expected_schema = indoc::indoc!( + r#" + message arrow_schema { + REQUIRED INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + let expected_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | offset | op | system_time | match_time | match_id | player_id | score | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + "# + ); + let expected_derivative_schema = indoc::indoc!( + r#" + message arrow_schema { + OPTIONAL INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 place; + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 2 | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + + kamu_in_pull_workspace + .assert_last_data_slice(&dataset_name, expected_schema, expected_data) + .await; + kamu_in_pull_workspace + .assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; + + // Update remote datasets + + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await; + run_and_assert_command_success( + &kamu_in_push_workspace, + vec!["pull", dataset_derivative_name.as_str()], + "1 dataset(s) updated", + ) + .await; + + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_root_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_derivative_name.as_str(), + "--to", + kamu_api_server_derivative_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + + // Pull all datasets + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", "--all"], + "2 dataset(s) updated", + ) + .await; + + // Perform dataslices checks + let expected_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | offset | op | system_time | match_time | match_id | player_id | score | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | 2 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00Z | 2 | Alice | 70 | + | 3 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00.001Z | 2 | Charlie | 90 | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + "# + ); + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 2 | 1 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 2 | 1 | Bob | 80 | + | 3 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00.001Z | 2 | 2 | Charlie | 90 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + + kamu_in_pull_workspace + .assert_last_data_slice(&dataset_name, expected_schema, expected_data) + .await; + kamu_in_pull_workspace + .assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_push_pull_recursive(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + let dataset_derivative_name = DatasetName::new_unchecked("leaderboard"); + + // 1. Grub a token + let token = kamu_api_server_client.login_as_e2e_user().await; + + let kamu_api_server_root_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + let mut kamu_in_push_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // 2. Pushing datasets to the API server + { + kamu_in_push_workspace + .set_system_time(Some(DateTime::from_str("2050-01-02T03:04:05Z").unwrap())); + + // 2.1. Add datasets + { + kamu_in_push_workspace + .execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + } + + // 2.1. Ingest data to the dataset + { + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + } + + // 2.2. Login to the API server + kamu_in_push_workspace + .execute([ + "login", + kamu_api_server_client.get_base_url().as_str(), + "--access-token", + token.as_str(), + ]) + .await + .success(); + + // Push all datasets should fail + run_and_assert_command_failure( + &kamu_in_push_workspace, + vec!["push", dataset_name.as_str(), "--recursive"], + "Recursive push is not yet supported", + ) + .await; + + // Push dataset + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_root_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + } + + // 3. Pulling datasets from the API server + { + let mut kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + kamu_in_pull_workspace + .set_system_time(Some(DateTime::from_str("2050-01-02T03:04:05Z").unwrap())); + + // Pull datasets one by one and check data + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", kamu_api_server_root_dataset_endpoint.as_str()], + "1 dataset(s) updated", + ) + .await; + kamu_in_pull_workspace - .execute(["pull", kamu_api_server_dataset_endpoint.as_str()]) + .execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) .await .success(); + + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", dataset_derivative_name.as_str()], + "1 dataset(s) updated", + ) + .await; + + let expected_schema = indoc::indoc!( + r#" + message arrow_schema { + REQUIRED INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + let expected_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | offset | op | system_time | match_time | match_id | player_id | score | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + "# + ); + let expected_derivative_schema = indoc::indoc!( + r#" + message arrow_schema { + OPTIONAL INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 place; + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 2 | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + + kamu_in_pull_workspace + .assert_last_data_slice(&dataset_name, expected_schema, expected_data) + .await; + kamu_in_pull_workspace + .assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; + + // Update remote datasets + + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await; + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_root_dataset_endpoint.as_str(), + ], + "1 dataset(s) pushed", + ) + .await; + + // Pull all datasets + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", dataset_derivative_name.as_str(), "--recursive"], + "2 dataset(s) updated", + ) + .await; + + // Perform dataslices checks + let expected_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | offset | op | system_time | match_time | match_id | player_id | score | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | 2 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00Z | 2 | Alice | 70 | + | 3 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00.001Z | 2 | Charlie | 90 | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + "# + ); + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 2 | 1 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 2 | 1 | Bob | 80 | + | 3 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00.001Z | 2 | 2 | Charlie | 90 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + + kamu_in_pull_workspace + .assert_last_data_slice(&dataset_name, expected_schema, expected_data) + .await; + kamu_in_pull_workspace + .assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_pull_set_watermark(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + let assert = kamu + .execute([ + "pull", + dataset_name.as_str(), + "--set-watermark", + "2051-01-02T03:04:05Z", + ]) + .await + .success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(indoc::indoc!(r#"Committed new block"#).trim()), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_pull_reset_derivative(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + let dataset_derivative_name = DatasetName::new_unchecked("leaderboard"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + + run_and_assert_command_success( + &kamu, + vec!["pull", dataset_derivative_name.as_str()], + "1 dataset(s) updated", + ) + .await; + + let expected_derivative_schema = indoc::indoc!( + r#" + message arrow_schema { + OPTIONAL INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 place; + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 2 | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + kamu.assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; + + // Compact root dataset + kamu.execute([ + "--yes", + "system", + "compact", + dataset_name.as_str(), + "--hard", + "--keep-metadata-only", + ]) + .await + .success(); + + // Pull derivative should fail + run_and_assert_command_failure( + &kamu, + vec!["pull", dataset_derivative_name.as_str()], + "Failed to update 1 dataset(s)", + ) + .await; + + // Add new data to root dataset + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_2, + ) + .await; + + run_and_assert_command_success( + &kamu, + vec![ + "pull", + dataset_derivative_name.as_str(), + "--reset-derivatives-on-diverged-input", + ], + "1 dataset(s) updated", + ) + .await; + + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00Z | 2 | 2 | Alice | 70 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-02T00:00:00.001Z | 1 | 2 | Charlie | 90 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + kamu.assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_push_visibility(kamu_api_server_client: KamuApiServerClient) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + // 1. Grub a token + let token = kamu_api_server_client.login_as_e2e_user().await; + + let kamu_api_server_dataset_endpoint = get_dataset_endpoint( + kamu_api_server_client.get_base_url(), + &dataset_name, + E2E_USER_ACCOUNT_NAME_STR, + ); + + // 2. Pushing the dataset to the API server + { + let kamu_in_push_workspace = KamuCliPuppet::new_workspace_tmp().await; + + // 2.1. Add the dataset + { + kamu_in_push_workspace + .execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + } + + // 2.1. Ingest data to the dataset + { + kamu_in_push_workspace + .ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + } + + // 2.2. Login to the API server + kamu_in_push_workspace + .execute([ + "login", + kamu_api_server_client.get_base_url().as_str(), + "--access-token", + token.as_str(), + ]) + .await + .success(); + + run_and_assert_command_success( + &kamu_in_push_workspace, + vec![ + "push", + dataset_name.as_str(), + "--to", + kamu_api_server_dataset_endpoint.as_str(), + "--visibility", + "private", + ], + "1 dataset(s) pushed", + ) + .await; + + // ToDo add visibility check + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_push_pull_s3(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + + let s3_server = LocalS3Server::new().await; + + let dataset_url = format!("{}/e2e-user/{dataset_name}", s3_server.url); + // Push dataset + run_and_assert_command_success( + &kamu, + vec!["push", dataset_name.as_str(), "--to", dataset_url.as_str()], + "1 dataset(s) pushed", + ) + .await; + + { + let kamu_in_pull_workspace = KamuCliPuppet::new_workspace_tmp().await; + + run_and_assert_command_success( + &kamu_in_pull_workspace, + vec!["pull", dataset_url.as_str()], + "1 dataset(s) updated", + ) + .await; + + let expected_schema = indoc::indoc!( + r#" + message arrow_schema { + REQUIRED INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + let expected_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | offset | op | system_time | match_time | match_id | player_id | score | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+----------+-----------+-------+ + "# + ); + kamu.assert_last_data_slice(&dataset_name, expected_schema, expected_data) + .await; + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn test_smart_pull_derivative(kamu: KamuCliPuppet) { + let dataset_name = DatasetName::new_unchecked("player-scores"); + let dataset_derivative_name = DatasetName::new_unchecked("leaderboard"); + + kamu.execute_with_input(["add", "--stdin"], DATASET_ROOT_PLAYER_SCORES_SNAPSHOT_STR) + .await + .success(); + + kamu.execute_with_input( + ["add", "--stdin"], + DATASET_DERIVATIVE_LEADERBOARD_SNAPSHOT_STR, + ) + .await + .success(); + + kamu.ingest_data( + &dataset_name, + DATASET_ROOT_PLAYER_SCORES_INGEST_DATA_NDJSON_CHUNK_1, + ) + .await; + + run_and_assert_command_failure( + &kamu, + vec![ + "tail", + dataset_derivative_name.as_str(), + "--output-format", + "table", + ], + "Error: Dataset schema is not yet available: leaderboard", + ) + .await; + + run_and_assert_command_success( + &kamu, + vec!["pull", dataset_derivative_name.as_str()], + "1 dataset(s) updated", + ) + .await; + + let expected_derivative_schema = indoc::indoc!( + r#" + message arrow_schema { + OPTIONAL INT64 offset; + REQUIRED INT32 op; + REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 match_time (TIMESTAMP(MILLIS,true)); + OPTIONAL INT64 place; + OPTIONAL INT64 match_id; + OPTIONAL BYTE_ARRAY player_id (STRING); + OPTIONAL INT64 score; + } + "# + ); + + let expected_derivative_data = indoc::indoc!( + r#" + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | offset | op | system_time | match_time | place | match_id | player_id | score | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + | 0 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00Z | 1 | 1 | Alice | 100 | + | 1 | 0 | 2050-01-02T03:04:05Z | 2000-01-01T00:00:00.001Z | 2 | 1 | Bob | 80 | + +--------+----+----------------------+--------------------------+-------+----------+-----------+-------+ + "# + ); + kamu.assert_last_data_slice( + &dataset_derivative_name, + expected_derivative_schema, + expected_derivative_data, + ) + .await; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +fn get_dataset_endpoint( + base_url: &Url, + dataset_name: &DatasetName, + account_name_str: &str, +) -> String { + let mut dataset_endpoint = Url::parse("odf+http://host").unwrap(); + + dataset_endpoint.set_host(base_url.host_str()).unwrap(); + dataset_endpoint.set_port(base_url.port()).unwrap(); + + dataset_endpoint + .join(format!("{account_name_str}/{dataset_name}").as_str()) + .unwrap() + .to_string() +} + +async fn run_and_assert_command_success( + kamu: &KamuCliPuppet, + args: Vec<&str>, + expected_message: &str, +) { + let assert = kamu.execute(args).await.success(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(expected_message), + "Unexpected output:\n{stderr}", + ); +} + +async fn run_and_assert_command_failure( + kamu: &KamuCliPuppet, + args: Vec<&str>, + expected_message: &str, +) { + let assert = kamu.execute(args).await.failure(); + + let stderr = std::str::from_utf8(&assert.get_output().stderr).unwrap(); + + assert!( + stderr.contains(expected_message), + "Unexpected output:\n{stderr}", + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs index 8c9292bf7..bebf25b1e 100644 --- a/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs +++ b/src/utils/kamu-cli-puppet/src/kamu_cli_puppet_ext.rs @@ -14,15 +14,10 @@ use std::path::PathBuf; use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use opendatafabric::serde::yaml::{ - DatasetKindDef, - YamlDatasetSnapshotSerializer, - YamlMetadataBlockDeserializer, -}; +use opendatafabric::serde::yaml::{YamlDatasetSnapshotSerializer, YamlMetadataBlockDeserializer}; use opendatafabric::serde::{DatasetSnapshotSerializer, MetadataBlockDeserializer}; use opendatafabric::{ DatasetID, - DatasetKind, DatasetName, DatasetRef, DatasetSnapshot, @@ -41,14 +36,16 @@ pub trait KamuCliPuppetExt { async fn add_dataset(&self, dataset_snapshot: DatasetSnapshot); + async fn list_blocks(&self, dataset_name: &DatasetName) -> Vec; + + async fn ingest_data(&self, dataset_name: &DatasetName, data: &str); + async fn get_list_of_repo_aliases(&self, dataset_ref: &DatasetRef) -> Vec; async fn complete(&self, input: T, current: usize) -> Vec where T: Into + Send; - async fn list_blocks(&self, dataset_name: &DatasetName) -> Vec; - async fn start_api_server(self, e2e_data_file_path: PathBuf) -> ServerOutput; async fn assert_last_data_slice( @@ -227,6 +224,18 @@ impl KamuCliPuppetExt for KamuCliPuppet { kamu_data_utils::testing::assert_data_eq(df.clone(), expected_data).await; kamu_data_utils::testing::assert_schema_eq(df.schema(), expected_schema); } + + async fn ingest_data(&self, dataset_name: &DatasetName, data: &str) { + let dataset_data_path = self + .workspace_path() + .join(format!("{dataset_name}.data.ndjson")); + + std::fs::write(dataset_data_path.clone(), data).unwrap(); + + self.execute(["ingest", dataset_name, dataset_data_path.to_str().unwrap()]) + .await + .success(); + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -244,8 +253,9 @@ pub struct DatasetRecord { #[serde(rename = "ID")] pub id: DatasetID, pub name: DatasetName, - #[serde(with = "DatasetKindDef")] - pub kind: DatasetKind, + // CLI returns regular ENUM DatasetKind(Root/Derivative) for local datasets + // but for remote it is Remote(DatasetKind) type + pub kind: String, pub head: Multihash, pub pulled: Option>, pub records: usize,