sql >> Database >  >> NoSQL >> MongoDB

MongoDB-prestaties:MongoDB Map-Reduce-bewerkingen uitvoeren op secundairen

Map-reduce is misschien wel de meest veelzijdige van de aggregatiebewerkingen die MongoDB ondersteunt.

Map-Reduce is een populair programmeermodel dat is ontstaan ​​bij Google voor het parallel verwerken en samenvoegen van grote hoeveelheden gegevens. Een gedetailleerde bespreking van Map-Reduce valt buiten het bestek van dit artikel, maar in wezen is het een aggregatieproces met meerdere stappen. De belangrijkste twee stappen zijn de kaartfase (verwerk elk document en verzend de resultaten) en de verkleiningsfase (verzamelt de resultaten die tijdens de kaartfase worden verzonden).

MongoDB ondersteunt drie soorten aggregatiebewerkingen:Map-Reduce, aggregatiepijplijn en aggregatieopdrachten voor één doel. U kunt dit MongoDB-vergelijkingsdocument gebruiken om te zien welke aan uw behoeften voldoet.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/

In mijn laatste bericht zagen we, met voorbeelden, hoe aggregatiepijplijnen op secondaries kunnen worden uitgevoerd. In dit bericht zullen we door het uitvoeren van Map-Reduce-taken lopen op de MongoDB secundaire replica's.

MongoDB Map-Reduce

MongoDB ondersteunt het uitvoeren van Map-Reduce-taken op de databaseservers. Dit biedt de flexibiliteit om complexe aggregatietaken te schrijven die niet zo eenvoudig via aggregatiepijplijnen kunnen worden uitgevoerd. Met MongoDB kunt u een aangepaste kaart schrijven en functies in Javascript verminderen die via Mongo-shell of een andere client aan de database kunnen worden doorgegeven. Op grote en constant groeiende datasets kan men zelfs overwegen om incrementele Map-Reduce-taken uit te voeren om te voorkomen dat steeds oudere gegevens worden verwerkt.

Historisch gezien werden de kaart en de reductiemethoden vroeger uitgevoerd in een context met één thread. Die beperking is echter verwijderd in versie 2.4.

Waarom Map-Reduce-taken uitvoeren op de secundaire?

Net als andere aggregatietaken, is Map-Reduce ook een resource-intensieve 'batch'-taak, dus het is goed geschikt voor uitvoering op alleen-lezen replica's. De waarschuwingen daarbij zijn:

1) Het zou goed moeten zijn om enigszins verouderde gegevens te gebruiken. Of u kunt het schrijfprobleem aanpassen om ervoor te zorgen dat replica's altijd synchroon lopen met de primaire. Deze tweede optie gaat ervan uit dat een treffer op de schrijfprestaties acceptabel is.

2) De uitvoer van de Map-Reduce-taak mag niet naar een andere verzameling in de database worden geschreven, maar moet worden teruggestuurd naar de toepassing (d.w.z. er wordt niet naar de database geschreven).

Laten we eens kijken hoe we dit kunnen doen aan de hand van voorbeelden, zowel vanuit de mongo-shell als de Java-driver.

Map-Reduce op replicasets

Gegevensset

Ter illustratie gebruiken we een vrij eenvoudige dataset:een dagelijkse transactierecorddump van een detailhandelaar. Een voorbeelditem ziet er als volgt uit:

RS-replica-0:PRIMARY> use test
switched to db test
RS-replica-0:PRIMARY> show tables
txns
RS-replica-0:PRIMARY> db.txns.findOne()
{
    "_id" : ObjectId("584a3b71cdc1cb061957289b"),
    "custid" : "cust_66",
    "txnval" : 100,
    "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...],
...
}

In onze voorbeelden berekenen we de totale uitgaven van een bepaalde klant op die dag. Dus, gezien ons schema, zullen de kaart- en reductiemethoden er als volgt uitzien:

var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record
var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid

Laten we, nu ons schema is vastgesteld, eens kijken naar Map-Reduce in actie.

MongoDB Shell

Om ervoor te zorgen dat een Map-Reduce-taak op de secundaire wordt uitgevoerd, moet de leesvoorkeur worden ingesteld op secundair . Zoals we hierboven al zeiden, om een ​​Map-Reduce op een secundaire te laten werken, moet de uitvoer van het resultaat inline zijn (In feite is dat de enige uit-waarde die is toegestaan ​​op secundairen). Laten we eens kijken hoe het werkt.

$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
MongoDB shell version: 3.2.10
connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test
2016-12-09T08:15:19.347+0000 I NETWORK  [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017
2016-12-09T08:15:19.349+0000 I NETWORK  [ReplicaSetMonitorWatcher] starting
RS-replica-0:PRIMARY> db.setSlaveOk()
RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary')
RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); }
RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); }
RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }})
{
    "results" : [
        {
            "_id" : "cust_0",
            "value" : 72734
        },
        {
            "_id" : "cust_1",
            "value" : 67737
        },
...
    ]
    "timeMillis" : 215,
    "counts" : {
        "input" : 10000,
        "emit" : 10000,
        "reduce" : 909,
        "output" : 101
    },
    "ok" : 1

}

Een kijkje in de logboeken op de secundaire bevestigt dat de taak inderdaad op de secundaire is uitgevoerd.

...
2016-12-09T08:17:24.842+0000 D COMMAND  [conn344] mr ns: test.txns
2016-12-09T08:17:24.843+0000 I COMMAND  [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:17:24.865+0000 I COMMAND  [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:17:25.063+0000 I COMMAND  [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms
...

Java

Laten we nu proberen een Map-Reduce-taak uit te voeren op de leesreplica's van een Java-toepassing. Op het MongoDB Java-stuurprogramma is het instellen van de leesvoorkeur voldoende. De uitvoer is standaard inline, dus er hoeven geen extra parameters te worden doorgegeven. Hier is een voorbeeld met driverversie 3.2.2:

public class MapReduceExample {

    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0";
    private static final String COL_NAME = "txns";
    private static final String DEF_DB = "test";

    public MapReduceExample() { }

    public static void main(String[] args) {
        MapReduceExample writer = new MapReduceExample();
        writer.mapReduce();
    }

    public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
    public static final String reducefunction = "function(key, values) { return Array.sum(values); }";

    private void mapReduce() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase database = client.getDatabase(DEF_DB);
        MongoCollection collection = database.getCollection(COL_NAME);
        MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default
        MongoCursor cursor = iterable.iterator();
        while (cursor.hasNext()) {
           Document result = cursor.next();
           printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value"));
        }
        printer("Done...");
    }
...
}

Zoals blijkt uit de logboeken, liep de taak op de secundaire:

...
2016-12-09T08:32:31.419+0000 D COMMAND  [conn371] mr ns: test.txns
2016-12-09T08:32:31.420+0000 I COMMAND  [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:32:31.444+0000 I COMMAND  [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:32:31.890+0000 I COMMAND  [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms
...

MongoDB Map-Reduce op Sharded-clusters

MongoDB ondersteunt Map-Reduce op Sharded-clusters, zowel wanneer een Sharded-verzameling de invoer is als wanneer het de uitvoer is van een Map-Reduce-taak. MongoDB biedt momenteel echter geen ondersteuning voor het uitvoeren van kaartverkleinende taken op secundaire onderdelen van een shard-cluster. Dus zelfs als de out optie is ingesteld op inline , worden Map-Reduce-taken altijd uitgevoerd op de primaire taken van een shard-cluster. Dit probleem wordt gevolgd via deze JIRA-bug.

De syntaxis voor het uitvoeren van een Map-Reduce-taak op een Shard-cluster is dezelfde als die op een replicaset. Dus de voorbeelden in de bovenstaande sectie zijn geldig. Als het bovenstaande Java-voorbeeld wordt uitgevoerd op een shard-cluster, verschijnen er logberichten op de primaire gegevens die aangeven dat de opdracht daar is uitgevoerd.

...
2016-11-24T08:46:30.828+0000 I COMMAND  [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in
line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS
tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing:
 { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha
rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu
t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu
ireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND  [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
...

Bezoek onze MongoDB-productpagina voor meer informatie over onze uitgebreide lijst met functies.


  1. Collecties maken, tonen en neerzetten in MongoDB

  2. Implementeer ik het serialiseren en deserialiseren van NodesJS + Passport + RedisStore?

  3. Hoe de ClusterControl-server te beveiligen

  4. Robuuste berichtserialisatie in Apache Kafka met behulp van Apache Avro, deel 1