Flume & Spark Streaming Integration

Flume Stream Data — Input for Spark Streaming

The input data is log output generated by commands on Linux. Flume picks it up and delivers it to the sink, which is an HDFS location, where the files are created with a timestamp.

Prepare the Flume configuration file

vi stlog.conf

# Name the components on this agent
stlog.sources = r1
stlog.sinks = k1
stlog.channels = c1

# Describe/configure the source
stlog.sources.r1.type = exec
stlog.sources.r1.command = tail -F -s 2 /mnt/home/ad/rtp/ssc/event2.log

# Describe the sink
stlog.sinks.k1.type = hdfs
stlog.sinks.k1.hdfs.path = /user/ad/rtp/ssc/%Y-%m-%d/
stlog.sinks.k1.hdfs.filePrefix = events-
stlog.sinks.k1.hdfs.fileSuffix = .log
stlog.sinks.k1.hdfs.useLocalTimeStamp = true
stlog.sinks.k1.hdfs.fileType = DataStream

# Use a channel c1 that buffers events in memory
stlog.channels.c1.type = memory
stlog.channels.c1.capacity = 1000
stlog.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
stlog.sources.r1.channels = c1
stlog.sinks.k1.channel = c1
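
  • tail -F prints the last 10 lines of the file and then keeps running, printing every new line that is appended to the source file. It also keeps following the file by name if the file is rotated or recreated. It ends only when it is terminated (for example with Ctrl + C), which gives us a continuous stream of data.
  • -s 2 makes tail sleep for about 2 seconds between checks for new data.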
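
To make the follow-and-sleep behaviour of tail -F -s 2 more concrete, here is a minimal plain-Scala sketch of the same idea. It is not how tail or Flume is implemented. The names FollowFile and readNewLines are made up for illustration, and the sketch assumes an ASCII log file that already exists.

```scala
import java.io.{File, RandomAccessFile}
import scala.collection.mutable.ArrayBuffer

object FollowFile {
  // Return whatever was appended after `offset`, plus the offset to resume from.
  def readNewLines(file: File, offset: Long): (List[String], Long) = {
    // If the file shrank (truncated or recreated), start again from the top.
    val start = if (file.length() < offset) 0L else offset
    val raf = new RandomAccessFile(file, "r")
    try {
      raf.seek(start)
      val lines = ArrayBuffer.empty[String]
      var line = raf.readLine()
      while (line != null) {
        lines += line
        line = raf.readLine()
      }
      (lines.toList, raf.getFilePointer)
    } finally raf.close()
  }

  def main(args: Array[String]): Unit = {
    val file = new File(args(0)) // path of the file to follow
    var offset = 0L
    while (true) {               // runs until interrupted, e.g. with Ctrl + C
      val (lines, next) = readNewLines(file, offset)
      lines.foreach(println)
      offset = next
      Thread.sleep(2000)         // same idea as -s 2
    }
  }
}
```

Unlike tail, this sketch starts from the beginning of the file instead of the last 10 lines, and it does not wait for a missing file to appear.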

Start the Flume agent with this configuration:

flume-ng agent -n stlog -f /mnt/home/ad/rtp/ssc/stlog.conf -Dflume.root.logger=INFO,console

Streaming data from Flume and processing using Spark Streaming Code

We now need to pick up the stream data that Flume writes to the HDFS location and process it with our Spark Streaming code.
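
Note that the sink writes into one folder per day (the %Y-%m-%d part of hdfs.path). As far as we know, textFileStream in Spark 2.1 watches only the directory it is given and not nested ones, so it may be necessary to point the stream at the dated folder. Here is a minimal sketch of building that path, assuming the Flume agent and the Spark driver agree on the local date. DatedPath and forDay are hypothetical names.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DatedPath {
  // Flume's %Y-%m-%d escape sequence corresponds to this java.time pattern.
  private val dayFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Build the folder the HDFS sink writes to on a given day.
  def forDay(base: String, day: LocalDate): String =
    s"${base.stripSuffix("/")}/${day.format(dayFormat)}/"

  def main(args: Array[String]): Unit = {
    // Base path taken from hdfs.path in stlog.conf; the date is the local "today".
    println(forDay("/user/ad/rtp/ssc/", LocalDate.now()))
  }
}
```

The resulting string could then be passed to ssc.textFileStream in place of the base path. A job that runs past midnight would still be watching the previous day's folder.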

cd /mnt/home/ad/rtp/ssc/
mkdir sbt
cd sbt
mkdir src
mkdir src/main
mkdir src/main/scala
vi build.sbt

name := "SparkFlumeProject"
version := "1.0"
organization := "DataGuy"
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
// LogParser.scala imports org.apache.spark.streaming, so declare spark-streaming explicitly
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
resolvers += Resolver.mavenLocal

Spark Streaming Code

Let’s write the code that processes the data. Flume has read an event log file from the local Linux file system and moved its contents to HDFS, into files named with the ‘events-’ prefix, a timestamp and the ‘.log’ suffix, inside a ‘yyyy-mm-dd’ folder. We will use a regular expression to pick out the log lines that start with a timestamp and then save them to another file on the local Linux file system.
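
To see what the regular expression does before wiring it into Spark, here is a minimal plain-Scala sketch. The pattern is the one used in the streaming job. The sample lines and the names LogLineDemo, ParsedLine and parse are made up for illustration.

```scala
object LogLineDemo {
  // Hypothetical stand-in for the case class used in the streaming job.
  case class ParsedLine(timestamp: String, etype: String, message: String)

  // Same pattern as in the job: <date-time> <log-type> <message>
  val regex = raw"^([\d/]+ [\d:]+) ([a-zA-Z]+) (.*)".r

  // Some(...) for a line that fits the pattern, None for anything else.
  def parse(line: String): Option[ParsedLine] = line match {
    case regex(ts, etype, msg) => Some(ParsedLine(ts, etype, msg))
    case _                     => None
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample lines: one log line and one stack-trace continuation.
    val batch = Seq(
      "17/05/01 10:15:30 INFO SparkContext: Running Spark version 2.1.0",
      "    at org.example.Main.run(Main.scala:42)"
    )
    // flatMap keeps only the lines that parsed, like filter + map in one step.
    batch.flatMap(parse(_)).foreach(println)
  }
}
```

Running main prints a single ParsedLine for the first sample. The second line does not start with a date and time, so it is dropped.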

vi LogParser.scala

import scala.util.matching.Regex
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.sql.Encoders
import org.apache.spark.ml._

case class LogEvent(timestamp: String, etype: String, message: String)

object LogParser {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Logparser Streaming")
    // process the stream in 4-second batches
    val ssc = new StreamingContext(conf, Seconds(4))

    // define the regular expression - <date-time> <log-type> <message>
    val regex = raw"^([\d/]+ [\d:]+) ([a-zA-Z]+) (.*)".r

    // read new files that appear in the HDFS location written by Flume
    val lines = ssc.textFileStream("/user/ad/rtp/ssc/")

    // apply the regex and create a case class instance for each matching line;
    // lines that do not match are dropped
    val df = lines.flatMap {
      case regex(timestamp, etype, message) => List(LogEvent(timestamp, etype, message))
      case _ => Nil
    }

    // NOTE: the original listing is cut off at this point. The calls below are an
    // assumed minimal ending so that the job prints each batch and keeps running.
    df.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Package the code with sbt, check the jar, and submit the job. The job's console output is redirected to event2.log, the file that the Flume source tails.

cd /mnt/home/ad/rtp/ssc/sbt/
sbt package
ls /mnt/home/ad/rtp/ssc/sbt/target/scala-2.11/
jar tf /mnt/home/ad/rtp/ssc/sbt/target/scala-2.11/sparkflumeproject_2.11-1.0.jar
spark2-submit --class LogParser --deploy-mode client target/scala-2.11/sparkflumeproject_2.11-1.0.jar > /mnt/home/ad/rtp/ssc/event2.log 2>&1
cat /mnt/home/ad/rtp/ssc/event2.log



Dataguy! - databare.com

Hi! I am Amit, your DataGuy. Folks call me ‘AD’ as well. I have worked for over a decade in multiple roles: developer, DBA, data engineer, data architect.