Flink-应用

Flink-应用

第一部分 概念深究

checkPoint

基础概念

元数据

检查点元数据信息主要包括:

  1. 检查点的ID,用于标识检查点的唯一性。
  2. 检查点的触发时间,指的是执行检查点操作的时间点。
  3. Checkpoint状态,指示是否已经完成或取消检查点。
  4. 创建检查点时应用程序的并行任务操作符状态。
  5. 每个并行任务操作符的私有状态,比如值状态和列表状态等。
  6. 快照的状态大小,以字节为单位,用于监视检查点的大小。

流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
检查点是Flink中用于实现高可靠性和容错性的重要机制,它能够在应用程序发生故障时提供可靠的状态恢复功能。下面是检查点在Flink中的整体流程以及各个组件之间的关系:
1. 检查点

(1) 每个任务运行时都会有一个计划好的检查点,并在指定的时间点执行检查点操作。

(2) 检查点操作将应用程序的状态保存在一个持久化存储中。

(3) 一旦一个检查点被完全完成,Flink会将其元数据信息保存在JobManager的内存中。

(4) 如果在检查点保存后应用程序发生故障,Flink会尝试从最近完成的检查点开始恢复应用程序的状态。

(5) 当应用程序成功地从故障中恢复后,Flink会在其内部重新计划偏离恢复点的计算任务。

1. 生成逻辑
当应用程序在运行过程中,Flink会按照一定的时间间隔自动生成检查点。在生成检查点时,Flink会执行以下操作:

(1) 在收到生成检查点的命令后,Flink会通知所有的TaskManager暂停计算,并开始对应用程序状态进行快照。

(2) 等待所有任务完成对应状态的快照后,Flink会执行算子的快照操作并持久化全部状态到内部或者外部存储系统中(如HDFS、S3或其他云存储服务)。

(3) 当检查点的全部数据已经写入到存储系统中并完成备份,Flink会更新检查点元数据。此时,检查点已经生成完毕,并等待恢复时使用。

2. 执行逻辑
在应用程序发生故障需要恢复时,Flink会按照以下方式执行检查点:

(1) 从最近已完成的检查点开始恢复。

(2) 读取检查点元数据,加载检查点数据并将其应用到应用程序状态中。

(3) 重新启动应用程序并从恢复点恢复数据流管道。

3. 恢复逻辑
在发生故障后,Flink会从检查点开始进行恢复。总的恢复流程如下:

(1) Flink会获取最近的完成检查点,并加载检查点元数据和对应的状态数据。

(2) 对于每个状态后端,Flink会在内部或外部存储系统中查找最新的写入的状态和元数据快照,并构建状态快照。

(3) 通过组合所有状态快照,Flink会构造应用程序的完整状态。

(4) 重新启动应用程序的数据流管道,并从恢复点开始继续处理数据。

4. 检查逻辑
Flink会周期性地重新处理一些数据以检查应用程序是否已经成功地恢复。具体检查逻辑如下:

(1) 从当前时间的后面某个时间开始,向上回溯。

(2) 对于每个算子,Flink会判断是否所有数据块都已经处理完毕。

(3) 如果某个算子还有未处理完的数据块,Flink会重新启动该算子并重新处理还未处理的数据。

总之,Flink中的检查点机制主要涉及Flink、Job Manager、Task Manager、Zookeeper和第三方存储系统之间的协作。在生成、执行、恢复和检查过程中,不同的组件都会发挥不同的作用,以确保应用程序状态的完整性和可靠性。

涉及组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Flink、Job Manager、Task Manager、Zookeeper和第三方存储系统在检查点机制方面的协作如下:

1. Flink

Flink是检查点机制的核心,它的任务包括生成、执行、恢复和检查检查点。由于检查点机制需要涉及到跨多个Task Manager节点的协作,因此Flink需要负责统一调度和协调检查点的生成和执行,以及恢复过程中的资源协调管理等。

2. Job Manager
Job Manager是Flink的主控节点,也是检查点机制的协调者和管理者。它负责协调和管理每个Task Manager节点的检查点操作,以及保存检查点的元数据和状态信息。

3. Task Manager
Task Manager是Flink中的工作节点。每个Task Manager节点都需要执行检查点操作,生成和保存应用程序的状态信息,并且在恢复时重新启动数据流管道和应用程序。同时,Task Manager还需要与Job Manager进行通信,向其汇报检查点状态和进度信息。

4. Zookeeper
Zookeeper是用于分布式应用程序的一种协调服务。在Flink中,Zookeeper主要用于协调Job Manager和Task Manager之间的信息交换和状态同步操作,并确保在Flink集群中的Task Manager和Job Manager能够有效地相互协作。Zookeeper还用于管理和协调检查点的存储,以便在应用程序发生故障时,Flink可以从最近的检查点开始快速恢复应用程序状态。

5. 第三方存储系统
在Flink中,检查点的数据需要存储在第三方存储系统中,以保证数据在Flink集群中的故障和不可用的情况下的安全性和持久性。此外,第三方存储系统还需要集成Flink的状态后端,以便Flink能够通过其API接口来操作存储系统中的检查点数据和元数据。

综上所述,在Flink中,不同组件之间的协作非常重要,各自需要承担不同的角色,以实现检查点机制的高可靠性和稳定性。

savePoint

  1. 保存点

(1) 保存点是创建检查点的一个手动快照。

(2) 手动创建保存点时,Flink会停止计算过程并保存应用程序的状态。

(3) 保存点的元数据会记录在JobManager的内存中和外部持久化存储中。

(4) 如果应用程序发生故障,可以使用最近的保存点来恢复应用程序的状态。

(5) 创建保存点时需要保证存储的状态是可用的,并将元数据保存到持久化存储中以防止应用程序故障。

流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Savepoint是Flink用于在应用程序更新、升级和迁移等场景下进行状态备份和恢复的机制之一。与检查点(Checkpoint)不同,Savepoint是一种手动创建的快照,它对应用程序的状态进行全量备份,并不需要周期性、自动化地生成。

下面是Savepoint在Flink中的整体流程以及各个组件之间的关系:

1. 生成逻辑
在运行时,应用程序可以通过触发Savepoint命令来手动创建Savepoint。在生成Savepoint时,Flink会执行以下操作:

(1) 向JobManager发送Savepoint操作的请求,并开始对应用程序状态进行快照。

(2) 等待所有任务完成对应状态的快照后,开始执行算子的快照操作,并将全部状态持久化。

(3) 当Savepoint中的全部数据已经完全保存在存储系统上并完成备份,Flink会更新元数据,表示Savepoint已经生成。

2. 执行逻辑
在应用程序发生故障需要恢复时,可以从最近的Savepoint开始恢复。在执行Savepoint时,Flink会执行以下操作:

(1) 读取Savepoint的元数据和状态数据。

(2) 加载状态数据并将其应用到应用程序状态中。

(3) 重新启动应用程序的数据流管道并从Savepoint开始处理数据。

3. 恢复逻辑
与检查点数据的恢复逻辑类似,Savepoint数据的恢复逻辑如下:

(1) Flink会获取最近的Savepoint,并加载指定的状态数据。

(2) 对于每个状态后端,Flink会在存储系统中查找最新的状态和元数据快照,并构建状态快照。

(3) 通过组合所有状态快照,Flink会构造应用程序的完整状态。

(4) 重新启动应用程序数据流管道,并从Savepoint开始恢复数据。

4. 检查逻辑
与检查点类似,Flink会周期性地重新处理一些数据来检查应用程序是否已经成功地恢复。具体检查逻辑如下:

(1) 从当前时间的一个时间点开始向上回溯。

(2) 对于每个算子,检查其是否已经处理所有数据块和所有任务。

二者对比

区别:

1
2
3
保存点只是作用于单个应用程序,它只是快速地保存了当前应用程序的全局状态,并没有像检查点那样与Flink整个集群相关。

如果保存点中的数据丢失,可以通过重新启动应用程序并从最近的检查点中恢复来恢复状态。但是这可能会导致一定的数据丢失,因为保存点只有应用程序的状态,而检查点是整个应用程序的状态。而且,保存点的新创建与计划的检查点不同。

