Twitter Sentiment Analysis

Pre-requisites:

- A working Spark 2.x setup (the examples below use spark2-submit in client mode)
- sbt and Scala 2.11
- Twitter API credentials: consumer key/secret and access token/secret
- The AFINN-111 word-sentiment list, at a path your cluster can read

Spark Streaming Code for Twitter Analysis

Now let’s prepare our code for Twitter analysis. We will start with the sbt directory structure and a build.sbt file defining all dependencies.

mkdir sbt
cd sbt
mkdir src
mkdir src/main
mkdir src/main/scala
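
Once we add the build file and the program in the next steps, the resulting layout will be:

sbt/
├── build.sbt
└── src/main/scala/TwitterSentiments.scala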
vi build.sbt

name := "SparkMe Project"
version := "1.0"
organization := "DataGuy"
scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"

resolvers += Resolver.mavenLocal
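
Optionally, you can pin the sbt launcher version with a project/build.properties file. The exact version below is an assumption; any sbt 0.13.x release works with this build definition:

mkdir project
vi project/build.properties

sbt.version=0.13.15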

Tweet Analysis Code

Let’s create our program and place it under src/main/scala. Make sure you update the AFINN file location (wordSentimentFilePath in the code) to match your environment.
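
For reference, AFINN-111 is a plain text file with one term per line, a tab, and an integer valence from -5 to +5. A few illustrative entries (the exact values below are examples):

good	3
bad	-3
superb	5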

vi TwitterSentiments.scala

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterSentiments {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: TwitterSentiments <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    // Set logging level if log4j is not configured (override by adding log4j.properties to the classpath)
    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Check the Spark configuration for the master URL; set it to local if not configured
    val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[2]")
    }

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Change log level to warning
    ssc.sparkContext.setLogLevel("WARN")

    val stream = TwitterUtils.createStream(ssc, None, filters)
    val textElems = stream.map(status => status.getText)
    textElems.print()
    val hashTags = textElems.flatMap(_.split(" "))

    // Read in the word-sentiment list and create a static RDD from it
    val wordSentimentFilePath = "/user/ad/rtp/ssc/twitterSentiment/AFINN-111.txt"
    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
      val Array(word, happinessValue) = line.split("\t")
      (word, happinessValue.toInt)
    }.cache()

    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
    // with the static RDD inside the transform() method and then multiplying
    // the frequency of the hash tag by its sentiment value
    val happiest10 = hashTags.map(hashTag => (hashTag, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(10))
      .transform { topicCount => wordSentiments.join(topicCount) }
      .map { case (topic, tuple) => (topic, tuple._1 * tuple._2) }
      .map { case (topic, happinessValue) => (happinessValue, topic) }
      .transform(_.sortByKey(false))

    happiest10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
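
The transform/join step is the heart of the scoring: each windowed word count is joined with the AFINN list, and the word's frequency is multiplied by its valence. Here is a minimal sketch of the same computation on plain Scala collections (the counts and valences below are hypothetical):

// Hypothetical word counts from one 10-second window
val topicCount = Map("good" -> 3, "awful" -> 2)
// AFINN-style (word, valence) pairs
val wordSentiments = Map("good" -> 3, "awful" -> -3)
// Join on the word and multiply frequency by valence, as the streaming code does
val happiness = for ((word, count) <- topicCount; valence <- wordSentiments.get(word))
  yield (word, valence * count)
// happiness: Map(good -> 9, awful -> -6)

Now compile and package the project: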
sbt compile
sbt package

Running the Analysis

We will now submit the Spark Streaming job with spark2-submit. sbt package writes the jar to target/scala-2.11/sparkme-project_2.11-1.0.jar (named from the name and version in build.sbt). The general format is:

spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client target/scala-2.11/sparkme-project_2.11-1.0.jar <consumer key> <consumer secret> <access token> <access token secret> "keyword"
spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client /mnt/home/ad/rtp/ssc/twitterSentiment/target/scala-2.11/sparkme-project_2.11-1.0.jar xxxxxxx xxxxxxx xxxxxxx xxxxxxx "cricket"
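
Everything after the four credentials is treated as a filter keyword, so multiple filters can be passed as well (the keywords below are placeholders):

spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client target/scala-2.11/sparkme-project_2.11-1.0.jar xxxxxxx xxxxxxx xxxxxxx xxxxxxx "cricket" "football"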
