Blog

MongoDB – Aggregation leicht gemacht

Hey, MongoDB-Benutzer da draußen. Heute möchte ich Euch gaaanz kurz zeigen, dass das neue Aggregation Framework von MongoDB, das mit der Version 2.2.1 stabil wurde, durchaus brauchbar ist.

Wer viel misst…

Ohne große Worte ein Beispiel aus der relationalen Welt:

select log_level, count(*) as log_level_count from log_entry group by log_level

Zählen mit SQL ist kinderleicht. Mit MongoDB musste man bislang auf MapReduce-Algorithmen setzen, und das könnte z.B. so aussehen:

db.logEntries.mapReduce(
	function() {emit(this.logLevel, {logLevelCount: 1});},
	function(k, vs) {
		var r = {logLevelCount: 0};
		vs.forEach(function(v) {r.logLevelCount += v.logLevelCount;});
		return r;
	},
	{"out": {"inline": 1}}
)

Gut, Soll erfüllt. Zwar ist der MapReduce-Code “lauter” als die SQL-Abfrage, aber dafür ist dieses Feature auch sehr mächtig und spielt erst in verteilten Systemen seinen Trumpf aus – die leichte Parallelisierbarkeit.

Mit dem Aggregation Framework sieht dieselbe Abfragesemantik so aus:

db.logEntries.aggregate({"$group": {"_id": "$logLevel", "logLevelCount": {"$sum": 1}}})

Diese Abfrage ist deutlich lesbarer (oder nicht?!). Aber natürlich muss man sich an dieser Stelle die Frage stellen, warum das Aggregation Framework überhaupt eingeführt wurde, wenn man doch per MapReduce zum gleichen Ergebnis kommt. Die Antwort ist simpel: Performance.

MapReduce in MongoDB ist teuer. Das ist darauf zurückzuführen, dass solche Algorithmen in JavaScript geschrieben werden. Eine Script-Evaluierung für jeden zu verarbeitenden Datensatz ist eine schlechte Abfrage und eine gute Heizung. Aggregation Framework-Abfragen hingegen kommen ohne JavaScript aus und das wirkt sich (ein kleines bisschen) positiv auf die Performance aus.

Zum Vergleich

In einem Test habe ich aus einem Pool von Log-Levels („trace“, „debug“, „info“, „warn“, „error“ und „fatal“) 800.000 Datensätze in eine MongoDB-Collection geschrieben. Die Collection ist in acht Shards unterteilt und kann auf meinem Notebook echt-parallel MapReduce abgefragt werden (danke, großer Micro-Prozessor-Konzern, für den ich keine Schleichwerbung mache).

Ein Datensatz sieht in etwa so aus:

{"_id": ObjectId("50a9f5b6e8d4f7906f71aee1"), "shardAnchor": 1, "logLevel": "fatal"}

Bis auf das „shardAnchor“-Feld ist kein Schnickschnack enthalten (das „shardAnchor“-Feld dient für diesen Test einfach nur dazu, die Chunks gleichmäßig auf die Shards zu verteilen).

Wenn wir jetzt die oben stehenden Abfragen abfeuern, bekommen wir folgendes Ergebnis (ich habe den Test in Java implementiert und den Treiber org.mongodb mongo-java-driver-2.9.3 verwendet):

Found 800000 records; not removing or creating anything
Invoking MapReduce
MapReduce took 5870 milliseconds
MapReduce result:
	debug: 133663.0
	error: 133230.0
	fatal: 132868.0
	info: 133244.0
	trace: 133563.0
	warn: 133432.0
Invoking aggregation
Aggregation took 452 milliseconds
Aggregation result:
	warn: 133432
	debug: 133663
	fatal: 132868
	error: 133230
	info: 133244
	trace: 133563

Die Zahlen sprechen für sich: 5,87 Sekunden vs. 0,452. Frage: Wer zählt noch mit MapReduce? Quizfrage: Wer wird auch in Zukunft mit MapReduce zählen?

Ach ja, für die ganz nerdierigen unter uns ist hier der Code des Tests:

package noorg;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import com.mongodb.AggregationOutput;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MapReduceCommand.OutputType;
import com.mongodb.MapReduceOutput;
import com.mongodb.Mongo;

public class MongoDbTest {

	public static void main(String[] args) throws Exception {
		Mongo m = new Mongo();
		DB db = m.getDB("test");
		DBCollection c = db.getCollection("logEntries");
		c.ensureIndex(new BasicDBObject("logLevel", Integer.valueOf(1)));

		final int chunks = 100, chunkSize = 8000, totalRecordCount = chunks * chunkSize;
		long start;

		DBCursor cursor = c.find();
		if (c.count() != totalRecordCount) {
			System.out.println("Collection " + c.getName() + " record count is not " + totalRecordCount + "; Removing and re-creating everything");
			System.out.println("Removed " + c.remove(new BasicDBObject(0)).getN() + " records from collection " + c.getName());

			String[] logLevels = new String[] {
				"trace",
				"debug",
				"info",
				"warn",
				"error",
				"fatal"
			};
			Random r = new Random();

			int shardAnchor = -1;
			start = System.currentTimeMillis();
			for (int i = 0; i < chunks; ++i) {
				List<DBObject> os = new LinkedList<>();
				for (int y = 0; y < chunkSize; ++y) {
					DBObject o = new BasicDBObject(2)
						.append("shardAnchor", Integer.valueOf(++shardAnchor))
						.append("logLevel", logLevels[r.nextInt(logLevels.length)])
					;
					os.add(o);
					if (shardAnchor >= 7)
						shardAnchor = -1;
				}
				c.insert(os);
			}
			System.out.println("Created " + totalRecordCount + " records in " + (System.currentTimeMillis() - start) + " milliseconds");
		} else {
			System.out.println("Found " + totalRecordCount + " records; not removing or creating anything");
		}
		cursor.close();

		System.out.println("Invoking MapReduce");
		start = System.currentTimeMillis();
		MapReduceOutput mrOutput = c.mapReduce(
			"function() {emit(this.logLevel, {logLevelCount: 1});}",
			"function(k, vs) {var r = {logLevelCount: 0}; vs.forEach(function(v) {r.logLevelCount += v.logLevelCount;}); return r;}",
			null,
			OutputType.INLINE,
			null
		);
		System.out.println("MapReduce took " + (System.currentTimeMillis() - start) + " milliseconds");
		System.out.println("MapReduce result:");
		for (DBObject o : mrOutput.results()) {
			System.out.println("t" + o.get("_id") + ": " + ((DBObject) o.get("value")).get("logLevelCount"));
		}

		System.out.println("Invoking aggregation");
		start = System.currentTimeMillis();
		AggregationOutput aOutput = c.aggregate(
			new BasicDBObject(
				"$group", new BasicDBObject()
					.append("_id", "$logLevel")
					.append("logLevelCount", new BasicDBObject("$sum", Integer.valueOf(1)))
			)
		);
		System.out.println("Aggregation took " + (System.currentTimeMillis() - start) + " milliseconds");
		System.out.println("Aggregation result:");
		for (DBObject o : aOutput.results()) {
			System.out.println("t" + o.get("_id") + ": " + o.get("logLevelCount"));
		}
	}
}

Fazit

Das Aggregation Framework finde ich toll. Es ist extrem mächtig (siehe Referenz) und flexibel, da es als Pipeline realisiert wurde (mehrere Aggregationen in Reihe). So lassen sich Daten leicht und vor allem schnell verdichten.

Holisticon AG — Teile diesen Artikel

Über den Autor

Antwort hinterlassen