状态

时间

三种时间(Processing Time、Event Time、Ingestion Time)在Flink中都有各自的作用和应用场景。在这之上,水位线作为事件时序的标识,会对这三种时间产生影响。

Processing Time

Processing Time(处理时间)是指事件(数据)进入Flink算子之后,处理时间所对应的实际时间。Processing Time常用于需要实现极低延迟、实时快速处理的场景。例如对于某些需要即时计算得出确认结果的实时应用,处理时间就是最好的事件时间戳。

水位线对Processing Time的影响:水位线不能直接影响Processing Time,因为Processing Time是由Flink系统内部的时钟指定的,主要是用于计算过期和超时。所以,水位线常被用来控制Event Time的延迟。

Event Time

Event Time(事件时间)是指事件实际发生的时间而非进入Flink系统的时间,因此Event Time会受到延迟、乱序等影响,但也因此更贴近于实际业务场景。基于Event Time的处理方式,通常会先将数据根据时间戳转换为数据时间,再进行时间窗口、时间滑动和增量聚合等操作,并用水位线来标记窗口是否关闭。

应用场景:当系统中要处理分布式数据流时,通常选择使用基于Event Time模式来触发数据的聚合、嵌套、排序和分区操作。常见的应用场景包括数据分析、数据挖掘、实时仪表盘和监控等。

水位线对Event Time的影响:水位线是用来标识事件的进度和业务时间的进展,因此在基于Event Time的数据处理流程中,水位线起到了至关重要的作用。水位线通常用于触发窗口的计算和关闭,以保证计算结果的准确性。

Ingestion Time

Ingestion Time(摄入时间)是指事件进入Flink系统的时间而产生的时间戳,它与Processing Time和Event Time不同,因为它不是由事件自己所传递的时间戳所确定的,而是由接收事件的Flink节点系统内部时钟所确定的。因此,Ingestion Time具有中等的准确度和可信度,通常情况下,它被用于实现对数据到达延迟的控制与分析。

应用场景:Ingestion Time相对于Event Time准确性来说较低,但它通常用于在需要在实时应用程序中,记录数据写入的时间等场景中。通常场景包括实时刷卡、在线支付、实时广告类实时数据处理等场景。

水位线对Ingestion Time的影响:Ingestion Time并不是由事件中的时间戳所确定,因此水位线对Ingestion Time并没有直接影响,但是水位线可以用来控制Ingestion Time的乱序和延迟,确保数据的可靠性和准确性。

总之,在Flink中,Processing Time、Event Time和Ingestion Time都有各自的应用场景和所适用的数据类型和处理方式。同时,水位线对于三种时间的影响也是决定数据处理和计算结果准确性和可靠性的关键所在。

