
Sample tweet processor (Azure Functions + Kafka)

The following sample includes an Azure Function (written in TypeScript) that is triggered by messages on a Kafka topic running in your Kubernetes cluster. An agent populates the topic with tweets. Each time the function triggers, it scores the tweet's sentiment and can optionally push the result to a real-time Power BI dashboard.
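To make the flow concrete, here is a minimal TypeScript sketch of the per-message work: decode a Kafka message value, score the tweet, and produce a row that could be sent to a dashboard. It is not the repo's code. The message shape (a JSON object with a text field), the tiny word list, and the names processKafkaValue and comparativeSentiment are hypothetical. The score is a common lexicon-based "comparative" measure: the sum of word scores divided by the number of tokens.

```typescript
// Hypothetical shape of a message value on the "twitter" topic (assumption, not the sample's schema).
interface TweetMessage {
  text: string;
}

// Tiny illustrative word list with AFINN-style scores; a real scorer would use a full lexicon.
// A Map avoids accidental hits on Object.prototype keys such as "constructor".
const LEXICON = new Map<string, number>([
  ["good", 3],
  ["great", 3],
  ["love", 3],
  ["awesome", 4],
  ["bad", -3],
  ["boring", -3],
  ["hate", -3],
  ["awful", -3],
]);

// Comparative score: sum of known word scores divided by the total number of tokens.
function comparativeSentiment(text: string): number {
  const tokens = text.toLowerCase().match(/[a-z']+/g) ?? [];
  if (tokens.length === 0) return 0;
  const score = tokens.reduce((sum, token) => sum + (LEXICON.get(token) ?? 0), 0);
  return score / tokens.length;
}

// Per-message handling: parse the Kafka value as JSON, then attach a sentiment score.
function processKafkaValue(value: string): { text: string; sentiment: number } {
  const tweet = JSON.parse(value) as TweetMessage;
  return { text: tweet.text, sentiment: comparativeSentiment(tweet.text) };
}

const row = processKafkaValue(JSON.stringify({ text: "Endgame was good" }));
console.log(row); // { text: 'Endgame was good', sentiment: 1 }
```

Only "good" is in the word list, so the example scores 3 / 3 tokens = 1. In a real function the row would then be forwarded to whatever output (such as Power BI) you have configured.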

Pre-requisites

Setup

The steps below walk you through creating a Kafka topic in your cluster, deploying the function to that cluster, and then deploying an agent that pulls tweets from Twitter and publishes them to Kafka. As events land in Kafka, the function triggers and scales automatically. Skip any steps for resources that already exist in your cluster.

Clone the repo and navigate to it

git clone https://github.com/kedacore/sample-typescript-kafka-azure-function
cd sample-typescript-kafka-azure-function

Create a Kafka topic in your cluster

Install Helm

Add the Confluent Helm repo

helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
helm repo update

Deploy the Confluent Kafka Helm chart

helm install --name kafka --set cp-schema-registry.enabled=false,cp-kafka-rest.enabled=false,cp-kafka-connect.enabled=false,dataLogDirStorageClass=default,dataDirStorageClass=default,storageClass=default confluentinc/cp-helm-charts

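Wait for the deployment to complete before continuing; spinning up all the stateful sets can take a few minutes. Note that the command above uses Helm 2 syntax. Helm 3 removed the --name flag, so pass the release name as the first argument instead: helm install kafka confluentinc/cp-helm-charts --set ...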

Deploy a Kafka client pod with the sample's client configuration

kubectl apply -f deploy/kafka-client.yaml

Open a shell in the Kafka client pod

kubectl exec -it kafka-client -- /bin/bash

Create the twitter topic, then exit the client pod

kafka-topics --zookeeper kafka-cp-zookeeper-headless:2181 --topic twitter --create --partitions 5 --replication-factor 1 --if-not-exists

exit
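The partition count chosen above matters for scaling: within one consumer group, each partition is read by at most one consumer, so with 5 partitions no more than 5 function instances can receive messages at once. The TypeScript sketch below illustrates this with Kafka's range assignment strategy for a single topic. It is an illustration only; the function's Kafka client may be configured with a different assignment strategy, and the helper name rangeAssign is hypothetical.

```typescript
// Range assignment for a single topic: partitions are split into contiguous blocks,
// and the first (partitionCount % consumerCount) consumers each get one extra partition.
function rangeAssign(partitionCount: number, consumerCount: number): number[][] {
  if (consumerCount <= 0) throw new Error("consumerCount must be positive");
  const perConsumer = Math.floor(partitionCount / consumerCount);
  const withExtra = partitionCount % consumerCount;
  const assignment: number[][] = [];
  for (let i = 0; i < consumerCount; i++) {
    const start = i * perConsumer + Math.min(i, withExtra);
    const length = perConsumer + (i < withExtra ? 1 : 0);
    assignment.push(Array.from({ length }, (_, k) => start + k));
  }
  return assignment;
}

console.log(rangeAssign(5, 3)); // [ [ 0, 1 ], [ 2, 3 ], [ 4 ] ]
// With 7 consumers and 5 partitions, count the consumers left with nothing to read:
console.log(rangeAssign(5, 7).filter((p) => p.length === 0).length); // 2
```

If you expect to need more than 5 concurrent instances, create the topic with more partitions.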

Deploying the function app

Deploy the function app

func kubernetes deploy --name twitter-function --registry <docker-hub-username>

Alternatively, you can build and push the image yourself and pass --image-name instead of --registry.

Validate the function is deployed

kubectl get deploy

You should see that twitter-function is deployed with 0 replicas: there are no tweets in the topic yet, so there is nothing to process.

Feed Twitter data

Set up the Twitter consumer

Open the ./deploy/twitter-to-kafka.yaml file and replace the environment variables near the bottom of the deployment with your own values:

| Name | Description | Example |
|------|-------------|---------|
| TWITTER_STREAMING_MODE | Streaming mode for tweepy | normal |
| KAFKA_ENDPOINT | Kafka endpoint to publish | kafka-cp-kafka-headless:9092 |
| CONSUMER_KEY | Twitter app consumer key | MGxxxxxxxx |
| CONSUMER_SECRET | Twitter app consumer secret | RBpw98sxukm3kKYxxxxx |
| ACCESS_TOKEN | Twitter app access token | 126868398-2uGxxxxxx |
| ACCESS_TOKEN_SECRET | Twitter app access token secret | oqiewyaPj0QFDk3Xl2Pxxxxx |
| KAFKA_TOPIC | Kafka topic to publish | twitter |
| SEARCH_TERM | Twitter search term | Avengers |
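Because the consumer needs all eight values, a quick pre-flight check can catch a missed entry before you deploy. The TypeScript sketch below is hypothetical and not part of the repo: it takes the values you intend to set and reports any that are missing, blank, or still contain a run of "xxxx" like the redacted examples in the table (a heuristic assumption, not a rule the consumer enforces).

```typescript
// Variable names taken from the table above.
const REQUIRED_VARS = [
  "TWITTER_STREAMING_MODE",
  "KAFKA_ENDPOINT",
  "CONSUMER_KEY",
  "CONSUMER_SECRET",
  "ACCESS_TOKEN",
  "ACCESS_TOKEN_SECRET",
  "KAFKA_TOPIC",
  "SEARCH_TERM",
] as const;

// Returns the names whose values are missing, blank, or still look like the redacted examples.
function findUnsetVars(values: Record<string, string | undefined>): string[] {
  return REQUIRED_VARS.filter((name) => {
    const value = values[name]?.trim();
    return !value || /x{4,}/i.test(value);
  });
}

// A partially filled-in draft: the secrets are missing and CONSUMER_KEY is still a placeholder.
const draft = {
  TWITTER_STREAMING_MODE: "normal",
  KAFKA_ENDPOINT: "kafka-cp-kafka-headless:9092",
  CONSUMER_KEY: "MGxxxxxxxx",
  KAFKA_TOPIC: "twitter",
  SEARCH_TERM: "Avengers",
};
console.log(findUnsetVars(draft).join(", ")); // CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET
```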

Save the changes

Deploy the Twitter consumer

kubectl apply -f deploy/twitter-to-kafka.yaml

Validate and view outputs

View the current deployments

As the Twitter consumer spins up, it starts publishing tweets to the topic, and twitter-function should scale out to one or more instances. You can adjust scale-out in two places: the number of messages each instance pulls at once (defined in the function's host.json file), or the lagThreshold of the ScaledObject created in Kubernetes. Scale-out is also effectively bounded by the topic's partition count (5 here), because each partition is consumed by at most one instance in the consumer group.
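Roughly, lagThreshold works like this: the scaler sums the consumer group's lag across partitions, and the target replica count is that total divided by lagThreshold, rounded up. The TypeScript sketch below approximates this under stated assumptions. It ignores minReplicaCount, maxReplicaCount, cooldown periods, and other ScaledObject settings, caps the result at the partition count, and uses hypothetical offsets and helper names.

```typescript
// Hypothetical per-partition offsets for the consumer group reading the "twitter" topic.
interface PartitionOffsets {
  partition: number;
  latestOffset: number; // log-end offset: the offset the broker will assign to the next message
  committedOffset: number; // offset the consumer group has committed (next message it will read)
}

// Lag of one partition: messages written to the partition but not yet committed by the group.
function partitionLag(p: PartitionOffsets): number {
  return Math.max(0, p.latestOffset - p.committedOffset);
}

// Approximate target replica count: ceil(totalLag / lagThreshold), capped at the partition count
// because extra consumers in the same group would receive no partitions.
function desiredReplicas(partitions: PartitionOffsets[], lagThreshold: number): number {
  if (lagThreshold <= 0) throw new Error("lagThreshold must be positive");
  const totalLag = partitions.reduce((sum, p) => sum + partitionLag(p), 0);
  if (totalLag === 0) return 0; // no backlog: nothing to process, so scale to zero
  return Math.min(partitions.length, Math.ceil(totalLag / lagThreshold));
}

// Example: total lag is 20 + 30 + 0 + 70 + 0 = 120 across the 5 partitions.
const offsets: PartitionOffsets[] = [
  { partition: 0, latestOffset: 1000, committedOffset: 980 },
  { partition: 1, latestOffset: 900, committedOffset: 870 },
  { partition: 2, latestOffset: 1200, committedOffset: 1200 },
  { partition: 3, latestOffset: 37189, committedOffset: 37119 },
  { partition: 4, latestOffset: 500, committedOffset: 500 },
];
console.log(desiredReplicas(offsets, 50)); // 3 (120 / 50 = 2.4, rounded up)
console.log(desiredReplicas(offsets, 10)); // 5 (12 would exceed the 5 partitions)
```

Lowering lagThreshold makes the function scale out sooner, but only up to the partition count.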

# View the current Kubernetes deployments
kubectl get deploy

# View the logs of function pods
kubectl get pods
kubectl logs twitter-function-<some-pod-Id>

You should see logs streaming with tweet data and sentiment scores:

info: Function.KafkaTwitterTrigger.User[0]
      Tweet analyzed
      Tweet text: RT @ballerguy: Yeah avengers endgame was good but I found out my boyfriend is a movie clapper so at what cost
      Sentiment: 0.09523809523809523
info: Function.KafkaTwitterTrigger[0]
      Executed 'Functions.KafkaTwitterTrigger' (Succeeded, Id=67cc49a3-0e13-4fa8-b605-a041ce37420a)
info: Host.Triggers.Kafka[0]
      Stored commit offset twitter / [3] / 37119
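To check the scores without reading every line, you can extract them from the log text. The TypeScript sketch below is hypothetical and keyed to the two line formats in the sample output above ("Sentiment: ..." and "Stored commit offset <topic> / [<partition>] / <offset>"); adjust the patterns if your log format differs.

```typescript
interface CommitRecord {
  topic: string;
  partition: number;
  offset: number;
}

// Extracts sentiment scores and committed offsets from function log text.
function parseFunctionLogs(logText: string): { sentiments: number[]; commits: CommitRecord[] } {
  const sentiments: number[] = [];
  const commits: CommitRecord[] = [];
  for (const line of logText.split("\n")) {
    // e.g. "Sentiment: 0.0952..." (scores may be negative)
    const sentiment = line.match(/Sentiment:\s*(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)/);
    if (sentiment) sentiments.push(Number(sentiment[1]));
    // e.g. "Stored commit offset twitter / [3] / 37119"
    const commit = line.match(/Stored commit offset (\S+) \/ \[(\d+)\] \/ (\d+)/);
    if (commit) {
      commits.push({ topic: commit[1], partition: Number(commit[2]), offset: Number(commit[3]) });
    }
  }
  return { sentiments, commits };
}

const sample = [
  "      Sentiment: 0.09523809523809523",
  "      Stored commit offset twitter / [3] / 37119",
].join("\n");
const parsed = parseFunctionLogs(sample);
console.log(parsed.sentiments[0].toFixed(4)); // 0.0952
console.log(parsed.commits[0]); // { topic: 'twitter', partition: 3, offset: 37119 }
```

You could pipe the output of kubectl logs into a file and feed its contents to a parser like this to average scores or follow offset progress per partition.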

Clean up resources

You can run the following to clean up resources created as part of this sample:

kubectl delete deploy/twitter-to-kafka-deployment
kubectl delete deploy/twitter-function
kubectl delete ScaledObject/twitter-function
kubectl delete Secret/twitter-function
kubectl delete pod kafka-client
helm delete kafka