本文介绍了druid的基础架构以及工作过程,通过一个应用案例加深了解。

durid简介

druid是一种高性能、列式存储、分布式数据存储的时序数据分析引擎。能支持“PB”级数据的秒级查询。类似的产品有kylin/clickhouse。druid典型的应用就是OLAP场景下的cube组合查询分析。如数据钻取(Drill-down)、上卷(Roll-up)、切片(Slice)、切块(Dice)以及旋转(Pivot)。后面的应用示例章节再详细阐述。

durid基础架构

先来了解一下durid主要节点:

1、broker node(代理节点)

Broker节点扮演着历史节点和实时节点的查询路由的角色。主要负责接收外部查询,转发查询至各个segment数据所在的节点,并聚合结果返回。

2、historical node(历史节点)

historical主要负责历史数据存储和查询,接收协调节点数据加载与删除指令,从deepstoage中下载segment,完成数据加载或者删除后在zk中进行通告。历史节点遵循shared-nothing的架构,因此节点间没有单点问题。节点间是相互独立的并且提供的服务也是简单的,它们只需要知道如何加载、删除和处理不可变的segment。historical节点也可以进行分组,组合成不同的historical tier。这会在集群规模较大的时候体现出优势。如做数据的冷热分离,按不同业务的数据分离(一定程度的资源隔离)。当然,historical 节点是整个集群查询性能的核心所在,因为historical会承担绝大部分的segment查询。

3、coordinator node(协调节点)

主要负责数据的管理和在历史节点上的分布。协调节点告诉历史节点加载新数据、卸载过期数据、复制数据、和为了负载均衡移动数据。可以配置load数据及drop数据规则。

4、overlord node(index service 可以理解为任务管理节点)

功能描述:负责接收任务,管理任务。接收外部http请求(新建任务、查询任务状态、kill任务等),分配管理任务(当有新的任务请求,overload node会将任务分配给middleManager node去执行)。

5、middleManager node(可以理解为overlord节点的工作节点)

功能描述:可以启动n(可配置)个peon,接收overlord分配的task,再交给自己peon去执行。

查询过程

见上图蓝色箭头,Broker节点接收到查询(Q1),再将查询发送给历史节点与实时节点(Q2,Q3),在上图的模式中,实时节点是MM节点上启动的task。该task会负责数据的摄入以及提供实时数据的查询。

数据摄入过程

见上图红色箭头,D1是client生产数据最终写入kafka(这个过程可能在client与kafka的中间,还包含了多个环节,如数据传输与数据清洗),D2和D3过程是部署tranquility-kafka服务,消费kafka数据写入对应的task,tranquility-kakfa启动的时候会跟overlord节点通信,由overlord节点分配任务给middleManager执行。D4是task 负责的segment段正常结束,然后将segment数据写入deepstorage过程。(实时task运行时间是segmentGranularity+windowPeriod+intermediatePersistPeriod)。D5则是historical节点从deepstorage下载segment并在zk中声明负责该segment段查询的过程。

目前druid数据摄入过程还有一种更推荐的方式就是kafka index service(简称kis),有兴趣的同学可以参考官方文档,kis对kafka的版本有强要求。

druid整体架构虽然略为复杂,但是整体稳定性非常不错,几乎很少出现集群故障。抛开集群硬件故障和数据本身问题,SLA基本能到4个9。coordinator,overlord两个节点是主从模式,保证每个角色起两个实例即可。broker节点无状态,可以起多个实例,前面挂个域名即可(为了保证缓存命中,最好配置ip hash)。historical节点无状态,有一定冗余即可。middleManager用作数据摄入节点,若task没有配置副本,则节点宕机会引发丢数据的风险。当然,kis可以避免该问题。

durid数据聚合、存储核心思想

druid 数据存储分为三部分timestamp、dimensions、metrics。其中,timestamp、metrics部分是采用lz4直接压缩。

但是dimensions部分需要支持过滤查询以及分组查询。所以dimensions部分的每个维度都采用了以下三种数据结构做转码、存储:

A dictionary that maps values (which are always treated as strings) to integer IDs,For each distinct value in the column,a bitmap that indicates which rows contain that value,andA list of the column’s values,encoded using the dictionary in 1

举个例子,源数据如下:

name列来说

