RicAsn2Json Converter
Message handling sequence
Players and roles
PLAYER | ROLE |
Configuration | YAML configuration model |
Consumer | |
Processor | |
Producer | |
Main | |
Configurations
brokers | List of IP:port addresses | |
Consumer-group | Group name, identical for all instances | |
In-topic | Name of the input topic | |
Out-topic | Name of the output topic | |
In-buffer-size | Buffer size of inChannel | Default: 2 * #CPUs |
Out-buffer-size | Buffer size of outChannel | Default: 1 * #CPUs |
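A minimal sketch of how the settings above could be modeled in Go, with the two buffer-size defaults derived from the CPU count. The struct name, field names, YAML keys and the withDefaults helper are assumptions made for illustration; the document does not define the actual configuration model, and YAML parsing itself is left out.

```go
package main

import (
	"fmt"
	"runtime"
)

// Config mirrors the settings table above. Field names and YAML keys are
// illustrative assumptions, not the converter's actual model.
type Config struct {
	Brokers       []string `yaml:"brokers"`
	ConsumerGroup string   `yaml:"consumer-group"`
	InTopic       string   `yaml:"in-topic"`
	OutTopic      string   `yaml:"out-topic"`
	InBufferSize  int      `yaml:"in-buffer-size"`
	OutBufferSize int      `yaml:"out-buffer-size"`
}

// withDefaults fills unset (zero or negative) buffer sizes from the CPU
// count: 2 * #CPUs for the in-buffer and 1 * #CPUs for the out-buffer.
func withDefaults(c Config) Config {
	n := runtime.NumCPU()
	if c.InBufferSize <= 0 {
		c.InBufferSize = 2 * n
	}
	if c.OutBufferSize <= 0 {
		c.OutBufferSize = n
	}
	return c
}

func main() {
	// Broker address, group and topic names are placeholders.
	cfg := withDefaults(Config{
		Brokers:       []string{"10.0.0.1:9092"},
		ConsumerGroup: "ric-asn2json",
		InTopic:       "e2ap-in",
		OutTopic:      "e2ap-json-out",
	})
	fmt.Printf("%+v\n", cfg)
}
```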
Optional future configurations (for next versions, if more tuning needed)
Consumer-heartbeat-interval-ms | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the session timeout (Consumer-session-timeout-ms). | See https://docs.confluent.io/current/installation/configuration/consumer-configs.html |
Consumer-session-timeout-ms | The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be within the allowable range set in the broker configuration. | See https://docs.confluent.io/current/installation/configuration/consumer-configs.html |
Processor-pool-size | Default: #CPUs | Initially, one goroutine is started per message. This setting will be used only if a goroutine pool turns out to be needed. |
Producer-pool-size | Default: #CPUs * FACTOR | Initially, one goroutine is started per message. This setting will be used only if a goroutine pool turns out to be needed. |
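If a goroutine pool is ever needed, the pool-size settings above could bound the number of concurrent workers roughly as follows. This is a generic fixed-size worker pool using only the standard library; runPool and its signature are hypothetical, and string messages stand in for the real message type.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runPool starts size workers that read from in, apply process, and write
// the results to the returned channel. The returned channel is closed once
// in has been drained and every worker has exited.
func runPool(size int, in <-chan string, process func(string) string) <-chan string {
	out := make(chan string, size)
	var wg sync.WaitGroup
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range in {
				out <- process(msg)
			}
		}()
	}
	// Close out only after all workers are done writing to it.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan string, 4)
	go func() {
		defer close(in)
		for _, m := range []string{"a", "b", "c"} {
			in <- m
		}
	}()
	// strings.ToUpper is a stand-in for the real conversion step.
	for res := range runPool(2, in, strings.ToUpper) {
		fmt.Println(res)
	}
}
```

Compared with one goroutine per message, the pool caps concurrency at the configured size, at the cost of messages waiting in the channel when all workers are busy. Result order is not guaranteed in either approach.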
Concurrency, Synchronicity & Scalability
Single consumer per partition (multiple partitions are served by multiple instances)
Multiple processors (each message is processed by its own goroutine)
Multiple producers (each message is produced by its own goroutine)
Asynchronous producer: errors are logged
Multiple instances of the app serve multiple partitions; this is managed externally by Kafka and K8s.
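The concurrency model listed above can be sketched with the standard library alone: a single consumer feeds inChannel, each message is converted in its own goroutine, and each converted message is published in its own goroutine with failures only logged. runPipeline, convert, publish and errEmpty are hypothetical names; the real consumer and producer would use confluent-kafka-go, which is left out here so the sketch stays self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
)

// errEmpty is a hypothetical conversion error used by the demo below.
var errEmpty = errors.New("empty message")

// runPipeline pushes msgs through consume -> process -> produce and returns
// the number of messages published successfully.
func runPipeline(msgs []string, inBuf, outBuf int,
	convert func(string) (string, error), publish func(string) error) int64 {

	inCh := make(chan string, inBuf)
	outCh := make(chan string, outBuf)

	// Single consumer: the only writer to inCh.
	go func() {
		defer close(inCh)
		for _, m := range msgs {
			inCh <- m
		}
	}()

	// Processors: one goroutine per consumed message.
	go func() {
		var procs sync.WaitGroup
		for m := range inCh {
			procs.Add(1)
			go func(m string) {
				defer procs.Done()
				j, err := convert(m)
				if err != nil {
					log.Printf("convert failed, skipping message: %v", err)
					return
				}
				outCh <- j
			}(m)
		}
		procs.Wait()
		close(outCh)
	}()

	// Producers: one goroutine per converted message; errors are logged only.
	var published int64
	var prods sync.WaitGroup
	for j := range outCh {
		prods.Add(1)
		go func(j string) {
			defer prods.Done()
			if err := publish(j); err != nil {
				log.Printf("publish failed: %v", err)
				return
			}
			atomic.AddInt64(&published, 1)
		}(j)
	}
	prods.Wait()
	return atomic.LoadInt64(&published)
}

func main() {
	convert := func(m string) (string, error) {
		if m == "" {
			return "", errEmpty
		}
		return fmt.Sprintf(`{"body":%q}`, m), nil
	}
	publish := func(j string) error {
		fmt.Println(j)
		return nil
	}
	n := runPipeline([]string{"msg-1", "", "msg-2"}, 4, 2, convert, publish)
	fmt.Println("published:", n)
}
```

Because every message gets its own goroutine, output order is not guaranteed to match input order. If ordering within a partition matters, that would need to be handled separately.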
Dependencies
confluent-kafka-go
librdkafka
goxml2json
Error handling
Shut down on the following errors (for any other error, log and proceed):
Subscription error
Polling error (partition error, AllBrokersAreDown)
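The shutdown rule above amounts to a small fatal/non-fatal classification. The sketch below uses hypothetical sentinel errors in place of the client library's own error values, so the names errSubscription, errPartition, errAllBrokersDown, errConversion and isFatal are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for the Kafka client's errors.
var (
	errSubscription   = errors.New("subscription error")
	errPartition      = errors.New("partition error")
	errAllBrokersDown = errors.New("all brokers are down")
	errConversion     = errors.New("conversion error") // example of a non-fatal error
)

// isFatal reports whether err should trigger a shutdown. Any other error
// is expected to be logged, after which processing continues.
func isFatal(err error) bool {
	for _, fatal := range []error{errSubscription, errPartition, errAllBrokersDown} {
		if errors.Is(err, fatal) {
			return true
		}
	}
	return false
}

func main() {
	errs := []error{
		errConversion,
		fmt.Errorf("poll: %w", errAllBrokersDown), // wrapped errors are matched too
	}
	for _, err := range errs {
		if isFatal(err) {
			fmt.Println("shutting down:", err)
			return
		}
		fmt.Println("logged, proceeding:", err)
	}
}
```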
Suggested JSON message format
{
"header": {<header data>},
"body": {<E2AP as json>}
}
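One way to build the suggested envelope in Go is to keep both parts as raw JSON and wrap them with encoding/json. Envelope and wrap are hypothetical names, and the header and body contents in the demo are placeholders, since the document does not specify the header fields or the E2AP JSON layout.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope matches the suggested format: a header object and the E2AP
// payload as JSON. RawMessage embeds already-encoded JSON unchanged.
type Envelope struct {
	Header json.RawMessage `json:"header"`
	Body   json.RawMessage `json:"body"`
}

// wrap validates both parts and returns the combined JSON message.
func wrap(header, body []byte) ([]byte, error) {
	if !json.Valid(header) {
		return nil, fmt.Errorf("header is not valid JSON")
	}
	if !json.Valid(body) {
		return nil, fmt.Errorf("body is not valid JSON")
	}
	return json.Marshal(Envelope{Header: header, Body: body})
}

func main() {
	// Placeholder contents; the real header data and E2AP JSON will differ.
	header := []byte(`{"ranName":"gnb-1"}`)
	body := []byte(`{"procedureCode":5}`)

	msg, err := wrap(header, body)
	if err != nil {
		fmt.Println("wrap failed:", err)
		return
	}
	fmt.Println(string(msg))
}
```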
Suggested name
E2JStreamer