🎯 OBJECTIF
Comprendre comment :
🧠 MODÈLE MENTAL
Une topologie Kafka Streams est une chaîne d'opérations. Chaque opération reçoit un stream de messages en entrée, fait quelque chose, et produit un stream de messages en sortie. C'est comme une chaîne de montage : chaque poste fait une tâche précise, et le résultat passe au poste suivant.
Il y a deux grandes familles d'opérations. Les opérations sans état (stateless) traitent chaque message de façon indépendante — filtrer, transformer, éclater. Elles ne se souviennent de rien entre deux messages et sont très rapides. Les opérations avec état (stateful) ont besoin de se souvenir des messages précédents pour fonctionner — compter, additionner, calculer une moyenne. Elles lisent et écrivent dans un state store local (une mini base de données embarquée dans l'application).
La règle la plus importante de cette note : si on change la clé d'un message, Kafka Streams doit redistribuer les messages avant de pouvoir faire une opération avec état. Cette redistribution s'appelle un repartitionnement — on en explique le pourquoi en détail dans la section dédiée.
Prérequis : kafka-streams-1-fondamentaux — topologie, KStream, KTable, parallélisme.
Avant de rentrer dans le détail, voici la carte des opérations disponibles. Les opérations sans état travaillent directement sur le stream et retournent un stream transformé. Les opérations avec état passent d'abord par un groupement, puis calculent un résultat qui est stocké dans une KTable.
flowchart LR
KS["KStream"] --> SL["Opérations sans état<br/>filter, map, flatMap<br/>selectKey, peek, branch, merge"]
KS --> SF["Opérations avec état<br/>groupByKey / groupBy<br/>count / reduce / aggregate"]
SL --> KS2["KStream transformé"]
SF --> KT["KTable résultat"]mermaidCes opérations traitent chaque message indépendamment. Elles ne lisent rien en dehors du message courant et ne gardent aucun état entre deux appels.
filter est l'opération la plus simple. On lui donne une condition, et elle ne laisse passer que les messages pour lesquels la condition est vraie. Les messages qui ne passent pas sont ignorés silencieusement — leur position dans le topic (offset) avance quand même, mais aucun traitement n'est effectué.
filterNot fait l'inverse : il laisse passer les messages pour lesquels la condition est fausse.
Dans nos exemples, le flux orders est clé par customerId (l'identifiant du client) — c'est ce qu'on utilise comme clé de message.
KStream<String, OrderPlaced> stream = builder.stream("orders"); // source : flux des commandes (clé = customerId)
KStream<String, OrderPlaced> nonZero = stream
.filter((customerId, order) -> order.getAmount() != 0); // garde les commandes à montant non nul
KStream<String, OrderPlaced> withoutEmpty = stream
.filterNot((customerId, order) -> order.getAmount() == 0); // l'inverse : élimine les montants nulsjavamapValues transforme la valeur d'un message en gardant la clé intacte. C'est l'opération de transformation la plus courante — on prend un objet, on en fait un autre. Comme la clé ne change pas, Kafka Streams n'a pas besoin de redistribuer les messages.
map permet de changer à la fois la clé et la valeur. C'est plus puissant mais plus coûteux : changer la clé oblige Kafka Streams à redistribuer les messages avant toute opération avec état. On explique pourquoi dans la section sur le repartitionnement.
KStream<String, OrderEnriched> enriched = stream
.mapValues(order -> OrderEnriched.newBuilder() // transforme la valeur, clé inchangée → pas de repartition
.setOrderId(order.getOrderId())
.setAmount(order.getAmount())
.setProcessedAt(Instant.now().toEpochMilli())
.build());
KStream<String, OrderPlaced> byCountry = stream
.map((customerId, order) -> KeyValue.pair(order.getCountry(), order)); // change la clé (pays) → repartition avant agrégationjava🔑 Préférer mapValues quand on n'a pas besoin de changer la clé
mapValues est toujours plus efficace que map quand la clé n'a pas besoin de changer. La redistribution déclenchée par map a un coût réel : les messages font un aller-retour par un topic Kafka interne avant de continuer. Si on peut l'éviter, on l'évite.
Parfois un message d'entrée doit produire plusieurs messages de sortie. Par exemple, si une commande contient plusieurs lignes (plusieurs produits), on veut un message par ligne.
flatMapValues fait ça sans changer la clé. flatMap peut changer la clé en même temps, au même coût que map.
KStream<String, OrderLine> byLine = stream
.flatMapValues(order -> order.getLines().stream() // un message → plusieurs (une ligne de commande chacun)
.map(line -> OrderLine.newBuilder() // construit une OrderLine par ligne
.setOrderId(order.getOrderId())
.setProductId(line.getProductId())
.setAmount(line.getAmount())
.build())
.collect(Collectors.toList()) // collecte la liste des messages produits
);javaSi une commande a 3 lignes, ce code produit 3 messages en sortie pour chaque message en entrée. Si une commande n'a aucune ligne, il produit 0 messages — le message disparaît de la suite de la topologie.
selectKey change la clé sans toucher à la valeur. C'est l'équivalent d'un map qui ignore la valeur. Comme tout changement de clé, il déclenche une redistribution avant la prochaine opération avec état.
KStream<String, OrderPlaced> byCountry = stream
.selectKey((customerId, order) -> order.getCountry()); // change uniquement la clé (→ pays)
KGroupedStream<String, OrderPlaced> grouped = byCountry.groupByKey(); // groupe par la clé courante, pas de repartition supplémentairejavapeek applique une action sur chaque message sans modifier le stream. Il retourne exactement le même stream qu'il a reçu. C'est utile pour ajouter des logs ou des métriques sans polluer la logique de transformation.
KStream<String, OrderPlaced> withLogging = stream
.peek((customerId, order) -> // observe chaque message sans le modifier
log.debug("Traitement customerId={} pays={} montant={}", // log de debug, ne change rien au flux
customerId, order.getCountry(), order.getAmount())
);
// withLogging contient exactement les mêmes messages que streamjava🚨 peek peut être appelé plusieurs fois pour le même message
En cas de redémarrage ou de retraitement, Kafka Streams peut rejouer des messages. peek sera alors appelé plusieurs fois pour le même message. C'est sans conséquence pour du logging, mais ne jamais y mettre des opérations qui ne doivent se produire qu'une fois (appel HTTP, insertion en base).
Parfois on veut router les messages vers des destinations différentes selon leur contenu. split (l'API moderne depuis Kafka Streams 2.8) divise un stream en plusieurs sous-streams selon des conditions.
Chaque message est évalué contre les conditions dans l'ordre. Il est routé vers le premier sous-stream dont la condition est vraie. Les messages qui ne correspondent à aucune condition vont dans le sous-stream par défaut si on en a déclaré un.
Map<String, KStream<String, OrderPlaced>> branches = stream
.split(Named.as("amount-")) // découpe le stream en sous-flux (préfixe "amount-")
.branch( // 1ère règle (évaluée en premier)
(customerId, o) -> o.getAmount() > 100, // commandes > 100 €
Branched.as("high") // → sous-flux "high"
)
.branch( // 2ème règle
(customerId, o) -> o.getAmount() > 0, // commandes entre 1 et 100 €
Branched.as("normal") // → sous-flux "normal"
)
.defaultBranch(Branched.as("refund")); // tout le reste (montant ≤ 0) → "refund"
KStream<String, OrderPlaced> highValue = branches.get("amount-high"); // récupère le sous-flux high
KStream<String, OrderPlaced> normal = branches.get("amount-normal"); // récupère le sous-flux normal
KStream<String, OrderPlaced> refund = branches.get("amount-refund"); // récupère le sous-flux refundjava🚨 L'ordre des conditions est important
Un message qui satisfait la première condition n'est jamais évalué par les suivantes. Ici, une commande avec amount = 150 va dans high — même si elle satisfait aussi la condition > 0 de normal. Toujours mettre les conditions les plus spécifiques en premier.
merge fait l'inverse de split : il fusionne deux streams distincts en un seul. Utile quand on a plusieurs topics source qui contiennent des données de même nature.
KStream<String, OrderPlaced> web = builder.stream("orders-web"); // canal web
KStream<String, OrderPlaced> mobile = builder.stream("orders-mobile"); // canal mobile
KStream<String, OrderPlaced> allOrders = web.merge(mobile); // fusionne les deux canaux en un seul fluxjavaPars d'une contrainte physique : un thread ne voit que ses propres partitions. Le thread qui traite la partition 1 n'a aucun accès aux messages de la partition 3 — il ne les verra jamais.
Maintenant tu veux compter les commandes par pays. Pour que le compteur « FR » soit juste, il faut que toutes les commandes FR soient traitées par le même thread — sinon le thread 1 en compte 3, le thread 3 en compte 5, et personne n'a le total. Mais tes commandes sont arrivées partitionnées par client (customerId) : les commandes FR sont donc éparpillées partout (C-42 en partition 1, C-99 en partition 3…). Rien ne les regroupe par pays.
Le point clé : Kafka ne déplace jamais un message déjà écrit. La partition d'un message est fixée une seule fois, à l'écriture, par hash(clé) % nb_partitions. La seule façon de « ranger » un message à une autre adresse, c'est d'en réécrire une copie dans un topic — et c'est cette écriture qui le place sur la bonne partition. Écrire, c'est ranger.
Ce nouveau topic, c'est le topic de repartition. Ce qui part dedans : chaque message qui franchit le changement de clé, réécrit avec sa nouvelle clé et sa valeur entière. Pas un tri, pas un filtre — une simple ré-expédition à la bonne adresse.
Analogie. Des lettres triées par expéditeur dans des casiers. Tu veux compter par pays de destination. Les lettres FR sont dispersées dans tous les casiers — et tu ne peux pas les téléporter d'un casier à l'autre. Alors tu les ré-expédies toutes, cette fois adressées par pays. Toutes les FR retombent dans le même casier, un seul agent les compte. Cette ré-expédition, c'est le topic de repartition.
sequenceDiagram
participant T1 as Topic orders
participant KS as Kafka Streams
participant TR as Topic repartition interne
participant SS as State Store
T1->>KS: (C-42, pays FR, montant 10)
KS->>KS: selectKey → nouvelle clé = "FR"
Note over KS: la clé devient FR — il faut que<br/>tous les FR soient sur le même thread
KS->>TR: réécrit (FR, pays FR, montant 10) → hash(FR) → partition 3
KS->>TR: réécrit (FR, pays FR, montant 8) venu d'un autre client
TR->>KS: relit la partition 3 — tous les FR ensemble
KS->>SS: accumule → FR += 18mermaid🔑 Ce qui voyage, et ce qu'on perd vraiment
On ne perd aucune donnée : la valeur entière est recopiée, et le topic orders d'origine n'est pas touché — on ajoute une copie re-keyée, on n'efface rien. La seule chose qui change, et c'est voulu : l'ancienne clé est remplacée par la nouvelle. Si customerId n'existait que dans la clé (pas dans la valeur), tu ne pourras plus t'en servir en aval. Le réflexe : garder dans la valeur tout champ dont tu auras besoin après un changement de clé.
🔑 La repartition est paresseuse
Changer la clé ne crée pas à lui seul de topic interne. Après selectKey, si tu fais juste .to("output"), aucun topic de repartition — écrire dans output range déjà les messages par la nouvelle clé. Le topic interne n'apparaît que si une opération avec état en aval (agrégation, join) a besoin du regroupement et qu'il n'y a pas de topic de sortie où l'écrire : c'est « l'output que Kafka Streams se crée à lui-même ».
Ce passage par un topic interne coûte une écriture + une relecture (latence et disque). C'est pour ça que mapValues est préféré à map : pas de changement de clé, pas de ré-expédition.
Les opérations qui déclenchent un repartitionnement : map, flatMap, selectKey, groupBy.
Les opérations qui n'en déclenchent pas : mapValues, flatMapValues, filter, peek, groupByKey.
Avant de calculer un agrégat, il faut dire à Kafka Streams comment regrouper les messages. Deux opérations existent.
groupByKey groupe par la clé actuelle. C'est la plus efficace : si la clé n'a pas été changée depuis la source, il n'y a pas de repartitionnement.
groupBy groupe par une nouvelle clé calculée depuis le message. Il fait deux choses d'un coup — changer la clé ET grouper — et déclenche donc toujours un repartitionnement.
// groupByKey — groupe par la clé courante (customerId). Pas de changement de clé → pas de repartition.
KGroupedStream<String, OrderPlaced> groupedByCustomer = stream.groupByKey();
// groupBy = rekey + groupe en une seule opération → 1 repartition.
KGroupedStream<String, OrderPlaced> groupedByCountry = stream
.groupBy((customerId, order) -> order.getCountry());
// ⚠️ Le piège : répéter groupBy re-localise les messages à CHAQUE fois.
// Ici DEUX repartitions complètes pour le même rekey par pays.
KTable<String, Long> nb = stream.groupBy((k, o) -> o.getCountry()).count();
KTable<String, Long> total = stream.groupBy((k, o) -> o.getCountry()).reduce(/* ... */);
// ✓ selectKey ré-expédie UNE fois ; les deux groupByKey relisent
// un stream déjà rangé par pays → UNE seule repartition partagée.
KStream<String, OrderPlaced> byCountry = stream
.selectKey((customerId, order) -> order.getCountry());
KTable<String, Long> nb2 = byCountry.groupByKey().count();
KTable<String, Long> total2 = byCountry.groupByKey().reduce(/* ... */);javaKGroupedStream n'est pas un stream utilisable directement — c'est une étape intermédiaire qui prépare les agrégations. La seule chose qu'on peut faire avec est d'appeler count, reduce ou aggregate.
Une fois les messages groupés, trois opérations permettent de calculer des résultats. Toutes retournent une KTable — la table de l'état courant du résultat.
count compte combien de messages sont arrivés pour chaque clé depuis le début du stream. Chaque nouveau message incrémente le compteur de sa clé.
KTable<String, Long> ordersPerCustomer = stream
.groupByKey() // groupe par customerId
.count(Materialized.as("orders-count-store")); // compte les commandes par client, stocké dans un state store
// Résultat dans le state store :
// C-A → 47
// C-B → 12javareduce combine deux valeurs de même type en une seule. À chaque nouveau message, Kafka Streams prend la valeur actuellement stockée pour cette clé et la combine avec le nouveau message selon la fonction qu'on lui donne.
La contrainte : le type de sortie doit être le même que le type d'entrée.
KTable<String, OrderPlaced> totalSpent = stream
.groupByKey() // groupe par client
.reduce( // combine deux valeurs de même type
(accumulated, newOrder) -> OrderPlaced.newBuilder(accumulated) // repart de l'accumulé, ajoute le nouveau montant
.setAmount(accumulated.getAmount() + newOrder.getAmount())
.build()
);javaaggregate est comme reduce, mais sans contrainte sur le type de sortie. On peut accumuler des OrderPlaced vers un CustomerSummary qui contient une somme, un compteur, une moyenne — ce qu'on veut. Elle nécessite une valeur initiale : la première fois qu'une clé est rencontrée, Kafka Streams appelle la fonction initiale pour créer la valeur de départ.
KTable<String, CustomerSummary> customerSummary = stream
.groupByKey() // groupe par client
.aggregate( // agrège vers un type de sortie différent
() -> CustomerSummary.newBuilder() // valeur initiale (1ère fois qu'on voit ce client)
.setTotalAmount(0L)
.setOrderCount(0L)
.build(),
(customerId, order, current) -> CustomerSummary.newBuilder() // accumulateur : courant + nouvelle commande
.setTotalAmount(current.getTotalAmount() + order.getAmount())
.setOrderCount(current.getOrderCount() + 1)
.build(),
Materialized.<String, CustomerSummary, KeyValueStore<Bytes, byte[]>> // matérialise dans un store
.as("customer-summary-store") // nom du store
.withValueSerde(customerSummarySerde) // serde de la valeur de sortie
);javaCe qui se passe message par message pour le client C-A :
1ère commande (montant: 10) → valeur initiale créée → {total: 10, count: 1}
2ème commande (montant: 25) → {10, 1} + nouveau → {total: 35, count: 2}
3ème commande (montant: 7) → {35, 2} + nouveau → {total: 42, count: 3}
État final C-A : {totalAmount: 42, orderCount: 3}
| Opération | Type sortie | Valeur initiale | Quand l'utiliser |
|---|---|---|---|
count |
Long |
0 (automatique) | Compter des messages |
reduce |
Même que l'entrée | Première valeur reçue | Combiner des valeurs de même type |
aggregate |
N'importe quel type | Déclarée explicitement | Tout le reste |
Une KTable supporte aussi des transformations. La particularité : toute modification produit un nouveau message dans le changelog Kafka du state store, qui représente la mise à jour de l'état.
Filtrer une KTable crée une nouvelle KTable qui ne contient que les entrées satisfaisant la condition. Quand une entrée est retirée par le filtre, Kafka Streams émet un tombstone — un message avec value = null — pour signaler la suppression.
KTable<String, CustomerSummary> summaryTable = ...; // KTable de résumés clients (issue d'une agrégation)
KTable<String, CustomerSummary> activeCustomers = summaryTable
.filter((customerId, summary) -> summary.getTotalAmount() > 0); // garde les clients à total positif (tombstone sinon)javaTransforme la valeur de chaque entrée sans changer la clé. Même principe que sur KStream.
KTable<String, Double> averageOrderValue = summaryTable
.mapValues(summary -> // transforme chaque valeur, clé inchangée
summary.getOrderCount() == 0 ? 0.0 // évite la division par zéro
: (double) summary.getTotalAmount() / summary.getOrderCount() // panier moyen
);javatoStream convertit une KTable en KStream. Chaque changement d'état dans la KTable — ajout, mise à jour, ou tombstone — devient un événement dans le stream résultant. C'est le moyen standard pour publier le résultat d'une agrégation vers un topic de sortie.
customerSummary
.toStream() // KTable → KStream (chaque mise à jour devient un événement)
.to("customer-summary-output"); // publie vers le topic de sortiejava⚡ TL;DR — chaque concept en une ligne
filter / filterNot ✓ Laisse passer ou élimine des messages selon une condition — les messages éliminés avancent quand même dans l'offset. ⚠ Les messages filtrés disparaissent de la topologie — pas de DLT automatique.
mapValues vs map ✓ mapValues transforme la valeur sans toucher la clé — pas de repartitionnement, à préférer par défaut. ⚠ map change la clé — déclenche un repartitionnement interne coûteux avant la prochaine opération avec état.
flatMap / flatMapValues ✓ Un message peut produire zéro, un ou plusieurs messages en sortie. ⚠ flatMap change la clé → repartitionnement. flatMapValues ne change pas la clé → pas de repartitionnement.
Repartitionnement interne ✓ Réécrit chaque message re-keyé dans un topic interne — l'écriture le range sur la bonne partition (mêmes clés ensemble). ⚠ L'ancienne clé est remplacée — garder dans la valeur tout champ utile en aval. Coûte une écriture + une relecture.
groupByKey vs groupBy ✓ groupByKey groupe par la clé actuelle — pas de repartitionnement si la clé n'a pas changé. ⚠ groupBy refait un rekey + repartition à CHAQUE appel — factoriser via selectKey si on réutilise la clé.
count / reduce / aggregate ✓ count pour les occurrences, reduce pour combiner le même type, aggregate pour tout le reste. ⚠ aggregate nécessite un Serde explicite pour le type de sortie si différent du type d'entrée.
🎓 À retenir
mapValues est toujours meilleur.selectKey + groupByKey ne gagne rien sur une agrégation unique — un groupBy seul et un selectKey().groupByKey() font chacun exactement une repartition. Le gain n'existe que si on réutilise la clé pour plusieurs agrégations : selectKey ré-expédie une fois, et les groupByKey suivants relisent un stream déjà rangé. Un groupBy répété, lui, re-localise à chaque appel.reduce et aggregate, choisir aggregate. reduce est juste un cas particulier où le type d'entrée et de sortie sont les mêmes.