Skip to content

Commit

Permalink
Fixed #1055
Browse files Browse the repository at this point in the history
Changed how automatic scope creation is done. Instead of inserting
scopes on the Pull => Stream boundary, they are now inserted on ++.

Introduced `appendWithoutScope` for cases in which scope insertion at
append time would be prohbitively expensive.
  • Loading branch information
mpilquist committed Jan 8, 2018
1 parent 434dc26 commit 146e8fe
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 37 deletions.
1 change: 1 addition & 0 deletions core/jvm/src/test/scala/fs2/PipeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ class PipeSpec extends Fs2Spec {
Stream
.range(0, 100)
.map(i => (i, i))
.scope
.through(first(_.map(_ + 1).take(5)))
.toList shouldBe List((1, 0), (2, 1), (3, 2), (4, 3), (5, 4))
}
Expand Down
5 changes: 4 additions & 1 deletion core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ class ResourceSafetySpec extends Fs2Spec with EventuallySupport {
_ => IO { c.incrementAndGet; throw Err })
val nested = s0.foldRight(innermost)((i, inner) => bracket(c)(Stream.emit(i) ++ inner))
try { runLog { nested }; throw Err } // this test should always fail, so the `run` should throw
catch { case Err => () }
catch {
case Err => ()
case e: CompositeFailure if e.all.forall(_.isInstanceOf[Err.type]) => ()
}
withClue(f.tag) { 0L shouldBe c.get }
}

Expand Down
10 changes: 7 additions & 3 deletions core/shared/src/main/scala/fs2/CompositeFailure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import cats.data.NonEmptyList

/** Represents multiple (>1) exceptions were thrown. */
final class CompositeFailure(
head: Throwable,
tail: NonEmptyList[Throwable]
val head: Throwable,
val tail: NonEmptyList[Throwable]
) extends Throwable(
s"Multiple exceptions were thrown (${1 + tail.size}), first ${head.getClass.getName}: ${head.getMessage}",
head)
head) {

/** Gets all causes (guaranteed to have at least 2 elements). */
def all: NonEmptyList[Throwable] = head :: tail
}

