Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Introduction

Kafka Connect allows you to continuously ingest data from external systems into Kafka via connect sources and write data from Kafka to external system via connect sinks.

Various plugins are available for a variety of different data sources and data sinks.

Confluent Plugins

Camel Plugins

Debezium Plugins

Single Message Transformations (SMTs) are applied to messages as they flow through Connect. 


Connect Demo

Postgres JDBC Sink Connector (Confluent)

Prepare the connector image

Download the JDBC sink connector from JDBC Connector (Source and Sink)

Create a docker file for this connector


Code Block
languagetext
titleDocker
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/jdbc
COPY ./confluentinc-kafka-connect-jdbc-10.6.0.zip /opt/kafka/plugins/j
dbc/
RUN unzip /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip -d /opt/kafka/plugins/jdbc
RUN rm /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip
USER 1001

Build the image and copy it to your docker repository

docker build -f Dockerfile . t ktimoney/strimzi connector


Prepare the postges database

Setup a new username/password and schema for Kafka connect to write to

Code Block
languagetext
titlePostgres
    SELECT 'CREATE DATABASE kafka'
    WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'kafka')\gexec
    DO $$
    BEGIN
      IF NOT EXISTS (SELECT FROM pg_user WHERE  usename = 'kafka') THEN
         CREATE USER kafka WITH PASSWORD 'kafka';
         GRANT ALL PRIVILEGES ON DATABASE kafka TO kafka;
      END IF;
    END
    $$;

...

The database connection details are also included.


Influxdb Sink Connector (Apache Camel)

Prepare the connector image

Download the Influxdb connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-influxdb-kafka-connector/0.8.0/camel-influxdb-kafka-connector-0.8.0-package.tar.gz

Create a docker file for this connector


Code Block
languagetext
titleDocker
FROM quay.io/strimzi/kafka:0.22.1-kafka-2.7.0
USER root:root 
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-influxdb-kafka-connector-0.8.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz 
USER 1001

Build the image and copy it to your docker repository

docker build -f Dockerfile . t ktimoney/strimzi_influxdb_connector



Prepare the message producer

...

Code Block
languagetext
titleInfluxDB query
{'pretty': 'true', 'db': 'ts_host_metrics', 'q': 'SELECT "region", "host", "free", "used" FROM "disk_space" WHERE "host"=\'localhost\''}
{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "disk_space",
                    "columns": [
                        "time",
                        "region",
                        "host",
                        "free",
                        "used"
                    ],
                    "values": [
                        [
                            "2023-01-23T13:03:32.1246436Z",
                            "IE-region",
                            "localhost",
                            "81%",
                            "19%"
                        ]
                    ]
                }
            ]
        }
    ]
}


Minio Sink Connector (Apache Camel)

Prepare the connector image

Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-file-kafka-connector/3.20.0/camel-file-kafka-connector-3.20.0-package.tar.gz


We need to create some custom classes:

CamelMinioConverters.java - this will allow us to deserialize to json and convert the hashmap to an inputstream

Code Block
languagejava
titleCamelMinioConverters
package com.custom.convert.minio;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import org.apache.camel.Converter;

@Converter(generateLoader = true)
public final class CamelMinioConverters {

    private CamelMinioConverters() {
    }

    @SuppressWarnings("deprecation")
    @Converter
    public static InputStream fromMapToInputStream(Map<String, Object> map) {

      String json = "{}";
      try {
        json = new ObjectMapper().writeValueAsString(map);
      } catch (JsonProcessingException e) {
        e.printStackTrace();
      }
     return new ByteArrayInputStream(json.getBytes());
    }

}

...

Code Block
languagejava
titleStringAggregator
package com.custom.aggregate.minio;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

public class CustomStringAggregator implements AggregationStrategy {

    //@Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
      Message newIn = newExchange.getIn();
      Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key");
      String key = keyMap.get("id");
      SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS");              
      String fileName = "ExchangeKafka-" + key + "-" + sdf.format( new Date() ) + ".json";
      newIn.setHeader("file", fileName);               

     // lets append the old body to the new body
        if (oldExchange == null) {
            return newExchange;
        }

        String body = oldExchange.getIn().getBody(String.class);
        if (body != null) {
            String newBody = newIn.getBody(String.class);
            if (newBody != null) {
                body += System.lineSeparator() + newBody;
            }

            newIn.setBody(body);
        }
        return newExchange;
    }
}

We can then create a custom jar for our code and copy it into the docker container.

Note: You can also set the filename in the Kafka header using either CamelHeader.file or CamelHeader.ce-file


Create a docker file for this connector


Code Block
languagetext
titleDocker
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
COPY ./custom-converter-1.0.0.jar /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector/
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gzRUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001

Build the image and copy it to your docker repository

docker build -f Dockerfile . t ktimoney/strimzi_minio_connector



Prepare the message producer

...

Here's a screen shot where we use the aggregator to set the file name:



File Sink Connector (Apache Camel)

Prepare the connector image

Download the file connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-file-kafka-connector/3.20.0/camel-file-kafka-connector-3.20.0-package.tar.gz

Create a docker file for this connector


Code Block
languagetext
titleDocker
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-file-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-file-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-file-kafka-connector-3.20.0-package.tar.gz
RUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001

Build the image and copy it to your docker repository

docker build -f Dockerfile . t ktimoney/strimzi_file_connector



Prepare the message producer

...

Run: kubectl create -f kafka webhook.yaml to start the pod.

Create the Note: You can delete the webhook once the connector pod starts.


Create the KafkaConnect object

Code Block
languagetext
titleKafkaConnect
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka  
  labels:
    webhook: "true"
  annotations:
  # use-connector-resources configures this KafkaConnect
  # to use KafkaConnector resources to avoid
  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:   
  image: ktimoney/strimzi-file-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1

...

The image tag references the image we built earlier : ktimoney/strimzi-file-connectCreate the Sink connector

In the metadata section of KafkaConnect I have added a new label webhook: "true", this will be picked up by the MutatingWebhookConfiguration:


Code Block
languagetextyml
titleKafkaConnectorMutatingWebhookConfiguration
apiVersion: kafkaadmissionregistration.strimzik8s.io/v1beta2v1
kind: KafkaConnectorMutatingWebhookConfiguration
metadata:
  name: mykafka-sinkwebhook-connectorconfig
  namespace: kafka
  labelsannotations:
    strimzicert-manager.io/clusterinject-ca-from-secret: mykafka/kafka-connectwebhook-clustercert
specwebhooks:
  - name: kafka-webhook.kafka.svc.cluster.local
    admissionReviewVersions:
      - "v1"
    sideEffects: "None"
    timeoutSeconds: 30
    objectSelector:
      matchLabels:
        webhook: "true"

matchLabels: webhook: "true"


Create the Sink connector

Code Block
languagetext
titleKafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:   
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  tasksMax: 1
  config:
    topics: my-topic 
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.sink.endpoint.fileName: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt
    camel.sink.path.directoryName: /opt/kafka/data
    camel.sink.endpoint.fileExist: Append
    camel.sink.endpoint.autoCreate: true
    camel.aggregation.disable: true

...

How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling

Apache Camel

Camel Kafka Connector Examples

...

Camel Basic Configuration

Camel Components Source Code

Camel Kafka Connector Source Code

Using secrets in Kafka Connect configuration

Kafka Connect Transformations