1. Dictionary that encodes column values

字典表的key都是唯一的,所以Map的key是unique的column value,Map的value从0开始不断增加。 示例数据的name列只有两个不同的值。所以张三编号为0,李四编号为1:

{ "张三": 0 "李四": 1 }

2. Column data

要保存的是每一行中这一列的值,值是ID而不是原始的值。因为有了上面的Map字典,所以有下面的对应关系:

[0,

1,

1,

0]

3. Bitmaps - one for each unique value of the column

BitMap的key是第一步Map的key(原始值), value则是真假的一个标识(是|否?等于|不等于?),取值只有0、1,如下:

value="张三": [1,0,0,1]

value=“李四": [0,1,1,0]

所以由上可知最坏的情况可能是随着数据量的增加,bitmap的个数也成线性增长,为数据量大小*列的个数。那么在什么情况下会导致这种线性增长?这里我们引入了一个基数(cardinality)的概念。基数=unique(dim1,dim2.....),如若dim取值均为各种爆炸性id或者随机数,则druid的预聚合将完全失去意义。所以在druid的应用场景中,基数约小,聚合效率越高。

讲了dimensions怎么存储,那么metrics又是怎么聚合(roll-up)呢?这就要引入druid数据schema定义了。下一章结合应用一块看一个示例。

应用示例与实践经验

假设有这样一份数据,典型的商品销售数据。

我们构造成druid中的数据schema如下:

{ "dataSources" : [ { "spec" : { "dataSchema" : { "dataSource" : "test_datasource", "granularitySpec" : { "segmentGranularity" : "hour", "queryGranularity" : "minute", "type" : "uniform" }, "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "column" : "time", "format" : "auto" }, "dimensionsSpec" : { "dimensions" : [ "productName", "city", "channel", “action"] } } }, "metricsSpec" : [ { "name" : "count", "type" : "count" }, { "type" : "doubleSum", "fieldName" : "price", "name" : “sale" } ] }, "tuningConfig" : { "type" : "realtime", "windowPeriod" : "PT10M", "intermediatePersistPeriod" : "PT10M", "maxRowsInMemory" : "100000" } }, "properties" : { "topicPattern" : "test_datasource", "task.partitions" : "2", "task.replicants" : "1" } } ], "properties" : { ... } }

前面重点说了dimensions,我们再来看下metrics。在上面的例子中我们只定义count和针对price的doubleSum,那么这些指标就已经固定了后期的分析需求。我们看到上面table中的一二行标红部分,所有dim取值完全相同,queryGranularity为一分钟。那么在这2018-06-11 12:23:00这个点,这两行数据就被聚合成一行,count=2,sale=0。以此类推。

然后我们再来看看具体的分析需求,一个钻取的例子。我们首先查看商品A昨天的点击量,select sum(count) from table where productName=‘A’ and action=‘click',再想看看地区=北京,渠道=web呢?是不是再加几个where就搞定了?select sum(count) from table where productName=‘A’ and city=‘北京’ and channel=‘web' and action=‘click’; 然后就是切片和切块,也很简单,就是几个group by。这些在druid中都能非常轻松的支持。

具体使用上的经验总结:

1. reindex思想。一般我们实时数据查询粒度配置的会比较小,秒级或者分钟级。那么对于一天前,三天前,一个月前的数据呢?这时候一般关注的粒度将不再那么细,所以我们一般会采取redinx的策略进行再聚合

2. 针对历史数据,可能对于某些维度将不在关心,这时候我们也可以在reindex时,将无用的维度剔除掉,可能大大减少整体数据的基数。

3. 一般数据压缩比例。这里提供一个大概的参考值。数据总基数在10W以下,每天数据量约百亿左右,druid中聚合后的索引数据与原始数据大小之比可以到1:100,甚至1:1000。

4. druid适用于常规的olap场景,能非常轻松的支撑每天百亿甚至千亿级别的数据写入。

5. 爆炸性维度数据,以及频繁update数据的需求,不适用于druid的场景。

总结

本文主要对druid做了入门级的基础介绍,可以给大家做olap引擎技术选型时做一个参考。以及对druid的初学者做一个大致介绍。druid是一款非常优秀的olap引擎,从性能、稳定性上来说,都是非常不错的。

查看原文 >>
相关文章