[窗口](#视窗- window - 窗口)

1
2
3
4
[](#视窗- window - 窗口)
跳转语法

<a href="#top">a标签回到顶部</a>

第一章 flink 组件

Manager

  1. TaskManager(任务管理器):

    • 概念:TaskManager是Flink集群中的工作节点,负责执行具体的任务。
    • 职责:TaskManager负责接收和执行任务,管理任务的资源,包括CPU、内存和网络等资源。它还负责与JobManager进行通信,接收任务的分配和调度指令,并将任务的执行结果返回给JobManager。
    • 调度时机:TaskManager在启动时会向ResourceManager注册自己的资源,并等待JobManager的任务分配。
    • 调度流程:
      1. TaskManager向ResourceManager注册自己的资源。
      2. TaskManager等待JobManager的任务分配。
      3. JobManager根据作业的拓扑结构和任务之间的依赖关系,将任务分配给TaskManager。
      4. TaskManager接收到任务后,根据任务的执行逻辑执行计算,并将结果返回给JobManager。
  2. ResourceManager(资源管理器):

    • 概念:ResourceManager是Flink集群中的资源管理组件,负责分配和管理集群中的资源。
    • 职责:ResourceManager负责接收来自JobManager的作业提交请求,并根据集群的资源情况进行资源分配。它管理集群中的资源池,包括CPU、内存和网络等资源,并根据作业的需求进行动态调整。ResourceManager还负责监控TaskManager的健康状态,以及处理TaskManager的故障和重启。
    • 调度时机:ResourceManager在Flink集群启动时被启动,负责整个集群的资源管理。
    • 调度流程:
      1. ResourceManager启动,并初始化资源池。
      2. JobManager提交作业请求给ResourceManager。
      3. ResourceManager根据作业的资源需求和集群的资源情况,进行资源分配。
      4. ResourceManager将分配的任务发送给相应的TaskManager。
      5. ResourceManager监控TaskManager的健康状态,处理故障和重启。
  3. JobManager(作业管理器):

    • 概念:JobManager是Flink集群中的主节点,负责整个作业的管理和调度。
    • 职责:JobManager负责接收作业提交请求,将作业转换为JobGraph,并进行作业的调度和执行。它还负责管理作业的状态、检查点和故障恢复等功能。
    • 调度时机:JobManager在Flink集群启动时被启动,负责整个作业的管理和调度。
    • 调度流程:
      1. JobManager启动,并等待作业的提交请求。
      2. 用户提交作业请求给JobManager。
      3. JobManager将作业转换为JobGraph,包括作业的拓扑结构和任务之间的依赖关系。
      4. JobManager根据作业的调度策略,将任务分配给TaskManager。
      5. TaskManager接收到任务后,执行计算,并将结果返回给JobManager。
      6. JobManager监控作业的执行状态,处理检查点和故障恢复等操作。

这些组件共同协作,实现了Flink的分布式流处理和批处理功能。

其他组件

  1. JobGraph(作业图):

    • 概念:JobGraph是一个有向无环图,表示一个Flink作业的拓扑结构和任务之间的依赖关系。
    • 职责:JobGraph描述了作业中的算子和数据流之间的关系,包括输入输出关系、任务之间的依赖关系等。它是作业提交给JobManager的输入,用于作业的调度和执行。
    • 调度时机:JobGraph在作业提交时生成,并在作业调度和执行过程中使用。
  2. ExecutionGraph(执行图):

    • 概念:ExecutionGraph是一个正在执行的Flink作业的有向无环图,包含了作业的执行状态、任务的调度信息和任务之间的依赖关系。
    • 职责:ExecutionGraph是JobManager根据JobGraph生成的执行计划,用于管理作业的执行过程。它记录了作业的执行状态、任务的调度信息、任务之间的依赖关系等,并负责监控任务的执行状态,处理故障恢复和任务重启等操作。
    • 调度时机:ExecutionGraph在作业提交后由JobManager生成,并在作业执行过程中使用。
  3. Task(任务):

    • 概念:Task是Flink作业中的一个执行单元,负责执行具体的计算逻辑。
    • 职责:Task负责接收输入数据,执行计算逻辑,并产生输出结果。一个作业可以包含多个任务,每个任务负责处理作业的一部分数据。任务之间可以通过网络进行数据交换和通信。
    • 调度时机:Task在作业执行过程中根据调度策略由TaskManager执行。
  4. TaskExecutor(任务执行器):

    • 概念:TaskExecutor是运行在TaskManager上的组件,负责接收和执行任务,并管理任务的资源。
    • 职责:TaskExecutor负责接收来自JobManager的任务分配请求,并根据任务的调度信息执行任务。它管理任务的资源,包括CPU、内存和网络等资源,并负责与其他TaskExecutor进行数据交换和通信。
    • 调度时机:TaskExecutor在TaskManager启动时被启动,并在整个作业执行过程中持续运行。
  5. BlobServer(资源服务器):

    • 概念:BlobServer是用于存储和提供作业所需的资源文件,例如JAR包、配置文件等。
    • 职责:BlobServer负责接收和存储作业提交时所需的资源文件,并提供给TaskExecutor进行下载和使用。它提供了高效的资源分发机制,减少了作业提交和执行的时间。
    • 调度时机:BlobServer在Flink集群启动时被启动,并在整个作业执行过程中持续运行。
  6. CheckpointCoordinator(检查点协调器):

    • 概念:CheckpointCoordinator负责管理作业的检查点,以实现容错和恢复。
    • 职责:CheckpointCoordinator定期触发作业的检查点操作,将作业的状态和数据保存到持久化存储中。在发生故障时,可以使用检查点来恢复作业的状态,保证作业的一致性和可靠性。
    • 调度时机:CheckpointCoordinator在作业执行过程中负责生成和管理检查点。
  7. StateBackend(状态后端):

    • 概念:StateBackend用于管理作业的状态,包括算子状态和键值状态。
    • 职责:StateBackend负责将作业的状态存储在内存或外部存储中,并提供状态的读写接口。它可以根据作业的需求选择不同的存储方式,例如内存、文件系统或分布式存储系统。
    • 调度时机:StateBackend在作业执行过程中负责管理状态的读写操作。
  8. ShuffleManager(洗牌管理器):

    • 概念:ShuffleManager负责将数据进行分区和洗牌,以便在任务之间进行数据交换。
    • 职责:ShuffleManager根据作业的需求,将输入数据进行分区和洗牌,并将洗牌后的数据分发给相应的任务进行计算。它提供了高效的数据交换机制,减少了数据传输的开销。
    • 调度时机:ShuffleManager在任务执行过程中负责数据的分区和洗牌操作
  9. TaskManagerRunner(任务管理器运行器):

    • 概念:TaskManagerRunner用于启动和管理TaskManager进程。
    • 职责:TaskManagerRunner负责启动TaskManager进程,并提供管理和监控TaskManager的功能。它负责与ResourceManager进行通信,接收任务的分配和调度指令,并将任务的执行结果返回给JobManager。
    • 调度时机:TaskManagerRunner在Flink集群启动时被启动,并在整个作业执行过程中持续运行。
  10. ResourceManagerRunner(资源管理器运行器):

    • 概念:ResourceManagerRunner用于启动和管理ResourceManager进程。
    • 职责:ResourceManagerRunner负责启动ResourceManager进程,并提供管理和监控ResourceManager的功能。它负责接收来自JobManager的作业提交请求,并根据集群的资源情况进行资源分配和调度。
    • 调度时机:ResourceManagerRunner在Flink集群启动时被启动,并在整个作业执行过程中持续运行。

这些组件共同协作,实现了Flink的分布式流处理和批处理功能。在作业执行过程中,JobGraph和ExecutionGraph描述了作业的结构和执行状态,Task和TaskExecutor负责具体的计算和任务执行,BlobServer存储和提供作业所需的资源文件,CheckpointCoordinator管理作业的检查点以实现容错和恢复,StateBackend管理作业的状态,ShuffleManager负责数据的分区和洗牌,TaskManagerRunner和ResourceManagerRunner分别负责启动和管理TaskManager和ResourceManager进程。通过这些组件的协作,Flink能够实现高效的分布式计算和数据处理。

“单位”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在Flink中还有以下关键词:

1. Job(作业):
- 概念:Job是一个用户定义的数据处理任务,由一个或多个算子组成的有向无环图。它是Flink中最高级别的抽象,表示一个完整的数据处理流程。
- 场景:Job适用于需要对数据进行复杂的处理和转换的场景,例如数据清洗、数据分析、实时计算等。
- 功能:Job定义了数据处理的逻辑和流程,包括输入数据的来源、数据的转换和计算逻辑、输出数据的目的地等。

2. 算子(Operator):
- 概念:算子是作业中的一个基本计算单元,负责对输入数据进行转换和处理。算子可以是数据源(Source)、数据接收器(Sink)或数据转换操作(Transformation)。
- 场景:算子适用于对数据进行各种类型的操作,例如过滤、映射、聚合、连接等。
- 功能:算子定义了对输入数据的处理逻辑,可以对数据进行转换、过滤、聚合等操作,并将结果发送给下游算子或输出到外部系统。

3. Source(数据源):
- 概念:Source是作业中的一个算子,负责从外部系统读取输入数据并生成数据流。它可以从文件、消息队列、数据库等数据源中读取数据。
- 场景:Source适用于需要从外部系统读取数据并进行处理的场景,例如实时流处理、批处理等。
- 功能:Source负责读取输入数据,并将数据发送给下游算子进行处理。它可以根据数据源的特性进行数据的分区和并行读取,实现高吞吐量的数据处理。

4. Sink(数据接收器):
- 概念:Sink是作业中的一个算子,负责将计算结果输出到外部系统或存储介质。它可以将数据写入文件、数据库、消息队列等目的地。
- 场景:Sink适用于需要将计算结果输出到外部系统或存储介质的场景,例如数据持久化、结果展示、数据传输等。
- 功能:Sink接收来自上游算子的计算结果,并将结果写入外部系统或存储介质。它可以根据目的地的特性进行数据的分区和并行写入,实现高吞吐量的数据输出。

5. 依赖(Dependency):
- 概念:依赖表示作业中算子之间的关系,描述了数据流的传递和计算顺序。一个算子的输入数据依赖于其他算子的输出数据。
- 场景:依赖用于描述算子之间的数据流和计算顺序,确保数据的正确传递和计算的顺序性。
- 功能:依赖定义了算子之间的数据流和计算顺序,确保每个算子在正确的时机接收和处理数据,保证作业的正确性和一致性。

这些关键词在Flink中扮演着重要的角色。Job表示一个完整的数据处理任务,算子是作业中的基本计算单元,负责数据的转换和处理。Source负责从外部系统读取输入数据,Sink负责将计算结果输出到外部系统。依赖描述了算子之间的关系,确保数据的正确传递和计算的顺序性。通过这些关键词的使用,Flink能够实现灵活、高效的数据处理和计算。

job、算子、task

在Flink中,Job、Task和算子的关系如下:

  • Job:代表一个完整的数据处理任务,由一系列算子和依赖关系组成。
  • 算子(Operator):代表数据处理过程中的具体操作,如数据源的读取、数据的转换和计算、数据的输出等。算子可以由一个或多个Task组成,并且可以通过Flink的算子函数API自定义编写算子逻辑。
  • Task:是执行算子的具体执行单元,每个Task负责处理一个数据分区的数据,即算子的并行实例。每个算子可以由多个并行的Task组成,Task之间独立运行并发相互之间通过网络连接进行数据交换。

Job、算子和Task的关系如下图所示:

Flink Job, Operator, Task关系图

其中,Job描述了整个数据处理过程的逻辑和依赖关系,算子则表示实现不同任务的数据处理逻辑,而Task则是算子的具体执行单元。整个数据处理过程可以看做是一棵计算图,Job是整个计算图的根节点,每个算子是树的分支节点,每个Task是叶子节点。

实际上,Flink还有一个更为精细的中间元素——数据流(DataStream),用于描述数据在算子之间的流动关系,它连接了不同算子之间的依赖关系,使得整个计算图更加清晰和直观。但是,这并不影响Job、Task、算子在Flink中的核心地位。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
以一个简单的WordCount任务为例,这个任务需要从一个数据源中读取数据,将每个单词进行计数,并输出结果。那么:

- 这个WordCount任务就是一个Job。
- 数据源读取、单词计数、输出结果三个操作对应着三个Task,这些Task可以并行执行。

在用户使用Flink进行数据处理时,如下所示:

- 用户首先需要定义一个Job,描述数据处理流程、指定数据源和输出位置等。
- 将Job提交到Flink集群上运行。
- Flink集群根据Job的配置自动创建多个Task,每个Task处理Job中的一个分片数据,并将处理结果输出到目标位置。

例如,用户可以通过编写Flink程序,定义一个Job来实现WordCount任务,可以通过以下方式来提交任务:

```
bin/flink run -c com.example.WordCount /path/to/wordcount.jar
```

在这个例子中,用户将WordCount.java编译为wordcount.jar,并使用flink run命令提交任务。因此,整个WordCount任务就是一个Job,而每个用于处理单词计数、数据读取和结果输出的Task各自负责对应的处理逻辑。


算子:
在Flink中,算子(Operator)是一个特殊的Task,负责对输入数据进行一些处理、计算、过滤等操作,并将结果输出到下一个算子或最终的输出位置。每个算子都可以由一个或多个并行的Task组成,这些Task在运行时并发处理输入数据。

以WordCount任务为例,我们可以将其分解成三个算子(Operator):

1. 数据源算子(Source Operator):从数据源中读取数据,将输入数据传递给下一个算子。
2. 转换算子(Transformation Operator):将输入数据拆分成单词,并对单词进行计数,将计数后的结果传递给下一个算子。
3. 输出算子(Sink Operator):将计数后的结果输出到指定的数据存储位置。

每个算子都可以由多个Task组成,每个Task处理输入数据的一部分,并将处理结果传递给下一个算子或最终的输出位置。例如,在转换算子中,每个Task负责处理输入数据的一部分,并将计算结果传递给下一个算子或输出算子。

因此,可以说算子是Flink中一个比Task更高层次的抽象,每个算子都对应着一些具体的操作,而每个Task则负责具体的输入数据处理和计算。

第二部分 基础API

第零部分 概念

0.1 概述

批处理 和 流处理

Flink程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。

集合最初是从源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合)。

结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。

Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中DataSet API用于批处理,DataStream API用于流式处理。本指南将介绍两种API共有的基本概念,但请参阅我们的 流处理指南批处理指南,了解有关使用每个API编写程序的具体信息。

注:当显示的API时,如何使用,我们将用实际的例子 StreamingExecutionEnvironmentDataStreamAPI。DataSetAPI中的概念完全相同,只需替换为ExecutionEnvironmentDataSet

0.2 DataSet 与 DataStream

Flink具有特殊类DataSetDataStream在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet数据有限的情况下,对于一个DataStream数据元的数量可以是无界的。

0.3 Flink计划的剖析

Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

  1. 获得一个execution environment
  2. 加载/创建初始数据,
  3. 指定此数据的转换,
  4. 指定放置计算结果的位置,
  5. 触发程序执行

0.4 支持类型

元组和案例类 – Tuple

元组是包含固定数量的具有各种类型的字段的复合类型。Java API提供Tuple1最多的类Tuple25。元组的每个字段都可以是包含更多元组的任意Flink类型,从而产生嵌套元组。可以使用字段名称直接访问元组的字段tuple.f4,或使用通用getter方法 tuple.getField(int position)。字段索引从0开始。请注意,这与Scala元组形成鲜明对比,但它与Java常规索引更为一致。

1
2
3
4
5
6
7
8
9
10
11
12
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));

wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});

wordCounts.keyBy(0); // also valid .keyBy("f0")

POJOs

如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:

  • 这类必须公开。
  • 它必须有一个没有参数的公共构造函数(默认构造函数)。
  • 所有字段都是公共的,或者必须通过getter和setter函数访问。对于一个名为foogetter和setter方法的字段必须命名getFoo()setFoo()
  • Flink必须支持字段的类型。目前,Flink使用Avro序列化任意对象(例如Date)。

Flink分析了POJO类型的结构,即它了解了POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。

以下示例显示了一个包含两个公共字段的简单POJO。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class WordWithCount {

public String word;
public int count;

public WordWithCount() {}

public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}

DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));

wordCounts.keyBy("word"); // key by field expression "word"

原始类型 - Integer等

Flink支持所有Java和Scala的原始类型,如IntegerStringDouble

一般类别 - 非POJO

Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。遵循Java Beans约定的类通常可以很好地工作。

所有未标识为POJO类型的类(请参阅上面的POJO要求)都由Flink作为常规类类型处理。Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。使用序列化框架Kryo对常规类型进行反序列化。

– Value

_值_类型手动描述它们的序列化和反序列化。它们不是通过通用序列化框架,而是通过org.apache.flinktypes.Value使用方法read和实现接口为这些 算子操作提供自定义代码write。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将数据元的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零数据元使用特殊编码,而通用序列化只需编写所有数组数据元。

org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。

Flink带有与基本数据类型对应的预定义值类型。(ByteValueShortValueIntValueLongValueFloatValueDoubleValueStringValueCharValueBooleanValue)。这些Value类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。

Hadoop Writables

您可以使用实现该org.apache.hadoop.Writable接口的类型。write()readFields()方法中定义的序列化逻辑将用于序列化。

特殊类型

您可以使用特殊类型,包括Scala的EitherOptionTry。Java API有自己的自定义实现Either。与Scala类似Either,它代表两种可能类型的值,_左_或_右_。 Either可用于错误处理或需要输出两种不同类型记录的 算子。

0.5 累加器和计数器

累加器是具有添加 算子操作最终累积结果的简单构造,可在作业结束后使用。

最直接的累加器是一个计数器:您可以使用该Accumulator.add(V value)方法递增它 。在工作结束时,Flink将汇总(合并)所有部分结果并将结果发送给客户。在调试过程中,或者如果您想快速了解有关数据的更多信息,累加器非常有用。

Flink目前有以下内置累加器。它们中的每一个都实现了 Accumulator 接口。

  • IntCounterLongCounter DoubleCounter:请参阅下面的使用计数器的示例。
  • 直方图:离散数量的区间的直方图实现。在内部,它只是一个从Integer到Integer的映射。您可以使用它来计算值的分布,例如字数统计程序的每行字数的分布。

如何使用累加器:

首先,您必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。

1
private IntCounter numLines = new IntCounter();复制ErrorOK!

其次,您必须注册累加器对象,通常在_富_函数的open()方法中 。在这里您还可以定义名称。

1
getRuntimeContext().addAccumulator("num-lines", this.numLines);复制ErrorOK!

您现在可以在 算子函数中的任何位置使用累加器,包括在open()close()方法中。

1
this.numLines.add(1);复制ErrorOK!

整个结果将存储在JobExecutionResultexecute()运行环境的方法返回的对象中(当前这仅在执行等待作业完成时才有效)。

1
myJobExecutionResult.getAccumulatorResult("num-lines")复制ErrorOK!

所有累加器每个作业共享一个命名空间。因此,您可以在作业的不同算子函数中使用相同的累加器。Flink将在内部合并所有具有相同名称的累加器。

关于累加器和迭代的注释:目前累加器的结果仅在整个作业结束后才可用。我们还计划在下一次迭代中使前一次迭代的结果可用。您可以使用 聚合器 来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。

定制累加器:

要实现自己的累加器,只需编写Accumulator接口的实现即可。如果您认为自定义累加器应与Flink一起提供,请随意创建拉取请求。

您可以选择实现 AccumulatorSimpleAccumulator

Accumulator<V,R>最灵活:它定义V要添加的值的类型R,以及最终结果的结果类型。例如,对于直方图,V是数字并且R是直方图。SimpleAccumulator适用于两种类型相同的情况,例如计数器。

第一章 DataStreamAPI

1.1 框架

数据源 - 输入

调用:StreamExecutionEnvironment.addSource(sourceFunction)

自定义:addSource() 接口

1
源是您的程序从中读取输入的位置。您可以使用将附加源附加到程序StreamExecutionEnvironment.addSource(sourceFunction)。Flink附带了许多预先实现的源函数,但您可以通过实现SourceFunction 非并行源,或通过实现ParallelSourceFunction接口或扩展 RichParallelSourceFunction并行源来编写自己的自定义源。

DataStream转换 - 操作

算子的转化:算子将一个或多个DataStream转换为新的DataStream。程序可以将多个转换组合成复杂的数据流拓扑。

算子 (apachecn.org)

转化方式:参见链接 (map、)

数据接收 - 输出

