侧边栏壁纸
博主头像
DJ's Blog博主等级

行动起来,活在当下

  • 累计撰写 133 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

【Zookeeper】简介

Administrator
2022-04-04 / 0 评论 / 0 点赞 / 54 阅读 / 7916 字

【Zookeeper】简介

概念

  1. Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务。
  2. Zookeeper本身就是一个分布式程序。
  3. Zookeeper是为别的分布式程序服务的。

功能

  1. 主从协调
  2. 服务器节点动态上下线
  3. 统一配置管理
  4. 分布式共享锁
  5. 统一名称服务(dubbo)
  6. 管理(存储,读取)用户程序提交的数据,一般都是状态数据,不是业务数据。
  7. 为用户程序提供数据节点监听服务。

特性

  1. 一个Leader,多个Follower组成的集群。
  2. 全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的。
  3. 分布式读写,更新请求转发,由Leader实施。
  4. 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行。
  5. 数据更新原子性,一次数据更新要么成功,要么失败。
  6. 实时性,在一定时间范围内,client能读到最新数据。
  7. 集群中只要有半数以上节点存活,Zookeeper就能正常服务。

应用场景

服务器状态动态感知


如上图所示,有一个客户端分布式的采集数据程序,部署在多台服务器的集群中,现在要求集群中任意一台服务器down机了,其他服务器能够顶上,而且采集的数据不会重复。
这时候就可以使用zookeeper来记录集群中的服务器信息和采集数据状态信息,并且提供节点监听功能。这样当集群中的一台服务器down机了,其他的服务器都能感知到,然后通过zookeeper来选举一台服务器来顶替down机的服务器,顶替的服务器也能读取到之前down机的服务器采集数据的状态,然后继续采集,这样采集的数据也不会重复了。

服务器主从选举


如上图所示,有一个服务端分布式的服务程序,部署在多台服务器的集群中,现在要求客户端每次只需要连接其中的一台服务器中的服务程序。
这时候就可以使用zookeeper来记录集群中的服务器的状态信息和IP地址等信息,并且通过选举机制选举出一台服务器为这个集群的leader,当客户端访问的时候就可以通过zookeeper知道此时集群中的哪台服务器是leader,并且通过具体的IP地址来正确的访问到这台服务器了。
当集群中的某一台服务器down机了,通过zookeeper集群中的其他服务器也能感知到,如果down机的是leader,则会通过选举机制重新选举出leader来,这个过程对于客户端来讲是完全透明的。

配置管理


如上图所示,有一个服务端的分布式程序,部署在多台服务器的集群中,每个程序的配置文件都一样,而且配置文件的修改比较频繁,现在要求每次修改配置文件,集群中的每台服务器中的程序都同步修改。
这时候就可以使用zookeeper来保管这个配置文件的具体内容,并且对配置文件提供监听功能,一旦发现配置文件修改了,就会通知到集群中的各个服务器,集群中的各个服务器可以将这个配置文件的内容保存到内存中,当有更新的时候更新内存中的数据。这样就能确保集群中的每台服务器都能同步更新。DRM(分布式资源管理)就是这样的。

应用实例

分布式系统服务器动态上下线程序

需求

分布式系统的服务器会有动态变化,客户端要能实时洞察到这些变化。这些变化包括服务器的动态上下线,新增,删除等。

图解

  1. 服务端启动时去zookeeper集群中注册节点信息:

1)最好是EPHEMERAL_SEQUENTIAL形式的节点,这样当服务器动态上下线的时候,zookeeper就可以自动增删节点,而且还能保证每次创建的节点都不一样。
2)节点信息如下所示:
/servers/server0000000001 "centos1..."
/servers/server0000000002 "centos2..."
/servers/server0000000003 "centos3..."

  1. 客户端启动时调用getChildren方法,从zookeeper集群中获取当前/servers路径下在线服务器列表信息,并且注册监听/servers路径下节点的变化。
  2. 当服务器端有动态上下线的情况出现,就会更新zookeeper集群中的/servers路径下的节点信息。并且事件通知给客户端,调用客户端的process方法,这时候客户端需要重新获取服务器列表,并且继续注册监听。

代码

  • 服务端程序
/**
 * 分布式程序服务端
 * @author dj4817
 * @version $Id: DistributedServer.java, v 0.1 2017/12/15 21:46 dj4817 Exp $$
 */
public class DistributedServer {

    private static final String connectString  = "10.101.44.200:2181,10.101.44.201:2181,10.101.44.202:2181,10.101.44.203:2181,10.101.44.204:2181";
    private static final int    sessionTimeout = 2000;
    private static final String parentNode     = "/servers";

    private ZooKeeper           zooKeeper;

    /**
     * 创建zookeeper连接
     * @throws Exception
     */
    public void getConnect() throws Exception {

        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
                System.out.println(event.getType() + "---" + event.getPath());
                try {
                    zooKeeper.getChildren("/", true);
                } catch (Exception e) {
                }
            }
        });
    }

    /**
     * 向zookeeper中注册服务器信息
     * @param hostName
     * @throws Exception
     */
    public void registerServer(String hostName) throws Exception {

        if (zooKeeper.exists(parentNode, false) == null) {
            zooKeeper.create(parentNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        String create = zooKeeper.create(parentNode + "/server", hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostName + " is online.." + create);
    }

    /**
     * 执行业务
     * @throws InterruptedException
     */
    public void handleBussiness() throws InterruptedException {
        System.out.println("server start working.....");
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 主程序
     * @param args
     */
    public static void main(String[] args) throws Exception {

        // 1.创建zookeeper连接
        DistributedServer server = new DistributedServer();
        server.getConnect();
        // 2.向zookeeper中注册服务器信息
        server.registerServer("centos0");
        // 3.执行业务
        server.handleBussiness();
    }
}
  • 客户端程序
/**
 * 分布式程序客户端
 * @author dj4817
 * @version $Id: DistributedClient.java, v 0.1 2017/12/15 21:46 dj4817 Exp $$
 */
public class DistributedClient {

    private static final String connectString  = "10.101.44.200:2181,10.101.44.201:2181,10.101.44.202:2181,10.101.44.203:2181,10.101.44.204:2181";
    private static final int    sessionTimeout = 2000;
    private static final String parentNode     = "/servers";

    private ZooKeeper           zooKeeper;

    /**
     * 创建zookeeper连接
     * @throws Exception
     */
    public void getConnect() throws Exception {

        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
                System.out.println(event.getType() + "---" + event.getPath());
                if (Event.EventType.None != event.getType()) {
                    try {
                        // 获取服务器信息
                        getServersInfo();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    /**
     * 获取服务器信息
	 * @throws Exception
     */
    public void getServersInfo() throws Exception {

        List<String> childrens = zooKeeper.getChildren(parentNode, true);
        System.out.println("服务器信息:");
        for (String children : childrens) {
            byte[] data = zooKeeper.getData(parentNode + "/" + children, false, null);
            System.out.println("主机名:" + new String(data));
        }
    }

    /**
     * 执行业务
     * @throws InterruptedException
     */
    public void handleBussiness() throws InterruptedException {
        
        System.out.println("client start working.....");
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 主程序
     * @param args
     */
    public static void main(String[] args) throws Exception {

        // 1.创建zookeeper连接
        DistributedClient client = new DistributedClient();
        client.getConnect();
        // 2.获取服务器信息
        client.getServersInfo();
        // 3.处理业务
        client.handleBussiness();
    }
}
0

评论区