1. 背景

在用arthas神器来诊断hbase异常进程这篇文章中,我详细地记录了一起生产环境中使用HBase的事故,事故发生的大致起因是,一个异常scan导致CPU使用率飙升至百分之百,且巨高不下,从而导致整个集群宕机。( 用arthas神器来诊断HBase异常进程 )

虽然,借助于arthas这个神器,我们很轻易地就定位到了是scan的问题。而且事后,我们在业务层面上也采取了很多的优化手段。但是对于这个罪魁祸首,却一直没有找到一个完美的解决方案,总不能让业务那边一用scan就战战兢兢,如履薄冰吧。

在上篇文章的最后,靠着匮乏的多线程功底,对于scan这个问题,我写了写自己的看法。

image-20200601171750336

事故发生的最本质的原因是scan的线程持续地占用着CPU的资源,且不会被释放,我们在业务日志中甚至能看到,某些scan能持续几个小时,这就太令人匪夷所思了。

原因渐渐明晰,就可以对症下药了。HBase提供了几个请求队列相关的核心参数,主要用于设置读写线程配比和进行读写队列隔离,以及get和scan的队列隔离。初看这些个参数的解释,还真搞不明白其中,队列、队列隔离的概念是什么。

在没有看源码之前,我简单地以为HBase服务端处理客户端的读写请求的时候,就是两个线程池,分别处理读和写的请求。甚至,如果你不做读写隔离的话,那就是一个线程池,一个线程是处理所有的读写请求;如果设置了读写隔离,就是两个线程池分别工作;如果进一步设置了get和scan的隔离,那么,就是三个线程池。这三个线程池,分别处理写请求,get请求,scan请求。每种请求所占的线程池资源肯定是不一样的,例如:scan比较耗时,那么,就应该控制其核心线程数的大小,如果此类值设置的过大,耗时的scan请求肯定会占用大量CPU,尤其是发生全表扫描的情况。

在看了源码之后,虽然HBase server端用的不是线程池,但也差不多,反正用的是线程,:smile:。其中队列什么的,应该就是类似于线程池中的队列概念,简单点,就是一个阻塞队列,线程池容纳不了了,就丢到不同的队列中等着呗。所谓队列要分离,那更好理解了,假如现在有三个窗口,卖菜,卖馒头,卖包子的。如果不隔离,仨窗口合一个,那队伍得多长。如果隔离开,我想买包子,就去卖包子的窗口等着。

下面我从源码的角度,具体分析下请求队列的相关参数,是怎么来控制HBase服务端线程池、队列等的初始化的。自己的一些粗浅的理解,有错误下方留言我及时更正:grin:。

2. 在IDEA中搭建HBase1.2.0的源码阅读环境

虽然我们线上的HBase版本是2.1.0,但是这一块的代码,跟HBase1.2.0相比应该没有太大的改变,后续测试的时候也能证实。我是在IDEA中DEBUG HMaster 进程,在 hbase-default.xml 配置HBase请求队列的相关参数,来观察线程的初始化情况。有关HBase1.2.0怎么在IDEA中debug,公众号的历史文章中有涉及,感兴趣的朋友可以参考。

HBASE源码导入IDEA并开启DEBUG调试

3. HBase请求队列的相关参数

我在hbase-default.xml配置的参数是:

<property>
    <name>hbase.regionserver.handler.count</name>
    <value>100</value>
    <description>默认为30,服务器端用来处理用户请求的线程数。生产线上通常需要将该值调到100~200。</description>
  </property>
  <property>
    <name>hbase.ipc.server.callqueue.handler.factor</name>
    <value>0.1</value>
    <description>为0则共享全部队列,服务器端设置队列个数,假如该值为0.1,
      那么服务器就会设置handler.count * 0.1 = 30 * 0.1 = 3个队列</description>
  </property>
  <property>
    <name>hbase.ipc.server.callqueue.read.ratio</name>
    <value>0.5</value>
    <description>默认为0,服务器端设置读写业务分别占用的队列百分比以及handler百分比。
      假如该值为0.5,表示读写各占一半队列,同时各占一半handler
    </description>
  </property>
  <property>
    <name>hbase.ipc.server.callqueue.scan.ratio</name>
    <value>0.2</value>
  </property>

