华为云对象存储OBSFlink对接OBS_云淘科技

概述

Flink是一个分布式的数据处理引擎,用于处理有界和无界流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。

注意事项

flink-obs-fs-hadoop目前仅支持OBS并行文件系统。
不推荐state状态数据存储在OBS上。
为了减少日志输出,在/opt/flink-1.12.1/conf/log4j.properties文件中增加配置:

logger.obs.name=com.obs
logger.obs.level=ERROR

flink-obs-fs-hadoop的实现基于flink的plugin加载机制(flink从1.9开始引入),flink-obs-fs-hadoop必须通过flink的plugin机制进行加载,即将flink-obs-fs-hadoop放入/opt/flink-1.12.1/plugins/obs-fs-hadoop目录下。

对接步骤

以flink-1.12.1为例。

下载flink-1.12.1-bin-scala_2.11.tgz,并解压到/opt/flink-1.12.1目录。
在/etc/profile文件中增加配置:

export FLINK_HOME=/opt/flink-1.12.1
export PATH=$FLINK_HOME/bin:$PATH

安装flink-obs-fs-hadoop。

在Github下载flink-obs-fs-hadoop:下载地址。

flink-obs-fs-hadoop-${flinkversion}-hw-${version}.jar版本规则:flinkversion为对应的flink版本号,version为flink-obs-fs-hadoop版本号。
若没有匹配版本的jar包,可自行修改flink-obs-fs-hadoop目录下pom文件中的flink版本重新编译生成。详情见编译指南。

在/opt/flink-1.12.1/plugins目录下创建obs-fs-hadoop目录,并将上述jar放入此目录。

配置flink。

在/opt/flink-1.12.1/conf/flink-conf.yaml文件中或在代码中设置如下参数:

fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
fs.obs.access.key: xxx
fs.obs.secret.key: xxx
fs.obs.endpoint: xxx
fs.obs.buffer.dir: /data/buf #写数据到OBS时需要的本地临时目录,flink程序需具备此目录读写权限

编写flink应用程序。

StateBackend设置为OBS中的路径。

示例:

1
env.setStateBackend(new FsStateBackend("obs://obs-bucket/test/checkpoint"));

StreamingFileSink设置为OBS中的路径。

示例:

1
2
3
4
5
6
7
final StreamingFileSink sink = StreamingFileSink
.forRowFormat(new Path("obs://obs-bucket/test/data"),
new SimpleStringEncoder("UTF-8"))
.withBucketAssigner(new BasePathBucketAssigner())
.withRollingPolicy(rollingPolicy)
.withBucketCheckInterval(1000L)
.build();

父主题: 对接大数据组件

同意关联代理商云淘科技,购买华为云产品更优惠(QQ 78315851)

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家