水善利万物而不争,处众人之所恶,故几于道💦
文章目录
- 1. collect()
- 2. count()
- 3. first()
- 4. take()
- 5. takeOrdered()
- 6. countByKey()
- 7. saveAS...()
- 8. foreach()
- 9. foreachPartition() ***
1. collect()
收集RDD每个分区的数据以数组封装之后发给Driver
如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G
这样设置
@Testdef collect(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))val arr = rdd1.collect()println(arr.toList)}
结果:
2. count()
返回RDD中元素的个数
@Test
def count(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))println(rdd1.count())
}
结果:
3. first()
返回RDD中的第一个元素
他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取
@Testdef first(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个jobval i = rdd1.first()println(i)Thread.sleep(10000000)
}
结果:
4. take()
返回RDD中前n个元素组成的数组
take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取
@Test
def take(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)println(rdd1.take(3).toList)Thread.sleep(10000000)
}
结果:
5. takeOrdered()
这个是取排序之后的前几个元素
takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了
@Test
def takeOrdered(): Unit ={val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)val ints = rdd1.takeOrdered(3)println(ints.toList)Thread.sleep(1000000)
}
结果:
6. countByKey()
统计每个key出现的次数,返回的结果是(key,次数)
@Test
def countByKey(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))val rdd2 = rdd1.countByKey()println(rdd2.toList)
}
结果:
7. saveAS…()
saveAsTextFile(path)
将数据保存成text文件,有几个task就保存几个文件
saveAsSequenceFile(path)
将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
saveAsObjectFile(path)
将数据序列化成对象保存到文件
@Test
def save(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.saveAsTextFile("output/text") // 为啥保存出来8个文件因为有8个taskrdd1.saveAsObjectFile("output/ObjectFile")rdd1.saveAsSequenceFile("output/SequenceFile")
}
结果:
8. foreach()
遍历RDD中的每个元素
@Test
def foreach(): Unit = {val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreach(println)
}
结果:
9. foreachPartition() ***
对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率
@Test
def foreachPartition(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreachPartition(it=>{var connection:Connection = nullvar statement:PreparedStatement = nulltry{connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")statement = connection.prepareStatement("insert into wc values(?,?)")// 计数器var count = 0it.foreach(x=>{statement.setString(1,x._1)statement.setInt(2,x._2)// 添加到批中,一批一批的执行statement.addBatch()// 满1000条执行一批if(count % 1000 == 0){statement.executeBatch()// todo 执行完批后要记得clearBatch !!!!!statement.clearBatch()}count = count+1})// 最后不满1000条的也执行一次statement.executeBatch()}catch {case e:Exception => e.printStackTrace()}finally {if (connection != null) {connection.close()}if (statement != null) {statement.close()}}})
}
结果: