文章目录
- 简介
- LogStash接入
- ES-Hadoop接入
- 重要配置
- 批处理导入
- 流数据写入
- Dstream to ES
- Structured Streaming to ES
- 几个注意事项
- 主键设置
- 写入冲突
- Exactly Once
- ES写入性能
简介
Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,可以对大量数据进行快速的搜索和聚合分析。同时它能够水平扩展,每秒钟可处理海量事件,同时能够自动管理索引和查询在集群中的分布方式,以实现极其流畅的操作。而且其可以确保集群(和数据)的安全性和可用性。
使用Elasticsearch首先需要接入数据,而在大数据应用场景下,数据往往存储在hadoop集群中。如何将Hadoop集群中的数据方便地接入到Elasticsearch集群中是关键。
LogStash接入
Elasticsearch的数据接入,首先想到的是利用LogStash组件。LogStash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到Elasticsearch。不过从LogStash组件官方支持的接入插件中并不支持HDFS输入。结合其他插件实现HDFS数据接入的三种方式:
- 将数据导入到kafka集群中,然后使用LogStash的kafka插件接入;
- 利用HDFS的
Webhdfs
接口服务读取数据,然后使用LogStash的http插件接入; - 利用FUSE安装HDFS,然后使用LogStash的文件插件接入。
ES-Hadoop接入
ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。同时,也可以借助 ES 作为数据存储层,然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。具体可参考:官方文档。
重要配置
ES-Hadoop核心是通过 ES 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明:
- es.nodes:需要连接的 es 节点(不需要配置全部master节点,默认会自动发现其他可用节点);
- es.port:节点 http 通讯端口;
- es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
- es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
- es.index.auto.create:是否自动创建不存在的索引,默认为 true;
- es.resource:设置写入的索引和类型,索引和类型名均支持动态变量;
- es.mapping.id:设置文档 _id 对应的字段名;
- es.mapping.exclude:设置写入时忽略的字段,支持通配符。
设置配置参数主要有两种方式:通过SparkConf的set接口进行设置;在调用save或load函数时动态传入。
批处理导入
在Scala中,只需导入org.elasticsearch.spark.sql包,即可使用saveToEs等方法接入DataFrame数据(RDD数据可导入org.elasticsearch.spark包,接口方法具有相同的签名):
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
// elasticsearch-hadoop Spark package import
import org.elasticsearch.spark.sql._ ...// sc = existing SparkContext
val sqlContext = new SQLContext(sc)// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)// create DataFrame
val people = sc.textFile("people.txt") .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert"),("es.mapping.id" , "name")
)
// Index the resulting DataFrame to Elasticsearch through the saveToEs method
people.saveToEs("spark/people", cfg)
默认情况下,elasticsearch-hadoop将忽略空值,而不是不写整个文档。 由于DataFrame应被视为结构化表格数据,因此可通过将es.spark.dataframe.write.null设置切换为true,将空值写入DataFrame对象的空值字段。
流数据写入
Dstream to ES
任何DStream都可以保存到Elasticsearch,只要其内容可以翻译成文档即可。 这意味着DStream类型需要是Map(Scala或Java),JavaBean或Scala的case class类。如果不是这种情况,需要在Spark中转换数据或插入自定义的ValueWriter类。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._import org.elasticsearch.spark.streaming._ ...val conf = ...
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1)) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd) val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert")
)
ssc.queueStream(microbatches).saveToEs("spark/docs", cfg) ssc.start()
ssc.awaitTermination()
注意:Streaming支持提供了特殊的优化,以便在运行具有非常小的处理窗口的作业时保留Spark执行程序上的网络资源。出于这个原因,应该更倾向于使用这种集成方式,而不是在DStream的foreachRDD返回的RDD上调用saveToEs。
Structured Streaming to ES
Spark Structured Streaming提供了内置于Spark SQL集成的统一流和批处理的接口。
import org.apache.spark.sql.SparkSession ...val spark = SparkSession.builder().appName("EsStreamingExample") .getOrCreate()// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)// create DataFrame
val people = spark.readStream .textFile("/path/to/people/files/*") .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt))val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert"),("es.mapping.id" , "name")
)
people.writeStream.option("checkpointLocation", "/save/location") .format("es").start("spark/people", cfg)
Spark在基于批处理和基于流的数据集之间没有进行基于类型的区分,如果在基于流的数据集或DataFrame上调用这些方法,则会抛出非法参数异常。
几个注意事项
主键设置
向ES中插入数据时,如果没有指定主键(即_id
)则会自动生成一个id。在数据发生变化或者数据重导时可能会导致赃数据,为了保证数据的可控性,应该在插入数据时显示指定记录的主键值(自定义生成方式)。
写入冲突
如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。
-
设置 es.write.operation 为 upsert(该方法要求设置记录唯一id),这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。
-
自定义冲突处理类,通过自定义类来处理相关错误,例如忽略冲突等:
public class IgnoreConflictsHandler extends BulkWriteErrorHandler {public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {if (entry.getResponseCode() == 409) {StaticLog.warn("Encountered conflict response. Ignoring old data.");return HandlerResult.HANDLED;}return collector.pass("Not a conflict response code.");}}
Exactly Once
设置自定义主键id,并将写入模式设置为upsert,可以实现数据导入“Exactly Once”保证。异常数据经过流式处理后,保证结果数据中(并不能保证处理过程中),每条数据最多出现一次,且最少出现一次。
Streaming接口提供了Checkpoint功能,可以让程序再次启动时,从上一次异常退出的位置,重新开始计算。
ES写入性能
单个ES结点的写入速度大概是每秒1万行,增加Spark Streaming的计算能力,无法突破这个瓶颈。在写入数据量过大时会出现拒绝写入错误,因此在新业务上线时需要进行谨慎评估。可以通过增加集群节点来水平扩展,提高写入性能。