调用:数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置输出格式,这些格式封装在DataStreams上的 算子操作后面:

  • writeAsText()/ TextOutputFormat- 按字符串顺序写入数据元。通过调用每个数据元的_toString()_方法获得字符串。
  • writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的_toString()_方法。
  • print()/ printToErr() - 在标准输出/标准错误流上打印每个数据元的_toString()_值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的_打印_调用。如果并行度大于1,则输出也将与生成输出的任务的标识符一起添加。
  • writeUsingOutputFormat()/ FileOutputFormat- 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • writeToSocket - 根据a将数据元写入套接字 SerializationSchema
  • addSink - 调用自定义接收器函数。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系统实现为接收器函数。

自定义:addSink() 接口

其他操作

控制延迟

目的:控制吞吐量 和 延迟

默认情况下,数据元不会逐个传输到网络上(这会导致不必要的网络流量),但会被缓冲。可以在Flink配置文件中设置缓冲区的大小(实际在计算机之间传输)。虽然此方法适用于优化吞吐量,但当传入流速度不够快时,可能会导致延迟问题。要控制吞吐量和延迟,您可以env.setBufferTimeout(timeoutMillis)在运行环境(或单个 算子)上使用以设置缓冲区填充的最长等待时间。在此之后,即使缓冲区未满,也会自动发送缓冲区。此超时的默认值为100毫秒。

用法:

1
2
3
4
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

迭代 - 迭代器

​ 迭代流程序实现步进函数并将其嵌入到IterativeStream。由于DataStream程序可能永远不会完成,因此没有最大迭代次数。相反,您需要指定流的哪个部分反馈到迭代,哪个部分使用split转换或转发到下游filter。在这里,我们展示了使用过滤器的示例。首先,我们定义一个IterativeStream.

例如,这里是从一系列整数中连续减去1直到它们达到零的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});

收集数据源

即:形如。List<Tuple2<String, Integer>> 等特殊集合类型,转为 DataStream

Flink提供了特殊的数据源,这些数据源由Java集合支持,以方便测试。一旦程序经过测试,源和接收器可以很容易地被读取/写入外部系统的源和接收器替换。

集合数据源可以使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

迭代器数据接收器

Flink还提供了一个接收器,用于收集DataStream结果以进行测试和调试。它可以使用如下:

1
2
3
4
import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

1.2关键词下包含的概念

1.2.1 算子

视窗- window - 窗口

被Keys化与非被Keys化Windows

要指定的第一件事是您的流是否应该键入。必须在定义窗口之前完成此 算子操作。使用the keyBy(...)将您的无限流分成逻辑被Key化的数据流。如果keyBy(...)未调用,则表示您的流不是被Keys化的。

对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。引用相同Keys的所有数据元将被发送到同一个并行任务。

在非被Key化的数据流的情况下,您的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,_即_并行度为1。

区别:窗口 可并行 与 不可并行

被Keys化Windows

1
2
3
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"

非被Keys化Windows

1
2
stream
.windowAll(...) <- required: "assigner"

窗口分配器 - windowAssigner

指定窗口类型

翻滚的Windows – 固定窗口(tumbling)

一个_翻滚窗口_分配器的每个数据元分配给指定的窗口_的窗口大小_。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示。

img

以下代码段显示了如何使用翻滚窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStream<T> input = ...;

// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);

// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);复制ErrorOK!

时间间隔可以通过使用一个指定Time.milliseconds(x)Time.seconds(x)Time.minutes(x),等等。

如上一个示例所示,翻滚窗口分配器还采用可选offset 参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时翻滚窗口划时代对齐,这是你会得到如窗口 1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量Time.hours(-8)

滑动窗口

该_滑动窗口_分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,_窗口大小_由_窗口大小_参数配置。附加的_窗口滑动_参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。

例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。

img

以下代码段显示了如何使用滑动窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStream<T> input = ...;

// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);

// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);复制ErrorOK!

时间间隔可以通过使用一个指定Time.milliseconds(x)Time.seconds(x)Time.minutes(x),等等。

如上一个示例所示,滑动窗口分配器还采用可选offset参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时窗口半小时滑动与时代一致,那就是你会得到如窗口 1:00:00.000 - 1:59:59.9991:30:00.000 - 2:29:59.999等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.9991:45:00.000 - 2:44:59.999等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量Time.hours(-8)

会话窗口

在_会话窗口_中按活动会话分配器组中的数据元。与_翻滚窗口_和_滑动窗口_相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,_即_当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态_会话间隙_或 _会话间隙提取器_函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。

img

以下代码段显示了如何使用会话窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
DataStream<T> input = ...;

// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);

// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);

// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);

静态间隙可以通过使用中的一个来指定Time.milliseconds(x)Time.seconds(x)Time.minutes(x),等。

通过实现SessionWindowTimeGapExtractor接口指定动态间隙。

注意由于会话窗口没有固定的开始和结束,因此它们的评估方式与翻滚和滑动窗口不同。在内部,会话窗口算子为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一起。为了可合并的,会话窗口 算子操作者需要一个合并触发器和一个合并 的窗函数,如ReduceFunctionAggregateFunction,或ProcessWindowFunctionFoldFunction不能合并。)

全局Windows

一个_全局性的窗口_分配器分配使用相同的Keys相同的单个的所有数据元_全局窗口_。此窗口方案仅在您还指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。

img

以下代码段显示了如何使用全局窗口。

1
2
3
4
5
6
DataStream<T> input = ...;

input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);

窗口函数 - Function

定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是_窗口函数_的职责,_窗口函数_用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元(请参阅Flink如何确定窗口何时准备好的触发器)。

的窗函数可以是一个ReduceFunctionAggregateFunctionFoldFunctionProcessWindowFunction。前两个可以更有效地执行(参见[State Size](#state size)部分),因为Flink可以在每个窗口到达时递增地聚合它们的数据元。A ProcessWindowFunction获取Iterable窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。

具有a的窗口转换ProcessWindowFunction不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的_所有_数据元。这可以通过组合来减轻ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction以获得两个窗口元件的增量聚合并且该附加元数据窗口 ProcessWindowFunction接收。我们将查看每个变体的示例。

触发器

A Trigger确定何时_窗口函数_准备好处理窗口(由_窗口分配器_形成)。每个都有默认值。如果默认触发器不符合您的需要,您可以使用指定自定义触发器。WindowAssigner``Trigger``trigger(...)

触发器界面有五种方法可以Trigger对不同的事件做出反应:

  • onElement()为添加到窗口的每个数据元调用该方法。
  • onEventTime()在注册的事件时间计时器触发时调用该方法。
  • onProcessingTime()在注册的处理时间计时器触发时调用该方法。
  • onMerge()方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,_例如_当使用会话窗口时。
  • 最后,该clear()方法在移除相应窗口时执行所需的任何动作。

关于上述方法需要注意两点:

1)前三个决定如何通过返回a来对其调用事件进行 算子操作TriggerResult。该 算子操作可以是以下之一:

  • CONTINUE: 没做什么,
  • FIRE:触发计算,
  • PURGE:清除窗口中的数据元,和
  • FIRE_AND_PURGE:触发计算并清除窗口中的数据元。

2)这些方法中的任何一种都可用于注册处理或事件时间计时器以用于将来的 算子操作。

火与清除

一旦触发器确定窗口已准备好进行处理,它就会触发,_即_它返回FIREFIRE_AND_PURGE。这是窗口算子发出当前窗口结果的信号。给定一个窗口,将ProcessWindowFunction 所有数据元传递给ProcessWindowFunction(可能在将它们传递给逐出器后)。窗口ReduceFunctionAggregateFunctionFoldFunction简单地发出他们急切地汇总结果。

当触发器触发时,它可以FIRE或者FIRE_AND_PURGE。虽然FIRE保持了窗口的内容,FIRE_AND_PURGE删除其内容。默认情况下,预先实现的触发器只是FIRE没有清除窗口状态。

注意清除将简单地删除窗口的内容,并将保存有关窗口和任何触发状态的任何潜在元信息。

逐出器 -Evictor

Flink的窗口模型允许指定Evictor除了WindowAssigner和之外的可选项Trigger。这可以使用evictor(...)方法(在本文档的开头显示)来完成。所述逐出器必须从一个窗口中删除数据元的能力_之后_触发器触发和_之前和/或之后_被施加的窗口函数。为此,该Evictor接口有两种方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); 复制ErrorOK!

