Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unexpected behaviors around the use of Pull #2717

Closed
LLCampos opened this issue Nov 22, 2021 · 8 comments · Fixed by #2745 or #2763
Closed

Unexpected behaviors around the use of Pull #2717

LLCampos opened this issue Nov 22, 2021 · 8 comments · Fixed by #2745 or #2763
Labels

Comments

@LLCampos
Copy link
Contributor

I'm not even sure how to describe the problem. Please feel free to change the title to something more appropriate.

  "org.typelevel" %% "cats-effect" % "3.2.9",
  "co.fs2" %% "fs2-core" % "3.2.2",
import cats.effect.{IO, IOApp}
import fs2.Pull.StreamPullOps
import fs2.Stream

object DebugFs2Issue extends IOApp.Simple {

  val run: IO[Unit] = for {
    _ <- problematicStream
      .flatMap(ev => Stream.eval(IO.delay { println(s"Printing: $ev") }))
      .map(s => s) // <- THIS MAP
      .compile
      .drain
  } yield ()

  def problematicStream[B, O]: Stream[IO, (String, Int)] = {
    val stream = Stream
      .emit(1)
      .map(Left(_)) ++ Stream.emit(Right("emitted")).covary[IO]
    stream.pull.uncons1.flatMap { case Some((Left(_), s)) =>
      s.collect { case Right(b) => b }.through(failurePipe).pull.echo
    }.stream
  }

  def failurePipe: Stream[IO, String] => Stream[IO, (String, Int)] =
    _.zipWith(Stream.emit(2))((_, _))

  def successPipe: Stream[IO, String] => Stream[IO, (String, Int)] =
    _.map(v => (v, 2))
}
  1. If you run things as above, nothing will be printed.
  2. From 1), if you remove the line marked as <- THIS MAP, "Printing: (emitted,2)" is printed. This is unexpected.
  3. From 1), if you use successPipe instead of failurePipe, "Printing: (emitted,2)" is also printed. This is also unexpected, since I would predict that _.zipWith(Stream.emit(2))((_, _)) is equivalent to _.map(v => (v, 2)).
  4. From 1), if you replace .emit(1).map(Left(_)) with .emit(Left(1)), "Printing: (emitted,2)" is printed. I would expect those two commands to be equivalent.
@LLCampos LLCampos added the bug label Nov 22, 2021
@nikiforo
Copy link
Contributor

nikiforo commented Nov 22, 2021

This might be a minimization. Result changes if THIS MAP is commented, or other suggested 3, 4 changes are applied. expected Result List((2,2)), actual Result List()

  def run: IO[Unit] = runIssue2717

  def runIssue2717 =
    problematicStream.covary[IO]
      .map(s => s) // <- THIS MAP
      .compile
      .toList
      .flatMap(list => IO.println(s"Result $list"))

  def stream = Stream(1).as(1) ++ Stream(2)

  def problematicStream =
    stream.pull.uncons1.flatMap { case Some((_, s)) =>
      s.zipWith(Stream(2))((_, _)).pull.echo
    }.stream

@nikiforo
Copy link
Contributor

28c795b is the first bad commit
commit 28c795b
Author: Michael Pilquist mpilquist@gmail.com
Date: Mon Aug 23 22:16:50 2021 -0400

Fix failing resourceWeak test

:040000 040000 4acd111b77f5588fc68b4f5d89bddf647ebbac14 0dddc9c75af3ecad9cb512d8234e31f8cf4a253f M core

@nikiforo
Copy link
Contributor

nikiforo commented Nov 22, 2021

That's not really a minimisation, I've just changed values and elements in

  test("issue-2717 - unexpected behavior of Pull") {
    val stream = Stream(1).as(1) ++ Stream(2)
    val zippedPull =
      stream.pull.uncons1.flatMap { case Some((_, s)) =>
        s.zipWith(Stream(3))(Tuple2.apply).pull.echo
      }
    val actual = zippedPull.stream.covary[IO].map(identity).compile.toList
    actual.assertEquals(List((2, 3)))
  }

@diesalbla
Copy link
Contributor

diesalbla commented Nov 26, 2021

I have been spending a bit of time on this curious example. Here is another aspect of it: when you look

import cats.effect.{IO, IOApp, SyncIO}
import fs2.Pull.StreamPullOps
import fs2.Stream

object DebugFs2Issue extends IOApp.Simple {

  val stream = Stream(1).as(1) ++ Stream(2)

  val unconsed: Pull[Pure, INothing, Option[(Int, Stream[Pure, Int])]] =
    stream.pull.uncons1

  val problematicStream: Pull[Pure, (Int, Int), Unit] =
    unconsed.flatMap { case Some((_, s)) =>
      s.zipWith(Stream(2))((_, _)).pull.echo
    }

