🎯 OBJECTIF
Comprendre comment :
🧠 MODÈLE MENTAL
Les trois notes précédentes ont montré comment transformer, joindre et agréger des flux. Tout ça produit de l'état — des compteurs, des sommes, des tables jointes — stocké localement dans des state stores. Cette note traite ce qui entoure cet état : comment il persiste, comment on le lit de l'extérieur, comment on gère les erreurs qui le menacent, et comment on teste tout ça.
Le fil conducteur tient en une phrase : un state store Kafka Streams n'est pas une base externe, c'est une base embarquée (RocksDB) doublée d'un topic de changelog. Cette double nature — local pour la vitesse, changelog pour la durabilité — explique presque tout : pourquoi l'état survit à un redémarrage, pourquoi on peut le lire en direct via Interactive Queries, et pourquoi une mauvaise gestion d'erreur peut bloquer ou trouer le traitement.
Prérequis : kafka-streams-2-operations-transformations (agrégations, state stores), kafka-streams-3-joins-windowing (WindowStore, SessionStore).
Une opération avec état (agrégation, join, windowing) écrit son résultat dans un state store : une base clé-valeur embarquée dans l'instance. Trois types selon l'opération : KeyValueStore (count, reduce, aggregate), WindowStore (agrégations fenêtrées), SessionStore (session windows).
Chaque écriture dans un store est aussi écrite dans un topic de changelog compacté, nommé <application-id>-<store>-changelog. C'est ce qui rend l'état durable : le store local peut disparaître, le changelog reste la source de vérité.
flowchart LR
OP["Agrégation / Join"] -->|write| SS["State store<br/>RocksDB local"]
SS -->|backup async| CL["Topic changelog<br/>compacté"]
CL -.->|restore au démarrage| SSmermaid| RocksDB (persistent) | in-memory | |
|---|---|---|
| Stockage local | Sur disque (state.dir) |
Dans la heap JVM |
| Survie au restart local | Oui — relit le RocksDB local | Non — reconstruit depuis le changelog |
| Mémoire | Faible (off-heap) | Bornée par la heap |
| Vitesse | Très bonne | Légèrement meilleure |
Le point qui surprend : les deux sont sauvegardés par le changelog. « in-memory » ne veut pas dire « volatile sans filet » — ça veut dire que la copie locale est en RAM. Au redémarrage, un store in-memory est entièrement reconstruit en rejouant le changelog ; un store RocksDB relit son fichier local et ne rejoue que ce qui manque.
.count(Materialized.as("orders-count-store")); // store persistant par défaut (RocksDB sur disque)
.count(Materialized.as( // store in-memory (dans la heap JVM)
Stores.inMemoryKeyValueStore("orders-count-store")));javaAu démarrage, avant de traiter le moindre message, Kafka Streams restaure ses stores jusqu'à l'offset committé du changelog. Le coût est proportionnel à la taille de l'état. Pour éviter une restauration complète à chaque bascule, num.standby.replicas maintient des copies à chaud sur d'autres instances : en cas de failover, une standby prend le relais quasi instantanément.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // 1 copie à chaud sur une autre instance (failover rapide)java🚨 state.dir sur stockage éphémère = restauration complète à chaque déploiement
Si state.dir pointe vers un disque éphémère (/tmp, conteneur sans volume), le RocksDB local repart vide à chaque redéploiement. Kafka Streams doit alors rejouer tout le changelog avant de reprendre — sur un gros état, des minutes d'indisponibilité à chaque release. En prod : monter un volume persistant pour state.dir, et provisionner num.standby.replicas ≥ 1 pour les bascules.
Par défaut, un state store n'alimente que des topics de sortie. Les Interactive Queries (IQ) permettent de l'interroger directement — par exemple exposer un endpoint REST qui renvoie le total dépensé d'un client sans repasser par Kafka.
ReadOnlyKeyValueStore<String, Long> store = streams.store( // ouvre le state store en lecture seule
StoreQueryParameters.fromNameAndType( // paramètres de la requête
"customer-summary-store", // nom du store à interroger
QueryableStoreTypes.keyValueStore() // type : store clé-valeur
)
);
Long total = store.get("C-42"); // lecture locale de la clé C-42javaLe piège : le store est partitionné entre les instances. Une clé donnée vit sur une seule instance. Si la requête REST arrive sur l'instance A mais que C-42 est sur l'instance B, une lecture locale renvoie null à tort. Il faut router.
KeyQueryMetadata metadata = streams.queryMetadataForKey( // localise l'instance qui détient la clé
"customer-summary-store", "C-42", Serdes.String().serializer()); // store, clé, serializer de clé
HostInfo host = metadata.activeHost(); // host de l'instance active pour cette clé
if (host.equals(thisInstance)) { // la clé est-elle locale à cette instance ?
return store.get("C-42"); // oui → lecture locale
} else {
return remoteCall(host, "C-42"); // non → appel RPC vers l'instance distante
}javasequenceDiagram
participant C as Client REST
participant A as Instance A
participant B as Instance B
C->>A: GET /total/C-42
A->>A: queryMetadataForKey("customer-summary-store", "C-42")
Note over A: la clé C-42 vit sur l'instance B
A->>B: appel RPC /total/C-42
B->>B: store.get("C-42")
B-->>A: 1240
A-->>C: 1240mermaidPour que le routage fonctionne, chaque instance doit déclarer son adresse via application.server (host
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "instance-a.svc:8080"); // adresse exposée dans les métadonnées (host:port)java🚨 IQ pendant un rebalance : InvalidStateStoreException
Pendant un rebalance ou une restauration, le store n'est temporairement pas disponible et streams.store(...) lève InvalidStateStoreException. Ne jamais laisser cette exception crasher l'endpoint — la gérer avec un retry court ou renvoyer un 503. Et toujours router via queryMetadataForKey : une lecture locale aveugle renvoie un faux null quand la clé est sur une autre instance.
Kafka Streams distingue trois familles d'erreurs, chacune avec son handler. Les confondre mène à des pertes silencieuses ou à des crashs évitables.
Quand un message entrant ne se désérialise pas (mauvais schéma, octet magique invalide), le DeserializationExceptionHandler décide.
props.put( // configure le handler d'erreur de désérialisation
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, // clé de config
LogAndContinueExceptionHandler.class // log + skip le message (vs LogAndFailExceptionHandler = arrêt, le défaut)
);javaQuand l'écriture vers un topic de sortie échoue (RecordTooLargeException, par exemple), le ProductionExceptionHandler décide entre CONTINUE (skip) et FAIL.
props.put( // configure le handler d'erreur de production
StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, // clé de config
MyProductionExceptionHandler.class // retourne CONTINUE (skip) ou FAIL selon l'exception
);javaUne exception levée dans la logique d'un processor (un NPE dans un mapValues, par exemple) remonte au StreamsUncaughtExceptionHandler.
streams.setUncaughtExceptionHandler(ex -> { // handler des exceptions non capturées dans un processor
// REPLACE_THREAD → remplace le thread, l'instance survit
// SHUTDOWN_CLIENT → arrête cette instance (défaut depuis 2.8)
// SHUTDOWN_APPLICATION → arrête toutes les instances du même application-id
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; // ici : survivre à l'erreur
});java🚨 Deux pièges silencieux dans la gestion d'erreur
LogAndContinue avale les poison pills sans alerte — chaque message malformé disparaît avec un simple log. En prod, préférer un handler custom qui route le message vers une DLT et émet une métrique, sinon tu perds des données sans le savoir.
Depuis Kafka 2.8, l'uncaught handler par défaut est SHUTDOWN_CLIENT — une seule exception non gérée dans un seul record arrête l'instance entière. Choisir explicitement REPLACE_THREAD si on veut survivre à une erreur ponctuelle.
TopologyTestDriver exécute une topologie sans cluster Kafka : en mémoire, de façon synchrone et déterministe. On y injecte des messages via un TestInputTopic et on lit les sorties via un TestOutputTopic.
Topology topology = buildTopology(); // la topologie à tester
Properties props = new Properties(); // config minimale pour le driver
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); // application-id factice
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // aucun broker réel requis
// ⚠️ Désactiver le cache pour voir les résultats immédiatement
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); // cache à 0 → résultats visibles immédiatement
try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { // driver de test (try-with-resources)
TestInputTopic<String, OrderPlaced> orders = driver.createInputTopic( // topic d'entrée simulé
"orders", new StringSerializer(), orderSerde.serializer()); // nom + serializers clé/valeur
TestOutputTopic<String, CustomerSummary> out = driver.createOutputTopic( // topic de sortie simulé
"customer-summary-output", new StringDeserializer(), summarySerde.deserializer()); // nom + deserializers
orders.pipeInput("C-A", order(10)); // injecte une commande de 10 pour C-A
orders.pipeInput("C-A", order(25)); // injecte une commande de 25 pour C-A
assertThat(out.readKeyValue().value.getTotalAmount()).isEqualTo(35); // vérifie l'agrégat en sortie (10 + 25)
KeyValueStore<String, CustomerSummary> store = // accès direct au state store
driver.getKeyValueStore("customer-summary-store");
assertThat(store.get("C-A").getOrderCount()).isEqualTo(2); // vérifie le compteur dans le store (2 commandes)
}javaPour les topologies fenêtrées ou avec punctuator, driver.advanceWallClockTime(Duration) permet de faire avancer le temps manuellement.
🚨 Deux réglages indispensables en test
Cache activé = résultats invisibles — par défaut Kafka Streams batche les écritures dans les stores et ne les flushe qu'au commit. En test, mettre statestore.cache.max.bytes = 0 pour voir chaque résultat immédiatement.
Dans un test de join, alimenter la KTable avant le KStream — TopologyTestDriver traite les messages dans l'ordre d'injection. Si on injecte le message KStream avant que la KTable soit alimentée, le lookup ne trouve rien et le join paraît cassé alors qu'il fonctionne.
⚡ TL;DR — chaque concept en une ligne
State store ✓ Base clé-valeur embarquée (KeyValueStore / WindowStore / SessionStore) qui porte l'état des opérations stateful. ⚠ Pas une base externe — locale à la tâche, donc partitionnée entre les instances.
RocksDB vs in-memory ✓ RocksDB survit au restart local ; in-memory est plus rapide mais reconstruit depuis le changelog. ⚠ Les deux sont sauvegardés par le changelog — « in-memory » ne veut pas dire « sans durabilité ».
Changelog & restauration
✓ Chaque écriture est répliquée dans un topic changelog compacté, rejoué au démarrage pour reconstituer l'état.
⚠ state.dir éphémère = restauration complète à chaque redéploiement — monter un volume persistant.
Standby replicas ✓ Copies à chaud sur d'autres instances pour un failover quasi instantané. ⚠ Consomment du disque et de la bande passante sur chaque instance standby.
Interactive Queries
✓ Lecture directe d'un state store depuis l'extérieur (REST), sans repasser par un topic.
⚠ La clé peut vivre sur une autre instance — router via queryMetadataForKey, sinon faux null.
DeserializationExceptionHandler ✓ Décide quoi faire d'un message illisible — LogAndContinue (skip) ou LogAndFail (arrêt). ⚠ LogAndContinue perd les poison pills en silence — préférer un handler custom + DLT + métrique.
StreamsUncaughtExceptionHandler ✓ Gère les exceptions métier non capturées — REPLACE_THREAD pour survivre. ⚠ Défaut depuis 2.8 = SHUTDOWN_CLIENT : une exception tue l'instance entière.
TopologyTestDriver
✓ Teste une topologie en mémoire, sans cluster, de façon synchrone et déterministe.
⚠ Cache à désactiver (statestore.cache.max.bytes = 0) sinon les résultats sont batchés et invisibles.
🎓 À retenir
state.dir éphémère — pas le type de store. Un RocksDB sur disque non persistant repart de zéro à chaque pod recreate et rejoue tout le changelog avant de traiter.queryMetadataForKey, une lecture locale renvoie null pour une clé qui existe ailleurs. C'est un faux négatif, pas une absence de donnée.InvalidStateStoreException est normale, pas exceptionnelle — elle survient à chaque rebalance/restore. Un endpoint IQ doit la traiter (retry ou 503), jamais crasher dessus.SHUTDOWN_CLIENT arrête l'instance sur la première exception non gérée. Si on a hérité d'un comportement « ça survivait avant », c'est ce changement de défaut.