Streaming Data from Kafka to Elasticsearch & Visualization via Kibana — Tutorial
###Datasets:###
The dataset consists of 255 sensor time series collected from 51 rooms on 4 floors of Sutardja Dai Hall (SDH) at UC Berkeley. It can be used to investigate patterns in the physical properties of rooms in a building.
Each room provides 5 types of measurements:
· CO2: CO2 concentration
· Humidity: room humidity
· Temperature: room temperature
· Light: brightness
· PIR: motion sensor data
Data were collected over a period of one week, from Friday, August 23, 2013, to Saturday, August 31, 2013.
A passive infrared (PIR) sensor is an electronic sensor that measures infrared (IR) light emitted by objects in its field of view; here it is used to detect room occupancy.
About 6% of the PIR data is non-zero and indicates room occupancy. The remaining 94% of the PIR data is zero, indicating an empty room.
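As a quick sanity check of that ratio, the share of non-zero PIR readings can be computed straight from the raw file. A minimal sketch with pandas, assuming sensor.txt is comma-separated, has no header row, and keeps the PIR reading in its last column (the same field order the Spark job below relies on):
import pandas as pd
# Assumed layout: comma-separated, no header, PIR reading in the last column
df = pd.read_csv("./input/sensor.txt", header=None)
pir = df.iloc[:, -1]
print(f"non-zero PIR share: {(pir != 0).mean():.1%}")  # expected to be around 6%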
###Kafka Producer:###
• 1. Start Zookeeper and Kafka:
sudo systemctl start zookeeper
sudo systemctl start kafka
• 2. Activate the virtual environment:
source ~/venvspark/bin/activate
• 3. Create a topic named "office-input":
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic office-input --replication-factor 1
• 4. List the Kafka topics to check that "office-input" was created:
kafka-topics.sh --bootstrap-server localhost:9092 --list
• 5. Run the producer (a hypothetical sketch of such a producer follows this list):
python dataframe_to_kafka.py --input ./input/sensor.txt --topic office-input
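The dataframe_to_kafka.py script itself is not reproduced in this post. Purely as an illustration, a hypothetical stand-in that reads sensor.txt and publishes each row to the "office-input" topic could look like the sketch below, built on the kafka-python package (the key/value layout and the delay are assumptions, not the actual script):
from kafka import KafkaProducer
import time
# Hypothetical stand-in for dataframe_to_kafka.py (not the actual script)
producer = KafkaProducer(bootstrap_servers="localhost:9092")
with open("./input/sensor.txt") as f:
    for i, line in enumerate(f):
        producer.send(
            "office-input",
            key=str(i).encode("utf-8"),          # assumed: row number as the message key
            value=line.strip().encode("utf-8"),  # the raw comma-separated row
        )
        time.sleep(0.1)  # assumed throttle to mimic a live sensor feed
producer.flush()
producer.close()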
###Processing the Kafka Stream and Writing It to CSV with Spark Structured Streaming###
import findspark
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession, functions as F
spark = (SparkSession.builder
.appName("Read From Kafka")
.master("local[2]")
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
#.config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1")
.getOrCreate())
spark.sparkContext.setLogLevel('ERROR')
# Read data from the Kafka source
lines = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "office-input")
.load())
# Deserialize Key and Value
lines2 = lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Split "value" into columns, then drop the raw and intermediate columns:
lines3 = lines2.withColumn("event_ts", F.trim(F.split(F.col("value"), ",")[1])) \
.withColumn("event_min", F.trim(F.split(F.col("value"), ",")[2])) \
.withColumn("event_ts_min", F.concat(F.col("event_ts"), F.lit(" "), F.col("event_min"))) \
.withColumn("ts_min_bignt", F.split(F.col("value"), ",")[3]) \
.withColumn("room", F.trim(F.split(F.col("value"), ",")[4])) \
.withColumn("co2", F.split(F.col("value"), ",")[5]) \
.withColumn("light", F.trim(F.split(F.col("value"), ",")[6])) \
.withColumn("temp", F.split(F.col("value"), ",")[7]) \
.withColumn("humidity", F.trim(F.split(F.col("value"), ",")[8])) \
.withColumn("pir", F.split(F.col("value"), ",")[9]) \
.drop("value") \
.drop("event_ts") \
.drop("event_min")
output = "file:///home/train/data-generator/output"
# Checkpoint directory: Kafka offsets and commit logs are kept here
checkpoint_dir = "file:///tmp/streaming/read_from_kafka"
# Write the result to a CSV file sink
streamingQuery = (lines3
.writeStream
.format("csv")
.outputMode("append")
.option("path", output)
.trigger(processingTime="2 seconds")
.option("checkpointLocation", checkpoint_dir)
.start())
# Start streaming
streamingQuery.awaitTermination()
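To spot-check the parsed columns before producing CSV files, the same lines3 DataFrame can be sent to a console sink instead. A minimal sketch, assuming the Spark session and lines3 defined above (run it in place of the CSV query):
# Debug-only alternative sink: print the parsed rows to the console
debugQuery = (lines3
.writeStream
.format("console")
.outputMode("append")
.option("numRows", 20)      # rows shown per micro-batch
.option("truncate", False)  # show full column values
.start())
debugQuery.awaitTermination()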
###Composing ELK & Kibana via Docker###
# A folder named "elk" is created under the /home/train directory for Elasticsearch & Kibana
# 1. Start Docker and change into the "elk" folder:
sudo systemctl start docker
cd ~/elk
# 2. Activate the venvspark virtual environment:
source ~/venvspark/bin/activate
# 3. Increase vm.max_map_count so that Elasticsearch does not exit on startup:
sudo sysctl -w vm.max_map_count=262144
# 4. Bring up Elasticsearch and Kibana from the following compose file:
docker-compose -f es-single-kibana.yml up -d
# Check that the containers are running:
docker ps
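As an extra check from Python, both services expose HTTP endpoints. A small sketch using the requests package, assuming the compose file keeps the default ports (9200 for Elasticsearch, 5601 for Kibana):
import requests
# Elasticsearch cluster info (port 9200 is also used by the Spark writer below)
print(requests.get("http://localhost:9200").json())
# Kibana status endpoint returns HTTP 200 once the UI is ready
print(requests.get("http://localhost:5601/api/status").status_code)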
###Writing the Previously Produced CSV Files to Elasticsearch with Spark###
import findspark
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession, functions as F
spark = (SparkSession.builder
.appName("Read From Kafka")
.master("local[2]")
.config("spark.driver.memory", "2048m")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1")
.getOrCreate())
spark.sparkContext.setLogLevel('ERROR')
df = spark.read.format("csv") \
.option("header", False) \
.option("inferSchema", True) \
.load("file:///home/train/data-generator/output/*.csv")
df = df.withColumn("key", F.col("_c0").cast("integer")).drop("_c0")
df = df.withColumn("event_ts_min", F.col("_c1").cast("timestamp")).drop("_c1")
df = df.withColumn("ts_min_bignt", F.col("_c2").cast("integer")).drop("_c2")
df = df.withColumn("room", F.col("_c3").cast("string")).drop("_c3")
df = df.withColumn("co2", F.col("_c4").cast("float")).drop("_c4")
df = df.withColumn("light", F.col("_c5").cast("float")).drop("_c5")
df = df.withColumn("temp", F.col("_c6").cast("float")).drop("_c6")
df = df.withColumn("humidity", F.col("_c7").cast("float")).drop("_c7")
df = df.withColumn("pir", F.col("_c8").cast("float")).drop("_c8")
# Check the schema and a sample of the data:
df.printSchema()
df.show()
df.write \
.format("org.elasticsearch.spark.sql") \
.mode("overwrite") \
.option("es.nodes", "localhost") \
.option("es.port", "9200") \
.save("sensor")
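To confirm that the documents actually landed in the "sensor" index before moving on to Kibana, the Elasticsearch REST API can be queried directly. A minimal sketch with requests, assuming Elasticsearch is still reachable on localhost:9200:
import requests
# Document count in the "sensor" index written by the Spark job above
resp = requests.get("http://localhost:9200/sensor/_count")
print(resp.json())  # e.g. {"count": <number of rows>, "_shards": {...}}
In Kibana (http://localhost:5601, assuming the default port), an index pattern named "sensor" can then be created to build the visualizations.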