Release H: Apache Flink

Introduction

Apache Flink is an open-source, unified stream-processing and batch-processing framework developed by the Apache Software Foundation. Flink's pipelined runtime system enables the execution of both batch and stream processing programs.

Installation

Copy the following files from the Flink Kubernetes setup page:

flink-configuration-configmap.yaml
jobmanager-service.yaml
jobmanager-session-deployment-non-ha.yaml
taskmanager-session-deployment.yaml



Download the following jar from Maven Central:

flink-sql-connector-kafka-1.17.1.jar

and copy it somewhere that's accessible from your Kubernetes pods.
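For example (a sketch, not part of the official setup: the Maven Central URL is assumed from the jar's standard coordinates, and the target directory matches the hostPath used in the volume patches below):

```shell
# Download the Kafka SQL connector jar (URL assumed from the standard
# Maven Central layout for org.apache.flink:flink-sql-connector-kafka:1.17.1).
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

# Place it on each Kubernetes node at the path the hostPath volumes expect.
sudo mkdir -p /var/flink/lib
sudo cp flink-sql-connector-kafka-1.17.1.jar /var/flink/lib/
```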



We can then use kustomize to configure the installation for our own requirements.



kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: kafka
resources:
  - flink-user.yaml
  - flink-configuration-configmap.yaml
  - jobmanager-service.yaml
  - jobmanager-session-deployment-non-ha.yaml
  - taskmanager-session-deployment.yaml
patchesStrategicMerge:
  - patch-jobmanager-env.yml
  - patch-taskmanager-env.yml
  - patch-jobmanager-volumes.yml
  - patch-taskmanager-volumes.yml
  - jobmanager-service-loadbalancer.yaml
replicas:
  - count: 1
    name: flink-taskmanager



flink-user.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: flink
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls



patch-jobmanager-env.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  template:
    spec:
      containers:
        - name: jobmanager
          env:
            - name: SSL_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: flink
                  key: user.password
            - name: SSL_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: flink
                  key: user.password
            - name: SSL_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: my-cluster-cluster-ca-cert
                  key: ca.password



patch-taskmanager-env.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  template:
    spec:
      containers:
        - name: taskmanager
          env:
            - name: SSL_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: flink
                  key: user.password
            - name: SSL_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: flink
                  key: user.password
            - name: SSL_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: my-cluster-cluster-ca-cert
                  key: ca.password



patch-jobmanager-volumes.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  template:
    spec:
      containers:
        - name: jobmanager
          volumeMounts:
            - name: keystore
              mountPath: "/etc/ssl/keystore"
              readOnly: true
            - name: truststore
              mountPath: "/etc/ssl/truststore"
              readOnly: true
            - name: kafkasql
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
      volumes:
        - name: keystore
          secret:
            secretName: flink
            items:
              - key: user.p12
                path: keystore.p12
        - name: truststore
          secret:
            secretName: my-cluster-cluster-ca-cert
            items:
              - key: ca.p12
                path: truststore.p12
        - name: kafkasql
          hostPath:
            path: /var/flink/lib/flink-sql-connector-kafka-1.17.1.jar
            type: File



patch-taskmanager-volumes.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  template:
    spec:
      containers:
        - name: taskmanager
          volumeMounts:
            - name: keystore
              mountPath: "/etc/ssl/keystore"
              readOnly: true
            - name: truststore
              mountPath: "/etc/ssl/truststore"
              readOnly: true
            - name: kafkasql
              mountPath: /opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
      volumes:
        - name: keystore
          secret:
            secretName: flink
            items:
              - key: user.p12
                path: keystore.p12
        - name: truststore
          secret:
            secretName: my-cluster-cluster-ca-cert
            items:
              - key: ca.p12
                path: truststore.p12
        - name: kafkasql
          hostPath:
            path: /var/flink/lib/flink-sql-connector-kafka-1.17.1.jar
            type: File



jobmanager-service-loadbalancer.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: LoadBalancer



Running kubectl apply -k . will install the Flink jobmanager and taskmanager in the kafka namespace.
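You can check that both deployments have come up before continuing (assuming the stock manifests' app: flink label):

```shell
# List the Flink pods; both jobmanager and taskmanager should be Running.
kubectl get pods -n kafka -l app=flink
```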



Flink SQL Client

Find the name of the flink-taskmanager pod in the kafka namespace and log into it,

e.g. kubectl exec -it flink-taskmanager-54d79f6dfb-h94h6 -n kafka -- sh



Retrieve the SSL passwords from the environment:

env | grep SSL
SSL_TRUSTSTORE_PASSWORD=m6SgM1gk0OJb
SSL_KEY_PASSWORD=kyXPoimOojcJ
SSL_KEYSTORE_PASSWORD=kyXPoimOojcJ



Start the SQL client:

 ./bin/sql-client.sh

We can now create the tables/views we need to query our data.

The first is the pmdata table, which puts a structure on our JSON input from Kafka.

pmdata
CREATE TABLE pmdata (
  event ROW<
    commonEventHeader ROW<
      domain VARCHAR,
      eventName VARCHAR,
      sourceName VARCHAR,
      reportingEntityName VARCHAR,
      startEpochMicrosec BIGINT,
      lastEpochMicrosec BIGINT,
      timeZoneOffset VARCHAR
    >,
    perf3gppFields ROW<
      perf3gppFieldsVersion VARCHAR,
      measDataCollection ROW<
        granularityPeriod INT,
        measuredEntityUserName VARCHAR,
        measuredEntityDn VARCHAR,
        measuredEntitySoftwareVersion VARCHAR,
        measInfoList ARRAY<ROW<
          measInfoId ROW<sMeasInfoId STRING>,
          measTypes ROW<sMeasTypesList ARRAY<VARCHAR>>,
          measValuesList ARRAY<ROW<
            measObjInstId VARCHAR,
            suspectFlag VARCHAR,
            measResults ARRAY<ROW<p INT, sValue STRING>>
          >>
        >>
      >
    >
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'pm-readings',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9093',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/etc/ssl/truststore/truststore.p12',
  'properties.ssl.truststore.type' = 'PKCS12',
  'properties.ssl.truststore.password' = 'm6SgM1gk0OJb',
  'properties.ssl.keystore.location' = '/etc/ssl/keystore/keystore.p12',
  'properties.ssl.keystore.type' = 'PKCS12',
  'properties.ssl.keystore.password' = 'kyXPoimOojcJ',
  'properties.ssl.key.password' = 'kyXPoimOojcJ',
  'properties.group.id' = 'my-group',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'ALL',
  'value.json.ignore-parse-errors' = 'true'
);
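For reference, a hypothetical event on the pm-readings topic that matches this schema would look something like this (field names come from the table definition above; the values are made up for illustration):

```json
{
  "event": {
    "commonEventHeader": {
      "domain": "perf3gpp",
      "eventName": "perf3gpp_example_pmMeasResult",
      "sourceName": "gnb-1",
      "reportingEntityName": "gnb-1",
      "startEpochMicrosec": 1689000000000000,
      "lastEpochMicrosec": 1689000900000000,
      "timeZoneOffset": "UTC+00:00"
    },
    "perf3gppFields": {
      "perf3gppFieldsVersion": "1.0",
      "measDataCollection": {
        "granularityPeriod": 900,
        "measuredEntityUserName": "",
        "measuredEntityDn": "ManagedElement=1",
        "measuredEntitySoftwareVersion": "r1",
        "measInfoList": [
          {
            "measInfoId": { "sMeasInfoId": "measInfo1" },
            "measTypes": { "sMeasTypesList": ["attImmediateAssignProcs8", "succImmediateAssignProcs8"] },
            "measValuesList": [
              {
                "measObjInstId": "Cell=1",
                "suspectFlag": "false",
                "measResults": [
                  { "p": 1, "sValue": "100" },
                  { "p": 2, "sValue": "97" }
                ]
              }
            ]
          }
        ]
      }
    }
  }
}
```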

Note: I'm using the passwords I retrieved from the environment earlier.

Before running any queries, we need to set the query result mode:

SET 'sql-client.execution.result-mode' = 'tableau';





We need to carry out further transformations to get the data into the format we want.

measValues
CREATE TEMPORARY VIEW measValues AS
SELECT perf3gppFieldsVersion, measuredEntityDn, sMeasInfoId, measObjInstId,
       sMeasTypesList, suspectFlag, p, sValue
FROM (
  SELECT perf3gppFieldsVersion, measuredEntityDn, sMeasInfoId, measObjInstId,
         suspectFlag, sMeasTypesList, measResults
  FROM (
    SELECT perf3gppFieldsVersion, measuredEntityDn, measInfoId.sMeasInfoId,
           measTypes.sMeasTypesList, measValuesList
    FROM pmdata
    CROSS JOIN UNNEST(measInfoList) AS t (measInfoId, measTypes, measValuesList)
  ) CROSS JOIN UNNEST(measValuesList) AS t (measObjInstId, suspectFlag, measResults)
) CROSS JOIN UNNEST(measResults) AS t (p, sValue);



measTypes
CREATE TEMPORARY VIEW measTypes AS
SELECT perf3gppFieldsVersion, measuredEntityDn, measObjInstId, sMeasTypesList, sMeasType,
       ROW_NUMBER() OVER (PARTITION BY measObjInstId, sMeasTypesList ORDER BY proctime()) AS p
FROM (
  SELECT perf3gppFieldsVersion, measuredEntityDn, sMeasInfoId, measObjInstId,
         suspectFlag, sMeasTypesList, measResults,
         CARDINALITY(sMeasTypesList) AS numMeas
  FROM (
    SELECT perf3gppFieldsVersion, measuredEntityDn, measInfoId.sMeasInfoId,
           measTypes.sMeasTypesList, measValuesList
    FROM pmdata
    CROSS JOIN UNNEST(measInfoList) AS t (measInfoId, measTypes, measValuesList)
  ) CROSS JOIN UNNEST(measValuesList) AS t (measObjInstId, suspectFlag, measResults)
) CROSS JOIN UNNEST(sMeasTypesList) AS t (sMeasType);



measResults
CREATE TEMPORARY VIEW measResults AS
SELECT v.perf3gppFieldsVersion, v.measuredEntityDn, v.measObjInstId, t.sMeasType, v.sValue
FROM measValues v, measTypes t
WHERE v.measObjInstId = t.measObjInstId
  AND v.p = t.p
  AND v.sMeasTypesList = t.sMeasTypesList;
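To make the intent of the three views concrete, here is a plain-Python sketch (not part of the pipeline) of the same flattening logic: walk the nested structure and pair each measResults entry, via its index p, with the measurement name at the same position in sMeasTypesList. The field names follow the pmdata schema; the event itself is hypothetical.

```python
# A hypothetical event, trimmed to the fields the views actually use.
event = {
    "perf3gppFields": {
        "measDataCollection": {
            "measuredEntityDn": "ManagedElement=1",
            "measInfoList": [{
                "measInfoId": {"sMeasInfoId": "measInfo1"},
                "measTypes": {"sMeasTypesList": [
                    "attImmediateAssignProcs8", "succImmediateAssignProcs8"]},
                "measValuesList": [{
                    "measObjInstId": "Cell=1",
                    "suspectFlag": "false",
                    "measResults": [{"p": 1, "sValue": "100"},
                                    {"p": 2, "sValue": "97"}],
                }],
            }],
        },
    },
}

def flatten(event):
    """Yield one (measuredEntityDn, measObjInstId, sMeasType, sValue) row per result."""
    coll = event["perf3gppFields"]["measDataCollection"]
    for mi in coll["measInfoList"]:                 # CROSS JOIN UNNEST(measInfoList)
        types = mi["measTypes"]["sMeasTypesList"]
        for mv in mi["measValuesList"]:             # CROSS JOIN UNNEST(measValuesList)
            for r in mv["measResults"]:             # CROSS JOIN UNNEST(measResults)
                # p is 1-based, matching the ROW_NUMBER() index in measTypes
                name = types[r["p"] - 1]
                yield (coll["measuredEntityDn"], mv["measObjInstId"],
                       name, r["sValue"])

rows = list(flatten(event))
# Filter like the final query: sMeasType = 'succImmediateAssignProcs8'
print([r for r in rows if r[2] == "succImmediateAssignProcs8"])
```

Note the assumption baked into both the sketch and the SQL: the p index in measResults corresponds to positions in sMeasTypesList.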



We can now run a query against the measResults view:

select * from measResults where sMeasType = 'succImmediateAssignProcs8';





Links

Apache Flink vs Spark

Built-in Functions

Expanding arrays into new rows

How Does Flink Opensource SQL Parse Nested JSON?

Streaming SQL with Apache Flink: A Gentle Introduction

Apache Kafka SQL Connector

SSL Setup

SQL Client

Flink Examples

Flink InfluxDB Connector

JSON SQL functions in Apache Flink