evictBefore()包含窗口函数之前被施加驱逐逻辑,而evictAfter() 包含窗口函数之后要施加的一个。在应用窗口函数之前被逐出的数据元将不会被处理。

Flink带有三个预先实施的驱逐者。这些是:

  • CountEvictor:保持窗口中用户指定数量的数据元,并从窗口缓冲区的开头丢弃剩余的数据元。
  • DeltaEvictor:取a DeltaFunction和a threshold,计算窗口缓冲区中最后一个数据元与其余每个数据元之间的差值,并删除delta大于或等于阈值的值。
  • TimeEvictor:以interval毫秒为单位作为参数,对于给定窗口,它查找max_ts其数据元中的最大时间戳,并删除时间戳小于的所有数据元max_ts - interval

默认默认情况下,所有预先实现的驱逐程序在窗口函数之前应用它们的逻辑。

注意指定逐出器会阻止任何预聚合,因为在应用计算之前,必须将窗口的所有数据元传递给逐出器。

注意 Flink不保证窗口中数据元的顺序。这意味着尽管逐出器可以从窗口的开头移除数据元,但这些数据元不一定是首先或最后到达的数据元。

Join

窗口连接连接两个共享公共Keys并位于同一窗口中的流的数据元。可以使用窗口分配器定义这些窗口,并对来自两个流的数据元进行评估。

然后将来自双方的数据元传递给用户定义的,JoinFunction或者FlatJoinFunction用户可以发出满足连接条件的结果。

一般用法可概括如下:

1
2
3
4
5
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)复制ErrorOK!

关于语义的一些注释:

  • 两个流的数据元的成对组合的创建表现得像内部连接,意味着如果它们没有来自要连接的另一个流的对应数据元,则不会发出来自一个流的数据元。
  • 那些关联的数据元将在其时间戳中包含仍位于相应窗口中的最大时间戳。例如,[5, 10)具有其边界的窗口将导致连接的数据元具有9作为其时间戳。

在下一节中,我们将使用一些示例性场景概述不同类型的窗口连接的行为。

窗口连接 join

差异: .window(TumblingEventTimeWindows.of(Time.seconds(2))) //

​ .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size /, Time.milliseconds(1) / slide */))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});


// --------------------------------------------------------------
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});

间隔关联

区间连接使用公共Keys连接两个流的数据元(我们现在将它们称为A和B),并且流B的数据元具有时间戳,该时间戳位于流A中数据元的时间戳的相对时间间隔中。

这也可以更正式地表达为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是共享公共Keys的A和B的数据元。只要下限总是小于或等于上限,下限和上限都可以是负数或上限。间隔连接当前仅执行内连接。

当一对数据元传递给ProcessJoinFunction它们时,它们将被赋予ProcessJoinFunction.Context两个数据元的更大的时间戳(可以通过它访问)。

注意间隔连接当前仅支持事件时间。

img

在上面的例子中,我们连接两个流’orange’和’green’,下限为-2毫秒,上限为+1毫秒。缺省情况下,这些界限是包容性的,但.lowerBoundExclusive().upperBoundExclusive可以应用到改变行为。

再次使用更正式的表示法,这将转化为

1
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

如三角形所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
// 区间[t - 2, t + 1]
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});

Process

ProcessFunction

ProcessFunction是一个低级流处理 算子操作,可以访问所有(非循环)流应用程序的基本构建块:

  • 事件(流数据元)
  • state(容错,一致,仅在被Key化的数据流上)
  • 定时器(事件时间和处理时间,仅限被Key化的数据流)

ProcessFunction可被认为是一个FlatMapFunction可以访问Keys状态和定时器。它通过为输入流中接收的每个事件调用来处理事件。

对于容错状态,ProcessFunction可以访问Flink的被Keys化状态,可以通过其访问 RuntimeContext,类似于其他有状态函数可以访问被Keys化状态的方式。

定时器允许应用程序对处理时间和事件时间的变化作出反应。每次调用该函数processElement(...)都会获得一个Context对象,该对象可以访问数据元的事件时间戳和_TimerService_。的TimerService可用于注册为将来事件- /处理-时刻回调。达到计时器的特定时间时,将onTimer(...)调用该方法。在该调用期间,所有状态再次限定为创建计时器的键,允许计时器操纵被Keys化状态。

注意如果要访问被Keys化状态和计时器,则必须应用ProcessFunction被Key化的数据流:

1
stream.keyBy(...).process(new MyProcessFunction())

Async – 异步

API

Flink的Async I / O API允许用户将异步请求客户端与数据流一起使用。API处理与数据流的集成,以及处理顺序,事件时间,容错等。

假设有一个目标数据库的异步客户端,则需要三个部分来实现对数据库的异步I / O流转换:

  • 的实现AsyncFunction是把请求分派
  • 一个_回调_,它接受 算子操作的结果并将其交给ResultFuture
  • 在DataStream上应用异步I / O 算子操作作为转换

以下代码示例说明了基本模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;

@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}

@Override
public void close() throws Exception {
client.close();
}

