Flink CDC: Legacy Configuration
Official documentation: MySQL CDC Connector
GitHub doc: flink-cdc/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md at master · apache/flink-cdc
1. MySQL Setup
- Enable the binlog in MySQL and set its format to ROW:
log-bin = /data/mysql8_data/mysql/mysql-bin
binlog_format = ROW
- Create a MySQL user that has the appropriate privileges on every database monitored by the Debezium MySQL connector:
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
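2. Flink Configuration
- Download flink-sql-connector-mysql-cdc-3.0.0.jar and place it in the flink/lib directory.
- Set the memory sizes and the number of task slots.
Reference docs: TaskManager memory configuration, JobManager memory configuration, Flink configuration parameters
vim conf/flink-conf.yaml
# As a rule of thumb, set this to the number of CPU cores one TaskManager can use
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.flink.size: 8192m
jobmanager.memory.flink.size: 4096m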
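To get a feel for what these values imply, the sketch below divides the configured TaskManager memory by the slot count. It is only a rough estimate: Flink splits managed memory evenly across slots, while other memory such as the JVM heap is shared by all slots of a TaskManager. The class and method names are made up for this illustration.

```java
public class SlotMemoryEstimate {
    /** Evenly divides a memory budget (in MiB) across task slots. */
    static long perSlotMiB(long totalMiB, int slots) {
        if (slots <= 0) {
            throw new IllegalArgumentException("slots must be positive");
        }
        return totalMiB / slots;
    }

    public static void main(String[] args) {
        long flinkSizeMiB = 8192; // taskmanager.memory.flink.size: 8192m
        int slots = 16;           // taskmanager.numberOfTaskSlots: 16
        // Rough upper bound on memory available to one slot.
        System.out.println("Approx. MiB per slot: " + perSlotMiB(flinkSizeMiB, slots));
    }
}
```

With the values above this gives roughly 512 MiB per slot, so raising the slot count without raising the memory size shrinks the share each slot receives.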
3. Writing the Code
- pom file configuration:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.1</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>
<build>
    <finalName>mysql-cdc-info</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.dudu.flink.cdc.MysqlCDCTest</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
-
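Main entry point
package com.dudu.flink.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCDCTest {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Build the MySqlSource with Flink CDC
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(33306)
                .username("root")
                .password("Qs@12345")
                .databaseList("d_mid")
                // If omitted, all tables in the listed databases are captured; when set, use the db.table form
                .tableList("d_mid.user")
                // Emit each change event as a JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Take an initial snapshot, then continue reading the binlog
                .startupOptions(StartupOptions.initial())
                .build();
        // 3. Enable checkpointing every 3000 ms
        env.enableCheckpointing(3000);
        // 4. Read from the source with parallelism 4 and print with parallelism 1
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}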
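Each record printed by this job is a Debezium-style JSON string whose "op" field tells what kind of change it describes: "r" for a snapshot read, "c" for an insert, "u" for an update and "d" for a delete. The sketch below shows one way to classify a record using only the JDK. The class name and the sample event are hypothetical and heavily abbreviated, and the regex lookup is a shortcut that can be fooled by a column that is itself named op; in a real job a JSON parser such as the fastjson2 dependency from the pom would be the safer choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ChangeEventOp {
    // Matches the first "op":"<letter>" pair in the JSON text.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the op code of a change event, or null if none is found. */
    static String extractOp(String json) {
        Matcher m = OP.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Maps a Debezium op code to a readable description. */
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Hypothetical, abbreviated event for illustration only.
        String event = "{\"before\":null,\"after\":{\"id\":1},"
                + "\"source\":{\"db\":\"d_mid\",\"table\":\"user\"},\"op\":\"r\"}";
        System.out.println(describe(extractOp(event)));
    }
}
```

With StartupOptions.initial(), the rows that already exist in the table arrive first as snapshot reads, and later binlog changes arrive as inserts, updates and deletes.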
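4. Submitting the Job
Upload via the web UI
Start from the command line
Start
./bin/flink run --detached ../jobs/mysql-cdc-demo.jar
Stopping the job
Stop immediately with the cancel command (use only when testing, because cancel does not take a savepoint):
./bin/flink cancel "2d00d9650411c869818de197262d2df1"
Stop the job with the stop command
Prefer the stop command when shutting a job down. stop combines the cancel and savepoint operations: it stops the running job and also creates a savepoint from which the job can be restarted.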
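- Create a savepoint
If state.savepoints.dir is not configured in conf/flink-conf.yaml, specify the target path when creating the savepoint:
./bin/flink savepoint 5091f1c0327eea4bd71a8397ff0b13ca /tmp/flink-savepoints
If state.savepoints.dir is configured in conf/flink-conf.yaml, the target directory can be omitted and the configured path is used by default:
./bin/flink savepoint 5091f1c0327eea4bd71a8397ff0b13ca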
- Stop the running job with a savepoint
If state.savepoints.dir is configured in conf/flink-conf.yaml, run the stop command directly; a savepoint is created automatically.
Otherwise, pass the savepoint path explicitly:
./bin/flink stop --savepointPath /tmp/flink-savepoints 5091f1c0327eea4bd71a8397ff0b13ca
Restore the job from a savepoint
./bin/flink run --detached --fromSavepoint /tmp/flink-savepoints/savepoint-5091f1-a8912663a0ca/_metadata ../jobs/mysql-cdc-hotline-demo.jar
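5. Updating the Job
See the official documentation: restoring a job from a savepoint
- Stop the existing Flink job with a savepoint.
./bin/flink stop $Existing_Flink_JOB_ID
- Restore the updated Flink job from the savepoint.
./bin/flink run \
  --detached \
  --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
  ./FlinkCDCExample.jar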
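When the stop and restore steps are scripted, the savepoint path has to be carried from the first command to the second. The sketch below pulls that path out of the text printed by flink stop. It assumes the CLI prints a line of the form "Savepoint completed. Path: <path>"; check this against the output of your Flink version. The class name and the sample output are illustrative only.

```java
import java.util.Optional;

public class SavepointPathParser {
    // Assumed prefix of the line that reports the savepoint location.
    private static final String MARKER = "Savepoint completed. Path: ";

    /** Returns the savepoint path found in the CLI output, if any. */
    static Optional<String> parse(String cliOutput) {
        for (String line : cliOutput.split("\\R")) {
            int idx = line.indexOf(MARKER);
            if (idx >= 0) {
                return Optional.of(line.substring(idx + MARKER.length()).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical output captured from "./bin/flink stop <jobId>".
        String output = "Suspending job with a savepoint.\n"
                + "Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab\n";
        System.out.println(parse(output).orElse("no savepoint path found"));
    }
}
```

The returned value can then be passed to --fromSavepoint when the updated jar is started.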
Flink CDC: New Version, Pipeline Connector Configuration
flink-cdc/docs/content/docs/get-started/quickstart/mysql-to-doris.md at master · apache/flink-cdc
A copy of the linked document is kept as flinkCDC3.0-mysql-to-doris.md in case the GitHub docs are moved.
Official documentation:
MySQL to Doris
Flink cluster configuration
- Enable checkpointing in conf/flink-conf.yaml. execution.checkpointing.interval has no default value, so it must be set explicitly. See: configuration parameters
execution.checkpointing.interval: 3000
- Set the FLINK_HOME environment variable:
bin/flink-cdc.sh checks for FLINK_HOME and reports an error if it is not set.
export FLINK_HOME=/home/flink/flink-1.18.0
export PATH=$PATH:$FLINK_HOME/bin
Alternatively, add the following option to the start command:
--flink-home /data/software/flink/flink-1.18.0
Writing the mysql-to-doris.yaml file
Reference docs: MySQL pipeline connector configuration, Doris pipeline connector configuration
source:
  type: mysql
  hostname: 127.0.0.1
  port: 33306
  username: root
  password: Qs@pass
  # All tables in database d_mid whose names start with sys_
  tables: d_mid.sys_\.*
  server-id: 5488
  server-time-zone: Asia/Shanghai
sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: "hits@pass"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1
pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1
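The tables value can look odd at first: the unescaped dot separates the database name from the table name, while \. stands for the regex "any character" dot, so sys_\.* means the regex sys_.*. The sketch below imitates that reading with plain java.util.regex to show which tables would be selected. It illustrates the semantics as described in the pipeline connector docs and is not the connector's actual implementation; the class and method names are invented.

```java
import java.util.regex.Pattern;

public class TablePatternSketch {
    /** Checks whether database.table is selected by a tables option value. */
    static boolean matches(String tablesOption, String database, String table) {
        // Split at the first dot that is not preceded by a backslash.
        String[] parts = tablesOption.split("(?<!\\\\)\\.", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected <database>.<table>: " + tablesOption);
        }
        // An escaped dot in the option is a plain regex dot.
        String dbRegex = parts[0].replace("\\.", ".");
        String tableRegex = parts[1].replace("\\.", ".");
        return Pattern.matches(dbRegex, database) && Pattern.matches(tableRegex, table);
    }

    public static void main(String[] args) {
        String option = "d_mid.sys_\\.*"; // the value d_mid.sys_\.* from the YAML file
        System.out.println(matches(option, "d_mid", "sys_user")); // a sys_ table
        System.out.println(matches(option, "d_mid", "user"));     // no sys_ prefix
    }
}
```

Under this reading, sys_user in d_mid is selected and user is not, which matches the comment in the YAML file.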
Run
./bin/flink-cdc.sh mysql-to-doris.yaml
#--flink-home /data/software/flink/flink-1.18.0
Restoring the job from a savepoint
Using a savepoint
- First stop the job by its job ID:
./flink-1.18.0/bin/flink stop 4a8d565fe295f70d1cb25c1ec45a057d
- Take the Savepoint path reported for the stopped job and set it as execution.savepoint.path in conf/flink-conf.yaml:
execution.savepoint.path: /home/flink/flink-savepoints/savepoint-4a8d56-765ea2800a9b
- Restart the CDC job:
cd flink-cdc-3.0.0
./bin/flink-cdc.sh job/allDatabase-to-doris.yaml --flink-home /home/flink/flink-1.18.0
Using a checkpoint
First make sure that [RETAIN_ON_CANCELLATION](https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention) and [checkpoint-storage](https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/config/#state-checkpoint-storage) are configured. Otherwise, the checkpoint directory is not retained when the job is cancelled manually.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Default directory, in a Flink-supported file system, for checkpoint data files and metadata. The path must be accessible from all participating processes/nodes (all TaskManagers and JobManagers).
state.checkpoints.dir: file:///data/software/flink/flink-checkpoints
# Default directory for savepoints. Used by state backends that write savepoints to a file system (HashMapStateBackend, EmbeddedRocksDBStateBackend).
state.savepoints.dir: file:///data/software/flink/flink-savepoints
# Recognized shortcut names are "jobmanager" and "filesystem".
state.checkpoint-storage: filesystem
- Stop the job
Cancel the job by its job ID:
./bin/flink cancel dc42b6881e759e82ddf69a6c718b3ac3
or cancel it manually in the web UI.
- Check the checkpoints directory and confirm that the checkpoint folder has not been deleted.
- Set execution.savepoint.path in conf/flink-conf.yaml to the retained checkpoint directory:
execution.savepoint.path: /home/flink/flink-checkpoints/dc42b6881e759e82ddf69a6c718b3ac3/chk-999
- Restart the CDC job:
cd flink-cdc-3.0.0
./bin/flink-cdc.sh job/allDatabase-to-doris.yaml --flink-home /home/flink/flink-1.18.0
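Retained checkpoints live under <state.checkpoints.dir>/<job-id>/chk-<n>, and a completed checkpoint directory contains a _metadata file. Rather than looking up the highest chk number by hand, a small helper can pick it. The following is a minimal sketch using only the JDK; the class and method names are hypothetical, and the job checkpoint directory is passed as the first argument.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LatestCheckpointFinder {
    /** Picks the chk-<n> name with the highest n, ignoring other entries. */
    static Optional<String> latestCheckpointName(List<String> names) {
        return names.stream()
                .filter(n -> n.matches("chk-\\d+"))
                .max(Comparator.comparingLong((String n) -> Long.parseLong(n.substring(4))));
    }

    /** Finds the newest checkpoint directory that contains a _metadata file. */
    static Optional<Path> findLatest(Path jobCheckpointDir) throws IOException {
        List<String> complete;
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            complete = children
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toList());
        }
        return latestCheckpointName(complete).map(jobCheckpointDir::resolve);
    }

    public static void main(String[] args) throws IOException {
        // Example argument: /home/flink/flink-checkpoints/<job-id>
        Path jobDir = Paths.get(args[0]);
        System.out.println(findLatest(jobDir)
                .map(Path::toString)
                .orElse("no completed checkpoint found"));
    }
}
```

The printed directory is the value to use for execution.savepoint.path. Comparing the numeric suffix rather than the raw name matters because chk-12 sorts before chk-3 as text.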