Spark Structured Streaming#
I recently read the blog post Structured Streaming in PySpark, which runs on the Databricks platform. I then tried to reproduce it on my local Spark installation and hit a few tricky issues along the way.
Reading Data#
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, StringType, StructType, StructField
spark = SparkSession.builder.appName("Test Streaming").enableHiveSupport().getOrCreate()
json_schema = StructType([
StructField("time", TimestampType(), True),
StructField("customer", StringType(), True),
StructField("action", StringType(), True),
StructField("device", StringType(), True)
])
file_path = "local_file_path<file:///..."
Read the JSON files the same way as the blog#
input = spark.read.schema(json_schema).json(file_path)
input.show()
# +----+--------+------+------+
# |time|customer|action|device|
# +----+--------+------+------+
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# +----+--------+------+------+
input.count()
# 20000
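Every column is null, yet count() still returns 20,000 rows, so Spark is creating one row per line but parsing nothing. Before blaming the schema, a quick debugging sketch is to look at the raw lines with the text source, which applies no parsing at all:
spark.read.text(file_path).show(3, truncate=False)
# prints the first lines exactly as they appear in the files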
Read a single JSON file to check the schema#
input = spark.read.schema(json_schema).json(file_path+'/1.json')
input.show()
# +----+--------+------+------+
# |time|customer|action|device|
# +----+--------+------+------+
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# |null| null| null| null|
# +----+--------+------+------+
# Same result: every column is null.
# Next, drop the explicit schema and let Spark infer one.
input = spark.read.json(file_path+'/1.json')
input.show()
# +--------------------+-----------+-----------------+--------------------+---------------+
# | _corrupt_record| action| customer| device| time|
# +--------------------+-----------+-----------------+--------------------+---------------+
# |[{"time":"3:57:09...| null| null| null| null|
# | null| power off|Nicolle Pargetter| August Doorbell Cam| 1:29:05.000 AM|
# | null| power on| Concordia Muck|Footbot Air Quali...| 6:02:06.000 AM|
# | null| power off| Kippar McCaughen| ecobee4| 5:40:19.000 PM|
# | null| power off| Sidney Jotham| GreenIQ Controller| 4:54:28.000 PM|
# | null| power off| Fanya Menzies| ecobee4| 3:12:48.000 PM|
# | null|low battery| Jeanne Gresch| ecobee4| 5:39:47.000 PM|
# | null| power on| Chen Cuttelar| August Doorbell Cam| 2:45:44.000 PM|
# | null| power off| Merwyn Mix| Amazon Echo| 9:23:41.000 PM|
# | null| power off| Angelico Conrath| Amazon Echo| 4:53:13.000 AM|
# | null| power on| Gilda Emmett| August Doorbell Cam|12:32:29.000 AM|
# | null|low battery| Austine Davsley| ecobee4| 3:35:12.000 AM|
# | null|low battery| Zackariah Thoday| Amazon Echo| 1:26:13.000 PM|
# | null| power off| Ewen Gillson| Amazon Echo| 7:47:20.000 AM|
# | null| power on| Itch Durnill| ecobee4| 4:45:55.000 AM|
# | null| power off| Winni Dow| GreenIQ Controller| 4:12:54.000 AM|
# | null| power on|Talbot Valentelli| August Doorbell Cam| 7:35:23.000 PM|
# | null|low battery| Vikki Muckeen| August Doorbell Cam| 1:17:30.000 PM|
# | null| power off| Christie Karran|Footbot Air Quali...| 9:38:13.000 PM|
# | null|low battery| Evonne Guest| Amazon Echo| 8:02:21.000 AM|
# +--------------------+-----------+-----------------+--------------------+---------------+
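The _corrupt_record column gives the answer: the first line starts with [, meaning each file holds one pretty-printed JSON array, while Spark's default JSON source expects JSON Lines, i.e. one complete JSON value per line. A minimal reproduction (the /tmp file names are hypothetical, not the blog's data):
# The same two records as a multi-line array vs. as JSON Lines.
with open("/tmp/array.json", "w") as f:
    f.write('[{"a": 1},\n{"a": 2}]')   # array spread over two lines
with open("/tmp/lines.json", "w") as f:
    f.write('{"a": 1}\n{"a": 2}')      # one object per line
spark.read.json("file:///tmp/array.json").show()  # _corrupt_record appears
spark.read.json("file:///tmp/lines.json").show()  # parses cleanly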
Remove [ and ] from the source file#
input = spark.read.json(file_path+'/1.json')
input.show()
# +-----------+-----------------+--------------------+---------------+
# | action| customer| device| time|
# +-----------+-----------------+--------------------+---------------+
# | power off| Alexi Barts| GreenIQ Controller| 3:57:09.000 PM|
# | power off|Nicolle Pargetter| August Doorbell Cam| 1:29:05.000 AM|
# | power on| Concordia Muck|Footbot Air Quali...| 6:02:06.000 AM|
# | power off| Kippar McCaughen| ecobee4| 5:40:19.000 PM|
# | power off| Sidney Jotham| GreenIQ Controller| 4:54:28.000 PM|
# | power off| Fanya Menzies| ecobee4| 3:12:48.000 PM|
# |low battery| Jeanne Gresch| ecobee4| 5:39:47.000 PM|
# | power on| Chen Cuttelar| August Doorbell Cam| 2:45:44.000 PM|
# | power off| Merwyn Mix| Amazon Echo| 9:23:41.000 PM|
# | power off| Angelico Conrath| Amazon Echo| 4:53:13.000 AM|
# | power on| Gilda Emmett| August Doorbell Cam|12:32:29.000 AM|
# |low battery| Austine Davsley| ecobee4| 3:35:12.000 AM|
# |low battery| Zackariah Thoday| Amazon Echo| 1:26:13.000 PM|
# | power off| Ewen Gillson| Amazon Echo| 7:47:20.000 AM|
# | power on| Itch Durnill| ecobee4| 4:45:55.000 AM|
# | power off| Winni Dow| GreenIQ Controller| 4:12:54.000 AM|
# | power on|Talbot Valentelli| August Doorbell Cam| 7:35:23.000 PM|
# |low battery| Vikki Muckeen| August Doorbell Cam| 1:17:30.000 PM|
# | power off| Christie Karran|Footbot Air Quali...| 9:38:13.000 PM|
# |low battery| Evonne Guest| Amazon Echo| 8:02:21.000 AM|
# +-----------+-----------------+--------------------+---------------+
input.printSchema()
# root
# |-- action: string (nullable = true)
# |-- customer: string (nullable = true)
# |-- device: string (nullable = true)
# |-- time: string (nullable = true)
Add the multiLine option#
Read the files without a schema, but with the multiLine option enabled:
input = spark.read.json("file:///path/pyspark_test_data", multiLine=True)
# OR input = spark.read.option('multiLine', True).json("file:///path/pyspark_test_data")
# +-----------+--------------------+--------------------+---------------+
# | action| customer| device| time|
# +-----------+--------------------+--------------------+---------------+
# | power on| Raynor Blaskett|Nest T3021US Ther...| 3:35:09.000 AM|
# | power on|Stafford Blakebrough| GreenIQ Controller|10:59:46.000 AM|
# | power on| Alex Woolcocks|Nest T3021US Ther...| 6:26:36.000 PM|
# | power on| Clarice Nayshe|Footbot Air Quali...| 4:46:28.000 AM|
# | power off| Killie Pirozzi|Footbot Air Quali...| 8:58:43.000 AM|
# | power on| Lynne Dymidowicz|Footbot Air Quali...| 4:20:49.000 PM|
# | power on| Shaina Dowyer| ecobee4| 3:41:33.000 AM|
# |low battery| Barbee Melato| August Doorbell Cam|10:40:24.000 PM|
# | power off| Clem Westcot|Nest T3021US Ther...|11:13:38.000 PM|
# | power off| Kerri Galfour| Amazon Echo|10:12:15.000 PM|
# |low battery| Trev Ashmore| GreenIQ Controller|11:04:41.000 AM|
# | power on| Coral Jahnisch| August Doorbell Cam| 3:06:31.000 AM|
# | power on| Feliza Cowdrey|Nest T3021US Ther...| 2:49:02.000 AM|
# | power off| Amabelle De Haven|Footbot Air Quali...|12:11:59.000 PM|
# | power off| Benton Redbourn|Nest T3021US Ther...| 3:57:39.000 AM|
# |low battery| Asher Potten| August Doorbell Cam| 1:34:44.000 AM|
# |low battery| Lorianne Hullyer| August Doorbell Cam| 7:26:42.000 PM|
# | power off| Ruperto Aldcorn|Footbot Air Quali...| 3:54:49.000 AM|
# | power on| Agatha Di Giacomo|Footbot Air Quali...| 7:15:20.000 AM|
# | power on| Eunice Penwright| ecobee4|11:14:14.000 PM|
# +-----------+--------------------+--------------------+---------------+
input.printSchema()
# root
# |-- action: string (nullable = true)
# |-- customer: string (nullable = true)
# |-- device: string (nullable = true)
# |-- time: string (nullable = true)
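With multiLine=True, Spark reads each file as a single JSON value instead of one value per line, so a top-level array parses as-is and editing the brackets out of the source files becomes unnecessary. Reusing the hypothetical /tmp/array.json from the sketch above:
spark.read.option("multiLine", True).json("file:///tmp/array.json").show()
# +---+
# |  a|
# +---+
# |  1|
# |  2|
# +---+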
Change the schema#
Set time to StringType for now, since the raw values such as 3:35:09.000 PM are bare times of day, not full timestamps:
json_schema = StructType([
StructField("time", StringType(), True),
StructField("customer", StringType(), True),
StructField("action", StringType(), True),
StructField("device", StringType(), True)
])
input = spark.read.schema(json_schema).json("file:///path/pyspark_test_data", multiLine=True)
input.show()
# +---------------+--------------------+-----------+--------------------+
# | time| customer| action| device|
# +---------------+--------------------+-----------+--------------------+
# | 3:35:09.000 AM| Raynor Blaskett| power on|Nest T3021US Ther...|
# |10:59:46.000 AM|Stafford Blakebrough| power on| GreenIQ Controller|
# | 6:26:36.000 PM| Alex Woolcocks| power on|Nest T3021US Ther...|
# | 4:46:28.000 AM| Clarice Nayshe| power on|Footbot Air Quali...|
# | 8:58:43.000 AM| Killie Pirozzi| power off|Footbot Air Quali...|
# | 4:20:49.000 PM| Lynne Dymidowicz| power on|Footbot Air Quali...|
# | 3:41:33.000 AM| Shaina Dowyer| power on| ecobee4|
# |10:40:24.000 PM| Barbee Melato|low battery| August Doorbell Cam|
# |11:13:38.000 PM| Clem Westcot| power off|Nest T3021US Ther...|
# |10:12:15.000 PM| Kerri Galfour| power off| Amazon Echo|
# |11:04:41.000 AM| Trev Ashmore|low battery| GreenIQ Controller|
# | 3:06:31.000 AM| Coral Jahnisch| power on| August Doorbell Cam|
# | 2:49:02.000 AM| Feliza Cowdrey| power on|Nest T3021US Ther...|
# |12:11:59.000 PM| Amabelle De Haven| power off|Footbot Air Quali...|
# | 3:57:39.000 AM| Benton Redbourn| power off|Nest T3021US Ther...|
# | 1:34:44.000 AM| Asher Potten|low battery| August Doorbell Cam|
# | 7:26:42.000 PM| Lorianne Hullyer|low battery| August Doorbell Cam|
# | 3:54:49.000 AM| Ruperto Aldcorn| power off|Footbot Air Quali...|
# | 7:15:20.000 AM| Agatha Di Giacomo| power on|Footbot Air Quali...|
# |11:14:14.000 PM| Eunice Penwright| power on| ecobee4|
# +---------------+--------------------+-----------+--------------------+
TimestampType#
In the official documentation, the class pyspark.sql.DataFrameReader has a timestampFormat parameter:
sets the string that indicates a timestamp format.
Custom date formats follow the formats at java.text.SimpleDateFormat.
This applies to timestamp type. If None is set, it uses the default value, yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
Switch time back to TimestampType and pass a format that matches the source data:
json_schema = StructType([
StructField("time", TimestampType(), True),
StructField("customer", StringType(), True),
StructField("action", StringType(), True),
StructField("device", StringType(), True)
])
input = spark.read.schema(json_schema).option("multiLine", True).json("file:///path/pyspark_test_data", timestampFormat="h:mm:ss.SSS aa")
input.show()
# +-------------------+--------------------+-----------+--------------------+
# | time| customer| action| device|
# +-------------------+--------------------+-----------+--------------------+
# |1970-01-01 03:35:09| Raynor Blaskett| power on|Nest T3021US Ther...|
# |1970-01-01 10:59:46|Stafford Blakebrough| power on| GreenIQ Controller|
# |1970-01-01 18:26:36| Alex Woolcocks| power on|Nest T3021US Ther...|
# |1970-01-01 04:46:28| Clarice Nayshe| power on|Footbot Air Quali...|
# |1970-01-01 08:58:43| Killie Pirozzi| power off|Footbot Air Quali...|
# |1970-01-01 16:20:49| Lynne Dymidowicz| power on|Footbot Air Quali...|
# |1970-01-01 03:41:33| Shaina Dowyer| power on| ecobee4|
# |1970-01-01 22:40:24| Barbee Melato|low battery| August Doorbell Cam|
# |1970-01-01 23:13:38| Clem Westcot| power off|Nest T3021US Ther...|
# |1970-01-01 22:12:15| Kerri Galfour| power off| Amazon Echo|
# |1970-01-01 11:04:41| Trev Ashmore|low battery| GreenIQ Controller|
# |1970-01-01 03:06:31| Coral Jahnisch| power on| August Doorbell Cam|
# |1970-01-01 02:49:02| Feliza Cowdrey| power on|Nest T3021US Ther...|
# |1970-01-01 12:11:59| Amabelle De Haven| power off|Footbot Air Quali...|
# |1970-01-01 03:57:39| Benton Redbourn| power off|Nest T3021US Ther...|
# |1970-01-01 01:34:44| Asher Potten|low battery| August Doorbell Cam|
# |1970-01-01 19:26:42| Lorianne Hullyer|low battery| August Doorbell Cam|
# |1970-01-01 03:54:49| Ruperto Aldcorn| power off|Footbot Air Quali...|
# |1970-01-01 07:15:20| Agatha Di Giacomo| power on|Footbot Air Quali...|
# |1970-01-01 23:14:14| Eunice Penwright| power on| ecobee4|
# +-------------------+--------------------+-----------+--------------------+
All the yyyy-MM-dd parts are 1970-01-01 because the source files carry only hour, minute, and second, so Spark falls back to the Unix epoch for the date. The sample files were generated in this incomplete format on Windows in the first place.
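If a real date matters downstream, one workaround (a sketch only; 2018-01-01 is a made-up stand-in for whatever date actually applies) is to keep time as a string and prepend a date before converting with to_timestamp:
from pyspark.sql import functions as F
# Assumes time was read as StringType, as in the "Change the schema" section.
fixed = input.withColumn(
    "time",
    F.to_timestamp(F.concat(F.lit("2018-01-01 "), F.col("time")),
                   "yyyy-MM-dd h:mm:ss.SSS a"))
fixed.printSchema()  # time is now timestamp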
Streaming Our Data#
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, StringType, StructType, StructField
spark = SparkSession.builder.appName("Test Streaming").enableHiveSupport().getOrCreate()
json_schema = StructType([
StructField("time", StringType(), True),
StructField("customer", StringType(), True),
StructField("action", StringType(), True),
StructField("device", StringType(), True)
])
streamingDF = spark.readStream.schema(json_schema) \
.option("maxFilesPerTrigger", 1) \
.option("multiLine", True) \
.json("file:///path/pyspark_test_data")
streamingActionCountsDF = streamingDF.groupBy('action').count()
# streamingActionCountsDF.isStreaming  # returns True for a streaming DataFrame
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep shuffles small for a local demo
# View stream in real-time
# query = streamingActionCountsDF.writeStream \
# .format("memory").queryName("counts").outputMode("complete").start()
# Sink format choices:
# parquet
# kafka
# console
# memory
# query = streamingActionCountsDF.writeStream \
# .format("console").queryName("counts").outputMode("complete").start()
query = streamingActionCountsDF.writeStream.format("console") \
.queryName("counts").outputMode("complete").start()
query.awaitTermination(timeout=10)  # block for up to 10 seconds, then return
# Output mode choices:
# append
# complete
# update
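With the memory sink variant commented out above, the running counts land in an in-memory table that can be queried with plain SQL while the stream is active, which is roughly what the original blog does on Databricks. A sketch (the polling count and interval are arbitrary):
import time
query = streamingActionCountsDF.writeStream \
    .format("memory").queryName("counts").outputMode("complete").start()
for _ in range(5):                     # poll while files keep triggering
    spark.sql("SELECT * FROM counts").show()
    time.sleep(2)
query.stop()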