0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看威廉希尔官方网站 视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

实现MySQL与elasticsearch数据同步的方法

OSC开源社区 来源:又拍云 2023-03-17 13:49 次阅读

MySQL 自身简单、高效、可靠,是又拍云内部使用最广泛的数据库。但是当数据量达到一定程度的时候,对整个 MySQL 的操作会变得非常迟缓。而公司内部 robin/logs 表的数据量已经达到 800w,后续又有全文检索的需求。这个需求直接在 MySQL上实施是难以做到的。

原数据库的同步问题

由于传统的 mysql 数据库并不擅长海量数据的检索,当数据量到达一定规模时(估算单表两千万左右),查询和插入的耗时会明显增加。同样,当需要对这些数据进行模糊查询或是数据分析时,MySQL作为事务型关系数据库很难提供良好的性能支持。使用适合的数据库来实现模糊查询是解决这个问题的关键。 但是,切换数据库会迎来两个问题,一是已有的服务对现在的 MySQL重度依赖,二是MySQL的事务能力和软件生态仍然不可替代,直接迁移数据库的成本过大。我们综合考虑了下,决定同时使用多个数据库的方案,不同的数据库应用于不同的使用场景。

而在支持模糊查询功能的数据库中,elasticsearch 自然是首选的查询数据库。这样后续对业务需求的切换也会非常灵活。 那具体该如何实现呢?在又拍云以往的项目中,也有遇到相似的问题。之前采用的方法是在业务中编写代码,然后同步到 elasticsearch 中。具体是这样实施的:每个系统编写特定的代码,修改 MySQL数据库后,再将更新的数据直接推送到需要同步的数据库中,或推送到队列由消费程序来写入到数据库中。 但这个方案有一些明显的缺点:

系统高耦合,侵入式代码,使得业务逻辑复杂度增加

方案不通用,每一套同步都需要额外定制,不仅增加业务处理时间,还会提升软件复复杂度

工作量和复杂度增加

在业务中编写同步方案,虽然在项目早期比较方便,但随着数据量和系统的发展壮大,往往最后会成为业务的大痛点。

解决思路及方案

调整架构

既然以往的方案有明显的缺点,那我们如何来解决它呢?优秀的解决方案往往是 “通过架构来解决问题“,那么能不能通过架构的思想来解决问题呢? 答案是可以的。我们可以将程序伪装成 “从数据库”,主库的增量变化会传递到从库,那这个伪装成 “从数据库” 的程序就能实时获取到数据变化,然后将增量的变化推送到消息队列 MQ,后续消费者消耗 MQ 的数据,然后经过处理之后再推送到各自需要的数据库。 这个架构的核心是通过监听 MySQL 的 binlog 来同步增量数据,通过基于 query 的查询旧表来同步旧数据,这就是本文要讲的一种异构数据库同步的实践。

改进数据库

经过深度的调研,成功得到了一套异构数据库同步方案,并且成功将公司生产环境下的 robin/logs 的表同步到了 elasticsearch 上。 首先对 MySQL 开启 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生产环境的数据库不宜修改。这里请教了海杨前辈,他提供了”从库联级“的思路,在从库中监听 binlog 绕过了操作生产环境重启主库的操作,大大降低了系统风险。 后续操作比较顺利,启动 maxwell 监听从库变化,然后将增量变化推送到 kafka ,最后配置 logstash 消费 kafka中的数据变化事件信息,将结果推送到 elasticsearch。配置 logstash需要结合表结构,这是整套方案实施的重点。 这套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 与 kafka已经在生产环境中有部署,所以无需单独部署维护。而 logstash 与 maxwell 只需要修改配置文件和启动命令即可快速上线。整套方案的意义不仅在于成本低,而且可以大规模使用,公司内有 MySQL 同步到其它数据库的需求时,都可以上任。

成果展示前后对比

- 使用该方案同步和业务实现同步的对比

052d642c-c47a-11ed-bfe3-dac502259ad0.png

- 写入到 elasticsearch 性能对比 (8核4G内存)

项目 logstash 业务同步
写入速度 1500 条/s 200 条/s

经过对比测试,800w 数据量全量同步,使用 logstash 写到 elasticsearch,实际需要大概 3 小时,而旧方案的写入时间需要 2.5 天。

方案实施细节

