Skip to content

🐂🍺👏🚀 腾讯云TDSQL同步数据到Elasticsearch、Clickhouse

License

Notifications You must be signed in to change notification settings

0000005/sync2any

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

52 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

sync2any可以借助腾讯云数据订阅(DTS)将腾讯云数据库(mysql、tdsql)中的数据实时同步到Elasticsearch(7.x)或mysql。

首先使用mysqldump同步原始数据,再读取CKAFKA中的队列消息实时同步到Elasticsearch或mysql中。

实时同步数据流为:TDSQL -> CKAFKA -> sync2any -> Elasticsearch

  • 支持的数据源:TDSQL
  • 支持的目标源:Mysql、ElasticSearch、Clickhouse

image

为什么开发这个项目?

我们使用了腾讯云的TDSQL,但是腾讯云唯一提供可用的实时导出数据流只有CKAFKA这一个渠道(binlog不可用,因为有多台实例)。于是如果我们想将数据导入到ES或者其他数据源,必须得自己开发中间件,别无他法。

使用先决条件

  • 购买使用了腾讯云提供的TDSQL

状态面板

image

配置与安装

腾讯云端的配置

  1. 登录腾讯云->数据传输服务->数据订阅
  2. 新建数据订阅,选择好库表
  3. 新建消费组

本地启动

  1. 安装好mysqldump
  2. 从Release中下载最新版的源码,或者编译好的jar包
  3. 将config文件夹复制到jar包同目录。
  4. /config/application-test.yml下的配置文件修改成自己对应的配置文件
  5. 执行java -jar sync2any.jar --spring.profiles.active = test运行程序
  6. 访问http://127.0.0.1:9070查看同步状态

配置文件详解

错误的配置可能会导致项目启动报错。配置文件采用yml格式,不熟悉的同学可以先学习一下。

#【必填】腾讯云CKAFKA配置
kafka:
  address: 127.0.0.1:32768

#【必填】同步目标目标的基本配置(支持mysql、es、clickhouse)
target.datasources:
  -
    #【必填】标识数据源,每个必须不一样
    db-id: 1
    #目标数据源的类型(可以为es或mysql或clickhouse)
    type: es
    #当为es时可以填写多个地址,以逗号分割
    url: 192.168.10.208:9200,192.168.10.209:9200
    username: elastic
    password: changeme
  -
    #【必填】标识数据源,每个必须不一样
    db-id: 2
    #目标数据源的类型(可以为es或mysql或clickhouse)
    type: mysql
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&useSSL=false&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&useOldAliasMetadataBehavior=true&allowMultiQueries=true&serverTimezone=Hongkong
    username: root
    password: root
  -
    #【必填】标识数据源,每个必须不一样
    db-id: 3
    #目标数据源的类型(可以为es或mysql或clickhouse)
    type: clickhouse
    url: jdbc:clickhouse://127.0.0.1:8121/test
    username: default
    password: default


#【必填】源数据库【Tdsql】,可以配置多个数据库
source.mysql:
  datasources:
    -
      #【必填】标识数据源,每个必须不一样
      db-id: 1
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&useSSL=false&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&useOldAliasMetadataBehavior=true&allowMultiQueries=true&serverTimezone=Hongkong
      username: test
      password: test

#【必填】配置同步到elasticsearch的基本规则
sync2any:
  #【选填】mysqldump工具的地址
  mysqldump: D:\program\mysql-5.7.25-winx64\bin\mysqldump.exe
  #【选填】监控告警,只有填写了此参数才能开启监控告警,具体配置参考下面章节。多个secret用逗号分隔。
  alert:
    secret: aaaa
    
  # 规则比较灵活,可以配置多个
  sync-config-list:
    -
      #【必填】待同步的源数据库ID
      source-db-id: 1
      #【必填】同步到的目标源数据库ID
      target-db-id: 2
      #【必填】要同步的表名,支持正则表达式,多个表名用逗号分隔
      sync-tables: "t_member,t_member_order_[0-9]{10}"
      #【选填】延迟超过60秒,将会触发告警
      max-delay-in-second: 60
      #【选填】超过120分钟没接收到同步消息,将会触发告警
      max-idle-in-minute: 120
      #【选填】告警发生180分钟后,如果未恢复,则再次告警
      next-trigger-alert-in-minute: 180
      #【选填】默认表名后缀(优先级比rules.index_table小)
      target-table-suffix: _all
      #【选填】是否载入原始数据(0否,1是;默认开启【1】)
      dump-origin-data: 0
      mq:
        # 监听的CKAFKA的topic名称
        topic-name: test-t_member
        #消费者使用的topicGroup。每次重启本应用都会从kafka的"earliest"处开始读取。(多个同步任务不可以使用同一个topicGroup)
        topic-group: local-test-consumer-group
        #mq帐号
        username: test
        #mq密码
        password: test
      #【选填】此处可以配置TDSQL到elasticsearch的映射规则
      rules:
        -
          # 匹配此rule的表名,支持正则表达式
          table: t_member_order_[0-9]{10}
          # 同步到目标数据源的表名或index(对es来说)
          index_table: t_member_order
          # 【可选,目标数据库只支持Mysql】分表计算器,填写在spring容器中的bean名称
          dynamic_tablename_assigner: mysqlDynamicDataAssign
          # 【可选,当使用分区计算器时必填】分区健
          sharding_key: group_code
          # 【目标数据库为es时】自定义同步到es的字段名称和字段类型(es的类型),字段类型请参考类:com.jte.sync2any.model.es.EsDateType
          map: '{"user_img":"userImg","user_code":",integer","user_age":"userAge,integer"}'
          # 字段过滤,多个字段用逗号分隔。如果有值,则只保留这里填写的字段。
          field-filter: "user_id,user_name"

