sql >> Database >  >> NoSQL >> MongoDB

CSV importeren met Mongoose Schema

Je kunt het doen met fast-csv door de headers . te krijgen van de schemadefinitie die de geparseerde regels als "objecten" zal retourneren. Je hebt een aantal mismatches, dus ik heb ze gemarkeerd met correcties:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Zolang het schema daadwerkelijk overeenkomt met de opgegeven CSV, is het goed. Dit zijn de correcties die ik kan zien, maar als u de werkelijke veldnamen anders wilt laten uitlijnen, moet u deze aanpassen. Maar er was eigenlijk een Number op de plaats waar er een String . is en in wezen een extra veld, waarvan ik aanneem dat het de lege is in de CSV.

De algemene dingen zijn het verkrijgen van de reeks veldnamen uit het schema en doorgeven aan de opties bij het maken van de csv-parserinstantie:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Als je dat eenmaal doet, krijg je een "Object" terug in plaats van een array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Maak je geen zorgen over de "types", want Mongoose cast de waarden volgens het schema.

De rest gebeurt binnen de handler voor de data evenement. Voor maximale efficiëntie gebruiken we insertMany() om slechts één keer per 10.000 regels naar de database te schrijven. Hoe dat daadwerkelijk naar de server en processen gaat, hangt af van de MongoDB-versie, maar 10.000 zou redelijk moeten zijn op basis van het gemiddelde aantal velden dat u zou importeren voor een enkele verzameling in termen van de "trade-off" voor geheugengebruik en het schrijven van een redelijk netwerkverzoek. Maak het getal indien nodig kleiner.

De belangrijkste onderdelen zijn om deze aanroepen te markeren als async functies en await het resultaat van de insertMany() alvorens verder te gaan. We moeten ook pause() de stream en resume() op elk item, anders lopen we het risico de buffer te overschrijven documenten in te voegen voordat ze daadwerkelijk worden verzonden. De pause() en resume() zijn nodig om "tegendruk" op de leiding uit te oefenen, anders blijven items gewoon "naar buiten komen" en de data afvuren evenement.

Uiteraard vereist de controle voor de 10.000 inzendingen dat we dat zowel bij elke iteratie als bij het voltooien van de stream controleren om de buffer leeg te maken en alle resterende documenten naar de server te sturen.

Dat is echt wat u wilt doen, aangezien u zeker niet bij "elke" iteratie via de data een asynchrone aanvraag naar de server wilt afvuren. gebeurtenis of in wezen zonder te wachten tot elk verzoek is voltooid. Je komt er mee weg door dat niet te controleren op "zeer kleine bestanden", maar voor elke echte belasting zul je zeker de call-stack overschrijden vanwege "in-flight" asynchrone oproepen die nog niet zijn voltooid.

Ter info - een package.json gebruikt. De mz is optioneel omdat het slechts een gemoderniseerde Promise . is ingeschakelde bibliotheek van standaard knooppunt "ingebouwde" bibliotheken die ik gewoon ben om te gebruiken. De code is natuurlijk volledig uitwisselbaar met de fs module.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Eigenlijk met Node v8.9.x en hoger kunnen we dit zelfs veel eenvoudiger maken met een implementatie van AsyncIterator via de stream-to-iterator module. Het is nog steeds in Iterator<Promise<T>> modus, maar het zou moeten doen totdat Node v10.x stabiel LTS wordt:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

In principe wordt het hele "gebeurtenis"-afhandelen en pauzeren en hervatten van de stream vervangen door een eenvoudige for lus:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Eenvoudig! Dit wordt opgeschoond in een latere node-implementatie met for..await..of wanneer het stabieler wordt. Maar het bovenstaande werkt prima op de opgegeven versie en hoger.



  1. mongodb tekst zoeken met meerdere velden

  2. Mongo DB vindt alle records met de hoogste waarde, afhankelijk van een sleutelveld

  3. Een overzicht van MongoDB-back-upopties

  4. Serialiseer een klas op twee verschillende manieren met Jackson