Skip to content

Commit

Permalink
Merge pull request typelevel#2768 from armanbilge/topic/channel-queue…
Browse files Browse the repository at this point in the history
…-sink

Add `Channel#trySend`
  • Loading branch information
mpilquist authored Mar 29, 2022
2 parents 98e57d3 + 2ca8783 commit f504097
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 8 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.compression.Compression.gunzip$default$1$"
),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.concurrent.Channel.trySend")
)

lazy val root = tlCrossRootProject
Expand Down
43 changes: 36 additions & 7 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ sealed trait Channel[F[_], A] {
*/
def send(a: A): F[Either[Channel.Closed, Unit]]

/** Attempts to send an element through this channel, and indicates if
* it succeeded (`true`) or not (`false`).
*
* It can be called concurrently by multiple producers, and it may
* not succeed if the channel is bounded or synchronous. It will
* never semantically block.
*
* No-op if the channel is closed, see [[close]] for further info.
*/
def trySend(a: A): F[Either[Channel.Closed, Boolean]]

/** The stream of elements sent through this channel.
* It terminates if [[close]] is called and all elements in the channel
* have been emitted (see [[close]] for futher info).
Expand Down Expand Up @@ -145,33 +156,48 @@ object Channel {
F.uncancelable { poll =>
state.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed.pure[F])
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting)
notifyStream(waiting).as(rightUnit)
)
else
(
State(values, size, None, (a, producer) :: producers, false),
notifyStream(waiting) <* waitOnBound(producer, poll)
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
)
}.flatten
}
}

def trySend(a: A) =
state.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Boolean].pure[F])

case s @ State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting).as(rightTrue)
)
else
(s, rightFalse.pure[F])
}.flatten

def close =
state
.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed.pure[F])
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting) <* signalClosure
notifyStream(waiting).as(rightUnit) <* signalClosure
)
}
.flatten
Expand Down Expand Up @@ -219,7 +245,7 @@ object Channel {
}.flatten

def notifyStream(waitForChanges: Option[Deferred[F, Unit]]) =
waitForChanges.traverse(_.complete(())).as(rightUnit)
waitForChanges.traverse(_.complete(()))

def waitOnBound(producer: Deferred[F, Unit], poll: Poll[F]) =
poll(producer.get).onCancel {
Expand Down Expand Up @@ -248,6 +274,9 @@ object Channel {
}

// allocate once
private final val closed: Either[Closed, Unit] = Left(Closed)
@inline private final def closed[A]: Either[Closed, A] = _closed
private[this] final val _closed: Either[Closed, Nothing] = Left(Closed)
private final val rightUnit: Either[Closed, Unit] = Right(())
private final val rightTrue: Either[Closed, Boolean] = Right(true)
private final val rightFalse: Either[Closed, Boolean] = Right(false)
}
13 changes: 13 additions & 0 deletions core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ class ChannelSuite extends Fs2Suite {
p.assertEquals(v)
}

test("trySend does not block") {
val v = Vector(1, 2, 3, 4)
val capacity = 3
val p = for {
chan <- Channel.bounded[IO, Int](capacity)
_ <- v.traverse(chan.trySend)
_ <- chan.close
res <- chan.stream.chunks.take(1).compile.lastOrError
} yield res.toVector

p.assertEquals(v.take(capacity))
}

test("Timely closure") {
val v = Vector(1, 2, 3)
val p = for {
Expand Down

0 comments on commit f504097

Please sign in to comment.