Flume & Spark Streaming Integration

In this post, we will integrate Flume and Spark Streaming: Flume will be our source of streaming data, and Spark Streaming will process it.

Flume is particularly well suited to sourcing data from log files. We will stream log-file data using Flume and process it by selecting only the lines that begin with a date-time. Let’s get started.

Flume Stream Data — Input for Spark Streaming

The content of the event log file is in the following log format:
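The original sample is not reproduced here, but based on the regular expression used later in the Spark code (`<date-time> <log-type> <message>`), each line would look something like this (timestamps and messages invented for illustration):

```
2021/03/15 10:22:01 INFO Application started
2021/03/15 10:22:03 ERROR Connection refused
```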

Prepare flume configuration file

vi stlog.conf

# Name the components on this agent
stlog.sources = r1
stlog.sinks = k1
stlog.channels = c1

# Describe/configure the source
stlog.sources.r1.type = exec
stlog.sources.r1.command = tail -F -s 2 /mnt/home/ad/rtp/ssc/event2.log

# Describe the sink
stlog.sinks.k1.type = hdfs
stlog.sinks.k1.hdfs.path = /user/ad/rtp/ssc/%Y-%m-%d/
stlog.sinks.k1.hdfs.filePrefix = events-
stlog.sinks.k1.hdfs.fileSuffix = .log
stlog.sinks.k1.hdfs.useLocalTimeStamp = true
stlog.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
stlog.channels.c1.type = memory
stlog.channels.c1.capacity = 1000
stlog.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
stlog.sources.r1.channels = c1
stlog.sinks.k1.channel = c1


  • tail -F prints the last 10 lines of the file and then, instead of exiting, keeps the file open and prints any new lines as they are appended. It ends only when terminated with Ctrl + C, thus creating a continuous stream of data.
  • -s 2 makes tail sleep for 2 seconds between checks for new data.
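As a quick sanity check of that default behavior, here is a tiny demo (the file /tmp/tail_demo.log is a throwaway path made up for illustration):

```shell
# create a 12-line demo file; plain tail prints only the last 10 lines
seq 12 > /tmp/tail_demo.log
tail /tmp/tail_demo.log    # prints lines 3 through 12
# tail -F -s 2 would additionally keep following the file,
# checking for new lines every 2 seconds
```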

Once the file is ready, we will run the Flume agent:

flume-ng agent -n stlog -f /mnt/home/ad/rtp/ssc/stlog.conf -Dflume.root.logger=INFO,console


Once the agent is running, you can find log files created at the HDFS location, with a timestamp in the filename.

Now we have our flume stream ready to be processed by Spark Streaming. Flume agent will be running continuously until you stop the agent.

Streaming data from Flume and processing using Spark Streaming Code

Let’s create an sbt package. For this, create the directory structure sbt expects.

cd /mnt/home/ad/rtp/ssc/
mkdir sbt
cd sbt
mkdir src
mkdir src/main
mkdir src/main/scala

Now let’s create the build file — build.sbt. Add the following content.

vi build.sbt

name := "SparkFlumeProject"
version := "1.0"
organization := "DataGuy"
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
resolvers += Resolver.mavenLocal

Spark Streaming Code

vi LogParser.scala

import scala.util.matching.Regex
import org.apache.spark._
import org.apache.spark.streaming._

case class LogEvent(timestamp: String, etype: String, message: String)

object LogParser {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("LogParser Streaming")
    val ssc = new StreamingContext(conf, Seconds(4))

    // define the regular expression - <date-time> <log-type> <message>
    val regex = raw"^([\d/]+ [\d:]+) ([a-zA-Z]+) (.*)".r

    // read data from the HDFS directory that Flume writes to
    val lines = ssc.textFileStream("/user/ad/rtp/ssc/")

    // apply the regex and build a case class instance from each matching line
    val events = lines
      .flatMap(regex.findFirstMatchIn(_))
      .map(m => LogEvent(m.group(1), m.group(2), m.group(3)))

    // print the matched events (stdout is redirected when we submit the job)
    events.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Place this file in the src/main/scala/ folder.
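To sanity-check the parsing logic outside Spark, the same regex can be exercised in plain Scala. The sample line below is invented for illustration, since the post does not show the real log content:

```scala
import scala.util.matching.Regex

object RegexDemo {
  // same pattern as in LogParser: <date-time> <log-type> <message>
  val regex: Regex = raw"^([\d/]+ [\d:]+) ([a-zA-Z]+) (.*)".r

  // returns Some((timestamp, etype, message)) when the line matches, else None
  def parse(line: String): Option[(String, String, String)] =
    regex.findFirstMatchIn(line).map(m => (m.group(1), m.group(2), m.group(3)))

  def main(args: Array[String]): Unit = {
    println(parse("2021/03/15 10:22:01 INFO Application started"))
    println(parse("no timestamp here"))  // lines without a leading date-time are dropped
  }
}
```

Lines that do not start with a date-time yield None and are filtered out by the flatMap in the streaming code.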

And then, let’s package our files into jar:

cd /mnt/home/ad/rtp/ssc/sbt/
sbt package

You will find the jar file created as ‘sparkflumeproject_2.11-1.0.jar’

ls /mnt/home/ad/rtp/ssc/sbt/target/scala-2.11/

Check the jar file contents to verify that the LogParser class is present:

jar tf /mnt/home/ad/rtp/ssc/sbt/target/scala-2.11/sparkflumeproject_2.11-1.0.jar

We will now submit Spark Streaming code:

spark2-submit --class LogParser --deploy-mode client target/scala-2.11/sparkflumeproject_2.11-1.0.jar > /mnt/home/ad/rtp/ssc/event2.log  2>&1

The code will keep running until you stop it. Because stdout is redirected to event2.log, the job also replaces the file Flume is tailing, and so the cycle goes on, forming a stream of data.

We can check the content of the data processed and placed at /mnt/home/ad/rtp/ssc/event2.log

cat /mnt/home/ad/rtp/ssc/event2.log

Wonderful! We can see the log data has been processed and moved to a new file. You may now stop both the Flume agent and the Spark Streaming job. In the real world, though, you would let them run so that any new content landing in the file is moved from the local file system to HDFS by Flume, processed by Spark Streaming, and placed at the destination on the local file system again.

Hope you enjoyed the data processing.

Happy Learning folks!! :)



Hi! I am Amit, your DataGuy. Folks call me ‘AD’ as well. I have worked for over a decade in multiple roles - developer, DBA, data engineer, data architect.
