🎯 OBJECTIF
Comprendre comment :
🧠 MODÈLE MENTAL
Kafka stocke des bytes. Sans contrat partagé, un producer peut changer le format d'un message et casser silencieusement tous ses consumers — le message arrive, se désérialise partiellement ou lève une exception non tracée. Le Schema Registry est ce contrat : chaque message porte un identifiant de schéma (4 octets), le consumer résout le schéma correspondant, et la compatibilité est vérifiée au moment du publish, pas à la consommation. Le bug est détecté à la source, pas en production chez le consumer.
Sans contrat, un producer qui renomme un champ casse tous les consumers existants de manière silencieuse ou bruyante selon le sérialiseur utilisé.
Producer v1 publie : { "stockId": "SKU-A", "quantity": 10 }
Producer v2 publie : { "sku": "SKU-A", "qty": 10 } ← renommage
Consumer v1 lit le message v2 → "stockId" absent → null ou exception
Avec JSON pur, aucune validation au publish. Le consumer découvre le problème en runtime, potentiellement des heures après.
sequenceDiagram
participant P as Producer
participant SR as Schema Registry
participant K as Kafka Broker
participant C as Consumer
P->>SR: POST /subjects/stock-moved-value/versions
SR-->>P: schema_id = 42
P->>K: publish [0x00][42][avro payload]
C->>K: poll()
K-->>C: [0x00][42][avro payload]
C->>SR: GET /schemas/ids/42
SR-->>C: schema
Note over C: cache local schema_id → schema
C->>C: désérialise payloadmermaidLe producer enregistre le schéma au démarrage (ou le retrouve s'il existe déjà) et reçoit un schema_id entier. Chaque message est préfixé par un magic byte (0x00) + le schema_id sur 4 octets big-endian. Le consumer lit ces 5 octets, résout le schéma via le Registry (avec cache local), puis désérialise le payload.
[ 0x00 ][ schema_id : 4 octets big-endian ][ payload Avro binaire ]
// Ce que le SerDe fait sous le capot (simplifié)
byte[] magic = {0x00};
byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
byte[] avroPayload = avroEncoder.encode(record);
byte[] message = concat(magic, schemaIdBytes, avroPayload);java🚨 Le payload Avro seul n'est pas auto-descriptif
Sans le Schema Registry et les 5 octets de préfixe, un payload Avro binaire est illisible — il ne contient pas le schéma. Contrairement à JSON ou Protobuf avec Any, il faut toujours le schéma writer pour désérialiser. C'est pour ça que le schema_id dans le message est non négociable.
Un subject est le nom sous lequel un schéma est enregistré dans le Registry. Par convention Confluent :
| Convention | Subject | Usage |
|---|---|---|
TopicNameStrategy (défaut) |
{topic}-value / {topic}-key |
Un schéma par topic — le plus courant |
RecordNameStrategy |
{namespace}.{RecordName} |
Un schéma par type Avro — permet de réutiliser le même type sur plusieurs topics |
TopicRecordNameStrategy |
{topic}-{namespace}.{RecordName} |
Combinaison — rarement utile |
# Producer — changer la stratégie
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategyproperties🔑 Recommandation
Garder TopicNameStrategy (défaut) sauf besoin explicite de partager un type Avro entre topics. RecordNameStrategy complexifie la gestion des compatibilités sans gain clair dans la majorité des cas.
| Mode | Règle | Ce qui est autorisé |
|---|---|---|
BACKWARD (défaut) |
Le nouveau schéma peut lire les messages écrits avec l'ancien | Ajouter un champ avec default, supprimer un champ optionnel |
FORWARD |
L'ancien schéma peut lire les messages écrits avec le nouveau | Supprimer un champ avec default, ajouter un champ optionnel |
FULL |
BACKWARD + FORWARD | Seulement : ajouter/supprimer des champs avec default |
BACKWARD_TRANSITIVE |
BACKWARD contre toutes les versions précédentes | Idem BACKWARD mais vérifié sur tout l'historique |
FULL_TRANSITIVE |
FULL contre toutes les versions précédentes | Recommandé en prod stabilisée |
NONE |
Pas de vérification | Migration forcée, à éviter |
🚨 BACKWARD ne protège pas le consumer v1
Avec BACKWARD, un consumer toujours en v1 qui reçoit un message v2 peut casser. Le mode ne garantit que la lecture dans un sens. Pour protéger les deux sens (rolling deploy, rollback), utiliser FULL ou FULL_TRANSITIVE.
{
"type": "record",
"namespace": "com.example.wms.events",
"name": "StockMoved",
"fields": [
{ "name": "stockId", "type": "string" },
{ "name": "quantity", "type": "int" },
{ "name": "warehouse", "type": "string" },
{ "name": "movedAt", "type": "long", "logicalType": "timestamp-millis" },
{
"name": "reason",
"type": ["null", "string"],
"default": null
}
]
}jsonRègles pour rester compatible BACKWARD :
"default" (sinon l'ancien reader ne sait pas quoi mettre)null en première position → ["null", "string"] pour que null soit la valeur default implicite<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>xml<!-- Plugin pour générer les classes Java depuis les .avsc -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<executions>
<execution>
<goals><goal>schema</goal></goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>xmlspring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081
auto.register.schemas: true # dev uniquement
use.latest.version: false
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true # utilise les classes générées, pas GenericRecordyaml🚨 auto.register.schemas: true est interdit en production
En dev, c'est pratique — le schéma est enregistré automatiquement au premier publish. En prod, ça permet à n'importe quel producer de modifier le schéma sans validation. Toujours false en prod, schémas enregistrés via CI/CD uniquement.
@Service
public class StockEventProducer {
private final KafkaTemplate<String, StockMoved> kafkaTemplate;
public StockEventProducer(KafkaTemplate<String, StockMoved> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publish(StockMoved event) {
kafkaTemplate.send("stock-moved", event.getStockId().toString(), event);
}
}java@Component
public class StockEventConsumer {
@KafkaListener(topics = "stock-moved", groupId = "stock-service")
public void consume(StockMoved event) {
// event est déjà désérialisé en classe Java générée par Avro
process(event);
}
}javaL'ordre de déploiement est non négociable : consumers d'abord, producers ensuite. Un consumer v2 sait gérer un message v1 (champ absent → valeur default). L'inverse n'est pas vrai : un consumer v1 qui reçoit un message v2 avec un champ inconnu peut exploser selon le sérialiseur.
sequenceDiagram
participant CI as CI/CD
participant SR as Schema Registry
participant Cv2 as Consumer v2
participant Pv2 as Producer v2
CI->>SR: POST schema v2 (champ + default)
SR-->>CI: compatible BACKWARD
Note over Cv2: deploy consumers v2 en premier
Cv2->>SR: résout schema_id v1 et v2
Note over Pv2: deploy producers v2 ensuite
Pv2->>SR: enregistre schema_id v2
Pv2->>Pv2: publie messages avec nouveau champmermaidUn renommage direct est un breaking change dans tous les modes sauf NONE. La seule voie safe est de passer par un champ parallèle : ajouter le nouveau nom sans supprimer l'ancien, migrer les consumers, puis supprimer l'ancien dans une version ultérieure. Ne jamais compter sur un basculement atomique.
flowchart LR
A["v1 — champ stockId"] -->|"étape 1 : ajoute sku avec default"| B["v2 — stockId + sku"]
B -->|"étape 2 : consumers migrent vers sku"| C["v3 — supprime stockId"]mermaid# Vérifier la compatibilité avant merge
curl -X POST \
http://schema-registry:8081/compatibility/subjects/stock-moved-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d @src/main/avro/StockMoved.avsc
# Enregistrer le schéma (après vérification)
curl -X POST \
http://schema-registry:8081/subjects/stock-moved-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d @src/main/avro/StockMoved.avscbashLe consumer ne contacte pas le Schema Registry à chaque message — il maintient un cache local schema_id → schéma. En prod avec des millions de messages, le Registry ne voit que les misses (nouvelles versions).
La capacité par défaut est 1000 schémas, définie par la constante DEFAULT_MAX_SCHEMAS_PER_SUBJECT dans AbstractKafkaSchemaSerDe.java (source Confluent). Cette valeur couvre largement la majorité des usages — ne l'ajuster qu'en cas de prolifération massive de versions de schémas.
# Valeur par défaut : 1000 (constante DEFAULT_MAX_SCHEMAS_PER_SUBJECT)
schema.registry.cache.capacity=1000properties🔑 Implication opérationnelle
Si le Schema Registry est indisponible, les producers et consumers déjà démarrés continuent de fonctionner grâce au cache. Le problème n'arrive qu'au démarrage ou lors de l'apparition d'une nouvelle version. HA recommandée, mais pas avec les mêmes exigences que Kafka lui-même.
Unknown magic byteLe premier octet du message n'est pas 0x00. Le KafkaAvroDeserializer ne reconnaît pas le format Confluent wire format et refuse de désérialiser.
Scénarios typiques :
StringSerializer ou JsonSerializer) sur un topic dont le consumer attend de l'AvroKafkaAvroSerializer# Inspecter les premiers octets d'un message pour diagnostic
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic stock-moved \
--from-beginning \
--max-messages 1 \
--property print.key=truebashIdentifier la source du message mal formé et corriger le producer. Si le topic contient un mix JSON/Avro (migration en cours), configurer un ErrorHandler pour absorber les messages non-Avro sans bloquer la partition :
@Bean
public DefaultErrorHandler errorHandler() {
// log et skip — pas de retry sur erreur de désérialisation
return new DefaultErrorHandler((record, ex) -> {
log.error("Désérialisation impossible offset={} partition={} : {}",
record.offset(), record.partition(), ex.getMessage());
}, new FixedBackOff(0L, 0L));
}javaSerializationException: Error retrieving Avro schemaLe schema_id présent dans le message n'existe pas dans le Schema Registry interrogé par le consumer.
Scénarios typiques :
# Vérifier que le schema_id existe dans le Registry
curl http://schema-registry:8081/schemas/ids/42bashVérifier que schema.registry.url pointe sur le bon Registry. Ne jamais supprimer un schéma du Registry tant que des messages l'utilisant sont encore dans la rétention Kafka.
ReaderNotWriterCompatibleExceptionLe schéma dans le message (writer schema) et le schéma de la classe Java générée (reader schema) sont incompatibles — champ obligatoire absent, type changé, namespace différent.
Scénarios typiques :
.avsc modifié sans respecter les règles de compatibilitéRespecter l'ordre consumers d'abord, vérifier la compatibilité via le Registry avant tout déploiement. Voir section 8️⃣.
| Erreur | Cause racine | Fix rapide |
|---|---|---|
Unknown magic byte |
Message non encodé Avro sur le topic | Corriger le producer ou ajouter un ErrorHandler |
Error retrieving Avro schema |
schema_id introuvable dans le Registry | Vérifier l'URL du Registry, ne pas supprimer les schémas actifs |
ReaderNotWriterCompatibleException |
Incompatibilité reader/writer schema | Respecter l'ordre de déploiement et les règles de compatibilité |
⚡ TL;DR — chaque concept en une ligne
Wire format ✓ Magic byte (0x00) + schema_id (4 octets) + payload Avro binaire — le consumer résout le schéma depuis le Registry. ⚠ Un payload Avro seul sans schema_id est illisible — le schéma writer est toujours requis.
Subject
✓ Nom d'enregistrement du schéma dans le Registry — par défaut {topic}-value.
⚠ RecordNameStrategy permet de partager un type entre topics mais complexifie la gestion — garder TopicNameStrategy par défaut.
Compatibilité BACKWARD ✓ Le nouveau schéma peut lire les messages anciens — ajouter un champ avec default est safe. ⚠ Ne protège pas les consumers en version précédente — utiliser FULL pour un rolling deploy bidirectionnel.
auto.register.schemas ✓ Pratique en dev — enregistre automatiquement au premier publish. ⚠ Interdit en prod — n'importe quel producer peut casser le contrat. CI/CD only.
Cache consumer ✓ Le consumer cache les schémas localement (défaut : 1000 entrées) — le Registry n'est pas contacté à chaque message. ⚠ Indisponibilité du Registry bloque le démarrage et les nouvelles versions, pas le runtime sur schémas connus.
Migration sans downtime ✓ Toujours déployer les consumers avant les producers lors d'un ajout de champ. ⚠ Renommer un champ est un breaking change — passer par un champ parallèle + dépréciation en 3 étapes.
Unknown magic byte ✓ Erreur claire et immédiate — le consumer refuse le message dès le premier octet. ⚠ Souvent silencieux côté producer — un script ou outil de test peut polluer un topic Avro sans warning visible.
🎓 À retenir
auto.register.schemas: true en prod = n'importe qui peut casser le contrat — toujours gérer les schémas via CI/CD avec vérification de compatibilité avant enregistrement.Unknown magic byte = topic pollué — un seul producer sans KafkaAvroSerializer suffit à casser tous les consumers Avro du topic. Toujours vérifier le sérialiseur côté producer en premier.