Table of Contents |
---|
Introduction
Kafka Connect allows you to continuously ingest data from external systems into Kafka via connect sources and write data from Kafka to external system via connect sinks.
Various plugins are available for a variety of different data sources and data sinks.
Single Message Transformations (SMTs) are applied to messages as they flow through Connect.
Connect Demo
Postgres JDBC Sink Connector (Confluent)
Prepare the connector image
Download the JDBC sink connector from JDBC Connector (Source and Sink)
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/jdbc
COPY ./confluentinc-kafka-connect-jdbc-10.6.0.zip /opt/kafka/plugins/j
dbc/
RUN unzip /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip -d /opt/kafka/plugins/jdbc
RUN rm /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi connector
Prepare the postges database
Setup a new username/password and schema for Kafka connect to write to
...
language | text |
---|---|
title | Postgres |
...
Table of Contents |
---|
Introduction
Kafka Connect allows you to continuously ingest data from external systems into Kafka via connect sources and write data from Kafka to external system via connect sinks.
Various plugins are available for a variety of different data sources and data sinks.
Single Message Transformations (SMTs) are applied to messages as they flow through Connect.
Connect Demo
Postgres JDBC Sink Connector (Confluent)
Prepare the connector image
Download the JDBC sink connector from JDBC Connector (Source and Sink)
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/jdbc
COPY ./confluentinc-kafka-connect-jdbc-10.6.0.zip /opt/kafka/plugins/j
dbc/
RUN unzip /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip -d /opt/kafka/plugins/jdbc
RUN rm /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi connector
Prepare the postges database
Setup a new username/password and schema for Kafka connect to write to
Code Block | ||||
---|---|---|---|---|
| ||||
SELECT 'CREATE DATABASE kafka' WHERE NOT EXISTS (SELECT FROM pg_userdatabase WHERE usenamedatname = 'kafka')\gexec THEN DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'kafka') THEN CREATE USER kafka WITH PASSWORD 'kafka'; GRANT ALL PRIVILEGES ON DATABASE kafka TO kafka; END IF; END $$; |
...
The database connection details are also included.
Influxdb Sink Connector (Apache Camel)
Prepare the connector image
Download the Influxdb connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-influxdb-kafka-connector/0.8.0/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.22.1-kafka-2.7.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-influxdb-kafka-connector-0.8.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi_influxdb_connector
Prepare the message producer
In this example we are going to deserialize the value to json, we can do this without using a schema.
They will take the following format:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
} |
The important thing to note here is that one of the "keys" has to be "camelInfluxDB.MeasurementName", this is required by the camel transformer when converting into an influxdb point object.
The above structure can be represented in golang using the following struct:
...
language | text |
---|---|
title | Go Structs |
...
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.22.1-kafka-2.7.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-influxdb-kafka-connector-0.8.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi_influxdb_connector
Prepare the message producer
In this example we are going to deserialize the value to json, we can do this without using a schema.
They will take the following format:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
} |
The important thing to note here is that one of the "keys" has to be "camelInfluxDB.MeasurementName", this is required by the camel transformer when converting into an influxdb point object.
The above structure can be represented in golang using the following struct:
Code Block | ||||
---|---|---|---|---|
| ||||
type InfluxPayload struct {
Measurement string `json:"camelInfluxDB.MeasurementName"`
Time string `json:"time"`
Host string `json:"host"`
Region string `json:"region"`
Used string `json:"used"`
Free string `json:"free"`
} |
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: connect
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: kafka
annotations:
# use-connector-resources configures this KafkaConnect
# to use KafkaConnector resources to avoid
# needing to call the Connect REST API directly
strimzi.io/use-connector-resources: "true"
spec:
image: ktimoney/strimzi-influxdb-connect
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
- secretName: my-cluster-clients-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: connect
certificate: user.crt
key: user.key
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1 |
We are connecting to the 9093 tls port so we include required certificates in our configuration.
The image tag references the image we built earlier : ktimoney/strimzi-influx-connect
Create the Sink connector
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-sink-connector
namespace: kafka
labels:
strimzi.io/cluster: my-connect-cluster
spec: class: org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector
tasksMax: 1
config:
topics: my-topic
errors.deadletterqueue.topic.name: my-topic-dl
errors.deadletterqueue.topic.replication.factor: 1
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
key.ignore: true
auto.create: true
camel.beans.influx: "#class:org.influxdb.InfluxDBFactory#connect('http://influxdb.default:8086', 'influxdb', 'influxdb')"
camel.sink.path.connectionBean: influx
camel.sink.endpoint.databaseName: ts_host_metrics
camel.sink.endpoint.operation: insert
camel.sink.endpoint.retentionPolicy: autogen |
The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector" class.
The value converter is set to "org.apache.kafka.connect.json.JsonConverter".
The camel connector comes with a in built TypeConverter called CamelInfluxDbConverters which will read the json produced as a Map<String, Object> map) and return an Influxdb Point object.
The database connection details are specified in camel.beans.influx.
This sets up a connection bean to be used by camel.sink.path.connectionBean.
Note: It is important to wrap the camel.beans.influx parameter in quotes otherwise it will be treated as a comment.
The yaml for the influxdb pod is available here: influxdb.yaml
When the connector is running it will produce records to influxdb like the following:
Code Block | ||||
---|---|---|---|---|
| ||||
{'pretty': 'true', 'db': 'ts_host_metrics', 'q': 'SELECT "region", "host", "free", "used" FROM "disk_space" WHERE "host"=\'localhost\''} { "results": [ { "statement_id": 0, "series": [ { "name": "disk_space", "columns": [ "time", "region", "host", "free", "used" ], "values": [ [ "2023-01-23T13:03:32.1246436Z", Measurement string `json:"camelInfluxDB.MeasurementName"`"IE-region", Time string `json:"time"`localhost", Host string `json:"host"` Region string `json:"region"`81%", Used string `json:"used"` Free string `json:"free"` } |
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaUser metadata: name: connect namespace: kafka labels:19%" ] strimzi.io/cluster: my-cluster spec: authentication: type: tls --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: ] name: my-connect-cluster namespace: kafka annotations: # use-connector-resources} configures this KafkaConnect # to use KafkaConnector resources to avoid ] # needing to call the Connect REST API} directly strimzi.io/use-connector-resources: "true" spec: image: ktimoney/strimzi-influxdb-connect replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9093 tls: trustedCertificates: - secretName: my-cluster-cluster-ca-cert] } |
Minio Sink Connector (Apache Camel)
Prepare the connector image
Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-file-kafka-connector/3.20.0/camel-file-kafka-connector-3.20.0-package.tar.gz
We need to create some custom classes:
CamelMinioConverters.java - this will allow us to deserialize to json and convert the hashmap to an inputstream
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.convert.minio; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.Map; import org.apache.camel.Converter; @Converter(generateLoader = true) public final class CamelMinioConverters { private certificate: ca.crtCamelMinioConverters() { } - secretName: my-cluster-clients-ca-cert @SuppressWarnings("deprecation") @Converter certificate: ca.crt public static authentication: type: tlsInputStream fromMapToInputStream(Map<String, Object> map) { certificateAndKey: String json = "{}"; secretName: connect try { certificate: user.crt json = key: user.keynew ObjectMapper().writeValueAsString(map); config: } group.id: connect-cluster catch (JsonProcessingException e) { offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs e.printStackTrace(); status.storage.topic: connect-cluster-status } config.storage.replication.factor: 1 return new offset.storage.replication.factor: 1ByteArrayInputStream(json.getBytes()); status.storage.replication.factor: 1 |
We are connecting to the 9093 tls port so we include required certificates in our configuration.
The image tag references the image we built earlier : ktimoney/strimzi-influx-connect
...
}
} |
StringAggregator.java - this will allow us to aggregate the messages and set the file name
Code Block | ||||
---|---|---|---|---|
Code Block | ||||
| ||||
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-sink-connector
namespace: kafka
labels:
strimzi.io/cluster: my-connect-cluster
spec: class: org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector
tasksMax: 1
config:
topics: my-topic
errors.deadletterqueue.topic.name: my-topic-dl
errors.deadletterqueue.topic.replication.factor: 1
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
key.ignore: true
auto.create: true
camel.beans.influx: "#class:org.influxdb.InfluxDBFactory#connect('http://influxdb.default:8086', 'influxdb', 'influxdb')"
camel.sink.path.connectionBean: influx
camel.sink.endpoint.databaseName: ts_host_metrics
camel.sink.endpoint.operation: insert
camel.sink.endpoint.retentionPolicy: autogen |
The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector" class.
The value converter are set to "org.apache.kafka.connect.json.JsonConverter".
The camel connector comes with a in built TypeConverter called CamelInfluxDbConverters which will read the json produced as a Map<String, Object> map) and return an Influxdb Point object.
The database connection details are specified in camel.beans.influx.
This sets up a connection bean to be used by camel.sink.path.connectionBean.
Note: It is important to wrap the camel.beans.influx parameter in quotes otherwise it will be treated as a comment.
The yaml for the influxdb pod is available here: influxdb.yaml
When the connector is running it will produce records to influxdb like the following:
| |
package com.custom.aggregate.minio;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
public class CustomStringAggregator implements AggregationStrategy {
//@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key");
String key = keyMap.get("id");
SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS");
String fileName = "ExchangeKafka-" + key + "-" + sdf.format( new Date() ) + ".json";
newIn.setHeader("file", fileName);
// lets append the old body to the new body
if (oldExchange == null) {
return newExchange;
}
String body = oldExchange.getIn().getBody(String.class);
if (body != null) {
String newBody = newIn.getBody(String.class);
if (newBody != null) {
body += System.lineSeparator() + newBody;
}
newIn.setBody(body);
}
return newExchange;
}
} |
We can then create a custom jar for our code and copy it into the docker container.
Note: You can also set the filename in the Kafka header using either CamelHeader.file or CamelHeader.ce-file
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
COPY ./custom-converter-1.0.0.jar /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector/
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gzRUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi_minio_connector
Prepare the message producer
In this example we are going to deserialize the value to a string. (Same producer as the influxdb example)
They will take the following format:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
} |
With the type converter n place we can also deserialize as a json object.
They key will be a json that loos like this: {"id":"119"}
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaUser metadata: name: connect namespace: kafka labels: strimzi.io/cluster: my-cluster spec: authentication: type: tls --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster namespace: kafka annotations: # use-connector-resources configures this KafkaConnect # to use KafkaConnector resources to avoid # needing to call the Connect REST API directly strimzi.io/use-connector-resources: "true" spec: image: ktimoney/strimzi-minio-connect replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9093 tls: trustedCertificates: - secretName: my-cluster-cluster-ca-cert certificate: ca.crt - "values"secretName: [my-cluster-clients-ca-cert certificate: ca.crt authentication: type: tls certificateAndKey: [ secretName: connect certificate: user.crt key: user.key "2023-01-23T13:03:32.1246436Z",config: group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs "IE-region",status.storage.topic: connect-cluster-status # -1 means it will use the default replication factor configured in the broker "localhost", config.storage.replication.factor: -1 offset.storage.replication.factor: -1 status.storage.replication.factor: -1 |
We are connecting to the 9093 tls port so we include required certificates in our configuration.
The image tag references the image we built earlier : ktimoney/strimzi-minio-connect
Create the Sink connector
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector namespace: kafka labels: "81%", strimzi.io/cluster: my-connect-cluster spec: class: org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector tasksMax: 1 config: topics: my-topic "19%" key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable: false value.converter.schemas.enable: false ] camel.beans.aggregate: "#class:com.custom.aggregate.minio.CustomStringAggregator" camel.aggregation.size: 1 ] camel.aggregation.timeout: 20000 camel.aggregation.disable: false camel.kamelet.minio-sink.bucketName: camel } camel.kamelet.minio-sink.accessKey: ybK7RleFUkdDeYBf camel.kamelet.minio-sink.secretKey: X0Y4zK84bwdefRTljrnPzOb1l0A6OQj2 ] camel.kamelet.minio-sink.endpoint: http://minio.default:9000 } ] } |
Minio Sink Connector (Apache Camel)
Prepare the connector image
Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-file-kafka-connector/3.20.0/camel-file-kafka-connector-3.20.0-package.tar.gz
We need to create some custom classes:
CamelMinioConverters.java - this will allow us to deserialize to json and convert the hashmap to an inputstream
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.convert.minio;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import org.apache.camel.Converter;
@Converter(generateLoader = true)
public final class CamelMinioConverters {
private CamelMinioConverters() {
}
@SuppressWarnings("deprecation")
@Converter
public static InputStream fromMapToInputStream(Map<String, Object> map) {
String json = "{}";
try {
json = new ObjectMapper().writeValueAsString(map);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return new ByteArrayInputStream(json.getBytes());
}
} |
StringAggregator.java - this will allow us to aggregate the messages and set the file name
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.aggregate.minio; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; import org.apache.camel.Message; public class CustomStringAggregator implements AggregationStrategy { //@Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Message newIn = newExchange.getIn(); Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key"); String key = keyMap.get("id"); SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS"); String fileName = "ExchangeKafka-" + key + "-" + sdf.format( new Date() ) + ".json"; newIn.setHeader("file", fileName); // lets append the old body to the new body if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); if (body != null) { String newBody = newIn.getBody(String.class);camel.kamelet.minio-sink.autoCreateBucket: true |
The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector class.
The value converter is set to "org.apache.kafka.connect.storage.StringConverter".
The minio connection details are specified in camel.kamelet.minio-sink parameters.
The aggregation size is set to 1 so it won't aggregate any of the files
This timeout is also specified so it will only aggregate records arriving within this time out value (20 seconds).
These values can be adjusted depending on your requirements.
The main purpose of the aggregator in this example is for setting the file name, this is done by setting a header with a key of "file".
The yaml for the minio pod is available here: minio.yaml
When the connector is running it copies the kafka records into minio as files:
Here's a screen shot where we use the aggregator to set the file name:
File Sink Connector (Apache Camel)
Prepare the connector image
Download the file connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-file-kafka-connector/3.20.0/camel-file-kafka-connector-3.20.0-package.tar.gz
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-file-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-file-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-file-kafka-connector-3.20.0-package.tar.gz
RUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi_file_connector
Prepare the message producer
In this example we are going to deserialize the value to a string.
The value will take the following format:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
} |
Due to some limitation with Strimzi KafkaConnect we will need to use a mutating webhook to attach a persistent volume to our connector pod.
The go code for this is available here: kafka-webhook.go
We need to create a docker image for this program
Code Block | ||||
---|---|---|---|---|
| ||||
FROM golang:latest
RUN mkdir /app
COPY ./kafka-webhook /app
RUN chmod +x /app/kafka-webhook
WORKDIR /app
ENTRYPOINT ["/app/kafka-webhook"] |
docker build -f Dockerfile . -t ktimoney/kafka-webhook
Mutating webhooks require https so we also need to create some certificates.
Make sure cfssl is installed on you system, then run the following commands.
cfssl gencert -initca ./ca-csr.json | cfssljson -bare ca
cfssl gencert \
-ca=ca.pem \
-ca-key=ca-key.pem \
-config=ca-config.json \
-hostname="kafka-webhook,kafka-webhook.kafka.svc.cluster.local,kafka-webhook.kafka.svc,localhost,127.0.0.1" \
-profile=default \
ca-csr.json | cfssljson -bare webhook-cert
Run the following commands to create the values you need to use in the yaml file:
TLS_CRT=$(cat webhook-cert.pem | base64 | tr -d '\n')
TLS_KEY=$(cat webhook-cert-key.pem | base64 | tr -d '\n')
CA_BUNDLE="$(openssl base64 -A <"ca.pem")"
Replace the values in : kafka-webhook.yaml
Note: You can also configure the hostPath and the containerPath as part of the deployment:
Code Block | ||||
---|---|---|---|---|
| ||||
containers: - name: kafka-webhook image: ktimoney/kafka-webhook imagePullPolicy: IfNotPresent command: ["/app/kafka-webhook"] args: [ if (newBody != null) { "-port", "8443", body += System.lineSeparator() + newBody; "-tlsCertFile", "/certs/tls.crt", } newIn.setBody(body);"-tlsKeyFile", "/certs/tls.key", } return newExchange;"-hostPath", "/var/strimzi/files", } } |
We can then create a custom jar for our code and copy it into the docker container.
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
COPY ./custom-converter-1.0.0.jar /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector/
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gzRUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi_minio_connector
Prepare the message producer
In this example we are going to deserialize the value to a string. (Same producer as the influxdb example)
They will take the following format:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
} |
With the type converter n place we can also deserialize as a json object.
They key will be a json that loos like this: {"id":"119"}
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: connect
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: kafka
annotations:
# use-connector-resources configures this KafkaConnect
# to use KafkaConnector resources to avoid
# needing to call the Connect REST API directly
strimzi.io/use-connector-resources: "true"
spec:
image: ktimoney/strimzi-minio-connect
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
- secretName: my-cluster-clients-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: connect
certificate: user.crt
key: user.key
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1 |
We are connecting to the 9093 tls port so we include required certificates in our configuration.
...
"-containerPath", "/opt/kafka/data"
] |
Run: kubectl create -f kafka webhook.yaml to start the pod.
Note: You can delete the webhook once the connector pod starts.
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: connect
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: kafka
labels:
webhook: "true"
annotations:
# use-connector-resources configures this KafkaConnect
# to use KafkaConnector resources to avoid
# needing to call the Connect REST API directly
strimzi.io/use-connector-resources: "true"
spec:
image: ktimoney/strimzi-file-connect
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
- secretName: my-cluster-clients-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: connect
certificate: user.crt
key: user.key
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1 |
We are connecting to the 9093 tls port so we include required certificates in our configuration.
The image tag references the image we built earlier : ktimoney/strimzi-file-connect
In the metadata section of KafkaConnect I have added a new label webhook: "true", this will be picked up by the MutatingWebhookConfiguration:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: kafka-webhook-config
namespace: kafka
annotations:
cert-manager.io/inject-ca-from-secret: kafka/kafka-webhook-cert
webhooks:
- name: kafka-webhook.kafka.svc.cluster.local
admissionReviewVersions:
- "v1"
sideEffects: "None"
timeoutSeconds: 30
objectSelector:
matchLabels:
webhook: "true" |
matchLabels: webhook: "true"
Create the Sink connector
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector namespace: kafka labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.camel.kafkaconnector.miniosinkfile.CamelMiniosinkSinkConnector CamelFileSinkConnector tasksMax tasksMax: 1 config: topics: my-topic keyvalue.converter: org.apache.kafka.connect.jsonstorage.JsonConverterStringConverter valuekey.converter: org.apache.kafka.connect.storagejson.StringConverterJsonConverter key.converter.schemas.enable: false value.converter.schemas.enable: false camel.beans.aggregate: "#class:com.custom.aggregate.minio.CustomStringAggregator" camel.aggregation.size: 1 camel.aggregation.timeout: 20000 camel.aggregation.disable: false camel.kamelet.minio-sink.bucketName: camelsink.endpoint.fileName: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt camel.kameletsink.minio-sinkpath.accessKeydirectoryName: ybK7RleFUkdDeYBf/opt/kafka/data camel.kameletsink.minio-sinkendpoint.secretKeyfileExist: X0Y4zK84bwdefRTljrnPzOb1l0A6OQj2Append camel.kamelet.minio-sink.endpoint.autoCreate: http://minio.default:9000true camel.kamelet.minio-sink.autoCreateBucketaggregation.disable: true |
The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector class.The value converter are set to "org.apache.kafka.connect.storage.StringConverter".
The minio connection details are specified in camel.kamelet.minio-sink parameters.
The aggregation size is set to 1 so it won't aggregate any of the files
This timeout is also specified so it will only aggregate records arriving within this time out value (20 seconds).
These values can be adjusted depending on your requirements.
The main purpose of the aggregator in this example is for setting the file name, this is done by setting a header with a key of "file".
The yaml for the minio pod is available here: minio.yaml
When the connector is running it copies the kafka records into minio as files:
Here's a screen shot where we use the aggregator to set the file name:
.apache.camel.kafkaconnector.file.CamelFileSinkConnector" class.
The value converter is set to "org.apache.kafka.connect.storage.StringConverter".
The connector filename is set using the expression: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt
When the connector starts your kafka records will be copied to the persistent volume as files:
Links
...
How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling
Camel Kafka Connector Examples
...
Camel Kafka Connector Source Code
Using secrets in Kafka Connect configuration