接下来,我们来看看具体是如何实现的。 本方案无需编写额外代码,非侵入式的,实现 MySQL数据与 elasticsearch 数据库的同步。

0549731a-c47a-11ed-bfe3-dac502259ad0.png

下列是本次方案需要使用所有的组件:

MySQL

Kafka

Maxwell(监听 binlog)

Logstash(将数据同步给 elasticsearch)

Elasticsearch

1. MySQL配置

本次使用 MySQL 5.5 作示范,其他版本的配置可能稍许不同需要

首先我们需要增加一个数据库只读的用户,如果已有的可以跳过。

-- 创建一个 用户名为 maxwell 密码为 xxxxxx 的用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

开启数据库的 `binlog`,修改 `mysql` 配置文件,注意 `maxwell` 需要的 `binlog` 格式必须是`row`。

# /etc/mysql/my.cnf


[mysqld]
# maxwell 需要的 binlog 格式必须是 row
binlog_format=row


# 指定 server_id 此配置关系到主从同步需要按情况设置,
# 由于此mysql没有开启主从同步,这边默认设置为 1
server_id=1


# logbin 输出的文件名, 按需配置
log-bin=master
重启 MySQL 并查看配置是否生效:
sudo systemctl restart mysqld
select @@log_bin;
-- 正确结果是 1
select @@binlog_format;
-- 正确结果是 ROW
如果要监听的数据库开启了主从同步,并且不是主数据库,需要再从数据库开启 binlog 联级同步。
# /etc/my.cnf


log_slave_updates = 1

需要被同步到 elasticsearch 的表结构。
-- robin.logs
show create table robin.logs;


