Hey, MongoDB-Benutzer da draußen. Heute möchte ich Euch gaaanz kurz zeigen, dass das neue Aggregation Framework von MongoDB, das mit der Version 2.2.1 stabil wurde, durchaus brauchbar ist.
Wer viel misst…
Ohne große Worte ein Beispiel aus der relationalen Welt:
select log_level, count(*) as log_level_count from log_entry group by log_level
Zählen mit SQL ist kinderleicht. Mit MongoDB musste man bislang auf MapReduce-Algorithmen setzen, und das könnte z.B. so aussehen:
db.logEntries.mapReduce( function() {emit(this.logLevel, {logLevelCount: 1});}, function(k, vs) { var r = {logLevelCount: 0}; vs.forEach(function(v) {r.logLevelCount += v.logLevelCount;}); return r; }, {"out": {"inline": 1}} )
Gut, Soll erfüllt. Zwar ist der MapReduce-Code “lauter” als die SQL-Abfrage, aber dafür ist dieses Feature auch sehr mächtig und spielt erst in verteilten Systemen seinen Trumpf aus – die leichte Parallelisierbarkeit.
Mit dem Aggregation Framework sieht dieselbe Abfragesemantik so aus:
db.logEntries.aggregate({"$group": {"_id": "$logLevel", "logLevelCount": {"$sum": 1}}})
Diese Abfrage ist deutlich lesbarer (oder nicht?!). Aber natürlich muss man sich an dieser Stelle die Frage stellen, warum das Aggregation Framework überhaupt eingeführt wurde, wenn man doch per MapReduce zum gleichen Ergebnis kommt. Die Antwort ist simpel: Performance.
MapReduce in MongoDB ist teuer. Das ist darauf zurückzuführen, dass solche Algorithmen in JavaScript geschrieben werden. Eine Script-Evaluierung für jeden zu verarbeitenden Datensatz ist eine schlechte Abfrage und eine gute Heizung. Aggregation Framework-Abfragen hingegen kommen ohne JavaScript aus und das wirkt sich (ein kleines bisschen) positiv auf die Performance aus.
Zum Vergleich
In einem Test habe ich aus einem Pool von Log-Levels („trace“, „debug“, „info“, „warn“, „error“ und „fatal“) 800.000 Datensätze in eine MongoDB-Collection geschrieben. Die Collection ist in acht Shards unterteilt und kann auf meinem Notebook echt-parallel MapReduce abgefragt werden (danke, großer Micro-Prozessor-Konzern, für den ich keine Schleichwerbung mache).
Ein Datensatz sieht in etwa so aus:
{"_id": ObjectId("50a9f5b6e8d4f7906f71aee1"), "shardAnchor": 1, "logLevel": "fatal"}
Bis auf das „shardAnchor“-Feld ist kein Schnickschnack enthalten (das „shardAnchor“-Feld dient für diesen Test einfach nur dazu, die Chunks gleichmäßig auf die Shards zu verteilen).
Wenn wir jetzt die oben stehenden Abfragen abfeuern, bekommen wir folgendes Ergebnis (ich habe den Test in Java implementiert und den Treiber org.mongodb mongo-java-driver-2.9.3 verwendet):
Found 800000 records; not removing or creating anything Invoking MapReduce MapReduce took 5870 milliseconds MapReduce result: debug: 133663.0 error: 133230.0 fatal: 132868.0 info: 133244.0 trace: 133563.0 warn: 133432.0 Invoking aggregation Aggregation took 452 milliseconds Aggregation result: warn: 133432 debug: 133663 fatal: 132868 error: 133230 info: 133244 trace: 133563
Die Zahlen sprechen für sich: 5,87 Sekunden vs. 0,452. Frage: Wer zählt noch mit MapReduce? Quizfrage: Wer wird auch in Zukunft mit MapReduce zählen?
Ach ja, für die ganz nerdierigen unter uns ist hier der Code des Tests:
package noorg; import java.util.LinkedList; import java.util.List; import java.util.Random; import com.mongodb.AggregationOutput; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MapReduceCommand.OutputType; import com.mongodb.MapReduceOutput; import com.mongodb.Mongo; public class MongoDbTest { public static void main(String[] args) throws Exception { Mongo m = new Mongo(); DB db = m.getDB("test"); DBCollection c = db.getCollection("logEntries"); c.ensureIndex(new BasicDBObject("logLevel", Integer.valueOf(1))); final int chunks = 100, chunkSize = 8000, totalRecordCount = chunks * chunkSize; long start; DBCursor cursor = c.find(); if (c.count() != totalRecordCount) { System.out.println("Collection " + c.getName() + " record count is not " + totalRecordCount + "; Removing and re-creating everything"); System.out.println("Removed " + c.remove(new BasicDBObject(0)).getN() + " records from collection " + c.getName()); String[] logLevels = new String[] { "trace", "debug", "info", "warn", "error", "fatal" }; Random r = new Random(); int shardAnchor = -1; start = System.currentTimeMillis(); for (int i = 0; i < chunks; ++i) { List<DBObject> os = new LinkedList<>(); for (int y = 0; y < chunkSize; ++y) { DBObject o = new BasicDBObject(2) .append("shardAnchor", Integer.valueOf(++shardAnchor)) .append("logLevel", logLevels[r.nextInt(logLevels.length)]) ; os.add(o); if (shardAnchor >= 7) shardAnchor = -1; } c.insert(os); } System.out.println("Created " + totalRecordCount + " records in " + (System.currentTimeMillis() - start) + " milliseconds"); } else { System.out.println("Found " + totalRecordCount + " records; not removing or creating anything"); } cursor.close(); System.out.println("Invoking MapReduce"); start = System.currentTimeMillis(); MapReduceOutput mrOutput = c.mapReduce( "function() {emit(this.logLevel, {logLevelCount: 1});}", "function(k, vs) {var r = {logLevelCount: 0}; vs.forEach(function(v) {r.logLevelCount += v.logLevelCount;}); return r;}", null, OutputType.INLINE, null ); System.out.println("MapReduce took " + (System.currentTimeMillis() - start) + " milliseconds"); System.out.println("MapReduce result:"); for (DBObject o : mrOutput.results()) { System.out.println("t" + o.get("_id") + ": " + ((DBObject) o.get("value")).get("logLevelCount")); } System.out.println("Invoking aggregation"); start = System.currentTimeMillis(); AggregationOutput aOutput = c.aggregate( new BasicDBObject( "$group", new BasicDBObject() .append("_id", "$logLevel") .append("logLevelCount", new BasicDBObject("$sum", Integer.valueOf(1))) ) ); System.out.println("Aggregation took " + (System.currentTimeMillis() - start) + " milliseconds"); System.out.println("Aggregation result:"); for (DBObject o : aOutput.results()) { System.out.println("t" + o.get("_id") + ": " + o.get("logLevelCount")); } } }
Fazit
Das Aggregation Framework finde ich toll. Es ist extrem mächtig (siehe Referenz) und flexibel, da es als Pipeline realisiert wurde (mehrere Aggregationen in Reihe). So lassen sich Daten leicht und vor allem schnell verdichten.