ConcurrentStream usage with tokio leads to ACCESS_VIOLATION #2851

Closed
inklesspen1rus opened this issue Apr 19, 2024 · 3 comments
Comments

@inklesspen1rus

I tried to use concurrent streams to sleep in parallel with tokio:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            tokio::time::sleep(Duration::from_secs(x as _)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

But sometimes it crashes:

> cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
     Running `target\debug\rstestss.exe`
error: process didn't exit successfully: `target\debug\rstestss.exe` (exit code: 0xc0000005, STATUS_ACCESS_VIOLATION)

Without the "current_thread" flavor, the program just freezes.

Other runtimes work fine.

async_std:

use core::time::Duration;
use async_std;
use futures_concurrency::prelude::*;
use futures::prelude::*;

#[async_std::main]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            async_std::task::sleep(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

smol:

use core::time::Duration;
use futures_concurrency::prelude::*;
use futures::prelude::*;

async fn main_async() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

fn main() {
    smol::block_on(main_async());
}

The tokio runtime with smol's Timer also works fine:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            // tokio::time::sleep(Duration::from_secs(x as _)).await;
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

Is this a Tokio issue?

@taiki-e
Member

taiki-e commented Apr 19, 2024

I think it is a tokio or futures_concurrency issue, since none of the APIs you are using from the crates in this repository (iter, chain, for_each) contain unsafe code.

@taiki-e closed this as not planned on Apr 19, 2024
@inklesspen1rus
Author

Oh, really? Sorry.
I thought this was the futures-concurrency repository 😅

@taiki-e
Member

taiki-e commented Apr 19, 2024

I have never used futures-concurrency, and I don't intend to review all of it, but even at a cursory glance it has a bunch of very suspicious unsafe code: This would be unsound when vec, the storage of slab, is reallocated. This probably has the same problem as tokio-rs/tokio#2612. The heavy use of ManuallyDrop around the Pin API reminds me of async-rs/async-std#903. And so on.
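
For anyone reading along who is unfamiliar with the slab concern: a Vec may move its elements to a new allocation when it grows, so a raw pointer or pinned reference into Vec-backed storage can dangle after a later push. The sketch below is not futures-concurrency's code and makes no claim about how that crate should be fixed. It is a std-only illustration, with the hypothetical helpers `addr_of` and `first_entry_addr_across_growth`, of one common way to keep entries address-stable: box each entry, so that growth moves only the pointer and never the pointee.

```rust
use std::pin::Pin;

/// Returns the heap address of the value behind a pinned box.
/// (Hypothetical helper, for illustration only.)
fn addr_of(slot: &Pin<Box<u64>>) -> usize {
    &**slot as *const u64 as usize
}

/// Pushes `extra` additional entries into Vec-backed storage and returns
/// the address of the first entry's pointee before and after the growth.
/// (Hypothetical helper, for illustration only.)
fn first_entry_addr_across_growth(extra: usize) -> (usize, usize) {
    // Start with room for a single entry so later pushes must grow the Vec.
    let mut storage: Vec<Pin<Box<u64>>> = Vec::with_capacity(1);
    storage.push(Box::pin(0));
    let before = addr_of(&storage[0]);

    for i in 0..extra {
        // Growing the Vec may move the `Pin<Box<_>>` handles to a new
        // buffer, but each boxed value stays in its own heap allocation.
        storage.push(Box::pin(i as u64 + 1));
    }

    let after = addr_of(&storage[0]);
    (before, after)
}

fn main() {
    let (before, after) = first_entry_addr_across_growth(1024);
    // The pointee did not move even though the Vec's buffer had to grow.
    assert_eq!(before, after);
    println!("first entry stayed at {before:#x} across Vec growth");
}
```

Storing the values inline in the Vec instead, and handing out pointers to them, would give no such guarantee once the Vec grows past its capacity.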
