Flink CDC: legacy (source connector) configuration

Official documentation: MySQL CDC Connector
GitHub doc: flink-cdc/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md at master · apache/flink-cdc

I. MySQL setup

  1. Enable the binlog in MySQL and set its format to ROW

    log-bin = /data/mysql8_data/mysql/mysql-bin
    binlog_format=ROW
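  2. Create a MySQL user that has the required privileges on every database monitored by the Debezium MySQL connector

    -- MySQL 8.0 no longer accepts IDENTIFIED BY inside GRANT, so the password is set only in CREATE USER
    CREATE USER 'user'@'%' IDENTIFIED BY 'password';
    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';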

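II. Flink configuration

  1. Download flink-sql-connector-mysql-cdc-3.0.0.jar and place it in the flink/lib directory.
  2. Set the memory sizes and the number of task slots

Reference documentation:
TaskManager memory configuration, JobManager memory configuration, Flink configuration parameters

vim conf/flink-conf.yaml
# As a rule of thumb, treat this value as the number of CPU cores one TaskManager may use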
taskmanager.numberOfTaskSlots: 16

taskmanager.memory.flink.size: 8192m
jobmanager.memory.flink.size: 4096m
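
To get a feel for what these numbers mean per slot, here is a rough sketch of the arithmetic. It assumes taskmanager.memory.managed.fraction is left at its default of 0.4 (that option is not set in the config above), and it only covers managed memory, which Flink divides evenly among the slots of a TaskManager. Heap memory is shared by all slots and is not modelled here. The class and method names are made up for illustration.

```java
// Rough per-slot managed-memory estimate for the settings above.
// Assumption: taskmanager.memory.managed.fraction keeps its default of 0.4.
final class SlotMemorySketch {

    /** Managed memory of the whole TaskManager, in MB. */
    static double managedMemoryMb(double flinkSizeMb, double managedFraction) {
        return flinkSizeMb * managedFraction;
    }

    /** Managed memory is split evenly across the slots of one TaskManager. */
    static double managedMemoryPerSlotMb(double flinkSizeMb, double managedFraction, int slots) {
        return managedMemoryMb(flinkSizeMb, managedFraction) / slots;
    }

    public static void main(String[] args) {
        double flinkSizeMb = 8192; // taskmanager.memory.flink.size: 8192m
        int slots = 16;            // taskmanager.numberOfTaskSlots: 16
        double fraction = 0.4;     // assumed default managed fraction

        System.out.println("managed total (MB): " + managedMemoryMb(flinkSizeMb, fraction));
        System.out.println("managed per slot (MB): "
                + managedMemoryPerSlotMb(flinkSizeMb, fraction, slots));
    }
}
```

With 16 slots on an 8 GB TaskManager each slot ends up with only about 200 MB of managed memory, so fewer slots may be the safer choice for state-heavy jobs.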

III. Writing the code

  1. pom.xml configuration:

    <dependencies>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
        </dependency>
    
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.43</version>
        </dependency>
    
    </dependencies>
    <build>
        <finalName>mysql-cdc-info</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.dudu.flink.cdc.MysqlCDCTest</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
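  2. Main entry point

package com.dudu.flink.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCDCTest {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Build the Flink CDC MySQL source
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(33306)
                .username("root")
                .password("Qs@12345")
                .databaseList("d_mid")
                // If omitted, every table in the listed databases is captured; if set, use the db.table form
                .tableList("d_mid.user")
                // Emit each change event as a Debezium-style JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Take an initial snapshot first, then continue reading from the binlog
                .startupOptions(StartupOptions.initial())
                .build();

        // 3. Enable checkpointing (every 3 seconds)
        env.enableCheckpointing(3000);

        // 4. Read from the source with 4 parallel readers and print with a single sink task
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);

        env.execute("Print MySQL Snapshot + Binlog");
    }
}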

IV. Submitting the job

Upload through the web UI

Start from the command line

Official reference documentation

Start
./bin/flink run --detached ../jobs/mysql-cdc-demo.jar

Stopping the job
Stop immediately with the cancel command (use this only when testing!)
./bin/flink cancel "2d00d9650411c869818de197262d2df1"

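Stopping the job with the stop command

Prefer the stop command whenever possible. stop combines the cancel and savepoint operations: it stops the running job and also creates a savepoint from which the job can be restarted.

  1. Create a savepoint

If state.savepoints.dir is not configured in conf/flink-conf.yaml, specify the target path when creating the savepoint: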

./bin/flink savepoint 5091f1c0327eea4bd71a8397ff0b13ca /tmp/flink-savepoints


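If state.savepoints.dir is configured in conf/flink-conf.yaml, the target directory can be omitted and the configured path is used by default:

./bin/flink savepoint 5091f1c0327eea4bd71a8397ff0b13ca

  2. Stop the existing job with a savepoint

If state.savepoints.dir is configured in conf/flink-conf.yaml, the stop command can be used directly and a savepoint is created automatically.

Otherwise, pass the savepoint path explicitly when stopping the job: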

./bin/flink stop --savepointPath /tmp/flink-savepoints 5091f1c0327eea4bd71a8397ff0b13ca

Restoring the job from a savepoint
./bin/flink run --detached --fromSavepoint /tmp/flink-savepoints/savepoint-5091f1-a8912663a0ca/_metadata ../jobs/mysql-cdc-hotline-demo.jar
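
When several savepoints pile up in one directory, it helps to know which job each belongs to. Flink names the directory savepoint-<short job ID>-<random suffix>, where the short job ID is the first six characters of the job ID, which is why job 5091f1c0327eea4bd71a8397ff0b13ca produced savepoint-5091f1-a8912663a0ca above. The sketch below checks that relationship. The class and method names are hypothetical helpers, not part of Flink.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: relate a savepoint path to the job that wrote it.
final class SavepointNameSketch {

    /** "savepoint-" followed by the first six characters of the job ID. */
    static String directoryPrefix(String jobId) {
        return "savepoint-" + jobId.substring(0, 6);
    }

    /** Accepts either the savepoint directory or its _metadata file. */
    static boolean belongsTo(String savepointPath, String jobId) {
        Path path = Paths.get(savepointPath);
        if (path.getFileName().toString().equals("_metadata")) {
            path = path.getParent(); // step up to the savepoint directory
        }
        return path.getFileName().toString().startsWith(directoryPrefix(jobId) + "-");
    }

    public static void main(String[] args) {
        String jobId = "5091f1c0327eea4bd71a8397ff0b13ca";
        String path = "/tmp/flink-savepoints/savepoint-5091f1-a8912663a0ca/_metadata";
        System.out.println(directoryPrefix(jobId));   // savepoint-5091f1
        System.out.println(belongsTo(path, jobId));   // true
    }
}
```

--fromSavepoint accepts either the savepoint directory or the _metadata file inside it, so the helper handles both forms.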

V. Updating the job

See the official documentation: restoring a job from a savepoint

  1. Stop the existing Flink job with a savepoint.

    ./bin/flink stop $Existing_Flink_JOB_ID
  2. Restore the updated Flink job from the savepoint.

    ./bin/flink run \
      --detached \
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./FlinkCDCExample.jar

Flink CDC: new version, pipeline connector configuration

flink-cdc/docs/content/docs/get-started/quickstart/mysql-to-doris.md at master · apache/flink-cdc
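The link above corresponds to the following document (kept in case the GitHub docs are moved): flinkCDC3.0-mysql-to-doris.md

Official documentation:
MySQL to Doris

Flink cluster configuration

  1. Enable checkpointing in conf/flink-conf.yaml. execution.checkpointing.interval has no default value, so it has to be set explicitly. For the available options, see: configuration parameters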
execution.checkpointing.interval: 3000
  2. Set the FLINK_HOME environment variable:

bin/flink-cdc.sh checks whether FLINK_HOME is set and reports an error if it is not.

export FLINK_HOME=/home/flink/flink-1.18.0
export PATH=$PATH:$FLINK_HOME/bin

Alternatively, pass it on the start command:

--flink-home /data/software/flink/flink-1.18.0

Writing the mysql-to-doris.yaml file

MySQL pipeline connector configuration, Doris pipeline connector configuration

source:
  type: mysql
  hostname: 127.0.0.1
  port: 33306
  username: root
  password: Qs@pass
  # all tables in the d_mid database whose names start with sys_
  tables: d_mid.sys_\.*
  server-id: 5488
  server-time-zone: Asia/Shanghai

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: "hits@pass"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1
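
The tables value d_mid.sys_\.* can look odd at first. An unescaped dot separates the database name from the table name, so a dot that is meant as a regular-expression wildcard has to be written as \. instead. The sketch below imitates that reading with plain java.util.regex to show which tables the pattern would select. It illustrates the semantics only and is not the connector's actual parsing code. The table names sys_user and user are made-up examples.

```java
import java.util.regex.Pattern;

// Illustration of how a pipeline "tables" pattern can be read; not Flink CDC's own code.
final class TablePatternSketch {

    /** Splits "db.table" on the first dot that is NOT escaped with a backslash. */
    static String[] splitDbAndTable(String tables) {
        return tables.split("(?<!\\\\)\\.", 2);
    }

    /** Turns the escaped "\." back into a plain regex dot. */
    static String unescape(String part) {
        return part.replace("\\.", ".");
    }

    /** True when both the database and the table name match their part of the pattern. */
    static boolean matches(String tables, String db, String table) {
        String[] parts = splitDbAndTable(tables);
        return Pattern.matches(unescape(parts[0]), db)
                && Pattern.matches(unescape(parts[1]), table);
    }

    public static void main(String[] args) {
        String tables = "d_mid.sys_\\.*"; // same value as in the YAML above

        System.out.println(unescape(splitDbAndTable(tables)[1]));  // sys_.*
        System.out.println(matches(tables, "d_mid", "sys_user"));  // true
        System.out.println(matches(tables, "d_mid", "user"));      // false
        System.out.println(matches(tables, "other", "sys_user"));  // false
    }
}
```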

Run

./bin/flink-cdc.sh mysql-to-doris.yaml 
#--flink-home /data/software/flink/flink-1.18.0

Restoring the job from a savepoint

Using a savepoint

  1. First stop the job by its job ID

    ./flink-1.18.0/bin/flink stop 4a8d565fe295f70d1cb25c1ec45a057d

  2. Using the Savepoint path reported for the stopped job, set execution.savepoint.path in conf/flink-conf.yaml

    execution.savepoint.path: /home/flink/flink-savepoints/savepoint-4a8d56-765ea2800a9b
  3. Restart the CDC job

    cd flink-cdc-3.0.0
    ./bin/flink-cdc.sh job/allDatabase-to-doris.yaml --flink-home /home/flink/flink-1.18.0

Using a checkpoint

First make sure that [RETAIN_ON_CANCELLATION](https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention) and [checkpoint-storage](https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/config/#state-checkpoint-storage) are configured. Without them, a manual cancel does not keep the checkpoint directory.

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
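# Default directory for checkpoint data files and metadata, on a file system supported by Flink. The path must be accessible from all participating processes/nodes (all TaskManagers and the JobManager).
state.checkpoints.dir: file:///data/software/flink/flink-checkpoints
# Default directory for savepoints. Used by the state backends that write savepoints to a file system (HashMapStateBackend, EmbeddedRocksDBStateBackend).
state.savepoints.dir: file:///data/software/flink/flink-savepoints
# Recognized shortcut names are "jobmanager" and "filesystem".
state.checkpoint-storage: filesystem
  1. Stop the job

Cancel the job by its job ID: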

./bin/flink cancel dc42b6881e759e82ddf69a6c718b3ac3


Or cancel the job manually in the web UI.

  2. Check the checkpoints directory: the checkpoint folder has not been deleted.

  3. Point execution.savepoint.path in conf/flink-conf.yaml at the chosen checkpoint directory

    execution.savepoint.path: /home/flink/flink-checkpoints/dc42b6881e759e82ddf69a6c718b3ac3/chk-999
  4. Restart the CDC job

    cd flink-cdc-3.0.0
    ./bin/flink-cdc.sh job/allDatabase-to-doris.yaml --flink-home /home/flink/flink-1.18.0
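
Picking the chk-N directory by hand is easy to get wrong when several are retained. As a minimal sketch, the helper below lists the job's checkpoint directory (<state.checkpoints.dir>/<jobId>) and returns the chk-<n> folder with the highest number that contains a _metadata file, on the assumption that a folder without _metadata is not a completed checkpoint. The class and method names are hypothetical and not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper: find the newest retained checkpoint of one job.
final class LatestCheckpointSketch {

    /** Returns the chk-<n> directory with the highest n that contains a _metadata file. */
    static Optional<Path> latestCheckpoint(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .filter(p -> Files.exists(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(
                            (Path p) -> Long.parseLong(p.getFileName().toString().substring(4))));
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: LatestCheckpointSketch <state.checkpoints.dir>/<jobId>");
            return;
        }
        // Print the path to use as execution.savepoint.path, if any checkpoint was retained
        System.out.println(latestCheckpoint(Paths.get(args[0]))
                .map(Path::toString)
                .orElse("no completed checkpoint found"));
    }
}
```

The comparison is numeric on purpose: sorting the folder names as strings would rank chk-9 above chk-10.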

By zk
