Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL( 六 )


   'connector.properties.1.key' = 'bootstrap.servers'    'connector.properties.1.value' = 'localhost:9092'
   'update-mode' = 'append'    'format.type' = 'json'  -- 数据源格式为 json    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)

注:可能有用户会觉得其中的 connector.properties.0.key 等参数比较奇怪 , 社区计划将在下一个版本中改进并简化 connector 的参数配置 。

使用 DDL 连接 MySQL 结果表

连接 MySQL 可以使用 Flink 提供的 JDBC connector 。 例如

CREATE TABLE pvuv_sink (
   dt VARCHAR
   pv BIGINT
   uv BIGINT
) WITH (    'connector.type' = 'jdbc' -- 使用 jdbc connector    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test' -- jdbc url    'connector.table' = 'pvuv_sink' -- 表名    'connector.username' = 'root' -- 用户名    'connector.password' = '123456' -- 密码    'connector.write.flush.max-rows' = '1' -- 默认5000条 , 为了演示改为1条

推荐阅读