如何正确使用Flink Connector?(12)

  • commit offset 方式

Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint 。

如果 checkpoint 关闭 , commit offset 要依赖于 kafka 客户端的 auto commit 。 需设置 enable.auto.commit , auto.commit.interval.ms 参数到 consumer properties , 就会按固定的时间间隔定期 auto commit offset 到 kafka 。

如果开启 checkpoint , 这个时候作业消费的 offset 是 Flink 在 state 中自己管理和容错 。 此时提交 offset 到 kafka , 一般都是作为外部进度的监控 , 想实时知道作业消费的位置和 lag 情况 。 此时需要 setCommitOffsetsOnCheckpoints 为 true 来设置当 checkpoint 成功时提交 offset 到 kafka 。 此时 commit offset 的间隔就取决于 checkpoint 的间隔 , 所以此时从 kafka 一侧看到的 lag 可能并非完全实时 , 如果 checkpoint 间隔比较长 lag 曲线可能会是一个锯齿状 。

推荐阅读