上述参数的配置值以及说明,参考的是范老师的HBase原理与实践——第12章。按照参数的说明,最终我们可以计算出以下值:

  • handler.count 是100

  • 队列个数是100 x 0.1 = 10

  • 写队列 10 x 0.5 = 5

  • 读队列总值  10 x 0.5 = 5

  • get 队列  5 - 5 x 0.2 = 4

  • scan队列 5 x 0.2 = 1

参数设置完毕,我们就可以在源码层面观察这些值的变化和线程以及队列是如何初始化的啦。

4. debug源码观察队列参数是如何作用的

4.1SimpleRpcScheduler

这几个参数是在哪个类中被初始化的呢?我也不知道,只能在IDEA中Find In Path,搜一下 hbase.ipc.server.callqueue.handler.factor ,找到的类是,SimpleRpcScheduler。其父类是RpcScheduler。RpcScheduler应该就是处理RPC相关的类了,该类的相关UML图如下。

SimpleRpcScheduler

RpcScheduler有四个实现,其中俩在test包下,直接忽略。剩下的SimpleRpcScheduler和FifoRpcScheduler直接看SimpleRpcScheduler吧,该类是RpcScheduler的默认实现。而我们队列的相关参数就在该类中。如图:

image-20200601181343500

接下来我们重点要看的就是,SimpleRpcScheduler的构造函数。捋一捋,其构造函数中的东西很简单,先贴一下源码。

  public SimpleRpcScheduler(Configuration conf,int handlerCount,int priorityHandlerCount,int replicationHandlerCount,PriorityFunction priority,Abortable server,int highPriorityLevel) {

    int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
    // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10  (handlerCount * 10)

    this.priority = priority;
    this.highPriorityLevel = highPriorityLevel;
    this.abortable = server;

    String callQueueType = conf.get("hbase.ipc.server.callqueue.type", "deadline");
    float callqReadShare = conf.getFloat("hbase.ipc.server.callqueue.read.ratio", 0);
    float callqScanShare = conf.getFloat("hbase.ipc.server.callqueue.handler.factor", 0);

    float callQueuesHandlersFactor = conf.getFloat("hbase.ipc.server.callqueue.handler.factor", 0);
    int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));

    LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);

    if (numCallQueues > 1 && callqReadShare > 0) {
      // multiple read/write queues
      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
            callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
            BoundedPriorityBlockingQueue.class, callPriority);
      } else {
        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
          callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
      }
    } else {
      // multiple queues
      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
        callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
          conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
      } else {
        callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
            numCallQueues, maxQueueLength, conf, abortable);
      }
    }

    // Create 2 queues to help priorityExecutor be more scalable.
    this.priorityExecutor = priorityHandlerCount > 0 ?
        new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null;

   this.replicationExecutor =
     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
       replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
  }

这个构造函数其实就干了三件事,初始化了三个RpcExecutor,也就是处理RPC的线程相关的东西,

  //这是真正处理请求的
    private final RpcExecutor callExecutor;
    // 先不管
  private final RpcExecutor priorityExecutor;
    // 同步复制相关的,也不管
  private final RpcExecutor replicationExecutor;

4.2 RpcExecutor

排除干扰项,SimpleRpcScheduler帮助初始化RpcExecutor,RpcExecutor是一个抽象类,RWQueueRpcExecutor是其一个子类实现,所以,重点分析RWQueueRpcExecutor的初始化。

callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
    callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
    BoundedPriorityBlockingQueue.class, callPriority);

还是先贴一下RWQueueRpcExecutor构造函数的代码吧。

