大数据时代快速SQL引擎-Impala( 八 )

下图展示了执行select t1.n1 t2.n2 count(1) as c from t1 join t2 on t1.id = t2.id join t3 on t1.id = t3.id where t3.n3 between ‘a’ and ‘f’ group by t1.n1 t2.n2 order by c desc limit 100;查询的执行逻辑 , 首先Query Planner生成单机的物理执行计划 , 如下图所示:

和大多数数据库实现一样 , 第一步生成了一个单节点的执行计划 , 利用Parquet等列式存储 , 可以在SCAN操作的时候只读取需要的列 , 并且可以将谓词下推到SCAN中 , 大大降低数据读取 。 然后执行join、aggregation、sort和limit等操作 , 这样的执行计划需要再转换成分布式执行计划 , 如下图 。

这类的查询执行流程类似于Dremel , 首先根据三个表的大小权衡使用的join方式 , 这里T1和T2使用hash join , 此时需要按照id的值分别将T1和T2分散到不同的Impalad进程 , 但是相同的id会散列到相同的Impalad进程 , 这样每一个join之后是全部数据的一部分 。 对于T3的join使用boardcast的方式 , 每一个节点都会收到T3的全部数据(只需要id列) , 在执行完join之后可以根据group by执行本地的预聚合 , 每一个节点的预聚合结果只是最终结果的一部分(不同的节点可能存在相同的group by的值) , 需要再进行一次全局的聚合 , 而全局的聚合同样需要并行 , 则根据聚合列进行hash分散到不同的节点执行merge运算(其实仍然是一次聚合运算) , 一般情况下为了较少数据的网络传输 , intermediate节点同样也是worker节点 。 通过本次的聚合 , 相同的key只存在于一个节点 , 然后对于每一个节点进行排序和TopN计算 , 最终将每一个Worker的结果返回给coordinator进行合并、排序、limit计算 , 返回结果给用户 。

推荐阅读