From d8567beb031fdb48bcd2602efda6449d836b9c1f Mon Sep 17 00:00:00 2001 From: Yo Eight Date: Sat, 6 Jul 2024 14:44:46 -0400 Subject: [PATCH] Add integration tests. (#7) --- Cargo.lock | 349 +++++++++++++----- Cargo.toml | 4 +- geth-client-tests/Cargo.toml | 29 ++ geth-client-tests/src/append_read_tests.rs | 64 ++++ geth-client-tests/src/lib.rs | 33 ++ geth-client/src/next/driver.rs | 52 ++- geth-client/src/next/mod.rs | 14 +- geth-common/src/lib.rs | 1 + geth-engine/Cargo.toml | 57 +++ {geth-node => geth-engine}/src/bus.rs | 0 .../src/domain/index.rs | 0 {geth-node => geth-engine}/src/domain/mod.rs | 0 {geth-node => geth-engine}/src/grpc/local.rs | 0 {geth-node => geth-engine}/src/grpc/mod.rs | 9 +- .../src/grpc/protocol.rs | 0 geth-engine/src/lib.rs | 47 +++ {geth-node => geth-engine}/src/messages.rs | 22 +- {geth-node => geth-engine}/src/names.rs | 0 geth-engine/src/options.rs | 19 + {geth-node => geth-engine}/src/process.rs | 15 +- .../src/process/storage/mod.rs | 0 .../src/process/storage/reader.rs | 0 .../src/process/storage/writer.rs | 0 .../src/process/subscriptions/mod.rs | 3 - .../src/process/subscriptions/programmable.rs | 4 +- .../src/services/index.rs | 0 .../src/services/mod.rs | 0 geth-node/Cargo.toml | 45 +-- geth-node/src/main.rs | 44 +-- 29 files changed, 588 insertions(+), 223 deletions(-) create mode 100644 geth-client-tests/Cargo.toml create mode 100644 geth-client-tests/src/append_read_tests.rs create mode 100644 geth-client-tests/src/lib.rs create mode 100644 geth-engine/Cargo.toml rename {geth-node => geth-engine}/src/bus.rs (100%) rename {geth-node => geth-engine}/src/domain/index.rs (100%) rename {geth-node => geth-engine}/src/domain/mod.rs (100%) rename {geth-node => geth-engine}/src/grpc/local.rs (100%) rename {geth-node => geth-engine}/src/grpc/mod.rs (67%) rename {geth-node => geth-engine}/src/grpc/protocol.rs (100%) create mode 100644 geth-engine/src/lib.rs rename {geth-node => geth-engine}/src/messages.rs (69%) rename {geth-node => geth-engine}/src/names.rs (100%) create mode 100644 geth-engine/src/options.rs rename {geth-node => geth-engine}/src/process.rs (95%) rename {geth-node => geth-engine}/src/process/storage/mod.rs (100%) rename {geth-node => geth-engine}/src/process/storage/reader.rs (100%) rename {geth-node => geth-engine}/src/process/storage/writer.rs (100%) rename {geth-node => geth-engine}/src/process/subscriptions/mod.rs (98%) rename {geth-node => geth-engine}/src/process/subscriptions/programmable.rs (98%) rename {geth-node => geth-engine}/src/services/index.rs (100%) rename {geth-node => geth-engine}/src/services/mod.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index a508ee2..d4b5efc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,16 +43,16 @@ dependencies = [ [[package]] name = "anstream" -version = "0.3.2" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", - "is-terminal", + "is_terminal_polyfill", "utf8parse", ] @@ -77,17 +77,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -322,20 +322,19 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.19" +version = "4.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" +checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d" dependencies = [ "clap_builder", "clap_derive", - "once_cell", ] [[package]] name = "clap_builder" -version = "4.3.19" +version = "4.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708" dependencies = [ "anstream", "anstyle", @@ -345,9 +344,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.3.12" +version = "4.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" +checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" dependencies = [ "heck", "proc-macro2", @@ -357,9 +356,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.5.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" [[package]] name = "colorchoice" @@ -449,6 +448,47 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "deunicode" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339544cc9e2c4dc3fc7149fd630c5f22263a4fdf18a98afd0075784968b5cf00" + [[package]] name = "digest" version = "0.10.7" @@ -477,7 +517,19 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys", + "windows-sys 0.48.0", +] + +[[package]] +name = "dummy" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e57e12b69e57fad516e01e2b3960f122696fdb13420e1a88ed8e210316f2876" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -500,7 +552,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -532,6 +584,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fake" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c25829bde82205da46e1823b2259db6273379f626fc211f126f65654a2669be" +dependencies = [ + "deunicode", + "dummy", + "rand", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -683,6 +746,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "geth-client-tests" +version = "0.1.0" +dependencies = [ + "eyre", + "fake", + "futures", + "geth-client", + "geth-common", + "geth-engine", + "serde", + "serde_json", + "temp-dir", + "tokio", + "uuid", +] + [[package]] name = "geth-common" version = "0.1.0" @@ -727,6 +807,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "geth-engine" +version = "0.1.0" +dependencies = [ + "async-stream", + "async-trait", + "byteorder", + "bytes", + "chrono", + "clap", + "eyre", + "flatbuffers", + "futures", + "geth-common", + "geth-domain", + "geth-mikoshi", + "moka", + "prost", + "prost-derive", + "prost-types", + "pyro-core", + "pyro-runtime", + "rand", + "serde_json", + "tokio", + "tonic", + "tonic-build", + "tracing", + "uuid", +] + [[package]] name = "geth-mikoshi" version = "0.1.0" @@ -753,31 +864,12 @@ dependencies = [ name = "geth-node" version = "0.1.0" dependencies = [ - "async-stream", - "async-trait", - "byteorder", - "bytes", - "chrono", + "clap", "eyre", - "flatbuffers", - "futures", - "geth-common", - "geth-domain", - "geth-mikoshi", - "moka", - "prost", - "prost-derive", - "prost-types", - "pyro-core", - "pyro-runtime", - "rand", - "serde_json", + "geth-engine", "tokio", - "tonic", - "tonic-build", "tracing", "tracing-subscriber", - "uuid", ] [[package]] @@ -868,9 +960,9 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" [[package]] name = "heck" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -971,6 +1063,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indenter" version = "0.3.3" @@ -1014,19 +1112,14 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ "hermit-abi", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] -name = "is-terminal" -version = "0.4.9" +name = "is_terminal_polyfill" +version = "1.70.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" -dependencies = [ - "hermit-abi", - "rustix 0.38.4", - "windows-sys", -] +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" [[package]] name = "itertools" @@ -1076,12 +1169,6 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" -[[package]] -name = "linux-raw-sys" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" - [[package]] name = "lock_api" version = "0.4.10" @@ -1167,7 +1254,7 @@ dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1285,7 +1372,7 @@ dependencies = [ "libc", "redox_syscall 0.3.5", "smallvec", - "windows-targets", + "windows-targets 0.48.1", ] [[package]] @@ -1354,9 +1441,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] @@ -1632,21 +1719,8 @@ dependencies = [ "errno", "io-lifetimes", "libc", - "linux-raw-sys 0.3.8", - "windows-sys", -] - -[[package]] -name = "rustix" -version = "0.38.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" -dependencies = [ - "bitflags 2.3.3", - "errno", - "libc", - "linux-raw-sys 0.4.3", - "windows-sys", + "linux-raw-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1750,9 +1824,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -1829,9 +1903,9 @@ dependencies = [ [[package]] name = "strsim" -version = "0.10.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" @@ -1856,6 +1930,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "temp-dir" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f227968ec00f0e5322f9b8173c7a0cbcff6181a0a5b28e9892491c286277231" + [[package]] name = "temp_testdir" version = "0.2.3" @@ -1872,8 +1952,8 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall 0.3.5", - "rustix 0.37.23", - "windows-sys", + "rustix", + "windows-sys 0.48.0", ] [[package]] @@ -1898,9 +1978,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ "cfg-if", "once_cell", @@ -1934,7 +2014,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2090,20 +2170,20 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "lazy_static", "log", + "once_cell", "tracing-core", ] [[package]] name = "tracing-subscriber" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "nu-ansi-term", "sharded-slab", @@ -2321,7 +2401,7 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", ] [[package]] @@ -2330,7 +2410,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.5", ] [[package]] @@ -2339,13 +2428,29 @@ version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -2354,38 +2459,86 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + [[package]] name = "windows_aarch64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + [[package]] name = "windows_i686_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + [[package]] name = "windows_i686_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + [[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + [[package]] name = "windows_x86_64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/Cargo.toml b/Cargo.toml index 936b6da..2fd817e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,13 @@ [workspace] resolver = "2" members = [ - "geth-node", + "geth-engine", "geth-mikoshi", "geth-repl", "geth-client", "geth-common", "geth-consensus", "geth-domain", + "geth-client-tests", + "geth-node", ] diff --git a/geth-client-tests/Cargo.toml b/geth-client-tests/Cargo.toml new file mode 100644 index 0000000..90c9486 --- /dev/null +++ b/geth-client-tests/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "geth-client-tests" +version = "0.1.0" +edition = "2021" + +[dependencies.geth-client] +path = "../geth-client" + +[dependencies.geth-engine] +path = "../geth-engine" + +[dependencies.geth-common] +path = "../geth-common" + +[dependencies.tokio] +version = "*" +features = ["full"] + +[dependencies.fake] +version = "2.9" +features = ["derive"] + +[dependencies] +temp-dir = "0.1" +eyre = "0.6" +uuid = "*" +serde = "1" +serde_json = "1" +futures = "0.3" diff --git a/geth-client-tests/src/append_read_tests.rs b/geth-client-tests/src/append_read_tests.rs new file mode 100644 index 0000000..f80ca9b --- /dev/null +++ b/geth-client-tests/src/append_read_tests.rs @@ -0,0 +1,64 @@ +use fake::faker::name::en::Name; +use fake::{Fake, Faker}; +use futures::TryStreamExt; +use temp_dir::TempDir; +use uuid::Uuid; + +use geth_client::GrpcClient; +use geth_common::{AppendStreamCompleted, Client, Direction, ExpectedRevision, Propose, Revision}; + +use crate::tests::{client_endpoint, random_valid_options, Toto}; + +#[tokio::test] +async fn simple_append() -> eyre::Result<()> { + let db_dir = TempDir::new()?; + let options = random_valid_options(&db_dir); + + let client = GrpcClient::new(client_endpoint(&options)); + tokio::spawn(geth_engine::run(options.clone())); + + let stream_name: String = Name().fake(); + let event_type: String = Name().fake(); + let event_id = Uuid::new_v4(); + let expected: Toto = Faker.fake(); + + let completed = client + .append_stream( + &stream_name, + ExpectedRevision::Any, + vec![Propose { + id: event_id, + r#type: event_type.clone(), + data: serde_json::to_vec(&expected)?.into(), + }], + ) + .await?; + + let write_result = match completed { + AppendStreamCompleted::Success(r) => r, + AppendStreamCompleted::Error(e) => panic!("error: {}", e), + }; + + assert_eq!( + ExpectedRevision::Revision(1), + write_result.next_expected_version + ); + + let mut stream = client + .read_stream(&stream_name, Direction::Forward, Revision::Start, 1) + .await; + + let event = stream.try_next().await?.unwrap(); + + assert_eq!(event_id, event.id); + assert_eq!(event_type, event.r#type); + assert_eq!(stream_name, event.stream_name); + assert_eq!(0, event.revision); + + let actual = serde_json::from_slice::(&event.data)?; + + assert_eq!(expected.key, actual.key); + assert_eq!(expected.value, actual.value); + + Ok(()) +} diff --git a/geth-client-tests/src/lib.rs b/geth-client-tests/src/lib.rs new file mode 100644 index 0000000..c2aa7ef --- /dev/null +++ b/geth-client-tests/src/lib.rs @@ -0,0 +1,33 @@ +#[cfg(test)] +mod append_read_tests; + +#[cfg(test)] +pub mod tests { + use fake::{Dummy, Fake}; + use serde::{Deserialize, Serialize}; + use temp_dir::TempDir; + + use geth_common::EndPoint; + use geth_engine::Options; + + pub fn random_valid_options(temp_dir: &TempDir) -> Options { + Options { + host: "127.0.0.1".to_string(), + port: (1_113..2_113).fake(), + db: temp_dir.path().as_os_str().to_str().unwrap().to_string(), + } + } + + pub fn client_endpoint(options: &Options) -> EndPoint { + EndPoint { + host: options.host.clone(), + port: options.port, + } + } + + #[derive(Serialize, Deserialize, Dummy)] + pub struct Toto { + pub key: String, + pub value: u64, + } +} diff --git a/geth-client/src/next/driver.rs b/geth-client/src/next/driver.rs index 801ff1a..a7382c4 100644 --- a/geth-client/src/next/driver.rs +++ b/geth-client/src/next/driver.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; +use std::time::Duration; +use eyre::bail; use uuid::Uuid; use geth_common::generated::next::protocol; use geth_common::{EndPoint, Operation, OperationIn, OperationOut}; -use crate::next::{connect_to_node, Command, Connection, Mailbox}; +use crate::next::{connect_to_node, Command, ConnErr, Connection, Mailbox}; pub struct Driver { endpoint: EndPoint, @@ -108,9 +110,51 @@ impl Driver { .parse() .unwrap(); - let conn = connect_to_node(uri, self.mailbox.clone()).await?; - self.connection = Some(conn); + let mut attempts = 0; + let max_attempts = 10; + loop { + match connect_to_node(&uri, self.mailbox.clone()).await { + Err(e) => match e { + ConnErr::Transport(e) => { + if e.to_string() == "transport error" { + attempts += 1; + + if attempts > max_attempts { + tracing::error!( + "max connection attempt reached ({})", + max_attempts + ); + + bail!("max connection attempt reached"); + } + + tracing::error!( + "error when connecting to {}:{} {}/{}", + self.endpoint.host, + self.endpoint.port, + attempts, + max_attempts, + ); + + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + + bail!("fatal error when connecting: {}", e); + } - Ok(()) + ConnErr::Status(e) => { + // TODO - we might retry if the server is not ready or under too much load. + tracing::error!("error when reaching endpoint: {}", e); + bail!("error when reaching endpoint"); + } + }, + + Ok(c) => { + self.connection = Some(c); + return Ok(()); + } + } + } } } diff --git a/geth-client/src/next/mod.rs b/geth-client/src/next/mod.rs index 603c493..3cfbe22 100644 --- a/geth-client/src/next/mod.rs +++ b/geth-client/src/next/mod.rs @@ -27,13 +27,21 @@ pub struct Command { type Connection = UnboundedSender; type Mailbox = UnboundedSender; -pub(crate) async fn connect_to_node(uri: Uri, mailbox: Mailbox) -> eyre::Result { - let mut client = ProtocolClient::connect(uri).await?; +pub enum ConnErr { + Transport(tonic::transport::Error), + Status(tonic::Status), +} + +pub(crate) async fn connect_to_node(uri: &Uri, mailbox: Mailbox) -> Result { + let mut client = ProtocolClient::connect(uri.clone()) + .await + .map_err(ConnErr::Transport)?; let (connection, stream_request) = mpsc::unbounded_channel(); let mut stream_response = client .multiplex(UnboundedReceiverStream::new(stream_request)) - .await? + .await + .map_err(ConnErr::Status)? .into_inner(); tokio::spawn(async move { diff --git a/geth-common/src/lib.rs b/geth-common/src/lib.rs index 07956cf..37926d5 100644 --- a/geth-common/src/lib.rs +++ b/geth-common/src/lib.rs @@ -891,6 +891,7 @@ impl From for operation_out::delete_stream_completed::DeleteResult } } +#[derive(Debug)] pub enum AppendStreamCompleted { Success(WriteResult), Error(AppendError), diff --git a/geth-engine/Cargo.toml b/geth-engine/Cargo.toml new file mode 100644 index 0000000..50e1fed --- /dev/null +++ b/geth-engine/Cargo.toml @@ -0,0 +1,57 @@ +[package] +name = "geth-engine" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies.geth-mikoshi] +path = "../geth-mikoshi" + +[dependencies.geth-common] +path = "../geth-common" + +[dependencies.geth-domain] +path = "../geth-domain" + +[dependencies.tokio] +version = "1.20" +features = ["full"] + +[dependencies.uuid] +version = "1" +features = ["v4"] + +[dependencies.rand] +version = "0.8" +features = ["small_rng"] + +[dependencies.pyro-core] +git = "https://github.com/YoEight/pyro.git" + +[dependencies.pyro-runtime] +git = "https://github.com/YoEight/pyro.git" + +[dependencies.clap] +version = "4.5" +features = ["derive"] + +[dependencies] +tonic = "0.10" +prost = "0.12" +prost-types = "0.12" +prost-derive = "0.12" +tracing = "0.1" +futures = "*" +bytes = "1" +eyre = "0.6" +async-stream = "0.3" +byteorder = "1" +chrono = "0.4" +serde_json = "1" +moka = "0.11" +flatbuffers = "23.5.26" +async-trait = "0.1.71" + +[build-dependencies] +tonic-build = "0.10" diff --git a/geth-node/src/bus.rs b/geth-engine/src/bus.rs similarity index 100% rename from geth-node/src/bus.rs rename to geth-engine/src/bus.rs diff --git a/geth-node/src/domain/index.rs b/geth-engine/src/domain/index.rs similarity index 100% rename from geth-node/src/domain/index.rs rename to geth-engine/src/domain/index.rs diff --git a/geth-node/src/domain/mod.rs b/geth-engine/src/domain/mod.rs similarity index 100% rename from geth-node/src/domain/mod.rs rename to geth-engine/src/domain/mod.rs diff --git a/geth-node/src/grpc/local.rs b/geth-engine/src/grpc/local.rs similarity index 100% rename from geth-node/src/grpc/local.rs rename to geth-engine/src/grpc/local.rs diff --git a/geth-node/src/grpc/mod.rs b/geth-engine/src/grpc/mod.rs similarity index 67% rename from geth-node/src/grpc/mod.rs rename to geth-engine/src/grpc/mod.rs index f0b574e..2cf79bc 100644 --- a/geth-node/src/grpc/mod.rs +++ b/geth-engine/src/grpc/mod.rs @@ -3,14 +3,19 @@ use tonic::transport::{self, Server}; use geth_common::generated::next::protocol::protocol_server::ProtocolServer; use geth_common::Client; +use crate::options::Options; + mod local; mod protocol; -pub async fn start_server(client: C) -> Result<(), transport::Error> +pub async fn start_server(options: Options, client: C) -> Result<(), transport::Error> where C: Client + Send + Sync + 'static, { - let addr = "127.0.0.1:2113".parse().unwrap(); + let addr = format!("{}:{}", options.host, options.port) + .parse() + .unwrap(); + let protocols = protocol::ProtocolImpl::new(client); tracing::info!("GethDB is listening on {}", addr); diff --git a/geth-node/src/grpc/protocol.rs b/geth-engine/src/grpc/protocol.rs similarity index 100% rename from geth-node/src/grpc/protocol.rs rename to geth-engine/src/grpc/protocol.rs diff --git a/geth-engine/src/lib.rs b/geth-engine/src/lib.rs new file mode 100644 index 0000000..ac8b6c1 --- /dev/null +++ b/geth-engine/src/lib.rs @@ -0,0 +1,47 @@ +use std::path::PathBuf; +use std::sync::Arc; + +use tokio::select; + +use geth_domain::{Lsm, LsmSettings}; +use geth_mikoshi::storage::FileSystemStorage; +use geth_mikoshi::wal::chunks::ChunkBasedWAL; +use geth_mikoshi::wal::WALRef; + +use crate::domain::index::Index; +pub use crate::options::Options; +use crate::process::{InternalClient, Processes}; + +mod bus; +mod domain; +mod grpc; +mod messages; +mod names; +mod options; +mod process; +mod services; + +pub async fn run(options: Options) -> eyre::Result<()> { + let storage = FileSystemStorage::new(PathBuf::from(&options.db))?; + let lsm = Lsm::load(LsmSettings::default(), storage.clone())?; + let index = Index::new(lsm); + let wal = WALRef::new(ChunkBasedWAL::load(storage.clone())?); + let processes = Processes::new(wal, index.clone()); + let sub_client = processes.subscriptions_client().clone(); + let client = Arc::new(InternalClient::new(processes)); + let services = services::start(client.clone(), index, sub_client); + + select! { + server = grpc::start_server(options, client) => { + if let Err(e) = server { + tracing::error!("GethDB node gRPC module crashed: {}", e); + } + } + + _ = services.exited() => { + tracing::info!("GethDB node terminated"); + } + } + + Ok(()) +} diff --git a/geth-node/src/messages.rs b/geth-engine/src/messages.rs similarity index 69% rename from geth-node/src/messages.rs rename to geth-engine/src/messages.rs index e10c4ec..e630bfc 100644 --- a/geth-node/src/messages.rs +++ b/geth-engine/src/messages.rs @@ -1,9 +1,7 @@ use eyre::Report; use uuid::Uuid; -use geth_common::{ - Direction, ExpectedRevision, Propose, Revision, WriteResult, WrongExpectedRevisionError, -}; +use geth_common::{Direction, ExpectedRevision, Propose, Revision}; use geth_mikoshi::MikoshiStream; #[derive(Debug)] @@ -23,7 +21,6 @@ pub enum ReadStreamCompleted { #[derive(Debug)] pub struct AppendStream { - pub correlation: Uuid, pub stream_name: String, pub events: Vec, pub expected: ExpectedRevision, @@ -35,28 +32,12 @@ pub struct DeleteStream { pub expected: ExpectedRevision, } -pub enum AppendStreamCompleted { - Success(WriteResult), - Failure(WrongExpectedRevisionError), - Unexpected(Report), - StreamDeleted, -} - -#[derive(Debug)] -pub enum DeleteStreamCompleted { - Success(WriteResult), - Failure(WrongExpectedRevisionError), - Unexpected(Report), -} - #[derive(Debug)] pub struct SubscribeTo { - pub correlation: Uuid, pub target: SubscriptionTarget, } pub struct SubscriptionConfirmed { - pub correlation: Uuid, pub outcome: SubscriptionRequestOutcome, } @@ -75,7 +56,6 @@ pub enum SubscriptionTarget { pub struct StreamTarget { pub parent: Option, pub stream_name: String, - pub starting: Revision, } #[derive(Debug)] diff --git a/geth-node/src/names.rs b/geth-engine/src/names.rs similarity index 100% rename from geth-node/src/names.rs rename to geth-engine/src/names.rs diff --git a/geth-engine/src/options.rs b/geth-engine/src/options.rs new file mode 100644 index 0000000..201bcc1 --- /dev/null +++ b/geth-engine/src/options.rs @@ -0,0 +1,19 @@ +use clap::Parser; + +#[derive(Parser, Debug, Clone)] +#[command(name = "geth-db")] +#[command(author, version, about, long_about = None)] +#[command(propagate_version = true)] +pub struct Options { + /// Host IP address. + #[arg(long, default_value = "127.0.0.1")] + pub host: String, + + /// Host Port. + #[arg(long, default_value = "2113")] + pub port: u16, + + // Data directory. + #[arg(long, default_value = "./geth")] + pub db: String, +} diff --git a/geth-node/src/process.rs b/geth-engine/src/process.rs similarity index 95% rename from geth-node/src/process.rs rename to geth-engine/src/process.rs index bee8aaf..b38fc4f 100644 --- a/geth-node/src/process.rs +++ b/geth-engine/src/process.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use geth_common::{ AppendStreamCompleted, Client, DeleteStreamCompleted, Direction, ExpectedRevision, GetProgramError, ProgramKillError, ProgramKilled, ProgramObtained, ProgramSummary, Propose, - Record, Revision, SubscriptionEvent, + Record, Revision, SubscriptionConfirmation, SubscriptionEvent, }; use geth_mikoshi::storage::Storage; use geth_mikoshi::wal::{WALRef, WriteAheadLog}; @@ -59,7 +59,6 @@ where let outcome = self .storage .append_stream(AppendStream { - correlation: Default::default(), stream_name: stream_id.to_string(), events: proposes, expected: expected_revision, @@ -124,33 +123,38 @@ where let (sender, recv) = oneshot::channel(); let outcome = self.subscriptions.subscribe(SubscribeMsg { payload: SubscribeTo { - correlation: Uuid::new_v4(), target: SubscriptionTarget::Stream(StreamTarget { parent: None, stream_name: stream_id.to_string(), - starting: start, }), }, mail: sender, }); + let stream_id = stream_id.to_string(); Box::pin(async_stream::try_stream! { outcome?; let resp = recv.await.map_err(|_| eyre::eyre!("Main bus has shutdown!"))?; let mut threshold = start.raw(); + let mut pre_read_events = false; if let Some(mut catchup_stream) = catchup_stream { while let Some(record) = catchup_stream.try_next().await? { + pre_read_events = true; let revision = record.revision; yield SubscriptionEvent::EventAppeared(record); threshold = revision; } + + yield SubscriptionEvent::CaughtUp; } match resp.outcome { SubscriptionRequestOutcome::Success(mut stream) => { + yield SubscriptionEvent::Confirmed(SubscriptionConfirmation::StreamName(stream_id)); + while let Some(record) = stream.next().await? { - if record.revision <= threshold { + if pre_read_events && record.revision <= threshold { continue; } @@ -172,7 +176,6 @@ where let (sender, recv) = oneshot::channel(); let outcome = self.subscriptions.subscribe(SubscribeMsg { payload: SubscribeTo { - correlation: Uuid::new_v4(), target: SubscriptionTarget::Process(ProcessTarget { id: Uuid::new_v4(), name: name.to_string(), diff --git a/geth-node/src/process/storage/mod.rs b/geth-engine/src/process/storage/mod.rs similarity index 100% rename from geth-node/src/process/storage/mod.rs rename to geth-engine/src/process/storage/mod.rs diff --git a/geth-node/src/process/storage/reader.rs b/geth-engine/src/process/storage/reader.rs similarity index 100% rename from geth-node/src/process/storage/reader.rs rename to geth-engine/src/process/storage/reader.rs diff --git a/geth-node/src/process/storage/writer.rs b/geth-engine/src/process/storage/writer.rs similarity index 100% rename from geth-node/src/process/storage/writer.rs rename to geth-engine/src/process/storage/writer.rs diff --git a/geth-node/src/process/subscriptions/mod.rs b/geth-engine/src/process/subscriptions/mod.rs similarity index 98% rename from geth-node/src/process/subscriptions/mod.rs rename to geth-engine/src/process/subscriptions/mod.rs index f9ef19a..547ea69 100644 --- a/geth-node/src/process/subscriptions/mod.rs +++ b/geth-engine/src/process/subscriptions/mod.rs @@ -140,7 +140,6 @@ async fn service(client: SubscriptionsClient, mut mailbox: mpsc::UnboundedReceiv }); let _ = msg.mail.send(SubscriptionConfirmed { - correlation: Uuid::new_v4(), outcome: SubscriptionRequestOutcome::Success(MikoshiStream::new(reader)), }); } @@ -169,7 +168,6 @@ async fn service(client: SubscriptionsClient, mut mailbox: mpsc::UnboundedReceiv ); let _ = msg.mail.send(SubscriptionConfirmed { - correlation: Uuid::new_v4(), outcome: SubscriptionRequestOutcome::Success(prog.stream), }); } @@ -182,7 +180,6 @@ async fn service(client: SubscriptionsClient, mut mailbox: mpsc::UnboundedReceiv ); let _ = msg.mail.send(SubscriptionConfirmed { - correlation: Uuid::new_v4(), outcome: SubscriptionRequestOutcome::Failure(eyre::eyre!( "Error when starting programmable subscription '{}': {}", opts.name, diff --git a/geth-node/src/process/subscriptions/programmable.rs b/geth-engine/src/process/subscriptions/programmable.rs similarity index 98% rename from geth-node/src/process/subscriptions/programmable.rs rename to geth-engine/src/process/subscriptions/programmable.rs index 7d20776..62f89c9 100644 --- a/geth-node/src/process/subscriptions/programmable.rs +++ b/geth-engine/src/process/subscriptions/programmable.rs @@ -12,7 +12,7 @@ use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; use uuid::Uuid; -use geth_common::{Position, Record, Revision}; +use geth_common::{Position, Record}; use geth_mikoshi::MikoshiStream; use crate::bus::SubscribeMsg; @@ -274,11 +274,9 @@ pub fn spawn( let (mailbox, confirmed) = oneshot::channel(); let _ = client.subscribe(SubscribeMsg { payload: SubscribeTo { - correlation: Uuid::new_v4(), target: SubscriptionTarget::Stream(StreamTarget { parent: Some(id), stream_name: stream_name.clone(), - starting: Revision::Start, }), }, mail: mailbox, diff --git a/geth-node/src/services/index.rs b/geth-engine/src/services/index.rs similarity index 100% rename from geth-node/src/services/index.rs rename to geth-engine/src/services/index.rs diff --git a/geth-node/src/services/mod.rs b/geth-engine/src/services/mod.rs similarity index 100% rename from geth-node/src/services/mod.rs rename to geth-engine/src/services/mod.rs diff --git a/geth-node/Cargo.toml b/geth-node/Cargo.toml index 4d87e34..10d35ac 100644 --- a/geth-node/Cargo.toml +++ b/geth-node/Cargo.toml @@ -3,52 +3,15 @@ name = "geth-node" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies.geth-mikoshi] -path = "../geth-mikoshi" - -[dependencies.geth-common] -path = "../geth-common" - -[dependencies.geth-domain] -path = "../geth-domain" +[dependencies.geth-engine] +path = "../geth-engine" [dependencies.tokio] version = "1.20" features = ["full"] -[dependencies.uuid] -version = "1" -features = ["v4"] - -[dependencies.rand] -version = "0.8" -features = ["small_rng"] - -[dependencies.pyro-core] -git = "https://github.com/YoEight/pyro.git" - -[dependencies.pyro-runtime] -git = "https://github.com/YoEight/pyro.git" - [dependencies] -tonic = "0.10" -prost = "0.12" -prost-types = "0.12" -prost-derive = "0.12" +clap = "*" +eyre = "0.6" tracing = "0.1" tracing-subscriber = "0.3" -futures = "*" -bytes = "1" -eyre = "0.6" -async-stream = "0.3" -byteorder = "1" -chrono = "0.4" -serde_json = "1" -moka = "0.11" -flatbuffers = "23.5.26" -async-trait = "0.1.71" - -[build-dependencies] -tonic-build = "0.10" diff --git a/geth-node/src/main.rs b/geth-node/src/main.rs index e3034ea..2bda37f 100644 --- a/geth-node/src/main.rs +++ b/geth-node/src/main.rs @@ -1,26 +1,7 @@ -use std::path::PathBuf; -use std::sync::Arc; - -use tokio::select; +use clap::Parser; use tracing::Level; use tracing_subscriber::FmtSubscriber; -use geth_domain::{Lsm, LsmSettings}; -use geth_mikoshi::storage::FileSystemStorage; -use geth_mikoshi::wal::chunks::ChunkBasedWAL; -use geth_mikoshi::wal::WALRef; - -use crate::domain::index::Index; -use crate::process::{InternalClient, Processes}; - -mod bus; -mod domain; -mod grpc; -pub mod messages; -mod names; -mod process; -mod services; - #[tokio::main] async fn main() -> eyre::Result<()> { let subscriber = FmtSubscriber::builder() @@ -29,26 +10,7 @@ async fn main() -> eyre::Result<()> { tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - let storage = FileSystemStorage::new(PathBuf::from("./geth"))?; - let lsm = Lsm::load(LsmSettings::default(), storage.clone())?; - let index = Index::new(lsm); - let wal = WALRef::new(ChunkBasedWAL::load(storage.clone())?); - let processes = Processes::new(wal, index.clone()); - let sub_client = processes.subscriptions_client().clone(); - let client = Arc::new(InternalClient::new(processes)); - let services = services::start(client.clone(), index, sub_client); - - select! { - server = grpc::start_server(client) => { - if let Err(e) = server { - tracing::error!("GethDB node gRPC module crashed: {}", e); - } - } - - _ = services.exited() => { - tracing::info!("GethDB node terminated"); - } - } + let options = geth_engine::Options::parse(); - Ok(()) + geth_engine::run(options).await }