Kafka Connect and Debezium

In Kafka Connect, a connector is created from a configuration JSON sent to the Connect REST API.

In Pyahu CLI v1, you don’t need to keep that JSON as a file. You declare the connector in pyahu.yaml; the CLI generates the JSON (or takes it from your free-form config), writes it to a Kubernetes Secret, and creates a Job that applies the configuration to Kafka Connect.

The CLI’s declarative surface covers two cases:

  • Debezium source for PostgreSQL, with defaults generated by Pyahu.
  • Custom source or sink, using free-form config in the Kafka Connect format.

For Postgres CDC, use type: source and kind: debezium.postgres:

services:
  kafkaConnect:
    enabled: true
    connectors:
      - name: app-cdc
        type: source
        kind: debezium.postgres
        database: app
        topicPrefix: app-cdc
        snapshotMode: initial
        tables:
          include:
            - public.orders
            - public.customers
        config:
          decimal.handling.mode: string
          tombstones.on.delete: "false"

Fields like slot, publication, topicPrefix, database, and snapshotMode have defaults. In practice, the minimal example can be:

services:
  kafkaConnect:
    enabled: true
    connectors:
      - name: app-cdc
        type: source
        tables:
          include:
            - public.orders

With this, pyahu up creates or updates the connector.

For the app-cdc connector, the CLI generates a JSON similar to this:

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres.pyahu-local-dev.svc.cluster.local",
  "database.port": "5432",
  "database.user": "pyahu",
  "database.password": "pyahu_local",
  "database.dbname": "app",
  "topic.prefix": "app-cdc",
  "plugin.name": "pgoutput",
  "slot.name": "app_cdc_slot",
  "publication.name": "app_cdc_publication",
  "publication.autocreate.mode": "filtered",
  "snapshot.mode": "initial",
  "table.include.list": "public.orders,public.customers",
  "decimal.handling.mode": "string",
  "tombstones.on.delete": "false"
}

This is the body sent to:

PUT /connectors/app-cdc/config
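
To make the mapping from pyahu.yaml to this JSON concrete, here is a minimal Python sketch of how such a rendering could work. It is not Pyahu's actual implementation: the function name render_debezium_postgres, the db argument, and the default rules (for example deriving slot and publication names from the connector name by replacing "-" with "_") are assumptions read off the example above.

```python
import json


def render_debezium_postgres(connector, db):
    """Hypothetical rendering of a debezium.postgres entry into Connect JSON.

    All default rules here are guesses inferred from the example in this
    page, not confirmed Pyahu behavior.
    """
    name = connector["name"]
    ident = name.replace("-", "_")  # assumed: "app-cdc" becomes "app_cdc"
    config = {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": db["host"],
        "database.port": str(db["port"]),
        "database.user": db["user"],
        "database.password": db["password"],
        "database.dbname": connector.get("database", "app"),  # placeholder default
        "topic.prefix": connector.get("topicPrefix", name),
        "plugin.name": "pgoutput",
        "slot.name": connector.get("slot", f"{ident}_slot"),
        "publication.name": connector.get("publication", f"{ident}_publication"),
        "publication.autocreate.mode": "filtered",
        "snapshot.mode": connector.get("snapshotMode", "initial"),
        "table.include.list": ",".join(connector["tables"]["include"]),
    }
    # Free-form keys under config are merged last, so they can override defaults.
    config.update(connector.get("config", {}))
    return config


connector = {
    "name": "app-cdc",
    "type": "source",
    "kind": "debezium.postgres",
    "tables": {"include": ["public.orders", "public.customers"]},
    "config": {"decimal.handling.mode": "string", "tombstones.on.delete": "false"},
}
db = {
    "host": "postgres.pyahu-local-dev.svc.cluster.local",
    "port": 5432,
    "user": "pyahu",
    "password": "pyahu_local",
}
rendered = render_debezium_postgres(connector, db)
print(json.dumps(rendered, indent=2))
```

Merging the free-form config last is a design choice in this sketch: it lets a declaration override any generated key without the renderer having to know about it.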

Sink connectors are also configured with Kafka Connect JSON. The difference is that the sink plugin must already exist in the image set in services.kafkaConnect.image.

Example with a JDBC sink in a custom image that already contains the plugin:

services:
  kafkaConnect:
    enabled: true
    image: ghcr.io/acme/connect-with-jdbc-sink
    version: 1.0.0
    connectors:
      - name: orders-jdbc-sink
        type: sink
        kind: custom
        config:
          connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
          tasks.max: "1"
          topics: app-cdc.public.orders
          connection.url: jdbc:postgresql://warehouse.local:5432/orders
          connection.user: warehouse
          connection.password: warehouse_local
          insert.mode: upsert
          pk.mode: record_key
          auto.create: "true"

For custom connectors, Pyahu does not try to interpret the payload. The contents of config become the JSON sent to Kafka Connect:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "app-cdc.public.orders",
  "connection.url": "jdbc:postgresql://warehouse.local:5432/orders",
  "connection.user": "warehouse",
  "connection.password": "warehouse_local",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "auto.create": "true"
}

The same model works for a custom source:

services:
  kafkaConnect:
    enabled: true
    connectors:
      - name: external-source
        type: source
        kind: custom
        config:
          connector.class: com.example.SourceConnector
          tasks.max: "1"
          topic: external.events

If you were creating the app-cdc connector manually, with its JSON saved as connector.json, the command would be:

curl -X PUT \
  -H 'Content-Type: application/json' \
  --data-binary @connector.json \
  http://localhost:8083/connectors/app-cdc/config

Pyahu does this inside the cluster, using the Connect internal URL:

http://kafka-connect.pyahu-local-dev.svc.cluster.local:8083
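
For readers who prefer a script over curl, the same PUT request can be sketched in Python using only the standard library. The helper name build_put_request is hypothetical, and the payload below is shortened for illustration. The block only builds the request, so it runs without a Connect instance; sending it is left as a commented line.

```python
import json
import urllib.request


def build_put_request(base_url, name, config):
    """Build the PUT /connectors/<name>/config request (hypothetical helper)."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Shortened payload for illustration; a real one carries the full connector config.
example_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
}
request = build_put_request("http://localhost:8083", "app-cdc", example_config)
print(request.get_method(), request.full_url)

# To actually send it (requires a reachable Kafka Connect):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Inside the cluster, the base URL would be the internal Connect URL shown above instead of localhost.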

During pyahu up, the CLI:

  1. Brings up the Kafka Connect Deployment using the quay.io/debezium/connect image.
  2. Creates the internal compacted config, offset, and status topics.
  3. Renders or reads the connector JSON from services.kafkaConnect.connectors.
  4. Writes that JSON into a Secret as connector.json.
  5. Creates a Job that waits for Connect to respond and runs PUT /connectors/<name>/config.
  6. Waits for the connector status to become RUNNING.
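
The final wait step can be pictured as a polling loop over GET /connectors/<name>/status. The sketch below is an assumption about how such a wait might look, not the code of the Pyahu Job: the function names, the retry counts, and the choice to also require every task to be RUNNING are all illustrative. The status fetcher is injected so the example runs without a cluster.

```python
import time


def connector_is_running(status):
    """True when the connector and every listed task report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    # Assumption: a task in any other state counts as "not ready yet".
    return all(task.get("state") == "RUNNING" for task in status.get("tasks", []))


def wait_until_running(fetch_status, attempts=30, delay=2.0, sleep=time.sleep):
    """Poll fetch_status() until the connector is RUNNING or attempts run out."""
    for _ in range(attempts):
        try:
            status = fetch_status()
        except OSError:
            status = None  # Connect may not be reachable yet
        if status and connector_is_running(status):
            return True
        sleep(delay)
    return False


# Fake status responses in the shape returned by the Connect REST API.
responses = iter([
    {"name": "app-cdc", "connector": {"state": "UNASSIGNED"}, "tasks": []},
    {
        "name": "app-cdc",
        "connector": {"state": "RUNNING"},
        "tasks": [{"id": 0, "state": "RUNNING"}],
    },
])
became_running = wait_until_running(lambda: next(responses), sleep=lambda _: None)
print(became_running)
```

In a real script, fetch_status would issue the HTTP request and decode the JSON body; network failures raised by urllib are subclasses of OSError, which is why the loop treats that exception as "try again".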

When you change the configuration, the Secret and the Job receive a name with a hash of the payload. This makes the apply idempotent and lets you reapply changes with another pyahu up.
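
One way to picture hash-based naming is the Python sketch below. The hash algorithm (SHA-256), the 8-character suffix, and the name patterns are assumptions chosen for illustration; the actual names generated by Pyahu may differ.

```python
import hashlib
import json


def payload_hash(config, length=8):
    """Short, stable hash of a connector payload (assumed scheme)."""
    # Sorting keys makes the hash independent of key order in the YAML.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]


def resource_names(connector_name, config):
    """Hypothetical Secret and Job names derived from the payload hash."""
    suffix = payload_hash(config)
    return {
        "secret": f"{connector_name}-config-{suffix}",
        "job": f"{connector_name}-apply-{suffix}",
    }


original = {"snapshot.mode": "initial", "tasks.max": "1"}
reordered = {"tasks.max": "1", "snapshot.mode": "initial"}
changed = {"snapshot.mode": "never", "tasks.max": "1"}

print(resource_names("app-cdc", original))
print(resource_names("app-cdc", original) == resource_names("app-cdc", reordered))
print(resource_names("app-cdc", original) == resource_names("app-cdc", changed))
```

Under this scheme an unchanged payload maps to the same names, so reapplying does nothing new, while any change produces a fresh Secret and Job.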

To list the connectors and check the status of app-cdc through the Connect REST API:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/app-cdc/status

You can also view it through the Kafka UI when it is enabled:

https://kafka-ui.localhost

Rules to keep in mind:

  • kind: debezium.postgres is always type: source.
  • kind: custom can be type: source or type: sink.
  • Custom connectors require config.connector.class.
  • Sink plugins are not installed automatically. Use a Kafka Connect image that already contains the required plugin.
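
These rules can be expressed as a small validation function. The Python sketch below is illustrative only: validate_connector and its error messages are hypothetical and do not reflect how the Pyahu CLI reports errors. A missing kind is deliberately not flagged, since the minimal example earlier omits it.

```python
def validate_connector(connector):
    """Return a list of rule violations for one connector entry (hypothetical)."""
    errors = []
    kind = connector.get("kind")
    ctype = connector.get("type")

    if ctype not in ("source", "sink"):
        errors.append("type must be source or sink")
    if kind == "debezium.postgres" and ctype != "source":
        errors.append("kind debezium.postgres requires type source")
    if kind == "custom" and "connector.class" not in connector.get("config", {}):
        errors.append("custom connectors require config.connector.class")
    return errors


valid_sink = {
    "name": "orders-jdbc-sink",
    "type": "sink",
    "kind": "custom",
    "config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector"},
}
bad_debezium = {"name": "app-cdc", "type": "sink", "kind": "debezium.postgres"}
bad_custom = {"name": "external-source", "type": "source", "kind": "custom", "config": {}}

print(validate_connector(valid_sink))
print(validate_connector(bad_debezium))
print(validate_connector(bad_custom))
```

The last rule in the list, that sink plugins must already be present in the image, cannot be checked from the YAML alone, so it is left out of this sketch.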