Scale RabbitMQ applications on Vultr Kubernetes Engine with KEDA
by cobra_admin
Introduction
KEDA is a Kubernetes-based Event-Driven Autoscaler that scales any container workload in a cluster based on the number of events waiting to be processed. It builds on standard Kubernetes components such as the Horizontal Pod Autoscaler (HPA), letting you describe the applications you want to scale in an event-driven way. KEDA scalers are components that monitor external systems, generate metrics from them, and drive the scaling of Kubernetes workloads.
A KEDA RabbitMQ scaler can scale workloads based on the length of a RabbitMQ queue. This article demonstrates how to use the RabbitMQ scaler on a Vultr Kubernetes Engine (VKE) cluster to allow RabbitMQ consumer application Pods to scale up and down based on the number of unprocessed items in the queue.
Prerequisites
Before you begin:

- Deploy a One-Click Docker instance using the Vultr Marketplace Application to use as your management system.
- Deploy a Vultr Kubernetes Engine (VKE) cluster with at least 3 nodes.
- Access the Docker server using SSH as a non-root sudo user.
- Install Kubectl to access the VKE cluster.

  ```console
  $ sudo snap install kubectl --classic
  ```

- Set your cluster manifest as the KUBECONFIG environment variable to use with Kubectl.

  ```console
  $ export KUBECONFIG=<enter path to VKE kubeconfig file>
  ```

- Install the Helm CLI.

  ```console
  $ sudo snap install helm --classic
  ```

- Install Go.

  ```console
  $ sudo snap install go --classic
  ```

Note: Ensure that your Kubernetes cluster nodes have at least 2 vCPUs and 4096 MB of memory each, and verify that your VKE deployment region supports Vultr Block Storage.
Install the KEDA Operator
- Create a new project files directory.

  ```console
  $ mkdir vke-keda-rabbitmq
  ```

- Switch to the directory.

  ```console
  $ cd vke-keda-rabbitmq
  ```

- Deploy the KEDA operator to your cluster using Kubectl.

  ```console
  $ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.12.1/keda-2.12.1-core.yaml
  ```

  The above command installs operator version 2.12.1 to your cluster. To use the latest version, visit the KEDA releases page.

  Output:

  ```
  customresourcedefinition.apiextensions.k8s.io/clustertriggerauthentications.keda.sh serverside-applied
  customresourcedefinition.apiextensions.k8s.io/scaledjobs.keda.sh serverside-applied
  customresourcedefinition.apiextensions.k8s.io/scaledobjects.keda.sh serverside-applied
  customresourcedefinition.apiextensions.k8s.io/triggerauthentications.keda.sh serverside-applied
  ...
  ```

- Wait for at least 1 minute, then view all deployments in the new keda namespace to verify that the operator is installed in your cluster.

  ```console
  $ kubectl get deployment -n keda
  ```

  Output:

  ```
  NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
  keda-metrics-apiserver   1/1     1            1           30s
  keda-operator            1/1     1            1           40s
  ```
Install RabbitMQ
- Add the Bitnami chart repository to your local Helm repositories.

  ```console
  $ helm repo add bitnami https://charts.bitnami.com/bitnami
  ```

- Update your Helm repositories.

  ```console
  $ helm repo update
  ```

- Deploy RabbitMQ to your cluster.

  ```console
  $ helm install rabbitmq --set auth.username=user --set auth.password=s3cr3t --set persistence.storageClass=vultr-block-storage --set persistence.size=10Gi bitnami/rabbitmq
  ```

  RabbitMQ installs to your cluster as a Kubernetes StatefulSet. The above command uses vultr-block-storage as the storage class for RabbitMQ data persistence. When successful, navigate to your VKE cluster's Linked Resources tab and verify the attached block storage volumes.

- Wait for at least 3 minutes for the installation to complete, then verify that all RabbitMQ Pods are available and running in your cluster.

  ```console
  $ kubectl get pods -l=app.kubernetes.io/instance=rabbitmq
  ```

  Output:

  ```
  NAME         READY   STATUS    RESTARTS   AGE
  rabbitmq-0   1/1     Running   0          2m28s
  ```
Create the RabbitMQ Producer Application
- Initialize a new Go module to create a go.mod file.

  ```console
  $ go mod init rabbitmq-app
  ```

- Using a text editor such as Nano, create a new file producer.go.

  ```console
  $ nano producer.go
  ```

- Add the following contents to the file.

  ```go
  package main

  import (
          "context"
          "fmt"
          "log"
          "os"
          "strconv"
          "time"

          amqp "github.com/rabbitmq/amqp091-go"
  )

  var conn *amqp.Connection
  var queueName string

  func init() {
          url := os.Getenv("RABBITMQ_URI")
          if url == "" {
                  log.Fatal("missing environment variable RABBITMQ_URI")
          }

          queueName = os.Getenv("RABBITMQ_QUEUE_NAME")
          if queueName == "" {
                  log.Fatal("missing environment variable RABBITMQ_QUEUE_NAME")
          }

          var err error
          conn, err = amqp.Dial(url)
          if err != nil {
                  log.Fatal("dial failed ", err)
          }
  }

  func main() {
          defer conn.Close()

          ch, err := conn.Channel()
          if err != nil {
                  log.Fatalf("Failed to create channel: %v", err)
          }
          defer ch.Close()

          q, err := ch.QueueDeclare(
                  queueName, // name
                  true,      // durable
                  false,     // delete when unused
                  false,     // exclusive
                  false,     // no-wait
                  nil,       // arguments
          )
          if err != nil {
                  log.Fatalf("Failed to declare queue: %v", err)
          }

          for i := 0; i <= 1000000; i++ {
                  msg := "message-" + strconv.Itoa(i)

                  err = ch.PublishWithContext(
                          context.Background(),
                          "",     // exchange
                          q.Name, // routing key
                          false,  // mandatory
                          false,  // immediate
                          amqp.Publishing{
                                  ContentType: "text/plain",
                                  Body:        []byte(msg),
                          },
                  )
                  if err != nil {
                          log.Fatalf("failed to publish message: %v", err)
                  }

                  fmt.Println("message", msg, "sent to queue", q.Name)
                  time.Sleep(1 * time.Second)
          }
  }
  ```

  Save and close the file.

  The above code reads the RabbitMQ connection string and queue name from environment variables to establish a connection to RabbitMQ. It then declares a durable queue and uses a for loop to send messages to the queue with a wait duration of 1 second between each message.
Containerize the Producer Application
- Create a new file Dockerfile.producer.

  ```console
  $ nano Dockerfile.producer
  ```

- Add the following contents to the file.

  ```dockerfile
  FROM golang AS build

  WORKDIR /app

  COPY go.mod ./
  COPY go.sum ./
  RUN go mod download

  COPY producer.go ./
  RUN CGO_ENABLED=1 go build -o /rabbitmq-go-app

  FROM cgr.dev/chainguard/glibc-dynamic

  WORKDIR /
  COPY --from=build /rabbitmq-go-app /rabbitmq-go-app

  EXPOSE 8080

  USER nonroot:nonroot

  ENTRYPOINT ["/rabbitmq-go-app"]
  ```

  Save and close the file.

  The above Dockerfile uses golang as the base image for the first stage, which builds the producer binary. The second stage uses cgr.dev/chainguard/glibc-dynamic as its base image and copies in the binary produced by the first stage.
Deploy the RabbitMQ Producer Application
- Fetch the application Go module dependencies to create the go.sum file.

  ```console
  $ go mod tidy
  ```

- Log in to your Docker Hub account.

  ```console
  $ docker login
  ```

  When prompted, enter your Docker Hub username and password to use Docker Hub as your default registry. Alternatively, you can use the Vultr Container Registry to build and push images for deployment in your cluster.

- Build the application image. Replace exampleuser with your actual Docker Hub username.

  ```console
  $ docker build -t exampleuser/rabbitmq-producer-app -f Dockerfile.producer .
  ```

- Push the image to your Docker Hub profile.

  ```console
  $ docker push exampleuser/rabbitmq-producer-app
  ```

- Create a new file producer.yaml.

  ```console
  $ nano producer.yaml
  ```

- Add the following contents to the file. Replace exampleuser with your Docker Hub username.

  ```yaml
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: rabbitmq-producer
  spec:
    replicas: 1
    selector:
      matchLabels:
        app: rabbitmq-producer
    template:
      metadata:
        labels:
          app: rabbitmq-producer
      spec:
        containers:
        - name: rabbitmq-producer
          image: exampleuser/rabbitmq-producer-app
          imagePullPolicy: Always
          env:
          - name: RABBITMQ_QUEUE_NAME
            value: demo-queue
          - name: RABBITMQ_URI
            value: amqp://user:s3cr3t@rabbitmq.default.svc.cluster.local:5672
  ```

  Save and close the file.

- Deploy the producer application to your cluster.

  ```console
  $ kubectl apply -f producer.yaml
  ```

  Output:

  ```
  deployment.apps/rabbitmq-producer created
  ```

- Wait for at least 20 seconds, then view the rabbitmq-producer pods to verify that the new application Pod is Running.

  ```console
  $ kubectl get pods -l=app=rabbitmq-producer
  ```

  Your output should look like the one below:

  ```
  NAME                                 READY   STATUS    RESTARTS   AGE
  rabbitmq-producer-847f6866c5-tpxdx   1/1     Running   0          10s
  ```

- View the producer application logs to verify the queue message progress.

  ```console
  $ kubectl logs -f $(kubectl get pod -l=app=rabbitmq-producer -o jsonpath='{.items[0].metadata.name}')
  ```

  Output:

  ```
  message message-0 sent to queue demo-queue
  message message-1 sent to queue demo-queue
  message message-2 sent to queue demo-queue
  message message-3 sent to queue demo-queue
  ....
  ```

  The above logs show that messages are successfully sent to the demo-queue RabbitMQ queue.
Create the RabbitMQ Consumer Application
- Create a new file consumer.go.

  ```console
  $ nano consumer.go
  ```

- Add the following contents to the file.

  ```go
  package main

  import (
          "fmt"
          "log"
          "os"
          "time"

          "github.com/google/uuid"
          amqp "github.com/rabbitmq/amqp091-go"
  )

  var conn *amqp.Connection
  var queueName string
  var instanceName string

  func init() {
          url := os.Getenv("RABBITMQ_URI")
          if url == "" {
                  log.Fatal("missing environment variable RABBITMQ_URI")
          }

          queueName = os.Getenv("RABBITMQ_QUEUE_NAME")
          if queueName == "" {
                  log.Fatal("missing environment variable RABBITMQ_QUEUE_NAME")
          }

          var err error
          conn, err = amqp.Dial(url)
          if err != nil {
                  log.Fatal(err)
          }

          instanceName = os.Getenv("INSTANCE_NAME")
          if instanceName == "" {
                  instanceName = "rabbitmq-consumer-" + uuid.NewString()
          }
  }

  func main() {
          defer conn.Close()

          ch, err := conn.Channel()
          if err != nil {
                  log.Fatalf("Failed to create channel: %v", err)
          }
          defer ch.Close()

          q, err := ch.QueueDeclare(
                  queueName, // name
                  true,      // durable
                  false,     // delete when unused
                  false,     // exclusive
                  false,     // no-wait
                  nil,       // arguments
          )
          if err != nil {
                  log.Fatalf("Failed to declare queue: %v", err)
          }

          // Prefetch one message at a time so the backlog is distributed
          // fairly across consumer replicas.
          err = ch.Qos(
                  1,     // prefetch count
                  0,     // prefetch size
                  false, // global
          )
          if err != nil {
                  log.Fatalf("failed to set QoS: %v", err)
          }

          msgs, err := ch.Consume(
                  q.Name, // queue
                  "",     // consumer
                  false,  // auto-ack
                  false,  // exclusive
                  false,  // no-local
                  false,  // no-wait
                  nil,    // args
          )
          if err != nil {
                  log.Fatalf("failed to consume messages from queue: %v", err)
          }

          fmt.Println("consumer instance", instanceName, "waiting for messages.....")

          for msg := range msgs {
                  fmt.Println("Instance", instanceName, "received message", string(msg.Body), "from queue", q.Name)
                  msg.Ack(false)
                  time.Sleep(3 * time.Second)
          }
  }
  ```

  Save and close the file.

  The above code uses the RabbitMQ connection string and queue name environment variables to establish a connection to RabbitMQ. It reads the Pod instance name from the INSTANCE_NAME environment variable, falling back to a random UUID-based name. It then declares the queue, limits the channel to one unacknowledged message at a time, and uses a for loop to receive messages. Each message is logged to the console and acknowledged, followed by a 3-second pause that simulates processing work.

- To containerize the application for deployment, create a new Dockerfile named Dockerfile.consumer.

  ```console
  $ nano Dockerfile.consumer
  ```

- Add the following contents to the file.

  ```dockerfile
  FROM golang AS build

  WORKDIR /app

  COPY go.mod ./
  COPY go.sum ./
  RUN go mod download

  COPY consumer.go ./
  RUN CGO_ENABLED=1 go build -o /rabbitmq-go-app

  FROM cgr.dev/chainguard/glibc-dynamic

  WORKDIR /
  COPY --from=build /rabbitmq-go-app /rabbitmq-go-app

  EXPOSE 8080

  USER nonroot:nonroot

  ENTRYPOINT ["/rabbitmq-go-app"]
  ```

  Save and close the file.

  The above Dockerfile uses a two-stage build: golang serves as the base image for the first stage, which builds the consumer binary, and cgr.dev/chainguard/glibc-dynamic is the base image for the second stage, which copies in the binary produced by the first stage.
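Because the consumer sets a prefetch count of 1, RabbitMQ hands each replica only one message at a time, so a backlog is split roughly evenly across replicas and drain time falls almost linearly as replicas are added. A back-of-the-envelope sketch of that effect, assuming the 3-second processing pause used above (illustrative arithmetic only, not a RabbitMQ simulation):

```go
package main

import (
	"fmt"
	"math"
)

// drainSeconds estimates how long `replicas` consumers take to work
// through `backlog` messages when each message takes `perMsgSeconds`
// and messages are dispatched one at a time (prefetch = 1).
func drainSeconds(backlog, replicas int, perMsgSeconds float64) float64 {
	perConsumer := math.Ceil(float64(backlog) / float64(replicas))
	return perConsumer * perMsgSeconds
}

func main() {
	// 100 queued messages at 3 seconds each: one consumer needs
	// about 300 seconds, five consumers about 60 seconds.
	fmt.Println(drainSeconds(100, 1, 3)) // 300
	fmt.Println(drainSeconds(100, 5, 3)) // 60
}
```

This is the behavior KEDA exploits in the next sections: adding replicas directly shortens the time needed to empty the queue.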
Deploy the RabbitMQ Consumer Application
- Fetch the application Go module dependencies.

  ```console
  $ go mod tidy
  ```

- Build the application image. Replace exampleuser with your actual Docker Hub username.

  ```console
  $ docker build -t exampleuser/rabbitmq-consumer-app -f Dockerfile.consumer .
  ```

- Push the application image to your Docker Hub profile.

  ```console
  $ docker push exampleuser/rabbitmq-consumer-app
  ```

- Create a new file consumer.yaml.

  ```console
  $ nano consumer.yaml
  ```

- Add the following contents to the file. Replace exampleuser with your Docker Hub username.

  ```yaml
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: rabbitmq-consumer
  spec:
    replicas: 1
    selector:
      matchLabels:
        app: rabbitmq-consumer
    template:
      metadata:
        labels:
          app: rabbitmq-consumer
      spec:
        containers:
        - name: rabbitmq-consumer
          image: exampleuser/rabbitmq-consumer-app
          imagePullPolicy: Always
          env:
          - name: RABBITMQ_QUEUE_NAME
            value: demo-queue
          - name: RABBITMQ_URI
            value: amqp://user:s3cr3t@rabbitmq.default.svc.cluster.local:5672
          - name: INSTANCE_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
  ```

  Save and close the file.

- Deploy the consumer application to your cluster.

  ```console
  $ kubectl apply -f consumer.yaml
  ```

  Output:

  ```
  deployment.apps/rabbitmq-consumer created
  ```

- Wait for at least 20 seconds, then verify the status of the running rabbitmq-consumer application pods.

  ```console
  $ kubectl get pods -l=app=rabbitmq-consumer
  ```

  Your output should look like the one below:

  ```
  NAME                                 READY   STATUS    RESTARTS   AGE
  rabbitmq-consumer-5b8884c78b-2sjds   1/1     Running   0          10s
  ```
Create the KEDA Scaler
- Create a new YAML manifest scaler.yaml.

  ```console
  $ nano scaler.yaml
  ```

- Add the following contents to the file.

  ```yaml
  apiVersion: keda.sh/v1alpha1
  kind: ScaledObject
  metadata:
    name: rabbitmq-scaledobject
    namespace: default
  spec:
    scaleTargetRef:
      name: rabbitmq-consumer
    minReplicaCount: 1
    maxReplicaCount: 5
    triggers:
    - type: rabbitmq
      metadata:
        protocol: amqp
        queueName: demo-queue
        mode: QueueLength
        value: "10"
        hostFromEnv: RABBITMQ_URI
  ```

  Save and close the file.

  The above ScaledObject targets the rabbitmq-consumer deployment and uses the RabbitMQ trigger in QueueLength mode: KEDA scales the deployment so that each replica handles roughly 10 unprocessed messages, keeping the replica count between 1 and 5. The hostFromEnv field tells the scaler to read the RabbitMQ connection string from the target deployment's RABBITMQ_URI environment variable.

- Install the scaler to your cluster.

  ```console
  $ kubectl apply -f scaler.yaml
  ```

  Output:

  ```
  scaledobject.keda.sh/rabbitmq-scaledobject created
  ```
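As an alternative to hostFromEnv, KEDA can read the connection string from a Kubernetes Secret through a TriggerAuthentication resource, which keeps credentials out of the Deployment manifest. The sketch below is not used in this guide and the resource names are illustrative:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-conn            # illustrative name
type: Opaque
stringData:
  host: amqp://user:s3cr3t@rabbitmq.default.svc.cluster.local:5672
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: rabbitmq-trigger-auth    # illustrative name
spec:
  secretTargetRef:
  - parameter: host              # supplies the trigger's host parameter
    name: rabbitmq-conn
    key: host
```

With this in place, the trigger in the ScaledObject would drop hostFromEnv and instead reference the resource with an authenticationRef block naming rabbitmq-trigger-auth.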
Verify the Application Autoscaling
- View the RabbitMQ consumer application deployment status and verify that the READY field shows an upscaled number of pods.

  ```console
  $ kubectl get deployment/rabbitmq-consumer
  ```

  Your output should look like the one below:

  ```
  NAME                READY   UP-TO-DATE   AVAILABLE   AGE
  rabbitmq-consumer   5/5     5            5           53s
  ```

- View the consumer application Pods and verify the number of available replicas.

  ```console
  $ kubectl get pods -l=app=rabbitmq-consumer
  ```

  Output:

  ```
  NAME                                 READY   STATUS    RESTARTS   AGE
  rabbitmq-consumer-5b8884c78b-7mhg7   1/1     Running   0          98s
  rabbitmq-consumer-5b8884c78b-ktjf9   1/1     Running   0          83s
  rabbitmq-consumer-5b8884c78b-vx7wv   1/1     Running   0          98s
  rabbitmq-consumer-5b8884c78b-xc2rh   1/1     Running   0          3m25s
  rabbitmq-consumer-5b8884c78b-xfqhr   1/1     Running   0          98s
  ```

  The KEDA scaler monitors the demo-queue RabbitMQ queue. When the unprocessed message count exceeds 10, the rabbitmq-consumer deployment automatically scales out. In the above output, exactly five Pods are available because the maxReplicaCount attribute in the ScaledObject configuration is set to 5. Once the consumer pods drain the queue and no unprocessed messages remain, KEDA scales the deployment back down to a single pod.

- View the application logs of a consumer Pod of your choice. For example: rabbitmq-consumer-5b8884c78b-7mhg7.

  ```console
  $ kubectl logs -f rabbitmq-consumer-5b8884c78b-7mhg7
  ```

  Output:

  ```
  Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-120 from queue demo-queue
  Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-125 from queue demo-queue
  Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-130 from queue demo-queue
  Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-135 from queue demo-queue
  Instance rabbitmq-consumer-5b8884c78b-7mhg7 received message message-140 from queue demo-queue
  ....
  ```

  As the above output shows, the consumer application runs five replicas and message processing is load-balanced among them: each log line records the pod name along with the message it processed. The logs of the other pods show a different set of messages, because each message from the producer is delivered to exactly one consumer. This is what enables horizontal scalability of the consumer application.
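Conceptually, the replica count above follows the HPA target-value calculation that KEDA's external metric feeds: the queue length divided by the trigger's value, rounded up, and clamped to minReplicaCount and maxReplicaCount. The sketch below illustrates that formula only; the real controller also applies stabilization windows and tolerance, and this is not KEDA's actual source code:

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas approximates the scaling target for a QueueLength
// trigger: ceil(queueLength / targetValue), clamped to the ScaledObject's
// minReplicaCount and maxReplicaCount.
func desiredReplicas(queueLength, targetValue, minReplicas, maxReplicas int) int {
	replicas := int(math.Ceil(float64(queueLength) / float64(targetValue)))
	if replicas < minReplicas {
		replicas = minReplicas
	}
	if replicas > maxReplicas {
		replicas = maxReplicas
	}
	return replicas
}

func main() {
	// With value: "10" and maxReplicaCount: 5, a backlog of 37 messages
	// yields ceil(37/10) = 4 replicas.
	fmt.Println(desiredReplicas(37, 10, 1, 5)) // 4
	// A backlog of 120 messages is capped at maxReplicaCount.
	fmt.Println(desiredReplicas(120, 10, 1, 5)) // 5
	// An empty queue falls back to minReplicaCount.
	fmt.Println(desiredReplicas(0, 10, 1, 5)) // 1
}
```

This explains why the deployment sat at exactly 5 replicas while the producer kept the backlog above 50 messages, and why it returns to 1 replica once the queue is empty.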
Conclusion
You have configured KEDA to scale a RabbitMQ consumer application deployed in your VKE cluster. You set up KEDA and RabbitMQ, then used client applications (a producer and a consumer) to verify the autoscaling behavior. You can apply the same KEDA-based autoscaling to other applications in your cluster.