Entwicklung & Code

Core Java: Kontrollierte Aggregation – vom Collector zum Gatherer


Mit der Stream-API besitzt Java seit Version 8 einen eleganten, funktionalen Ansatz zur Verarbeitung von Datenmengen. Die Terminaloperation collect(...) stellt dabei die Brücke vom Stream zu einer zielgerichteten Aggregation dar – sei es in Form von Listen, Maps, Strings oder komplexeren Datenstrukturen. Bis Java 20 war die Verarbeitung durch Collector-Instanzen geregelt, die intern aus einem Supplier, einem Accumulator, einem Combiner und optional einem Finisher bestanden. Dieses Modell funktioniert gut für einfache Akkumulationen, kommt jedoch insbesondere bei komplexen, zustandsbehafteten oder bedingten Aggregationen schnell an seine Grenzen.




Seit 1996 programmiert Sven Java in Industrieprojekten und seit über 15 Jahren weltweit in Branchen wie Automobil, Raumfahrt, Versicherungen, Banken, UN und Weltbank. Seit über 10 Jahren ist er von Amerika bis nach Neuseeland als Speaker auf Konferenzen und Community Events, arbeitete als Developer Advocate für JFrog und Vaadin und schreibt regelmäßig Beiträge für IT-Zeitschriften und Technologieportale.
Neben seinem Hauptthema Core Java beschäftigt er sich mit TDD und Secure Coding Practices.

In Java 21 kommt das neue Interface java.util.stream.Gatherer hinzu, das die Semantik und Kontrolle über den Akkumulationsprozess deutlich erweitert. Während ein Collector passiv Daten sammelt, reagiert ein Gatherer aktiv auf die einfließenden Elemente – vergleichbar mit einem spezialisierten Transducer in funktionalen Programmiersprachen. Gatherer sind insbesondere dort nützlich, wo eine prozedurale oder zustandsorientierte Aggregation notwendig ist, und sie erlauben zusätzlich das Einfügen von Elementen, das Filtern, das Überspringen sowie das explizite Beenden des Sammelvorgangs – und das alles im Rahmen einer funktional komponierbaren Architektur.

Ein Gatherer beschreibt die Transformation eines Stream in ein Ergebnis vom Typ R bei enger Kontrolle über den Akkumulationsprozess. Im Unterschied zum Collector, der in gewisser Weise ein Container für Aggregationslogik ist, erlaubt der Gatherer eine regelbasierte, zustandsabhängige Verarbeitung von Eingaben – inklusive der Möglichkeit, Elemente zu überspringen (Drop), zusätzlich einzufügen (Inject) oder die Verarbeitung vorzeitig zu beenden (FinishEarly).

Dazu basiert ein Gatherer auf der Idee einer Senke, die im Kontext des Stream-Prozessors aufgerufen wird. Diese Senke erhält jedes Eingabeelement, kann darauf reagieren und beeinflusst damit aktiv den Fluss der Verarbeitung. Die eigentliche Verarbeitung wird über eine sogenannte Adapter Factory beschrieben, die die Übergänge zwischen den Zuständen der Aggregation verwaltet.

Während der herkömmliche Collector vor allem als finales Akkumulationswerkzeug dient – also dazu, die im Stream enthaltenen Elemente in eine Zielstruktur wie eine Liste, eine Map oder eine Aggregation zu überführen –, geht der Gatherer konzeptionell weit über diese Rolle hinaus. Er bietet einen eigenständigen Mechanismus, der sowohl semantisch als auch funktional neue Ausdrucksformen für die Stream-Verarbeitung eröffnet und beispielsweise neue, zuvor nicht vorhandene Elemente in den Datenstrom einspeisen kann. Das eröffnet die Möglichkeit, Initialisierungswerte am Beginn eines Streams einzuführen oder Steuerzeichen wie Header und Footer gezielt an den Anfang oder das Ende zu setzen – ohne den ursprünglichen Datenstrom dafür künstlich erweitern zu müssen.

