🎯 OBJECTIF
Comprendre comment :
KafkaTemplate pour publier des messages avec garanties@KafkaListener robustes (concurrency, AckMode, batch, filter)AckMode selon le besoin🧠 MODÈLE MENTAL
Spring Kafka est une couche d'abstraction sur le client Kafka officiel. Elle ne cache pas la complexité — elle la rend configurable via des beans Spring plutôt que via des Properties brutes passées directement au client.
L'architecture repose sur trois beans. Deux encapsulent la configuration bas niveau — un pour le producer (sérialisation, idempotence, batching), un pour le consumer (désérialisation, group ID, offset). Le troisième orchestre les consumers : il crée les threads, gère la poll loop, et branche l'error handler. C'est lui qui fait le lien entre la configuration et les méthodes @KafkaListener.
Le point le plus important : une exception dans un @KafkaListener ne crashe pas le consumer. Elle passe dans l'ErrorHandler. Sans error handler configuré, le message est retryé indéfiniment et la partition est bloquée jusqu'à intervention humaine. La gestion des erreurs est traitée dans kafka-spring-error-handling.
Prérequis : kafka-core-model (partitions, offsets, consumer groups), kafka-producer-consumer-tuning (poll loop, commit), kafka-schema-registry (sérialisation Avro).
Intégrer Kafka dans un service Spring Boot revient à configurer deux flux : publication vers Kafka et consommation depuis Kafka.
flowchart LR
subgraph Publication
A["Configuration producer"] --> B["KafkaTemplate"]
end
subgraph Consommation
C["Configuration consumer"] --> D["ContainerFactory"] --> E["@KafkaListener"]
end
B -->|messages| K[("Kafka")]
K -->|messages| EmermaidCôté publication
| Ce qu'on configure | Pourquoi |
|---|---|
| Sérialisation | Convertir un objet Java en bytes pour l'envoi réseau |
| Comportement d'envoi | Batching, idempotence, compression |
| Point d'entrée | KafkaTemplate — l'API utilisée dans le code pour publier |
Côté consommation
| Ce qu'on configure | Pourquoi |
|---|---|
| Désérialisation | Reconvertir les bytes reçus en objet Java |
| Appartenance au groupe | Consumer group, offset de départ au premier démarrage |
| Orchestration des threads | Combien de consumers en parallèle, quand commiter l'offset |
| Gestion des erreurs | Que faire si le traitement échoue — retries, DLT |
Spring Boot auto-configure la plupart de ces points à partir de spring.kafka.* dans application.yml. Ce qui nécessite presque toujours une configuration explicite en production : l'orchestration des threads et la gestion des erreurs — les valeurs par défaut ne conviennent pas.
ProducerFactory est responsable de créer les instances de KafkaProducer. Par défaut, elle crée un seul producer partagé entre tous les appels, ce qui est correct dans la majorité des cas. Exception : les producers transactionnels ont besoin d'une instance par thread pour que les transactions restent isolées.
Spring Boot auto-configure la ProducerFactory à partir de spring.kafka.producer.*. La déclarer explicitement devient nécessaire quand plusieurs profils de publication coexistent dans le même service — par exemple un producer Avro et un producer JSON, qui nécessitent des serializers différents.
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081
# L'idempotence protège contre les doublons réseau et le réordonnancement
# des messages en cas de retry. Elle active implicitement acks=all et
# max.in.flight=5 — inutile de les redéclarer.
enable.idempotence: true
# Sans linger.ms, chaque message part seul dans son propre batch.
# 5ms d'attente permet d'accumuler plusieurs messages et d'envoyer
# un batch compressé — débit nettement meilleur.
linger.ms: 5
compression.type: lz4yamlQuand on a besoin de plusieurs ProducerFactory, on les déclare explicitement en Java :
@Bean
public ProducerFactory<String, StockMoved> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(props);
}javaConsumerFactory crée les instances de KafkaConsumer. Contrairement à la ProducerFactory, elle crée un consumer par thread de listener — pas un singleton partagé. KafkaConsumer n'est pas thread-safe, ce qui rend ce comportement obligatoire.
spring:
kafka:
consumer:
group-id: stock-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
# "earliest" : au premier démarrage sans offset commité, lire depuis
# le début du topic pour ne rater aucun message historique.
# "latest" : ignorer les messages produits avant le démarrage.
auto-offset-reset: earliest
# L'auto-commit Kafka doit être désactivé quand Spring gère le commit.
# Laisser les deux actifs en même temps provoque une divergence des offsets.
enable-auto-commit: false
properties:
schema.registry.url: http://localhost:8081
# Désérialiser vers les classes Java générées (StockMoved, etc.)
# plutôt que GenericRecord. Sans ça, chaque accès à un champ
# nécessite un cast manuel.
specific.avro.reader: trueyamlVersion Java explicite pour les cas multi-format :
@Bean
public ConsumerFactory<String, StockMoved> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}javaLe ConcurrentKafkaListenerContainerFactory est le bean le plus important des trois. Il prend la ConsumerFactory et crée N containers — chaque container est un thread indépendant avec son propre KafkaConsumer. C'est lui qui gère la poll loop, les heartbeats, le commit d'offset, et qui branche l'ErrorHandler sur les listeners.
Spring Boot auto-configure ce bean, mais ses valeurs par défaut posent deux problèmes en production :
| Problème | Conséquence |
|---|---|
Pas d'ErrorHandler configuré |
Une exception dans un listener provoque un retry infini |
AckMode par défaut (BATCH) |
L'offset est commité même si le traitement a échoué |
On le déclare donc toujours explicitement.
spring:
kafka:
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 3yamlEn Java, pour brancher l'ErrorHandler (déclaré dans kafka-spring-error-handling) :
@Bean
public ConcurrentKafkaListenerContainerFactory<String, StockMoved> kafkaListenerContainerFactory(
ConsumerFactory<String, StockMoved> consumerFactory,
DefaultErrorHandler errorHandler
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, StockMoved>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Durée maximale du poll() avant de rendre la main. Trop court = CPU
// gaspillé sur des polls vides. Trop long = détection lente des rebalances.
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}javaQuand un service consomme des topics dans des formats différents (Avro pour les événements métier, JSON pour des commandes ou de la configuration), chaque format a besoin de sa propre ConsumerFactory et de son propre ContainerFactory. Spring utilise par défaut celle nommée kafkaListenerContainerFactory. Pour les autres, le nom doit être précisé dans le @KafkaListener.
@Bean("jsonContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> jsonContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(jsonConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
// Sans containerFactory = "jsonContainerFactory", Spring utilise la factory Avro
// par défaut et lève une DeserializationException dès le premier message JSON.
@KafkaListener(topics = "audit-log", containerFactory = "jsonContainerFactory")
public void consumeJson(String payload) { ... }javaKafkaTemplate est le point d'entrée pour publier. Il wrappe le KafkaProducer et expose une API Spring avec support des futures et des transactions. Son comportement le plus important à comprendre : send() est asynchrone par défaut.
Quand on appelle kafkaTemplate.send(...), le message entre dans un buffer interne du producer. Kafka attend que le buffer atteigne batch.size ou que linger.ms soit écoulé, puis envoie le batch au broker. La méthode retourne immédiatement un CompletableFuture sans attendre la confirmation du broker.
Ce comportement est intentionnel : le batching améliore le débit et rend la compression efficace. Mais sans callback, une erreur de publication (broker down, timeout réseau, topic inexistant) ne remonte jamais dans le code appelant. Le service continue à fonctionner normalement pendant que des messages sont perdus en silence.
@Service
public class StockEventProducer {
private final KafkaTemplate<String, StockMoved> kafkaTemplate;
// Le callback est appelé quand le broker confirme ou rejette le message.
// C'est ici qu'on décide quoi faire en cas d'échec : retry, alerte, exception.
public void publish(StockMoved event) {
kafkaTemplate.send("stock-moved", event.getStockId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Échec publication stockId={} : {}",
event.getStockId(), ex.getMessage());
} else {
log.debug("Publié offset={} partition={}",
result.getRecordMetadata().offset(),
result.getRecordMetadata().partition());
}
});
}
// .get() bloque le thread jusqu'à l'ack du broker — utile quand on a besoin
// de la garantie que le message est dans Kafka avant de continuer.
// À éviter dans les chemins critiques en débit.
public void publishSync(StockMoved event) throws ExecutionException, InterruptedException {
kafkaTemplate.send("stock-moved", event.getStockId(), event).get();
}
// flush() force l'envoi de tous les messages en attente dans le buffer.
// À appeler lors d'un arrêt propre pour ne rien perdre.
// Ne jamais appeler en boucle — ça détruit l'effet du batching.
@PreDestroy
public void onShutdown() {
kafkaTemplate.flush();
}
}java🚨 send() sans callback = erreurs de publication silencieuses
C'est le piège le plus fréquent avec KafkaTemplate. Localement tout fonctionne, et en production un incident réseau provoque des milliers de messages perdus sans aucune trace dans les logs. Toujours brancher un whenComplete ou utiliser .get() sur les topics métier critiques.
@KafkaListener déclare une méthode comme handler pour un ou plusieurs topics. Spring crée en arrière-plan un container qui gère tout : poll loop, heartbeat, désérialisation, dispatch vers la méthode, commit d'offset. Le développeur n'écrit que la logique métier.
Ce qui se passe en cas d'exception dans la méthode est important à comprendre : Spring intercepte l'exception avant qu'elle remonte à la poll loop et la passe à l'ErrorHandler. Ce n'est pas un crash du consumer — c'est un appel contrôlé à l'error handler. Sans error handler configuré, le comportement par défaut est un retry infini sur le même message, qui bloque la partition.
@Component
public class StockEventConsumer {
@KafkaListener(
topics = "stock-moved",
groupId = "stock-service",
concurrency = "3"
)
public void consume(
@Payload StockMoved event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack
) {
process(event);
// L'offset est commité ici, après que le traitement est terminé.
// Si une exception est levée avant cette ligne, l'offset reste non
// commité et le message sera retraité au redémarrage.
ack.acknowledge();
}
}javaL'offset est la position d'un consumer dans un topic. Commiter un offset signifie dire à Kafka "j'ai traité jusqu'ici — si je redémarre, reprends depuis là". L'AckMode contrôle quand ce commit se fait.
Commiter trop tôt expose à la perte de messages : Kafka considère qu'ils sont traités alors que le traitement n'est pas terminé. Commiter trop tard expose au retraitement inutile. Il existe deux familles de modes.
Les modes automatiques délèguent le timing à Spring. BATCH (le défaut) commite après chaque batch poll(), même si une exception a été levée en cours de traitement — les messages non traités sont considérés comme consommés, c'est une perte silencieuse.
Les modes manuels donnent le contrôle au développeur. MANUAL_IMMEDIATE commite uniquement quand ack.acknowledge() est appelé explicitement. Une exception avant cette ligne laisse l'offset non commité et le message est retraité au redémarrage.
| AckMode | Type | Comportement |
|---|---|---|
RECORD |
Auto | Commit après chaque message |
BATCH |
Auto | Commit après chaque batch poll() — défaut |
TIME |
Auto | Commit toutes les N millisecondes |
COUNT |
Auto | Commit toutes les N acknowledgments |
COUNT_TIME |
Auto | Commit au premier des deux seuils |
MANUAL |
Manuel | acknowledge() marque le commit, effectif au prochain poll() |
MANUAL_IMMEDIATE |
Manuel | acknowledge() commit immédiatement — recommandé en prod |
🚨 enable-auto-commit: false est obligatoire avec MANUAL_IMMEDIATE
Si enable-auto-commit: true est encore actif côté Kafka, le client Kafka commite l'offset de son côté indépendamment de ack.acknowledge(). Les deux systèmes entrent en conflit et le résultat dépend de la course entre les deux timings. Toujours désactiver l'auto-commit Kafka quand Spring gère le commit.
concurrency = 3 crée 3 containers indépendants, chacun avec son propre thread et son propre KafkaConsumer dans le même group-id. Kafka distribue les partitions du topic entre ces consumers lors du rebalance — la redistribution automatique des partitions quand un consumer rejoint ou quitte le groupe.
La règle fondamentale est qu'une partition ne peut être assignée qu'à un seul consumer à la fois dans un même group. Le nombre de consumers actifs est donc min(concurrency, nb_partitions). Si le topic a 3 partitions et concurrency = 5, les 2 threads supplémentaires démarrent mais restent oisifs — ils consomment de la mémoire sans traiter le moindre message.
flowchart LR
subgraph "Topic (3 partitions)"
P0["Partition 0"]
P1["Partition 1"]
P2["Partition 2"]
end
subgraph "Consumer Group (concurrency=3)"
C0["Thread 0"]
C1["Thread 1"]
C2["Thread 2"]
end
P0 --> C0
P1 --> C1
P2 --> C2mermaidIl est possible d'assigner des partitions fixes à un listener, en dehors du mécanisme de rebalance. C'est utile pour des consumers de monitoring ou de replay ciblé qui ne doivent pas participer à la distribution automatique des partitions du groupe principal.
@KafkaListener(
topicPartitions = @TopicPartition(
topic = "stock-moved",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "1", initialOffset = "0")
}
)
)
public void consumeSpecificPartitions(StockMoved event) { ... }javaPar défaut, Spring invoque le listener une fois par message. En mode batch, il passe directement la liste complète des messages retournés par le poll() Kafka. Ce n'est pas un batch artificiel reconstitué par Spring — c'est le vrai batch tel que Kafka l'a retourné, dont la taille dépend de max.poll.records.
L'intérêt principal est pour les écritures en base : un seul saveAll() pour N messages dans une seule transaction DB, au lieu de N save() séparés. Le gain en débit peut être significatif sur des topics à fort volume.
@KafkaListener(topics = "stock-moved", batch = "true")
public void consumeBatch(List<StockMoved> events, Acknowledgment ack) {
stockRepository.saveAll(toEntities(events));
ack.acknowledge();
}javaLe revers est que si saveAll() échoue sur le message 50 d'un batch de 100, les 100 messages sont retraités au redémarrage — y compris les 49 premiers déjà persistés. Le traitement doit être idempotent, c'est-à-dire que traiter deux fois le même message ne crée pas de doublon. Si ce n'est pas possible, il faut utiliser un listener message par message.
Le filtre de messages s'interpose entre la désérialisation et l'invocation du listener. Un message filtré est acquitté silencieusement — l'offset avance mais le listener n'est jamais appelé. C'est plus propre qu'un if dans le listener parce que ça sépare le routage de la logique métier, et ça évite de polluer les métriques du listener avec des messages qui n'auraient pas dû arriver.
Une nuance à retenir : le filtre s'applique après la désérialisation. Si le message est malformé et que la désérialisation échoue, l'exception est levée avant que le filtre soit évalué — elle va directement dans l'ErrorHandler.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, StockMoved> kafkaListenerContainerFactory(
ConsumerFactory<String, StockMoved> consumerFactory
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, StockMoved>();
factory.setConsumerFactory(consumerFactory);
// Retourner true filtre le message (offset commité, listener non appelé).
// Retourner false laisse passer le message vers le listener.
factory.setRecordFilterStrategy(record -> record.value().getQuantity() <= 0);
return factory;
}java⚡ TL;DR — chaque concept en une ligne
ProducerFactory
✓ Crée les producers — singleton partagé par défaut, à déclarer explicitement si plusieurs formats coexistent.
⚠ Producer transactionnel = ProducerFactory séparée avec transactionIdPrefix.
ConsumerFactory
✓ Crée les consumers — un par thread, car KafkaConsumer n'est pas thread-safe.
⚠ Mixer Avro et JSON dans la même factory provoque une DeserializationException sur le premier message du mauvais format.
ContainerFactory
✓ Orchestre les threads, la poll loop et l'error handler — à déclarer explicitement pour brancher l'ErrorHandler.
⚠ L'auto-configuration Spring Boot ne branche pas d'error handler — comportement par défaut inutilisable en prod.
KafkaTemplate.send()
✓ Asynchrone — le message entre dans le buffer, pas encore dans Kafka. Brancher un whenComplete pour détecter les erreurs.
⚠ Sans callback ni .get(), les erreurs de publication sont silencieuses.
AckMode MANUAL_IMMEDIATE
✓ L'offset n'est commité que quand ack.acknowledge() est appelé — seule garantie qu'un crash ne perd pas de message.
⚠ Modes automatiques : l'offset peut être commité avant la fin du traitement en cas d'exception.
Concurrency
✓ N threads consumers dans le même group — Kafka distribue les partitions automatiquement.
⚠ concurrency > nb_partitions = threads oisifs. Toujours aligner les deux.
Batch listener
✓ Reçoit le batch poll() complet — idéal pour les saveAll() en base.
⚠ Exception sur un message = tout le batch retraité. Idempotence obligatoire.
Filtre de messages
✓ Filtre avant le listener, après désérialisation — acquittement silencieux.
⚠ Ne filtre pas les erreurs de désérialisation — c'est le rôle de l'ErrorHandler.
🎓 À retenir
ContainerFactory est le bean central — c'est lui qui branche l'error handler, contrôle la concurrency et l'AckMode. L'auto-configuration Spring Boot ne suffit pas en prod.send() asynchrone sans callback = trou dans la fiabilité — les erreurs de publication sont rares et c'est précisément pour ça qu'on les oublie. Le callback coûte quelques lignes et évite des incidents silencieux.MANUAL_IMMEDIATE + enable-auto-commit: false — les deux vont ensemble. L'un sans l'autre crée soit des conflits, soit une perte de contrôle.concurrency avec le nombre de partitions — provisionner les partitions en cohérence avec le parallélisme attendu dès le départ.