In this exploration, we’re breaking down how to blend Apache Kafka, Redis, and Telegraf to make data flow super easy. Whether you’re a tech pro or just curious, follow along as we explain these tools and show you how they work together to make managing data a breeze. Curious to see how synchronized data can make things simpler? Let’s explore Kafka, Redis, and Telegraf together!
System Components
I would like to present the key ingredients and techniques for streaming data from Apache Kafka into Redis as the target system. Let’s begin by exploring the core component, Apache Kafka.
Apache Kafka
Let’s briefly explore Apache Kafka and its significance in modern data architectures. Apache Kafka is an open-source distributed event streaming platform known for its scalability, fault tolerance, and high throughput.
In the core structure of Apache Kafka, three fundamental components drive its powerful data processing capabilities: Brokers, Producers, and Consumers.
- Brokers: lie at the heart of Kafka’s architecture, acting as intermediaries that manage the storage and retrieval of messages between producers and consumers and enable robust, scalable pipelines for real-time data streaming.
- Producers: are responsible for sending data to Kafka topics, initiating the flow of information.
- Consumers: subscribe to these topics and process the messages, making decisions or taking actions based on the received data.
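To make these roles concrete, the standard Kafka console tools let you play both producer and consumer by hand. The snippet below is only an illustration and is not part of this demo’s setup: it assumes a broker reachable at localhost:9092 and a topic named demo.

$ kafka-console-producer --bootstrap-server localhost:9092 --topic demo
> {"greeting": "hello kafka"}
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic demo --from-beginning
{"greeting": "hello kafka"}

The broker stores the message durably in the topic, so the consumer can read it even if it starts long after the producer has exited.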
System Architecture
Telegraf as a Producer/Consumer
The orchestration of this demonstration is facilitated by Telegraf, serving as both the producer and consumer coordinator. In its role as a producer, Telegraf employs a robust plugin, specifically the Kafka Output Plugin, which efficiently pushes data from the source system into the Kafka broker service.
On the ingestion side, the Kafka Consumer takes charge of extracting data from the Kafka cluster. Here, Telegraf again plays a pivotal role, this time through its Kafka Consumer Input Plugin. Engineered for seamless data consumption within the Kafka cluster, this plugin benefits from Telegraf’s lightweight architecture, flexible configuration options, and reliability.
Target System: Redis
Redis is widely recognized as a powerful caching system, and in this demonstration, we are leveraging its capabilities to continuously update the latest data obtained from Telegraf. This is achieved by integrating the Execd Output Plugin with a straightforward shell script.
Through this integration, we ensure an effective flow of real-time information, enhancing the efficiency and responsiveness of the overall system.
Hands-On
In this demonstration, I’ve prepared a code example available at this GitHub repository.
To start, let’s initiate the backend services:
$ docker-compose -f compose/backend.yml up -d
Exploring the backend components: Apache Kafka and Redis. These backend services are defined in the docker-compose file.
The relationship between Zookeeper and Kafka is fundamental to the operation of Kafka clusters. Zookeeper acts as a centralized service for maintaining configuration information, providing distributed synchronization, and offering group services.
Kafka uses Zookeeper for tasks like maintaining topic configuration, managing broker membership, and electing a leader broker for each partition. Without Zookeeper, Kafka brokers wouldn’t be able to coordinate and manage these tasks effectively in a distributed environment.
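For instance, you can ask Zookeeper directly which brokers are currently registered. This is a sketch that assumes the Kafka image ships the standard CLI tools and that Zookeeper is reachable under the hostname zookeeper on port 2181; both depend on how compose/backend.yml is written.

$ docker-compose -f compose/backend.yml exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids

The reply lists the IDs of the brokers that have registered themselves with Zookeeper, which is exactly the membership information Kafka relies on.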
Redis serves as the target system for storing the latest cryptocurrency prices fetched from CoinGecko, leveraging its high-performance in-memory storage capabilities.
Gather cryptocurrency prices
$ docker-compose -f compose/producer.yml up -d
Exploring the producer component: a scraper service that collects data from CoinGecko using Telegraf, configured with the scraper.conf file.
The scraper.conf file configures the scraper service used in the example project. It defines how the scraper operates: which endpoint to fetch data from, how the responses are parsed, and where the results are published.
[agent]
debug = true
quiet = false
metric_buffer_limit = 1000000
omit_hostname = true
[[inputs.http]]
# Read formatted metrics from one or more HTTP endpoints
urls = ["https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum,ripple,cardano,binancecoin,solana,dogecoin,chainlink,near,decentraland&vs_currencies=usd&precision=18"]
tagexclude = ["host", "url"]
data_format = "json"
[[outputs.kafka]]
## URLs of kafka brokers
brokers = ["kafka:19092"]
## Kafka topic for producer messages
topic = "currency.rate"
exclude_topic_tag = true
data_format = "json"
This configuration sets up the scraper service to fetch cryptocurrency data from CoinGecko using the HTTP input plugin defined in the [[inputs.http]] section. The data is then published to the Kafka topic currency.rate using the Kafka output plugin specified in the [[outputs.kafka]] section.
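If you want to see the payload Telegraf is parsing, you can hit the same CoinGecko endpoint yourself. This quick check is independent of the pipeline; the response layout shown is indicative and the actual values will differ:

$ curl -s "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
{"bitcoin":{"usd":...},"ethereum":{"usd":...}}

Telegraf’s json data format flattens nested objects, so each price typically ends up as a field such as bitcoin_usd on the resulting metric.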
Explore scraped data in Kafka
1. Enter the Kafka container to run Kafka commands:
$ docker-compose -f compose/backend.yml exec kafka sh
2. List topics on the specified bootstrap server:
sh-4.4$ kafka-topics --list --bootstrap-server localhost:9092
3. Start consuming and displaying messages from the currency.rate topic from the beginning:
sh-4.4$ kafka-console-consumer --bootstrap-server localhost:9092 --topic currency.rate --from-beginning
If you only want to consume new messages arriving in the topic (not from the beginning), you can omit the --from-beginning option.
Push cryptocurrency prices into Redis
$ docker-compose -f compose/consumer.yml up -d
Exploring the consumer component: a currency-rates-redis-consumer service that consumes cryptocurrency price data from Kafka, configured with the consumer.conf file.
The consumer.conf file configures the consumer service used in the example project. It sets up the Telegraf Kafka Consumer Input Plugin, specifying how Telegraf consumes data from a Kafka topic and hands it to an output, in this case a shell script driven by the Execd Output Plugin.
[agent]
debug = true
quiet = false
metric_buffer_limit = 1000000
omit_hostname = true
[[inputs.kafka_consumer]]
brokers = ["kafka:19092"]
topic_regexps = ["currency.rate"]
topic_tag = "topic_currency_rate"
consumer_group = "currency_rate_metrics"
offset = "oldest"
max_message_len = 1000000
data_format = "json"
json_string_fields = ["fields"]
tag_keys = ["fields"]
json_time_key = "timestamp"
json_time_format = "unix"
[[outputs.execd]]
command = ["sh", "/etc/scripts/push-rates-redis.sh", "redis"]
data_format = "json"
This configuration file sets up Telegraf to:
- Operate with detailed logging enabled (debug mode), without suppressing any non-error logs.
- Buffer up to 1,000,000 metrics to prevent data loss during high load or network issues.
- Consume JSON-formatted messages from Kafka topics matching currency.rate, starting from the oldest message.
- Add specific tags to the metrics based on message fields and topic names.
- Execute a custom shell script to push the consumed data to a Redis instance, passing the data in JSON format.
The push-rates-redis.sh shell script processes JSON lines received on standard input, typically from the Execd Output Plugin configured in the [[outputs.execd]] section of the Telegraf configuration. For each line, it extracts the timestamp and fields, merges them into a structured JSON object, and reformats this object to include a timestamp, quote, and prices array. After formatting the JSON, it prints the result to standard output and pushes it to Redis under the key coingecko:rate:usd. The script defaults to connecting to a Redis server on localhost but can connect to a different server if a hostname is provided as the first argument.
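The repository holds the authoritative script; the sketch below only illustrates the idea. It assumes jq and redis-cli are available in the consumer image and that Telegraf’s JSON serializer emits one object per line with timestamp and fields keys; the exact field names and the shape of the stored document are assumptions.

#!/bin/sh
# Minimal sketch of push-rates-redis.sh (assumed layout, not the repository version).
# Reads JSON lines from Telegraf's execd output on stdin and stores a reshaped
# document in Redis under a fixed key.

REDIS_HOST="${1:-localhost}"   # first argument overrides the default Redis host

while read -r line; do
  # Extract the timestamp and the metric fields, then rebuild a document with
  # a timestamp, the quote currency, and a prices array (assumed structure).
  payload=$(echo "$line" | jq -c '{
    timestamp: .timestamp,
    quote: "usd",
    prices: (.fields | to_entries | map({coin: .key, price: .value}))
  }')

  # Print the formatted JSON and push it to Redis.
  echo "$payload"
  redis-cli -h "$REDIS_HOST" SET "coingecko:rate:usd" "$payload" > /dev/null
done

Once the consumer stack is running, you can check that the key is being refreshed (assuming the Redis service in compose/backend.yml is named redis, as the hostname passed in consumer.conf suggests):

$ docker-compose -f compose/backend.yml exec redis redis-cli GET coingecko:rate:usd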