Ik ben geen expert op het gebied van mongodb, maar op basis van de voorbeelden die ik heb gezien, is dit een patroon dat ik zou proberen.
Ik heb de andere gebeurtenissen dan de gegevens weggelaten, omdat het beperken ervan de grootste zorg lijkt te zijn.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Ik probeer een test samen te stellen van deze Rx-stroom zonder mongodb, in de tussentijd kan dit je wat ideeën geven.