object CompositeFailure {
def apply(first: Throwable, second: Throwable, rest: List[Throwable]): CompositeFailure =
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/Pipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object Pipe {
.eval[Read, Option[Segment[I, Unit]]](FreeC.Eval(identity))
.flatMap {
case None => Stream.empty
case Some(segment) => Stream.segment(segment).append(prompts)
case Some(segment) => Stream.segment(segment).appendWithoutScope(prompts)
}

// Steps `s` without overhead of resource tracking
Expand All @@ -29,7 +29,7 @@ object Pipe {
case Some((hd, tl)) => Pull.output1((hd, tl))
case None => Pull.done
}
.streamNoScope
.stream
.compile
.last

Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/Pipe2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object Pipe2 {
def prompts[X](id: ReadSegment[Option[Segment[X, Unit]]]): Stream[Read, X] =
Stream.eval[Read, Option[Segment[X, Unit]]](FreeC.Eval(id)).flatMap {
case None => Stream.empty
case Some(segment) => Stream.segment(segment).append(prompts(id))
case Some(segment) => Stream.segment(segment).appendWithoutScope(prompts(id))
}
def promptsL: Stream[Read, I] = prompts[I](Left(identity))
def promptsR: Stream[Read, I2] = prompts[I2](Right(identity))
Expand All @@ -26,7 +26,7 @@ object Pipe2 {
case Some((hd, tl)) => Pull.output1((hd, tl))
case None => Pull.done
}
.streamNoScope
.stream
.compile
.last

Expand Down
10 changes: 1 addition & 9 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,7 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing

/** Interpret this `Pull` to produce a `Stream`. The result type `R` is discarded. */
def stream: Stream[F, O] =
Stream.fromFreeC(this.scope.get[F, O, Unit])

/**
* Like [[stream]] but no scope is inserted around the pull, resulting in any resources being
* promoted to the parent scope of the stream, extending the resource lifetime. Typically used
* as a performance optimization, where resource lifetime can be extended in exchange for faster
* execution.
*/
def streamNoScope: Stream[F, O] = Stream.fromFreeC(get[F, O, R].map(_ => ()))
Stream.fromFreeC(this.get[F, O, R].map(_ => ()))

/** Applies the resource of this pull to `f` and returns the result. */
def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Expand Down
25 changes: 17 additions & 8 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*/
def chunks: Stream[F, Chunk[O]] =
this.repeatPull(_.unconsChunk.flatMap {
case None => Pull.pure(None);
case None => Pull.pure(None)
case Some((hd, tl)) => Pull.output1(hd).as(Some(tl))
})

Expand Down Expand Up @@ -873,12 +873,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
/**
* Tracks any resources acquired during this stream and releases them when the stream completes.
*
* Scopes are typically inserted automatically, at the boundary of a pull (i.e., when a pull
* is converted to a stream). This method allows a scope to be explicitly demarcated, so that
* resources can be freed earlier than when using automatically inserted scopes.
*
* One use case is scoping the left hand side of an append: `(s1.scope ++ s2)`, which ensures
* resources acquired during `s1` are released onces the end of `s1` has been passed.
* Scopes are typically inserted automatically, in between stream appends (i.e., in `s1 ++ s2`,
* a scope is inserted around `s1` when appending) This method allows a scope to be explicitly
* demarcated, so that resources can be freed earlier than when using automatically inserted scopes.
*/
def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get))

Expand Down Expand Up @@ -1429,7 +1426,10 @@ object Stream {
def iterateEval[F[_], A](start: A)(f: A => F[A]): Stream[F, A] =
emit(start) ++ eval(f(start)).flatMap(iterateEval(_)(f))

/** Allows to get current scope during evaluation of the stream **/
/**
* Gets the current scope, allowing manual leasing or interruption.
* This is a low-level method and generally should not be used by user code.
*/
def getScope[F[_]]: Stream[F, Scope[F]] =
Stream.fromFreeC(Algebra.getScope[F, Scope[F]].flatMap(Algebra.output1(_)))

Expand Down Expand Up @@ -1589,6 +1589,12 @@ object Stream {

/** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */
def append[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] =
fromFreeC(self.scope.get[F, O2].flatMap { _ =>
s2.get
})

/** Appends `s2` to the end of this stream without introducing a new scope around this stream. */
def appendWithoutScope[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] =
fromFreeC(self.get[F, O2].flatMap { _ =>
s2.get
})
Expand Down Expand Up @@ -2597,6 +2603,9 @@ object Stream {
def append[F[_], O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] =
covary[F].append(s2)

def appendWithoutScope[F[_], O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] =
covary[F].appendWithoutScope(s2)

def concurrently[F[_], O2](that: Stream[F, O2])(implicit F: Effect[F],
ec: ExecutionContext): Stream[F, O] =
covary[F].concurrently(that)
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/internal/Algebra.scala
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private[fs2] object Algebra {
case gs: GetScope[F, O2] => gs.asInstanceOf[Algebra[G, O2, X]]
}
}
fr.translate[Algebra[G, O, ?]](algFtoG)
FreeC.suspend(fr.translate[Algebra[G, O, ?]](algFtoG))
}

}
2 changes: 1 addition & 1 deletion docs/ReadmeExample.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ There are a number of ways of interpreting the stream. In this case, we call `co

```scala
scala> val task: IO[Unit] = written.compile.drain
task: cats.effect.IO[Unit] = IO$129936443
task: cats.effect.IO[Unit] = IO$1552023456
```

We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent.
Expand Down
18 changes: 9 additions & 9 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ val eff = Stream.eval(IO { println("TASK BEING RUN!!"); 1 + 1 })
// eff: fs2.Stream[cats.effect.IO,Int] = Stream(..)

val ra = eff.compile.toVector // gather all output into a Vector
// ra: cats.effect.IO[Vector[Int]] = IO$1874296212
// ra: cats.effect.IO[Vector[Int]] = IO$2094280090

val rb = eff.compile.drain // purely for effects
// rb: cats.effect.IO[Unit] = IO$2100089851
// rb: cats.effect.IO[Unit] = IO$1715372649

val rc = eff.compile.fold(0)(_ + _) // run and accumulate some result
// rc: cats.effect.IO[Int] = IO$440495402
// rc: cats.effect.IO[Int] = IO$767519369
```

Notice these all return a `IO` of some sort, but this process of compilation doesn't actually _perform_ any of the effects (nothing gets printed).
Expand Down Expand Up @@ -274,10 +274,10 @@ scala> val count = new java.util.concurrent.atomic.AtomicLong(0)
count: java.util.concurrent.atomic.AtomicLong = 0

scala> val acquire = IO { println("incremented: " + count.incrementAndGet); () }
acquire: cats.effect.IO[Unit] = IO$477876869
acquire: cats.effect.IO[Unit] = IO$350659135

scala> val release = IO { println("decremented: " + count.decrementAndGet); () }
release: cats.effect.IO[Unit] = IO$1801684415
release: cats.effect.IO[Unit] = IO$1048497406
```

```scala
Expand Down Expand Up @@ -436,7 +436,7 @@ scala> s2.toList
res33: List[Int] = List(1, 2)
```

FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the `.stream` completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely establishing a scope in which all resource allocations are tracked so that they may be appropriately freed.
FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the stream completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely converting back to the `Stream` API.

There are lots of useful transformation functions in [`Stream`](../core/shared/src/main/scala/fs2/Stream.scala) built using the `Pull` type.

Expand Down Expand Up @@ -554,7 +554,7 @@ import cats.effect.Sync
// import cats.effect.Sync

val T = Sync[IO]
// T: cats.effect.Sync[cats.effect.IO] = cats.effect.IOInstances$$anon$1@3f28af56
// T: cats.effect.Sync[cats.effect.IO] = cats.effect.IOInstances$$anon$1@698f7fb0

val s = Stream.eval_(T.delay { destroyUniverse() }) ++ Stream("...moving on")
// s: fs2.Stream[cats.effect.IO,String] = Stream(..)
Expand Down Expand Up @@ -611,12 +611,12 @@ val c = new Connection {

// Effect extends both Sync and Async
val T = cats.effect.Effect[IO]
// T: cats.effect.Effect[cats.effect.IO] = cats.effect.IOInstances$$anon$1@3f28af56
// T: cats.effect.Effect[cats.effect.IO] = cats.effect.IOInstances$$anon$1@698f7fb0

val bytes = T.async[Array[Byte]] { (cb: Either[Throwable,Array[Byte]] => Unit) =>
c.readBytesE(cb)
}
// bytes: cats.effect.IO[Array[Byte]] = IO$795809508
// bytes: cats.effect.IO[Array[Byte]] = IO$1494894241

Stream.eval(bytes).map(_.toList).compile.toVector.unsafeRunSync()
// res42: Vector[List[Byte]] = Vector(List(0, 1, 2))
Expand Down
2 changes: 1 addition & 1 deletion docs/src/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ val s2 = Stream(1,2,3,4).through(tk(2))
s2.toList
```

FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the `.stream` completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely establishing a scope in which all resource allocations are tracked so that they may be appropriately freed.
FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the stream completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely converting back to the `Stream` API.

There are lots of useful transformation functions in [`Stream`](../core/shared/src/main/scala/fs2/Stream.scala) built using the `Pull` type.

Expand Down

0 comments on commit 146e8fe

Please sign in to comment.