如何正确使用Flink Connector?( 九 )


message String topic int partition long offset) 函数 。

另外 Flink 中也提供了一些常用的序列化反序列化的 schema 类 。 例如 , SimpleStringSchema , 按字符串方式进行序列化、反序列化 。 TypeInformationSerializationSchema , 它可根据 Flink 的 TypeInformation 信息来推断出需要选择的 schema 。 JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息 , 并返回 ObjectNode , 可以使用 .get(“property”) 方法来访问相应字段 。

  • 消费起始位置设置

如何设置作业从 kafka 消费数据最开始的起始位置 , 这一部分 Flink 也提供了非常好的封装 。 在构造好的 FlinkKafkaConsumer 类后面调用如下相应函数 , 设置合适的起始位置 。