diff --git a/src/main/scala/com/daml/projection/scaladsl/Consumer.scala b/src/main/scala/com/daml/projection/scaladsl/Consumer.scala index c6f29ab..adc8522 100644 --- a/src/main/scala/com/daml/projection/scaladsl/Consumer.scala +++ b/src/main/scala/com/daml/projection/scaladsl/Consumer.scala @@ -321,8 +321,7 @@ private[projection] object Consumer extends StrictLogging { Instant.ofEpochSecond(timestamp.seconds, timestamp.nanos.toLong)) val workflowId = txTree.workflowId val newProjectionOffset = Offset(txTree.offset) - val envelopes = txTree.rootEventIds.flatMap { eventId => - val event = txTree.eventsById(eventId) + val envelopes = txTree.eventsById.valuesIterator.flatMap { event => val envelope = Envelope[TreeEvent]( event, @@ -332,7 +331,7 @@ private[projection] object Consumer extends StrictLogging { Some(newProjectionOffset) ) if (predicate(envelope)) Some(envelope) else None - } + }.toList if (envelopes.nonEmpty) { envelopes :+ TxBoundary[TreeEvent](projectionId, newProjectionOffset) } else { diff --git a/src/test/scala/com/daml/projection/javadsl/JavaApiSpec.scala b/src/test/scala/com/daml/projection/javadsl/JavaApiSpec.scala index cb4bbc7..eae246c 100644 --- a/src/test/scala/com/daml/projection/javadsl/JavaApiSpec.scala +++ b/src/test/scala/com/daml/projection/javadsl/JavaApiSpec.scala @@ -468,9 +468,9 @@ class JavaApiSpec ) Then("the projected table should contain the events") // create + exercise - transferResultContractIds.size must be(4) + transferResultContractIds.size must be(6) transferResultContractIds must contain(firstResultContractId) - transferResultContractIds.filter(_ == "").size must be(2) + transferResultContractIds.filter(_ == "").size must be(4) Then("the projection has advanced to the tx offset associated to the archived event") projector.getCurrentOffset(projection).toScala must be(