es数据同步方案
- 前言
- 方案对比
- logstash方式
- 安装
- 配置
- 启动
- 测试
- canal方式
- MQ方式
前言
上两篇文章介绍过EL的安装和使用,接下来,我们面临的问题是怎么把数据同步到es里,比如,商城的商品数据,商品需要搜索,所以在添加商品的同时,除了往数据库添加一份,同时还要把这些数据同步到es里才行。
方案对比
解决方案,目前比较流行的可分为以下几种:
1. 程序同步
直接在代码里写逻辑,数据在增删改查进数据库的同时,也往es里同步一份。
优点:方便,无需集成其他的技术;
缺点:代码耦合性太高,增加接口的处理时间;
2. logstash
定时查询数据库,查询到数据有变化就发送到es中。
优点:解耦,官方推荐。
缺点:
1、不支持同步删除,当然可以在数据库用逻辑删除代替物理删除,对于logstash来说就是更新操作了;
2、大数据量有性能问题,在对数据库的压力上,logstash的原理是定时扫描变动的表,所以对数据库有一定压力,并且如果有其他程序在进行某条语句更新,锁住了这条行数据,那logstash读取数据时,就会被“卡住”,如果这个时间过长,可能会影响服务器卡死。
3、无法做到实时同,只能秒级同步。
如果实时性要求不高,并且定时时间内数据变化量不大,推荐使用这个,学习维护成本比较低,毕竟是官方推荐,ELK全家桶。
3. canal
利用数据库的binlog同步变化数据,然后将数据发送给es,当然也可以通过java代码监听拿到数据,再发送到es或做其他处理。
优点:解耦,实时同步,没有大数据性能压力。
缺点:学习和维护成本高,要求对数据库有创建用户的权限,毕竟要用到数据库同步功能,开启binlog数据库的压力也会增加。
4. MQ中间件
有数据变化的时候,就通知mq,然后监听mq实现数据同步到mq。
优点:解耦,适合高并发。
缺点:如何保证消息可靠性,需要在业务代码中加入发送消息到MQ的代码。
logstash方式
演示同步mysql数据到es。
安装
官网下载
下载之后解压即可。
配置
本文安装的版本是logstash-7.12.1,准备测试的数据库表结构
- 创建自己的专属 配置
进入解压好的文件夹logstash-7.12.1,创建自定义配置文件夹,用来存放配置文件。
mkdir mysqlnote
创建核心配置文件,名字自定义,主要用来配置mysql和logstash的数据映射和数据转换等等信息。
touch logstash-sync.conf
配置详情
input {jdbc {# 设置 MySql/MariaDB 数据库url以及数据库名称jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&allowMultiQuerie=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"# 用户名和密码jdbc_user => "root"jdbc_password => "root"# 数据库驱动mysql-connector-java-8.0.19.jar所在位置,可以是绝对路径或者相对路径jdbc_driver_library => "/usr/local/logstash/logstash-7.12.1/mysqlnote/mysql-connector-java-8.0.27.jar"# 驱动类名jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 是否开启分页,ture为开启jdbc_paging_enabled => false# 分页每页数量jdbc_page_size => "50"# 设置时区jdbc_default_timezone =>"Asia/Shanghai"# 执行的sql文件路径statement_filepath => "/usr/local/logstash/logstash-7.12.1/mysqlnote/mysql.sql"#使用这个可以直接写sql语句,但是复杂的语句最好是写在文件内#statement =>""# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务schedule => "*/5 * * * * *"#是否需要记录某个字段值,如果为true,我们可以自定义要记录的数据库某个字段值,例如id或date字段。如果为false,记录的是上次执行的标记,默认是一个timestampuse_column_value => true#记录上次执行字段值路径。我们可以在sql语句中这么写:WHERE ID > :last_sql_value。其中 :sql_last_value 取得就是该文件中的值,这个last_time会以文件形式存在last_run_metadata_path => "/usr/local/logstash/logstash-7.12.1/mysqlnote/last_time"#如果use_column_value为真,需配置此参数. 指定增量更新的字段名。当然该字段必须是递增的,比如id或date字段。tracking_column => "updateTime"# tracking_column 对应字段的类型,只能选择timestamp或者numeric(数字类型),默认numeric。tracking_column_type => "timestamp"#如果为true,每次会记录所更新的字段的值,并保存到 last_run_metadata_path 指定的文件中record_last_run => true# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录clean_run => false# 是否将字段名称转小写。默认是true。这里注意Elasticsearch是区分大小写的lowercase_column_names => false}
}
output {elasticsearch {# es地址 集群数组hosts => ["127.0.0.1:9200","127.0.0.1:9201"] hosts => ["127.0.0.1:9200"] # 同步的索引名必须要有@timestamp 不然yyyyMM不起效index => "user"# 设置_docID和数据相同document_id => "%{id}"#自定的模板名称#template_name => "ps_seal_log"#自定义的模板配置文件#template => "/usr/local/logstash/logstash-7.12.1/mysqlnote/ps_test_log_template.json"#是否重写模板#template_overwrite => true }# 日志输出形式设置stdout {codec => json_lines#codec => rubydebug}
}
注意:output 中的映射模板注释掉了,原因是找了一圈不知道怎么写这个文件,网上的案例也是众说纷纭,所以,最好是直接用kibana直接先定义好,例如下边:
PUT /user
{"mappings": {"properties": {"id": {"type": "keyword"},"userName": {"type": "text","analyzer": "ik_max_word"},"age": {"type": "integer","index" : false},"sex": {"type": "integer"},"address": {"type": "text"},"price": {"type": "double","index" : false},"createTime": {"type": "date","index" : false},"updateTime": {"type": "date","index" : false},"status": {"type": "keyword"}}}
}
这个配置满足一般的需求了,如果有更多的需求,此配置无法满足,可以使用logstash的过滤配置,input和output中间还有一个filter配置,可以用来过滤和转换数据,有需求的可以自行研究,这里因为暂时没需要,就没有去研究了。
- 准备资源
接下来,准备上边配置所需要的东西,本次配置需要下边的文件和jar包。
创建时间记录文件,文件内容会在logstash启动连上数据库之后自动创建。
touch last_time
创建sql文件,查询出你要映射到es中的数据,我这里演示最简单的查询全部
vim mysql.sql
文件内容
select id,user_name as userName,age,sex,address,price,create_time as createTime,update_time as updateTime,status
from user
where update_time > :sql_last_value;
这个sql_last_value就是上边last_time文件中记录的值。
下载数据库驱动jar包。
官网地址
注意,5版本和8版本设置驱动类名(jdbc_driver_class)是有区别的:
5版本:com.mysql.jdbc.Driver。
8版本:com.mysql.cj.jdbc.Driver。
启动
进入logstash文件夹
./bin/logstash -f ./mysqlnote/logstash-sync.conf
测试
往数据库里添加数据,每5秒之后logstash就会查询数据库,增加添加数据到es中。
由此可见,因为同步数据是依赖查询mysql,所以logstash不支持同步删除操作,当然,就像我开头说的,可以设置逻辑删除字段status,以更新代替删除。
如果实在不想用逻辑删除,可以用下边两种方式。
- 其他
多表同步或者海量数据初同步方法,请参考这篇文章的后半部分
canal方式
canal分为三个组件构成,deployer服务端,adapter客户端、admin web管理端,有集群高可用的一套解决方案,官方文档也比较详细,大项目推荐使用。
canal同步MySQL数据到Elasticsearch
MQ方式
MQ有很多种,此处采用kafka来实现。
利用springboot集成kafka,随后接口收到消息,直接将增删改的消息扔进kafka,在消费端处理信息,之后同步到ES即可,缺点就是需要维护一套kafka以及消息处理的逻辑代码。
kafka的安装和整合springboot,参考下方的文章即可:
Linux安装kafka
springboot集成整合kafka