@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {

@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

超时处理

当异步I / O请求超时时,默认情况下会引发异常并重新启动作业。如果要处理超时,可以覆盖该AsyncFunction#timeout方法。

结果顺序

AsyncFunction一些未定义的顺序经常完成的并发请求,基于哪个请求首先完成。为了控制发出结果记录的顺序,Flink提供了两种模式:

  • 无序:异步请求完成后立即发出结果记录。在异步I / O 算子之后,流中记录的顺序与之前不同。当使用_处理时间_作为基本时间特性时,此模式具有最低延迟和最低开销。使用AsyncDataStream.unorderedWait(...)此模式。
  • Ordered:在这种情况下,保存流顺序。结果记录的发出顺序与触发异步请求的顺序相同( 算子输入记录的顺序)。为此,算子缓冲结果记录,直到其所有先前记录被发出(或超时)。这通常会在检查点中引入一些额外的延迟和一些开销,因为与无序模式相比,记录或结果在检查点状态下保持更长的时间。使用AsyncDataStream.orderedWait(...)此模式。

活动时间

当流应用程序与事件时间一起工作,异步I / O算子将正确处理水印。这意味着两种订单模式具体如下:

  • 无序:水印不会超过记录,反之亦然,这意味着水印建立了一个_订单边界_。记录仅在水印之间无序发出。只有在发出水印后才会发出某个水印后发生的记录。反过来,只有在水印发出之前输入的所有结果记录之后才会发出水印。

    这意味着,在水印的存在,将_无序的_方式介绍了一些相同的延迟和管理开销的_订购_模式一样。开销量取决于水印频率。

  • Ordered:保存记录的水印顺序,就像保存记录之间的顺序一样。与_处理时间_相比,开销没有显着变化。

请记住,_摄取时间_是_事件时间的_一种特殊情况,其中自动生成的水印基于源处理时间。

容错保证

异步I / O 算子提供完全一次的容错保证。它将检查点中的飞行中异步请求的记录存储起来,并在从故障中恢复时恢复/重新触发请求。

实施技巧

对于实现_期货_有一个_执行人_(或_执行上下文_在Scala)回调,我们建议使用DirectExecutor,因为回调通常做最少的工作,并且DirectExecutor避免了额外的线程到线程切换的开销。回调通常只将结果传递给ResultFuture,将其添加到输出缓冲区。从那里开始,包括记录发射和与检查点副本记录交互的重要逻辑无论如何都发生在专用线程池中。

A DirectExecutor可以通过org.apache.flink.runtime.concurrent.Executors.directExecutor()或 获得com.google.common.util.concurrent.MoreExecutors.directExecutor()

警告

AsyncFunction不称为多线程

我们想在这里明确指出的常见混淆AsyncFunction是不以多线程方式调用。只存在一个实例,AsyncFunction并且对于流的相应分区中的每个记录顺序调用它。除非该asyncInvoke(...)方法快速返回并依赖于回调(由客户端),否则它将不会导致正确的异步I / O.

例如,以下模式会导致阻塞asyncInvoke(...)函数,从而使异步行为无效:

  • 使用其查找/查询方法调用阻塞的数据库客户端,直到收到结果为止
  • 阻止/等待asyncInvoke(...)方法内异步客户端返回的future-type对象

1.2.2 活动时间 - time

事件时间/处理时间/摄取时间

Flink 在流处理节目中支持不同的_时间_概念。

处理时间(process time):处理算子时,使用本地 机器的 系统时钟

  • 处理时间:处理时间是指执行相应 算子操作的机器的系统时间。

    当流程序在处理时间运行时,所有基于时间的 算子操作(如时间窗口)将使用运行相应算子的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整个小时之间到达特定算子的所有记录。例如,如果应用程序在上午9:15开始运行,则第一个每小时处理时间窗口将包括在上午9:15到上午10:00之间处理的事件,下一个窗口将包括在上午10:00到11:00之间处理的事件,因此上。

    处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统的速度(例如从消息队列)到记录在系统内的算子之间流动的速度的影响。和停电(预定或其他)。

事件时间(event time): - 配合水位线,管理区间内的事件;可以无序到达

  • 事件时间:事件时间是每个事件在其生产设备上发生的时间。此时间通常在进入Flink之前嵌入记录中,并且 可以从每个记录中提取该_事件时间戳_。在事件时间,时间的进展取决于数据,而不是任何挂钟**。事件时间程序必须指定如何生成_事件时间水印(水位线)_**,这是表示事件时间进度的机制。该水印机制在下面的后面部分中描述。

    在一个完美的世界中,事件时间处理将产生完全一致和确定的结果,无论事件何时到达,或者它们的排序。但是,除非事件已知按顺序到达(按时间戳),否则事件时间处理会在等待无序事件时产生一些延迟。由于只能等待一段有限的时间,因此限制了确定性事件时间应用程序的可能性。

    假设所有数据都已到达,事件时间 算子操作将按预期运行,即使在处理无序或延迟事件或重新处理历史数据时也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有落入该小时的事件时间戳的所有记录,无论它们到达的顺序如何,或者何时处理它们。(有关更多信息,请参阅有关迟发事件的部分。)

    请注意,有时当事件时间程序实时处理实时数据时,它们将使用一些处理时间 算子操作,以确保它们及时进行。

摄取时间:Data Source生成时的时间

  • 摄取时间(ingestion time):摄取时间是事件进入Flink的时间。在源算子处,每个记录将源的当前时间作为时间戳,并且基于时间的 算子操作(如时间窗口)引用该时间戳。

    _摄取时间_在概念上位于_事件时间_和_处理时间之间_。与_处理时间_相比 ,它稍贵一些,但可以提供更可预测的结果。因为 _摄取时间_使用稳定的时间戳(在源处分配一次),所以对记录的不同窗口 算子操作将引用相同的时间戳,而在_处理时间中,_每个窗口算子可以将记录分配给不同的窗口(基于本地系统时钟和任何运输延误)。

    与_事件时间_相比,_摄取时间_程序无法处理任何无序事件或后期数据,但程序不必指定如何生成_水印_。

    在内部,_摄取时间_与_事件时间_非常相似,但具有自动时间戳分配和自动水印生成函数。

img

时间戳分配器/水印生成器

时间戳分配器获取流并生成带有带时间戳数据元和水印的新流。如果原始流已经有时间戳和/或水印,则时间戳分配器会覆盖它们。

时间戳分配器通常在数据源之后立即指定,但并非严格要求这样做。例如,常见的模式是在时间戳分配器之前解析(_MapFunction_)和过滤(_FilterFunction_)。在任何情况下,需要在事件时间的第一个 算子操作之前指定时间戳分配器(例如第一个窗口 算子操作)。作为一种特殊情况,当使用Kafka作为流式传输作业的源时,Flink允许在源(或消费者)本身内指定时间戳分配器/水印发射器。有关如何 算子操作的更多信息,请参阅 Kafka Connector文档

注意:本节的其余部分介绍了程序员必须实现的主要接口,以便创建自己的时间戳提取器/水印发射器。要查看Flink附带的预先实现的提取器,请参阅 预定义的时间戳提取器/水印发射器页面。http://flink.apachecn.org/#/docs/1.7-SNAPSHOT/17?id=tab_scala_2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);

具有递增时间戳的分发者

_定期_水印生成的最简单的特殊情况是给定源任务看到的时间戳按升序发生的情况。在这种情况下,当前时间戳始终可以充当水印,因为没有更早的时间戳会到达。

请注意,_每个并行数据源任务_只需要提升时间戳。例如,如果在特定设置中,一个并行数据源实例读取一个Kafka分区,则只需要在每个Kafka分区中时间戳递增。当并行流被混洗,联合,连接或合并时,Flink的水印合并机制将生成正确的水印。http://flink.apachecn.org/#/docs/1.7-SNAPSHOT/18?id=tab_scala_0)

1
2
3
4
5
6
7
8
9
10
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});

允许固定数量的迟到的分配者

定期水印生成的另一个例子是当水印滞后于在流中看到的最大(事件 - 时间)时间戳一段固定的时间。这种情况涵盖了预先知道流中可能遇到的最大延迟的情况,例如,当创建包含时间戳在固定时间段内扩展的数据元的自定义源以进行测试时。对于这些情况,Flink提供了BoundedOutOfOrdernessTimestampExtractor作为参数的参数maxOutOfOrderness,即在计算给定窗口的最终结果时,在忽略数据元之前允许数据元延迟的最长时间。延迟对应于结果t - t_w,其中t是数据元的(事件 - 时间)时间戳,以及t_w前一个水印的时间戳。如果lateness > 0然后,该数据元被认为是迟到的,并且在计算其对应窗口的作业结果时默认被忽略。有关 使用延迟数据元的更多信息,请参阅有关允许延迟的文档。

1
2
3
4
5
6
7
8
9
10
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});

1.2.3 状态和容错 – status / fault- tolerant

检查点

启用和配置检查点

默认情况下,禁用检查点。为了使检查点,调用enableCheckpointing(n)StreamExecutionEnvironment,其中 N 是以毫秒为单位的检查点间隔。

检查点的其他参数包括:

  • _完全一次与至少一次_:您可以选择将模式传递给enableCheckpointing(n)方法,以在两个保证级别之间进行选择。对于大多数应用来说,恰好一次是优选的。至少一次可能与某些超低延迟(始终为几毫秒)的应用程序相关。

  • _checkpoint timeout(检查点超时)_:如果当前检查点未完成,则中止检查点的时间。

  • _检查点之间的最短时间_:为确保流应用程序在检查点之间取得一定进展,可以定义检查点之间需要经过多长时间。如果将此值设置为例如_5000_,则无论检查点持续时间和检查点间隔如何,下一个检查点将在上一个检查点完成后的5秒内启动。请注意,这意味着检查点间隔永远不会小于此参数。

    通过定义“检查点之间的时间”而不是检查点间隔来配置应用程序通常更容易,因为“检查点之间的时间”不易受检查点有时需要比平均时间更长的事实的影响(例如,如果目标存储系统暂时很慢)。

    请注意,此值还表示并发检查点的数量为_一_。

  • _并发检查点数_:默认情况下,当一个检查点仍处于运行状态时,系统不会触发另一个检查点。这可确保拓扑不会在检查点上花费太多时间,也不会在处理流方面取得进展。可以允许多个重叠检查点,这对于具有特定处理延迟的管道(例如,因为函数调用需要一些时间来响应的外部服务)而感兴趣,但是仍然希望执行非常频繁的检查点(100毫秒) )在失败时重新处理很少。

    当定义检查点之间的最短时间时,不能使用此选项。

  • _外部化检查点_:您可以将_外围检查点_配置为外部持久化。外部化检查点将其元数据写入持久存储,并且在作业失败时_不会_自动清除。这样,如果您的工作失败,您将有一个检查点可以从中-recovery。有关外部化检查点部署说明中有更多详细信息。

  • _关于检查点错误的失败/继续任务_:这确定如果在执行任务的检查点过程中发生错误,任务是否将失败。这是默认行为。或者,当禁用此选项时,任务将简单地拒绝检查点协调器的检查点并继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

