这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Zookeeper

SpringBoot整合Zookeeper,并通过zk实现分布式锁来实战演示zk在应用中的想象空间

1 - 1.基础使用介绍

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,广泛应用于分布式系统中,比如有用它做配置中心,注册中心,也有使用它来实现分布式锁的,作为高并发技术栈中不可或缺的一个基础组件,接下来我们将看一下,zk应该怎么玩,可以怎么玩

本文作为第一篇,将主要介绍基于zk-client的基本使用姿势,以次来了解下zk的基本概念

I. 准备

1. zk环境安装

用于学习试点目的的体验zk功能,安装比较简单,可以参考博文: 210310-ZooKeeper安装及初体验

wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
cd apache-zookeeper-3.6.2-bin

# 前台启动
bin/zkServer.sh start-foreground

2. 项目环境

本文演示的是直接使用apache的zookeeper包来操作zk,与是否是SpringBoot环境无关

核心依赖

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

版本说明:

  • zk: 3.6.2
  • SpringBoot: 2.2.1.RELEASE

II. ZK使用姿势

1. zk基本知识点

首先介绍下zk的几个主要的知识点,如zk的数据模型,四种常说的节点

1.1 数据模型

zk的数据模型和我们常见的目录树很像,从/开始,每一个层级就是一个节点

每个节点,包含数据 + 子节点

注意:EPHEMERAL节点,不能有子节点(可以理解为这个目录下不能再挂目录)

zk中常说的监听器,就是基于节点的,一般来讲监听节点的创建、删除、数据变更

1.2 节点

  • 持久节点 persistent node
  • 持久顺序节点 persistent sequental
  • 临时节点 ephemeral node
  • 临时顺序节点 ephemeral sequental

注意:

  • 节点类型一经指定,不允许修改
  • 临时节点,当会话结束,会自动删除,且不能有子节点

2. 节点创建

接下来我们看一下zk的使用姿势,首先是创建节点,当然创建前提是得先拿到zkClient

初始化连接

private ZooKeeper zooKeeper;

@PostConstruct
public void initZk() throws IOException {
    // 500s 的会话超时时间
    zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this);
}

节点创建方法,下面分别给出两种不同的case

@Service
public class NodeExample implements Watcher {
    /**
     * 创建节点
     *
     * @param path
     */
    private void nodeCreate(String path) {
        // 第三个参数ACL 表示访问控制权限
        // 第四个参数,控制创建的是持久节点,持久顺序节点,还是临时节点;临时顺序节点
        // 返回 the actual path of the created node
        // 单节点存在时,抛异常 KeeperException.NodeExists
        try {
            String node = zooKeeper.create(path + "/yes", "保存的数据".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create node: " + node);
        } catch (KeeperException.NodeExistsException e) {
            // 节点存在
            System.out.println("节点已存在: " + e.getMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 带生命周期的节点
        try {
            Stat stat = new Stat();
            // 当这个节点上没有child,且1s内没有变动,则删除节点
            // 实测抛了异常,未知原因
            String node = zooKeeper.create(path + "/ttl", ("now: " + LocalDateTime.now()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_WITH_TTL, stat, 1000);
            System.out.println("ttl nod:" + node + " | " + stat);
            // 创建已给监听器来验证
            zooKeeper.exists(path + "/ttl", (e) -> {
                System.out.println("ttl 节点变更: " + e);
            });
        } catch (KeeperException.NodeExistsException e) {
            System.out.println("节点已存在: " + e.getMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

节点创建,核心在于 zooKeeper.create(path + "/yes", "保存的数据".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  • 当节点已存在时,再创建会抛异常 KeeperException.NodeExistsException
  • 最后一个参数,来决定我们创建的节点类型
  • todo: 上面实例中在指定ttl时,没有成功,暂未找到原因,待解决

3. 节点存在判断

判断节点是否存在,比较常见了(比如我们在创建之前,可能会先判断一下是否存在)

/**
 * 判断节点是否存在
 */
private void checkPathExist(String path) {
    try {
        // 节点存在,则返回stat对象; 不存在时,返回null
        // watch: true 表示给这个节点添加监听器,当节点出现创建/删除 或者 新增数据时,触发watcher回调
        Stat stat = zooKeeper.exists(path + "/no", false);
        System.out.println("NoStat: " + stat);
    } catch (Exception e) {
        e.printStackTrace();
    }

    try {
        // 判断节点是否存在,并监听 节点的创建 + 删除 + 数据变更
        // 注意这个事件监听,只会触发一次,即单这个节点数据变更多次,只有第一次能拿到,之后的变动,需要重新再注册监听
        Stat stat = zooKeeper.exists(path + "/yes", this);
        System.out.println("YesStat: " + stat);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

注意

核心用法: zooKeeper.exists(path + "/yes", this);

  • 当节点存在时,返回Stat对象,包含一些基本信息;如果不存在,则返回null
  • 第二个参数,传入的是事件回调对象,我们的测试类NodeExmaple 实现了接口 Watcher, 所以直接传的是this
  • 注册事件监听时,需要注意这个回调只会执行一次,即触发之后就没了;后面再次修改、删除、创建节点都不会再被接收到

4. 子节点获取

获取某个节点的所有子节点,这里返回的是当前节点的一级子节点

/**
 * 获取节点的所有子节点, 只能获取一级节点
 *
 * @param path
 */
private void nodeChildren(String path) {
    try {
        // 如果获取成功,会监听 当前节点的删除,子节点的创建和删除,触发回调事件, 这个回调也只会触发一次
        List<String> children = zooKeeper.getChildren(path, this, new Stat());
        System.out.println("path:" + path + " 's children:" + children);
    } catch (KeeperException e) {
        System.out.println(e.getMessage());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

5. 数据获取与修改

节点上是可以存储数据的,在创建的时候,可以加上数据;后期可以读取,也可以修改

/**
 * 设置数据,获取数据
 *
 * @param path
 */
public void dataChange(String path) {
    try {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData(path, false, stat);
        System.out.println("path: " + path + " data: " + new String(data) + " : " + stat);

        // 根据版本精确匹配; version = -1 就不需要进行版本匹配了
        Stat newStat = zooKeeper.setData(path, ("new data" + LocalDateTime.now()).getBytes(), stat.getVersion());
        System.out.println("newStat: " + stat.getVersion() + "/" + newStat.getVersion() + " data: " + new String(zooKeeper.getData(path, false, stat)));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

在设置数据时,可以指定版本,当version > 0时,表示根据版本精确匹配;如果为-1时,则只要节点路径对上就成

6. 事件监听

监听主要是针对节点而言,前面在判断节点是否存在、修改数据时都可以设置监听器,但是他们是一次性的,如果我们希望长久有效,则可以使用下面的addWatch

public void watchEvent(String path) {
    try {
        // 注意这个节点存在
        // 添加监听, 与 exist判断节点是否存在时添加的监听器 不同的在于,触发之后,依然有效还会被触发, 只有手动调用remove才会取消
        // 感知: 节点创建,删除,数据变更 ; 创建子节点,删除子节点
        // 无法感知: 子节点的子节点创建/删除, 子节点的数据变更
        zooKeeper.addWatch(path + "/yes", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("事件触发 on " + path + " event:" + event);
            }
        }, AddWatchMode.PERSISTENT);
    } catch (Exception e) {
        e.printStackTrace();
    }

    try {
        // 注意这个节点不存在
        // 添加监听, 与 exist 不同的在于,触发之后,依然有效还会被触发, 只有手动调用remove才会取消
        // 与前面的区别在于,它的子节点的变动也会被监听到
        zooKeeper.addWatch(path + "/no", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("事件触发 on " + path + " event:" + event);
            }
        }, AddWatchMode.PERSISTENT_RECURSIVE);
    } catch (Exception e) {
        e.printStackTrace();
    }

    // 移除所有的监听
    //zooKeeper.removeAllWatches(path, WatcherType.Any, true);
}

上面给出了两种case,

  • AddWatchMode.PERSISTENT: 表示只关心当前节点的删除、数据变更,创建,一级子节点的创建、删除;无法感知子节点的子节点创建、删除,无法感知子节点的数据变更
  • AddWatchMode.PERSISTENT_RECURSIVE: 相当于递归监听,改节点及其子节点的所有变更都监听

7. 节点删除

最后再介绍一个基本功能,节点删除,只有子节点都不存在时,才能删除当前节点(和linux的rmdir类似)

/**
 * 删除节点
 */
public void deleteNode(String path) {
    try {
        // 根据版本限定删除, -1 表示不需要管版本,path匹配就可以执行;否则需要版本匹配,不然就会抛异常
        zooKeeper.delete(path, -1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

8. 小结

本文主要介绍的是java侧对zookeeper的基本操作姿势,可以算是zk的入门,了解下节点的增删改,事件监听;

当然一般更加推荐的是使用Curator来操作zk,相比较于apache的jar包,使用姿势更加顺滑,后面也会做对比介绍

II. 其他

0. 项目

2 - 2.从0到1实现一个分布式锁

分布式锁,在实际的业务使用场景中算是比较常用的了,而分布式锁的实现,常见的除了redis之外,就是zk的实现了,前面一篇博文介绍了zk的基本概念与使用姿势,那么如果让我们来记住zk的特性来设计一个分布式锁,可以怎么做呢?

I. 方案设计

1. 创建节点方式实现

zk有四种节点,一个最容易想到的策略就是创建节点,谁创建成功了,就表示谁持有了这个锁

这个思路与redis的setnx有点相似,因为zk的节点创建,也只会有一个会话会创建成功,其他的则会抛已存在的异常

借助临时节点,会话丢掉之后节点删除,这样可以避免持有锁的实例异常而没有主动释放导致所有实例都无法持有锁的问题

如果采用这种方案,如果我想实现阻塞获取锁的逻辑,那么其中一个方案就需要写一个while(true)来不断重试

while(true) {
    if (tryLock(xxx)) return true;
    else Thread.sleep(1000);
}

另外一个策略则是借助事件监听,当节点存在时,注册一个节点删除的触发器,这样就不需要我自己重试判断了;充分借助zk的特性来实现异步回调

public void lock() {
  if (tryLock(path,  new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            synchronized (path){
                path.notify();
            }
        }
    })) {
      return true;
  }

  synchronized (path) {
      path.wait();
  }
}

那么上面这个实现有什么问题呢?

每次节点的变更,那么所有的都会监听到变动,好处是非公平锁的支持;缺点就是剩下这些唤醒的实例中也只会有一个抢占到锁,无意义的唤醒浪费性能

2. 临时顺序节点方式

接下来这种方案更加常见,晚上大部分的教程也是这种case,主要思路就是创建临时顺序节点

只有序号最小的节点,才表示抢占锁成功;如果不是最小的节点,那么就监听它前面一个节点的删除事件,前面节点删除了,一种可能是他放弃抢锁,一种是他释放自己持有的锁,不论哪种情况,对我而言,我都需要捞一下所有的节点,要么拿锁成功;要么换一个前置节点

II.分布式锁实现

接下来我们来一步步看下,基于临时顺序节点,可以怎么实现分布式锁

对于zk,我们依然采用apache的提供的包 zookeeper来操作;后续提供Curator的分布式锁实例

1. 依赖

核心依赖

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

版本说明:

  • zk版本: 3.6.2
  • SpringBoot: 2.2.1.RELEASE

2. 简单的分布式锁

第一步,都是实例创建

public class ZkLock implements Watcher {

    private ZooKeeper zooKeeper;
    // 创建一个持久的节点,作为分布式锁的根目录
    private String root;

    public ZkLock(String root) throws IOException {
        try {
            this.root = root;
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this);
            Stat stat = zooKeeper.exists(root, false);
            if (stat == null) {
                // 不存在则创建
                createNode(root, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    // 简单的封装节点创建,这里只考虑持久 + 临时顺序
    private String createNode(String path, boolean persistent) throws Exception {
        return zooKeeper.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL_SEQUENTIAL);
    }
}

在我们的这个设计中,我们需要持有当前节点和监听前一个节点的变更,所以我们在ZkLock实例中,添加两个成员

/**
 * 当前节点
 */
private String current;

/**
 * 前一个节点
 */
private String pre;

接下来就是尝试获取锁的逻辑

  • current不存在,在表示没有创建过,就创建一个临时顺序节点,并赋值current
  • current存在,则表示之前已经创建过了,目前处于等待锁释放过程
  • 接下来根据当前节点顺序是否最小,来表明是否持有锁成功
  • 当顺序不是最小时,找前面那个节点,并赋值 pre;
  • 监听pre的变化
/**
 * 尝试获取锁,创建顺序临时节点,若数据最小,则表示抢占锁成功;否则失败
 *
 * @return
 */
public boolean tryLock() {
    try {
        String path = root + "/";
        if (current == null) {
            // 创建临时顺序节点
            current = createNode(path, false);
        }
        List<String> list = zooKeeper.getChildren(root, false);
        Collections.sort(list);

        if (current.equalsIgnoreCase(path + list.get(0))) {
            // 获取锁成功
            return true;
        } else {
            // 获取锁失败,找到前一个节点
            int index = Collections.binarySearch(list, current.substring(path.length()));
            // 查询当前节点前面的那个
            pre = path + list.get(index - 1);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

请注意上面的实现,这里并没有去监听前一个节点的变更,在设计tryLock,因为是立马返回成功or失败,所以使用这个接口的,不需要注册监听

我们的监听逻辑,放在 lock() 同步阻塞里面

  • 尝试抢占锁,成功则直接返回
  • 拿锁失败,则监听前一个节点的删除事件
public boolean lock() {
    if (tryLock()) {
        return true;
    }

    try {
        // 监听前一个节点的删除事件
        Stat state = zooKeeper.exists(pre, true);
        if (state != null) {
            synchronized (pre) {
                // 阻塞等待前面的节点释放
                pre.wait();
                // 这里不直接返回true,因为前面的一个节点删除,可能并不是因为它持有锁并释放锁,如果是因为这个会话中断导致临时节点删除,这个时候需要做的是换一下监听的 preNode
                return lock();
            }
        } else {
          // 不存在,则再次尝试拿锁
          return lock();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

注意:

  • 当节点不存在时,或者事件触发回调之后,重新调用lock(),表明我胡汉三又来竞争锁了?

为啥不是直接返回 true? 而是需要重新竞争呢?

  • 因为前面节点的删除,有可能是因为前面节点的会话中断导致的;但是锁还在另外的实例手中,这个时候我应该做的是重新排队

最后别忘了释放锁

public void unlock() {
    try {
        zooKeeper.delete(current, -1);
        current = null;
        zooKeeper.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

到此,我们的分布式锁就完成了,接下来我们复盘下实现过程

  • 所有知识点来自前一篇的zk基础使用(创建节点,删除节点,获取所有自己点,监听事件)
  • 抢锁过程 =》 创建序号最小的节点
  • 若节点不是最小的,那么就监听前面的节点删除事件

这个实现,支持了锁的重入(why? 因为锁未释放时,我们保存了current,当前节点存在时则直接判断是不是最小的;而不是重新创建)

3. 测试

最后写一个测试case,来看下

@SpringBootApplication
public class Application {

    private void tryLock(long time) {
        ZkLock zkLock = null;
        try {
            zkLock = new ZkLock("/lock");
            System.out.println("尝试获取锁: " + Thread.currentThread() + " at: " + LocalDateTime.now());
            boolean ans = zkLock.lock();
            System.out.println("执行业务逻辑:" + Thread.currentThread() + " at:" + LocalDateTime.now());
            Thread.sleep(time);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (zkLock != null) {
                zkLock.unlock();
            }
        }
    }

    public Application() throws IOException, InterruptedException {
        new Thread(() -> tryLock(10_000)).start();

        Thread.sleep(1000);
        // 获取锁到执行锁会有10s的间隔,因为上面的线程抢占到锁,并持有了10s
        new Thread(() -> tryLock(1_000)).start();
        System.out.println("---------over------------");

        Scanner scanner = new Scanner(System.in);
        String ans = scanner.next();
        System.out.println("---> over --->" + ans);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

输出结果如下

II. 其他

0. 项目