sql >> Database >  >> NoSQL >> MongoDB

Realtime gegevensstreaming met MongoDB-wijzigingsstreams

Onlangs heeft MongoDB een nieuwe functie uitgebracht vanaf versie 3.6, Change Streams. Dit geeft u onmiddellijk toegang tot uw gegevens, wat u helpt om op de hoogte te blijven van uw gegevenswijzigingen. In de wereld van vandaag wil iedereen onmiddellijke meldingen in plaats van deze na enkele uren of minuten te krijgen. Voor sommige toepassingen is het van cruciaal belang om voor elke update realtime meldingen naar alle geabonneerde gebruikers te sturen. MongoDB heeft dit proces heel gemakkelijk gemaakt door deze functie te introduceren. In dit artikel zullen we leren over de MongoDB-wijzigingsstroom en zijn toepassingen met enkele voorbeelden.

Wijzigingsstromen definiëren

Wijzigingsstromen zijn niets anders dan de realtime stroom van alle wijzigingen die zich voordoen in de database of verzameling of zelfs in implementaties. Wanneer bijvoorbeeld een update (invoegen, bijwerken of verwijderen) plaatsvindt in een specifieke verzameling, activeert MongoDB een wijzigingsgebeurtenis met alle gegevens die zijn gewijzigd.

U kunt wijzigingsstromen voor elke verzameling definiëren, net als elke andere normale aggregatie-operator met behulp van $changeStream-operator en watch()-methode. U kunt de wijzigingsstroom ook definiëren met de MongoCollection.watch()-methode.

Voorbeeld

db.myCollection.watch()

Streamfuncties wijzigen

  • Wijzigingen filteren

    U kunt de wijzigingen filteren om alleen voor bepaalde gerichte gegevens gebeurtenismeldingen te ontvangen.

    Voorbeeld:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Deze code zorgt ervoor dat u alleen updates krijgt voor records waarvan de naam gelijk is aan Bob. Op deze manier kunt u elke pijplijn schrijven om de wijzigingsstromen te filteren.

  • Wijzigingsstreams hervatten

    Deze functie zorgt ervoor dat er bij eventuele storingen geen gegevens verloren gaan. Elk antwoord in de stream bevat het hervattingstoken dat kan worden gebruikt om de stream vanaf een specifiek punt opnieuw te starten. Voor sommige frequente netwerkstoringen zal het mongodb-stuurprogramma proberen de verbinding met de abonnees opnieuw tot stand te brengen met behulp van het meest recente hervatstoken. Hoewel, in het geval dat de applicatie volledig mislukt, de hervattingstoken door de clients moet worden onderhouden om de stream te hervatten.

  • Bestelde wijzigingsstreams

    MongoDB gebruikt een globale logische klok om alle change stream-gebeurtenissen over alle replica's en shards van een cluster te ordenen, zodat de ontvanger de meldingen altijd ontvangt in dezelfde volgorde als waarin de opdrachten op de database zijn toegepast.

  • Evenementen met volledige documenten

    MongoDB retourneert standaard het deel van de overeenkomende documenten. Maar u kunt de configuratie van de wijzigingsstroom wijzigen om een ​​volledig document te ontvangen. Geef hiervoor { fullDocument:“updateLookup”} door aan de kijkmethode.
    Voorbeeld:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Duurzaamheid

    Wijzigingsstromen geven alleen melding voor de gegevens die zijn vastgelegd voor de meeste replica's. Dit zorgt ervoor dat gebeurtenissen worden gegenereerd door gegevens over de persistentie van de meerderheid, waardoor de duurzaamheid van het bericht wordt gegarandeerd.

  • Beveiliging/Toegangscontrole

    Wijzigingsstromen zijn zeer veilig. Gebruikers kunnen alleen wijzigingsstromen maken voor de collecties waarvoor ze leesrechten hebben. U kunt wijzigingsstromen maken op basis van gebruikersrollen.

Multiplenines Word een MongoDB DBA - MongoDB naar productie brengenLeer over wat u moet weten om MongoDB gratis te implementeren, bewaken, beheren en schalen

Voorbeeld van wijzigingsstromen

In dit voorbeeld maken we wijzigingsstromen voor de Aandelencollectie om een ​​melding te krijgen wanneer een aandelenkoers boven een bepaalde drempel komt.

  • Het cluster instellen

    Om wijzigingsstromen te gebruiken, moeten we eerst een replicaset maken. Voer de volgende opdracht uit om een ​​replicaset met één knooppunt te maken.

    mongod --dbpath ./data --replSet “rs”
  • Voeg enkele records toe aan de Stocks-verzameling

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Knooppuntomgeving instellen en afhankelijkheden installeren

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Abonneer u op de wijzigingen

    Maak één index.js-bestand en plaats de volgende code erin.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Voer nu dit bestand uit:

    node index.js
  • Voeg een nieuw record in db in om een ​​update te ontvangen

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Controleer nu uw console, u ontvangt een update van MongoDB.
    Voorbeeldreactie:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Hier kunt u de waarde van de bewerkingstype-parameter wijzigen met de volgende bewerkingen om te luisteren naar verschillende soorten wijzigingen in een verzameling:

  • Invoegen
  • Vervangen (behalve unieke ID)
  • Bijwerken
  • Verwijderen
  • Ongeldig (wanneer Mongo ongeldige cursor retourneert)

Andere modi voor het wijzigen van streams

U kunt wijzigingsstromen starten voor een database en implementatie op dezelfde manier als voor verzameling. Deze functie is vrijgegeven vanaf MongoDB versie 4.0. Hier zijn de opdrachten om een ​​wijzigingsstroom te openen voor database en implementaties.

Against DB: db.watch()
Against deployment: Mongo.watch()

Conclusie

MongoDB Change Streams vereenvoudigt de integratie tussen frontend en backend op een realtime en naadloze manier. Deze functie kan u helpen om MongoDB te gebruiken voor het pubsub-model, zodat u geen Kafka- of RabbitMQ-implementaties meer hoeft te beheren. Als uw toepassing realtime informatie vereist, moet u deze functie van MongoDB eens bekijken. Ik hoop dat dit bericht je op weg zal helpen met MongoDB-wijzigingsstreams.


  1. Redis pub sub max abonnees en uitgevers

  2. Redis-server stoppen. Noch afsluiten, noch stoppen werkt

  3. Hoe installeer ik een eerdere versie van mongodb met homebrew?

  4. MongoDB GridFS opvragen?