public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
      int numWriteQueues, int numReadQueues, float scanShare,
      final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
      final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
    super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));

    int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
    int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
    if ((numReadQueues - numScanQueues) > 0) {
      numReadQueues -= numScanQueues;
      readHandlers -= scanHandlers;
    } else {
      numScanQueues = 0;
      scanHandlers = 0;
    }

    this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
    this.readHandlersCount = Math.max(readHandlers, numReadQueues);
    this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
    this.numWriteQueues = numWriteQueues;
    this.numReadQueues = numReadQueues;
    this.numScanQueues = numScanQueues;
    this.writeBalancer = getBalancer(numWriteQueues);
    this.readBalancer = getBalancer(numReadQueues);
    this.scanBalancer = getBalancer(numScanQueues);

    queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
    LOG.info(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
        + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
        + numScanQueues + " scanHandlers=" + scanHandlersCount);

    for (int i = 0; i < numWriteQueues; ++i) {
      queues.add((BlockingQueue<CallRunner>)
        ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
    }

    for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
      queues.add((BlockingQueue<CallRunner>)
        ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
    }
  }

这个构造函数干的事,也很简单初始化读写请求的队列。

那么,读写请求的线程池在哪里初始化呢?

RWQueueRpcExecutor的startHandlers方法,然后最终被调用的是父类RpcExecutor中的startHandlers方法

源码细节是

protected void startHandlers(final String nameSuffix, final int numHandlers,
      final List<BlockingQueue<CallRunner>> callQueues,
      final int qindex, final int qsize, final int port) {
    final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
    for (int i = 0; i < numHandlers; i++) {
      final int index = qindex + (i % qsize);
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          consumerLoop(callQueues.get(index));
        }
      });
      t.setDaemon(true);
      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
        ",queue=" + index + ",port=" + port);
      t.start();
      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
      handlers.add(t);
    }
  }

其中的handlers的构成是

this.handlers = new ArrayList<Thread>(handlerCount);

这里简单说明一下,hbase.regionserver.handler.count的值设置为100,那这里的handlers数量就是100,里面初始化100个线程,100个线程按照上述比例分别属于写、get、scan。

4.3 DEBUG的效果

初始化RWQueueRpcExecutor这里加一个断点。

image-20200601191342026

初始化RpcExecutor的队列比例等参数,按照上述公式计算的DEBUG结果:

image-20200601191730527

继续把断点打到RWQueueRpcExecutor的构造函数内部。

image-20200601191923979

上面有队列相关的日志输出:

2020-06-01 19:18:08,567 INFO  [main] ipc.RWQueueRpcExecutor: RW.default writeQueues=5 writeHandlers=50 readQueues=4 readHandlers=40 scanQueues=1 scanHandlers=10

对应着,在startHandlers中初始化的线程池,应该也是这样的配比。为了印证这种猜想,继续DEBUG。把断点加到RpcExecutor的startHandlers方法内部。

image-20200601192400121

最终,handlers的结果是,处理写请求的线程50个,处理读请求的线程40个,处理scan的线程数10个。DEBUG的结果呢?

image-20200601192907647
image-20200601192950152

中间虽然省略了一部分,但是真实的数据,跟我们计算的是一样的。

4.4 arthas线程诊断

那么,用arthas分析下,服务器上是不是也是这样的线程比例呢?设置同样的参数,然后重启集群,用arthas观察线程情况,结果如图:

image-20200601193419032

跟我们本地DEBUG的结果是一致的。

这个时候,我们执行一些耗时的scan请求,正常情况下,我们的scan请求仅仅会占满scan的线程,而不会打满read线程池,这样就实现了读写的隔离。运行代码测试一下。

from happybase import ConnectionPool
import time

pool = ConnectionPool(size=1, host='ip', port=9090, timeout=2000)
for i in range(200):
    start = time.time()
    try:
        with pool.connection(2000) as con:
            table = con.table("test_table_name")
            res = list(table.scan(filter="PrefixFilter('273810955|')",
                                  row_start='\x0f\x10&R\xca\xdf\x96\xcb\xe2\xad7$\xad9khE\x19\xfd\xaa\x87\xa5\xdd\xf7\x85\x1c\x81ku ^\x92k',
                                  limit=3))
    except Exception as e:
        pass
    end = time.time()
    print 'timeout: %d' % (end - start)

arthas观察线程情况:

image-20200601193815898

