Producing and Consuming Avro Messages With Redpanda Schema Registry
If you’re familiar with Apache Kafka®, then you may have encountered a Kafka-compatible schema registry: a separate component that you deploy outside your Kafka cluster, since Kafka itself doesn’t have one built in.
Essentially, a schema is a logical description of how your data is organized, so a schema registry provides a central repository for those schemas, allowing producers and consumers to seamlessly send and receive data between them. In event-driven architectures, this can become complex and difficult to manage as you scale, since data schemas can change and evolve over time (potentially breaking things down the road).
This is where Redpanda steps in. Redpanda is a Kafka API-compatible streaming data platform designed from the ground up to be fast, simple, and cost-effective. In line with that mission, Redpanda comes with a schema registry already built in, so you can store, version-control, and validate schemas for your real-time applications without deploying or managing anything beyond the Redpanda cluster.
To give you a taste of how it works, this post walks you through building a simple clickstream processor that uses the Redpanda schema registry to produce and consume Apache Avro™ messages. We chose Avro because it’s the most popular way to serialize data in a compact binary format while supporting schema evolution. This tutorial is just five steps long, and everything you need is in this GitHub repository.
Ready? Let’s get started.
How To Build a Clickstream Processing Application
It’s time to get down to the code. In just five steps, we’ll spin up a single-node Redpanda cluster with Redpanda Console using Docker Compose, and then use a schema-registry-aware SDK client (confluent-kafka) to produce and consume Avro messages with Redpanda. The SDK will do much of the heavy lifting, including schema registration and schema compatibility checking.
In short, the Python producer application collects user interaction events from a web application, serializes them in Avro, and publishes them to the clickstream topic in Redpanda. Another Python application consumes them from Redpanda, deserializes them, and uses them to analyze user behavior.
The format of a sample ClickStream event would look like this:
"user_id":2323,
"event_type":"BUTTON_CLICK",
"ts":"2018-11-12 01:02:03.123456789"
To mimic the above use case, we’ll write a simple producer and consumer in Python that produce and consume messages from the clickstream Redpanda topic. They’ll use the PandaProxy REST API to communicate with the schema registry.
Before we start, make sure you have Docker Desktop and Python 3 (with pip) installed on your local machine.
1. Clone the GitHub Repository
This GitHub repository contains several artifacts to bootstrap the tutorial and help you get started immediately. We’ll create additional code artifacts as we progress through the tutorial.
Execute the following commands to clone the repository to your local machine.
git clone https://github.com/redpanda-data-blog/2023-schema-registry-kafka-avro-tutorial.git code
cd code
2. Install Python Dependencies
We’ll install a few Python libraries used by the Python clients you will see later in this tutorial. You can find them in the requirements.txt file at the root level of the repo. To prevent them from clashing with your local dependencies, let’s create a new virtual environment for them with venv.
Execute the following commands in a terminal window.
python3 -m venv env
source env/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
Also, note that we’re using the confluent-kafka Python SDK for all API communication with the Redpanda schema registry. It’s a schema-registry-aware SDK that’s also compatible with the Confluent schema registry. Because of that, confluent-kafka does a lot of heavy lifting for us under the hood, such as prefixing each message with a header consisting of the magic byte and the schemaID. It can also automatically register schemas with the registry.
Another benefit is that you can use the Redpanda schema registry with your existing Confluent SDK clients without any code changes.
3. Start a Redpanda Cluster
Next, we’ll use Docker Compose to create a Redpanda cluster.
Locate the docker-compose.yml file at the root level of the cloned repository and run the following command in a terminal.
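Assuming you’re using the Compose v2 plugin bundled with current Docker Desktop (with the older standalone binary, swap in docker-compose):
docker compose up -d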
That will spin up a single-node Redpanda cluster along with Redpanda Console. This Redpanda node has the schema registry built in, and you can visually explore the schema definitions stored in the registry through Redpanda Console.
Access the console at http://localhost:8080/brokers. Click Schema Registry in the sidebar to see the schema definitions.
You should see an empty screen since we started the cluster from scratch.
4. Write the Producer Code
Now that we have a functioning Redpanda cluster, the next step is to produce Avro-formatted messages.
The producer.py file in the cloned repository contains the Python code for event production. Its content would look like this:
import json
from uuid import uuid4
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
def delivery_callback(error, message):
    if error:
        print("Failed to deliver the message: %s" % error)
    else:
        print(f"Message with the key {message.key()} has been produced to the topic {message.topic()}")

def load_avro_schema_from_file():
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load('./schemas/click_event.avsc')

    return key_schema, value_schema

def produce():
    config = {
        'bootstrap.servers': "localhost:9092",
        'schema.registry.url': "http://localhost:8081"
    }

    key_schema, value_schema = load_avro_schema_from_file()

    producer = AvroProducer(
        config,
        default_key_schema = key_schema,
        default_value_schema = value_schema
    )

    try:
        key = str(uuid4())
        value_str = '{"user_id": 2, "event_type": "CLICK", "ts": "2021-12-12"}'
        value = json.loads(value_str)

        producer.produce(
            topic = "clickstream",
            key = key,
            value = value,
            on_delivery = delivery_callback
        )
        producer.poll(10000)
        producer.flush()
    except KafkaException as e:
        print("Error occurred during message production:", e)

    print("Done!")

def main():
    produce()

if __name__ == "__main__":
    main()
Most of the methods are boilerplate code and self-explanatory, so let’s walk through the parts that matter most to serialization.
First, we pass the schema registry URL to the SDK client by setting the configuration property schema.registry.url.
Next, the load_avro_schema_from_file() method returns two schemas for a ClickStream event: the schema for the key and the schema for the value.
def load_avro_schema_from_file():
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load('./schemas/click_event.avsc')

    return key_schema, value_schema
Note that the value schema is loaded from the schemas/click_event.avsc file in the repository. That file contains the following Avro schema definition, which defines the structure of a ClickStream event.
{
    "type": "record",
    "namespace": "com.redpanda.examples.avro",
    "name": "ClickEvent",
    "fields": [
        { "name": "user_id", "type": "int" },
        { "name": "event_type", "type": "string" },
        { "name": "ts", "type": "string" }
    ]
}
Once both schemas are derived, they’re passed as arguments to the SDK’s serializer (AvroProducer). If you recall the serialization workflow above, this is where the AvroProducer requests the schemaID for the subject. By default, the subject names are derived from the topic name (clickstream-key and clickstream-value), while the name field in the Avro schema definition identifies the ClickEvent record itself.
producer = AvroProducer(
config,
default_key_schema = key_schema,
default_value_schema = value_schema
)
When the producer code runs for the first time, the AvroProducer automatically registers the ClickEvent schema in the schema registry and fetches the schemaID, which happens to be 1. This schema ID is unique across the Redpanda cluster, and you can use it to retrieve the schema later.
Subsequent invocations will read the schemaID from the cache.
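As a quick aside, here’s a minimal sketch (not part of the tutorial repository) of retrieving a schema by that ID over the registry’s Confluent-compatible REST API, assuming the ID really is 1 on your fresh cluster:
import json
import urllib.request

# Look up the schema definition by its cluster-wide ID (assumed to be 1 here)
with urllib.request.urlopen("http://localhost:8081/schemas/ids/1") as resp:
    print(json.load(resp)["schema"])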
Next, run the file in a terminal to produce Avro messages.
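Assuming you’re still inside the virtual environment created earlier:
python producer.py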
Log in to the Redpanda Console’s Topics page to see that the clickstream topic has been populated with a single event.
Screenshot of the Topics page in Redpanda Console
Note that Redpanda Console can deserialize Avro messages for you, showing the content of the clickstream event’s payload (value), because we used a schema-registry-aware Python SDK. Since both the producer client and the Console use the same schema registry, the Console can determine which schema to use for deserialization by looking at the schemaID carried in each message.
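If you’re curious what that looks like on the wire, the following sketch (again, not part of the tutorial repository) polls one raw record with a plain, non-Avro consumer and decodes the five-byte header the serializer prepends: a magic byte of 0 followed by the schema ID as a big-endian 32-bit integer. The group ID here is arbitrary.
import struct
from confluent_kafka import Consumer

# A plain consumer with no schema registry configured, so we see the raw serialized bytes.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "raw-header-inspector",  # arbitrary group ID for this sketch
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["clickstream"])

msg = consumer.poll(10)
if msg is not None and msg.error() is None:
    raw = msg.value()
    magic_byte = raw[0]                           # 0 for Confluent-style Avro framing
    schema_id = struct.unpack(">I", raw[1:5])[0]  # 4-byte big-endian schema ID
    print(f"magic byte: {magic_byte}, schema ID: {schema_id}, Avro payload bytes: {len(raw) - 5}")
consumer.close()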
Next, log in to the Redpanda Console’s Schema Registry page to verify the schema registration. You will see that the clickstream-key and clickstream-value schema definitions have already been registered with the schema registry.
Screenshot of the Schema Registry page in Redpanda Console
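You can also verify the registration programmatically. The registry exposes a Confluent-compatible REST API on port 8081 (the same schema.registry.url we configured), so a short standard-library sketch like this lists the registered subjects and fetches the latest value schema:
import json
import urllib.request

registry = "http://localhost:8081"  # matches schema.registry.url in the producer config

# List all registered subjects, e.g. ["clickstream-key", "clickstream-value"]
with urllib.request.urlopen(f"{registry}/subjects") as resp:
    print(json.load(resp))

# Fetch the latest registered version of the value schema
with urllib.request.urlopen(f"{registry}/subjects/clickstream-value/versions/latest") as resp:
    print(json.dumps(json.load(resp), indent=2))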
5. Write the Consumer Code
Now that we have Avro-formatted messages in the clickstream topic, let’s deserialize them with a Python consumer.
The consumer.py file in the repository contains the code to consume the clickstream topic, deserialize the messages, and print their content to the terminal. The file content would look like this:
import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import avro
def consume():
    config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "my-connsumer1",
        "auto.offset.reset": "earliest"
    }

    consumer = AvroConsumer(config)
    consumer.subscribe(["clickstream"])

    while True:
        try:
            msg = consumer.poll(1)

            if msg is None:
                continue

            print("Key is :" + json.dumps(msg.key()))
            print("Value is :" + json.dumps(msg.value()))
            print("-------------------------")
        except KafkaException as e:
            print('Kafka failure ' + str(e))

    consumer.close()

def main():
    consume()

if __name__ == '__main__':
    main()
The code is pretty self-explanatory. As we did with the producer, the schema registry URL is configured in the config object and passed into the Avro deserializer, AvroConsumer, which subscribes to the clickstream topic.
That’s all you need to know. The rest, including schemaID discovery, schema retrieval, and finally the deserialization, will be taken care of by the AvroConsumer.
Run the file in a terminal.
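Again, from within the virtual environment:
python consumer.py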
You should see a single event in return, with its deserialized content as follows.
Key is :"39950858-1cfd-4d56-a3ac-2bde1c806f6f"
Value is :{"user_id": 2, "event_type": "CLICK", "ts": "2021-12-12"}
Simplifying Schema Registry in Kafka With Redpanda
If you made it this far, give yourself a pat on the back, because you just used Redpanda’s built-in schema registry to produce and consume Avro messages! This is just one way Redpanda works to make streaming data in Kafka faster and simpler. There are no new binaries to install, no new services to deploy and maintain, and the default configuration just works.
Feel free to customize this clickstream processing example further and play around with other data schema formats, like Protobuf. To learn more about Redpanda, check out our documentation.