如何高效实现 MySQL 与 elasticsearch 的数据同步
 
                    
MySQL 自身简单、高效、可靠,是又拍云内部使用最广泛的数据库。但是当数据量达到一定程度的时候,对整个 MySQL 的操作会变得非常迟缓。而公司内部 robin/logs 表的数据量已经达到 800w,后续又有全文检索的需求。这个需求直接在 MySQL 上实施是难以做到的。
原数据库的同步问题
- 系统高耦合,侵入式代码,使得业务逻辑复杂度增加 
- 方案不通用,每一套同步都需要额外定制,不仅增加业务处理时间,还会提升软件复复杂度 
- 工作量和复杂度增加 
解决思路及方案
调整架构
改进数据库
成果展示前后对比

方案实施细节

- MySQL 
- Kafka 
- Maxwell(监听 binlog) 
- Logstash(将数据同步给 elasticsearch) 
- Elasticsearch 
1. MySQL配置
本次使用 MySQL 5.5 作示范,其他版本的配置可能稍许不同需要 
-- 创建一个 用户名为 maxwell 密码为 xxxxxx 的用户CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
开启数据库的 `binlog`,修改 `mysql` 配置文件,注意 `maxwell` 需要的 `binlog` 格式必须是`row`。
# /etc/mysql/my.cnf[mysqld]# maxwell 需要的 binlog 格式必须是 rowbinlog_format=row# 指定 server_id 此配置关系到主从同步需要按情况设置,# 由于此mysql没有开启主从同步,这边默认设置为 1server_id=1# logbin 输出的文件名, 按需配置log-bin=master
sudo systemctl restart mysqld
select @@log_bin;-- 正确结果是 1select @@binlog_format;-- 正确结果是 ROW
# /etc/my.cnflog_slave_updates = 1
-- robin.logsshow create table robin.logs;-- 表结构CREATE TABLE `logs` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT,`content` text NOT NULL,`user_id` int(11) NOT NULL,`status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,`type` varchar(20) DEFAULT '',`meta` text,`created_at` bigint(15) NOT NULL,`idx_host` varchar(255) DEFAULT '',`idx_domain_id` int(11) unsigned DEFAULT NULL,`idx_record_value` varchar(255) DEFAULT '',`idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,`idx_orig_record_value` varchar(255) DEFAULT '',PRIMARY KEY (`id`),KEY `created_at` (`created_at`)) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8
2. Maxwell 配置
本次使用 maxwell-1.39.2 作示范, 确保机器中包含 java 环境, 推荐 openjdk11 
下载 maxwell 程序
wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gztar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2
- 一个是需要被监听binlog的数据库(只需要读权限) 
- 另一个是记录maxwell服务状态的数据库,当前这两个数据库可以是同一个 
- host 需要监听binlog的数据库地址 
- port 需要监听binlog的数据库端口 
- user 需要监听binlog的数据库用户名 
- password 需要监听binlog的密码 
- replication_host 记录maxwell服务的数据库地址 
- replication_port 记录maxwell服务的数据库端口 
- replication_user 记录maxwell服务的数据库用户名 
- filter 用于监听binlog数据时过滤不需要的数据库数据或指定需要的数据库 
- producer 将监听到的增量变化数据提交给的消费者 (如 stdout、kafka) 
- kafka.bootstrap.servers kafka 服务地址 
- kafka_version kafka 版本 
- kafka_topic 推送到kafka的主题 
启动 maxwell
./bin/maxwell--host=mysql-maxwell.mysql.svc.cluster.fud3--port=3306--user=root--password=password--replication_host=192.168.5.38--replication_port=3306--replication_user=cloner--replication_password=password--filter='exclude: *.*, include: robin.logs'--producer=kafka--kafka.bootstrap.servers=192.168.30.10:9092--kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1
3. 安装 Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gztar zxvf logstash-8.5.0-linux-x86_64.tar.gz
rm config/logstash.yml
# config/logstash-sample.confinput {kafka {bootstrap_servers => "192.168.30.10:9092"group_id => "main"topics => ["maxwell-robinlogs"]}}filter {json {source => "message"}# 将maxwell的事件类型转化为es的事件类型# 如增加 -> index 修改-> updatetranslate {source => "[type]"target => "[action]"dictionary => {"insert" => "index""bootstrap-insert" => "index""update" => "update""delete" => "delete"}fallback => "unknown"}# 过滤无效的数据if ([action] == "unknown") {drop {}}# 处理数据格式if [data][idx_host] {mutate {add_field => { "idx_host" => "%{[data][idx_host]}" }}} else {mutate {add_field => { "idx_host" => "" }}}if [data][idx_domain_id] {mutate {add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }}} else {mutate {add_field => { "idx_domain_id" => "" }}}if [data][idx_record_value] {mutate {add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }}} else {mutate {add_field => { "idx_record_value" => "" }}}if [data][idx_record_opt] {mutate {add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }}} else {mutate {add_field => { "idx_record_opt" => "" }}}if [data][idx_orig_record_value] {mutate {add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }}} else {mutate {add_field => { "idx_orig_record_value" => "" }}}if [data][type] {mutate {replace => { "type" => "%{[data][type]}" }}} else {mutate {replace => { "type" => "" }}}mutate {add_field => {"id" => "%{[data][id]}""content" => "%{[data][content]}""user_id" => "%{[data][user_id]}""status" => "%{[data][status]}""meta" => "%{[data][meta]}""created_at" => "%{[data][created_at]}"}remove_field => ["data"]}mutate {convert => {"id" => "integer""user_id" => "integer""idx_domain_id" => "integer""created_at" => "integer"}}# 只提炼需要的字段mutate {remove_field => ["message","original","@version","@timestamp","event","database","table","ts","xid","commit","tags"]}}output {# 结果写到eselasticsearch {hosts => ["http://es-zico2.service.upyun:9500"]index => "robin_logs"action => "%{action}"document_id => "%{id}"document_type => "robin_logs"}# 结果打印到标准输出stdout {codec => rubydebug}}
# 测试配置文件*bin/logstash -f config/logstash-sample.conf --config.test_and_exit# 启动*bin/logstash -f config/logstash-sample.conf --config.reload.automatic
4. 全量同步
INSERT INTO maxwell.bootstrap( database_name, table_name, where_clause, client_id )values( 'robin', 'logs', 'id > 1', 'maxwell' );
# 检测 elasticsearch 中的数据量GET robin_logs/robin_logs/_count
快 来 找 又 小 拍 

推 荐 阅 读 


设为星标

更新不错过




设为星标

更新不错过
[广告]赞助链接:
                        关注数据与安全,洞悉企业级服务市场:https://www.ijiandao.com/
                        让资讯触达的更精准有趣:https://www.0xu.cn/
                    

 关注KnowSafe微信公众号
            关注KnowSafe微信公众号 
                 
             
             
            
 
        
 
        
