Official manual:
1: Spark versus Hadoop
Spark is generally faster than Hadoop MapReduce because Hadoop writes intermediate results to disk between stages to achieve fault tolerance, whereas Spark keeps data in memory and relies on its functional programming model: RDDs are immutable, and a lost partition is recovered by recomputing the recorded lineage of transformations rather than re-reading intermediate results from disk.
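A minimal sketch of this point (made-up numbers, local mode, hypothetical object name): cache() keeps a computed RDD in memory, and toDebugString prints the lineage of transformations that Spark would replay to rebuild a lost partition instead of reloading intermediate results from disk.

import org.apache.spark.{SparkConf, SparkContext}

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lineage-demo").setMaster("local[*]"))

    val squares = sc.parallelize(1 to 1000000)
      .map(n => n.toLong * n)      // transformation, recorded in the lineage
      .filter(_ % 2 == 0)          // also recorded; nothing is written to disk
      .cache()                     // keep the computed result in memory across actions

    println(squares.count())       // first action: computes and caches the RDD
    println(squares.toDebugString) // the lineage Spark replays if a cached partition is lost

    sc.stop()
  }
}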
2: Spark RDD (Resilient Distributed Datasets)
Transformations are lazy: filter(), map(), flatMap(), and so forth. Because nothing runs until an action is called, Spark can optimize a chain of transformations and avoid materializing intermediate results.
Actions are eager: count(), foreach(), countByKey(), and so forth. Calling an action triggers execution of all the pending transformations.
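A minimal sketch of lazy vs. eager on made-up data (the object name and sample strings are only illustrative): the two transformations just build a plan, and only the actions at the end make Spark execute it.

import org.apache.spark.{SparkConf, SparkContext}

object LazyVsEager {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-vs-eager").setMaster("local[*]"))

    val lines  = sc.parallelize(Seq("spark is fast", "hadoop writes to disk", "spark uses rdds"))
    val words  = lines.flatMap(_.split(" "))  // transformation: nothing runs yet
    val sparks = words.filter(_ == "spark")   // transformation: still nothing runs

    println(sparks.count())                   // action: the whole chain executes now (prints 2)
    sparks.foreach(println)                   // action: re-executes the chain (no cache was used)

    sc.stop()
  }
}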
3: Spark job execution
4: Commonly used APIs
Transformations such as groupBy, groupByKey, reduceByKey, mapValues, and keys do not compute their results immediately (they are lazy); a result only materializes when an action is called.
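A small illustration of these pair-RDD transformations on made-up pairs (hypothetical object name); nothing is computed until the collect() actions at the end.

import org.apache.spark.{SparkConf, SparkContext}

object PairOpsDemo {
  def main(args: Array[String]): Unit = {
    val sc    = new SparkContext(new SparkConf().setAppName("pair-ops").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq(("Scala", 1), ("Java", 1), ("Scala", 1)))

    val grouped = pairs.groupByKey()        // ("Scala", Iterable(1, 1)), ("Java", Iterable(1))
    val counts  = pairs.reduceByKey(_ + _)  // ("Scala", 2), ("Java", 1); combines before the shuffle
    val doubled = counts.mapValues(_ * 2)   // ("Scala", 4), ("Java", 2)
    val names   = counts.keys               // "Scala", "Java"

    // None of the above has run yet; collect() is the action that triggers the computation.
    println(grouped.mapValues(_.toList).collect().toList)
    println(counts.collect().toList)
    println(doubled.collect().toList)
    println(names.collect().toList)
    sc.stop()
  }
}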
WikipediaRanking assignment: building an inverted index and ranking with reduceByKey is roughly twice as fast as the naive approach of scanning every article and counting matches with aggregate once per language.
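The two shapes being compared, distilled from the full assignment code below (WikipediaArticle, mentionsLanguage, and RDD come from there; the helper names naiveCount and onePassRanking are only for this sketch and correspond to occurrencesOfLang and rankLangsReduceByKey): the naive version runs one aggregate pass over the whole RDD per language, while the reduceByKey version counts all languages in a single pass.

// Naive: one full scan of the RDD for each of the 15 languages.
def naiveCount(lang: String, rdd: RDD[WikipediaArticle]): Int =
  rdd.aggregate(0)(
    (acc, article) => if (article.mentionsLanguage(lang)) acc + 1 else acc,  // count within a partition
    _ + _)                                                                   // merge partition counts

// One pass: emit (language, 1) for every mention, then reduceByKey once.
def onePassRanking(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
  rdd.flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, 1)))
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .collect()
    .toList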
package wikipedia

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class WikipediaArticle(title: String, text: String) {
  /**
   * @return Whether the text of this article mentions `lang` or not
   * @param lang Language to look for (e.g. "Scala")
   */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

object WikipediaRanking {

  val langs = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  val conf: SparkConf = new SparkConf()
    .setAppName("Spark RDD")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2g")
  val sc: SparkContext = new SparkContext(conf)

  // Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`.
  // `textFile` already yields one element per line, so no extra splitting on "\n" is needed.
  val wikiRdd: RDD[WikipediaArticle] =
    sc.textFile(WikipediaData.filePath).map(WikipediaData.parse)

  /** Returns the number of articles on which the language `lang` occurs.
   *  Hint1: consider using method `aggregate` on RDD[T].
   *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
    rdd.aggregate(0)(
      (acc, article) => if (article.mentionsLanguage(lang)) acc + 1 else acc,
      (acc1, acc2) => acc1 + acc2)

  /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
   *     (`val langs`) by determining the number of Wikipedia articles that
   *     mention each language at least once. Don't forget to sort the
   *     languages by their occurrence, in decreasing order!
   *
   *     Note: this operation is long-running. It can potentially run for
   *     several seconds.
   */
  // Result: List(("Scala", 999999), ("JavaScript", 1278), ("LOLCODE", 982), ("Java", 42))
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    langs.map(lang => (lang, occurrencesOfLang(lang, rdd))).sortWith(_._2 > _._2)

  /* Compute an inverted index of the set of articles, mapping each language
   * to the Wikipedia pages in which it occurs.
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] =
    rdd.flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, article)))
      .groupByKey()

  /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
   *     a performance improvement?
   *
   *     Note: this operation is long-running. It can potentially run for
   *     several seconds.
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index.mapValues(_.size).sortBy(_._2, ascending = false).collect().toList

  /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
   *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
   *     and the computation of the ranking? If so, can you think of a reason?
   *
   *     Note: this operation is long-running. It can potentially run for
   *     several seconds.
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    rdd.flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, 1)))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .collect()
      .toList

  def main(args: Array[String]) {

    /* Languages ranked according to (1) */
    val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

    /* An inverted index mapping languages to wikipedia pages on which they appear */
    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

    /* Languages ranked according to (2), using the inverted index */
    val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

    /* Languages ranked according to (3) */
    val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

    /* Output the speed of each ranking */
    println(timing)
    sc.stop()
  }

  val timing = new StringBuffer
  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}
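As a quick, hypothetical sanity check (not part of the assignment), the three rankings can be run on a tiny hand-made RDD instead of the full Wikipedia dump; the object below assumes it is added to the same wikipedia package so it can reuse sc, WikipediaArticle, and the ranking methods above.

package wikipedia

// Hypothetical sanity check, not part of the graded assignment.
object TinyCheck {
  import WikipediaRanking._

  def main(args: Array[String]): Unit = {
    // Three made-up articles; mentionsLanguage splits on spaces, so language
    // names must appear as whole words.
    val tiny = sc.parallelize(Seq(
      WikipediaArticle("a1", "Scala and Java on the JVM"),
      WikipediaArticle("a2", "Java only"),
      WikipediaArticle("a3", "Python and Scala for data")))

    println(rankLangs(langs, tiny))                       // Scala and Java should count 2, Python 1
    println(rankLangsUsingIndex(makeIndex(langs, tiny)))  // same counts via the inverted index
    println(rankLangsReduceByKey(langs, tiny))            // same counts in a single pass
    sc.stop()
  }
}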