1.2.4 管理状态的自定义序列化

如果您的应用程序使用Flink的托管状态,则可能需要为特殊用例实现自定义序列化逻辑。

此页面的目标是需要对其状态使用自定义序列化的用户,包括如何提供自定义序列化程序以及如何处理序列化程序的升级以实现兼容性。如果您只是使用Flink自己的序列化程序,则此页面无关紧要,可以跳过。

使用自定义序列化器

如上例所示,当注册托管算子或被Keys化状态时,StateDescriptor需要指定状态名称以及有关状态类型的信息。Flink的类型序列化框架使用类型信息为 状态创建适当的序列化程序。

也可以完全绕过这个并让Flink使用您自己的自定义序列化程序来序列化托管状态,只需直接StateDescriptor使用您自己的TypeSerializer实现实例化:

1
2
3
4
5
6
7
8
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"state-name",
new CustomTypeSerializer());

checkpointedState = getRuntimeContext().getListState(descriptor);

1.2.5 旁路输出

除了DataStream 算子操作产生的主流之外,您还可以生成任意数量的附加旁路输出结果流。结果流中的数据类型不必与主流中的数据类型匹配,并且不同旁路输出的类型也可以不同。当您希望拆分通常必须复制流的数据流,然后从每个流中过滤掉您不希望拥有的数据时,此 算子操作非常有用。

使用旁路输出时,首先需要定义一个OutputTag用于标识旁路输出流的方法:

1
2
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

注意如何OutputTag根据旁路输出流包含的数据元类型键入。

可以通过以下函数将数据发送到旁路输出:

您可以使用Context在上述函数中向用户公开的参数将数据发送到由a标识的旁路输出OutputTag。以下是从以下位置发出旁路输出数据的示例ProcessFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {

@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// emit data to regular output
out.collect(value);

// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});

使用getSideOutput(OutputTag)DataStream 算子操作结果上使用的旁路输出流。这将为您提供一个DataStream输入到旁路输出流的结果:

1
2
3
4
5
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = ...;

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

1.2.6 Stream Connect

内置连接器

内置Connector 是 Flink 内置的连接器,用于将 Flink 与外部系统进行集成。 Flink 提供多种内置 Connector,如Kafka、Kinesis、Cassandra、Elasticsearch、JDBC 等。

这些内置 Connector 都是一些通用的应用程序接口(API)的实现,Flink 用户可以将这些标准的内置 Connector 用于将 Flink 与其他的数据源或数据汇进行无缝集成。利用这些内置 Connector,Flink 用户可以方便地通过API将数据源和数据汇插入到现有的 Flink 应用程序中,而不必编写自己的代码从外部系统中读取或写入数据。

在 Flink 应用程序中,用户可以使用内置 Connector 根据需要特定于源和汇的属性进行配置。这些 Connector 会自动与 Flink 应用程序中的 State 和 Checkpointing 机制集成,以确保应用程序的一致性和准确性。由于内置 Connector 被广泛使用并得到了社区的强烈支持,因此大多数情况下,Flink 用户可以使用这些内置 Connector 解决数据集成的大多数常见问题,并轻松地将 Flink 与外部系统进行无缝集成和扩展。

SQL获取内置连接器信息

如果您想从 SQL 中获取 Flink 内置连接器的信息,可以使用 Flink SQL CLI 工具或通过 Flink WebUI 进行查询。以下是使用 Flink SQL CLI 工具的示例:

  1. 启动 Flink SQL CLI 工具。您可以通过以下命令启动 Flink SQL CLI 工具:

    1
    ./bin/sql-client.sh embedded -l /path/to/log

    注意:将上述命令中的/path/to/log替换为您希望日志文件所在的路径。

  2. 一旦连接成功,在 Flink SQL CLI 工具中,您可以使用以下命令获取当前可用的内置 Connector 列表:

    1
    SHOW CATALOGS;

    这将显示所有可用的 Flink 连接器和目录信息。如果使用内置连接器,则应看到以下目录之一:

    • default_catalog:用于默认目录;
    • kafka:用于 Apache Kafka 连接器;
    • elasticsearch:用于 Elasticsearch 连接器;
    • jdbc:用于 JDBC 连接器;
    • cassandra:用于 Cassandra 连接器;
    • kinesis:用于 Amazon Kinesis 连接器;
  3. 如果要获取特定内置 Connector 的信息,请使用以下命令:

    1
    SHOW TABLES IN <catalogname>;

    注意:将上述命令中的<catalogname>替换为您要查询的目录的名称。

    例如,在 Kafka Connector 中,您可以使用以下命令查看所有可用的主题列表:

    1
    SHOW TABLES IN kafka;

这些步骤可以让您通过 Flink SQL CLI 工具获取当前可用的 Flink 内置 Connector 的信息,并查询特定内置 Connector 的详细信息。同时,您也可以通过 Flink WebUI 进行查询,从“Catalog Management”页面查看内置连接器的信息。

Flink支持的内置连接器

,Flink 还提供了多种内置 Connector,以下是 Flink 中常用的一些内置连接器:

  • Kinesis Connector:用于将 Flink 与 Amazon Kinesis 集成。
  • Elasticsearch Connector:用于将 Flink 与 Elasticsearch 集成。
  • Cassandra Connector:用于将 Flink 与 Cassandra 集成。
  • JDBC Connector:用于将 Flink 与关系型数据库(如 MySQL、PostgreSQL)集成。
  • HBase Connector:用于将 Flink 与 HBase 集成。
  • AWS S3 Connector:用于将 Flink 与 Amazon S3 集成。

这些内置 Connector 都是标准 API 的实现,可以方便地将 Flink 应用程序与常见的数据源或数据汇集成。有了这些内置 Connector,Flink 用户可以通过 SQL 访问外部系统中的数据,并使用 Flink 提供的强大的数据处理和流处理能力。同时,您也可以通过编写自定义连接器来将 Flink 集成到其他外部系统中,以实现更多的扩展和集成需求。

第二章 DataSet API

大致参照 DataStream API

示例程序

以下程序是WordCount的完整工作示例。您可以复制并粘贴代码以在本地运行它。您只需要在项目中包含正确的Flink库(请参见使用Flink链接)并指定导入。那你就准备好了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class WordCountExample {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");

DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);

wordCounts.print();
}

public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

2.1 转化

DataSet -》 DataSet

参照api文档 :数据集转换 (apachecn.org)

2.2 容错

Flink的容错机制在出现故障时恢复程序并继续执行它们。此类故障包括机器硬件故障,网络故障,瞬态程序故障等。

批处理容错(DataSet API)

_DataSet API中_程序的容错能力通过重试失败的执行来实现。Flink在作业声明为失败之前重试执行的时间可通过_执行重试_参数进行配置。值_0_有效意味着禁用容错。

要激活容错,请将_执行重试次数_设置为大于零的值。常见的选择是值为3。

此示例显示如何配置Flink DataSet程序的执行重试。

1
2
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setNumberOfExecutionRetries(3);

重试延迟

执行重试可以配置为延迟。延迟重试意味着在执行失败后,重新执行不会立即开始,而是仅在一定延迟之后。

当程序与外部系统交互时,延迟重试可能会有所帮助,例如,在尝试重新执行之前,连接或挂起的事务应该达到超时。

您可以按如下方式为每个程序设置重试延迟(示例显示DataStream API - DataSet API的工作方式类似):

1
2
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay

第三章 Table API和SQL

Apache Flink具有两个关系API - Table API和SQL - 用于统一流和批处理。 Table API是Scala和Java语言集成查询API,允许以非常直观的方式组合来自关系 算子的查询,例如选择,过滤和连接。Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用 Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。


Flink-应用
http://example.com/2023/06/01/分布式系统-大数据/Flink笔记-新/
作者
where
发布于
2023年6月1日
许可协议