Dit is nog niet gepubliceerd, maar in de master branch van Alpakka, MongoSource.apply
neemt een typeparameter:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Daarom kun je met de aanstaande 0.18-release van Alpakka het volgende doen:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Merk op dat source
hier wordt ervan uitgegaan dat todoCollection.find()
retourneert een Observable[TodoMongo]
; pas de typen naar behoefte aan.
In de tussentijd kunt u de bovenstaande code eenvoudig handmatig toevoegen. Bijvoorbeeld:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Merk op dat MyMongoSource
is gedefinieerd om te verblijven in de akka.stream.alpakka.mongodb.scaladsl
pakket (zoals MongoSource
), omdat ObservableToPublisher
is een pakket-privéles. U zou MyMongoSource
. gebruiken op dezelfde manier waarop u MongoSource
. zou gebruiken :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())