Spark Learning
Published: 2019-06-27


Official manual:

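1: Spark versus Hadoop

Spark is faster than Hadoop MapReduce because Hadoop writes intermediate results to disk to provide fault tolerance, whereas Spark keeps data in memory where possible and gets fault tolerance from its functional programming model: RDDs are immutable, and a lost partition can be recomputed by replaying the transformations that produced it.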
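A rough way to see what "fault tolerance through functional programming" means: an RDD remembers the chain of transformations (its lineage) that produced it, so Spark can rebuild lost data instead of reading a disk checkpoint. The sketch below prints that lineage with `toDebugString`. The object name `LineageSketch`, the helper `lineageOf`, and the sample numbers are made up for illustration, and the exact text of the output depends on the Spark version.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original notes.
object LineageSketch {

  // Builds a small RDD through two transformations and returns
  // Spark's textual description of how it was derived.
  def lineageOf(sc: SparkContext): String = {
    val numbers = sc.parallelize(1 to 10)
    val multiplesOfFour = numbers.map(_ * 2).filter(_ % 4 == 0)
    multiplesOfFour.toDebugString
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lineage-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(lineageOf(sc))
    finally sc.stop()
  }
}
```

The printed lineage lists the derived RDDs back to the original parallelized collection, which is the information Spark would use to recompute a lost partition.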

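2: Spark RDDs (Resilient Distributed Datasets)

TRANSFORMATION: executed LAZILY. Operations such as filter(), map(), and flatMap() only describe a new RDD. Because nothing runs until a result is requested, Spark can optimize a chain of operations and avoid materializing the intermediate results.

ACTION: executed EAGERLY. Operations such as count(), foreach(), and countByKey() trigger the computation and return a result.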
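The difference between lazy transformations and eager actions can be observed directly. The sketch below counts how often the function passed to `map` actually runs, using a long accumulator, which assumes Spark 2.0 or later. `LazyEagerSketch` and `callsBeforeAndAfterAction` are hypothetical names. Accumulator updates inside transformations can be repeated if a task is retried, so treat this as a local-mode demonstration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original notes.
object LazyEagerSketch {

  // Returns how many times the map function had run
  // (before the action, after the action).
  def callsBeforeAndAfterAction(sc: SparkContext): (Long, Long) = {
    val calls = sc.longAccumulator("map calls")

    // Transformation: only recorded, nothing is computed yet.
    val doubled = sc.parallelize(1 to 5).map { n =>
      calls.add(1)
      n * 2
    }
    val before: Long = calls.value

    // Action: forces the map to run over every element.
    doubled.count()
    val after: Long = calls.value

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lazy-eager-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(callsBeforeAndAfterAction(sc))
    finally sc.stop()
  }
}
```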

3: Spark job execution

[Image: 141202_tFON_2942603.png]

[Image: 141424_j0Ti_2942603.png]

4: Commonly used APIs

Transformations: groupBy, groupByKey, reduceByKey, mapValues, keys. These do not compute a result immediately (they are lazy).
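As a small illustration of these pair-RDD transformations, the sketch below counts words in two ways: with `reduceByKey`, and with `groupByKey` followed by `mapValues`. It also lists the distinct keys. The object `PairRddSketch`, its method names, and the sample words are assumptions made for the example. Each method ends with `collect()`, the action that makes the lazy chain run.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original notes.
object PairRddSketch {

  // Sums the 1s per key; values are combined within each partition before the shuffle.
  def countsWithReduceByKey(sc: SparkContext, words: Seq[String]): Map[String, Int] =
    sc.parallelize(words).map(w => (w, 1)).reduceByKey(_ + _).collect().toMap

  // Same result, but every (word, 1) pair is shuffled before being summed.
  def countsWithGroupByKey(sc: SparkContext, words: Seq[String]): Map[String, Int] =
    sc.parallelize(words).map(w => (w, 1)).groupByKey().mapValues(_.sum).collect().toMap

  // keys drops the values and keeps only the first element of each pair.
  def distinctKeys(sc: SparkContext, words: Seq[String]): Set[String] =
    sc.parallelize(words).map(w => (w, 1)).keys.distinct().collect().toSet

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pair-rdd-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val words = Seq("a", "b", "a", "c", "a", "b")
      println(countsWithReduceByKey(sc, words))
      println(countsWithGroupByKey(sc, words))
      println(distinctKeys(sc, words))
    } finally sc.stop()
  }
}
```

Both counting methods return the same map. `reduceByKey` is usually the better choice for sums because less data crosses the network.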

[Image: 093218_fjKQ_2942603.png]

[Image: 111633_JF5O_2942603.png]

[Image: 112213_b4PI_2942603.png]

WikipediaRanking assignment: ranking with an inverted index combined with reduceByKey is about twice as fast as the traditional approach of scanning every article with aggregate.

package wikipedia

import java.util.stream.Collectors
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class WikipediaArticle(title: String, text: String) {
  /**
    * @return Whether the text of this article mentions `lang` or not
    * @param lang Language to look for (e.g. "Scala")
    */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

object WikipediaRanking {

  val langs = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  val conf: SparkConf = new SparkConf().setAppName("Spark RDD").setMaster("local[*]").set("spark.executor.memory", "2g")
  val sc: SparkContext = new SparkContext(conf)

  // Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`
  val wikiRdd: RDD[WikipediaArticle] =
    sc.textFile(WikipediaData.filePath)
      .flatMap(lines => lines.split("\n"))
      .map(x => WikipediaData.parse(x))

  /** Returns the number of articles on which the language `lang` occurs.
   *  Hint1: consider using method `aggregate` on RDD[T].
   *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
    rdd.aggregate(0)(
      (acc, article) => if (article.mentionsLanguage(lang)) acc + 1 else acc,
      (acc1, acc2) => acc1 + acc2)

  /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
   *     (`val langs`) by determining the number of Wikipedia articles that
   *     mention each language at least once. Don't forget to sort the
   *     languages by their occurrence, in decreasing order!
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  // Result: List(("Scala", 999999), ("JavaScript", 1278), ("LOLCODE", 982), ("Java", 42))
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    langs
      .map(lang => (lang, occurrencesOfLang(lang, rdd)))
      .sortWith((x, y) => x._2 > y._2)

  /* Compute an inverted index of the set of articles, mapping each language
   * to the Wikipedia pages in which it occurs.
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
    rdd
      .map(w => (w, langs.filter(o => w.mentionsLanguage(o)).toList))
      .map(x => x._2.map(ls => (ls, x._1)))
      .flatMap(x => x)
      .groupByKey()
  }

  /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
   *     a performance improvement?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index
      .map(o => (o._1, o._2.size))
      .sortBy(_._2, false)
      .collect()
      .toList

  /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
   *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
   *     and the computation of the ranking? If so, can you think of a reason?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    rdd
      .map(w => (w, langs.filter(o => w.mentionsLanguage(o)).toList))
      .map(x => x._2.map(ls => (ls, x._1)))
      .flatMap(x => x)
      .map(m => (m._1, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect()
      .toList

  def main(args: Array[String]): Unit = {

    /* Languages ranked according to (1) */
    val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

    /* An inverted index mapping languages to wikipedia pages on which they appear */
    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

    /* Languages ranked according to (2), using the inverted index */
    val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

    /* Languages ranked according to (3) */
    val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

    /* Output the speed of each ranking */
    println(timing)
    sc.stop()
  }

  val timing = new StringBuffer

  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}
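To try the idea without the Wikipedia dump, here is a toy sketch of approaches (1) and (3) on a few in-memory documents. `Doc`, `ToyRanking`, and the sample texts are hypothetical stand-ins for `WikipediaArticle` and the assignment data. One plausible reason for the speedup: the `aggregate` version scans the whole RDD once per language, while the `reduceByKey` version makes a single pass and combines counts within each partition before shuffling.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for WikipediaArticle.
case class Doc(title: String, text: String) {
  def mentions(lang: String): Boolean = text.split(' ').contains(lang)
}

// Hypothetical example object, not part of the original notes.
object ToyRanking {

  // Approach (1): one aggregate job over all documents per language.
  def rankByAggregate(langs: List[String], docs: RDD[Doc]): List[(String, Int)] =
    langs.map { lang =>
      val count = docs.aggregate(0)(
        (acc, doc) => if (doc.mentions(lang)) acc + 1 else acc,
        _ + _)
      (lang, count)
    }.sortBy(-_._2)

  // Approach (3): emit (lang, 1) per mention, then sum per key in a single pass.
  // Unlike approach (1), languages that are never mentioned do not appear in the result.
  def rankByReduceByKey(langs: List[String], docs: RDD[Doc]): List[(String, Int)] =
    docs
      .flatMap(doc => langs.filter(doc.mentions).map(lang => (lang, 1)))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .collect()
      .toList

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("toy-ranking").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val docs = sc.parallelize(Seq(
        Doc("a", "Scala is fun"),
        Doc("b", "Scala and Java"),
        Doc("c", "Java Scala Python")))
      val langs = List("Java", "Python", "Scala")
      println(rankByAggregate(langs, docs))
      println(rankByReduceByKey(langs, docs))
    } finally sc.stop()
  }
}
```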

Pair RDDs

[Image: 114036_2eTQ_2942603.png]

Reposted from: https://my.oschina.net/odetteisgorgeous/blog/1530050
