Elasticsearch数据接入

文章目录

  • 简介
  • LogStash接入
  • ES-Hadoop接入
    • 重要配置
    • 批处理导入
    • 流数据写入
      • Dstream to ES
      • Structured Streaming to ES
    • 几个注意事项
      • 主键设置
      • 写入冲突
      • Exactly Once
      • ES写入性能

简介

Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,可以对大量数据进行快速的搜索和聚合分析。同时它能够水平扩展,每秒钟可处理海量事件,同时能够自动管理索引和查询在集群中的分布方式,以实现极其流畅的操作。而且其可以确保集群(和数据)的安全性和可用性。

使用Elasticsearch首先需要接入数据,而在大数据应用场景下,数据往往存储在hadoop集群中。如何将Hadoop集群中的数据方便地接入到Elasticsearch集群中是关键。

LogStash接入

Elasticsearch的数据接入,首先想到的是利用LogStash组件。LogStash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到Elasticsearch。不过从LogStash组件官方支持的接入插件中并不支持HDFS输入。结合其他插件实现HDFS数据接入的三种方式:

  • 将数据导入到kafka集群中,然后使用LogStash的kafka插件接入;
  • 利用HDFS的Webhdfs接口服务读取数据,然后使用LogStash的http插件接入;
  • 利用FUSE安装HDFS,然后使用LogStash的文件插件接入。

ES-Hadoop接入

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。同时,也可以借助 ES 作为数据存储层,然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。具体可参考:官方文档。

ES-Hadoop连接器架构

重要配置

ES-Hadoop核心是通过 ES 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明:

  • es.nodes:需要连接的 es 节点(不需要配置全部master节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量;
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。

设置配置参数主要有两种方式:通过SparkConf的set接口进行设置;在调用save或load函数时动态传入。

批处理导入

在Scala中,只需导入org.elasticsearch.spark.sql包,即可使用saveToEs等方法接入DataFrame数据(RDD数据可导入org.elasticsearch.spark包,接口方法具有相同的签名):

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._
// elasticsearch-hadoop Spark package import
import org.elasticsearch.spark.sql._      ...// sc = existing SparkContext
val sqlContext = new SQLContext(sc)// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)//  create DataFrame
val people = sc.textFile("people.txt")    .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert"),("es.mapping.id" , "name")
)
// Index the resulting DataFrame to Elasticsearch through the saveToEs method
people.saveToEs("spark/people", cfg) 

默认情况下,elasticsearch-hadoop将忽略空值,而不是不写整个文档。 由于DataFrame应被视为结构化表格数据,因此可通过将es.spark.dataframe.write.null设置切换为true,将空值写入DataFrame对象的空值字段。

流数据写入

Dstream to ES

任何DStream都可以保存到Elasticsearch,只要其内容可以翻译成文档即可。 这意味着DStream类型需要是Map(Scala或Java),JavaBean或Scala的case class类。如果不是这种情况,需要在Spark中转换数据或插入自定义的ValueWriter类。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._               
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._import org.elasticsearch.spark.streaming._           ...val conf = ...
val sc = new SparkContext(conf)                      
val ssc = new StreamingContext(sc, Seconds(1))       val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)                val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert")
)
ssc.queueStream(microbatches).saveToEs("spark/docs", cfg) ssc.start()
ssc.awaitTermination() 

注意:Streaming支持提供了特殊的优化,以便在运行具有非常小的处理窗口的作业时保留Spark执行程序上的网络资源。出于这个原因,应该更倾向于使用这种集成方式,而不是在DStream的foreachRDD返回的RDD上调用saveToEs。

Structured Streaming to ES

Spark Structured Streaming提供了内置于Spark SQL集成的统一流和批处理的接口。

import org.apache.spark.sql.SparkSession    ...val spark = SparkSession.builder().appName("EsStreamingExample")           .getOrCreate()// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)//  create DataFrame
val people = spark.readStream                   .textFile("/path/to/people/files/*")    .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt))val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert"),("es.mapping.id" , "name")
)
people.writeStream.option("checkpointLocation", "/save/location")  .format("es").start("spark/people", cfg)                               

Spark在基于批处理和基于流的数据集之间没有进行基于类型的区分,如果在基于流的数据集或DataFrame上调用这些方法,则会抛出非法参数异常。

几个注意事项

主键设置

向ES中插入数据时,如果没有指定主键(即_id)则会自动生成一个id。在数据发生变化或者数据重导时可能会导致赃数据,为了保证数据的可控性,应该在插入数据时显示指定记录的主键值(自定义生成方式)。

写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

  • 设置 es.write.operation 为 upsert(该方法要求设置记录唯一id),这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

  • 自定义冲突处理类,通过自定义类来处理相关错误,例如忽略冲突等:

      public class IgnoreConflictsHandler extends BulkWriteErrorHandler {public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {if (entry.getResponseCode() == 409) {StaticLog.warn("Encountered conflict response. Ignoring old data.");return HandlerResult.HANDLED;}return collector.pass("Not a conflict response code.");}}
    

Exactly Once

设置自定义主键id,并将写入模式设置为upsert,可以实现数据导入“Exactly Once”保证。异常数据经过流式处理后,保证结果数据中(并不能保证处理过程中),每条数据最多出现一次,且最少出现一次。

Streaming接口提供了Checkpoint功能,可以让程序再次启动时,从上一次异常退出的位置,重新开始计算。

ES写入性能

单个ES结点的写入速度大概是每秒1万行,增加Spark Streaming的计算能力,无法突破这个瓶颈。在写入数据量过大时会出现拒绝写入错误,因此在新业务上线时需要进行谨慎评估。可以通过增加集群节点来水平扩展,提高写入性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/252454.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Linux网络连接命令

目录 1. hostname 2. ping 3. ifconfig 4. iwconfig 5. nslookup 6. traceroute 7. finger 8. telnet 9. ethtool 10. netstat 网络和监控命令类似于这些&#xff1a; hostname, ping, ifconfig, iwconfig, netstat, nslookup, traceroute, finger, telnet, ethtool …

linux ubuntu 网络,Ubuntu无法连接网络的解决办法

虚拟机中Ubuntu无法连接网络的有效解决办法&#xff1a; 1、Ubuntu网络设置&#xff1a; 依次单击【System Settings】-->【Network】-->【Wired】-->【Options…】&#xff0c;如下图所示&#xff1a; 依次选择【General】&#xff0c;勾选如下图所示的单选框,最后点…

Windows自动修复无法修复你的电脑

报错信息 &#xff1a; 自动修复无法修复你的电脑 解决办法 方法1、 1、先点击高级设置 2、点击系统还原选项。 3、实验第二个选项 4、清除所有的驱动&#xff0c;因为可能是驱动与系统不太匹配导致。 5、然后选择完全清理驱动器的选项 6、最后我们点击初始化就可以了…

Windows“自动修复”无法修复

想必很多Windows系统的用户都遇到了这个问题吧&#xff0c;本以为重启能解决所有问题&#xff0c;结果发现无限套娃了……怀揣着电脑里的猫片要丢失的担忧&#xff0c;死活不愿意重置的笔者找到了一种无须重置电脑&#xff0c;windows下的任何文件也不会丢失的简单修复方法。 废…

【windows : 系统问题】解决win10自动修复无法开机问题

问题描述&#xff1a; win10系统自动更新&#xff08;不知道是不是某个驱动&#xff1f;&#xff09;后重启电脑后就出现自动修复的蓝屏问题。可是结果却是无法开机成功。 一下是转载了一篇非重装系统下的解决方法&#xff0c;亲测可用。 Win10自动修复无法开机【完美解决】 …

windows10自动修复无法开机

Windows10操作系统于2015年7月29日正式发布&#xff0c;此后&#xff0c;win10也就成了新上市的笔记本电脑或者台式机电脑的预装操作系统&#xff01;win10系统给我们带了全新的体验&#xff0c;当然也带来了一定的烦恼&#xff01;就拿win10自动修复这个功能来说&#xff0c;玩…

win10系统开机自动修复失败的解决方法

事情是这样的&#xff0c;在某一天的傍晚帮对象装了一个arcgis软件&#xff0c;学学科地理的同学应该都要用的&#xff0c;当然作为一个没有正版资源的人&#xff0c;首选了荡来的安装包&#xff0c;并试图安装&#xff0c;经过一顿操作&#xff0c;确认各项功能都可以正常使用…

win10自动修复重启,无法开机

Windows 10是美国微软公司研发的跨平台及设备应用的操作系统&#xff0c;2018年6月15日&#xff0c;微软宣布Windows 10四月更新已经全员推送就绪&#xff0c;目前的部署安装量超2.5亿。新操作系统给我们带来了许多全新的功能&#xff0c;同时在使用过程中我们也遇到了很多麻烦…

win10自动修复重启,无法开机「完美解决」

相信很多人遇到过这样的问题&#xff0c;本人是因为电脑开机忘了插电源&#xff0c;吃了个早饭回来发现电脑关机了&#xff0c;在启动就是这样了。开始遇到时没这么慌&#xff0c;上网一查才发现了事情的严重性&#xff0c;我的论文还存在桌面上呢。从网上看了一些方法&#xf…

