StreamNative Pulsar Operator Tutorial Part 2

Yuwei Sung
7 min read · May 16, 2022

In this part, I show step by step how to create a Python producer container using VS Code. I use venv to pin the Python version per project. The following snippet shows how I create the Python environment and install the libraries before opening the folder with “code .”

mkdir cloudnative-pulsar
cd cloudnative-pulsar
python3 -m venv .py39
source .py39/bin/activate
python -m pip install --upgrade pip
pip install pulsar-client==2.9.2
git init
code .

My initial VS Code interface looks like this. I also toggle open a terminal panel so I can run kubectl or docker build in the same window.

In tutorial part 1, I exposed the proxy service as a load balancer (external IP), so I can connect to the broker in k8s directly from my home network. Next, let’s jump into the Python producer and consumer. Use “command+shift+P” to call up the VS Code command palette, type “new”, select “File: New Folder”, and add a folder called ‘producer’.

You can also use the icon in the project explorer to create a new Python file, ‘test_producer.py’.

Before you create a client (either producer or consumer), you need a topic. I used “kubectl exec” to run the pulsar-admin command in the broker container and create a topic from the VS Code terminal window.

kubectl exec -n sn-platform my-broker-0 -- bin/pulsar-admin topics create persistent://public/default/my-topic1

You should see the newly created topic with the following command.

kubectl exec -n sn-platform my-broker-0 -- bin/pulsar-admin topics list public/default
"persistent://public/default/my-topic1"

Now we are ready to write some Python code. Copy the following snippet into test_producer.py and save the file (cmd+s).

import pulsar

client = pulsar.Client('pulsar://10.0.0.36:6650')
producer = client.create_producer(
    'persistent://public/default/my-topic1',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10)

def producer_callback(res, msg_id):
    print(f"message published {msg_id}")

for i in range(1000):
    producer.send_async(
        f"Hello-{i}".encode('utf-8'),
        producer_callback)

producer.flush()
client.close()
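Before running it, note how send_async works: it queues the message and returns immediately, and producer_callback fires only when the broker acknowledges. Here is a broker-free sketch of that pattern (fake_send_async is my stand-in for illustration, not part of pulsar-client):

```python
# A broker-free sketch of the send_async + callback pattern used above:
# the call returns immediately and the callback runs on acknowledgement.
acked = []

def producer_callback(res, msg_id):
    # With the real client, res is a pulsar.Result and msg_id a MessageId.
    acked.append(msg_id)

def fake_send_async(payload, callback):
    # Stand-in for producer.send_async: "acknowledge" synchronously.
    callback("Ok", f"id-{payload.decode('utf-8')}")

for i in range(1000):
    fake_send_async(f"Hello-{i}".encode('utf-8'), producer_callback)

print(len(acked))  # -> 1000
```

This is also why the real code ends with producer.flush(): it blocks until all queued messages are actually sent before the client closes.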

In the terminal, you can run the producer code like this.

source .py39/bin/activate
python producer/test_producer.py
2022-05-15 21:49:46.917 INFO [0x104948580] ClientConnection:182 | [<none> -> pulsar://10.0.0.36:6650] Create ClientConnection, timeout=10000
2022-05-15 21:49:46.918 INFO [0x104948580] ConnectionPool:96 | Created connection for pulsar://10.0.0.36:6650
2022-05-15 21:49:46.942 INFO [0x16b81b000] ClientConnection:368 | [10.0.0.7:59912 -> 10.0.0.36:6650] Connected to broker
2022-05-15 21:49:46.979 INFO [0x16b81b000] HandlerBase:64 | [persistent://public/default/my-topic1, ] Getting connection from pool
2022-05-15 21:49:46.986 INFO [0x16b81b000] ClientConnection:182 | [<none> -> pulsar://10.0.0.36:6650] Create ClientConnection, timeout=10000
2022-05-15 21:49:46.986 INFO [0x16b81b000] ConnectionPool:96 | Created connection for pulsar://my-broker-0.my-broker-headless.sn-platform.svc.cluster.local:6650
2022-05-15 21:49:46.993 INFO [0x16b81b000] ClientConnection:370 | [10.0.0.7:59913 -> 10.0.0.36:6650] Connected to broker through proxy. Logical broker: pulsar://my-broker-0.my-broker-headless.sn-platform.svc.cluster.local:6650
2022-05-15 21:49:47.040 INFO [0x16b81b000] ProducerImpl:188 | [persistent://public/default/my-topic1, ] Created producer on broker [10.0.0.7:59913 -> 10.0.0.36:6650]

Looks good, right? The code returned without an error, but how do we know it published 1000 messages to the topic? Let’s use pulsar-admin to check the topic stats.

kubectl exec -n sn-platform my-broker-0 -- bin/pulsar-admin topics stats public/default/my-topic1
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 14921,
"msgInCounter" : 1000,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 14921,
"backlogSize" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : { },
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
}
}

From this output, you can see that msgInCounter is 1000, matching the 1000 messages the producer sent.
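Since pulsar-admin prints JSON, you can also pull out the counter programmatically with the standard library; a small sketch (the sample is trimmed from the stats output above):

```python
import json

# Trimmed sample of the `topics stats` output shown above.
stats_text = '''
{
  "msgInCounter": 1000,
  "bytesInCounter": 14921,
  "storageSize": 14921
}
'''

stats = json.loads(stats_text)
print(stats["msgInCounter"])  # -> 1000
```

The same idea works on the live output, e.g. piping the kubectl exec result into a script that asserts on msgInCounter.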

Next, let’s build the image and make this producer a k8s deployment. Use the command palette to create a Dockerfile.

You can find the Dockerfile and requirements.txt created in your project folder. I moved the files to the producer folder and modified their content to fit, because I want to create two images: one for the producer and the other for the consumer.

Don’t forget to put the python dependency (pulsar-client==2.9.2) in the requirements.txt.
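For reference, the producer Dockerfile ends up roughly like this (a minimal sketch; the file VS Code generates for you may differ in base image and layout):

```dockerfile
# Minimal sketch of producer/Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY test_producer.py .
CMD ["python", "test_producer.py"]
```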

We are ready to build the Docker image. Run the following command to build it. Note that if you are on a Mac M1, you need to specify the image platform to match your k8s worker OS and architecture (mine is Ubuntu on amd64).

docker buildx build --platform linux/amd64 . -t yuwsung1/pulsar-python-producer:v0.1
docker push yuwsung1/pulsar-python-producer:v0.1

Once the image is pushed to Docker Hub, you can use kubectl to run it as a container and check the topic stats for new messages.

kubectl run prod-test --image=yuwsung1/pulsar-python-producer:v0.1
kubectl exec -n sn-platform my-broker-0 -- bin/pulsar-admin topics stats public/default/my-topic1

Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 29482,
"msgInCounter" : 2000,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 14921,
"backlogSize" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : { },
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
}
}

However, this image is pretty much useless as is: the URL, topic name, and other producer properties are hard-coded in the Python code. Next, we move those properties into a k8s ConfigMap, then use a Deployment to mount the ConfigMap keys as environment variables. In the Python code, we import the os module and read the env variables instead of the hard-coded properties. The producer code now looks like this:

import pulsar
import os

pulsar_url = os.environ.get('PULSAR_URL')
topic = os.environ.get('PULSAR_TOPIC')

client = pulsar.Client(pulsar_url)
producer = client.create_producer(
    topic,
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10)

def producer_callback(res, msg_id):
    print(f"message published {msg_id}")

for i in range(1000):
    producer.send_async(
        f"Hello-{i}".encode('utf-8'),
        producer_callback)

producer.flush()
client.close()
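One caveat with os.environ.get: if a variable is unset it returns None, and pulsar.Client(None) fails with a confusing error. A small sketch of failing fast instead (the required_env helper is mine, not from the tutorial code):

```python
import os

def required_env(name: str) -> str:
    """Return an environment variable's value, or raise a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"missing required environment variable: {name}")
    return value

# Simulate the deployment's injected setting for a local run.
os.environ["PULSAR_URL"] = "pulsar://10.0.0.36:6650"
print(required_env("PULSAR_URL"))  # -> pulsar://10.0.0.36:6650
```

With this, a misconfigured pod crashes immediately with a readable message instead of a client-side stack trace.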

To test the code locally, I export PULSAR_URL and PULSAR_TOPIC in my current local env.

export PULSAR_URL='pulsar://10.0.0.36:6650'
export PULSAR_TOPIC='persistent://public/default/my-topic1'
python test_producer.py

kubectl exec -n sn-platform my-broker-0 -- bin/pulsar-admin topics stats public/default/my-topic1
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 44763,
"msgInCounter" : 3000,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 14921,
"backlogSize" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : { },
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
}
}

Rebuild the image with a new tag.

docker buildx build --platform linux/amd64 . -t yuwsung1/pulsar-python-producer:v0.2
docker push yuwsung1/pulsar-python-producer:v0.2

Note that v0.2 needs PULSAR_URL and PULSAR_TOPIC injected into the producer pod to work. The following are the Deployment and ConfigMap. (Pulsar expands the short topic name "my-topic1" to persistent://public/default/my-topic1.)

apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-producer-config
data:
  pulsar_url: "pulsar://10.0.0.36:6650"
  topic: "my-topic1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-producer
spec:
  selector:
    matchLabels:
      app: my-producer
  replicas: 1
  template:
    metadata:
      labels:
        app: my-producer
    spec:
      containers:
      - name: pulsar-producer
        image: yuwsung1/pulsar-python-producer:v0.2
        resources:
          limits:
            cpu: "500m"
            memory: "128Mi"
        env:
        - name: PULSAR_URL
          valueFrom:
            configMapKeyRef:
              name: pulsar-producer-config
              key: pulsar_url
        - name: PULSAR_TOPIC
          valueFrom:
            configMapKeyRef:
              name: pulsar-producer-config
              key: topic

Use kubectl to apply this ConfigMap and Deployment, then check the topic stats.

kubectl apply -f pulsar-producer.yaml
kubectl exec -n sn-platform my-broker-0 -- bin/pulsar-admin topics stats public/default/my-topic1
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 59684,
"msgInCounter" : 4000,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 14921,
"backlogSize" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : { },
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
}
}

Now we can follow the same steps to create a consumer image and deployment. I will skip those steps in this tutorial, but you can find the consumer code in my GitHub repo.

In the next part, we will discuss how to use Argo CD to migrate the consumer/producer and upgrade the Pulsar operator. Stay tuned.


Yuwei Sung

A data nerd who went from data center field engineer to cloud database reliability engineer.