🎯 OBJECTIF
Comprendre comment :
🧠 MODÈLE MENTAL
Un consumer Kafka classique lit des messages, les traite un par un, et oublie. Il n'a pas de mémoire : si on lui demande "quel est le total dépensé par le client C-A depuis ce matin ?", il ne sait pas répondre — il faudrait stocker ça soi-même dans une base de données.
Kafka Streams change ce modèle. C'est une bibliothèque de traitement de flux avec état, embarquée directement dans l'application Java. Elle permet de maintenir des agrégats, de joindre des streams en temps réel, de calculer des métriques sur des fenêtres de temps — et tout ça sans cluster externe, sans infrastructure supplémentaire. L'état est stocké localement dans RocksDB, et sauvegardé automatiquement dans des topics Kafka de changelog pour survivre aux redémarrages.
Le concept central est la topologie : un graphe de nœuds qui décrit comment les messages circulent, se transforment, et s'agrègent. Les données entrent par des topics Kafka, passent par des opérations (filtre, transformation, join, agrégation), et ressortent vers d'autres topics ou vers des state stores interrogeables. Cette topologie est définie une fois au démarrage — Kafka Streams s'occupe ensuite de l'exécution, du parallélisme, de la reprise sur erreur, et de la cohérence des états.
Prérequis : kafka-core-model (topics, partitions, consumer groups, offsets), kafka-producer-consumer-tuning (poll loop, commit).
Avant de plonger dans la configuration, il faut comprendre ce que Kafka Streams fait réellement — et pourquoi c'est différent d'un consumer classique ou de Flink.
Un consumer Kafka lit des messages et les traite. C'est du traitement stateless : chaque message est traité indépendamment, sans mémoire des messages précédents. Si on veut agréger, compter, ou joindre avec une autre source de données, on doit le faire soi-même en dehors de Kafka — dans une base de données, dans Redis, dans du code applicatif.
Kafka Streams apporte le traitement stateful directement dans le flux. La bibliothèque maintient des state stores locaux (RocksDB par défaut) qui persistent l'état entre les messages. Ces stores sont sauvegardés dans des topics Kafka de changelog — si l'application redémarre, Kafka Streams rejoue le changelog pour reconstituer l'état local avant de reprendre le traitement.
flowchart LR
subgraph "Service Spring Boot"
KS["Kafka Streams"]
RS["RocksDB<br/>state store local"]
KS <--> RS
end
K1["Topic source"] -->|consomme| KS
KS -->|publie| K2["Topic résultat"]
KS <-->|changelog backup| K3["Topic changelog<br/>_internal"]mermaidPar rapport aux autres outils de stream processing :
| Outil | Déploiement | État distribué | Latence | Usage |
|---|---|---|---|---|
| Consumer Kafka | Bibliothèque | Non | Très faible | Traitement simple, stateless |
| Kafka Streams | Bibliothèque | Oui — RocksDB + changelog | Faible | Enrichissement, agrégation, join temps réel |
| Apache Flink | Cluster séparé | Oui — checkpoints | Faible | Topologies très complexes, grands volumes |
| Spark Streaming | Cluster séparé | Oui | Plus élevée (micro-batch) | Batch/stream hybride |
L'avantage de Kafka Streams sur Flink : pas de cluster à opérer. L'inconvénient : pour les volumes d'état très importants ou les topologies très complexes, Flink est plus adapté.
Une topologie Kafka Streams est un graphe acyclique dirigé (DAG) qui décrit comment les messages circulent dans l'application. Elle est composée de trois types de nœuds.
Un nœud source lit depuis un topic Kafka et injecte les messages dans la topologie. Un nœud processeur applique une opération — filtre, transformation, join, agrégation. Un nœud sink écrit le résultat vers un topic Kafka de sortie.
flowchart LR
T1["Topic orders<br/>source"] --> F["Filtre<br/>amount != 0"]
F --> M["Map<br/>enrichissement"]
M --> B["Branch"]
B -->|montant élevé| A["Topic order-alerts<br/>sink"]
B -->|normal| P["Topic orders-processed<br/>sink"]mermaidEn Spring Kafka Streams, on décrit cette topologie avec un StreamsBuilder. C'est une API fluente — chaque opération retourne un nouveau stream qui représente le flux transformé à ce stade. La topologie est construite une fois au démarrage, puis Kafka Streams l'exécute en continu.
@Configuration // classe de configuration Spring
@EnableKafkaStreams // active l'auto-configuration Kafka Streams
public class OrderTopology {
@Bean // expose la topologie comme bean Spring
public KStream<String, OrderPlaced> orderStream(StreamsBuilder builder) {
KStream<String, OrderPlaced> stream =
builder.stream("orders"); // nœud source : lit le topic orders (clé = customerId)
KStream<String, OrderPlaced> processed = stream
.filter((customerId, order) -> order.getAmount() != 0) // ne garde que les commandes à montant non nul
.mapValues(order -> enrich(order)); // enrichit la valeur sans changer la clé → pas de repartition
processed.to("orders-processed"); // nœud sink : écrit le résultat vers orders-processed
return processed; // retourne le stream pour un éventuel chaînage
}
}java🔑 La topologie est immutable après démarrage
Une topologie ne peut pas être modifiée à chaud. Pour changer la logique de traitement, il faut redémarrer l'application avec la nouvelle topologie. Contrairement à une configuration Spring classique, on ne peut pas reconfigurer le graphe de traitement sans arrêt.
C'est la distinction la plus importante à comprendre dans Kafka Streams. Ces trois abstractions représentent les données différemment et ont des comportements fondamentalement différents.
Un KStream représente un flux infini d'événements indépendants. Chaque message est une chose qui s'est passée — une commande, un paiement, une transaction. Les messages avec la même clé sont des événements distincts, pas des mises à jour l'un de l'autre.
Imaginons un topic orders qui reçoit ces messages dans l'ordre :
(C-A, {amount: +10}) ← commande de 10 €
(C-A, {amount: -3}) ← remboursement de 3 €
(C-A, {amount: +7}) ← commande de 7 €
Vu comme un KStream, ces trois messages coexistent. Chacun représente un événement distinct. Le KStream ne sait pas ce que "le total dépensé par C-A" veut dire — c'est juste une succession d'événements.
Une KTable représente l'état courant d'une entité. Quand un nouveau message arrive avec une clé existante, il remplace la valeur précédente — c'est une sémantique d'upsert. Un message avec value = null est un tombstone qui supprime l'entrée.
Reprenons le même topic vu comme une KTable :
Après message 1 : C-A → {amount: +10}
Après message 2 : C-A → {amount: -3} ← remplace le +10
Après message 3 : C-A → {amount: +7} ← remplace le -3
État final : C-A → {amount: +7}
La KTable ne cumule pas — elle remplace. Ce modèle est adapté quand le producer publie déjà l'état final à chaque fois, comme Debezium qui publie la ligne complète après chaque UPDATE en base.
🚨 KTable ≠ somme des événements
Si le topic contient des deltas (des +10, -3, +7 qui représentent des variations), une KTable directe ne donnera pas le total cumulé — elle donnera juste la dernière valeur publiée. Pour obtenir un cumul, il faut agréger un KStream comme expliqué ci-dessous.
C'est la bonne réponse quand on veut additionner des deltas. On groupe le KStream par clé et on utilise aggregate — Kafka Streams maintient lui-même le cumul dans un state store :
KTable<String, Long> customerTotal = builder
.stream("orders") // source : flux des commandes
.groupByKey() // groupe par la clé courante (customerId)
.aggregate( // agrège les montants par client
() -> 0L, // valeur initiale pour toute nouvelle clé
(customerId, order, currentTotal) -> currentTotal + order.getAmount(), // accumulateur : ajoute le montant
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> // matérialise le résultat dans un store
.as("customer-total-store") // nom du state store
.withValueSerde(Serdes.Long()) // serde de la valeur (Long)
);javaAvec les mêmes messages :
Message (C-A, +10) → state store : C-A = 0 + 10 = 10
Message (C-A, -3) → state store : C-A = 10 + (-3) = 7
Message (C-A, +7) → state store : C-A = 7 + 7 = 14
Le résultat est une KTable dont les valeurs sont calculées par Kafka Streams — pas simplement copiées depuis le topic.
| KTable directe | KStream agrégé | |
|---|---|---|
| Sémantique | Upsert — le dernier message gagne | Accumulation — Kafka Streams calcule |
| Quand utiliser | Le producer publie déjà l'état final (CDC, config) | Le topic contient des deltas à additionner |
| Exemple | Table client mise à jour par CDC | Montant total dépensé par client |
Une GlobalKTable est comme une KTable, mais elle est répliquée intégralement sur chaque instance de l'application, quelle que soit la partition assignée. Contrairement à la KTable qui ne voit que les partitions assignées à l'instance courante, la GlobalKTable voit tout le topic.
flowchart TD
subgraph "Instance 1 (partitions 0 et 1)"
KT1["KTable<br/>partitions 0 et 1 seulement"]
GKT1["GlobalKTable<br/>toutes les partitions"]
end
subgraph "Instance 2 (partitions 2 et 3)"
KT2["KTable<br/>partitions 2 et 3 seulement"]
GKT2["GlobalKTable<br/>toutes les partitions"]
end
T["Topic référentiel<br/>4 partitions"] --> KT1
T --> KT2
T --> GKT1
T --> GKT2mermaidLa GlobalKTable est faite pour les tables de référence — un référentiel client, une table de configuration, un mapping de codes — dont on a besoin pour enrichir tous les messages quel que soit la partition. Elle est petite et ne change pas souvent.
| KTable | GlobalKTable | |
|---|---|---|
| Données vues | Partitions assignées seulement | Toutes les partitions |
| Co-partitionnement requis pour join | Oui | Non |
| Consommation mémoire | Proportionnelle aux partitions assignées | Proportionnelle à tout le topic |
| Usage | Agrégats, états entités | Référentiels, lookups, enrichissement |
🚨 KTable et co-partitionnement pour les joins
Pour joindre un KStream avec une KTable, les deux topics doivent avoir le même nombre de partitions et les messages de même clé doivent être dans la même partition. Si le client C-A est en partition 0 dans le stream mais en partition 2 dans la table, ils ne seront jamais sur la même instance et le join ne fonctionnera pas. Avec GlobalKTable, cette contrainte disparaît — toutes les données de la table sont disponibles sur chaque instance.
Spring Boot auto-configure Kafka Streams via @EnableKafkaStreams et les propriétés spring.kafka.streams.*. La configuration minimale nécessite un application-id — c'est l'identifiant unique de l'application Streams. Il sert à deux choses : préfixer les topics internes créés par Kafka Streams (changelog, repartition), et définir le consumer group de l'application.
spring:
kafka:
bootstrap-servers: localhost:9092 # adresse du cluster Kafka
streams:
# Identifiant unique de l'application.
# Deux instances avec le même application-id forment un cluster Streams —
# Kafka distribue les partitions entre elles automatiquement.
application-id: order-processing-service
# Serde par défaut pour les clés et les valeurs.
# Peut être surchargé par stream via Consumed.with() ou Produced.with().
default-key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde # clé = String
default-value-serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde # valeur = Avro
properties:
schema.registry.url: http://localhost:8081 # URL du Schema Registry (résolution des schémas Avro)
# En cas d'erreur de désérialisation d'un message :
# LogAndContinueExceptionHandler → log et skip le message
# LogAndFailExceptionHandler → arrêt de l'application (défaut)
default.deserialization.exception.handler: >
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
# Cache interne pour batcher les writes vers les state stores.
# 0 = désactive le cache (utile en test pour voir les résultats immédiatement).
statestore.cache.max.bytes: 10485760
# Répertoire local où RocksDB stocke les state stores sur disque.
state.dir: /tmp/kafka-streamsyaml@Configuration // classe de configuration Spring
@EnableKafkaStreams // active Kafka Streams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) // bean de config attendu par Spring
public KafkaStreamsConfiguration streamsConfig() {
Map<String, Object> props = new HashMap<>(); // map des propriétés Streams
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-service"); // identité de l'app dans Kafka
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // adresse du cluster
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serde clé par défaut
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); // serde valeur par défaut (Avro)
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // URL du Schema Registry
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); // nombre de threads de traitement par instance
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // fréquence des commits d'offset (ms)
return new KafkaStreamsConfiguration(props); // construit l'objet de config Spring
}
}javaDans Kafka Streams, chaque message est sérialisé/désérialisé à chaque passage dans un nœud et à chaque écriture dans un state store. Les Serdes définissent comment. Le Serde par défaut est configuré globalement, mais chaque stream peut utiliser un Serde différent.
@Bean // expose le stream comme bean
public KStream<String, OrderPlaced> orderStream(StreamsBuilder builder) {
SpecificAvroSerde<OrderPlaced> orderSerde = new SpecificAvroSerde<>(); // serde Avro pour OrderPlaced
orderSerde.configure( // configure le serde
Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, // clé : URL du Schema Registry
"http://localhost:8081"), // valeur : l'URL
false // false = serde de valeur, true = serde de clé
);
return builder.stream( // crée le nœud source
"orders", // topic lu
Consumed.with(Serdes.String(), orderSerde) // serdes explicites clé/valeur pour ce stream
);
}javaPour ancrer les concepts, voici une topologie complète qui lit des commandes, enrichit avec un référentiel client, détecte les alertes, et agrège par pays.
flowchart LR
SM["orders<br/>source"] --> F["Filtre<br/>amount != 0"]
CR["customer-ref<br/>GlobalKTable"] --> J
F --> J["Join<br/>enrichissement"]
J --> B["Branch"]
B -->|montant élevé| AL["order-alerts<br/>sink"]
B -->|normal| AG["Agrégation<br/>par pays"]
AG --> SU["order-summary<br/>sink"]mermaid@Configuration // classe de configuration Spring
@EnableKafkaStreams // active Kafka Streams
public class OrderProcessingTopology {
@Bean // expose la topologie comme bean
public KStream<String, OrderPlaced> buildTopology(StreamsBuilder builder) {
GlobalKTable<String, CustomerRef> customerRef = builder.globalTable( // référentiel client en GlobalKTable
"customer-ref", // topic du référentiel
Materialized.as("customer-ref-store") // store local pour le référentiel
);
KStream<String, OrderPlaced> orderStream = builder
.stream("orders") // source : flux des commandes
.filter((customerId, order) -> order.getAmount() != 0); // élimine les commandes à montant nul
KStream<String, OrderEnriched> enriched = orderStream.join( // join stream × GlobalKTable
customerRef, // table de référence à joindre
(customerId, order) -> customerId, // clé de lookup dans la GlobalKTable (ici = la clé du stream)
(order, customer) -> OrderEnriched.newBuilder() // combine commande + client
.setOrderId(order.getOrderId())
.setCustomerName(customer.getName())
.setTier(customer.getTier())
.setAmount(order.getAmount())
.setAlertThreshold(customer.getAlertThreshold())
.setCountry(order.getCountry())
.build()
);
Map<String, KStream<String, OrderEnriched>> branches = enriched
.split(Named.as("branch-")) // découpe le stream en sous-flux
.branch( // 1ère règle (évaluée en premier)
(customerId, e) -> e.getAmount() > e.getAlertThreshold(), // montant au-dessus du seuil client
Branched.as("alert") // → sous-flux "alert"
)
.defaultBranch(Branched.as("normal")); // tout le reste → sous-flux "normal"
branches.get("branch-alert") // récupère le sous-flux des alertes
.mapValues(e -> OrderAlert.newBuilder() // transforme en OrderAlert
.setOrderId(e.getOrderId())
.setCustomerName(e.getCustomerName())
.setAmount(e.getAmount())
.build())
.to("order-alerts"); // publie vers le topic order-alerts
branches.get("branch-normal") // récupère le sous-flux normal
.groupBy((customerId, e) -> e.getCountry()) // regroupe par pays (rekey → repartition interne)
.aggregate( // agrège le montant total par pays
() -> 0L, // valeur initiale
(country, event, total) -> total + event.getAmount(), // accumulateur : somme des montants
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> // matérialise dans un store
.as("country-summary-store") // nom du store
.withValueSerde(Serdes.Long()) // serde de la valeur
)
.toStream() // KTable → KStream pour publier les mises à jour
.mapValues(total -> CountrySummary.newBuilder().setTotal(total).build()) // emballe le total dans CountrySummary
.to("order-summary"); // publie vers order-summary
return orderStream; // retourne le stream principal
}
}javaKafka Streams distribue le travail sur deux axes : les threads au sein d'une instance, et les instances de l'application.
Quand Kafka Streams démarre, il crée une tâche de stream par partition du topic source. Si le topic a 6 partitions, il y a 6 tâches. Ces tâches sont distribuées entre les threads disponibles sur toutes les instances qui partagent le même application-id.
Chaque tâche possède son propre state store local. C'est pour ça que le co-partitionnement est obligatoire pour les joins sur KTable : deux messages de clés différentes doivent être dans la même partition pour être traités par la même tâche et avoir accès au même state store.
flowchart LR
subgraph "Instance 1 (2 threads)"
TH1["Thread 1<br/>Tâche 0 partition 0<br/>Tâche 1 partition 1"]
TH2["Thread 2<br/>Tâche 2 partition 2"]
end
subgraph "Instance 2 (2 threads)"
TH3["Thread 3<br/>Tâche 3 partition 3<br/>Tâche 4 partition 4"]
TH4["Thread 4<br/>Tâche 5 partition 5"]
end
K["Topic source<br/>6 partitions"] --> TH1
K --> TH2
K --> TH3
K --> TH4mermaidLe parallélisme maximal est min(nb_partitions, nb_threads_total_toutes_instances). Ajouter des instances au-delà du nombre de partitions ne crée pas de tâches supplémentaires — les instances supplémentaires restent en standby (utiles pour le failover, pas pour le débit).
⚡ TL;DR — chaque concept en une ligne
Kafka Streams vs consumer classique ✓ Bibliothèque embarquée — traitement stateful avec RocksDB local et changelog Kafka. Zéro infrastructure à opérer. ⚠ Pour les volumes d'état très importants ou les topologies très complexes, Flink est plus adapté.
Topologie ✓ DAG de nœuds source/processeur/sink — décrit une fois au démarrage, exécuté en continu. ⚠ Immutable après démarrage — tout changement de logique nécessite un redémarrage.
KStream ✓ Flux d'événements indépendants — chaque message est une chose qui s'est passée, pas une mise à jour. ⚠ Pas de notion d'état courant — pour agréger des deltas, utiliser groupByKey().aggregate().
KTable directe ✓ Vue upsert — le dernier message par clé représente l'état courant. Idéal pour le CDC Debezium. ⚠ Ne cumule pas les valeurs — si le topic contient des deltas, la KTable donnera juste la dernière valeur publiée.
KStream agrégé ✓ Kafka Streams calcule lui-même le cumul dans un state store — idéal pour additionner des commandes. ⚠ Nécessite un state store local — plus de mémoire et de disque qu'un traitement stateless.
GlobalKTable ✓ Table répliquée sur toutes les instances — pas de contrainte de co-partitionnement pour les joins. ⚠ Consomme plus de mémoire — réserver aux petites tables de référence qui changent peu.
application-id ✓ Identifiant unique de l'app — préfixe les topics internes, définit le consumer group. ⚠ Changer l'application-id repart de zéro — perte de tous les offsets et états commités.
Parallélisme ✓ Tâches = partitions — distribuées entre threads et instances automatiquement. ⚠ Parallélisme maximal borné par le nombre de partitions — au-delà, les instances supplémentaires font du standby.
🎓 À retenir
KStream.groupByKey().aggregate().