Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL( 四 )


;      String value = https://mparticle.uc.cn/api/call.operands[1
;      // 设置参数
     tEnv.getConfig().getConfiguration().setString(key value);      break;    case CREATE_TABLE:      String ddl = call.operands[0
;
     tEnv.sqlUpdate(ddl);      break;    case INSERT_INTO:      String dml = call.operands[0
;
     tEnv.sqlUpdate(dml);      break;    default:      throw new RuntimeException(\"Unsupported command: \" + call.command);
 
// 提交作业tEnv.execute(\"SQL Job\");

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中 , 我们准备了一份测试数据集(来自阿里云天池公开数据集 , 特别鸣谢) , 位于 src/main/resources/user_behavior.log 。 数据以 JSON 格式编码 , 大概长这个样子:

{\"user_id\": \"543462\" \"item_id\":\"1715\" \"category_id\": \"1464116\" \"behavior\": \"pv\" \"ts\": \"2017-11-26T01:00:00Z\"

推荐阅读