Flinkx 1.10.2 数据同步断点续传代码阅读
是否开启断点续传,适用于文件和Jdbc类型的数据源,kafka等流式的不支持
- BaseRichInputFormat在
open时:- 如果isRestore=true,初始化formatState的状态数据(numOfSubTask、numberRead、numberWrite、metrics)
- BaseDataReader构造时,
- 如果
isRestore=true,根据restoreColumnName获得MetaColumn,并设置restoreColumnIndex和restoreColumnType
- 如果
-
JdbcInputFormat.
buildQuerySql时,- 如果
isRestore=true,根据${restoreFilter}构建恢复sql,或根据${restoreFilter}构建增量查询sql
- 如果
-
JdbcInputFormat.
closeInternal时,- 如果isRestore=true,如果task是非正常结束(state!=running),设置
commit=false不提交事物
- 如果isRestore=true,如果task是非正常结束(state!=running),设置
- BaseRichOutputFormat.
writeSingleRecord()时,- 如果
isRestore=false || isStreamButNoWriteCheckpoint,numWriteCounter和snapshotWriteCounter的状态+1
- 如果
- BaseRichOutputFormat.
writeMultipleRecords()时,- 如果isRestore=false,
numWriteCounter的状态+rows.size
- 如果isRestore=false,
- BaseRichOutputFormat.
writeMultipleRecords()失败时,- 如果isRestore=true,直接抛出异常
- 如果isRestore=false,遍历rows执行
writeSingleRecord()
-
BaseFileOutputFormat.
actionBeforeWriteData()时- 如果
restoreConfig.isRestore() && formatState != null,将执行cleanDirtyData()清除脏数据文件
- 如果
-
BaseFileOutputFormat.
writeSingleRecordInternal()时,如果isRestore=true,isStream=false- 如果
currentRow[restoreColumnIndex]==lastRow[restoreColumnIndex],则readyCheckpoint=true
- 如果
-
BaseFileOutputFormat.
getFormatState()获取当前通道的recover point时,- 如果
isRestore=false || lastRow==null时直接返回 - 如果
isRestore=true || !lastRow时,如果isStream=true||readyCheckpoint=true时,执行flushData()持久化数据,并更新状态数据 flushData时,如果isRestore=true,执行moveTemporaryDataBlockFileToDirectory()移动临时数据文件
- 如果
-
BaseFileOutputFormat.
nextBlock()时,如果isRestore=true,- currentBlockFileName前缀附加
.号
- currentBlockFileName前缀附加
-
BaseFileOutputFormat.
closeInternal()时,- 如果task是非正常结束(state!=running),如果
isRestore=false,执行clearTemporaryDataFiles清除临时数据文件; - 如果task是正常结束(state==running),执行
flushData(),如果isRestore=false,主动执行执行moveTemporaryDataBlockFileToDirectory()移动临时数据
- 如果task是非正常结束(state!=running),如果
-
BaseFileOutputFormat.
tryCleanupOnError()时,- 如果
isRestore=false,执行clearTemporaryDataFiles清除临时数据文件;
- 如果
- orc/parquetOutputFormat.
writeSingleRecordToFile()时,- 如果如果isRestore=true,设置lastRow=row
- JdbcOutputFormat.
writeMultipleRecordsInternal()时,- 如果如果isRestore=true,如果
lastRow!=null,设置readyCheckpoint,设置lastRow=row
- 如果如果isRestore=true,如果
是否开启适时传输:用于控制写入数据的时机
bool isStream= (isRestore=true && isStream=true)
BaseDataReader构造时,
- 如果
isStream=true,直接返回,否则判断如果isRestore=true则根据restoreColumnName获得MetaColumn,并设置restoreColumnIndex和restoreColumnType
-
BaseFileOutputFormat.
writeSingleRecordInternal()时,如果isRestore=true,isStream=false- 如果
currentRow[restoreColumnIndex]==lastRow[restoreColumnIndex],则readyCheckpoint=true
- 如果
-
BaseFileOutputFormat.
getFormatState()获取当前通道的recover point时,- 如果
isStream=true||readyCheckpoint时,执行flushData()持久化数据,并更新状态数据
- 如果
-
BaseFileOutputFormat.
afterCloseInternal时- 如果
isStream=true,直接关闭数据源; - 如果
isStream=false,waitForAllTasksToFinish()等待所有通道操作完成,moveAllTemporaryDataFileToDirectory移动临时数据文件,关闭数据源,执行clearTemporaryDataFiles清除临时数据文件;
- 如果