-- 表结构
CREATE TABLE `logs` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `content` text NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
  `type` varchar(20) DEFAULT '',
  `meta` text,
  `created_at` bigint(15) NOT NULL,
  `idx_host` varchar(255) DEFAULT '',
  `idx_domain_id` int(11) unsigned DEFAULT NULL,
  `idx_record_value` varchar(255) DEFAULT '',
  `idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
  `idx_orig_record_value` varchar(255) DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8

2.Maxwell 配置

本次使用 maxwell-1.39.2 作示范, 确保机器中包含 java 环境, 推荐 openjdk11

下载 maxwell 程序

wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
tar zxvf maxwell-1.39.2.tar.gz **&&**  cd maxwell-1.39.2
maxwell 使用了两个数据库:

一个是需要被监听binlog的数据库(只需要读权限)

另一个是记录maxwell服务状态的数据库,当前这两个数据库可以是同一个

重要参数说明:

host 需要监听binlog的数据库地址

port 需要监听binlog的数据库端口

user 需要监听binlog的数据库用户名

password 需要监听binlog的密码

replication_host 记录maxwell服务的数据库地址

replication_port 记录maxwell服务的数据库端口

replication_user 记录maxwell服务的数据库用户名

filter 用于监听binlog数据时过滤不需要的数据库数据或指定需要的数据库

producer 将监听到的增量变化数据提交给的消费者 (如 stdout、kafka)

kafka.bootstrap.servers kafka 服务地址

kafka_version kafka 版本

kafka_topic 推送到kafka的主题

启动 maxwell

注意,如果 kafka 配置了禁止自动创建主题,需要先自行在 kafka 上创建主题,kafka_version 需要根据情况指定, 此次使用了两张不同的库

./bin/maxwell 
        --host=mysql-maxwell.mysql.svc.cluster.fud3 
        --port=3306 
        --user=root 
        --password=password 
        --replication_host=192.168.5.38 
        --replication_port=3306 
        --replication_user=cloner 
        --replication_password=password
        --filter='exclude: *.*, include: robin.logs' 
        --producer=kafka 
        --kafka.bootstrap.servers=192.168.30.10:9092 
        --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1

3. 安装 Logstash

Logstash 包中已经包含了 openjdk,无需额外安装。

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
tar zxvf logstash-8.5.0-linux-x86_64.tar.gz
删除不需要的配置文件。
rm config/logstash.yml
修改logstash配置文件,此处语法参考官方文档(https://www.elastic.co/guide/en/logstash/current/input-plugins.html)。
# config/logstash-sample.conf


input {
 kafka {
    bootstrap_servers => "192.168.30.10:9092"
    group_id => "main"
    topics => ["maxwell-robinlogs"]
 }
}


filter {
  json {
    source => "message"
  }


  # 将maxwell的事件类型转化为es的事件类型
  # 如增加 -> index 修改-> update
  translate {
    source => "[type]"
    target => "[action]"
    dictionary => {
      "insert" => "index"
      "bootstrap-insert" => "index"
      "update" => "update"
      "delete" => "delete"
    }
    fallback => "unknown"
  }


  # 过滤无效的数据
  if ([action] == "unknown") {
    drop {}
  }


  # 处理数据格式
  if [data][idx_host] {
    mutate {
      add_field => { "idx_host" => "%{[data][idx_host]}" }
    }
  } else {
    mutate {
      add_field => { "idx_host" => "" }
    }
  }


  if [data][idx_domain_id] {
    mutate {
      add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
    }
  } else {
    mutate {
      add_field => { "idx_domain_id" => "" }
    }
  }


  if [data][idx_record_value] {
    mutate {
      add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
    }
  } else {
    mutate {
      add_field => { "idx_record_value" => "" }
    }
  }
  
   if [data][idx_record_opt] {
    mutate {
      add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
    }
  } else {
    mutate {
      add_field => { "idx_record_opt" => "" }
    }
  }
 
  if [data][idx_orig_record_value] {
    mutate {
      add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
    }
  } else {
    mutate {
      add_field => { "idx_orig_record_value" => "" }
    }
  }
 
  if [data][type] {
    mutate {
      replace => { "type" => "%{[data][type]}" }
    }
  } else {
    mutate {
      replace => { "type" => "" }
    }
  }
 
  mutate {
    add_field => {
      "id" => "%{[data][id]}"
      "content" => "%{[data][content]}"
      "user_id" => "%{[data][user_id]}"
      "status" => "%{[data][status]}"
      "meta" => "%{[data][meta]}"
      "created_at" => "%{[data][created_at]}"
    }
    remove_field => ["data"]
  }


  mutate {
    convert => {
      "id" => "integer"
      "user_id" => "integer"
      "idx_domain_id" => "integer"
      "created_at" => "integer"
    }
  }


  # 只提炼需要的字段
  mutate {
    remove_field => [
      "message",
      "original",
      "@version",
      "@timestamp",
      "event",
      "database",
      "table",
      "ts",
      "xid",
      "commit",
      "tags"
    ]
   }
}


output {
  # 结果写到es
  elasticsearch {
    hosts => ["http://es-zico2.service.upyun:9500"]
    index => "robin_logs"
    action => "%{action}"
    document_id => "%{id}"
    document_type => "robin_logs"
  }


  # 结果打印到标准输出
  stdout {
    codec => rubydebug
  }
}
执行程序:
# 测试配置文件*
bin/logstash -f config/logstash-sample.conf --config.test_and_exit


# 启动*
bin/logstash -f config/logstash-sample.conf --config.reload.automatic

4. 全量同步

完成启动后,后续的增量数据 maxwell 会自动推送给 logstash 最终推送到 elasticsearch ,而之前的旧数据可以通过 maxwell 的 bootstrap 来同步,往下面表中插入一条任务,那么 maxwell 会自动将所有符合条件的 where_clause 的数据推送更新。

INSERT INTO maxwell.bootstrap 
        ( database_name, table_name, where_clause, client_id ) 
values 
        ( 'robin', 'logs', 'id > 1', 'maxwell' );
后续可以在 elasticsearch 检测数据是否同步完成,可以先查看数量是否一致,然后抽样对比详细数据。
# 检测 elasticsearch  中的数据量
GET robin_logs/robin_logs/_count





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据库
    +关注

    关注

    7

    文章

    3796

    浏览量

    64367
  • MySQL
    +关注

    关注

    1

    文章

    805

    浏览量

    26544
  • Maxwell
    +关注

    关注

    4

    文章

    36

    浏览量

    12646

原文标题:如何高效实现MySQL与elasticsearch的数据同步

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    数据数据恢复—Mysql数据库表记录丢失的数据恢复流程

    Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、
    的头像 发表于 12-16 11:05 124次阅读
    <b class='flag-5'>数据</b>库<b class='flag-5'>数据</b>恢复—<b class='flag-5'>Mysql</b><b class='flag-5'>数据</b>库表记录丢失的<b class='flag-5'>数据</b>恢复流程

    数据数据恢复—MYSQL数据库ibdata1文件损坏的数据恢复案例

    mysql数据库故障: mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用my
    的头像 发表于 12-09 11:05 144次阅读

    香港云服务器怎么部署MySQL数据库?

    在香港云服务器上部署MySQL数据库的步骤如下: 步骤 1: 更新软件包列表 首先,确保软件包列表是最新的。在终端中执行以下命令: sudo apt update 步骤 2: 安装 MySQL
    的头像 发表于 11-14 16:15 159次阅读

    Elasticsearch 再次开源

    Elasticsearch 和 Kibana 又可以被称为开源了。很难表达这句话让我有多高兴。我激动得简直要跳起来了。我们 Elastic 的所有人都是如此。开源是我的 DNA。这也是Elastic的DNA。能够再次将 Elasticsearch 称为开源,我感到非常高兴
    的头像 发表于 11-13 12:14 130次阅读
    <b class='flag-5'>Elasticsearch</b> 再次开源

    适用于MySQL的dbForge架构比较

    dbForge Schema Compare for MySQL 是一种工具,用于轻松有效地比较和部署 MySQL 数据库结构和脚本文件夹差异。该工具提供了 MySQL
    的头像 发表于 10-28 09:41 195次阅读
    适用于<b class='flag-5'>MySQL</b>的dbForge架构比较

    MySQL的整体逻辑架构

    支持多种存储引擎是众所周知的MySQL特性,也是MySQL架构的关键优势之一。如果能够理解MySQL Server与存储引擎之间是怎样通过API交互的,将大大有利于理解MySQL的核心
    的头像 发表于 04-30 11:14 450次阅读
    <b class='flag-5'>MySQL</b>的整体逻辑架构

    Redis与MySQL协同升级企业缓存

    传统的MySQL数据库在处理大规模应用时已经到了瓶颈,RedisEnterprise怎样助力突破这一瓶颈?RedisEnterprise与MYSQL共同用作企业级缓存或副本数据库,会产
    的头像 发表于 02-19 13:18 370次阅读
    Redis与<b class='flag-5'>MySQL</b>协同升级企业缓存

    labview 创建mysql 表时 设置时间 怎么在mysql中是格式是date 而不是datetime?

    选择 时间日期 但是在mysql中是date而不是datetime类型 ,除了sql语句创建表 ,怎么能实现创建表中数据为datetime类型
    发表于 02-04 09:46

    如何将MS访问数据转换为MySQL

    借助dbForgeStudio for MySQL,您可以轻松地将数据从MicrosoftAccess迁移到MySQL,并保持数据和功能的完整性。这个过程将允许您利用更具可伸缩性和功能
    的头像 发表于 01-23 13:47 423次阅读
    如何将MS访问<b class='flag-5'>数据</b>转换为<b class='flag-5'>MySQL</b>

    传送网如何实现频率同步和时间同步

    、频率同步 在传送网中,频率同步是指网络中的各个节点之间的时钟频率保持一致,以便实现数据传输的精确同步。在频率
    的头像 发表于 01-16 14:42 1125次阅读

    MySQL密码忘记了怎么办?MySQL密码快速重置方法步骤命令示例!

    MySQL密码忘记了怎么办?MySQL密码快速重置方法步骤命令示例! MySQL是一种常用的关系型数据库管理系统,如果你忘记了
    的头像 发表于 01-12 16:06 743次阅读

    怎么简单实现由Labview读取的串口数据自增写入mysql5.7数据库中?

    怎么简单实现由Labview读取的串口数据自增写入mysql5.7数据库中? 已实现:串口数据
    发表于 01-11 22:05

    导致MySQL索引失效的情况以及相应的解决方法

    导致MySQL索引失效的情况以及相应的解决方法  MySQL索引的目的是提高查询效率,但有些情况下索引可能会失效,导致查询变慢或效果不如预期。下面将详细介绍导致MySQL索引失效的情况
    的头像 发表于 12-28 10:01 755次阅读

    mysql怎么新建一个数据

    mysql怎么新建一个数据库 如何新建一个数据库在MySQL中 创建一个数据库是MySQL中的基
    的头像 发表于 12-28 10:01 886次阅读

    mysql密码忘了怎么重置

    mysql密码忘了怎么重置  MySQL是一种开源的关系型数据库管理系统,密码用于保护数据库的安全性和保密性。如果你忘记了MySQL的密码,
    的头像 发表于 12-27 16:51 6671次阅读