
@juanpampliega
Created May 16, 2015 06:18
Twitter Top Hashtags with Spark Streaming in spark-shell
import com.google.gson.Gson
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret,
    "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    // twitter4j expects "consumer*" property names where the Twitter console says "api*"
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials
val apiKey = "xxx"
val apiSecret = "xxx"
val accessToken = "xxx"
val accessTokenSecret = "xxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
// Create a streaming context with a 15-second batch interval
val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
// Extract hashtags from the text of each tweet
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
// Count hashtags over sliding windows of 120 and 30 seconds,
// then sort each window's counts in descending order
val topCounts120 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(120))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))
val topCounts30 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))
// Print popular hashtags
topCounts120.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 120 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
topCounts30.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 30 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
ssc.start()
ssc.awaitTermination()
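The script above assumes the Twitter streaming connector and Gson are already on the spark-shell classpath. A minimal launch sketch, assuming a Spark 1.x installation (the `spark-streaming-twitter` module shipped with Spark only through 1.6.x, and the artifact's Scala suffix must match the Scala version your Spark build uses):

```shell
# Hypothetical launch command -- the coordinates below are assumptions; verify
# that the Scala suffix (here _2.10) matches your Spark build's Scala version.
# A Spark/Scala version mismatch typically surfaces as a NoClassDefFoundError
# on Spark-internal classes at stream start.
spark-shell --packages org.apache.spark:spark-streaming-twitter_2.10:1.6.3,com.google.code.gson:gson:2.3.1
```

On Spark 2.x and later the Twitter connector was removed from the main distribution, so a separately published build (for example the Apache Bahir `spark-streaming-twitter` artifact) compiled against your exact Spark and Scala versions would be needed instead.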
Marcela1396 commented Aug 13, 2020

Hi, can you help me? I want to run this script on my Apache Spark application installed locally, but I am having problems. Could you help me with the commands I need to include to make it work?
What I do is start a spark-shell with the .jar files and paste the code there, but I get the following:

Exception in thread "streaming-start" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
I am working on Linux. Thanks.
