Skip to content

Latest commit

 

History

History
190 lines (157 loc) · 6.62 KB

08.Zookeeper分布式协调服务.md

File metadata and controls

190 lines (157 loc) · 6.62 KB

我们需要一个服务配置中心记录哪个服务器提供哪个服务,所以每个 RPC 节点想要调用一个 RPC 的时候需要去配置中心查找,本项目使用的是 zookeeper 来注册服务。

Zookeeper的数据是怎么组织的

每个节点存储 1MB 数据,节点可以有自己的子节点。

  • 启动客户端:./zkCli.sh
  • 常用客户端命令: ls、get、create、set、delete
  • ls:罗列当前节点内容
  • get:获取当前节点信息
  • create:创建节点
  • set:设置节点值
  • delete:删除节点

以下是客户端使用命令的案例:

[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] get /zookeeper
                  # 节点数据
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0 # 节点是永久性节点还是临时性节点
dataLength = 0
numChildren = 1 # 孩子节点个数
[zk: localhost:2181(CONNECTED) 16] create /sl 20
Created /sl
[zk: localhost:2181(CONNECTED) 17] ls /
[sl, zookeeper]
[zk: localhost:2181(CONNECTED) 20] get /sl
20
cZxid = 0x4
ctime = Mon Aug 08 16:12:07 CST 2022
mZxid = 0x4
mtime = Mon Aug 08 16:12:07 CST 2022
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
[zk: localhost:2181(CONNECTED) 22] set /sl 30
cZxid = 0x4
ctime = Mon Aug 08 16:12:07 CST 2022
mZxid = 0x6
mtime = Mon Aug 08 16:14:35 CST 2022
pZxid = 0x4
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 23] get /sl
30
cZxid = 0x4
ctime = Mon Aug 08 16:12:07 CST 2022
mZxid = 0x6
mtime = Mon Aug 08 16:14:35 CST 2022
pZxid = 0x4
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 35] delete /sl
Node not empty: /sl
[zk: localhost:2181(CONNECTED) 36] delete /sl/sll
[zk: localhost:2181(CONNECTED) 38] ls /sl
[]
[zk: localhost:2181(CONNECTED) 39] delete /sl
[zk: localhost:2181(CONNECTED) 41] ls / 
[zookeeper]

按照service_name method_name创建节点,储存该微服务所在的服务器IP地址和端口号。客户端访问注册中心,得知自己要访问的服务所在的地址。

img

要求注册的rpc节点定期的发送心跳消息。如果某rpc节点没了,那么心跳信息会收不到回复,然后超过一定次数之后就会关闭连接。

临时性节点,rpc节点超时未发送心跳消息,zk会自动删除临时性节点。(我们想要创建的节点类型)

永久性节点,rpc节点超时未发送心跳消息,zk不会自动删除临时性节点。

Zookeeper的watcher机制

监听机制,有点像是观察者模式。服务端向注册中心查找服务,watcher 是一个通知回调机制。我们可以添加一个监听器,比如监听节点的变化。

比如父节点的子节点有任何的变化,那么 zookeeper 会主动的告知客户端节点的变化。比如被监听父节点的子节点被删除了,那么 watcher 就会通知客户端。这其实就是一个回调函数。

Zookeeper的C接口

进入上面解压目录 src/c 下面,zookeeper 已经提供了原生的 C/C++ 和 Java API 开发接口,需要通过源码

编译生成,过程如下:

~/package/zookeeper-3.4.10/src/c$ sudo ./configure
~/package/zookeeper-3.4.10/src/c$ sudo make
~/package/zookeeper-3.4.10/src/c$ sudo make install

主要关注 zookeeper 怎么管理节点,zk-c API怎么创建节点,获取节点,删除节点以及 watcher 机制的

API 编程。

C-API的缺点

Zookeeper 原生提供了 C 和 Java 的客户端编程接口,但是使用起来相对复杂,几个弱点:

1.不会自动发送心跳消息 <==== 错误,源码上会在1/3的 Timeout 时间发送 ping 心跳消息

2.设置监听 watcher 只能是一次性的,每次触发后需要重复设置

3.znode 节点只存储简单的 byte 字节数组,如果存储对象,需要自己转换对象生成字节数组

在项目中的应用

rpc_provider 中注册到了 unordered_map 中,我们这里需要连接 ZkClient,注册到 Zookeeper 中。对应于命令,我们需要指定创建的路径和要存储的数据。

路径为:/UserServiceRpc/Login

数据为:127.0.0.1:2181(可以是别的微服务服务器地址)

对于服务端(提供rpc服务端),在 RpcProvider.cc 文件中增加如下操作

ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点    method_name为临时性节点
for (auto &sp : m_serviceMap)
{
    // /UserServiceRpc
    std::string service_path = "/" + sp.first;
    // 默认创建为永久性节点
    zkCli.Create(service_path.c_str(), nullptr, 0);
    for (auto &mp : sp.second.m_mthodMap)
    {
        /** 
             * 存储当前这个rpc服务节点主机的ip和port
             * /service_name/method_name   
             * /UserServiceRpc/Login 
             */
        std::string method_path = service_path + "/" + mp.first;
        char method_path_data[128] = {0};
        snprintf(method_path_data, sizeof(method_path_data), "%s:%d", ip.c_str(), port);
        // ZOO_EPHEMERAL表示znode是一个临时性节点
        // 在 /UserServiceRpc/Login 处存储 127.0.0.1:2181
        zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
    }
}

对于客户端,使用 rpc 服务的用户,在 RpcChannel.cc 文件中增加如下操作

在 zookeeper 中查找相应的服务和方法名字,查找成功则得到储存的服务端地址。分割字符串获取 IP 地址和端口号。之后就使用该 IP 地址和端口号发动 socket 请求。

// 读取配置文件rpcserver的信息
// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
// uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
// rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());
if (host_data == "")
{
    controller->SetFailed(method_path + " is not exist!");
    return;
}
int idx = host_data.find(":");
if (idx == -1)
{
    controller->SetFailed(method_path + " address is invalid!");
    return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx+1, host_data.size()-idx).c_str());