Release H: Apache Flink
Introduction
Apache Flink is an open-source, unified stream-processing and batch-processing framework developed by the Apache Software Foundation.
Flink's pipelined runtime system enables the execution of both batch and stream processing programs.
Installation
Copy the following files from the appendix of Flink's Kubernetes setup page:
flink-configuration-configmap.yaml
jobmanager-service.yaml
jobmanager-session-deployment-non-ha.yaml
taskmanager-session-deployment.yaml
Download the following jar from Maven Central:
flink-sql-connector-kafka-1.17.1.jar
and copy it somewhere that's accessible from your k8s pods; the volume patches below expect it at /var/flink/lib on the node.
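For example, a minimal sketch assuming a single-node minikube cluster (the Maven Central URL follows the jar's upstream coordinates; on a multi-node cluster the jar would need to be on every node that can schedule Flink pods):

# download the connector jar from Maven Central
curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
# copy it onto the node, at the hostPath used by the volume patches below
minikube cp flink-sql-connector-kafka-1.17.1.jar /var/flink/lib/flink-sql-connector-kafka-1.17.1.jar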
We can then use kustomize to configure the installation for our own requirements, with the following kustomization.yaml:

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: kafka
resources:
- flink-user.yaml
- flink-configuration-configmap.yaml
- jobmanager-service.yaml
- jobmanager-session-deployment-non-ha.yaml
- taskmanager-session-deployment.yaml
patchesStrategicMerge:
- patch-jobmanager-env.yml
- patch-taskmanager-env.yml
- patch-jobmanager-volumes.yml
- patch-taskmanager-volumes.yml
- jobmanager-service-loadbalancer.yaml
replicas:
- count: 1
  name: flink-taskmanager
flink-user.yaml creates a KafkaUser for Flink with TLS authentication; Strimzi's User Operator generates a Secret named flink holding the user's keystore and password, which the patches below reference:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: flink
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
patch-jobmanager-env.yml exposes the keystore and truststore passwords to the jobmanager container as environment variables:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  template:
    spec:
      containers:
      - name: jobmanager
        env:
        - name: SSL_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: flink
              key: user.password
        - name: SSL_KEY_PASSWORD
          valueFrom:
            secretKeyRef:
              name: flink
              key: user.password
        - name: SSL_TRUSTSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: my-cluster-cluster-ca-cert
              key: ca.password
patch-taskmanager-env.yml does the same for the taskmanager:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  template:
    spec:
      containers:
      - name: taskmanager
        env:
        - name: SSL_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: flink
              key: user.password
        - name: SSL_KEY_PASSWORD
          valueFrom:
            secretKeyRef:
              name: flink
              key: user.password
        - name: SSL_TRUSTSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: my-cluster-cluster-ca-cert
              key: ca.password
patch-jobmanager-volumes.yml mounts the user keystore, the cluster CA truststore, and the Kafka SQL connector jar (from the host path we copied it to) into the jobmanager:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  template:
    spec:
      containers:
      - name: jobmanager
        volumeMounts:
        - name: keystore
          mountPath: "/etc/ssl/keystore"
          readOnly: true
        - name: truststore
          mountPath: "/etc/ssl/truststore"
          readOnly: true
        - name: kafkasql
          mountPath: /opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
      volumes:
      - name: keystore
        secret:
          secretName: flink
          items:
          - key: user.p12
            path: keystore.p12
      - name: truststore
        secret:
          secretName: my-cluster-cluster-ca-cert
          items:
          - key: ca.p12
            path: truststore.p12
      - name: kafkasql
        hostPath:
          path: /var/flink/lib/flink-sql-connector-kafka-1.17.1.jar
          type: File
patch-taskmanager-volumes.yml applies the same mounts to the taskmanager:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  template:
    spec:
      containers:
      - name: taskmanager
        volumeMounts:
        - name: keystore
          mountPath: "/etc/ssl/keystore"
          readOnly: true
        - name: truststore
          mountPath: "/etc/ssl/truststore"
          readOnly: true
        - name: kafkasql
          mountPath: /opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
      volumes:
      - name: keystore
        secret:
          secretName: flink
          items:
          - key: user.p12
            path: keystore.p12
      - name: truststore
        secret:
          secretName: my-cluster-cluster-ca-cert
          items:
          - key: ca.p12
            path: truststore.p12
      - name: kafkasql
        hostPath:
          path: /var/flink/lib/flink-sql-connector-kafka-1.17.1.jar
          type: File
jobmanager-service-loadbalancer.yaml patches the jobmanager Service to type LoadBalancer, making it (including the web UI) reachable from outside the cluster:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: LoadBalancer
Running kubectl apply -k . will install the Flink jobmanager and taskmanager in the kafka namespace.
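To confirm everything came up, list the pods (a quick check; the upstream manifests label the pods app: flink, and your pod-name suffixes will differ):

kubectl get pods -n kafka -l app=flink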
Flink SQL Client
Find the name of the flink-taskmanager pod in the kafka namespace and open a shell in it, e.g.:

kubectl exec -it flink-taskmanager-54d79f6dfb-h94h6 -n kafka -- sh
Retrieve the SSL passwords from the environment:
env | grep SSL
SSL_TRUSTSTORE_PASSWORD=m6SgM1gk0OJb
SSL_KEY_PASSWORD=kyXPoimOojcJ
SSL_KEYSTORE_PASSWORD=kyXPoimOojcJ
Start the SQL client:
./bin/sql-client.sh
We can now create the tables and views we need to query our data. The first is the pmdata table, which puts a structure on our JSON input from Kafka:
CREATE TABLE pmdata (
  event ROW<
    commonEventHeader ROW<
      domain VARCHAR,
      eventName VARCHAR,
      sourceName VARCHAR,
      reportingEntityName VARCHAR,
      startEpochMicrosec BIGINT,
      lastEpochMicrosec BIGINT,
      timeZoneOffset VARCHAR
    >,
    perf3gppFields ROW<
      perf3gppFieldsVersion VARCHAR,
      measDataCollection ROW<
        granularityPeriod INT,
        measuredEntityUserName VARCHAR,
        measuredEntityDn VARCHAR,
        measuredEntitySoftwareVersion VARCHAR,
        measInfoList ARRAY<ROW<
          measInfoId ROW<sMeasInfoId STRING>,
          measTypes ROW<sMeasTypesList ARRAY<VARCHAR>>,
          measValuesList ARRAY<ROW<
            measObjInstId VARCHAR,
            suspectFlag VARCHAR,
            measResults ARRAY<ROW<p INT, sValue STRING>>
          >>
        >>
      >
    >
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'pm-readings',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9093',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/etc/ssl/truststore/truststore.p12',
  'properties.ssl.truststore.type' = 'PKCS12',
  'properties.ssl.truststore.password' = 'm6SgM1gk0OJb',
  'properties.ssl.keystore.location' = '/etc/ssl/keystore/keystore.p12',
  'properties.ssl.keystore.type' = 'PKCS12',
  'properties.ssl.keystore.password' = 'kyXPoimOojcJ',
  'properties.ssl.key.password' = 'kyXPoimOojcJ',
  'properties.group.id' = 'my-group',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'ALL',
  'value.json.ignore-parse-errors' = 'true'
);
Note: I'm using the passwords I retrieved from the environment earlier.
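As a quick sanity check that the table registered, you can list and describe it (standard Flink SQL client commands):

SHOW TABLES;
DESCRIBE pmdata;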
Before running any queries we need to set the query results format:
SET 'sql-client.execution.result-mode' = 'tableau';
We need to carry out further transformations to get the data into the format we want. The first is a measValues view that unnests measInfoList, measValuesList and measResults down to one row per measurement value:

CREATE TEMPORARY VIEW measValues AS
SELECT perf3gppFieldsVersion, measuredEntityDn, sMeasInfoId, measObjInstId,
       sMeasTypesList, suspectFlag, p, sValue
FROM (
  SELECT perf3gppFieldsVersion, measuredEntityDn, sMeasInfoId, measObjInstId,
         suspectFlag, sMeasTypesList, measResults
  FROM (
    SELECT perf3gppFieldsVersion, measuredEntityDn, measInfoId.sMeasInfoId,
           measTypes.sMeasTypesList, measValuesList
    FROM pmdata
    CROSS JOIN UNNEST(measInfoList) AS t (measInfoId, measTypes, measValuesList)
  )
  CROSS JOIN UNNEST(measValuesList) AS t (measObjInstId, suspectFlag, measResults)
)
CROSS JOIN UNNEST(measResults) AS t (p, sValue);
The measTypes view unnests the list of measurement-type names instead, using ROW_NUMBER() to record each name's position so it can be matched against the p index of the values:

CREATE TEMPORARY VIEW measTypes AS
SELECT perf3gppFieldsVersion, measuredEntityDn, measObjInstId, sMeasTypesList, sMeasType,
       ROW_NUMBER() OVER (PARTITION BY measObjInstId, sMeasTypesList ORDER BY proctime()) AS p
FROM (
  SELECT perf3gppFieldsVersion, measuredEntityDn, sMeasInfoId, measObjInstId,
         suspectFlag, sMeasTypesList, measResults,
         CARDINALITY(sMeasTypesList) AS numMeas
  FROM (
    SELECT perf3gppFieldsVersion, measuredEntityDn, measInfoId.sMeasInfoId,
           measTypes.sMeasTypesList, measValuesList
    FROM pmdata
    CROSS JOIN UNNEST(measInfoList) AS t (measInfoId, measTypes, measValuesList)
  )
  CROSS JOIN UNNEST(measValuesList) AS t (measObjInstId, suspectFlag, measResults)
)
CROSS JOIN UNNEST(sMeasTypesList) AS t (sMeasType);
Finally, the measResults view joins each value to its measurement-type name by position:

CREATE TEMPORARY VIEW measResults AS
SELECT v.perf3gppFieldsVersion, v.measuredEntityDn, v.measObjInstId, t.sMeasType, v.sValue
FROM measValues v, measTypes t
WHERE v.measObjInstId = t.measObjInstId
  AND v.p = t.p
  AND v.sMeasTypesList = t.sMeasTypesList;
We can now run a query against the measResults view:

SELECT * FROM measResults WHERE sMeasType = 'succImmediateAssignProcs8';
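Because these are streaming views, the query keeps emitting rows as new PM events arrive on the topic. Aggregations work the same way; for example (a sketch built from the view's columns), a running count of the same counter per measured entity:

SELECT measuredEntityDn, COUNT(*) AS readings
FROM measResults
WHERE sMeasType = 'succImmediateAssignProcs8'
GROUP BY measuredEntityDn;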
Links
Expanding arrays into new rows
How Does Flink Opensource SQL Parse Nested JSON?
Streaming SQL with Apache Flink: A Gentle Introduction
JSON SQL functions in Apache Flink