  def run: IO[Unit] = {
    val listF = problematicStream.stream
      .map(s => s) // <- THIS MAP
      .covary[SyncIO] // A
      //.covary[IO]  // B 
      .compile
      .toList
    
    IO(listF.flatMap { list => SyncIO( println(s"Result $list")) }.unsafeRunSync()) // A
    //listF.flatMap(list => IO.println(s"Result $list"))  // B
  }

}

Without removing the map line (THIS MAP), the program changes when compiled in SyncIO vs IO:

  • Option A, running the stream on SyncIO, if we comment the B lines, the program prints Result List((2,2)).
  • In Option B, running the stream on IO, and commenting the A lines, the program now switches to print Result List().

@armanbilge
Copy link
Member

Just to clarify, it seems the issue is not IO vs SyncIO but the concurrent vs sync compilers. You can see this by using the Sync compiler for IO:

      .compile(Compiler.target(Compiler.Target.forSync))

This will give Result List((2,2)).

@armanbilge
Copy link
Member

Changing the interruptContext in the ConcurrentCompiler to return None will also give Result List((2,2)).

private[fs2] def interruptContext(root: Unique.Token): Option[F[InterruptContext[F]]] = Some(
InterruptContext(root, F.unit)
)

@mpilquist
Copy link
Member

I added a bunch of debug statements to the interpreter. For some reason, the interpreter thinks the scope which emits 2 is interrupted here:

case inter @ Interrupted(_, _) => CanceledScope(scopeId, inter)

I'm not sure why it thinks that.

@mpilquist
Copy link
Member

mpilquist commented Dec 3, 2021

Sample interpreter trace, which shows that a child scope is getting canceled/interrupted:

inScope
creating child scope cats.effect.kernel.Unique$Token@23993b6a from cats.effect.kernel.Unique$Token@57af8ba
uncons
inScope
creating child scope cats.effect.kernel.Unique$Token@3345c587 from cats.effect.kernel.Unique$Token@23993b6a
creating new child scope for cats.effect.kernel.Unique$Token@3345c587
uncons
inScope
creating child scope cats.effect.kernel.Unique$Token@7c4df96d from cats.effect.kernel.Unique$Token@3345c587
creating new child scope for cats.effect.kernel.Unique$Token@7c4df96d
uncons
inScope
creating child scope cats.effect.kernel.Unique$Token@64aa6b5b from cats.effect.kernel.Unique$Token@7c4df96d
uncons
inScope
creating child scope cats.effect.kernel.Unique$Token@6e7a4b36 from cats.effect.kernel.Unique$Token@64aa6b5b
inScope
creating child scope cats.effect.kernel.Unique$Token@4189c186 from cats.effect.kernel.Unique$Token@6e7a4b36
creating new child scope for cats.effect.kernel.Unique$Token@4189c186
uncons
output: Chunk(1)
output: Chunk(1)
inScope
creating child scope cats.effect.kernel.Unique$Token@4111bee5 from cats.effect.kernel.Unique$Token@4189c186
step leg from Scope(id=cats.effect.kernel.Unique$Token@4111bee5,interruptible=true) to cats.effect.kernel.Unique$Token@4111bee5
uncons
succeeded: Succeeded(())
closeScope SucceedScope(cats.effect.kernel.Unique$Token@4189c186)
parentCancel invoked on ictx - propagating canceled to cats.effect.kernel.Unique$Token@4189c186
InterruptContext#complete: Canceled()
closeScope SucceedScope(cats.effect.kernel.Unique$Token@6e7a4b36)
output: Chunk(2)
interruptGuard scope: cats.effect.kernel.Unique$Token@4111bee5 outcome: Canceled()
goInScope / inter cats.effect.kernel.Unique$Token@4111bee5
closeScope CanceledScope(cats.effect.kernel.Unique$Token@4111bee5,Interrupted(cats.effect.kernel.Unique$Token@4111bee5,None))
succeeded: Succeeded(())
closeScope SucceedScope(cats.effect.kernel.Unique$Token@64aa6b5b)
succeeded: Succeeded(())
succeeded: Succeeded(())
closeScope SucceedScope(cats.effect.kernel.Unique$Token@7c4df96d)
parentCancel invoked on ictx - propagating canceled to cats.effect.kernel.Unique$Token@7c4df96d
succeeded: Succeeded(())
succeeded: Succeeded(())
InterruptContext#complete: Canceled()
closeScope SucceedScope(cats.effect.kernel.Unique$Token@3345c587)
parentCancel invoked on ictx - propagating canceled to cats.effect.kernel.Unique$Token@3345c587
InterruptContext#complete: Canceled()
succeeded: Succeeded(())
succeeded: Succeeded(())
closeScope SucceedScope(cats.effect.kernel.Unique$Token@23993b6a)
succeeded: Succeeded(())
succeeded: Succeeded(())

The interruption is occurring here: https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/internal/InterruptContext.scala#L78-L79

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants