吐槽——公司前人自研的分布式任务调度跑不起来,最后我选择了zookeeper

背景

作为创业公司,基础组件全部自研,并且开发团队集体跑路了,留下的坑没有最大,只有更大,近日使用前人造的轮子,分布式任务调度,折腾了近两天,没跑起来,各种问题,又没文档,最后只能放弃使用,而新部署一套也不现实,然后就选择了自己造轮子

对于目前遇到的场景,有个任务,一天跑一次,一次大概2小时能跑完,想过两种方案实现分布式锁解决当前遇到的问题:

  • 使用数据库,建一张任务表,利用行锁来锁定任务,锁定成功后,将标识位设置成已占用,占用者设置成自身机器ip,锁定成功的机器通过定时任务每5秒更新一次心跳时间,任务执行完成后将标识设置成未占用,并停止心跳,其它机器通过定时任务做检查,如果15s没有更新心跳时间则认为当前任务的执行机器挂了,并立即争用
  • 使用zookeeper,为任务创建一个临时节点,如果创建成功,则表示当前机器获得了锁,定时任务在执行前先判断当前机器是否能获取到锁,获取不到则不执行

理论上两种方式都是可行的,并且各有问题,由于不想建表,所以选择了使用zk。

问题分析

初始化及正常流程

对于前面的问题,我在实现的过程中尽量保证实现的更通用一些,正常情况下,在应用创建锁时,向zk创建一个临时节点,定时任务在执行前先判断当前机器是否能获取到锁,获取不到则不执行,正常流程如下:

image

如果不出现其它异常问题,这样简单的处理将非常有效,但是出现网络异常,宕机等情况时,则没这么容易了,如果机器A挂了,机器B不获取锁,则很有可能任务不跑了

异常情况

异常情况主要表现为两种:

  1. 机器A关闭或宕机,这个时间,机器B会收到dataDelete的回调,此时机器B应该要重新创建path,作为后续任务执行的机器
  2. 机器A出现了短暂的连接中断或者机器A的会话过期,此时机器B如果重新创建path,那么机器A就失去了执行任务的权限,而此时任务可能正在执行

对于异常情况,我们保持一个原则:尽量让机器A成功创建path,获得任务的执行权,那么我们可以在机器B的handleDataDeleted回调里延迟10s后再获取锁,如果10s后机器A还没恢复,则认为机器A短时间内恢复了不了。机器A出现连接/会话过期问题时,执行流程类似于下图:

image

而获取任务与连接断开或者会话过期之间可能存在并发的情况,这个时候有两种策略:

  1. 调用方重试,如果10s内获取不到锁,则不执行,等待其它机器获取锁后执行
  2. 阻塞,直到连通zk

不管使用哪种方式都会存在一个问题,机器B上的定时任务先启动,但是获取锁失败,不执行任务,而后机器A在判断自己能否执行任务前,机器A与zk失联(可能是挂了或者会话过期),那么在机器A与zk恢复连接前,机器B可能已经等待超过10s,成功创建了锁对应的path,这个时候,任务将得不到执行,需要使用回调的方式做补尝,在回调方法里判断是否需要启动任务。

锁的状态

基于上述分析,将锁分为以下4个状态:

  • CREATED:表示刚创建,是初始状态,这个状态下,直接参与竞争,去zk上创建节点
  • OWN:表示zk的节点被自己创建成功了,当有调用过来时,如果是此状态,则再调用一次zkClient.create(path),如果成功,则表示可执行任务,避免链接断了而状态没变
  • RELEASED:表示自己没有争用成功,返回false
  • WAITING:表示已经收到了handleDataDeleted回调,在10s等待时间中,此时如果获取锁,则同样等待10s后再尝试重新获取锁的判断

增加两个回调方法:

  1. handleDataDeleted中在延迟10s后争用成功时调用,需要在此回调方法中判断是否需要启动任务,可以通过记录最近一次获取锁的时间,如果离当前时间很近(比如任务执行耗时等),则启动任务(任务没执行完,机器就出了问题,导致当前机器获得了锁,需要继续将任务跑完);或者check一下任务的输出,判断是否需要继续执行任务
  2. 在重新建立会话后,立即争用,如果失败,则触发回调接口,使用方可在此接口的实现中判断是否需要取消任务

实现方式

基于zk的分布式锁的实现,用于支持任务的执行,整理后的代码如下:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用zookeeper实现分布式锁,可用于执行分布式定时任务等互斥类代码,调用{@link #tryLock()},并在任务结束后,一定要调用{@link #unlock()}释放锁
 *
 * @author gaohang
 */
public final class ZookeeperLock {
  private static final Logger logger = LoggerFactory.getLogger(ZookeeperLock.class);

  private static final ScheduledExecutorService delayExecutor = Executors.newScheduledThreadPool(1);
  private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1);

  /**
   * 所有锁都在此节点下
   */
  private static final String LOCK_BASE_PATH = "/onlineetl/distributeLock/instance";
  private static final Charset UTF8 = Charset.forName("UTF-8");

  private final int delayTime = 10;

  private final String lockName;
  private final String lockPath;
  private final ZkClient zkClient;

  /**
   * 当前机器的ip
   */
  private final String localhostAddr;

  /**
   * 占用锁的机器的ip
   */
  private String activeHostAddr;

  /**
   * 当前持有锁的状态
   */
  private volatile State state = State.CREATED;

  /**
   * 本地锁,防止本地多线程导致本地非串行
   */
  private final Lock localLock = new ReentrantLock();

  /**
   * 创建分布多锁
   *
   * @param lockName 锁的名字,唯一
   * @param zkClient zk客户端
   * @param callback 当有dataDeleted时,重新获取争用后调用
   * @throws UnknownHostException 获取localhost失败
   */
  ZookeeperLock(String lockName, ZkClient zkClient, LockAddrChangedCallback callback) throws UnknownHostException {
    this.zkClient = zkClient;
    if (StringUtils.isBlank(lockName)) {
      throw new IllegalArgumentException("lockName cannot be blank string");
    }
    this.lockName = lockName;
    this.lockPath = LOCK_BASE_PATH + '/' + lockName;
    final InetAddress localhost = InetAddress.getLocalHost();
    this.localhostAddr = localhost.getHostAddress();
    initLock();
    subscriptDataChange(lockName, zkClient, callback);
    zkClient.subscribeStateChanges(new IZkStateListener() {
      @Override
      public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
        logger.info("zookeeper state changed, state: {}", state);
      }

      @Override
      public void handleNewSession() throws Exception {
        if (!initLock()) {
          callbackExecutor.execute(callback::onPathLose);
        }
      }

      @Override
      public void handleSessionEstablishmentError(Throwable error) throws Exception {
        logger.error("failed to connect to zookeeper", error);
      }
    });
  }

  private void subscriptDataChange(String lockName, ZkClient zkClient, LockAddrChangedCallback callback) {
    zkClient.subscribeDataChanges(lockPath, new IZkDataListener() {

      @Override
      public void handleDataChange(String dataPath, Object data) {
        final String newOwner = new String((byte[]) data, UTF8);
        if (!isMyself(newOwner)) {
          //不是本机
          state = State.RELEASED;
          logger.info("lock [{}] released, owner is {}", lockName, newOwner);
        } else {
          //本机持有了锁
          state = State.OWN;
          logger.info("lock [{}] locked", lockName);
        }
        activeHostAddr = newOwner;
      }

      @Override
      public void handleDataDeleted(String dataPath) {
        //数据被删除有两种情况,
        // 1.持有锁的机器挂了
        // 2.网络异常或者zk出现了异常
        //因此在收到数据删除回调时,未获取锁的机器要优先让上一次获取锁的机器重新获取锁
        if (state == State.OWN && activeHostAddr != null && isMyself(activeHostAddr)) {
          //设置成released,在获取锁成功时防止有新的上锁操作
          state = State.WAITING;
          if (localLock.tryLock()) {
            //说明锁没有在使用中,释放即可,将锁的权限交给其它机器
            localLock.unlock();
            activeHostAddr = null;
            //5s后再获取锁
            delayExecutor.schedule(() -> {
              initLock();
              //对方挂了,自己争用成功
              callbackExecutor.execute(callback::onPathLocked);
            }, 5, TimeUnit.SECONDS);
          } else {
            //锁被占用,立即获取锁
            initLock();
          }
        } else {
          state = State.WAITING;
          activeHostAddr = null;
          //对方可能网络异常了,5s后再获取锁,如果对方5s没恢复,则认为对方短时间内无法恢复了
          delayExecutor.schedule(() -> {
            initLock();
            callbackExecutor.execute(callback::onPathLocked);
          }, delayTime, TimeUnit.SECONDS);
        }
      }
    });
  }

  private boolean isMyself(String activeHostAddr) {
    return StringUtils.equals(activeHostAddr, localhostAddr);
  }

  private boolean initLock() {
    try {
      zkClient.createEphemeral(lockPath, localhostAddr.getBytes(UTF8));
      activeHostAddr = localhostAddr;
      state = State.OWN;
      logger.info("lock [{}] locked", lockName);
      return true;
    } catch (ZkNodeExistsException e) {
      byte[] host = zkClient.readData(lockPath, true);
      if (host == null) {
        // 如果不存在节点,立即尝试一次
        return initLock();
      } else {
        activeHostAddr = new String(host, UTF8);
        state = State.RELEASED;
        logger.info("lock [{}] released, owner is {}", lockName, activeHostAddr);
        if (isMyself(activeHostAddr)) {
          state = State.OWN;
          return true;
        }
        return false;
      }
    } catch (ZkNoNodeException e) {
      zkClient.createPersistent(LOCK_BASE_PATH, true); // 尝试创建父节点
      return initLock();
    } catch (Throwable e) {
      logger.error("lock [{}] failed", lockName, e);
      //不让招出异常
      return false;
    }
  }

  public boolean tryLock() {
    //避免状态被修改,这里先上锁,在handleDataDeleted中,如果trylock成功,则表示没有任务在执行
    if (!localLock.tryLock()) {
      return false;
    }
    switch (state) {
      case OWN:
        if (initLock()) {
          return true;
        }
        state = State.RELEASED;
        return false;
      case RELEASED:
        localLock.unlock();
        return false;
      case CREATED:
        initLock();
        return relock();
      case WAITING:
        try {
          TimeUnit.SECONDS.sleep(delayTime);
        } catch (InterruptedException e) {
          localLock.unlock();
          throw new RuntimeException(e);
        }
        //再次尝试获取锁
        final boolean locked = relock();
        if (locked) {
          return true;
        }
        initLock();
        return relock();
    }

    //理论上不会走到这里
    initLock();
    //这里一定要Unlock,否则tryLock会重入,而unlock没有多次释放徜
    return relock();
  }

  private boolean relock() {
    localLock.unlock();
    return tryLock();
  }

  public void unlock() {
    localLock.unlock();
  }

  public String getLockName() {
    return lockName;
  }

  public String getActiveHostAddr() {
    return activeHostAddr;
  }

  @Override
  public String toString() {
    return "ZookeeperLock{" +
        "lockName='" + lockName + '\'' +
        ", lockPath='" + lockPath + '\'' +
        ", localhostAddr='" + localhostAddr + '\'' +
        ", activeHostAddr='" + activeHostAddr + '\'' +
        ", state=" + state +
        '}';
  }

  /**
   * 锁的状态,当状态是created时,则竞争,OWN状态表示持有锁,RELEASED状态表示未持有锁, waiting表示锁待争用
   */
  private enum State {
    OWN, RELEASED, CREATED, WAITING
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容

  • 专业考题类型管理运行工作负责人一般作业考题内容选项A选项B选项C选项D选项E选项F正确答案 变电单选GYSZ本规程...
    小白兔去钓鱼阅读 8,966评论 0 13
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,076评论 1 32
  • 最近在公司的分布式项目pegasus中用到了zookeeper的c客户端,在此记录下使用zookeeper c c...
    shengofbig阅读 3,615评论 2 1
  • 最近碰到几个业务场景,会遇到并发的问题。在单实例情况下,我们会通过java.util.concurrent包...
    菜鸟小玄阅读 2,248评论 0 5
  • 一、重新出发2018年的年末,我终于离开了待了三年的公司,离开熟悉的同事,离开这个我付出了很多心血的岗位。这一次的...
    你来替我做个梦阅读 306评论 2 7