🎯 OBJECTIF
Comprendre comment :
🧠 MODÈLE MENTAL
Un join dans Kafka Streams, c'est comme un join SQL — on combine des données de deux sources selon une clé commune. Mais il y a une contrainte que le SQL n'a pas : les données arrivent en continu dans le temps. On ne peut pas attendre que toutes les données soient là avant de joindre — elles continuent d'arriver indéfiniment.
C'est pour ça que les joins entre deux streams ont besoin d'une fenêtre de temps : on ne cherche pas à joindre un message avec n'importe quel message de l'autre stream, mais avec les messages qui sont arrivés dans un intervalle de temps proche. Si une commande arrive à 10h00, on la joint avec les paiements qui sont arrivés entre 9h55 et 10h05 — pas avec ceux de la veille.
Le windowing, c'est le même principe appliqué aux agrégations. Au lieu de compter toutes les commandes depuis le début des temps, on veut compter celles des 5 dernières minutes. Une fenêtre définit la tranche de temps sur laquelle une agrégation est calculée.
Prérequis : kafka-streams-2-operations-transformations — opérations, groupement, agrégations.
Kafka Streams supporte plusieurs types de joins selon la nature des sources à combiner.
| Join | Fenêtre requise | Co-partitionnement requis | Cas d'usage |
|---|---|---|---|
| KStream × KStream | Oui | Oui | Corréler deux flux d'événements dans le temps |
| KStream × KTable | Non | Oui | Enrichir un événement avec l'état courant d'une entité |
| KStream × GlobalKTable | Non | Non | Enrichir avec un référentiel disponible partout |
| KTable × KTable | Non | Oui | Joindre deux états courants |
Le co-partitionnement signifie que les deux topics doivent avoir le même nombre de partitions et que les messages de même clé doivent être dans les mêmes partitions. Si ce n'est pas le cas, Kafka Streams ne peut pas garantir que les deux côtés du join arrivent sur le même thread. (La variante foreign-key du join KTable × KTable, vue en section 5️⃣, lève cette contrainte.)
Pourquoi « mêmes clés dans les mêmes partitions » ? La partition d'un message est décidée à la production par hash(clé) % nombre_de_partitions. La même clé produit le même hash, donc le même numéro de partition — quel que soit le topic. Si orders et customers sont tous deux clés par customerId avec le même nombre de partitions, le client C-42 tombe sur la même partition des deux côtés. La task qui traite cette partition a donc les deux messages en local, sans réseau ni coordination — le join est une simple lecture du state store local.
Co-partitionner, c'est garantir trois conditions ensemble :
Modèle d'exécution : partition → task → thread. Une task traite une partition de tous les topics co-partitionnés (la task 2 lit la partition 2 de chaque topic). Les tasks sont réparties sur les stream threads et sur les instances, mais une task n'est jamais coupée entre deux threads. Conséquence directe : les deux côtés d'une même clé sont toujours dans la même task, donc sur le même thread — à condition que le co-partitionnement tienne. Un message « en retard » ne change pas de partition (la partition dépend de la clé, pas du moment d'arrivée) ; il reste sur sa task.
C'est le join le plus courant. On a un flux d'événements (KStream) et une table qui contient l'état courant d'une entité (KTable). Pour chaque événement, on veut récupérer les informations de l'entité correspondante au moment où l'événement arrive.
Un exemple concret : à chaque commande (KStream), on enrichit avec les informations du client (KTable). Ici le flux orders est partitionné par customerId pour co-partitionner avec customers. Si le client est mis à jour entre deux commandes, la commande suivante verra la nouvelle version du client.
KStream<String, OrderPlaced> orders = builder.stream("orders"); // source : flux des commandes (clé = customerId)
KTable<String, Customer> customers = builder.table("customers"); // table d'état : clients (clé = customerId)
KStream<String, OrderEnriched> enriched = orders.join( // enrichit chaque commande avec son client
customers, // table de lookup (co-partitionnée par customerId)
(order, customer) -> OrderEnriched.newBuilder() // combine commande + client
.setOrderId(order.getOrderId())
.setCustomerName(customer.getName())
.setCustomerTier(customer.getTier())
.setAmount(order.getAmount())
.build()
);javaSi on veut garder la commande même quand le client n'existe pas dans la table, on utilise leftJoin. Dans ce cas, customer peut être null côté droit.
KStream<String, OrderEnriched> withFallback = orders.leftJoin( // leftJoin : garde la commande même sans client trouvé
customers, // table de lookup
(order, customer) -> OrderEnriched.newBuilder() // customer peut être null ici
.setOrderId(order.getOrderId())
.setCustomerName(customer != null ? customer.getName() : "UNKNOWN") // fallback si client absent
.setAmount(order.getAmount())
.build()
);java🚨 Co-partitionnement : le piège du silent miss
Les topics orders et customers doivent avoir le même nombre de partitions, les mêmes customerId dans les mêmes partitions, et le même partitioner. Attention au piège : Kafka Streams vérifie le nombre de partitions (et lève une TopologyException au démarrage si ça ne colle pas), mais il ne détecte pas un partitioner différent à nombre de partitions égal. Dans ce cas, le join rate silencieusement — aucune exception, juste des messages qui ne sont jamais joints. Le pire des bugs.
La bonne correction est de repartitionner pour réaligner les clés (re-key + repartition()), pas de basculer en GlobalKTable par réflexe : la GlobalKTable règle le problème mais réplique toute la table sur chaque instance — à réserver aux petites tables de référence.
Le join avec une GlobalKTable fonctionne comme le join avec une KTable, mais sans contrainte de co-partitionnement — la GlobalKTable est répliquée en entier sur chaque instance, donc toutes les données sont toujours disponibles localement.
Une autre différence : on peut chercher dans la GlobalKTable avec une clé différente de la clé du message. C'est utile quand la clé de lookup est dans la valeur du message, pas dans sa clé.
GlobalKTable<String, Country> countries = builder.globalTable("countries"); // référentiel répliqué sur chaque instance
KStream<String, OrderEnriched> enriched = orders.join( // join stream × GlobalKTable
countries, // table de référence (pas de co-partitionnement requis)
(customerId, order) -> order.getCountryCode(), // clé de lookup = un champ de la valeur (pas la clé du stream)
(order, country) -> OrderEnriched.newBuilder() // combine commande + pays
.setOrderId(order.getOrderId())
.setCountryName(country.getName())
.build()
);javaJoindre deux KStream est différent des autres joins. Comme les deux sources sont des flux infinis, il faut définir une fenêtre de temps qui délimite la plage dans laquelle deux messages doivent arriver pour être joints.
Si un message arrive dans le stream A à 10h00 et qu'on définit une fenêtre de 5 minutes, on le joint avec tous les messages du stream B arrivés entre 9h55 et 10h05. Les messages du stream B en dehors de cette plage ne sont pas joints avec ce message.
KStream<String, OrderPlaced> orders = builder.stream("orders"); // 1er flux d'événements (clé = orderId)
KStream<String, PaymentReceived> payments = builder.stream("payments"); // 2ème flux d'événements (clé = orderId)
JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)); // fenêtre ±5 min, sans grace
KStream<String, OrderWithPayment> ordersWithPayment = orders.join( // corrèle commande et paiement dans la fenêtre
payments, // 2ème flux à joindre
(order, payment) -> OrderWithPayment.newBuilder() // combine les deux quand ils matchent
.setOrderId(order.getOrderId())
.setAmount(order.getAmount())
.setPaymentMethod(payment.getMethod())
.build(),
window // la fenêtre temporelle du join
);javaComme pour les autres joins, leftJoin garde la commande même sans paiement dans la fenêtre, et outerJoin garde les deux côtés même sans correspondance. À noter : en leftJoin/outerJoin stream-stream, le résultat avec null n'est émis qu'à la fermeture de la fenêtre (sinon on émettrait un null alors que le paiement va peut-être encore arriver) — d'où une latence égale à la durée de fenêtre sur ces résultats partiels.
Le join entre deux KTable combine leurs états courants. À chaque fois que l'une des deux tables est mise à jour pour une clé donnée, le join est recalculé pour cette clé.
KTable<String, Customer> customers = builder.table("customers"); // 1ère table (clé = customerId)
KTable<String, Loyalty> loyalty = builder.table("loyalty-accounts"); // 2ème table (clé = customerId)
KTable<String, CustomerView> combined = customers.join( // join sur la clé primaire (recalculé des deux côtés)
loyalty, // 2ème table à joindre
(customer, account) -> CustomerView.newBuilder() // combine fiche client + compte fidélité
.setCustomerId(customer.getCustomerId())
.setName(customer.getName())
.setPoints(account.getPoints())
.build()
);javaLe join ci-dessus est un join sur la clé primaire : les deux tables doivent être co-partitionnées par la même clé. Quand la table de gauche référence l'autre par un champ (une clé étrangère) plutôt que par sa clé, Kafka Streams propose le foreign-key join (KIP-213) : on fournit un extracteur de clé étrangère, et Streams gère lui-même la repartition interne. Pas de co-partitionnement requis.
KTable<String, Order> orders = builder.table("orders"); // table de gauche (clé = orderId)
KTable<String, Customer> customers = builder.table("customers"); // table référencée (clé = customerId)
KTable<String, EnrichedOrder> enriched = orders.join( // foreign-key join (KIP-213)
customers, // table cible de la clé étrangère
order -> order.getCustomerId(), // extracteur de FK → Streams repartitionne en interne
(order, customer) -> EnrichedOrder.newBuilder() // combine commande + client
.setOrderId(order.getOrderId())
.setCustomerName(customer.getName())
.build()
);javaLe coût : des topics internes de souscription et un peu de latence. Mais ça évite de devoir repartitionner manuellement orders par customerId.
Sans fenêtre, une agrégation s'applique à tous les messages depuis le début du stream. Pour répondre à des questions comme "combien de commandes dans les 5 dernières minutes ?", on utilise des fenêtres.
Kafka Streams propose quatre types de fenêtres.
Le temps est découpé en intervalles de taille fixe. Chaque message appartient à exactement une fenêtre. La fenêtre la plus simple.
|--- 10h00-10h05 ---|--- 10h05-10h10 ---|--- 10h10-10h15 ---|
msg1 msg2 msg3 msg4
Un message à 10h03 est dans la fenêtre 10h00-10h05. Un message à 10h07 est dans 10h05-10h10. Jamais dans la même fenêtre.
KTable<Windowed<String>, Long> ordersPerWindow = stream
.groupByKey() // groupe par customerId
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) // fenêtres fixes de 5 min, sans grace
.count(Materialized.as("orders-5min-store")); // compte les commandes par client et par fenêtrejavaLa clé du résultat est Windowed<String> — elle contient le customerId et la fenêtre de temps. Le client C-1 dans 10h00-10h05 est une entrée distincte de C-1 dans 10h05-10h10.
Comme les tumbling windows, mais les fenêtres avancent par pas plus petits que leur taille. Résultat : les fenêtres se chevauchent et un message peut appartenir à plusieurs fenêtres en même temps.
|--- 10h00-10h10 ---|
|--- 10h05-10h15 ---|
|--- 10h10-10h20 ---|
Un message à 10h07 est dans les fenêtres 10h00-10h10 et 10h05-10h15.
KTable<Windowed<String>, Long> ordersHopping = stream
.groupByKey() // groupe par customerId
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)) // fenêtre de 10 min (+1 min grace)
.advanceBy(Duration.ofMinutes(5))) // avance de 5 min → fenêtres qui se chevauchent
.count(Materialized.as("orders-hopping-store")); // compte par client et par fenêtrejavaUne sliding window est centrée sur chaque paire de messages. Deux messages se retrouvent dans la même fenêtre s'ils sont arrivés dans l'intervalle défini l'un de l'autre. La fenêtre n'est pas à des intervalles fixes — elle se crée autour des événements.
KTable<Windowed<String>, Long> ordersSliding = stream
.groupByKey() // groupe par customerId
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))) // même fenêtre si <5 min d'écart
.count(Materialized.as("orders-sliding-store")); // compte par client et par fenêtrejavaUne session window n'a pas de taille fixe. Elle regroupe les messages d'une même clé qui arrivent proches dans le temps, et se ferme automatiquement quand il n'y a plus d'activité pendant un certain délai appelé le gap.
Client C-1 :
10h00 msg1 10h02 msg2 10h04 msg3 silence 15 min 10h25 msg4 10h27 msg5
|---------- session 1 (6 min) ----------| |-- session 2 (4 min) --|
KTable<Windowed<String>, Long> sessions = stream
.groupByKey() // groupe par customerId
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10))) // nouvelle session après 10 min d'inactivité
.count(Materialized.as("sessions-store")); // compte les commandes par sessionjava| Type | Taille | Chevauchement | Usage |
|---|---|---|---|
| Tumbling | Fixe | Non | Métriques par heure, par jour |
| Hopping | Fixe | Oui | Moyennes glissantes |
| Sliding | Variable | Oui | Corrélation temporelle entre événements |
| Session | Variable | Non | Sessions utilisateur, activité par rafales |
Dans un système distribué, les messages n'arrivent pas toujours dans l'ordre chronologique. Un message produit à 10h03 peut arriver dans Kafka après un message produit à 10h07 à cause d'un retard réseau.
Par défaut, quand une fenêtre se ferme, tous les messages qui arrivent après sont ignorés. Le grace period est un délai supplémentaire pendant lequel la fenêtre continue d'accepter des messages tardifs.
TimeWindows windowWithGrace = TimeWindows
.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)); // fenêtre de 5 min + 1 min de tolérance aux retards
KTable<Windowed<String>, Long> orders = stream
.groupByKey() // groupe par customerId
.windowedBy(windowWithGrace) // applique la fenêtre avec son grace period
.count(Materialized.as("orders-with-grace-store")); // compte par client et par fenêtrejava🚨 ofSizeWithNoGrace ignore silencieusement les messages tardifs
TimeWindows.ofSizeWithNoGrace() ferme la fenêtre à la seconde près. Tout message tardif disparaît sans avertissement. Dans un système de production avec de la latence réseau, toujours utiliser ofSizeAndGrace() avec un grace period adapté.
Le join KStream × KTable est un lookup : au moment où Kafka traite l'événement, il lit la valeur courante de l'entité dans la table. Quand un événement est traité en retard par rapport au moment où il s'est produit, ça soulève une question.
Exemple : une commande est passée à 10h00 (client domicilié à Paris). À 12h00, le client déménage (→ Lyon). À 14h00 seulement, la commande de 10h00 est traitée (retard réseau, rejeu). Une KTable normale la joint avec l'adresse Lyon — la valeur du moment du traitement, pas celle du moment de la commande.
Le retard ne casse rien côté partitions (la commande reste sur sa partition) ; il change juste avec quelle version on joint. Par défaut : la version courante.
Si on a besoin de la valeur au timestamp de l'événement (lookup as-of), il faut une versioned KTable (KIP-889, Kafka 3.5+) : un store qui conserve un historique par clé.
KTable<String, Customer> customers = builder.table( // KTable versionnée (permet le lookup as-of)
"customers", // topic source
Materialized.as(Stores.persistentVersionedKeyValueStore( // store versionné
"customers-store", Duration.ofDays(7))) // 7 jours d'historique conservé par clé
);javaLe join interroge alors la version temporellement correcte — utile dès qu'on doit reconstituer l'état d'une entité tel qu'il était à un instant passé.
💡 Synchronisation des entrées
Même sans versioned store, Kafka Streams tente d'aligner les flux par timestamp via max.task.idle.ms : une task attend un court délai qu'une partition d'entrée plus lente se présente, pour traiter les événements dans l'ordre temporel. Ça réduit les jointures « sur une mauvaise version » sans rejouer l'historique. Trop bas → risque de joindre sur une table pas encore à jour ; trop haut → latence.
⚡ TL;DR — chaque concept en une ligne
KStream × KTable ✓ Enrichit chaque événement avec l'état courant d'une entité — co-partitionnement obligatoire. ⚠ Un événement sans correspondance est silencieusement ignoré — utiliser leftJoin pour le garder.
KStream × GlobalKTable ✓ Même chose mais sans contrainte de co-partitionnement — la GlobalKTable est disponible partout. ⚠ Consomme plus de mémoire — réserver aux petites tables de référence.
KStream × KStream ✓ Corrèle deux flux dans le temps — fenêtre obligatoire pour délimiter la plage de join. ⚠ Un message sans correspondance dans la fenêtre est ignoré (inner join) ou gardé (left/outer join).
KTable × KTable (foreign-key) ✓ Joindre sur un champ qui référence la clé de l'autre table — Streams repartitionne tout seul. ⚠ Coûte des topics internes de souscription et un peu de latence.
Tumbling window ✓ Tranches de temps fixes sans chevauchement — chaque message dans une seule fenêtre. ⚠ Une activité à cheval sur deux fenêtres est coupée en deux.
Hopping window ✓ Fenêtres qui se chevauchent — un message peut appartenir à plusieurs fenêtres. ⚠ Plus coûteux en stockage — chaque message est compté plusieurs fois.
Session window ✓ Taille variable basée sur l'inactivité — se ferme automatiquement. ⚠ Deux sessions du même client peuvent avoir des durées très différentes.
Grace period
✓ Accepte les messages tardifs pendant un délai après la fermeture de la fenêtre.
⚠ ofSizeWithNoGrace ignore silencieusement les messages tardifs — toujours définir un grace period en prod.
🎓 À retenir
Windowed<K> — elle contient la clé métier ET la fenêtre de temps. Un même client dans deux fenêtres différentes est deux entrées distinctes.