7. Zookeeper configuration center

Configuration Center Case

There is such a scenario in the work: the database user name and password information are placed in a configuration file, the application reads the configuration file, and the configuration file information is stored in the cache. If the user name and password of the database are changed, the cache needs to be reloaded, which is more troublesome. It can be done easily through zooKeeper, and the cache synchronization is automatically completed when the database changes.

Design ideas:

  1. Connect to the zookeeper server.
  2. Read configuration information in zookeeper, register watcher listener, read configuration information and store it in local variables.
  3. When the configuration information in zookeeper changes, the data change event is captured through the callback method of watcher.
  4. Re-obtain the configuration information in the callback, and register a new monitoring event.

Create configuration information node data before obtaining configuration information:

 create /config "dbconfig"
 create /config/url "jdbc:mysql://localhost:3306/zookeeper"
 create /config/username "root"
 create /config/password "123456"
package com.huazai.zookeeper.zkexample.config;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * @author pyh
 * @date 2021/5/19 0:20
 */
public class ZookeeperConnection {
    public static void main(String[] args) {
        connect();
    }

    public static ZooKeeper zooKeeper;

    public static ZooKeeper connect() {
        try {
            // 由于连接zookeeper服务器是异步连接,需要CountDownLatch阻塞主线程,等待子线程连接结果后反馈给主线程
            CountDownLatch countDownLatch = new CountDownLatch(1);
            /*
                connectString:服务器的ip和端口
                sessionTimeout:客户端与服务器之间的会话超时时间,以毫秒为单位的
                watcher:监视器对象
            */
            zooKeeper = new ZooKeeper("192.168.64.132:2181", 50000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("zookeeper异步连接成功");
                    } else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("断开连接");
                    } else if (watchedEvent.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时");
                    } else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    } else if (watchedEvent.getState() == Event.KeeperState.Closed) {
                        System.out.println("连接关闭");
                    }
                    countDownLatch.countDown();
                    System.out.println("使用构造函数默认的watcher");
                    System.out.println("path=" + watchedEvent.getPath());
                    System.out.println("eventType=" + watchedEvent.getType());
                }
            });
            // 主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
            // 会话编号
            System.out.println("客户端sessionId:" + zooKeeper.getSessionId());
            return zooKeeper;
        } catch (Exception e) {
            System.out.println("zookeeper连接异常");
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭zookeeper会话
     */
    public static void close() {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
                System.out.println("zookeeper关闭成功");
            } catch (InterruptedException e) {
                System.out.println("zookeeper关闭失败");
                e.printStackTrace();
            }
        }
    }
}
package com.huazai.zookeeper.zkexample.config;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @author pyh
 * @date 2021/6/7 22:18
 */
public class ConfigCenter implements Watcher {
    private ZooKeeper zooKeeper;

    ConfigCenter configCenter;

    private DBInfo dbInfo;

    public ConfigCenter() {
        // 1.初始化连接对象
        zooKeeper = ZookeeperConnection.connect();
    }

    /**
     * 3.当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件
     *
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        try {
            // 4.回调中重新获取配置信息,并注册新的监听事件
            System.out.println("监听到数据源发生改变,正在获取最新配置信息...");
            initValue();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test() throws Exception {
        configCenter = new ConfigCenter();
        // 1.模拟程序启动的时候第一遍获取数据源
        configCenter.initValue();

        TimeUnit.SECONDS.sleep(100);
    }

    /**
     * 2.初始化数据源存入本地变量,并注册监听器
     */
    private DBInfo initValue() throws Exception {
        String url = getData("/config/url", this);
        String username = getData("/config/username", this);
        String password = getData("/config/password", this);
        dbInfo = new DBInfo(url, username, password);
        System.out.println(dbInfo);
        return dbInfo;
    }

    private String getData(final String path, Watcher watcher) throws Exception {
        String data = new String(zooKeeper.getData(path, watcher, null));
        return data;
    }
}

class DBInfo {
    private String url;
    private String username;
    private String password;

    public DBInfo() {
    }

    public DBInfo(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "DBInfo{" +
                "url='" + url + '\'' +
                ", username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

After successfully starting the program, execute the following commands to modify the data of the /config/password node:

set /config/password "111111"
set /config/password "888888"
set /config/password "666666"

The output of the program console is as follows:

Insert picture description here