Kafka Connect and Debezium
In Kafka Connect, a connector is created from a configuration JSON sent to the Connect REST API.
In Pyahu CLI v1, you don’t need to keep that JSON as a file. You declare the connector in pyahu.yaml; the CLI generates or receives the JSON, writes it to a Kubernetes Secret, and creates a Job that applies the configuration to Kafka Connect.
What v1 supports
The CLI’s declarative surface covers two cases:
- Debezium source for PostgreSQL, with defaults generated by Pyahu.
- Custom source or sink, using free-form config in the Kafka Connect format.
Debezium/PostgreSQL source
For Postgres CDC, use type: source and kind: debezium.postgres:

    services:
      kafkaConnect:
        enabled: true
        connectors:
          - name: app-cdc
            type: source
            kind: debezium.postgres
            database: app
            topicPrefix: app-cdc
            snapshotMode: initial
            tables:
              include:
                - public.orders
                - public.customers
            config:
              decimal.handling.mode: string
              tombstones.on.delete: "false"

Fields like slot, publication, topicPrefix, database, and snapshotMode have defaults. In practice, the minimal example can be:

    services:
      kafkaConnect:
        enabled: true
        connectors:
          - name: app-cdc
            type: source
            tables:
              include:
                - public.orders

With this, pyahu up creates or updates the connector.
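To make the role of the defaults more concrete, here is a minimal Python sketch of how a declaration might be rendered into a Kafka Connect configuration. It is not the CLI's implementation. The function name render_debezium_postgres is hypothetical, and the defaulting rules (database falling back to app, topicPrefix falling back to the connector name, slot and publication derived from the name with hyphens replaced by underscores, free-form config applied last) are assumptions inferred from the app-cdc example on this page.

```python
import json

# Connection values copied from the generated JSON shown on this page.
CONNECTION = {
    "database.hostname": "postgres.pyahu-local-dev.svc.cluster.local",
    "database.port": "5432",
    "database.user": "pyahu",
    "database.password": "pyahu_local",
}


def render_debezium_postgres(connector):
    """Render a Kafka Connect config dict from one connectors[] entry (sketch)."""
    name = connector["name"]
    ident = name.replace("-", "_")  # assumed rule: app-cdc -> app_cdc
    tables = connector.get("tables", {}).get("include", [])

    rendered = {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        **CONNECTION,
        "database.dbname": connector.get("database", "app"),  # assumed default
        "topic.prefix": connector.get("topicPrefix", name),  # assumed default
        "plugin.name": "pgoutput",
        "slot.name": connector.get("slot", f"{ident}_slot"),
        "publication.name": connector.get("publication", f"{ident}_publication"),
        "publication.autocreate.mode": "filtered",
        "snapshot.mode": connector.get("snapshotMode", "initial"),
        "table.include.list": ",".join(tables),
    }
    # Assumption: free-form config entries are merged last, so they can
    # add keys or override the generated ones.
    rendered.update(connector.get("config", {}))
    return rendered


minimal = {
    "name": "app-cdc",
    "type": "source",
    "tables": {"include": ["public.orders"]},
}
print(json.dumps(render_debezium_postgres(minimal), indent=2))
```

Under these assumptions, the minimal declaration yields the slot app_cdc_slot and the publication app_cdc_publication without either being written in pyahu.yaml.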
Generated JSON
For the app-cdc connector, the CLI generates a JSON similar to this:

    {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres.pyahu-local-dev.svc.cluster.local",
      "database.port": "5432",
      "database.user": "pyahu",
      "database.password": "pyahu_local",
      "database.dbname": "app",
      "topic.prefix": "app-cdc",
      "plugin.name": "pgoutput",
      "slot.name": "app_cdc_slot",
      "publication.name": "app_cdc_publication",
      "publication.autocreate.mode": "filtered",
      "snapshot.mode": "initial",
      "table.include.list": "public.orders,public.customers",
      "decimal.handling.mode": "string",
      "tombstones.on.delete": "false"
    }

This is the body sent to:

    PUT /connectors/app-cdc/config

Custom sink
Sink connectors are also Kafka Connect JSON. The difference is that the sink plugin must exist in the image used by services.kafkaConnect.image.
Example with a JDBC sink in a custom image that already contains the plugin:

    services:
      kafkaConnect:
        enabled: true
        image: ghcr.io/acme/connect-with-jdbc-sink
        version: 1.0.0
        connectors:
          - name: orders-jdbc-sink
            type: sink
            kind: custom
            config:
              connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
              tasks.max: "1"
              topics: app-cdc.public.orders
              connection.url: jdbc:postgresql://warehouse.local:5432/orders
              connection.user: warehouse
              connection.password: warehouse_local
              insert.mode: upsert
              pk.mode: record_key
              auto.create: "true"

For custom connectors, Pyahu does not try to interpret the payload. The contents of config become the JSON sent to Kafka Connect:

    {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "app-cdc.public.orders",
      "connection.url": "jdbc:postgresql://warehouse.local:5432/orders",
      "connection.user": "warehouse",
      "connection.password": "warehouse_local",
      "insert.mode": "upsert",
      "pk.mode": "record_key",
      "auto.create": "true"
    }

The same model works for a custom source:

    services:
      kafkaConnect:
        enabled: true
        connectors:
          - name: external-source
            type: source
            kind: custom
            config:
              connector.class: com.example.SourceConnector
              tasks.max: "1"
              topic: external.events

Equivalent REST API
If you were creating it manually, the command would be:

    curl -X PUT \
      -H 'Content-Type: application/json' \
      --data-binary @connector.json \
      http://localhost:8083/connectors/app-cdc/config

Pyahu does this inside the cluster, using the Connect internal URL:

    http://kafka-connect.pyahu-local-dev.svc.cluster.local:8083

How the CLI applies it
During pyahu up, the CLI:
- Brings up the Kafka Connect Deployment using the quay.io/debezium/connect image.
- Creates the internal compacted config, offset, and status topics.
- Renders or reads the connector JSON from services.kafkaConnect.connectors.
- Writes that JSON into a Secret as connector.json.
- Creates a Job that waits for Connect to respond and runs PUT /connectors/<name>/config.
- Waits for the connector status to become RUNNING.
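The last two steps (wait for Connect, PUT the config, wait for RUNNING) can be sketched in Python as follows. This is an illustration of the sequence, not the code the Job actually runs: apply_connector, urllib_http, and the retry settings are hypothetical. The three endpoints used are part of the standard Kafka Connect REST API. The HTTP transport is passed in as a parameter so the logic can be exercised without a cluster.

```python
import json
import time
import urllib.error
import urllib.request


def urllib_http(method, url, body=None):
    """Tiny JSON-over-HTTP helper: returns (status, parsed_body) or (None, None)."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(
        url, data=data, method=method,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            return resp.status, json.loads(resp.read() or b"null")
    except urllib.error.HTTPError as err:
        return err.code, None
    except OSError:  # connection refused, DNS failure, timeout
        return None, None


def apply_connector(base_url, name, config, http=urllib_http, attempts=30, delay=2.0):
    """Wait for Connect, PUT the config, then wait for the connector to be RUNNING."""
    # 1. Wait until the REST API answers.
    for _ in range(attempts):
        status, _body = http("GET", f"{base_url}/connectors")
        if status == 200:
            break
        time.sleep(delay)
    else:
        raise TimeoutError("Kafka Connect did not become ready")

    # 2. PUT creates the connector (201) or updates an existing one (200).
    status, _body = http("PUT", f"{base_url}/connectors/{name}/config", config)
    if status not in (200, 201):
        raise RuntimeError(f"apply failed with HTTP status {status}")

    # 3. Poll the status endpoint until the connector reports RUNNING.
    for _ in range(attempts):
        status, body = http("GET", f"{base_url}/connectors/{name}/status")
        if status == 200 and body["connector"]["state"] == "RUNNING":
            return body
        time.sleep(delay)
    raise TimeoutError(f"connector {name} did not reach RUNNING")
```

Inside the cluster, base_url would be the Connect internal URL shown earlier and config the contents of connector.json. Because PUT /connectors/<name>/config both creates and updates, running the same call twice with the same payload leaves the connector unchanged.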
When you change the configuration, the Secret and the Job receive a name with a hash of the payload. This makes the apply idempotent and lets you reapply changes with another pyahu up.
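One way to obtain names that change only when the payload changes is to hash a canonical serialization of the JSON. The sketch below assumes SHA-256 over sorted-key JSON and a 10-character suffix. The hash function, suffix length, and naming pattern that Pyahu actually uses are not stated on this page, and resource_name is a hypothetical helper.

```python
import hashlib
import json


def resource_name(connector_name, config, length=10):
    """Build a name like '<connector>-<hash>' that is stable for a given payload."""
    # Sorted keys and fixed separators produce the same bytes for the same
    # content, regardless of the key order in the source declaration.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{connector_name}-{digest[:length]}"


same_a = resource_name("app-cdc", {"tasks.max": "1", "topic.prefix": "app-cdc"})
same_b = resource_name("app-cdc", {"topic.prefix": "app-cdc", "tasks.max": "1"})
changed = resource_name("app-cdc", {"tasks.max": "2", "topic.prefix": "app-cdc"})
print(same_a, same_b, changed)
```

With a scheme like this, an unchanged configuration maps to the same Secret and Job names on every run, while an edited configuration produces new names and therefore a new apply.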
Inspecting the connector

    curl http://localhost:8083/connectors
    curl http://localhost:8083/connectors/app-cdc/status

You can also view it through the Kafka UI when it is enabled:

    https://kafka-ui.localhost

v1 rules
- kind: debezium.postgres is always type: source.
- kind: custom can be type: source or type: sink.
- Custom connectors require config.connector.class.
- Sink plugins are not installed automatically. Use a Kafka Connect image that already contains the required plugin.
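The first three rules are mechanical enough to check before anything is sent to the cluster. A minimal Python sketch of such a check follows. validate_connector is a hypothetical helper, not part of the CLI, and the error messages are illustrative. The fourth rule (plugin availability in the image) cannot be verified from the declaration alone, so it is not covered.

```python
def validate_connector(connector):
    """Return a list of rule violations for one connectors[] entry (empty if valid)."""
    errors = []
    name = connector.get("name", "<unnamed>")
    kind = connector.get("kind")
    ctype = connector.get("type")

    if ctype not in ("source", "sink"):
        errors.append(f"{name}: type must be 'source' or 'sink'")
    # Rule: debezium.postgres is always a source.
    if kind == "debezium.postgres" and ctype != "source":
        errors.append(f"{name}: kind debezium.postgres requires type: source")
    # Rule: custom connectors must name their connector class.
    if kind == "custom" and not connector.get("config", {}).get("connector.class"):
        errors.append(f"{name}: custom connectors require config.connector.class")
    return errors


examples = [
    {"name": "app-cdc", "type": "source", "kind": "debezium.postgres"},
    {"name": "bad-cdc", "type": "sink", "kind": "debezium.postgres"},
    {"name": "no-class", "type": "sink", "kind": "custom", "config": {}},
]
for entry in examples:
    print(entry["name"], validate_connector(entry))
```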