下载

为了运行Flink,需提前安装好 Java 8 或以上.
下载flink1.18.1安装包并解压。

tar -zxf flink-1.18.1-bin-scala_2.12.tgz
cd flink-1.18.1

修改配置

修改flink-conf.yaml

vim conf/flink-conf.yaml

官方文档-配置参数

修改监听端口

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
rest.port: 8111

修改绑定ip,使webUi可以外部访问。

# To enable this, set the bind address to one that has access to outside-facing
# network interface, such as 0.0.0.0.
rest.bind-address: 0.0.0.0

修改state配置


#修改任务槽数,建议和cpu核数一致
taskmanager.numberOfTaskSlots: 16

#默认的快捷指定方式是:'hashmap' 和 'rocksdb'.
state.backend.type: hashmap
#Flink 支持的文件系统中用于存储检查点的数据文件和元数据的默认目录。存储路径必须可从所有参与的进程/节点(即所有 TaskManager 和 JobManager)访问。
state.checkpoints.dir: file:///data/software/flink/flink-checkpoints
#保存点的默认目录。由将保存点写入文件系统的状态后端使用(HashMapStateBackend、EmbeddedRocksDBStateBackend)
state.savepoints.dir: file:///data/software/flink/flink-savepoints
#可识别的快捷方式名称是“jobmanager”和“filesystem”。
state.checkpoint-storage: filesystem
#flink默认只保留最近成功生成的1个Checkpoint,此配置可以指定要保留的已完成检查点的最大数量
state.checkpoints.num-retained: 10
#此配置定义了在job取消时应如何清理checkpoint
#execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

state配置
externalized-checkpoint-retention配置

内存配置

参考文档:内存调优指南 配置 Flink 进程的内存

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项 TaskManager 配置参数 JobManager 配置参数
Flink 总内存 taskmanager.memory.flink.size jobmanager.memory.flink.size
进程总内存 taskmanager.memory.process.size jobmanager.memory.process.size

不建议同时设置_进程总内存_和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。

独立部署模式下,我们通常更关注 Flink 应用本身使用的内存大小。 建议配置 Flink 总内存taskmanager.memory.flink.size 或者 jobmanager.memory.flink.size)或其组成部分。 此外,如果出现 Metaspace 不足的问题,可以调整 JVM Metaspace 的大小。
这种情况下通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销 进行限制,它只与机器的物理资源相关。

#根据服务器实际情况选择
#taskmanager.memory.process.size: 8192m
#jobmanager.memory.process.size: 4096m
taskmanager.memory.flink.size: 8192m
jobmanager.memory.flink.size: 4096m

修改pekko

#由于机器速度慢或网络拥塞肯呢个会造成超时,需要配置timeout
pekko.ask.timeout: 120s

修改bin/config.sh文件

Flink启动时会把启动的进程的ID(pid)默认存到系统的/tmp目录下的一个文件中。由于是临时目录,会被系统清理,所以会导致pid找不到,导致使用./bin/stop-cluster.sh无法关闭集群。因此我们需要修改config.sh文件中默认的DEFAULT_ENV_PID_DIR目录。

vim bin/config.sh

DEFAULT_ENV_PID_DIR="/home/flink/flink-1.18.0/tmp"


kill掉flink的进程之后重新使用./bin/start-cluster.sh启动集群,查看新建的tmp目录,如下:

启动集群

Flink 附带了一个 bash 脚本,可以用于启动本地集群。

./bin/start-cluster.sh

测试提交一个作业

Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上

./bin/flink run examples/streaming/WordCount.jar
tail log/flink-*-taskexecutor-*.out

停止集群

./bin/stop-cluster.sh

By zk

一个程序员

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注