Flink-JAVA开发.01-搭建简单的开发环境


Flink算得上是大数据领域比较优秀的一个工具。

它已经被收归于阿帕奇基金会之下。

本文简介适用于开发的环境,不面向生产。

一、Flink简介

注:以下内容由edge的Copilot生成,本人稍微整理。

🧬 Flink 的起源与发展

Apache Flink 起源于 2009 年德国柏林工业大学的一个研究项目 Stratosphere,这是一个由德国研究基金会资助的分布式数据处理平台研究计划。2014 年,该项目正式捐赠给 Apache 基金会,并更名为 Flink,在德语中意为“敏捷、快速”。

  • 2014 年 3 月:成为 Apache 孵化器项目

  • 2014 年 12 月:晋升为 Apache 顶级项目

  • 2015 年起:阿里巴巴大规模采用 Flink,并基于其开发了内部版本 Blink

  • 2019 年:阿里将 Blink 开源并合并入 Flink 主干,极大增强了 Flink 的 SQL 能力和性能

 

Flink 的核心特性

Apache Flink 是一个统一的流处理与批处理引擎,具备以下关键能力:

  1. 原生流处理(Stream-first):Flink 从设计之初就是为流处理而生,支持无界数据流的低延迟处理。

  2. 批流统一(Unified Runtime):通过同一套 API 和执行引擎处理批数据和流数据,简化开发。

  3. 事件时间与乱序处理:支持事件时间语义(Event Time)和 Watermark 机制,能处理乱序数据。

  4. 状态管理(Stateful Processing):支持大规模有状态计算,状态可持久化并容错。

  5. Exactly-once 语义:通过 Checkpoint 和 Savepoint 实现端到端精确一次处理。

  6. 高可扩展性与容错性:支持大规模分布式部署,自动恢复失败任务。

  7. 多语言支持:支持 Java、Scala、Python、SQL 等多种语言。

  8. 丰富的连接器生态:内置 Kafka、MySQL、HDFS、Elasticsearch、Pulsar 等连接器。

 

Flink 的生态与应用场景

Flink 已成为全球范围内实时数据处理的事实标准,广泛应用于:

  • 实时数据仓库(如阿里巴巴的实时数仓)

  • 实时监控与告警系统

  • 实时推荐与个性化服务

  • 实时 ETL 与数据集成(如 Flink CDC)

  • IoT 数据流处理

  • 金融风控与交易监控

此外,Flink 还衍生出多个子项目,如:

  • Flink Table API / SQL:用于声明式流批一体开发

  • Flink CDC:用于实时数据同步

  • Apache Paimon(原 Flink Table Store):构建流式数据湖的关键组件

 

🧭 当前与未来

截至 2025 年,Flink 已发布至 2.0.0 版本,标志着其架构进入新阶段。

Flink 2.0 引入了更强的云原生能力(如状态存储分离)、更灵活的调度机制,以及更统一的流批语义,进一步巩固了其在实时计算领域的领先地位。

 

二、为什么选择Flink

公司做了不少政企项目,有不少还有IOT的数据处理,例如收集消防栓、手表、各种监测芯片的数据。

虽然数据不算特别大,但是普遍都需要一个实时监控的功能。

 

比较久以前都比较傻瓜:数据先入库,然后再通过websocket或者restapi提供给前端。如果需要做一些其它计算,就更慢了。

但是那种方式有较大的局限性:数据不能太多,及时性不能太高,编码较多。

最大的问题是及时性在某些场景中差强人意,或者不太好满足。

 

所以,如果有一个工具,可以解决这些不是很好?

自行基于netty等net框架开发,行不行? 可以,但是必要性不充分,为什么现成的不用了?

 

Flink在绝大部分场景下都能完美解决这些问题:

1.支持流计算,支持实时计算

2.支持动态扩展

3.支持java语言

4.java编写,支持各种操作系统,包括国产化

此外它支持流批一体。

 

再看看Flink和其它类似框架的比较。

比较

Apache Flink 与其他主流大数据处理框架对比(多维度)

