Skip to content

Commit

Permalink
Merge the resp3 branch. (redis-rs#1058)
Browse files Browse the repository at this point in the history
* Initial implementation of RESP3 (redis-rs#757)

These changes implement all RESP3 types (excluding streamed types). 
RESP3 can be enabled per connection by adding `?resp3=true` to 
connection uri. It currently supports PubSub as RESP2 PubSub support 
in library, but in future PRs it'll support handling normal commands 
and PubSub messages in one connection. Only `num-bigint` is added 
as dependency to support `BigNumber` type.

Changes made in support of redis-rs#329 and redis-rs#749

* Add RESP3 support to cluster connections. (redis-rs#1001)

* Resp3 Push Management (redis-rs#898)

* squash!

* oops!

* test invalidation in cluster && introduce client tracking options

* introduce basic PubSub functionality to MultiplexedConnection and make tokio sender unbounded channel

* fix tests & linter, make PushSender::Tokio as aio feature only

* add resp3 to ci branches and fix cluster client tracking option

* test dropping and update ci yml

* remove unsubscribe fn and introduce unsubscribing by dropping receiver.

* fix tests because RedisJson returns responses in an array when it's resp3

* override redisjson cache (it's a temp solution)

* add -skip test_module to RESP3 testing and upgrade redis 6.2.4 to 6.2.13

* test json modules with RESP3 and get json fix from main

* in redis v7 RedisJson is different with Resp3 🤔

* Implement Pub/Sub in Async Cluster & fmt & remove usage of is_err_and(stable only after v1.70)

* don't use sharded pub/sub with redis v6

* use REDIS_VERSION in env instead of using HELLO command to fetch redis version

* oops

* fix linter

* fix fmt and remove benchmark from CI

* simplify PushManager and add tokio as non-optional dependency.

* get fixes from 220c6a9

* use --test-threads=1

* override redisjson cache (it's a temp solution)

* remove get_push_manager from traits & remove push manager from aio::Connection

* remove client_tracking_options

* remove 0.21.x from rust.yml

* add tests for pushmanager

* format & move push_info into a variable

* change tests according to comments.

* apply 6.2.4 changes && fmt

* try to fix

* remove con_addr & remove pub/sub support in cluster connections

* add disconnection handling to sync, mpx, cm && test it

* remove push_manager argument from connection creation

* better docs

* add has_reply function to PushKind

* remove no response command support in mpx since it's not used in mpx pub/sub

* apply changes from redis-rs#994

* fix tests

* Use enum instead of boolean for RESP version. (redis-rs#1012)

Since there's a discussion starting about what might become RESP4, this
PR will make it easier to add more RESP versions in the future.

* Rename Value enum types in order to match Redis RESP names. (redis-rs#779)

* Rename Value::Bulk to Value::Array.

* Rename Value::Status to Value::SimpleString.

* Rename Value::Data to Value::BulkString.

* Fix debug names of values.

* fix nightly comments.

* reintroduce client tracking to tests.

* fix merge errors.

---------

Co-authored-by: Altan Özlü <5479094+altanozlu@users.noreply.github.com>
  • Loading branch information
nihohit and altanozlu authored Mar 12, 2024
1 parent 8801b61 commit 30d5730
Show file tree
Hide file tree
Showing 39 changed files with 2,577 additions and 601 deletions.
16 changes: 9 additions & 7 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Rust

on:
push:
branches: [ main, 0.21.x ]
branches: [ main, 0.x.x ]
pull_request:
branches: [ main, 0.21.x ]
branches: [ main, 0.x.x ]

env:
CARGO_TERM_COLOR: always
Expand All @@ -19,7 +19,7 @@ jobs:
fail-fast: false
matrix:
redis:
- 6.2.4
- 6.2.13
- 7.2.0
rust:
- stable
Expand Down Expand Up @@ -73,7 +73,7 @@ jobs:
run: make test

- name: Checkout RedisJSON
if: steps.cache-redisjson.outputs.cache-hit != 'true' && matrix.redis != '6.2.4'
if: steps.cache-redisjson.outputs.cache-hit != 'true' && matrix.redis != '6.2.13'
uses: actions/checkout@v4
with:
repository: "RedisJSON/RedisJSON"
Expand All @@ -94,7 +94,7 @@ jobs:
# This shouldn't cause issues in the future so long as no profiles or patches
# are applied to the workspace Cargo.toml file
- name: Compile RedisJSON
if: steps.cache-redisjson.outputs.cache-hit != 'true' && matrix.redis != '6.2.4'
if: steps.cache-redisjson.outputs.cache-hit != 'true' && matrix.redis != '6.2.13'
run: |
cp ./Cargo.toml ./Cargo.toml.actual
echo $'\nexclude = [\"./__ci/redis-json\"]' >> Cargo.toml
Expand All @@ -104,8 +104,10 @@ jobs:
rm -rf ./__ci/redis-json
- name: Run module-specific tests
if: matrix.redis != '6.2.4'
if: matrix.redis != '6.2.13'
run: make test-module
env:
REDIS_VERSION: ${{ matrix.redis }}

- name: Check features
run: |
Expand Down Expand Up @@ -183,4 +185,4 @@ jobs:
git fetch
git checkout ${{ github.base_ref }}
cargo bench --all-features -- --measurement-time 15 --save-baseline base
critcmp base changes
critcmp base changes
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ test:
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test --locked -p redis --no-default-features -- --nocapture --test-threads=1

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features"
@echo "Testing Connection Type TCP with all features and RESP2"
@echo "===================================================================="
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test --locked -p redis --all-features -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and RESP3"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp PROTOCOL=RESP3 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and Rustls support"
@echo "===================================================================="
Expand Down Expand Up @@ -55,10 +60,15 @@ test:

test-module:
@echo "===================================================================="
@echo "Testing with module support enabled (currently only RedisJSON)"
@echo "Testing RESP2 with module support enabled (currently only RedisJSON)"
@echo "===================================================================="
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test --locked --all-features test_module -- --test-threads=1

@echo "===================================================================="
@echo "Testing RESP3 with module support enabled (currently only RedisJSON)"
@echo "===================================================================="
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 RESP3=true cargo test --all-features test_module -- --test-threads=1

test-single: test

bench:
Expand Down
14 changes: 7 additions & 7 deletions redis-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,26 @@ pub trait IntoRedisValue {

impl IntoRedisValue for String {
fn into_redis_value(self) -> Value {
Value::Data(self.as_bytes().to_vec())
Value::BulkString(self.as_bytes().to_vec())
}
}

impl IntoRedisValue for &str {
fn into_redis_value(self) -> Value {
Value::Data(self.as_bytes().to_vec())
Value::BulkString(self.as_bytes().to_vec())
}
}

#[cfg(feature = "bytes")]
impl IntoRedisValue for bytes::Bytes {
fn into_redis_value(self) -> Value {
Value::Data(self.to_vec())
Value::BulkString(self.to_vec())
}
}

impl IntoRedisValue for Vec<u8> {
fn into_redis_value(self) -> Value {
Value::Data(self)
Value::BulkString(self)
}
}

Expand Down Expand Up @@ -310,7 +310,7 @@ mod tests {
cmd("SET").arg("bar").arg("foo").execute(&mut conn);
assert_eq!(
cmd("GET").arg("bar").query(&mut conn),
Ok(Value::Data(b"foo".as_ref().into()))
Ok(Value::BulkString(b"foo".as_ref().into()))
);
}

Expand Down Expand Up @@ -401,10 +401,10 @@ mod tests {
fn pipeline_atomic_test() {
let mut conn = MockRedisConnection::new(vec![MockCmd::with_values(
pipe().atomic().cmd("GET").arg("foo").cmd("GET").arg("bar"),
Ok(vec![Value::Bulk(
Ok(vec![Value::Array(
vec!["hello", "world"]
.into_iter()
.map(|x| Value::Data(x.as_bytes().into()))
.map(|x| Value::BulkString(x.as_bytes().into()))
.collect(),
)]),
)]);
Expand Down
14 changes: 7 additions & 7 deletions redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ bytes = { version = "1", optional = true }
futures-util = { version = "0.3.15", default-features = false, optional = true }
pin-project-lite = { version = "0.2", optional = true }
tokio-util = { version = "0.7", optional = true }
tokio = { version = "1", features = ["rt", "net", "time"], optional = true }
tokio = { version = "1", features = ["rt", "net", "time", "sync"] }
socket2 = { version = "0.5", default-features = false, optional = true }

# Only needed for the connection manager
arc-swap = { version = "1.1.0", optional = true }
arc-swap = { version = "1.1.0" }
futures = { version = "0.3.3", optional = true }
tokio-retry = { version = "0.3.0", optional = true }

Expand Down Expand Up @@ -80,7 +80,7 @@ serde_json = { version = "1.0.82", optional = true }
# Only needed for bignum Support
rust_decimal = { version = "1.33.1", optional = true }
bigdecimal = { version = "0.4.2", optional = true }
num-bigint = { version = "0.4.4", optional = true }
num-bigint = "0.4.4"

# Optional aHash support
ahash = { version = "0.8.6", optional = true }
Expand All @@ -93,7 +93,7 @@ uuid = { version = "1.6.1", optional = true }
[features]
default = ["acl", "streams", "geospatial", "script", "keep-alive"]
acl = []
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "tokio/sync", "combine/tokio", "async-trait"]
aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "combine/tokio", "async-trait"]
geospatial = []
json = ["serde", "serde/derive", "serde_json"]
cluster = ["crc16", "rand"]
Expand All @@ -105,18 +105,18 @@ tls-rustls-webpki-roots = ["tls-rustls", "webpki-roots"]
async-std-comp = ["aio", "async-std"]
async-std-native-tls-comp = ["async-std-comp", "async-native-tls", "tls-native-tls"]
async-std-rustls-comp = ["async-std-comp", "futures-rustls", "tls-rustls"]
tokio-comp = ["aio", "tokio", "tokio/net"]
tokio-comp = ["aio", "tokio/net"]
tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"]
tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"]
connection-manager = ["arc-swap", "futures", "aio", "tokio-retry"]
connection-manager = ["futures", "aio", "tokio-retry"]
streams = []
cluster-async = ["cluster", "futures", "futures-util", "log"]
keep-alive = ["socket2"]
sentinel = ["rand"]
tcp_nodelay = []
rust_decimal = ["dep:rust_decimal"]
bigdecimal = ["dep:bigdecimal"]
num-bigint = ["dep:num-bigint"]
num-bigint = []
uuid = ["dep:uuid"]
disable-client-setinfo = []

Expand Down
8 changes: 4 additions & 4 deletions redis/benches/bench_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ fn bench_decode_simple(b: &mut Bencher, input: &[u8]) {
b.iter(|| redis::parse_redis_value(input).unwrap());
}
fn bench_decode(c: &mut Criterion) {
let value = Value::Bulk(vec![
let value = Value::Array(vec![
Value::Okay,
Value::Status("testing".to_string()),
Value::Bulk(vec![]),
Value::SimpleString("testing".to_string()),
Value::Array(vec![]),
Value::Nil,
Value::Data(vec![b'a'; 10]),
Value::BulkString(vec![b'a'; 10]),
Value::Int(7512182390),
]);

Expand Down
2 changes: 1 addition & 1 deletion redis/examples/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ fn read_records(client: &redis::Client) -> RedisResult<()> {
for StreamId { id, map } in ids {
println!("\tID {id}");
for (n, s) in map {
if let Value::Data(bytes) = s {
if let Value::BulkString(bytes) = s {
println!("\t\t{}: {}", n, String::from_utf8(bytes).expect("utf8"))
} else {
panic!("Weird data")
Expand Down
30 changes: 15 additions & 15 deletions redis/src/acl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ impl FromRedisValue for AclInfo {
let flags = flags
.as_sequence()
.ok_or_else(|| {
not_convertible_error!(flags, "Expect a bulk response of ACL flags")
not_convertible_error!(flags, "Expect an array response of ACL flags")
})?
.iter()
.map(|flag| match flag {
Value::Data(flag) => match flag.as_slice() {
Value::BulkString(flag) => match flag.as_slice() {
b"on" => Ok(Rule::On),
b"off" => Ok(Rule::Off),
b"allkeys" => Ok(Rule::AllKeys),
Expand All @@ -181,14 +181,14 @@ impl FromRedisValue for AclInfo {
let passwords = passwords
.as_sequence()
.ok_or_else(|| {
not_convertible_error!(flags, "Expect a bulk response of ACL flags")
not_convertible_error!(flags, "Expect an array response of ACL flags")
})?
.iter()
.map(|pass| Ok(Rule::AddHashedPass(String::from_redis_value(pass)?)))
.collect::<RedisResult<_>>()?;

let commands = match commands {
Value::Data(cmd) => std::str::from_utf8(cmd)?,
Value::BulkString(cmd) => std::str::from_utf8(cmd)?,
_ => {
return Err(not_convertible_error!(
commands,
Expand Down Expand Up @@ -281,18 +281,18 @@ mod tests {

#[test]
fn test_from_redis_value() {
let redis_value = Value::Bulk(vec![
Value::Data("flags".into()),
Value::Bulk(vec![
Value::Data("on".into()),
Value::Data("allchannels".into()),
let redis_value = Value::Array(vec![
Value::BulkString("flags".into()),
Value::Array(vec![
Value::BulkString("on".into()),
Value::BulkString("allchannels".into()),
]),
Value::Data("passwords".into()),
Value::Bulk(vec![]),
Value::Data("commands".into()),
Value::Data("-@all +get".into()),
Value::Data("keys".into()),
Value::Bulk(vec![Value::Data("pat:*".into())]),
Value::BulkString("passwords".into()),
Value::Array(vec![]),
Value::BulkString("commands".into()),
Value::BulkString("-@all +get".into()),
Value::BulkString("keys".into()),
Value::Array(vec![Value::BulkString("pat:*".into())]),
]);
let acl_info = AclInfo::from_redis_value(&redis_value).expect("Parse successfully");

Expand Down
Loading

0 comments on commit 30d5730

Please sign in to comment.