
Merge pull request typelevel#2706 from zainab-ali/pull-resources
Improve the guide on Pull
mpilquist authored Nov 6, 2021
2 parents e80dc1e + 3c14cc9 commit 25b19c5
Showing 5 changed files with 78 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -30,7 +30,7 @@ Remember to follow the [code of conduct][coc] in online and offline discourse.

### Prerequisites

You'll need JDK 11, [sbt][sbt], [Node.js][node] (for running Scala.js tests) and [Jekyll][jekyll] (for building the microsite).
You'll need JDK 16, [sbt][sbt], [Node.js][node] (for running Scala.js tests) and [Jekyll][jekyll] (for building the microsite).

We use several sbt plugins to build and check the project, including [MiMa (Migration Manager)][mima], [scalafmt][scalafmt] and [sbt-microsites][sbt-microsites].

2 changes: 1 addition & 1 deletion build-site.md
@@ -12,7 +12,7 @@ You can build the site and preview it locally.
## With Nix

1. Run `nix-shell --run "sbt 'microsite/mdoc --watch'"`.
2. Run `nix-shell --run "node_modules/docsify-cli/bin/docsify serve target/website/docs/"` in a different terminal.
2. Run `nix-shell --run "node_modules/docsify-cli/bin/docsify serve target/website/"` in a different terminal.

## Without Nix

16 changes: 5 additions & 11 deletions shell.nix
@@ -1,4 +1,4 @@
{ java ? "openjdk11" }:
{ java ? "openjdk16" }:

let
jdk = pkgs.${java};
@@ -7,13 +7,6 @@ let
packageOverrides = p: rec {
sbt = p.sbt.overrideAttrs (
old: rec {
version = "1.4.6";

src = builtins.fetchurl {
url = "https://github.com/sbt/sbt/releases/download/v${version}/sbt-${version}.tgz";
sha256 = "194xdz55cq4w7jlxl8df9vacil37jahimav620878q4ng67g59l6";
};

patchPhase = ''
echo -java-home ${jdk} >> conf/sbtopts
'';
@@ -23,9 +16,9 @@ };
};

nixpkgs = builtins.fetchTarball {
name = "nixos-unstable-2021-01-03";
url = "https://github.com/NixOS/nixpkgs/archive/56bb1b0f7a3.tar.gz";
sha256 = "1wl5yglgj3ajbf2j4dzgsxmgz7iqydfs514w73fs9a6x253wzjbs";
name = "nixos-21.05";
url = "https://github.com/NixOS/nixpkgs/archive/refs/tags/21.05.tar.gz";
sha256 = "1ckzhh24mgz6jd1xhfgx0i9mijk6xjqxwsshnvq789xsavrmsc36";
};

pkgs = import nixpkgs { inherit config; };
@@ -34,6 +27,7 @@ in
buildInputs = [
jdk
pkgs.nodejs-14_x
pkgs.yarn
pkgs.sbt
];

1 change: 1 addition & 0 deletions site/documentation.md
@@ -30,6 +30,7 @@
* [Inference Driven Design](https://mpilquist.github.io/blog/2018/07/04/fs2/), by [Michael Pilquist][mpilquist], describes some of the tradeoffs in designing the API and the code of FS2, used to work around some of the problems in the Scala compiler.
* [Tips for working with FS2](https://underscore.io/blog/posts/2018/03/20/fs2.html), by [Pere Villega](https://github.com/pvillega).
* [A streaming library with a superpower: FS2 and functional programming](https://medium.freecodecamp.org/a-streaming-library-with-a-superpower-fs2-and-functional-programming-6f602079f70a).
* [No leftovers: Working with pulls in fs2](https://blog.kebab-ca.se/chapters/fs2/overview.html), by [Zainab Ali](https://github.com/zainab-ali).

#### Books

81 changes: 70 additions & 11 deletions site/guide.md
@@ -280,13 +280,54 @@ else Some(c => c.size match {

Otherwise, we return a function which processes the next chunk in the stream. The function first checks the size of the chunk. If it is less than or equal to the number of elements remaining to take, it returns the chunk unmodified, causing it to be output downstream, along with the number of remaining elements to take from subsequent chunks (`n - m`). If, instead, the chunk's size is greater than the number of elements left to take, `n` elements are taken from the chunk and output, along with an indication that there are no more elements to take.
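
For reference, here is a sketch of the complete `scanChunksOpt`-based `tk` described above, assembled from the guide's earlier listing (shown for orientation only; it is not part of this diff's changes):

```scala
import fs2._

// Take `n` elements: the state threaded through scanChunksOpt is the number of
// elements still to take. Returning None stops pulling; Some processes the next chunk.
def tk[F[_], O](n: Long): Pipe[F, O, O] =
  in => in.scanChunksOpt(n) { n =>
    if (n <= 0) None
    else Some(c => c.size match {
      case m if m <= n => (n - m, c)           // emit the whole chunk and decrement the count
      case _           => (0, c.take(n.toInt)) // emit the first n elements; nothing left to take
    })
  }
```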

#### Transforming streams using pulls

Sometimes, `scanChunksOpt` isn't powerful enough to express the stream transformation. Regardless of how complex the job, the `fs2.Pull` type can usually express it.

The `Pull[+F[_],+O,+R]` type represents a program that may pull values from one or more streams, write _output_ of type `O`, and return a _result_ of type `R`. It forms a monad in `R` and comes equipped with lots of other useful operations. See the
[`Pull` class](https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/core/shared/src/main/scala/fs2/Pull.scala)
The `Pull[F[_],O,R]` type represents a program that may pull values from one or more streams, write _output_ of type `O`, and return a _result_ of type `R`. It forms a monad in `R` and comes equipped with lots of other useful operations. See the
[`Pull` class](https://github.com/functional-streams-for-scala/fs2/blob/main/core/shared/src/main/scala/fs2/Pull.scala)
for the full set of operations on `Pull`.

Let's look at an implementation of `take` using `Pull`:
A pull that writes a single output of type `Int` can be constructed with `Pull.output1`.

```scala mdoc:reset
import fs2._
val p1 = Pull.output1(1)
```

This can be converted directly to a stream equivalent to `Stream(1)`.

```scala mdoc
val s1 = p1.stream
```
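
As a quick check (a sketch, not part of this diff), forcing the resulting pure stream shows it contains the single element:

```scala
s1.toList // List(1)
```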

Pulls form a monad in their result `R` and can be composed using monadic operations. The following code produces a `Pull` corresponding to `Stream(1, 2)`.

```scala mdoc
p1 >> Pull.output1(2)
```
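
Here `>>` is simply `flatMap` with the result discarded; a sketch of the equivalent explicit form (not part of this diff):

```scala
// The same Pull as `p1 >> Pull.output1(2)`, written with flatMap.
p1.flatMap(_ => Pull.output1(2)).stream.toList // List(1, 2)
```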

A `Pull` can be created from a stream using a variety of operations accessed under the `pull` function. For example, `echo` converts a stream to its corresponding pull representation.

```scala mdoc
s1.pull.echo
```
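
Converting that pull back with `.stream` round-trips the elements (a small sketch, not part of this diff):

```scala
s1.pull.echo.stream.toList // List(1)
```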


A more useful pull is created by `uncons`. This constructs a pull that pulls the next chunk from the stream.

```scala mdoc
s1.pull.uncons
```

Let’s examine its result type.
- The `Option` is non-empty if there is a chunk to pull from the stream. If the stream has terminated and there are no more chunks to pull then the result is `None`.
- `Chunk[Int]` represents the pulled chunk.
- `Stream[Nothing, Int]` represents the tail of the stream.

Note that the output type is `Nothing` because the pull does not output any elements.
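
To illustrate how the `Option` result is typically consumed, here is a hypothetical helper (the name `firstChunk` is ours, not the library's) that outputs only the first chunk of a stream:

```scala
import fs2._

// firstChunk is a hypothetical name, used only for illustration.
def firstChunk[F[_], O](s: Stream[F, O]): Pull[F, O, Unit] =
  s.pull.uncons.flatMap {
    case Some((hd, tl)) => Pull.output(hd) // hd: Chunk[O]; the tail `tl` is discarded
    case None           => Pull.done       // the stream had already terminated
  }
```

Calling `.stream` on the result yields a stream containing just the elements of that first chunk.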

Let's look at an implementation of `take` using `uncons`:

```scala mdoc:reset
import fs2._
@@ -297,7 +338,7 @@ def tk[F[_],O](n: Long): Pipe[F,O,O] = {
case Some((hd,tl)) =>
hd.size match {
case m if m <= n => Pull.output(hd) >> go(tl, n - m)
case _ => Pull.output(hd.take(n.toInt)) >> Pull.done
case _ => Pull.output(hd.take(n.toInt))
}
case None => Pull.done
}
@@ -326,11 +367,17 @@ Calling `s.pull` gives us a variety of methods which convert the stream to a `Pull`.
case Some((hd,tl)) =>
hd.size match {
case m if m <= n => Pull.output(hd) >> go(tl, n - m)
case m => Pull.output(hd.take(n)) >> Pull.done
case m => Pull.output(hd.take(n.toInt))
}
```

If we receive a `Some`, we destructure the tuple as `hd: Chunk[O]` and `tl: Stream[F,O]`. We then check the size of the head chunk, similar to the logic we used in the `scanChunksOpt` version. If the chunk size is less than or equal to the remaining elements to take, the chunk is output via `Pull.output` and we then recurse on the tail by calling `go`, passing the remaining elements to take. Otherwise we output the first `n` elements of the head and indicate we are done pulling.
If we receive a `Some`, we destructure the tuple as `hd: Chunk[O]` and `tl: Stream[F,O]`. We then check the size of the head chunk. If the chunk size is less than or equal to the remaining elements to take, the chunk is output via `Pull.output` and we then recurse on the tail of the stream `tl` by calling `go`, passing the remaining elements to take. Otherwise we output the first `n` elements of the head.

```scala
case None => Pull.done
```

A `None` is received when there are no more elements to pull from the `in` stream. We finish pulling with `Pull.done`, a pull that does nothing.

```scala
in => go(in,n).stream
@@ -345,8 +392,20 @@ s2.toList

FS2 takes care to guarantee that any resources allocated by the `Pull` are released when the stream completes. Note again that _nothing happens_ when we call `.stream` on a `Pull`; it merely converts back to the `Stream` API.

In practice, explicit recursion is rarely necessary; the methods under `pull` usually recurse for us. The explicit recursion in `tk` could be removed by using `pull.take`.

```scala mdoc:reset
import fs2._

def tk[F[_],O](n: Long): Pipe[F,O,O] = {
in => in.pull.take(n).void.stream
}

Stream(1,2,3,4).through(tk(2)).toList
```

There are lots of useful transformation functions in
[`Stream`](https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/core/shared/src/main/scala/fs2/Stream.scala)
[`Stream`](https://github.com/functional-streams-for-scala/fs2/blob/main/core/shared/src/main/scala/fs2/Stream.scala)
built using the `Pull` type.

### Exercises Stream Transforming
@@ -557,7 +616,7 @@ Stream.eval(bytes).map(_.toList).compile.toVector.unsafeRunSync()
```

Be sure to check out the
[`fs2.io`](https://github.com/functional-streams-for-scala/fs2/tree/series/1.0/io/)
[`fs2.io`](https://github.com/functional-streams-for-scala/fs2/tree/main/io/)
package which has nice FS2 bindings to Java NIO libraries, using exactly this approach.

#### Asynchronous effects (callbacks invoked multiple times)
@@ -598,8 +657,8 @@ def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] = {
}
```

See [`Queue`](https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/core/shared/src/main/scala/fs2/concurrent/Queue.scala)
for more useful methods. Most concurrent queues in FS2 support tracking their size, which is handy for implementing size-based throttling of the producer.
See [`Queue`](https://github.com/typelevel/cats-effect/blob/series/3.x/std/shared/src/main/scala/cats/effect/std/Queue.scala)
for more useful methods. Most concurrent queues in Cats Effect support tracking their size, which is handy for implementing size-based throttling of the producer.
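
As a sketch of the size-based throttling idea (using Cats Effect 3's `cats.effect.std.Queue` together with standard fs2 operations; not part of this diff), a bounded queue makes a fast producer wait once the buffer is full:

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import fs2.Stream

object ThrottledProducer extends IOApp.Simple {
  def run: IO[Unit] =
    Queue.bounded[IO, Int](16).flatMap { q =>
      // offer semantically blocks once 16 elements are buffered, so the
      // producer never runs more than 16 elements ahead of the consumer.
      val producer = Stream.range(0, 100).covary[IO].evalMap(q.offer)
      val consumer = Stream.fromQueueUnterminated(q).take(100).evalMap(i => IO.println(i))
      consumer.concurrently(producer).compile.drain
    }
}
```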

### Reactive streams

@@ -643,7 +702,7 @@ A unicast publisher must have a single subscriber only.
Want to learn more?

* Worked examples: these present a nontrivial example of use of the library, possibly making use of lots of different library features.
* [The README example](https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/docs/ReadmeExample.md)
* [The example](https://fs2.io/#/getstarted/example)
* More contributions welcome! Open a PR, following the style of one of the examples above. You can either start with a large block of code and break it down line by line, or work up to something more complicated using some smaller bits of code first.
* Detailed coverage of different modules in the library:
* File I/O