Running SparkPi SparkApplication in k8s homelab with Harbor Registry

Yuwei Sung
Dec 2, 2024

Running Spark applications requires infrastructure support, and most articles and books recommend using cloud providers to simplify the setup: Google Dataproc, Databricks Community Edition, Azure HDInsight, or AWS EMR, for example. If you just want to learn or practice Spark and Delta Lake, the free tiers from cloud providers are good enough. Once you get serious about machine learning, data pipelines, or workflows, though, the cloud costs add up. Many customers are starting to pull back their cloud footprint and move workloads to private data centers. In this series, I want to show how I built my data analytics learning environment in my home lab on k8s.
Recently, I started to learn Spark and Delta Lake. Spark is a distributed computing platform that consumes structured, unstructured, and streaming data and delivers new insights to different analytic endpoints. After getting an overall idea of what Spark can do on a local machine, I wondered how to streamline the data processing and CI/CD pipeline like a "real" data engineer/scientist in my home lab. It turns out Spark supports Kubernetes, so I should be able to run Spark jobs in my home lab. Spark and Delta Lake support local file systems, Hadoop, and object stores (AWS S3, Google Cloud Storage, and Azure Blob Storage), and I already have a Ceph object store in my lab. I also found the Spark operator in the Kubeflow project.

In this first article, I walk through the Spark operator installation, the steps to push Spark application images to the Harbor registry, and the manifest of a Spark application that accesses Ceph object storage.

The first step is installing the Spark operator. Refer to the Kubeflow GitHub repository for the installation details. There is also a Spark Pi example that you can apply directly once the operator is running.

# Add the Helm repository
helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm repo update

# Install the operator into the spark-operator namespace and wait for deployments to be ready
helm install spark-operator spark-operator/spark-operator \
--namespace spark-operator --create-namespace --wait
kubectl apply -f https://raw.githubusercontent.com/kubeflow/spark-operator/refs/heads/master/examples/spark-pi.yaml

You should see the spark-pi-driver pod in action in your default namespace.

You can check the log and find the output (Pi is roughly 3.1416340862832683).
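
A quick way to do that from the command line, assuming the driver pod keeps its default name spark-pi-driver:

# watch the driver and executor pods come and go
kubectl get pods -n default -w
# grep the result out of the driver log
kubectl logs -n default spark-pi-driver | grep "Pi is roughly"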

This shows that the Spark operator is monitoring the SparkApplication CR and deploying the Spark Pi job correctly. Let’s check the SparkApplication CR.
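
You can pull the applied CR (and its status) straight from the cluster:

kubectl get sparkapplication spark-pi -n default -o yaml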

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.3
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.3
  driver:
    labels:
      version: 3.5.3
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.3
    instances: 1
    cores: 1
    memory: 512m

This Pi job does not read from any data source, and the result only shows up in the driver log. Another consideration is that you need to customize the image for your own data processing use cases. For example, you may need to install additional Python modules or Spark connectors to read/write data from external analytic endpoints. This means my data engineers will need to write the Spark application logic locally and push a container image to a private/public registry. Next, I show how I pull images from Docker Hub and push them to the Harbor registry in my home lab. Refer to my article about the Harbor registry installation.

If you are using a Mac, be aware of the platform you pull/push. My homelab nodes are Linux and only run amd64. If you accidentally push arm64 images, you will see errors when the job runs.

#from my mac, docker login to my harbor registry
docker login https://harbor.home.lab
Authenticating with existing credentials...
Login Succeeded
#pull spark image from dockerhub with amd64 arch
docker pull --platform linux/amd64 spark:3.5.3
docker tag spark:3.5.3 harbor.home.lab/homelab/spark:3.5.3
docker push harbor.home.lab/homelab/spark:3.5.3
...
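
To double-check which architecture a local tag actually carries before pushing, docker image inspect reports it; a quick sanity check with the tags above:

docker image inspect harbor.home.lab/homelab/spark:3.5.3 --format '{{.Os}}/{{.Architecture}}'
# expect: linux/amd64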

Double-check that the image was pushed to my private registry.

Let's redeploy the Spark Pi application using the new image. Note that I changed imagePullPolicy to Always so that an updated image is always pulled. The purpose here is to make sure the Spark operator can work with my private Harbor registry.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: harbor.home.lab/homelab/spark:3.5.3
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.3
  driver:
    labels:
      version: 3.5.3
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.3
    instances: 1
    cores: 1
    memory: 512m

The job completes without a problem.
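
The SparkApplication status is also visible from kubectl (the exact columns printed depend on the operator version):

kubectl get sparkapplications -n default
kubectl describe sparkapplication spark-pi -n default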

Next, let's set up the Ceph object store and verify that a Spark application can read from and write to an object store bucket. Refer to the Rook/Ceph documentation for installation.

These are the CRs for my CephObjectStore, StorageClass, and ObjectBucketClaim for the Spark application.

apiVersion: ceph.rook.io/v1
kind: CephObjectStore
metadata:
  name: homelab
  namespace: rook-ceph
spec:
  metadataPool:
    failureDomain: host
    replicated:
      size: 3
  dataPool:
    failureDomain: host
    # For production it is recommended to use more chunks, such as 4+2 or 8+4
    erasureCoded:
      dataChunks: 2
      codingChunks: 1
  preservePoolsOnDelete: false
  gateway:
    # annotations:
    #   metallb.universe.tf/loadBalancerIPs: 10.10.0.39
    # sslCertificateRef: ceph-homelab-tls
    # caBundleRef: homelab
    port: 80
    # securePort: 443
    instances: 1
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: rook-ceph-delete-bucket
provisioner: rook-ceph.ceph.rook.io/bucket # driver:namespace:cluster
# set the reclaim policy to delete the bucket and all objects
# when its OBC is deleted.
reclaimPolicy: Delete
parameters:
  objectStoreName: homelab
  objectStoreNamespace: rook-ceph # namespace:cluster
  # To accommodate brownfield cases reference the existing bucket name here instead
  # of in the ObjectBucketClaim (OBC). In this case the provisioner will grant
  # access to the bucket by creating a new user, attaching it to the bucket, and
  # providing the credentials via a Secret in the namespace of the requesting OBC.
  #bucketName:
---
apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: spark-object-store
spec:
  # To create a new bucket specify either `bucketName` or
  # `generateBucketName` here. Both cannot be used. To access
  # an existing bucket the bucket name needs to be defined in
  # the StorageClass referenced here, and both `bucketName` and
  # `generateBucketName` must be omitted in the OBC.
  #bucketName:
  generateBucketName: ceph-bkt
  storageClassName: rook-ceph-delete-bucket
  additionalConfig:
    # To set for quota for OBC
    #maxObjects: "1000"
    #maxSize: "2G"

Once the object bucket is ready, we can build a container image that sends data to the bucket. A Spark job needs the following object store information: the S3 host, bucket name, port, access key ID, and secret access key. The SparkApplication CR should be able to mount the Secret and ConfigMap created by the ObjectBucketClaim to access the object store.
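
Rook's bucket provisioner publishes these values as a ConfigMap and a Secret in the claim's namespace. A quick way to peek at them with kubectl (I assume here they carry the OBC's name; adjust to your claim):

# bucket coordinates: BUCKET_HOST, BUCKET_NAME, BUCKET_PORT
kubectl get configmap spark-object-store -o yaml
# credentials: AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (base64-encoded)
kubectl get secret spark-object-store -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 -d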

Let's test bucket access with the Rook/Ceph toolbox. Deploy the toolbox deployment and shell into the pod. Note that the toolbox deployment does not include s5cmd.
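
A minimal way to get the toolbox up and shell in, assuming the upstream example manifest (the path can differ between Rook releases):

kubectl apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/toolbox.yaml
kubectl -n rook-ceph exec -it deploy/rook-ceph-tools -- bash
# if your toolbox image does not ship s5cmd, install it (or copy in a static binary) before the test below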

export AWS_HOST=rook-ceph-rgw-homelab.rook-ceph.svc
export PORT=80
export BUCKET_NAME=ceph-bkt-7c5f4833-e3ff-4830-84fb-21a6470bc508
export AWS_ACCESS_KEY_ID=RGLCL15NVEOTKFXRRUI5
export AWS_SECRET_ACCESS_KEY=i9q5BxZJME984bH5wq8s8aZPv9uDHEnnel1Z1I4D
echo "Hello Rook" > /tmp/rookObj
# cp /tmp/rookObj to s3 bucket
s5cmd --endpoint-url http://$AWS_HOST:$PORT cp /tmp/rookObj s3://$BUCKET_NAME
# you should get the following response
cp /tmp/rookObj s3://ceph-bkt-7c5f4833-e3ff-4830-84fb-21a6470bc508/rookObj
# download the same file from s3 bucket
s5cmd --endpoint-url http://$AWS_HOST:$PORT cp s3://$BUCKET_NAME/rookObj /tmp/rookObj-download
# you should get the following response
cp s3://ceph-bkt-7c5f4833-e3ff-4830-84fb-21a6470bc508/rookObj /tmp/rookObj-download
# if you cat /tmp/rookObj-download
cat /tmp/rookObj-download
Hello Rook

It's time to build a pySpark container image. Ceph object store speaks the S3 protocol and Spark accesses it through the s3a connector, but the default Spark image does not include the required jars, so I need to add two of them to the image. Also, because of the self-signed/private CA I use in my home lab, I need to bake my root.crt into the image so that pySpark and hadoop-aws pick it up from the JVM trust store. Here is my Dockerfile.

FROM spark:3.5.3

WORKDIR /opt/spark
USER root

# Refresh package lists before upgrading
RUN apt update && apt upgrade -y
# Add the Hadoop S3A connector and the matching AWS SDK bundle so Spark can use s3a://
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -P $SPARK_HOME/jars
RUN wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -P $SPARK_HOME/jars
# Import the homelab root CA into the JVM trust store, then remove the copied certificate
COPY root.crt /opt/spark
RUN keytool -import -trustcacerts -file /opt/spark/root.crt -alias homelab -keystore /opt/java/openjdk/lib/security/cacerts -noprompt -storepass changeit
RUN rm /opt/spark/root.crt

USER spark
ENTRYPOINT [ "/opt/entrypoint.sh" ]

These are the commands I use to build and push the new image.

docker buildx build --platform linux/amd64 -t spark:3.5.3 .
docker tag spark:3.5.3 harbor.home.lab/homelab/spark:3.5.3
docker push harbor.home.lab/homelab/spark:3.5.3

Next, I create a deployment to test this new image with pySpark. In this deployment, I mount the ceph-bucket Secret and ConfigMap (containing the S3 endpoint and credentials) as environment variables. Note that I override the command and args of the container so that it idles, giving me a chance to shell into the container and run pySpark.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sptest
  labels:
    app: spark
spec:
  replicas: 1
  selector:
    matchLabels:
      app: spark
  template:
    metadata:
      labels:
        app: spark
    spec:
      containers:
      - name: spark
        image: harbor.home.lab/homelab/spark:3.5.3
        imagePullPolicy: Always
        command: ["/bin/bash"]
        args: ["-c", "while true; do sleep 100; done"]
        envFrom:
        - configMapRef:
            name: ceph-bucket
        - secretRef:
            name: ceph-bucket

After the deployment is running, shell into the pod, start pyspark, and run the following commands.
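
For example, assuming the deployment name sptest from the manifest above:

kubectl exec -it deploy/sptest -- bash
# inside the container, start the interactive pyspark shell
/opt/spark/bin/pyspark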

# In the pyspark shell, `spark` (SparkSession) and `sc` (SparkContext) already exist.
import os
access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
s3_host = os.getenv('BUCKET_HOST')
s3_port = os.getenv('BUCKET_PORT')
bucket = os.getenv('BUCKET_NAME')
s3_endpoint = f"https://{s3_host}:{s3_port}"
s3_url = f"s3a://{bucket}/test"

sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set('com.amazonaws.sdk.disableCertChecking', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', s3_endpoint)
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_key)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secret_key)
sc._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.strictverify', 'false')
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')

data = spark.range(0,100)
data.write.format('parquet').mode('overwrite').save(s3_url)

If everything is fine, you should see output like the following (warnings only, no errors).

24/12/09 22:23:22 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/12/09 22:23:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/09 22:23:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
...
24/12/09 22:23:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/09 22:23:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
>>>

The output above confirms that importing root.crt resolves the private CA issue when using the hadoop-aws library to access the Ceph object store over TLS.
For the final step, I add the same Python script to the Dockerfile and rebuild the image for the SparkApplication CR. The Python script looks like this.

import os
from pyspark.sql import SparkSession

# In cluster mode, the operator's spark-submit supplies the master settings;
# just get or create the session and grab its SparkContext.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
s3_host = os.getenv('BUCKET_HOST')
s3_port = os.getenv('BUCKET_PORT')
bucket = os.getenv('BUCKET_NAME')
s3_endpoint = f"https://{s3_host}:{s3_port}"
s3_url = f"s3a://{bucket}/test"

sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set('com.amazonaws.sdk.disableCertChecking', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', s3_endpoint)
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_key)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secret_key)
sc._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.strictverify', 'false')
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')

data = spark.range(0,100)
data.write.format('parquet').mode('overwrite').save(s3_url)

Add the Python file to the work-dir. The new Dockerfile looks like this.

FROM spark:3.5.3

WORKDIR /opt/spark
USER root

# Refresh package lists before upgrading
RUN apt update && apt upgrade -y
# Add the Hadoop S3A connector and the matching AWS SDK bundle so Spark can use s3a://
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -P $SPARK_HOME/jars
RUN wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -P $SPARK_HOME/jars
# Import the homelab root CA into the JVM trust store, then remove the copied certificate
COPY root.crt /opt/spark
RUN keytool -import -trustcacerts -file /opt/spark/root.crt -alias homelab -keystore /opt/java/openjdk/lib/security/cacerts -noprompt -storepass changeit
RUN rm /opt/spark/root.crt
# Ship the pySpark job script with the image
COPY run.py /opt/spark/work-dir

USER spark
ENTRYPOINT [ "/opt/entrypoint.sh" ]

After the new image is pushed to the Harbor registry, we can apply the SparkApplication CR and check the output in the Ceph object store.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-ceph
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mainApplicationFile: local:///opt/spark/work-dir/run.py
  mode: cluster
  image: harbor.home.lab/homelab/spark-py:3.5.3
  deps:
  sparkConf:
  hadoopConf:
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark-operator-spark
    envFrom:
    - configMapRef:
        name: ceph-bucket
    - secretRef:
        name: ceph-bucket
    javaOptions:
    volumeMounts:
    - name: sparky-data
      mountPath: /spark-data
  executor:
    cores: 1
    instances: 2
    memory: "512m"
    envFrom:
    - configMapRef:
        name: ceph-bucket
    - secretRef:
        name: ceph-bucket
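
To submit and verify, I save the CR to a file (assumed here to be spark-ceph.yaml), apply it, and list the objects the job wrote using the same s5cmd setup from the toolbox test:

kubectl apply -f spark-ceph.yaml
kubectl logs -f spark-ceph-driver
# from the toolbox shell, with AWS_HOST/PORT/BUCKET_NAME and the credentials exported as before
s5cmd --endpoint-url http://$AWS_HOST:$PORT ls "s3://$BUCKET_NAME/test/*"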

There are many options for running a data pipeline in a Kubernetes cluster. In this article, I showed how to run a basic Spark job against a Ceph object store. More questions pop up: what else do I need to automate, version, monitor, and collaborate on the data pipeline? Argo? Delta Lake? GitHub/Gitea? Keycloak for RBAC? Security scanning (CVEs)? In the next article, I will work on the data pipeline.