维度 Apache Flink Apache Spark Apache Storm Kafka Streams Google Dataflow
处理模型 原生流处理,支持批处理(流批统一 批处理为主,支持微批流处理 原生流处理 原生流处理,依赖 Kafka 流批统一(Beam 模型)
延迟表现 毫秒级低延迟 秒级延迟(微批) 毫秒级 毫秒级 毫秒级
吞吐能力 高吞吐,适合大规模数据流 高吞吐,适合批量计算 中等吞吐 中等吞吐 高吞吐
状态管理 内建强状态管理(支持 RocksDB) 状态管理较弱 无内建状态 内存状态轻量支持 支持(Beam State API)
容错机制 Exactly-once,基于 Checkpoint Exactly-once(Structured Streaming) At-least-once At-least-once Exactly-once
事件时间支持 强 Watermark + EventTime 语义 支持事件时间(逻辑复杂) 无事件时间 无事件时间 原生支持
窗口机制 支持滚动/滑动/会话/自定义窗口 支持滚动、滑动窗口 基础窗口支持 滚动/滑动窗口 多种窗口类型
开发语言支持 Java、Scala、Python、SQL Java、Scala、Python、R、SQL Java、Clojure Java、Scala Java、Python、Go
部署复杂度 中等(需 JobManager、TaskManager) 中等(需 Spark 集群) 简单(轻量部署) 嵌入式部署 云端托管(GCP)
生态系统 Flink SQL、CDC、Paimon、StateFun Spark SQL、MLlib、GraphX 组件稀缺 Kafka 紧耦合组件 GCP 生态紧密集成
适用场景 实时数仓、金融风控、IoT、CEP 离线分析、批处理、ML 日志监控、低延迟计算 Kafka 应用中的轻量流逻辑 云原生数据流 ETL

总结亮点

  • Apache Flink:流处理能力领先,强状态支持、精确一次语义,适用于实时分析与复杂计算。
  • Apache Spark:批处理标杆,适用于海量离线任务与大数据建模。
  • Apache Storm:超低延迟,但功能较轻,不适合复杂数据流场景。
  • Kafka Streams:适合 Kafka 消费端的轻流计算,部署简易。
  • Google Dataflow:云原生、简洁强大,适合全托管数据管道。

 

三、搭建简单开发环境

需要注意是,虽然2.0版本已经可用,但Flink对1.20.x的支持是长期的。

搭建一个用于开发的,相对简单一些。

有两种方案可以选择:

1.不装Flink  — 即使没有Flink服务器,也能利用本地的mini集群做一些基本的开发测试。但是其它的作业管理、提交作业比较难搞

2.装Flink –  可以体会类似生产环境的性能、并发、并能够比较直观地提交和管理作业

我选择安装Flink,虽然初期很长一段时间根本就没有用。

3.1、在centos上安装Flink单机

参考资料:
https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/  

1.创建一个flink用户

并创建两个目录,只有flink用户有权限访问。直接用root安装也可以

2.安装jdk17

设置JAVA_HOME变量.具体略

3.修改配置

Flink解压后,在conf目录下有问文件:
主要修改那个config.yaml即可。 通常修改三个地方即可:
JAVA_HOME
注意这个opts部分的设置,如果你在本地启动需要参考这个。  
jobmanager
bind-host:指定了可以访问的客户机 port:指定了对外的端口  
rest客户端
  Flink默认在服务端使用log4j作为日志,毕竟是服务端。  如果要修改也可以在这里指定Log4j的配置。  

4、启动和停止

bin目录下:

启动和停止很简单,不要带参数 例如启动: ./start-cluster.sh 如果成功,则可以通过浏览器访问管理客户端:
注意:这个仅仅是开发环境,如果生产环境是万万不能没有认证的。  

5.测试

本地模式安装 | Apache Flink

这里主要测试直接通过flink命令提交作业:

$ ./bin/flink run examples/streaming/WordCount.jar

 

3.2、创建一个Springboot工程

使用ide创建一个jdk17,springboot3.x的项目具体略 。

为了测试Flink的批处理和流处理,有必要搭建一些其它环境:

a.一个rdbms,例如mysql,postgresql等都可以

b.一个mqtt服务服务器,也可以考虑使用amqp服务器。我则选择安装了apache的artemis. artemis支持mqtt,amqp,stomp等协议。

artermis有个优点,小巧,容易装,资料齐全,而且artemis本身是完全使用java编写的,意味着可以在各种环境运行,自然包括国产环境。

以上两个安装略。

 

这里列出可以参考的pom.xml配置:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.lzfto.flink</groupId>
	<artifactId>learn-flink-spring</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>learn-flink</name>
	<properties>
		<lzfto.flinktest.version>0.0.1-SNAPSHOT</lzfto.flinktest.version>
		<java.version>17</java.version>
		<!--Spring boot -->
		<spring.boot.version>3.4.7</spring.boot.version>
		<flink.version>1.20.0</flink.version>
		<flink-connector.jdbc.version>3.3.0-1.20</flink-connector.jdbc.version>
		<mysql-version>8.0.33</mysql-version>
		<mysql-j-version>9.3.0</mysql-j-version>
		<apache-poi-version>5.4.1</apache-poi-version>
		<alibaba-fastjson-version>2.0.57</alibaba-fastjson-version>
		<!-- mqtt -->
		<mqtt-client.version>6.5.0</mqtt-client.version>

		<!-- maven 等 -->
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
	</properties>
	<dependencyManagement>
		<dependencies>
			<!-- SpringBoot的依赖配置 -->
			<dependency>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-dependencies</artifactId>
				<version>${spring.boot.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<!-- Spring Boot Starter Web -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
			<!--<artifactId>spring-jdbc</artifactId> -->
		</dependency>
		<!-- websocket -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>

		<!-- Original Flink Dependencies -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java</artifactId>
			<version>${flink.version}</version>
			<!--<scope>provided</scope> -->
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- tableapi 桥接器,主要和DataStream进行对接 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<!--<scope>provided</scope> -->
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- ide调试 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-loader</artifactId>
			<version>${flink.version}</version>
			<!--<scope>provided</scope> -->
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		
		<dependency>
		  <groupId>org.apache.flink</groupId>
		  <artifactId>flink-test-utils</artifactId>
		  <version>${flink.version}</version>
		</dependency>


		<!-- 连接器 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>${flink-connector.jdbc.version}</version>
		</dependency>

		<!-- 执行工厂定义 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- 数据库连接 -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql-version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
		<dependency>
			<groupId>com.alibaba.fastjson2</groupId>
			<artifactId>fastjson2</artifactId>
			<version>${alibaba-fastjson-version}</version>
		</dependency>

		<!-- mqtt -->
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>
		<!-- apache poi 操作excel -->
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi</artifactId>
			<version>${apache-poi-version}</version> <!-- 请检查最新版本 -->
		</dependency>
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi-ooxml</artifactId>
			<version>${apache-poi-version}</version> <!-- 请检查最新版本 -->
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>

		<!-- 测试依赖 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-api</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-engine</artifactId>
			<scope>test</scope>
		</dependency>

	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.13.0</version>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
					<encoding>${project.build.sourceEncoding}</encoding>
					<parameters>true</parameters>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.6.0</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<!-- 如果想排除一些包 -->
								<excludes>
									<!--<exclude>org.apache.flink:force-shading</exclude> -->
									<exclude>com.google.code.findbugs:jsr305</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder. Otherwise, 
										this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>org.lzfto.flink.demo.DemoApplication</mainClass>
								</transformer>
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
		<resources>
			<resource>
				<directory>src/main/resources/</directory>
				<includes>
					<include>**/*</include>
				</includes>
			</resource>
			<resource>
				<!-- directory 表示取该目录下的文件 -->
				<directory>libs</directory>
				<!--targetPath 指定打包到哪个目录下 默认是放到class目录下 -->
				<targetPath>/BOOT-INF/lib/</targetPath>
				<!-- 取符合格式的所有文件 *代表全部 -->
				<includes>
					<include>**/*.jar</include>
				</includes>
			</resource>
		</resources>
		<finalName>lzfto-${lzfto.flinktest.version}</finalName>
	</build>
</project>

 

最后本人编写了一些代码用于测试,代码位于:

https://gitee.com/lu_zhifei/learn-flink/tree/master/java/first-tableapi-spring