Dit is nog niet gepubliceerd, maar in de master branch van Alpakka, MongoSource.apply
neemt een typeparameter:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Daarom kun je met de aanstaande 0.18-release van Alpakka het volgende doen:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Merk op dat source hier wordt ervan uitgegaan dat todoCollection.find() retourneert een Observable[TodoMongo]; pas de typen naar behoefte aan.
In de tussentijd kunt u de bovenstaande code eenvoudig handmatig toevoegen. Bijvoorbeeld:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Merk op dat MyMongoSource is gedefinieerd om te verblijven in de akka.stream.alpakka.mongodb.scaladsl pakket (zoals MongoSource ), omdat ObservableToPublisher
is een pakket-privéles. U zou MyMongoSource . gebruiken op dezelfde manier waarop u MongoSource . zou gebruiken :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())