🎯 OBJECTIF
Comprendre comment :
🧠 MODÈLE MENTAL
Un sink = consumer Kafka classique habillé par Connect. Il consomme des topics, applique des SMT stateless (transformations 1-à-1), écrit dans la cible via batching, commite les offsets Kafka, et route les messages en erreur vers un Dead Letter Queue. Le framework gère tout ce qui est répétitif (offsets, retries, DLQ) — ce qu'un consumer Spring naïf reconstituerait à la main en 500 lignes avec ses bugs spécifiques.
Les SMT sont la colle qui adapte le format source au format attendu par le sink. ExtractNewRecordState (unwrap) est quasi-obligatoire après Debezium — sans lui, le sink voit une enveloppe {before, after, op, source} au lieu du message plat.
Prérequis : kafka-connect-1-fondamentaux (workers, connectors, tasks, topics internes).
@KafkaListener à la main// Consumer Spring "simple" — incomplet sur presque tout
@KafkaListener(topics = "sie.stock.stock", groupId = "reporting")
public void onMessage(ConsumerRecord<String, StockDto> record) {
jdbc.update("INSERT INTO stock_current (id, sku, quantity) VALUES (?, ?, ?) ON CONFLICT ...",
record.value().id(), record.value().sku(), record.value().quantity());
}javaCe code semble simple. Ce qu'il ne fait pas :
| Aspect | Consumer naïf | JDBC Sink |
|---|---|---|
| Batching | 1 INSERT par message = 1 round-trip. Catastrophique en débit. | Bufferise batch.size, batch SQL en 1 transaction. |
| DELETE | Tombstone (value=null) ? À tester et coder manuellement. |
delete.enabled=true → DELETE auto sur tombstone. |
| Schéma SQL | Tu codes les colonnes à la main. Évolution = recompilation. | Déduit les colonnes depuis le schéma du message. |
| DLQ | try/catch + topic DLQ + headers + skip à coder. | Config déclarative : errors.tolerance=all + DLQ topic. |
| Scaling | N instances déployées, rebuild requis. | tasks.max + PUT config. Pas de rebuild. |
| DELETE sur Debezium | Tombstone ignoré → divergence silencieuse avec la source. | delete.enabled=true → suppression synchronisée. |
Quand coder un consumer à la main : logique métier complexe (validation, enrichissement, décision conditionnelle, appel à un autre service) qui ne tient pas dans des SMT. Dans ce cas le coût est justifié.
{
"name": "reporting-stock-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "4",
"topics": "sie.stock.stock",
"connection.url": "jdbc:postgresql://pg-reporting.prod:5432/reporting",
"connection.user": "reporting_writer",
"connection.password": "${file:/secrets/db-creds:reporting_pwd}",
"table.name.format": "stock_current",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"delete.enabled": "true",
"batch.size": "500",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.reporting-stock-sink",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}json| Champ | Valeur recommandée | Rôle |
|---|---|---|
tasks.max |
≤ nb partitions topic | min(tasks.max, nb_partitions) tasks créées. |
auto.create |
false |
En prod, Flyway/Liquibase gère le DDL. Laisser Connect créer des tables = hors contrôle. |
auto.evolve |
false |
Même raison — pas d'ALTER TABLE implicite. |
insert.mode |
upsert |
INSERT ... ON CONFLICT (id) DO UPDATE SET ... — rend le sink idempotent. insert brut plante sur duplicate key. |
pk.mode |
record_key |
PK de la table cible = clé Kafka. record_value si PK dans la value. kafka (topic+partition+offset) = éviter. |
delete.enabled |
true |
Tombstone → DELETE FROM table WHERE id=?. Indispensable si source = Debezium, sinon divergence. |
batch.size |
500 |
N messages bufferisés, batch SQL en 1 transaction. Débit ↑, mémoire ↑. |
🚨 insert.mode=insert avec Debezium = panne garantie
Debezium peut rejouer des events (at-least-once). insert brut sur un ID déjà présent → DuplicateKeyException → task FAILED → flux bloqué. Toujours upsert en sink Debezium.
sequenceDiagram
participant K as Kafka
participant T as Sink task
participant SMT as SMT chain
participant PG as Postgres reporting
participant DLQ as dlq.reporting-stock-sink
K->>T: poll() → batch de records
T->>SMT: transform chain (unwrap, mask...)
SMT->>T: SinkRecord transformé
alt Message valide INSERT/UPDATE
T->>T: bufferise (batch.size)
T->>PG: INSERT ... ON CONFLICT DO UPDATE (batch)
PG-->>T: OK
T->>K: commit offset
else Tombstone + delete.enabled=true
T->>PG: DELETE FROM stock_current WHERE id = ?
T->>K: commit offset
else Erreur (conversion, contrainte DB)
T->>DLQ: publie + headers contexte
T->>K: commit offset (skip)
endmermaidLe message Kafka est un tableau de bytes. Le converter traduit entre bytes (Kafka) et struct interne (Connect).
Formats disponibles :
JsonConverter — lisible, pas de contrat, 3-5× plus lourd qu'Avro. OK pour dev ou jetable.AvroConverter + Schema Registry — binaire compact, contrat fort, évolution contrôlée. Défaut recommandé en prod. Voir kafka-schema-registry pour le détail du wire format, des subjects et des règles de compatibilité.ProtobufConverter — alternative Avro, meilleur support multi-langage.Wire format Avro : [magic byte 0x00][schema ID 4 bytes][payload binaire]. Le schéma n'est pas dans le message — le consumer lit l'ID, appelle le Registry, cache le schéma, décode.
🚨 Converter mismatch = task FAILED
Sink en Avro qui lit un topic JSON → SerializationException sur chaque message. Règle : un topic = un format, documenté et verrouillé. Source et sink doivent avoir le même value.converter.
Transformations stateless : un message en entrée → un message en sortie, sans mémoire. S'appliquent avant publication (source) ou après consommation (sink).
ExtractNewRecordState)Message Debezium brut (sans unwrap) :
{
"before": null,
"after": { "id": 42, "sku": "ABC-123", "quantity": 50 },
"op": "c",
"source": { "connector": "postgresql", "lsn": 24677340, "table": "stock" }
}jsonAprès unwrap :
{ "id": 42, "sku": "ABC-123", "quantity": 50 }jsonSur DELETE avec delete.handling.mode=rewrite, deux messages produits :
{ "id": 42, "__deleted": "true" }{"id": 42}, value=null → déclenche le DELETE dans JDBC Sink"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,phone",
"transforms.mask.replacement": "***"
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.extractKey.fields": "order_id"
"transforms": "unwrap,mask,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "ssn",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "sie\\.client\\.(.*)",
"transforms.route.replacement": "client_masked.$1"
L'ordre dans transforms = ordre d'exécution. Chaque SMT consomme la sortie de la précédente.
🚨 Limite dure des SMT
Stateless, un message à la fois. Pas d'accès aux messages précédents, pas de lookup externe, pas de join. Pour enrichir avec un autre topic, un cache Redis ou une DB → Kafka Streams ou Flink.
Message mal formé, schema incompatible, contrainte DB violée → sans protection, task FAILED, flux bloqué jusqu'à intervention humaine.
flowchart LR
K["Topic source"] --> T["Sink task"]
T -->|OK| PG[(Postgres)]
T -->|erreur| DLQ["dlq.reporting-stock-sink"]
DLQ -.->|alerte Grafana| OPS["Équipe ops"]
DLQ -.->|rejeu après fix| TmermaidHeaders ajoutés au message DLQ :
__connect.errors.topic — topic source__connect.errors.partition / __connect.errors.offset — position exacte__connect.errors.exception.class.name — type d'exception__connect.errors.exception.message + stacktrace — détail complet🚨 errors.tolerance=all sans DLQ = perte silencieuse
Les messages en erreur sont ignorés sans trace. Toujours coupler avec un topic DLQ et monitorer sa taille. Un DLQ qui grossit = alerte, pas une feature. Pas de DLQ côté source — un source en erreur doit FAIL (impossible de skipper sans casser l'ordre).
⚡ TL;DR — chaque concept en une ligne
JDBC Sink
✓ Consumer Kafka + batching + upsert + DELETE sur tombstone + DLQ + offsets — déclaratif en JSON.
⚠ insert.mode=insert avec Debezium = DuplicateKeyException garantie. Toujours upsert.
Converters + Schema Registry
✓ Avro + Schema Registry = messages compacts avec contrat fort et évolution contrôlée.
⚠ Source Avro + sink JSON = SerializationException sur chaque message. Un topic = un format verrouillé.
SMT ✓ Transformations stateless en chaîne — unwrap Debezium, masking PII, routing, extraction de clé. ⚠ Stateless = pas de join, pas de lookup, pas d'enrichissement cross-message. Pour ça → Streams ou Flink.
DLQ
✓ Capture les messages empoisonnés sans bloquer le flux — headers de contexte pour débugger après coup.
⚠ errors.tolerance=all sans DLQ = perte silencieuse. DLQ qui grossit = alerte, pas une feature.
🎓 À retenir
delete.enabled=true obligatoire quand la source est Debezium — sans ça, les DELETE de la table source ne se propagent pas et la table cible diverge silencieusement.connect-offsets) et sink (consumer group dans __consumer_offsets) sont deux systèmes distincts — un reset d'un source connector ne touche pas les offsets du sink et vice versa.auto.create=false et auto.evolve=false en prod — laisser Connect créer ou modifier des tables en production court-circuite le processus de migration géré (Flyway, Liquibase) et peut provoquer des changements de schéma non trackés.