解决win10 自动修复失败电脑无法开机问题

写在前面的话&#xff1a; 1.博主不定期上线&#xff0c;所以有时候看到私信的时候&#xff0c;时间已经过去好久了&#xff0c;所以就不会回复私信了&#xff1b; 2.再就是写这篇博客仅仅是为了记录一下自己解决该问题的办法&#xff0c;博主本人对修电脑这方面也不是很在行&a…

HTTP Web安全

验证安全机制 会话管理机制 SQL注入原理 SELECT * FROM test.user WHERE username or 11 and passwordanyxxxxx;当username值为’ or 11时&#xff0c;SQL语句的逻辑表达式结构被修改了&#xff0c;因为 or 1‘1’ &#xff0c;致使不论密码是否正确&#xff0c;其验证都将通过…

html浏览器安全调色板,网页安全色调色盘

在HTML的语法中&#xff0c;颜色会以十六进位值(例如&#xff1a;#FF0000)或颜色名称(例如&#xff1a;Red)来表示。网页安全色是指在256色模式下&#xff0c;无论在Windows或Macintosh系统下&#xff0c;在Netscape Navigator和Microsoft Internet Explorer浏览器中都能显示相…

Web安全色的意义

问题&#xff1a; 不同的平台&#xff08;Mac、PC等&#xff09;有不同的调色板&#xff0c;不同的浏览器也有自己的调色板。这就意味着对于一幅图&#xff0c;显示在Mac上的Web浏览器中的图像&#xff0c;与它在PC上相同浏览器中显示的效果可能差别很大。 选择特定的颜色时&am…

浏览器安全之网络安全(HTTPS)

为什么要使用 HTTPS 协议 在将 HTTP 数据提交给 TCP 层之后&#xff0c;数据会经过用户电脑、WiFi 路由器、运营商和目标服务器&#xff0c;在这中间的每个环节中&#xff0c;数据都有可能被窃取或篡改。比如用户电脑被黑客安装了恶意软件&#xff0c;那么恶意软件就能抓取和篡…

【Web安全】PHP安全

一、文件包含漏洞 严格来说&#xff0c;文件包含就是代码注入的一种。代码注入&#xff0c;其原理就是注入一段用户能控制的脚本或代码并让服务器端执行。代码注入的典型代表就是文件包含。文件包含可能会出现在JSP、PHP、ASP等语言中&#xff0c;常见函数如下&#xff1a; PHP…

vim插件管理工具vundle安装与配置

目录 1 下载vundle2 配置3 安装插件4 关于代码补全 1 下载vundle 默认下载到~/.vim/bundle/vundle目录下 git clone https://github.com/gmarik/vundle.git ~/.vim/bundle/vundle2 配置 在.vimrc 中添加bundle的配置 注意&#xff0c;.vimrc是自己在home目录下创建&#xff…

VIM插件管理工具Vundle安装

Vundle安装 软件环境&#xff1a;WIN7 32bit GVIM 1.安装git 除PATH环境变量设置如图以外&#xff0c;其它选项默认&#xff1a; 2.添加curl.cmd&#xff0c;位置为C:\Program Files\Git\cmd&#xff0c;假设git安装在C:\Program Files\Git 3.修改vimrc 4.下载vundle。git clo…

使用Vundle管理Vim插件

几种管理插件 Vim 的插件管理工具有蛮多&#xff0c;比如&#xff1a; Vundle vim-addon-manager vpathogen.vim vvundle vvimana Vim-addon-manager 和 vimana 的对比&#xff0c;参见Vim的插件管理工具 我最会选择了 Vundle&#xff0c;通过子目录管理插件&#xff0c;支持…

Vundle使用帮助

关于 Vundle 是 Vim bundle 的简称,是一个 Vim 插件管理器. Vundle 允许你做… 同时在.vimrc中跟踪和管理插件安装特定格式的插件(a.k.a. scripts/bundle)更新特定格式插件通过插件名称搜索Vim scripts中的插件清理未使用的插件可以通过单一按键完成以上操作,详见interactiv…

vim 插件vundle中Plugin和Bundle的区别

参考博客&#xff1a;https://segmentfault.com/q/1010000010384766 目前vundle正在改变&#xff0c;版本不同&#xff0c;使用的命令就不同&#xff0c; 现在正在改变&#xff0c;借口正在改变&#xff0c;可以看到&#xff0c;名字已经改成Vundle.vim&#xff0c;不再是之前…