`hadoop2@ubuntu:/liguodong/software/spark$ bin/spark-shell --master spark://ubuntu:7077 --executor-memory 2g`

```scala
// parallelize demo (parallelizing a Scala collection)
val num=sc.parallelize(1 to 10)          // parallelize the collection into an RDD with the default number of partitions
val doublenum=num.map(_*2)               // multiply every element by 2
val threenum=doublenum.filter(_%3==0)    // keep only the elements divisible by 3
// an action triggers the job to run
threenum.collect
threenum.toDebugString

// parallelize with an explicit number of partitions
val num1=sc.parallelize(1 to 10,6)
val doublenum1=num1.map(_*2)
val threenum1=doublenum1.filter(_ % 3 == 0)
threenum1.collect
threenum1.toDebugString

threenum.cache()
val fournum=threenum.map(x=>x*x)
fournum.collect
fournum.toDebugString
threenum.unpersist()                     // drop the cached data

num.reduce(_+_)                          // sum the elements
num.take(5)                              // take the first 5 elements
num.first                                // the first element
num.count                                // number of elements
num.take(5).foreach(println)             // print the first 5 elements
```

```scala
// key-value demo
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
// sort by key
kv1.sortByKey().collect                  // note: the parentheses after sortByKey cannot be omitted
// group by key without combining the values
kv1.groupByKey().collect
==>res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2, 5)), (A,CompactBuffer(1, 4)), (C,CompactBuffer(3)))
// group by key and combine (sum) the values
kv1.reduceByKey(_+_).collect
==>res15: Array[(String, Int)] = Array((B,7), (A,5), (C,3))

val kv2=sc.parallelize(List(("A",1),("A",4),("C",3),("A",4),("B",5)))
// remove duplicates
kv2.distinct.collect
==>res17: Array[(String, Int)] = Array((A,4), (A,1), (B,5), (C,3))
// union of the two RDDs
kv1.union(kv2).collect
==>res18: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,1), (A,4), (C,3), (A,4), (B,5))

val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
==>res21: Array[(String, (Int, Int))] = Array((B,(2,20)), (B,(5,20)), (A,(1,10)), (A,(4,10)))
kv1.cogroup(kv3).collect
==>res22: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2, 5),CompactBuffer(20))), (D,(CompactBuffer(),CompactBuffer(30))), (A,(CompactBuffer(1, 4),CompactBuffer(10))), (C,(CompactBuffer(3),CompactBuffer())))

val kv4=sc.parallelize(List(List(1,2),List(3,4)))
kv4.flatMap(x=>x.map(_+1)).collect
==>res23: Array[Int] = Array(2, 3, 4, 5)
```

```scala
// file-reading demo
val rdd1=sc.textFile("hdfs://localhost:9000/liguodong/test.txt")
val words=rdd1.flatMap(_.split(" "))
val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)
wordscount.collect
wordscount.toDebugString

// read several files at once with a wildcard
val rdd2=sc.textFile("hdfs://localhost:9000/liguodong/*.txt")
rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

// read a compressed file
val rdd3=sc.textFile("hdfs://localhost:9000/liguodong/oo.txt.gz")
rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
```
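All of the snippets above run in `spark-shell`, where the driver's `SparkContext` is already available as `sc`. For comparison, here is a minimal sketch of the same word count packaged as a standalone application (assuming Spark 1.3 or later so the pair-RDD implicits are in scope; the object name `WordCount` and the output path are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // spark-shell creates `sc` automatically; a standalone application
    // has to build the SparkContext from a SparkConf itself.
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("hdfs://localhost:9000/liguodong/test.txt")
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    // hypothetical output path, chosen for illustration
    counts.saveAsTextFile("hdfs://localhost:9000/liguodong/wordcount-output")
    sc.stop()
  }
}
```

Packaged into a jar, it would be launched with something like `bin/spark-submit --master spark://ubuntu:7077 --class WordCount <jar>`.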
```scala
// log-processing demo
// download: http://download.labs.sogou.com/dl/q.html  (full version, 2 GB, gz format)
// record format: access time \t user ID \t [query] \t rank of the URL in the results \t user's click order \t clicked URL
// SogouQ1.txt, SogouQ2.txt and SogouQ3.txt were cut out of the SogouQ log file with head -n / tail -n

// How many records ranked 1st in the search results but were clicked 2nd?
val rdd1 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt.tar.gz")
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
rdd2.count()
val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)
rdd3.count()
rdd3.toDebugString

// ranking of sessions by number of queries
// sortByKey(true) sorts ascending, sortByKey(false) sorts descending
val rdd4=rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
rdd4.toDebugString
rdd4.saveAsTextFile("hdfs://localhost:9000/liguodong/output1")
```

The output may be spread over several part files; they can be merged into a single local file:

```
hadoop2@ubuntu:/liguodong/software/hadoop$ bin/hdfs dfs -getmerge hdfs://localhost:9000/liguodong/output1 /home/hadoop2/liguodong/result
hadoop2@ubuntu:/liguodong/software/hadoop$ ll /home/hadoop2/liguodong/result
-rw-r--r-- 1 hadoop2 hadoop 10477740 Dec 22 16:07 /home/hadoop2/liguodong/result
hadoop2@ubuntu:/liguodong/software/hadoop$ head /home/hadoop2/liguodong/result
(b3c94c37fb154d46c30a360c7941ff7e,676)
(cc7063efc64510c20bcdd604e12a3b26,613)
(955c6390c02797b3558ba223b8201915,391)
(b1e371de5729cdda9270b7ad09484c4f,337)
(6056710d9eafa569ddc800fe24643051,277)
(637b29b47fed3853e117aa7009a4b621,266)
(c9f4ff7790d0615f6f66b410673e3124,231)
(dca9034de17f6c34cfd56db13ce39f1c,226)
(82e53ddb484e632437039048c5901608,221)
(c72ce1164bcd263ba1f69292abdfdf7c,214)
```

```scala
// join demo
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float, lng: Float)
case class Click (d: java.util.Date, uuid: String, landing_page: Int)
val reg = sc.textFile("hdfs://localhost:9000/liguodong/join/reg.tsv").map(_.split("\t")).map(r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))
val clk = sc.textFile("hdfs://localhost:9000/liguodong/join/clk.tsv").map(_.split("\t")).map(c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt)))
reg.join(clk).take(2)
==>res34: Array[(String, (Register, Click))] = Array((81da510acc4111e387f3600308919594,(Register(Tue Mar 04 00:00:00 CST 2014,81da510acc4111e387f3600308919594,2,33.85701,-117.85574),Click(Thu Mar 06 00:00:00 CST 2014,81da510acc4111e387f3600308919594,61))), (15dfb8e6cc4111e3a5bb600308919594,(Register(Sun Mar 02 00:00:00 CST 2014,15dfb8e6cc4111e3a5bb600308919594,1,33.659943,-117.95812),Click(Tue Mar 04 00:00:00 CST 2014,15dfb8e6cc4111e3a5bb600308919594,11))))

// cache() demo
// command to inspect a file's blocks: bin/hdfs fsck /dataguru/data/SogouQ3.txt -files -blocks -locations
val rdd5 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt")
rdd5.cache()
rdd5.count()
rdd5.count()   // compare the time of the two counts
```

**Time before vs. after cache():** once the RDD is 100% cached in memory, subsequent actions run much faster. If memory is too small to cache all of it, there is still a speed-up, but the full benefit is not realized. Even when memory is insufficient for caching, the job can still run as long as there is enough disk space.

![](image/1.png)

**Note:** in Spark, `map` applies the given function to each input element and returns exactly one object per input. `flatMap` combines two steps, "map first, then flatten":

Step 1: like `map`, apply the given function to each input element, producing one collection per input.

Step 2: flatten all of those collections into a single collection.
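A small spark-shell sketch of this difference (the sample strings are made up for illustration):

```scala
val lines = sc.parallelize(List("hello world", "hi spark"))

// map: one output element per input element; each line becomes an Array of words
lines.map(_.split(" ")).collect
// ==> Array(Array(hello, world), Array(hi, spark))

// flatMap: map each element to a collection, then flatten all the collections into one RDD
lines.flatMap(_.split(" ")).collect
// ==> Array(hello, world, hi, spark)
```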