Introduction
Kafka Connect allows you to continuously ingest data from external systems into Kafka via Connect sources and write data from Kafka to external systems via Connect sinks.
Plugins are available for a wide variety of data sources and sinks.
Single Message Transformations (SMTs) are applied to messages as they flow through Connect.
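SMTs are configured on the connector itself. The fragment below is a minimal sketch (the transform name and field name are placeholders, not taken from this page) showing how a built-in InsertField transformation could be declared in a connector's config to stamp each record value with the topic it came from:
Code Block
# Illustrative only: "addTopic" and "origin_topic" are placeholder names.
config:
  transforms: addTopic
  transforms.addTopic.type: org.apache.kafka.connect.transforms.InsertField$Value
  transforms.addTopic.topic.field: origin_topic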
Connect Demo
Postgres JDBC Sink Connector (Confluent)
Prepare the connector image
Download the JDBC sink connector from JDBC Connector (Source and Sink)
Create a Dockerfile for this connector
Code Block
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/jdbc
COPY ./confluentinc-kafka-connect-jdbc-10.6.0.zip /opt/kafka/plugins/jdbc/
RUN unzip /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip -d /opt/kafka/plugins/jdbc
RUN rm /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip
USER 1001
Build the image and copy it to your docker repository
docker build -f Dockerfile . -t ktimoney/strimzi-connector
Prepare the Postgres database
Set up a new username/password and schema for Kafka Connect to write to
Code Block
SELECT 'CREATE DATABASE kafka'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'kafka')\gexec

DO
$$
BEGIN
  IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'kafka') THEN
    CREATE USER kafka WITH PASSWORD 'kafka';
    GRANT ALL PRIVILEGES ON DATABASE kafka TO kafka;
  END IF;
END
$$;
...
The database connection details are also included.
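The full KafkaConnector manifest is elided above; the sketch below shows roughly how the Confluent JDBC sink could be declared. The topic, database host, credentials and table options are illustrative assumptions only:
Code Block
# Sketch only: topic, host, credentials and table options are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-jdbc-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    connection.url: jdbc:postgresql://postgres:5432/kafka
    connection.user: kafka
    connection.password: kafka
    insert.mode: insert
    # create the target table from the record schema if it does not already exist
    auto.create: true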
InfluxDB Sink Connector (Apache Camel)
Prepare the connector image
Download the InfluxDB connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-influxdb-kafka-connector/0.8.0/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
Create a Dockerfile for this connector
Code Block
FROM quay.io/strimzi/kafka:0.22.1-kafka-2.7.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-influxdb-kafka-connector-0.8.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
USER 1001
Build the image and copy it to your docker repository
docker build -f Dockerfile . -t ktimoney/strimzi_influxdb_connector
Prepare the message producer
...
Code Block
{'pretty': 'true', 'db': 'ts_host_metrics', 'q': 'SELECT "region", "host", "free", "used" FROM "disk_space" WHERE "host"=\'localhost\''}
{
  "results": [
    {
      "statement_id": 0,
      "series": [
        {
          "name": "disk_space",
          "columns": [
            "time",
            "region",
            "host",
            "free",
            "used"
          ],
          "values": [
            [
              "2023-01-23T13:03:32.1246436Z",
              "IE-region",
              "localhost",
              "81%",
              "19%"
            ]
          ]
        }
      ]
    }
  ]
}
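The KafkaConnector manifest for the InfluxDB sink is not reproduced here. As a rough sketch, it could be declared along the following lines; the topic, connection bean and database name are assumptions, and the camel.sink.* property names are based on the Camel InfluxDB component options, so verify them against the connector's generated documentation:
Code Block
# Sketch only: topic, connection bean and database name are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-influxdb-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    # name of the InfluxDB connection bean registered in the Camel registry
    camel.sink.path.connectionBean: influxConnection
    camel.sink.endpoint.databaseName: ts_host_metrics
    camel.sink.endpoint.operation: insert
    camel.sink.endpoint.retentionPolicy: autogen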
Minio Sink Connector (Apache Camel)
Prepare the connector image
Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-minio-sink-kafka-connector/3.20.0/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz
We need to create some custom classes:
CamelMinioConverters.java - this converts the record HashMap to JSON and wraps it in an InputStream
Code Block
package com.custom.convert.minio;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import org.apache.camel.Converter;

@Converter(generateLoader = true)
public final class CamelMinioConverters {

  private CamelMinioConverters() {
  }

  @SuppressWarnings("deprecation")
  @Converter
  public static InputStream fromMapToInputStream(Map<String, Object> map) {
    String json = "{}";
    try {
      json = new ObjectMapper().writeValueAsString(map);
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }
    return new ByteArrayInputStream(json.getBytes());
  }
}
...
Code Block
package com.custom.aggregate.minio;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

public class CustomStringAggregator implements AggregationStrategy {

  //@Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    Message newIn = newExchange.getIn();
    Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key");
    String key = keyMap.get("id");
    SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS");
    String fileName = "ExchangeKafka-" + key + "-" + sdf.format(new Date()) + ".json";
    newIn.setHeader("file", fileName);

    // lets append the old body to the new body
    if (oldExchange == null) {
      return newExchange;
    }

    String body = oldExchange.getIn().getBody(String.class);
    if (body != null) {
      String newBody = newIn.getBody(String.class);
      if (newBody != null) {
        body += System.lineSeparator() + newBody;
      }
      newIn.setBody(body);
    }
    return newExchange;
  }
}
We can then create a custom jar for our code and copy it into the docker container.
Note: You can also set the filename in the Kafka header using either CamelHeader.file or CamelHeader.ce-file
Create a Dockerfile for this connector
Code Block
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
COPY ./custom-converter-1.0.0.jar /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector/
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz
RUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001
Build the image and copy it to your docker repository
docker build -f Dockerfile . -t ktimoney/strimzi_minio_connector
Prepare the message producer
...
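The Minio sink KafkaConnector definition itself is elided here; a rough sketch follows. The endpoint, credentials and bucket name are placeholders, and the camel.kamelet.minio-sink.* and camel.beans.aggregate property names are assumptions based on the kamelet-style naming used by the 3.x Camel connectors, so check them against the connector's documentation:
Code Block
# Sketch only: endpoint, credentials, bucket and property names are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-minio-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.kamelet.minio-sink.endpoint: http://minio:9000
    camel.kamelet.minio-sink.accessKey: minio
    camel.kamelet.minio-sink.secretKey: minio123
    camel.kamelet.minio-sink.bucketName: kafka-bucket
    # wire in the custom aggregation strategy that sets the "file" header
    camel.beans.aggregate: "#class:com.custom.aggregate.minio.CustomStringAggregator"
    camel.aggregation.size: 1
    camel.aggregation.timeout: 20000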
Here's a screenshot where we use the aggregator to set the file name:
File Sink Connector (Apache Camel)
Prepare the connector image
Download the file connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-file-kafka-connector/3.20.0/camel-file-kafka-connector-3.20.0-package.tar.gz
Create a Dockerfile for this connector
Code Block
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-file-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-file-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-file-kafka-connector-3.20.0-package.tar.gz
RUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001
Build the image and copy it to your docker repository
docker build -f Dockerfile . -t ktimoney/strimzi_file_connector
Prepare the message producer
...
Run: kubectl create -f kafka-webhook.yaml to start the pod.
Note: You can delete the webhook once the connector pod starts.
Create the KafkaConnect object
Code Block
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  labels:
    webhook: "true"
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ktimoney/strimzi-file-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
...
The image tag references the image we built earlier: ktimoney/strimzi-file-connect.
In the metadata section of KafkaConnect I have added a new label webhook: "true", which will be picked up by the MutatingWebhookConfiguration:
Code Block
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: kafka-webhook-config
  annotations:
    cert-manager.io/inject-ca-from-secret: kafka/kafka-webhook-cert
webhooks:
  - name: kafka-webhook.kafka.svc.cluster.local
    admissionReviewVersions:
      - "v1"
    sideEffects: "None"
    timeoutSeconds: 30
    objectSelector:
      matchLabels:
        webhook: "true"
Create the Sink connector
Code Block
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.sink.endpoint.fileName: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt
    camel.sink.path.directoryName: /opt/kafka/data
    camel.sink.endpoint.fileExist: Append
    camel.sink.endpoint.autoCreate: true
    camel.aggregation.disable: true
...
How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling
Camel Kafka Connector Examples
...
Camel Kafka Connector Source Code
Using secrets in Kafka Connect configuration