Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL( 五 )


{\"user_id\": \"662867\" \"item_id\":\"2244074\" \"category_id\": \"1575622\" \"behavior\": \"pv\" \"ts\": \"2017-11-26T01:00:00Z\"

为了模拟真实的 Kafka 数据源 , 笔者还特地写了一个 source-generator.sh 脚本(感兴趣的可以看下源码) , 会自动读取 user_behavior.log 的数据并以默认每毫秒1条的速率灌到 Kafka 的 user_behavior topic 中 。

有了数据源后 , 我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic(详见 src/main/resources/q1.sql) 。

CREATE TABLE user_log (
   user_id VARCHAR
   item_id VARCHAR
   category_id VARCHAR
   behavior VARCHAR
   ts TIMESTAMP
) WITH (    'connector.type' = 'kafka' -- 使用 kafka connector    'connector.version' = 'universal'  -- kafka 版本 , universal 支持 0.11 以上的版本    'connector.topic' = 'user_behavior'  -- kafka topic    'connector.startup-mode' = 'earliest-offset' -- 从起始 offset 开始读取    'connector.properties.0.key' = 'zookeeper.connect'  -- 连接信息    'connector.properties.0.value' = 'localhost:2181'

推荐阅读