
Flink算得上是大数据领域比较优秀的一个工具。
它已经被收归于阿帕奇基金会之下。
本文简介适用于开发的环境,不面向生产。
一、Flink简介
注:以下内容由edge的Copilot生成,本人稍微整理。
🧬 Flink 的起源与发展
Apache Flink 起源于 2009 年德国柏林工业大学的一个研究项目 Stratosphere,这是一个由德国研究基金会资助的分布式数据处理平台研究计划。2014 年,该项目正式捐赠给 Apache 基金会,并更名为 Flink,在德语中意为“敏捷、快速”。
-
2014 年 3 月:成为 Apache 孵化器项目
-
2014 年 12 月:晋升为 Apache 顶级项目
-
2015 年起:阿里巴巴大规模采用 Flink,并基于其开发了内部版本 Blink
-
2019 年:阿里将 Blink 开源并合并入 Flink 主干,极大增强了 Flink 的 SQL 能力和性能
Flink 的核心特性
Apache Flink 是一个统一的流处理与批处理引擎,具备以下关键能力:
-
原生流处理(Stream-first):Flink 从设计之初就是为流处理而生,支持无界数据流的低延迟处理。
-
批流统一(Unified Runtime):通过同一套 API 和执行引擎处理批数据和流数据,简化开发。
-
事件时间与乱序处理:支持事件时间语义(Event Time)和 Watermark 机制,能处理乱序数据。
-
状态管理(Stateful Processing):支持大规模有状态计算,状态可持久化并容错。
-
Exactly-once 语义:通过 Checkpoint 和 Savepoint 实现端到端精确一次处理。
-
高可扩展性与容错性:支持大规模分布式部署,自动恢复失败任务。
-
多语言支持:支持 Java、Scala、Python、SQL 等多种语言。
-
丰富的连接器生态:内置 Kafka、MySQL、HDFS、Elasticsearch、Pulsar 等连接器。
Flink 的生态与应用场景
Flink 已成为全球范围内实时数据处理的事实标准,广泛应用于:
-
实时数据仓库(如阿里巴巴的实时数仓)
-
实时监控与告警系统
-
实时推荐与个性化服务
-
实时 ETL 与数据集成(如 Flink CDC)
-
IoT 数据流处理
-
金融风控与交易监控
此外,Flink 还衍生出多个子项目,如:
-
Flink Table API / SQL:用于声明式流批一体开发
-
Flink CDC:用于实时数据同步
-
Apache Paimon(原 Flink Table Store):构建流式数据湖的关键组件
🧭 当前与未来
截至 2025 年,Flink 已发布至 2.0.0 版本,标志着其架构进入新阶段。
Flink 2.0 引入了更强的云原生能力(如状态存储分离)、更灵活的调度机制,以及更统一的流批语义,进一步巩固了其在实时计算领域的领先地位。
二、为什么选择Flink
公司做了不少政企项目,有不少还有IOT的数据处理,例如收集消防栓、手表、各种监测芯片的数据。
虽然数据不算特别大,但是普遍都需要一个实时监控的功能。
比较久以前都比较傻瓜:数据先入库,然后再通过websocket或者restapi提供给前端。如果需要做一些其它计算,就更慢了。
但是那种方式有较大的局限性:数据不能太多,及时性不能太高,编码较多。
最大的问题是及时性在某些场景中差强人意,或者不太好满足。
所以,如果有一个工具,可以解决这些不是很好?
自行基于netty等net框架开发,行不行? 可以,但是必要性不充分,为什么现成的不用了?
Flink在绝大部分场景下都能完美解决这些问题:
1.支持流计算,支持实时计算
2.支持动态扩展
3.支持java语言
4.java编写,支持各种操作系统,包括国产化
此外它支持流批一体。
再看看Flink和其它类似框架的比较。
比较
Apache Flink 与其他主流大数据处理框架对比(多维度)
维度 | Apache Flink | Apache Spark | Apache Storm | Kafka Streams | Google Dataflow |
---|---|---|---|---|---|
处理模型 | 原生流处理,支持批处理(流批统一) | 批处理为主,支持微批流处理 | 原生流处理 | 原生流处理,依赖 Kafka | 流批统一(Beam 模型) |
延迟表现 | 毫秒级低延迟 | 秒级延迟(微批) | 毫秒级 | 毫秒级 | 毫秒级 |
吞吐能力 | 高吞吐,适合大规模数据流 | 高吞吐,适合批量计算 | 中等吞吐 | 中等吞吐 | 高吞吐 |
状态管理 | 内建强状态管理(支持 RocksDB) | 状态管理较弱 | 无内建状态 | 内存状态轻量支持 | 支持(Beam State API) |
容错机制 | Exactly-once,基于 Checkpoint | Exactly-once(Structured Streaming) | At-least-once | At-least-once | Exactly-once |
事件时间支持 | 强 Watermark + EventTime 语义 | 支持事件时间(逻辑复杂) | 无事件时间 | 无事件时间 | 原生支持 |
窗口机制 | 支持滚动/滑动/会话/自定义窗口 | 支持滚动、滑动窗口 | 基础窗口支持 | 滚动/滑动窗口 | 多种窗口类型 |
开发语言支持 | Java、Scala、Python、SQL | Java、Scala、Python、R、SQL | Java、Clojure | Java、Scala | Java、Python、Go |
部署复杂度 | 中等(需 JobManager、TaskManager) | 中等(需 Spark 集群) | 简单(轻量部署) | 嵌入式部署 | 云端托管(GCP) |
生态系统 | Flink SQL、CDC、Paimon、StateFun | Spark SQL、MLlib、GraphX | 组件稀缺 | Kafka 紧耦合组件 | GCP 生态紧密集成 |
适用场景 | 实时数仓、金融风控、IoT、CEP | 离线分析、批处理、ML | 日志监控、低延迟计算 | Kafka 应用中的轻量流逻辑 | 云原生数据流 ETL |
总结亮点
- Apache Flink:流处理能力领先,强状态支持、精确一次语义,适用于实时分析与复杂计算。
- Apache Spark:批处理标杆,适用于海量离线任务与大数据建模。
- Apache Storm:超低延迟,但功能较轻,不适合复杂数据流场景。
- Kafka Streams:适合 Kafka 消费端的轻流计算,部署简易。
- Google Dataflow:云原生、简洁强大,适合全托管数据管道。
三、搭建简单开发环境
需要注意是,虽然2.0版本已经可用,但Flink对1.20.x的支持是长期的。
搭建一个用于开发的,相对简单一些。
有两种方案可以选择:
1.不装Flink — 即使没有Flink服务器,也能利用本地的mini集群做一些基本的开发测试。但是其它的作业管理、提交作业比较难搞
2.装Flink – 可以体会类似生产环境的性能、并发、并能够比较直观地提交和管理作业
我选择安装Flink,虽然初期很长一段时间根本就没有用。
3.1、在centos上安装Flink单机
参考资料:
https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/
1.创建一个flink用户
并创建两个目录,只有flink用户有权限访问。直接用root安装也可以
2.安装jdk17
设置JAVA_HOME变量.具体略
3.修改配置
Flink解压后,在conf目录下有问文件:
主要修改那个config.yaml即可。 通常修改三个地方即可:
JAVA_HOME
注意这个opts部分的设置,如果你在本地启动需要参考这个。
jobmanager
bind-host:指定了可以访问的客户机 port:指定了对外的端口
rest客户端
Flink默认在服务端使用log4j作为日志,毕竟是服务端。 如果要修改也可以在这里指定Log4j的配置。
4、启动和停止
bin目录下:
启动和停止很简单,不要带参数 例如启动: ./start-cluster.sh 如果成功,则可以通过浏览器访问管理客户端:
注意:这个仅仅是开发环境,如果生产环境是万万不能没有认证的。
5.测试
这里主要测试直接通过flink命令提交作业:
$ ./bin/flink run examples/streaming/WordCount.jar
3.2、创建一个Springboot工程
使用ide创建一个jdk17,springboot3.x的项目具体略 。
为了测试Flink的批处理和流处理,有必要搭建一些其它环境:
a.一个rdbms,例如mysql,postgresql等都可以
b.一个mqtt服务服务器,也可以考虑使用amqp服务器。我则选择安装了apache的artemis. artemis支持mqtt,amqp,stomp等协议。
artermis有个优点,小巧,容易装,资料齐全,而且artemis本身是完全使用java编写的,意味着可以在各种环境运行,自然包括国产环境。
以上两个安装略。
这里列出可以参考的pom.xml配置:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.lzfto.flink</groupId>
<artifactId>learn-flink-spring</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>learn-flink</name>
<properties>
<lzfto.flinktest.version>0.0.1-SNAPSHOT</lzfto.flinktest.version>
<java.version>17</java.version>
<!--Spring boot -->
<spring.boot.version>3.4.7</spring.boot.version>
<flink.version>1.20.0</flink.version>
<flink-connector.jdbc.version>3.3.0-1.20</flink-connector.jdbc.version>
<mysql-version>8.0.33</mysql-version>
<mysql-j-version>9.3.0</mysql-j-version>
<apache-poi-version>5.4.1</apache-poi-version>
<alibaba-fastjson-version>2.0.57</alibaba-fastjson-version>
<!-- mqtt -->
<mqtt-client.version>6.5.0</mqtt-client.version>
<!-- maven 等 -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencyManagement>
<dependencies>
<!-- SpringBoot的依赖配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<!--<artifactId>spring-jdbc</artifactId> -->
</dependency>
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- Original Flink Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope> -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- tableapi 桥接器,主要和DataStream进行对接 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope> -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- ide调试 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope> -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-connector.jdbc.version}</version>
</dependency>
<!-- 执行工厂定义 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 数据库连接 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${alibaba-fastjson-version}</version>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- apache poi 操作excel -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${apache-poi-version}</version> <!-- 请检查最新版本 -->
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${apache-poi-version}</version> <!-- 请检查最新版本 -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
<parameters>true</parameters>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<!-- 如果想排除一些包 -->
<excludes>
<!--<exclude>org.apache.flink:force-shading</exclude> -->
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise,
this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.lzfto.flink.demo.DemoApplication</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources/</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
<resource>
<!-- directory 表示取该目录下的文件 -->
<directory>libs</directory>
<!--targetPath 指定打包到哪个目录下 默认是放到class目录下 -->
<targetPath>/BOOT-INF/lib/</targetPath>
<!-- 取符合格式的所有文件 *代表全部 -->
<includes>
<include>**/*.jar</include>
</includes>
</resource>
</resources>
<finalName>lzfto-${lzfto.flinktest.version}</finalName>
</build>
</project>
最后本人编写了一些代码用于测试,代码位于:
https://gitee.com/lu_zhifei/learn-flink/tree/master/java/first-tableapi-spring