scan线程队列一下子就被打满了,还占用了少许read的线程,占用read线程的原因是,(猜测),scan表要get元数据表来获取region的一些信息,如果不走缓存的话。此时,我们的CPU的状况:

image-20200601194041569

CPU还是被打满,此时集群的状态是不良的,到了这里我们虽然已经做到了把scan的线程队列单独隔离开。但是过多的scan请求还是打满了我们的CPU,接下来怎们办呢?只能缩小这个scan队列试试啦,减小参数配比,控制scan的线程为1,重复上述测试。

  <property>
    <name>hbase.regionserver.handler.count</name>
    <value>30</value>
    <description>默认为30,服务器端用来处理用户请求的线程数。生产线上通常需要将该值调到100~200。</description>
  </property>
  <property>
    <name>hbase.ipc.server.callqueue.handler.factor</name>
    <value>1</value>
    <description>为0则共享全部队列,服务器端设置队列个数,假如该值为0.1,
      那么服务器就会设置handler.count * 0.1 = 30 * 0.1 = 3个队列</description>
  </property>
  <property>
    <name>hbase.ipc.server.callqueue.read.ratio</name>
    <value>0.5</value>
    <description>默认为0,服务器端设置读写业务分别占用的队列百分比以及handler百分比。
      假如该值为0.5,表示读写各占一半队列,同时各占一半handler
    </description>
  </property>
  <property>
    <name>hbase.ipc.server.callqueue.scan.ratio</name>
    <value>0.1</value>
    <description>scan请求所占的比例</description>
  </property>

此时服务端scan队列的线程为1。

image-20200601200053227

运行我们的测试代码

image-20200601200256900

等一会儿再看CPU负载,如下图

image-20200601200754199

CPU负载还是高,此时一个scan线程运行也能把CPU跑这么高?看起来,虽然我们可以随心所欲地控制scan线程的比例。但是,对于降低机器的CPU负载上好像还是没有帮助。

这个时候老大哥给了一个提示,我们的测试集群单节点CPU貌似只有8核。难道是RUNNING的这几个线程就快把核吃满了?为了验证这个猜想,我只能再进一步压缩总的线程池大小。总体来说就是给总的read更小的线程分配,例如2。也就是所有的读请求就只用2个线程。调整参数,重启,线程初始化情况如下图。

image-20200601220648486

图中可以看到,初始化的handler线程数为2。接着运行我们的测试代码。线程开始转为running状态。

image-20200601221152651

让测试代码跑一会,我们拉一个CPU的趋势,观察是否可以压制CPU的持续上升状态,然后与之前的测试结果作对比。

image-20200601221445272

CPU的负载被压制住了,与之前相比,更小的线程运行,对CPU的压力减少为之前的一半。

5. 总结

之前对这几个参数的理解仅仅浮于表面,未曾深入进去,因此特地撰写此文。从源码层面,梳理了HBase服务端处理客户端读写请求时候线程的初始化工作,加深了对这几个队列相关参数使用的同时,也发表了一些自己的想法(大佬勿喷哈)。

整个测试流程下来,所有的疑团一个接一个被打开,之前配置的这些参数之所以不生效的原因是:我们当时的handler.count为30,设置的较低,其他几个参数的调整也不合理,所以导致,写、get、scan的队列隔离没有完全体验出来。所以,在生产环境中,个人这里调参的建议是,handler.count调大,读写、scan所占的比例适当调小,最好是根据自己实际的读写场景来分配。

测试集群因为机器的核数少,所以少量的线程就能打满CPU,所以,做队列相关参数测试的时候,一定要考虑到这一点。

但是,通过压制scan来减小对集群的影响,从而减小对其他业务的影响。这种方式,却实实在在牺牲了scan的体验。难道在HBase中,所有的读写场景不应该都是公平的吗?我们是否可以着手在客户端来优化scan的查询姿势呢?还有,我们的客户端迟迟得不到服务端的相应,早已经放弃了这次查询,在某个scan的超时时间内,服务端那个查询线程为何没有及时被kill掉,使系统的资源得到及时有效的释放呢?

未完待续!

相关文章