StreamNative Pulsar Operator Tutorial Part 1

Yuwei Sung
May 8, 2022

In this article, I demo how to use the StreamNative Pulsar operators to deploy a Pulsar cluster in my Kubernetes (k8s) home lab.

Why use k8s operators? StreamNative provides a good overview of the operator concept here. StreamNative open-sources four operators (zookeeper, bookkeeper, broker, and function mesh), which are wrapped in a Helm chart. For a quick start, you can follow the official installation doc here. This article instead takes a step-by-step approach to setting up a Pulsar cluster so we can observe what the operators do along the way.

First, instead of running "helm install" against the remote chart repo directly, I fetch the chart locally and check the details.

>helm repo add streamnative https://charts.streamnative.io
>helm repo update
>helm fetch streamnative/pulsar-operator --untar
>cd pulsar-operator

The "helm fetch" command with the "--untar" flag downloads the chart and unpacks it locally. Let's check what is inside the chart.
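On my machine the unpacked chart looks roughly like this (exact contents vary by chart version, but you should at least see the chart metadata, the default values, and the templates):

>ls
Chart.yaml  templates  values.yaml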

>cat Chart.yaml
apiVersion: v1
appVersion: 0.9.4
description: Apache Pulsar Operators Helm chart for Kubernetes
home: https://streamnative.io
icon: http://pulsar.apache.org/img/pulsar.svg
kubeVersion: '>= 1.16.0-0 < 1.24.0-0'
maintainers:
  - email: support@streamnative.io
    name: StreamNative Support
name: pulsar-operator
sources:
  - https://github.com/streamnative/pulsar-operators
version: 0.10.0

From the above, the current chart supports k8s versions 1.16 through 1.23. My home lab runs k8s 1.24.0, so if I run "helm install" directly against the remote chart repo, the installation stops at the kubeVersion check.
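You can confirm what your own cluster reports before attempting the install:

>kubectl version --short | grep Server
Server Version: v1.24.0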

>helm install sn-operator -n test streamnative/pulsar-operator
Error: INSTALLATION FAILED: chart requires kubeVersion: >= 1.16.0-0 < 1.24.0-0 which is incompatible with Kubernetes v1.24.0

To bypass this, I modify the kubeVersion range to '>= 1.16.0-0 < 1.25.0-0'. Your mileage may vary here, as k8s 1.24 was released only days ago; give the StreamNative team some time to test against it.
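For reference, the change is a one-liner; sed works if you do not want to open an editor:

>sed -i 's/< 1.24.0-0/< 1.25.0-0/' Chart.yaml
>grep kubeVersion Chart.yaml
kubeVersion: '>= 1.16.0-0 < 1.25.0-0'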

After changing Chart.yaml, the next stop, as with any Helm chart, is the default values.yaml file, which doubles as documentation. This values file is pretty straightforward: it lists the operators you can install (zookeeper-operator, bookkeeper-operator, and pulsar-operator, which covers broker and proxy), then the image repo locations and tags, followed by operator details such as cluster roles/roles, service accounts, and operator resource limits and requests.
I keep the defaults in values.yaml. I will come back and modify them for a more restrictive environment, e.g., namespaced role bindings vs. cluster role bindings for the CRDs.
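By the way, if you would rather not untar the chart at all, helm can print the same default values straight from the repo:

>helm show values streamnative/pulsar-operator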

After reviewing the values and relaxing the chart's kubeVersion, I "helm install" the operators into the sn-operator namespace from my local chart copy.

>kubectl create namespace sn-operator
>helm install -n sn-operator pulsar-operator .
>kubectl get all -n sn-operator
NAME                                                                READY   STATUS    RESTARTS   AGE
pod/pulsar-operator-bookkeeper-controller-manager-9c596465-h8nbh   1/1     Running   0          16h
pod/pulsar-operator-pulsar-controller-manager-6f8699ffc-gr989      1/1     Running   0          16h
pod/pulsar-operator-zookeeper-controller-manager-7b54b76c79-rsm6t  1/1     Running   0          16h

NAME                                                            READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/pulsar-operator-bookkeeper-controller-manager   1/1     1            1           16h
deployment.apps/pulsar-operator-pulsar-controller-manager       1/1     1            1           16h
deployment.apps/pulsar-operator-zookeeper-controller-manager    1/1     1            1           16h

NAME                                                                      DESIRED   CURRENT   READY   AGE
replicaset.apps/pulsar-operator-bookkeeper-controller-manager-9c596465   1         1         1       16h
replicaset.apps/pulsar-operator-pulsar-controller-manager-6f8699ffc      1         1         1       16h
replicaset.apps/pulsar-operator-zookeeper-controller-manager-7b54b76c79  1         1         1       16h

If you check your k8s API resources, you will find all the new APIs registered.

>kubectl api-resources | grep pulsar
pulsarbrokers       pb,broker     pulsar.streamnative.io/v1alpha1   true   PulsarBroker
pulsarconnections   pconn         pulsar.streamnative.io/v1alpha1   true   PulsarConnection
pulsarnamespaces    pns           pulsar.streamnative.io/v1alpha1   true   PulsarNamespace
pulsarpermissions   ppermission   pulsar.streamnative.io/v1alpha1   true   PulsarPermission
pulsarproxies       pp,proxy      pulsar.streamnative.io/v1alpha1   true   PulsarProxy
pulsartenants       ptenant       pulsar.streamnative.io/v1alpha1   true   PulsarTenant
pulsartopics        ptopic        pulsar.streamnative.io/v1alpha1   true   PulsarTopic
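The zookeeper and bookkeeper CRDs are registered under their own API groups, so they do not match the grep above; the same command finds them (these are the ZooKeeperCluster and BookKeeperCluster kinds used below):

>kubectl api-resources | grep -E 'zookeeper|bookkeeper'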

As shown above, there are three controllers/operators, each handling a different "kind" of cluster, and the APIs also let you create tenants, namespaces, topics, and permissions. Like standard k8s controllers (deployment, for example), we tell the controller what we want by feeding it a cluster definition. In a regular k8s deployment, you put all kinds of components in a YAML file and use "apply" or "create" to create pods, services, configmaps, and others. Likewise, you could put the zookeeper, bookkeeper, and broker cluster definitions in a single YAML file and deploy them in one shot. To make the deployment easier to understand and troubleshoot, I break it down into three steps: zookeeper, bookkeeper, then broker. If you want to understand the dependencies among the three, check out Sijie Guo's TGIP YouTube channel. The startup sequence is zookeeper, bookkeeper, broker, then the rest.

The following is the ZooKeeperCluster definition that the zookeeper controller/operator takes to deploy a zookeeper cluster. It looks very similar to a Deployment: it defines the image location and version, replicas, resources, and persistent disk properties. There are other properties, such as JVM flags for tuning; I will discuss those later, as the focus here is getting a running cluster, and the operator should propagate extra config updates automatically.

---
apiVersion: zookeeper.streamnative.io/v1alpha1
kind: ZooKeeperCluster
metadata:
  name: my
  namespace: sn-platform
spec:
  image: streamnative/pulsar:2.9.2.15
  replicas: 3
  pod:
    resources:
      requests:
        cpu: "50m"
        memory: "256Mi"
  persistence:
    reclaimPolicy: Retain
    data:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: "10Gi"
    dataLog:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: "2Gi"

Let’s apply this file and see what happens.

>kubectl apply -f zk-cluster.yaml
zookeepercluster.zookeeper.streamnative.io/my created
>kubectl get pod -n sn-platform -w
NAME      READY   STATUS    RESTARTS   AGE
my-zk-0   1/1     Running   0          25s
my-zk-1   1/1     Running   0          25s
my-zk-2   1/1     Running   0          25s
>kubectl get svc -n sn-platform
NAME             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-zk            ClusterIP   10.104.64.179   <none>        2181/TCP,8000/TCP,9990/TCP                     42s
my-zk-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP,8000/TCP,9990/TCP   42s
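You can also query the custom resource itself; assuming the conventional plural resource name for the CRD, something like this shows the cluster object the operator is reconciling:

>kubectl get zookeeperclusters.zookeeper.streamnative.io -n sn-platform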

Let's check the zookeeper controller log to see what's going on in there.

>kubectl logs -n sn-operator pulsar-operator-zookeeper-controller-manager-7b54b76c79-rsm6t
{"severity":"info","timestamp":"2022-05-11T02:47:07Z","logger":"controllers.ZooKeeperCluster","message":"Reconciling ZooKeeperCluster","Request.Namespace":"sn-platform","Request.Name":"my"}
{"severity":"info","timestamp":"2022-05-11T02:47:07Z","logger":"controllers.ZooKeeperCluster","message":"Updating an existing ZooKeeper StatefulSet","StatefulSet.Namespace":"sn-platform","StatefulSet.Name":"my-zk"}
{"severity":"debug","timestamp":"2022-05-11T02:47:07Z","logger":"controller","message":"Successfully Reconciled","reconcilerGroup":"zookeeper.streamnative.io","reconcilerKind":"ZooKeeperCluster","controller":"zookeepercluster","name":"my","namespace":"sn-platform"}
{"severity":"info","timestamp":"2022-05-11T02:47:07Z","logger":"controllers.ZooKeeperCluster","message":"Reconciling ZooKeeperCluster","Request.Namespace":"sn-platform","Request.Name":"my"}
{"severity":"info","timestamp":"2022-05-11T02:47:07Z","logger":"controllers.ZooKeeperCluster","message":"Updating an existing ZooKeeper StatefulSet","StatefulSet.Namespace":"sn-platform","StatefulSet.Name":"my-zk"}
{"severity":"debug","timestamp":"2022-05-11T02:47:07Z","logger":"controller","message":"Successfully Reconciled","reconcilerGroup":"zookeeper.streamnative.io","reconcilerKind":"ZooKeeperCluster","controller":"zookeepercluster","name":"my","namespace":"sn-platform"}
{"severity":"info","timestamp":"2022-05-11T02:47:07Z","logger":"controllers.ZooKeeperCluster","message":"Reconciling ZooKeeperCluster","Request.Namespace":"sn-platform","Request.Name":"my"}
{"severity":"info","timestamp":"2022-05-11T02:47:07Z","logger":"controllers.ZooKeeperCluster","message":"Updating an existing ZooKeeper StatefulSet","StatefulSet.Namespace":"sn-platform","StatefulSet.Name":"my-zk"}

The zookeeper controller runs a reconcile loop that keeps checking the status of the "my" ZooKeeperCluster in the sn-platform namespace. This is the recommended operator design pattern, and the operator pod log is handy for troubleshooting the Pulsar deployment.

The next component is the bookkeeper cluster. Following the same pattern, I define the BookKeeperCluster kind as follows.

---
apiVersion: bookkeeper.streamnative.io/v1alpha1
kind: BookKeeperCluster
metadata:
  name: my
  namespace: sn-platform
spec:
  image: streamnative/pulsar:2.9.2.15
  replicas: 3
  pod:
    resources:
      requests:
        cpu: "200m"
        memory: "256Mi"
  storage:
    reclaimPolicy: Retain
    journal:
      numDirsPerVolume: 1
      numVolumes: 1
      volumeClaimTemplate:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: "8Gi"
    ledger:
      numDirsPerVolume: 1
      numVolumes: 1
      volumeClaimTemplate:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: "16Gi"
  zkServers: my-zk-headless:2181

Note that "zkServers" is required in this YAML file, and it should point to the headless service (which reaches all three zookeeper servers).
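A quick way to confirm that the headless service really fronts all three zookeeper pods is to list its endpoints; you should see three pod IPs on port 2181:

>kubectl get endpoints my-zk-headless -n sn-platform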

>kubectl apply -f bk-cluster.yaml
bookkeepercluster.bookkeeper.streamnative.io/my created
>kubectl get pod -n sn-platform
NAME                    READY   STATUS    RESTARTS   AGE
my-bk-0                 1/1     Running   0          90s
my-bk-1                 1/1     Running   0          90s
my-bk-2                 1/1     Running   0          90s
my-bk-auto-recovery-0   1/1     Running   0          48s
my-zk-0                 1/1     Running   0          4m51s
my-zk-1                 1/1     Running   0          4m51s
my-zk-2                 1/1     Running   0          4m51s
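If you want extra assurance that a bookie can actually write and read a ledger, the standard BookKeeper sanity check ships in the Pulsar image (a sketch; it assumes the default in-pod paths):

>kubectl exec -n sn-platform my-bk-0 -- bin/bookkeeper shell bookiesanity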

You can run the same kind of command to find out what the bookkeeper operator is doing behind the scenes.

>kubectl logs -n sn-operator pulsar-operator-bookkeeper-controller-manager-9c596465-h8nbh
W0512 11:45:15.235940       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
{"severity":"info","timestamp":"2022-05-12T11:47:27Z","logger":"controllers.BookKeeperCluster","message":"Reconciling BookKeeperCluster","Request.Namespace":"sn-platform","Request.Name":"my"}
{"severity":"info","timestamp":"2022-05-12T11:47:27Z","logger":"controllers.BookKeeperCluster","message":"Updating the status for the BookKeeperCluster","Namespace":"sn-platform","Name":"my","Status":{"observedGeneration":4,"replicas":4,"readyReplicas":4,"updatedReplicas":4,"labelSelector":"cloud.streamnative.io/app=pulsar,cloud.streamnative.io/cluster=my,cloud.streamnative.io/component=bookie","conditions":[{"type":"AutoRecovery","status":"True","reason":"Deploy","message":"Ready","lastTransitionTime":"2022-05-08T19:58:26Z"},{"type":"Bookie","status":"True","reason":"Ready","message":"Bookies are ready","lastTransitionTime":"2022-05-09T00:34:12Z"},{"type":"Initialization","status":"True","reason":"Initialization","message":"Initialization succeeded","lastTransitionTime":"2022-05-08T19:57:43Z"},{"type":"Ready","status":"True","reason":"Ready","lastTransitionTime":"2022-05-09T00:34:12Z"}]}}
{"severity":"debug","timestamp":"2022-05-12T11:47:27Z","logger":"controller","message":"Successfully Reconciled","reconcilerGroup":"bookkeeper.streamnative.io","reconcilerKind":"BookKeeperCluster","controller":"bookkeepercluster","name":"my","namespace":"sn-platform"}
W0512 11:49:18.419391       1 warnings.go:67] autoscaling/v2beta2 HorizontalPodAutoscaler is deprecated in v1.23+, unavailable in v1.26+; use autoscaling/v2 HorizontalPodAutoscaler
W0512 11:52:35.237812       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W0512 11:57:06.421507       1 warnings.go:67] autoscaling/v2beta2 HorizontalPodAutoscaler is deprecated in v1.23+, unavailable in v1.26+; use autoscaling/v2 HorizontalPodAutoscaler
W0512 11:58:43.240049       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W0512 12:04:06.423448       1 warnings.go:67] autoscaling/v2beta2 HorizontalPodAutoscaler is deprecated in v1.23+, unavailable in v1.26+; use autoscaling/v2 HorizontalPodAutoscaler
W0512 12:05:03.242609       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W0512 12:09:08.425304       1 warnings.go:67] autoscaling/v2beta2 HorizontalPodAutoscaler is deprecated in v1.23+, unavailable in v1.26+; use autoscaling/v2 HorizontalPodAutoscaler
W0512 12:14:15.245078       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W0512 12:18:47.427470       1 warnings.go:67] autoscaling/v2beta2 HorizontalPodAutoscaler is deprecated in v1.23+, unavailable in v1.26+; use autoscaling/v2 HorizontalPodAutoscaler
W0512 12:19:30.247840       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W0512 12:25:04.249159       1 warnings.go:67] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W0512 12:25:13.430394       1 warnings.go:67] autoscaling/v2beta2 HorizontalPodAutoscaler is deprecated in v1.23+, unavailable in v1.26+; use autoscaling/v2 HorizontalPodAutoscaler

There are a bunch of warnings about deprecated APIs. This is expected because I am running a very new release (v1.24.0), and the operator still calls some API versions that recent k8s releases have deprecated.

The next component is the broker cluster. The following is the PulsarBroker YAML file, from which you can see that the broker also depends on zkServers. Note that I add a "config.custom" section to the spec to enable the WebSocket service.

---
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarBroker
metadata:
  name: my
  namespace: sn-platform
spec:
  image: streamnative/pulsar:2.9.2.15
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 256Mi
    terminationGracePeriodSeconds: 30
  config:
    custom:
      webSocketServiceEnabled: "true"
  replicas: 2
  zkServers: my-zk-headless:2181

As with the zookeeper and bookkeeper clusters, creating a broker cluster is the same as creating standard k8s objects. I used 'kubectl get pod -n <namespace> -w' to watch the sequence of pod creation, which is helpful for understanding the Pulsar component dependencies. I skip the operator log here; you can run a similar command to trace the controller log.

>kubectl apply -f br-cluster.yaml
pulsarbroker.pulsar.streamnative.io/my created
>kubectl get pod -n sn-platform -w
NAME                            READY   STATUS            RESTARTS   AGE
my-bk-0                         1/1     Running           0          2m11s
my-bk-1                         1/1     Running           0          2m11s
my-bk-2                         1/1     Running           0          2m11s
my-bk-auto-recovery-0           1/1     Running           0          89s
my-broker-metadata-init-gghqc   0/1     Completed         0          6s
my-zk-0                         1/1     Running           0          5m32s
my-zk-1                         1/1     Running           0          5m32s
my-zk-2                         1/1     Running           0          5m32s
my-broker-metadata-init-gghqc   0/1     Completed         0          7s
my-broker-metadata-init-gghqc   0/1     Completed         0          7s
my-broker-0                     0/1     Pending           0          0s
my-broker-1                     0/1     Pending           0          0s
my-broker-0                     0/1     Pending           0          0s
my-broker-1                     0/1     Pending           0          0s
my-broker-1                     0/1     Init:0/1          0          0s
my-broker-0                     0/1     Init:0/1          0          0s
my-broker-metadata-init-gghqc   0/1     Terminating       0          8s
my-broker-metadata-init-gghqc   0/1     Terminating       0          8s
my-broker-0                     0/1     Init:0/1          0          0s
my-broker-1                     0/1     Init:0/1          0          0s
my-broker-1                     0/1     PodInitializing   0          1s
my-broker-0                     0/1     PodInitializing   0          1s
my-broker-1                     0/1     Running           0          2s
my-broker-0                     0/1     Running           0          2s
my-broker-0                     0/1     Running           0          10s
my-broker-1                     0/1     Running           0          10s
my-broker-0                     1/1     Running           0          40s
my-broker-1                     1/1     Running           0          40s
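To double-check that the config.custom entry reached the broker, grep for it inside a broker pod; depending on how the operator injects the setting, it may show up in the rendered config file or as an environment variable (both commands below are plain probes, not operator APIs):

>kubectl exec -n sn-platform my-broker-0 -- grep -i websocket conf/broker.conf
>kubectl exec -n sn-platform my-broker-0 -- env | grep -i websocket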

When all pods are running and READY, I check the services and find they are all ClusterIP, which assumes that all producer and consumer workloads run inside the k8s cluster. To test traffic from machines in my home lab that sit outside the k8s cluster, I want a LoadBalancer. So the final component in this article is the proxy.

---
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarProxy
metadata:
  name: my
  namespace: sn-platform
spec:
  brokerAddress: my-broker-headless
  dnsNames: []
  #webSocketServiceEnabled: true
  image: streamnative/pulsar:2.9.2.15
  config:
    tls:
      enabled: false
      issuerRef:
        name: ""
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 256Mi
  replicas: 1

The proxy is a bit tricky because TLS is enabled by default, which makes sense as the proxy is the external gateway into the Pulsar cluster. In this article, I turn off TLS on all components for simplicity; I will discuss enabling TLS with the operator in the next article.

>kubectl apply -f px-cluster.yaml
pulsarproxy.pulsar.streamnative.io/my created
>kubectl get pod -n sn-platform
NAME                    READY   STATUS    RESTARTS   AGE
my-bk-0                 1/1     Running   0          44m
my-bk-1                 1/1     Running   0          44m
my-bk-2                 1/1     Running   0          44m
my-bk-auto-recovery-0   1/1     Running   0          43m
my-broker-0             1/1     Running   0          42m
my-broker-1             1/1     Running   0          42m
my-proxy-0              1/1     Running   0          57s
my-zk-0                 1/1     Running   0          47m
my-zk-1                 1/1     Running   0          47m
my-zk-2                 1/1     Running   0          47m
>kubectl get svc -n sn-platform
NAME                           TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-bk                          ClusterIP      10.104.212.43   <none>        3181/TCP,8000/TCP                              44m
my-bk-auto-recovery-headless   ClusterIP      None            <none>        3181/TCP,8000/TCP                              44m
my-bk-headless                 ClusterIP      None            <none>        3181/TCP,8000/TCP                              44m
my-broker                      ClusterIP      10.99.107.224   <none>        6650/TCP,8080/TCP                              41m
my-broker-headless             ClusterIP      None            <none>        6650/TCP,8080/TCP                              41m
my-proxy-external              LoadBalancer   10.109.250.31   10.0.0.36     6650:32751/TCP,8080:30322/TCP                  33s
my-proxy-headless              ClusterIP      None            <none>        6650/TCP,8080/TCP                              33s
my-zk                          ClusterIP      10.104.64.179   <none>        2181/TCP,8000/TCP,9990/TCP                     47m
my-zk-headless                 ClusterIP      None            <none>        2181/TCP,2888/TCP,3888/TCP,8000/TCP,9990/TCP   47m

The proxy automatically gets an external LoadBalancer IP (10.0.0.36) from my home lab.
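As a quick smoke test before part 2, you can drive traffic through that IP with the pulsar-client CLI bundled in the Pulsar image (or any local Pulsar 2.9 binary); a sketch, assuming the LoadBalancer IP above: start the consumer in one terminal, then produce from a second.

>bin/pulsar-client --url pulsar://10.0.0.36:6650 consume my-topic -s test-sub -n 1
>bin/pulsar-client --url pulsar://10.0.0.36:6650 produce my-topic -m 'hello-pulsar'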
In part 2, we will write simple Python test code to produce and consume messages, and see how to change component behaviors through the operators, e.g., JVM tuning and mTLS.
In part 3, we will install the function mesh operator and deploy simple functions.
In part 4, we will cover Pulsar observability.

Stay tuned.