其他碎碎念

消费位置重置

当我们发现了一些数据错误,想要重新消费之前的消息时可以用到该接口。但是该功能只支持一个分区的情况。

  1. 登录腾讯云,进入数据订阅,找到消费管理中的“查询消费点”功能。
  2. 查询你要重置的时间当时的消费点。
  3. 以put方法调用 http://127.0.0.1:9070/offset?topicName=topic-subs-7a4bu6zz48-tdsqlshard-1dksu10j&sourceDbId=1&topicGroup=consumer-grp-subs-7a4bu6zz48-test&offset=0 接口。

自定义目标数据分发规则

当我们源数据库的A表要按照一定规则同步到目标数据库的A_1,A_2,A_3等多个表时可以用到自定义目标数据分发规则

  1. 配置文件中rules下配置dynamic_tablename_assigner属性,该属性的值为Spring bean的名称。
  2. 该新增的自定义规则类只需要基础抽象类com.jte.sync2any.load.DynamicDataAssign,并实现dynamicTableName方法即可。

一些约定

  • 当tdsql的表中存在数据,且es中index不存在或者index中无document时才会dump原始数据同步到es。
  • es的index默认命名规则为“数据库名-表名”,es那边的字段名和tdsql保持一致。默认所有同步过去的名称都会转化为小写。
  • mysql的table默认命名和源数据库一致。
  • 一旦同步过程中发生任何错误,该任务会停止继续同步,防止数据错乱。其他正常的任务可继续执行,不影响。
  • sync2any会将数据库中的主键当作es中document的主键。碰到复合主键时,多个主键使用“_”符号隔开。
  • 在es中的更新和删除操作都是通过es的主键来定位document
  • 因为在同步原始数据前队列中会堆积消息(binlog),这意味着我们会重复消费到以前的binlog。此时要求update不能修改主键值,否则重复消费binlog时可能导致消息错乱。
  • 当目标数据库的表引擎为CollapsingMergeTree系列时会默认追加_sign列。

最佳实践

  • 如果想要保证数据同步的正确性,创建数据订阅时,kafka的分区数填1。
  • 延迟告警的设置最好大于10秒,因为本身就有3-4秒的延迟。
  • 建议每个表都单独建立一个同步任务,并且用不同的topic。这样可以隔离故障,降低时同步延迟。目前腾讯云的数据订阅费用较高,如果只建立一个同步任务,就只能使用不同的topicGroup进行隔离。

性能

  • CKAFKA中每一条msg都对应数据库中一行被修改的记录,这意味着当你一次修改很多行记录时,同步延迟会加大。
  • 不同的机器性能不一样,要算每个TOPIC的QPS,可以通过控制面板的tpq参数计算。也就是1000(ms)/tpq=QPS。

使用限制

  • 表中必须要有主键。

常见问题

  • 如何判断同步成功?

可以通过count来比较记录的数量,也可以用关键指标(如余额、订单金额)“求和”看两端是否一致。

  • 如果同步途中要新增字段怎么办?

首先需要在目标端(ck或es)添加新的字段,再在源端添加新字段。最后重启sync2any来重新获取源端最新表结构。

  • 其他版本的es能够使用吗?

目前只测试了7.x版本,其他版本可以自己试一试。

  • 同步途中失败了,导致同步停止了该怎么办?

首先确定是否是sync2any的代码问题,如果是解析逻辑有问题,则需要修改bug。如果是es端发生问题,可以重启sync2any重新同步。怕消息有丢失的话,可以到CKAFKA控制台重新设置消费者的位置(可按时间)。如果是同步任务本身有问题,则需要新建同步任务,或工单联系腾讯云。

  • clickhouse中的当表引擎使用版本折叠树时,同步后数据不一致?

**如果目标库clickhouse使用版本折叠树,则初次全量同步和增量同步期间必须得让数据库停止写入数据。(相当于初次同步没有增量数据)**因为当sync2es同步完全量数据之后,再追加增量数据,此时增量数据中包含了一部分重复数据(全量数据的部分数据)。这样会造成版本折叠树无法正确的折叠数据。

参与项目开发

本项目基于spring boot 2.x,充分利用了spring的生态。非常利于扩展和二次开发。如果感兴趣或者需要对接腾讯云其他数据源,可基于本项目进行扩展。

About

🐂🍺👏🚀 腾讯云TDSQL同步数据到Elasticsearch、Clickhouse

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages