diff --git a/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala b/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala index 896d1bc255..c681c46948 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/PairSCollectionFunctions.scala @@ -1012,6 +1012,16 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) { def minByKey(implicit ord: Ordering[V]): SCollection[(K, V)] = this.reduceByKey(ord.min) + /** + * Return the latest value for each key, i.e. the value whose element has the most recent event + * time. + * @return + * a new SCollection of (key, latest value) pairs + * @group per_key + */ + def latestByKey: SCollection[(K, V)] = + self.applyPerKey(Latest.perKey[K, V]())(kvToTuple) + /** * Merge the values for each key using an associative reduce function. This will also perform the * merging locally on each mapper before sending results to a reducer, similarly to a "combiner" diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index f7f3221493..9c149e3758 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -807,6 +807,16 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { def min(implicit ord: Ordering[T]): SCollection[T] = this.reduce(ord.min) + /** + * Return the latest element of this SCollection according to its event time, or null if there + * are no elements. + * @return + * a new SCollection with the latest element + * @group transform + */ + def latest: SCollection[T] = + this.pApply(Latest.globally()) + /** * Compute the SCollection's data distribution using approximate `N`-tiles. 
* @return diff --git a/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala index 94fca173e3..63546ee69b 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala @@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.{ VarIntCoder, ZstdCoder => BZstdCoder } +import org.joda.time.Instant import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -717,6 +718,17 @@ class PairSCollectionFunctionsTest extends PipelineSpec { } } + it should "support latestByKey()" in { + runWithContext { sc => + val p = sc + .parallelize(Seq(("a", 1L), ("a", 10L), ("b", 2L), ("b", 20L))) + .timestampBy { case (_, v) => Instant.ofEpochMilli(v) } + .latestByKey + + p should containInAnyOrder(Seq(("a", 10L), ("b", 20L))) + } + } + it should "support reduceByKey()" in { runWithContext { sc => val p = sc diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index ba4ada0fb9..b7f2dbeb6a 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -527,6 +527,14 @@ class SCollectionTest extends PipelineSpec { } } + it should "support latest" in { + runWithContext { sc => + def latest(elems: Long*): SCollection[Long] = + sc.parallelize(elems).timestampBy(Instant.ofEpochMilli).latest + latest(1L, 2L, 3L) should containSingleValue(3L) + } + } + it should "support quantilesApprox()" in { runWithContext { sc => val p = sc.parallelize(0 to 100).quantilesApprox(5)