Streaming Data from Kafka to Elasticsearch & Visualization via Kibana — Tutorial

Gürkan ŞAMAN
Dec 27, 2021

###Dataset:###

This dataset was collected from 255 sensor time series located in 51 rooms on 4 floors of the Sutardja Dai Hall (SDH) at UC Berkeley. It can be used to investigate patterns in the physical properties of a room in a building.

Each room contains 5 types of measurements:

· CO2: CO2 concentration

· Humidity: room humidity

· Temperature: room temperature

· Light: brightness

· PIR: motion sensor data

The data were collected over about one week, from Friday, August 23, 2013, to Saturday, August 31, 2013.

A passive infrared (PIR) sensor is an electronic sensor that measures infrared (IR) light emitted by objects in its field of view. Here it is used to detect whether a room is occupied.

About 6% of the PIR data is non-zero and indicates room occupancy. The remaining 94% of the PIR data is zero, indicating an empty room.
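To make the 6% / 94% split concrete, the share of occupied readings can be computed directly from the PIR column. The following is a minimal sketch in plain Python; the function name `occupancy_share` and the sample values are made up for illustration and are not taken from the dataset.

```python
def occupancy_share(pir_values):
    """Return the fraction of PIR readings that are non-zero (room occupied)."""
    readings = list(pir_values)
    if not readings:
        return 0.0
    occupied = sum(1 for v in readings if float(v) != 0.0)
    return occupied / len(readings)


# Hypothetical sample: one non-zero reading out of ten
sample_pir = [0, 0, 0, 0, 30, 0, 0, 0, 0, 0]
share = occupancy_share(sample_pir)
print(f"occupied share: {share:.0%}")
```

Run over the full PIR column, the same calculation should land near the 6% mentioned above.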

###Kafka Producer:###

• 1. Start Zookeeper and Kafka:

sudo systemctl start zookeeper

sudo systemctl start kafka

• 2. Activate the virtual environment:

source ~/venvspark/bin/activate

• 3. Create the topic named “office-input”:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic office-input --replication-factor 1

• 4. List the Kafka topics to check that “office-input” was created:

kafka-topics.sh --bootstrap-server localhost:9092 --list

• 5. Run the producer:

python dataframe_to_kafka.py --input ./input/sensor.txt --topic office-input

Kafka Producer
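The script dataframe_to_kafka.py itself is not shown in this post. As a rough idea of what such a producer does, the sketch below sends each line of a text file as one message. Everything in it is an assumption for illustration: the function name `stream_lines`, the use of the line number as the message key, and the `send` callable, which stands in for a real Kafka client so that the sketch runs without a broker.

```python
import time


def stream_lines(lines, topic, send, delay_sec=0.0):
    """Send every non-empty line as one message; return how many were sent.

    `send` is any callable taking (topic, key=..., value=...). It is a
    hypothetical stand-in for a real Kafka client's send method.
    """
    sent = 0
    for line_no, line in enumerate(lines):
        value = line.strip()
        if not value:
            continue  # skip blank lines
        send(topic, key=str(line_no).encode("utf-8"), value=value.encode("utf-8"))
        sent += 1
        if delay_sec:
            time.sleep(delay_sec)  # throttle to imitate a live sensor feed
    return sent


# Dry run with a fake sender that only collects the messages
collected = []


def fake_send(topic, key=None, value=None):
    collected.append((topic, key, value))


# Hypothetical input lines, not real rows from sensor.txt
count = stream_lines(["a,b,c\n", "\n", "d,e,f\n"], "office-input", fake_send)
print(count, collected[0])
```

With the kafka-python package installed and a broker on localhost:9092, the `send` method of a `KafkaProducer(bootstrap_servers="localhost:9092")` instance could probably be passed in place of `fake_send`, since it accepts the same `key` and `value` keyword arguments.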

###Processing the Kafka stream and writing it to CSV with Spark Structured Streaming###

import findspark

findspark.init("/opt/manual/spark")

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
    .appName("Read From Kafka")
    .master("local[2]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    #.config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1")
    .getOrCreate())

spark.sparkContext.setLogLevel('ERROR')

# Read data from the Kafka source
lines = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "office-input")
    .load())

# Deserialize key and value
lines2 = lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Split "value" on commas, build the columns, then drop the intermediate ones
parts = F.split(F.col("value"), ",")

lines3 = (lines2
    .withColumn("event_ts", F.trim(parts[1]))
    .withColumn("event_min", F.trim(parts[2]))
    .withColumn("event_ts_min", F.concat(F.col("event_ts"), F.lit(" "), F.col("event_min")))
    .withColumn("ts_min_bignt", parts[3])
    .withColumn("room", F.trim(parts[4]))
    .withColumn("co2", parts[5])
    .withColumn("light", F.trim(parts[6]))
    .withColumn("temp", parts[7])
    .withColumn("humidity", F.trim(parts[8]))
    .withColumn("pir", parts[9])
    .drop("value", "event_ts", "event_min"))

output = "file:///home/train/data-generator/output"

# Offsets and transaction logs are kept here
checkpoint_dir = "file:///tmp/streaming/read_from_kafka"

# Write the result to the CSV file sink
# (numRows and truncate are console-sink options and are not needed here)
streamingQuery = (lines3
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("path", output)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(processingTime="2 seconds")
    .start())

# Block until the streaming query is stopped
streamingQuery.awaitTermination()

Testing the streaming data on the console
Saved in CSV format; the files are written to ./output
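The column logic in the Spark job can be easier to follow on a single record. The plain-Python sketch below mirrors the same split-and-index steps. The helper name `parse_value` is invented here, and the sample line is made up to match the index positions used in the job; it is not a real row from sensor.txt.

```python
def parse_value(value):
    """Mirror the Spark withColumn steps for one comma-separated message."""
    parts = value.split(",")
    return {
        # date (index 1) and time (index 2) are joined with a space
        "event_ts_min": parts[1].strip() + " " + parts[2].strip(),
        "ts_min_bignt": parts[3],
        "room": parts[4].strip(),
        "co2": parts[5],
        "light": parts[6].strip(),
        "temp": parts[7],
        "humidity": parts[8].strip(),
        "pir": parts[9],
    }


# Hypothetical message value; index 0 is ignored, as in the Spark job
sample = "0,2013-08-24,02:04:00,1377299040,656,495.0,11.0,22.62,52.75,0.0"
row = parse_value(sample)
print(row["event_ts_min"], row["room"], row["co2"])
```

All values are still strings at this stage, which matches the streaming job: the casts to numeric and timestamp types only happen later, when the CSV files are read back.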

###Composing Elasticsearch & Kibana via Docker###

# A folder named “elk” is created under the /home/train directory for Elasticsearch & Kibana.

# 1. Enter the “elk” folder and start Docker:

cd /home/train/elk

sudo systemctl start docker

# 2. Activate the venvspark environment:

source ~/venvspark/bin/activate

# 3. Raise vm.max_map_count (the limit on memory-mapped areas per process) so that Elasticsearch does not exit at startup:

sudo sysctl -w vm.max_map_count=262144

# 4. Start Elasticsearch and Kibana from the compose file:

docker-compose -f es-single-kibana.yml up -d

# Check that Kibana is up:

http://localhost:5601
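The containers can take a while to come up after `docker-compose ... up -d`, so opening the URL right away may fail. A small readiness check, sketched below, polls an HTTP endpoint until it answers with status 200. The helper name `wait_until_ready` and its injectable `opener` argument are illustrative choices, not part of the original setup.

```python
import time
import urllib.request


def wait_until_ready(url, attempts=30, pause_sec=2.0, opener=urllib.request.urlopen):
    """Return True once `url` answers with HTTP 200, False if all attempts fail."""
    for _ in range(attempts):
        try:
            with opener(url, timeout=5) as response:
                if response.status == 200:
                    return True
        except OSError:
            # urllib.error.URLError and connection errors are OSError subclasses:
            # the service is not reachable yet
            pass
        time.sleep(pause_sec)
    return False
```

For this tutorial's ports, `wait_until_ready("http://localhost:9200")` would check Elasticsearch and `wait_until_ready("http://localhost:5601/api/status")` would check Kibana.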

###Writing the CSV files to Elasticsearch in a Spark session###

import findspark

findspark.init("/opt/manual/spark")

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
    .appName("Write To Elasticsearch")
    .master("local[2]")
    .config("spark.driver.memory", "2048m")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1")
    .getOrCreate())

spark.sparkContext.setLogLevel('ERROR')

# Read the CSV files written by the streaming job (they have no header row)
df = (spark.read.format("csv")
    .option("header", False)
    .option("inferSchema", True)
    .load("file:///home/train/data-generator/output/*.csv"))

# Rename the positional columns and cast them to the intended types
df = df.withColumn("key", F.col("_c0").cast("integer")).drop("_c0")
df = df.withColumn("event_ts_min", F.col("_c1").cast("timestamp")).drop("_c1")
df = df.withColumn("ts_min_bignt", F.col("_c2").cast("integer")).drop("_c2")
df = df.withColumn("room", F.col("_c3").cast("string")).drop("_c3")
df = df.withColumn("co2", F.col("_c4").cast("float")).drop("_c4")
df = df.withColumn("light", F.col("_c5").cast("float")).drop("_c5")
df = df.withColumn("temp", F.col("_c6").cast("float")).drop("_c6")
df = df.withColumn("humidity", F.col("_c7").cast("float")).drop("_c7")
df = df.withColumn("pir", F.col("_c8").cast("float")).drop("_c8")

# Check the schema and a sample of rows
df.printSchema()
df.show()

# Write the DataFrame to the "sensor" index in Elasticsearch
(df.write
    .format("org.elasticsearch.spark.sql")
    .mode("overwrite")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .save("sensor"))

The mapping of the “sensor” index we wrote to Elasticsearch, shown in Kibana Dev Tools
Number of sensor records sent to Elasticsearch per room (top 10) // Average light level by room (top 10)
Average CO2 level by room (top 10) // Rooms with the most PIR > 0 readings, by count (top 10)
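The Kibana charts listed above all rest on "top 10 rooms by some metric" aggregations. As a sketch of what such a request could look like in Kibana Dev Tools, the helper below builds an Elasticsearch search body as a Python dict. The helper name `top_rooms_by_avg` is invented here, and the field name `room.keyword` assumes that the default dynamic mapping created a keyword sub-field for `room`; if the index is mapped differently, the field name needs adjusting.

```python
import json


def top_rooms_by_avg(metric_field, size=10, room_field="room.keyword"):
    """Build a search body: top `size` rooms ordered by the average of `metric_field`."""
    agg_name = f"avg_{metric_field}"
    return {
        "size": 0,  # return only aggregation results, no documents
        "aggs": {
            "by_room": {
                "terms": {
                    "field": room_field,
                    "size": size,
                    "order": {agg_name: "desc"},
                },
                "aggs": {agg_name: {"avg": {"field": metric_field}}},
            }
        },
    }


# Body for "average CO2 by room, top 10"
print(json.dumps(top_rooms_by_avg("co2"), indent=2))
```

The printed JSON can be pasted in Dev Tools after a `GET sensor/_search` line; swapping `"co2"` for `"light"` gives the average-light variant.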
