From 146e8fecd0b589dbc84aed2d04f721d2b02f6bad Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 12:23:59 -0500 Subject: [PATCH 1/7] Fixed #1055 Changed how automatic scope creation is done. Instead of inserting scopes on the Pull => Stream boundary, they are now inserted on ++. Introduced `appendWithoutScope` for cases in which scope insertion at append time would be prohbitively expensive. --- core/jvm/src/test/scala/fs2/PipeSpec.scala | 1 + .../test/scala/fs2/ResourceSafetySpec.scala | 5 +++- .../src/main/scala/fs2/CompositeFailure.scala | 10 +++++--- core/shared/src/main/scala/fs2/Pipe.scala | 4 +-- core/shared/src/main/scala/fs2/Pipe2.scala | 4 +-- core/shared/src/main/scala/fs2/Pull.scala | 10 +------- core/shared/src/main/scala/fs2/Stream.scala | 25 +++++++++++++------ .../src/main/scala/fs2/internal/Algebra.scala | 2 +- docs/ReadmeExample.md | 2 +- docs/guide.md | 18 ++++++------- docs/src/guide.md | 2 +- 11 files changed, 46 insertions(+), 37 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/PipeSpec.scala b/core/jvm/src/test/scala/fs2/PipeSpec.scala index de6c5d78a3..0d69207067 100644 --- a/core/jvm/src/test/scala/fs2/PipeSpec.scala +++ b/core/jvm/src/test/scala/fs2/PipeSpec.scala @@ -605,6 +605,7 @@ class PipeSpec extends Fs2Spec { Stream .range(0, 100) .map(i => (i, i)) + .scope .through(first(_.map(_ + 1).take(5))) .toList shouldBe List((1, 0), (2, 1), (3, 2), (4, 3), (5, 4)) } diff --git a/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala b/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala index bdf4772b50..0c6005384e 100644 --- a/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala +++ b/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala @@ -57,7 +57,10 @@ class ResourceSafetySpec extends Fs2Spec with EventuallySupport { _ => IO { c.incrementAndGet; throw Err }) val nested = s0.foldRight(innermost)((i, inner) => bracket(c)(Stream.emit(i) ++ inner)) try { runLog { nested }; throw Err } // this test should always fail, so the `run` should throw - catch { case Err => () } + catch { + case Err => () + case e: CompositeFailure if e.all.forall(_.isInstanceOf[Err.type]) => () + } withClue(f.tag) { 0L shouldBe c.get } } diff --git a/core/shared/src/main/scala/fs2/CompositeFailure.scala b/core/shared/src/main/scala/fs2/CompositeFailure.scala index 75e880b7ff..f7c4491720 100644 --- a/core/shared/src/main/scala/fs2/CompositeFailure.scala +++ b/core/shared/src/main/scala/fs2/CompositeFailure.scala @@ -4,11 +4,15 @@ import cats.data.NonEmptyList /** Represents multiple (>1) exceptions were thrown. */ final class CompositeFailure( - head: Throwable, - tail: NonEmptyList[Throwable] + val head: Throwable, + val tail: NonEmptyList[Throwable] ) extends Throwable( s"Multiple exceptions were thrown (${1 + tail.size}), first ${head.getClass.getName}: ${head.getMessage}", - head) + head) { + + /** Gets all causes (guaranteed to have at least 2 elements). */ + def all: NonEmptyList[Throwable] = head :: tail +} object CompositeFailure { def apply(first: Throwable, second: Throwable, rest: List[Throwable]): CompositeFailure = diff --git a/core/shared/src/main/scala/fs2/Pipe.scala b/core/shared/src/main/scala/fs2/Pipe.scala index e23ae3e489..bb5add4a97 100644 --- a/core/shared/src/main/scala/fs2/Pipe.scala +++ b/core/shared/src/main/scala/fs2/Pipe.scala @@ -19,7 +19,7 @@ object Pipe { .eval[Read, Option[Segment[I, Unit]]](FreeC.Eval(identity)) .flatMap { case None => Stream.empty - case Some(segment) => Stream.segment(segment).append(prompts) + case Some(segment) => Stream.segment(segment).appendWithoutScope(prompts) } // Steps `s` without overhead of resource tracking @@ -29,7 +29,7 @@ object Pipe { case Some((hd, tl)) => Pull.output1((hd, tl)) case None => Pull.done } - .streamNoScope + .stream .compile .last diff --git a/core/shared/src/main/scala/fs2/Pipe2.scala b/core/shared/src/main/scala/fs2/Pipe2.scala index bf63b0480f..70907c53a6 100644 --- a/core/shared/src/main/scala/fs2/Pipe2.scala +++ b/core/shared/src/main/scala/fs2/Pipe2.scala @@ -14,7 +14,7 @@ object Pipe2 { def prompts[X](id: ReadSegment[Option[Segment[X, Unit]]]): Stream[Read, X] = Stream.eval[Read, Option[Segment[X, Unit]]](FreeC.Eval(id)).flatMap { case None => Stream.empty - case Some(segment) => Stream.segment(segment).append(prompts(id)) + case Some(segment) => Stream.segment(segment).appendWithoutScope(prompts(id)) } def promptsL: Stream[Read, I] = prompts[I](Left(identity)) def promptsR: Stream[Read, I2] = prompts[I2](Right(identity)) @@ -26,7 +26,7 @@ object Pipe2 { case Some((hd, tl)) => Pull.output1((hd, tl)) case None => Pull.done } - .streamNoScope + .stream .compile .last diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index c03144e870..c464285696 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -37,15 +37,7 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing /** Interpret this `Pull` to produce a `Stream`. The result type `R` is discarded. */ def stream: Stream[F, O] = - Stream.fromFreeC(this.scope.get[F, O, Unit]) - - /** - * Like [[stream]] but no scope is inserted around the pull, resulting in any resources being - * promoted to the parent scope of the stream, extending the resource lifetime. Typically used - * as a performance optimization, where resource lifetime can be extended in exchange for faster - * execution. - */ - def streamNoScope: Stream[F, O] = Stream.fromFreeC(get[F, O, R].map(_ => ())) + Stream.fromFreeC(this.get[F, O, R].map(_ => ())) /** Applies the resource of this pull to `f` and returns the result. */ def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] = diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7168d331cb..20b889b8e9 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -213,7 +213,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, */ def chunks: Stream[F, Chunk[O]] = this.repeatPull(_.unconsChunk.flatMap { - case None => Pull.pure(None); + case None => Pull.pure(None) case Some((hd, tl)) => Pull.output1(hd).as(Some(tl)) }) @@ -873,12 +873,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, /** * Tracks any resources acquired during this stream and releases them when the stream completes. * - * Scopes are typically inserted automatically, at the boundary of a pull (i.e., when a pull - * is converted to a stream). This method allows a scope to be explicitly demarcated, so that - * resources can be freed earlier than when using automatically inserted scopes. - * - * One use case is scoping the left hand side of an append: `(s1.scope ++ s2)`, which ensures - * resources acquired during `s1` are released onces the end of `s1` has been passed. + * Scopes are typically inserted automatically, in between stream appends (i.e., in `s1 ++ s2`, + * a scope is inserted around `s1` when appending) This method allows a scope to be explicitly + * demarcated, so that resources can be freed earlier than when using automatically inserted scopes. */ def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get)) @@ -1429,7 +1426,10 @@ object Stream { def iterateEval[F[_], A](start: A)(f: A => F[A]): Stream[F, A] = emit(start) ++ eval(f(start)).flatMap(iterateEval(_)(f)) - /** Allows to get current scope during evaluation of the stream **/ + /** + * Gets the current scope, allowing manual leasing or interruption. + * This is a low-level method and generally should not be used by user code. + */ def getScope[F[_]]: Stream[F, Scope[F]] = Stream.fromFreeC(Algebra.getScope[F, Scope[F]].flatMap(Algebra.output1(_))) @@ -1589,6 +1589,12 @@ object Stream { /** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */ def append[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = + fromFreeC(self.scope.get[F, O2].flatMap { _ => + s2.get + }) + + /** Appends `s2` to the end of this stream without introducing a new scope around this stream. */ + def appendWithoutScope[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = fromFreeC(self.get[F, O2].flatMap { _ => s2.get }) @@ -2597,6 +2603,9 @@ object Stream { def append[F[_], O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = covary[F].append(s2) + def appendWithoutScope[F[_], O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = + covary[F].appendWithoutScope(s2) + def concurrently[F[_], O2](that: Stream[F, O2])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O] = covary[F].concurrently(that) diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index b8dee9c347..5745e10fc9 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -360,7 +360,7 @@ private[fs2] object Algebra { case gs: GetScope[F, O2] => gs.asInstanceOf[Algebra[G, O2, X]] } } - fr.translate[Algebra[G, O, ?]](algFtoG) + FreeC.suspend(fr.translate[Algebra[G, O, ?]](algFtoG)) } } diff --git a/docs/ReadmeExample.md b/docs/ReadmeExample.md index aed40a63dd..d96848d718 100644 --- a/docs/ReadmeExample.md +++ b/docs/ReadmeExample.md @@ -98,7 +98,7 @@ There are a number of ways of interpreting the stream. In this case, we call `co ```scala scala> val task: IO[Unit] = written.compile.drain -task: cats.effect.IO[Unit] = IO$129936443 +task: cats.effect.IO[Unit] = IO$1552023456 ``` We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. diff --git a/docs/guide.md b/docs/guide.md index eb1be4c733..5c3243540e 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -137,13 +137,13 @@ val eff = Stream.eval(IO { println("TASK BEING RUN!!"); 1 + 1 }) // eff: fs2.Stream[cats.effect.IO,Int] = Stream(..) val ra = eff.compile.toVector // gather all output into a Vector -// ra: cats.effect.IO[Vector[Int]] = IO$1874296212 +// ra: cats.effect.IO[Vector[Int]] = IO$2094280090 val rb = eff.compile.drain // purely for effects -// rb: cats.effect.IO[Unit] = IO$2100089851 +// rb: cats.effect.IO[Unit] = IO$1715372649 val rc = eff.compile.fold(0)(_ + _) // run and accumulate some result -// rc: cats.effect.IO[Int] = IO$440495402 +// rc: cats.effect.IO[Int] = IO$767519369 ``` Notice these all return a `IO` of some sort, but this process of compilation doesn't actually _perform_ any of the effects (nothing gets printed). @@ -274,10 +274,10 @@ scala> val count = new java.util.concurrent.atomic.AtomicLong(0) count: java.util.concurrent.atomic.AtomicLong = 0 scala> val acquire = IO { println("incremented: " + count.incrementAndGet); () } -acquire: cats.effect.IO[Unit] = IO$477876869 +acquire: cats.effect.IO[Unit] = IO$350659135 scala> val release = IO { println("decremented: " + count.decrementAndGet); () } -release: cats.effect.IO[Unit] = IO$1801684415 +release: cats.effect.IO[Unit] = IO$1048497406 ``` ```scala @@ -436,7 +436,7 @@ scala> s2.toList res33: List[Int] = List(1, 2) ``` -FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the `.stream` completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely establishing a scope in which all resource allocations are tracked so that they may be appropriately freed. +FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the stream completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely converting back to the `Stream` API. There are lots of useful transformation functions in [`Stream`](../core/shared/src/main/scala/fs2/Stream.scala) built using the `Pull` type. @@ -554,7 +554,7 @@ import cats.effect.Sync // import cats.effect.Sync val T = Sync[IO] -// T: cats.effect.Sync[cats.effect.IO] = cats.effect.IOInstances$$anon$1@3f28af56 +// T: cats.effect.Sync[cats.effect.IO] = cats.effect.IOInstances$$anon$1@698f7fb0 val s = Stream.eval_(T.delay { destroyUniverse() }) ++ Stream("...moving on") // s: fs2.Stream[cats.effect.IO,String] = Stream(..) @@ -611,12 +611,12 @@ val c = new Connection { // Effect extends both Sync and Async val T = cats.effect.Effect[IO] -// T: cats.effect.Effect[cats.effect.IO] = cats.effect.IOInstances$$anon$1@3f28af56 +// T: cats.effect.Effect[cats.effect.IO] = cats.effect.IOInstances$$anon$1@698f7fb0 val bytes = T.async[Array[Byte]] { (cb: Either[Throwable,Array[Byte]] => Unit) => c.readBytesE(cb) } -// bytes: cats.effect.IO[Array[Byte]] = IO$795809508 +// bytes: cats.effect.IO[Array[Byte]] = IO$1494894241 Stream.eval(bytes).map(_.toList).compile.toVector.unsafeRunSync() // res42: Vector[List[Byte]] = Vector(List(0, 1, 2)) diff --git a/docs/src/guide.md b/docs/src/guide.md index ce2e8fca6c..c8865005f8 100644 --- a/docs/src/guide.md +++ b/docs/src/guide.md @@ -338,7 +338,7 @@ val s2 = Stream(1,2,3,4).through(tk(2)) s2.toList ``` -FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the `.stream` completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely establishing a scope in which all resource allocations are tracked so that they may be appropriately freed. +FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the stream completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`, it is merely converting back to the `Stream` API. There are lots of useful transformation functions in [`Stream`](../core/shared/src/main/scala/fs2/Stream.scala) built using the `Pull` type. From c9b343c8d691caccef69c9ea353ba8d520e89f2a Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 12:26:45 -0500 Subject: [PATCH 2/7] Reverted unnecessary change in PipeSpec --- core/jvm/src/test/scala/fs2/PipeSpec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/jvm/src/test/scala/fs2/PipeSpec.scala b/core/jvm/src/test/scala/fs2/PipeSpec.scala index 0d69207067..de6c5d78a3 100644 --- a/core/jvm/src/test/scala/fs2/PipeSpec.scala +++ b/core/jvm/src/test/scala/fs2/PipeSpec.scala @@ -605,7 +605,6 @@ class PipeSpec extends Fs2Spec { Stream .range(0, 100) .map(i => (i, i)) - .scope .through(first(_.map(_ + 1).take(5))) .toList shouldBe List((1, 0), (2, 1), (3, 2), (4, 3), (5, 4)) } From cea0b20e8a2fc5ffbad37acc21faa7c37afd4515 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 12:30:35 -0500 Subject: [PATCH 3/7] Reverted unnecessary FreeC.suspend in translate --- core/shared/src/main/scala/fs2/internal/Algebra.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index 5745e10fc9..b8dee9c347 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -360,7 +360,7 @@ private[fs2] object Algebra { case gs: GetScope[F, O2] => gs.asInstanceOf[Algebra[G, O2, X]] } } - FreeC.suspend(fr.translate[Algebra[G, O, ?]](algFtoG)) + fr.translate[Algebra[G, O, ?]](algFtoG) } } From aa50586791e6081cd7578a418b26b1ba88de05ac Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 12:49:41 -0500 Subject: [PATCH 4/7] Removed automatic scoping around ++ --- .../test/scala/fs2/ResourceSafetySpec.scala | 5 +++-- core/shared/src/main/scala/fs2/Pipe.scala | 2 +- core/shared/src/main/scala/fs2/Pipe2.scala | 2 +- core/shared/src/main/scala/fs2/Stream.scala | 18 ++++++------------ 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala b/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala index 0c6005384e..c4c06a5180 100644 --- a/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala +++ b/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala @@ -56,10 +56,11 @@ class ResourceSafetySpec extends Fs2Spec with EventuallySupport { Stream.bracket(IO(c.decrementAndGet))(_ => f.get, _ => IO { c.incrementAndGet; throw Err }) val nested = s0.foldRight(innermost)((i, inner) => bracket(c)(Stream.emit(i) ++ inner)) + def allErr(e: CompositeFailure): Boolean = e.all.forall { case Err => true; case _ => false } try { runLog { nested }; throw Err } // this test should always fail, so the `run` should throw catch { - case Err => () - case e: CompositeFailure if e.all.forall(_.isInstanceOf[Err.type]) => () + case Err => () + case e: CompositeFailure if allErr(e) => () } withClue(f.tag) { 0L shouldBe c.get } } diff --git a/core/shared/src/main/scala/fs2/Pipe.scala b/core/shared/src/main/scala/fs2/Pipe.scala index bb5add4a97..1337c0c13d 100644 --- a/core/shared/src/main/scala/fs2/Pipe.scala +++ b/core/shared/src/main/scala/fs2/Pipe.scala @@ -19,7 +19,7 @@ object Pipe { .eval[Read, Option[Segment[I, Unit]]](FreeC.Eval(identity)) .flatMap { case None => Stream.empty - case Some(segment) => Stream.segment(segment).appendWithoutScope(prompts) + case Some(segment) => Stream.segment(segment).append(prompts) } // Steps `s` without overhead of resource tracking diff --git a/core/shared/src/main/scala/fs2/Pipe2.scala b/core/shared/src/main/scala/fs2/Pipe2.scala index 70907c53a6..901f4ab6d8 100644 --- a/core/shared/src/main/scala/fs2/Pipe2.scala +++ b/core/shared/src/main/scala/fs2/Pipe2.scala @@ -14,7 +14,7 @@ object Pipe2 { def prompts[X](id: ReadSegment[Option[Segment[X, Unit]]]): Stream[Read, X] = Stream.eval[Read, Option[Segment[X, Unit]]](FreeC.Eval(id)).flatMap { case None => Stream.empty - case Some(segment) => Stream.segment(segment).appendWithoutScope(prompts(id)) + case Some(segment) => Stream.segment(segment).append(prompts(id)) } def promptsL: Stream[Read, I] = prompts[I](Left(identity)) def promptsR: Stream[Read, I2] = prompts[I2](Right(identity)) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 20b889b8e9..862d35951e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -873,9 +873,12 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, /** * Tracks any resources acquired during this stream and releases them when the stream completes. * - * Scopes are typically inserted automatically, in between stream appends (i.e., in `s1 ++ s2`, - * a scope is inserted around `s1` when appending) This method allows a scope to be explicitly - * demarcated, so that resources can be freed earlier than when using automatically inserted scopes. + * Scopes are sometimes inserted automatically, (e.g., as a result of calling `handleErrorWith`). + * This method allows a scope to be explicitly demarcated, so that resources can be freed earlier + * than when using automatically inserted scopes. + * + * One use case is scoping the left hand side of an append: `(s1.scope ++ s2)`, which ensures + * resources acquired during `s1` are released onces the end of `s1` has been passed. */ def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get)) @@ -1589,12 +1592,6 @@ object Stream { /** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */ def append[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = - fromFreeC(self.scope.get[F, O2].flatMap { _ => - s2.get - }) - - /** Appends `s2` to the end of this stream without introducing a new scope around this stream. */ - def appendWithoutScope[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = fromFreeC(self.get[F, O2].flatMap { _ => s2.get }) @@ -2603,9 +2600,6 @@ object Stream { def append[F[_], O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = covary[F].append(s2) - def appendWithoutScope[F[_], O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] = - covary[F].appendWithoutScope(s2) - def concurrently[F[_], O2](that: Stream[F, O2])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O] = covary[F].concurrently(that) From 77694d20da98223864a03f86bb586bc37a223019 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 12:57:14 -0500 Subject: [PATCH 5/7] Adjusted syntax for 2.11 scalac bug workaround --- core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala b/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala index c4c06a5180..7f4af91f6d 100644 --- a/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala +++ b/core/jvm/src/test/scala/fs2/ResourceSafetySpec.scala @@ -56,11 +56,10 @@ class ResourceSafetySpec extends Fs2Spec with EventuallySupport { Stream.bracket(IO(c.decrementAndGet))(_ => f.get, _ => IO { c.incrementAndGet; throw Err }) val nested = s0.foldRight(innermost)((i, inner) => bracket(c)(Stream.emit(i) ++ inner)) - def allErr(e: CompositeFailure): Boolean = e.all.forall { case Err => true; case _ => false } try { runLog { nested }; throw Err } // this test should always fail, so the `run` should throw catch { - case Err => () - case e: CompositeFailure if allErr(e) => () + case Err => () + case e: CompositeFailure if e.all.forall { case Err => true; case _ => false } => () } withClue(f.tag) { 0L shouldBe c.get } } From 43abb98ee235da6388fb946deed0fa13bf96b003 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 12:58:19 -0500 Subject: [PATCH 6/7] Doc cleanup --- core/shared/src/main/scala/fs2/Stream.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 862d35951e..45be6c732d 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -876,9 +876,6 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * Scopes are sometimes inserted automatically, (e.g., as a result of calling `handleErrorWith`). * This method allows a scope to be explicitly demarcated, so that resources can be freed earlier * than when using automatically inserted scopes. - * - * One use case is scoping the left hand side of an append: `(s1.scope ++ s2)`, which ensures - * resources acquired during `s1` are released onces the end of `s1` has been passed. */ def scope: Stream[F, O] = Stream.fromFreeC(Algebra.scope(get)) From 45365f59c85ec7d32852985f2831fed37d61649b Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 8 Jan 2018 13:44:04 -0500 Subject: [PATCH 7/7] Fixed scope test --- core/jvm/src/test/scala/fs2/StreamSpec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/test/scala/fs2/StreamSpec.scala b/core/jvm/src/test/scala/fs2/StreamSpec.scala index 0e8dbfb841..bf3ff7df42 100644 --- a/core/jvm/src/test/scala/fs2/StreamSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamSpec.scala @@ -321,6 +321,7 @@ class StreamSpec extends Fs2Spec with Inside { } "scope" in { + // TODO This test should be replaced with one that shows proper usecase for .scope val c = new java.util.concurrent.atomic.AtomicLong(0) val s1 = Stream.emit("a").covary[IO] val s2 = Stream.bracket(IO { c.incrementAndGet() shouldBe 1L; () })( @@ -328,7 +329,7 @@ class StreamSpec extends Fs2Spec with Inside { _ => IO { c.decrementAndGet(); () } ) runLog { - (s1.scope ++ s2).take(2).repeat.take(4).merge(Stream.eval_(IO.unit)) + (s1.scope ++ s2).take(2).scope.repeat.take(4).merge(Stream.eval_(IO.unit)) } }