Besonders deutlich wird diese gestalterische Freiheit beim Umgang mit Zuständen (State). Ein Gatherer kann zustandsbehaftet arbeiten und diesen Zustand auch über mehrere Elemente hinweg beeinflussen lassen. Das eröffnet neue semantische Horizonte: Beispielsweise lassen sich Fensteroperationen formulieren, bei denen eine temporale oder sequenzielle Logik angewendet wird – etwa das Aggregieren von Daten bis zu einem bestimmten „Ende“-Marker oder das Zusammenfassen von Elementgruppen, die sich nur durch eine bestimmte Reihenfolge oder inhaltliche Struktur identifizieren lassen.

Auch komplexe Entscheidungsstrukturen, wie sie in mehrstufigen Parsing-Prozessen oder beim Traversieren von Entscheidungsbäumen nötig sind, lassen sich durch zustandsbehaftete Gatherer elegant und deklarativ umsetzen. Dabei bleibt die Schnittstelle weiterhin im Geist der funktionalen Programmierung: Transformation und Aggregation bleiben getrennt beschreibbar, doch der Gatherer erlaubt es, sie auf eine Weise zu verbinden, die bislang nur durch imperative oder schwer wartbare Stream-Hacks realisierbar war.

Ein weiterer Vorteil liegt im kontrollierten Einfluss vergangener Elemente auf das aktuelle Verhalten. So kann ein Gatherer die Entscheidung treffen, ein Element zu verwerfen, weil ein vorheriges Element einen bestimmten Kontext gesetzt hat. Diese Fähigkeit zur Kontextsensitivität ist in Situationen relevant, in denen Datenströme strukturell „nicht sauber“ sind – also nicht der Definition entsprechen oder Fehler enthalten. Das betrifft etwa Protokolldateien, inkonsistente Datenexporte oder die Analyse natürlicher Sprache.

Stellen wir uns vor, wir möchten aus einem Stream von Strings nur diejenigen Elemente sammeln, die eine bestimmte Eigenschaft besitzen, und diese dann gruppieren – beispielsweise alle Wörter, die länger als fünf Zeichen sind, gruppiert nach ihrem Anfangsbuchstaben. Diese Anforderung lässt sich mit einem Collector zwar formulieren, benötigt jedoch eine Kombination aus Vorverarbeitung (zum Beispiel filter(...)) und nachgelagerter Gruppierung. Mit einem Gatherer hingegen lässt sich dieser kombinierte Prozess elegant, zustandsvoll und in einem Schritt abbilden:


Gatherer>> gatherer =
    Gatherer.ofSequential(
        () -> new HashMap>(),
        (map, element, downstream) -> {
            if (element.length() > 5) {
                char key = element.charAt(0);
                map.computeIfAbsent(
key, 
k -> new ArrayList<>()).add(element);
            }
            return true;
        }
    );


In diesem Beispiel wird für jedes Element entschieden, ob es in das Ergebnis einfließt. Die Logik ist unmittelbar in den Gatherer eingebettet. Der Rückgabewert true signalisiert, dass die Verarbeitung fortgesetzt werden soll. Würde man an dieser Stelle stattdessen false zurückgeben, würde der Stream vorzeitig beendet – ein Verhalten, das mit herkömmlichen Collectors so nicht erreichbar ist.

Das Interface Gatherer unterscheidet dafür explizit zwischen sequenzieller und paralleler Verarbeitung. Die zentrale Unterscheidung ergibt sich durch die Factory-Methoden:

Gatherer.ofSequential(...) // Nur sequenziell nutzbar

Gatherer.ofConcurrent(...) // Für parallele Streams geeignet

Ein mit ofConcurrent(...) erzeugter Gatherer darf in parallelen Streams verwendet werden, muss jedoch bestimmte Anforderungen erfüllen: Er muss thread-safe sein oder auf thread-isolierten Akkumulatoren beruhen. Dies entspricht in etwa der Logik bei parallelen Collectoren, bei denen die interne Zustandsverwaltung eine gleichzeitige Verarbeitung verschiedener Elemente in unabhängigen Threads erlaubt.



Source link

Leave a Reply

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Beliebt

Die mobile Version verlassen