sql >> Database >  >> NoSQL >> MongoDB

Sla een zeer grote CSV op in mongoDB met behulp van mangoest

Welkom bij streamen. Wat je echt wilt, is een "gebeurtenisstroom" die je invoer "één brok tegelijk" verwerkt, en natuurlijk idealiter door een gemeenschappelijk scheidingsteken zoals het "nieuwe regel"-teken dat je momenteel gebruikt.

Voor echt efficiënte dingen kun je het gebruik van MongoDB "Bulk API" inserts om het laden zo snel mogelijk te laten verlopen zonder al het machinegeheugen of CPU-cycli op te vreten.

Ik pleit er niet voor, want er zijn verschillende oplossingen beschikbaar, maar hier is een lijst die gebruikmaakt van de line- input-stream pakket om het gedeelte "line terminator" eenvoudig te maken.

Schemadefinities alleen door "voorbeeld":

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

Dus over het algemeen breekt de "stream"-interface daar "de invoer af" om "regel voor regel" te verwerken. Dat weerhoudt je ervan om alles in één keer te laden.

De belangrijkste onderdelen zijn de "Bulk Operations API" van MongoDB. Hierdoor kunt u veel bewerkingen tegelijk "in de wachtrij plaatsen" voordat u daadwerkelijk naar de server verzendt. Dus in dit geval met het gebruik van een "modulo", worden schrijfacties alleen verzonden per 1000 verwerkte ingangen. Je kunt echt alles doen tot de 16 MB BSON-limiet, maar houd het beheersbaar.

Naast de bewerkingen die in bulk worden verwerkt, is er een extra "limiter" van de async bibliotheek. Het is niet echt vereist, maar dit zorgt ervoor dat in wezen niet meer dan de "modulo-limiet" van documenten op enig moment in verwerking is. De algemene batch-"inserts" hebben geen andere IO-kosten dan geheugen, maar de "execute" -aanroepen betekenen dat IO wordt verwerkt. Dus we wachten in plaats van meer dingen in de rij te zetten.

Er zijn zeker betere oplossingen die u kunt vinden voor het "streamen" van CSV-type gegevens, wat dit lijkt te zijn. Maar over het algemeen geeft dit u de concepten om dit op een geheugenefficiënte manier te doen zonder ook CPU-cycli te verspillen.



  1. MongoDB-aggregatie:dubbele lookup en lookup-reactie samenvoegen met respectief object

  2. Datums en tijdstempels invoegen en ophalen in mongodb met behulp van PHP

  3. Combinatie van unieke mangoest-nodejs met meerdere kolommen

  4. Bursts van RedisTimeoutException met behulp van StackExchange.Redis