分析pfstore运转思路:store挂了pfc如何得知
任务描述
store上的pfs挂了,pfc是怎么知道它挂了的,然后修改数据库;研究这个机制:store发生变化后,conductor是怎么知道的,又是如何修改t_store表的。
分析流程:
flowchart TD
A[ZooKeeper 存储节点路径] -->|新增/删除/修改事件| B(Conductor 监听模块);
B -->|事件类型判断| C{是新增Store?};
C -->|是| D["读取ZK中Store的配置信息<br/>(ID、IP、初始状态等)"];
D --> E["执行SQL:INSERT INTO t_store (...)"];
C -->|否| F{是状态/配置修改?};
F -->|是| G["读取ZK中Store的最新信息<br/>(存活状态、配置变更等)"];
G --> H[执行SQL:UPDATE t_store SET ... WHERE store_id=...];
F -->|否| I{是Store删除?};
I -->|是| J["执行SQL:DELETE FROM t_store WHERE store_id=...<br/>或标记为“已删除”状态"];
K[定时任务模块] --> L[主动拉取ZK中所有Store的全量信息];
L --> M[与t_store表数据对比];
M -->|存在差异| N[执行SQL:UPDATE/INSERT/DELETE 同步数据];
M -->|无差异| O[结束本次同步];
B --> P["触发告警/日志记录<br/>(如Store离线告警)"];
E --> P;
H --> P;
J --> P;
- 某一节点store挂死
conductor通过zk的事件监听和定时兜底同步两种方式感知store的变化
| 感知方式 | 触发场景 | 实时性 | 可靠性 |
| --- | --- | --- | --- |
| ZK 事件监听 | Store 新增、状态变更、配置修改等 | 毫秒级 | 依赖 ZK 集群可用性 |
| 定时兜底同步 | 周期性拉取 ZK 中 Store 全量信息 | 分钟级(可配置) | 不依赖 ZK 实时性,兜底保障 |
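针对表中的“定时兜底同步”(对应流程图里的“定时任务模块”),下面给出一个最小示意代码,说明这种周期性对账的思路。注意:这不是 PureFlash/jconductor 的源码,jconductor 里是否存在这样的定时任务未经核实;类名 StoreReconcileTask、status 取值 OK/OFFLINE 等均为假设。

import org.apache.zookeeper.ZooKeeper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// 定时兜底同步的最小示意:周期性扫描 ZK 中的 store 列表,按 alive 临时节点是否存在回写 t_store
public class StoreReconcileTask {
    private final ZooKeeper zk;
    private final Connection db;        // t_store 所在的 MySQL 连接
    private final String storesPath;    // 例如 /pureflash/cluster1/stores

    public StoreReconcileTask(ZooKeeper zk, Connection db, String storesPath) {
        this.zk = zk;
        this.db = db;
        this.storesPath = storesPath;
    }

    // 每 intervalSec 秒做一次全量对账,作为事件监听之外的兜底手段
    public void start(long intervalSec) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::reconcileOnce, intervalSec, intervalSec, TimeUnit.SECONDS);
    }

    private void reconcileOnce() {
        try {
            List<String> storeIds = zk.getChildren(storesPath, false);
            for (String id : storeIds) {
                // alive 为临时节点:存在视为在线,不存在视为离线
                boolean alive = zk.exists(storesPath + "/" + id + "/alive", false) != null;
                try (PreparedStatement ps = db.prepareStatement(
                        "update t_store set status=? where id=?")) {
                    ps.setString(1, alive ? "OK" : "OFFLINE");
                    ps.setInt(2, Integer.parseInt(id));
                    ps.executeUpdate();
                }
            }
        } catch (Exception e) {
            e.printStackTrace(); // 兜底任务失败仅记录,等待下个周期重试
        }
    }
}

用法上,在 Conductor 启动后执行 new StoreReconcileTask(zk, dbConn, "/pureflash/cluster1/stores").start(60) 即可按 60 秒周期对账(60 秒同样只是示例值)。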
解释代码
public static void registerAsConductor(String managmentIp, String zkIp) throws Exception
{
try
{
mngtIp = managmentIp;
zk = new ZooKeeper(zkIp, 50000, new Watcher() {
@Override
public void process(WatchedEvent event)
{
if (event.getState() == KeeperState.SyncConnected)
{
if (event.getType() == EventType.NodeChildrenChanged)
{
synchronized (locker)
{
locker.notify();
}
}
}
}
});
if (zk.exists("/pureflash", false) == null)
{
zk.create("/pureflash", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zk.exists(zkBaseDir, false) == null)
{
zk.create(zkBaseDir, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
zkHelper = new ZkHelper(zk);
zkHelper.createZkNodeIfNotExist(zkBaseDir + "/conductors",null);
myZkNodePath = zk.create(ClusterManager.zkBaseDir + "/conductors/conductor", managmentIp.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Register on ZK as node:{}", myZkNodePath);
}
catch (IOException | KeeperException | InterruptedException e)
{
throw e;
}
}
- 通过 new ZooKeeper(zkIp, 50000, new Watcher() {...}) 注册全局Watcher,当ZK连接状态或节点发生变化时,触发process方法。
- 连接状态事件:如KeeperState.SyncConnected(ZK客户端与服务端建立连接)、KeeperState.Disconnected(连接断开)等。
- 节点变化事件:如EventType.NodeChildrenChanged(子节点新增/删除)、EventType.NodeDeleted(节点被删除)、EventType.NodeDataChanged(节点数据修改)等。
- 事件回调:当触发条件满足时,ZK会调用Watcher的process方法,将事件类型、路径等信息传递给业务逻辑(如代码中通过locker.notify()唤醒等待线程)。
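补充一个示意:registerAsConductor 里的 locker.notify() 需要有线程在 locker 上 wait 才有意义。下面是这种 wait/notify 配合方式的最小示意(非 jconductor 源码,方法名 waitForChildrenChange 为假设),同时体现了 ZK watch 一次性、需要重新注册的特点。

import org.apache.zookeeper.ZooKeeper;
import java.util.List;

class ChildrenWaitLoop {
    static final Object locker = new Object(); // 与 Watcher 的 process() 中 notify 的是同一个对象

    // 等待线程:每次被唤醒后重新读取子节点;getChildren(path, true) 同时重新注册一次性 watch
    static void waitForChildrenChange(ZooKeeper zk, String path) throws Exception {
        while (true) {
            List<String> children = zk.getChildren(path, true); // watch=true 表示使用建连时传入的默认 Watcher
            System.out.println("current children: " + children);
            synchronized (locker) {
                locker.wait(30000); // 简化写法:带超时,避免 notify 先于 wait 到达时长期阻塞
            }
        }
    }
}

实际工程里一般会配合标志位处理虚假唤醒/丢失通知,这里为突出 wait/notify 主干做了简化。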
public static void watchStores()
{
try {
List<String> nodes = zk.getChildren(zkBaseDir + "/stores", null);
for(String n : nodes)
{
zkHelper.watchNode(zkBaseDir + "/stores/" + n + "/alive", new AliveWatchCbk(Integer.parseInt(n)));
}
} catch (KeeperException | InterruptedException e) {
logger.error("Failed access zk",e);
}
zkHelper.watchNewChild(zkBaseDir + "/stores", new ZkHelper.NewChildCallback() {
@Override
void onNewChild(String childPath) {
logger.info("new store found on zk: {}", childPath);
int id = Integer.parseInt(childPath.substring(childPath.lastIndexOf('/')+1));
zkHelper.watchNode(childPath + "/alive", new AliveWatchCbk(id) );
}
});
}
这段 watchStores 方法是 Conductor 感知 Store 状态变化的核心实现,明确了 ZK 监听的注册逻辑;结合前面的问题,可以完整解释“Store 掉线时 Conductor 如何感知并修改 t_store 表”,具体拆解如下:
- 对已存在的Store:注册alive节点监听。先通过zk.getChildren(zkBaseDir + "/stores", null)获取所有已创建的Store节点(如1、2、3,对应store_id);再对每个Store的alive节点(路径:zkBaseDir + "/stores/" + store_id + "/alive")注册AliveWatchCbk监听器(自定义回调类,传入store_id便于后续定位)。
- 对新增的Store:监听stores子节点变化。通过zkHelper.watchNewChild监听zkBaseDir + "/stores"路径,当有新的Store节点(如新增store_id=4)被创建时,触发onNewChild回调;回调中解析出新Store的store_id,并立即为其alive节点注册AliveWatchCbk监听器,确保新增的Store也能被实时监控。
static class AliveWatchCbk extends ZkHelper.NodeChangeCallback {
int id;
public AliveWatchCbk(int id)
{
this.id=id;
}
@Override
void onNodeCreate(String childPath) {
logger.info("{} created", childPath);
updateStoreFromZk(id);
}
@Override
void onNodeDelete(String childPath) {
logger.error("{} removed", childPath);
S5Database.getInstance().sql("update t_store set status=? where id=?", Status.OFFLINE, id).execute();
}
}
结合store掉线的完整流程:
1. Store 掉线(如 PFS 挂掉)→ 该 Store 与 ZK 的会话断开 → ZK 自动删除其 alive 临时节点。
2. ZK 向 Conductor 推送 NodeDeleted 事件 → 触发 AliveWatchCbk 回调。
3. AliveWatchCbk 执行 SQL,将 t_store 表中对应 store_id 的状态改为 OFFLINE。
4. 回调中重新注册 alive 节点监听,等待 Store 后续上线。
总结:Conductor 主要是通过 alive 临时节点感知故障。当 pfs 挂掉时,该 store 与 zk 的会话(session)断开,zk 自动删除其 alive 临时节点;由于 watchStores 已为所有已注册 store 的 alive 节点挂上监听,AliveWatchCbk 的 onNodeDelete 回调被触发,执行 SQL 将 t_store 表中对应 store_id 的状态改为 OFFLINE,从而完成对数据库的修改。
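上面第 4 步“回调中重新注册 alive 节点监听”的背景是:ZK 的 watch 是一次性的,触发一次后即失效,必须在回调里再次注册才能持续感知。ZkHelper.watchNode 的实现这里没有贴出,下面是这种“自我重新注册”写法的一个常见思路示意(非 ZkHelper 源码,回调接口为简化版):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

class NodeWatchSketch {
    interface NodeChangeCallback {
        void onNodeCreate(String path);
        void onNodeDelete(String path);
    }

    // 对某个节点(如 .../stores/1/alive)持续监听:每次事件触发后重新调用 exists 注册 watch
    static void watchNode(ZooKeeper zk, String path, NodeChangeCallback cbk) throws Exception {
        zk.exists(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    if (event.getType() == EventType.NodeCreated) {
                        cbk.onNodeCreate(path);       // store 重新上线,alive 节点重建
                    } else if (event.getType() == EventType.NodeDeleted) {
                        cbk.onNodeDelete(path);       // 会话超时,alive 节点被 ZK 删除
                    }
                    watchNode(zk, path, cbk);         // ZK watch 是一次性的,处理完必须重新注册
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}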
任务(25.11.10)
1、在这个pureflash代码里面搜一搜、找一找,先做初步推测
2、围绕“registerAsConductor”梳理zk的watch机制
PFS 节点通过临时节点向 ZK “心跳注册”,Conductor 则通过监听临时节点的删除事件,反向感知 PFS 节点的存活状态。一旦临时节点被删除,即判定对应的 PFS 节点已挂。
心跳注册的核心是 “临时节点”:PFS 节点(如stores/2)启动时,会通过 ZK 客户端创建一个临时节点(如/pureflash/cluster1/stores/2/alive)。临时节点的生命周期与 PFS 节点和 ZK 的会话(Session)绑定 —— 只要 PFS 节点存活且网络正常,就会通过定期发送 “心跳包”(ZK 会话的默认机制,如每几秒发送一次 ping)维持会话,临时节点就会一直存在。这本质上是 PFS 向 ZK “注册存活状态” 的过程,即 “心跳注册”。
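对应地,“心跳注册”落到代码上就是 store 启动时创建一个临时节点。pfs 实际是 C 实现(zookeeper_init,见下文“整理”),这里用 Java 写一个等价的最小示意,路径中的 cluster1、storeId 仅为示例:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

class StoreAliveRegister {
    // store 启动时创建临时 alive 节点;会话保持期间节点一直存在,
    // 进程被 kill 或网络断开导致会话超时后,ZK 会自动删除该节点
    static void registerAlive(ZooKeeper zk, int storeId) throws Exception {
        String alivePath = "/pureflash/cluster1/stores/" + storeId + "/alive";
        zk.create(alivePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // 之后不需要显式“发心跳”,ZK 客户端会自动通过 ping 维持会话
    }
}

进程被 kill -9 或断网后,会话在超时到期时失效,alive 节点随之被删除;store 重新启动后再次创建该节点,这就是后面实验里 store 状态能从 offline 恢复到 OK 的前提。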
Watcher 的作用是“感知心跳失效”:一旦会话超时导致 alive 临时节点被 ZK 删除,Conductor 注册的 Watcher 就会收到 NodeDeleted 事件,从而得知对应的 PFS 节点已掉线。
整理
- 在ZooKeeper的C客户端中,zookeeper_init 函数会自动建立心跳机制来维持与服务器的连接。不需要显式调用心跳函数。
- 临时节点在会话结束时会自动删除,常用于表示服务的在线状态。
机制:
1、zk的client与server本身的heartbeat机制
2、zk的临时节点表示服务的在线状态
3、zk的watch机制
zkHelper.watchNode(zkBaseDir + "/stores/"+store_id+"/trays/"+t+"/online", new ZkHelper.NodeChangeCallback());
意外掉线后,可以 kill -9 杀掉 pfs 进程再重新启动,store 状态即可从 offline 恢复至 OK。
pfc日志 - pfs启动
nohup: ignoring input
[2025/11/11 11:06:28.945] [main] INFO com.netbric.s5.conductor.Main - use config file: /etc/pureflash/pfc.conf
[2025/11/11 11:06:28.985] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT
[2025/11/11 11:06:28.985] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=flyslice-Standard-PC-i440FX-PIIX-1996
[2025/11/11 11:06:28.985] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=11.0.28
[2025/11/11 11:06:28.986] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Ubuntu
[2025/11/11 11:06:28.986] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64
[2025/11/11 11:06:28.986] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=/home/flyslice/yangxiao/cocalele/jconductor/out/production/jconductor:/home/flyslice/yangxiao/cocalele/jconductor/lib/HikariCP-3.4.1-sources.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/commons-cli-1.3.1.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/log4j-1.2.17.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-util-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/norm-0.8.5.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/gson-2.2.2.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/mysql-connector-java-8.0.18.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/argparse4j-0.8.1-sources.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-io-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/slf4j-simple-1.7.25.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/commons-logging-1.2.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jna-platform-4.5.2.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/fluent-hc-4.5.14.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/commons-lang3-3.9.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/httpmime-4.5.14.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/argparse4j-0.8.1.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/httpcore-4.4.16.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/HikariCP-3.4.1.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/commons-codec-1.11.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/httpclient-4.5.14.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/httpclient-osgi-4.5.14.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/slf4j-api-1.7.25.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-server-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/ini4j-0.5.4.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-client-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/httpclient-cache-4.5.14.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-servlets-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/zookeeper-3.5.6.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/java-ascii-table.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/javax.persistence.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/servlet-api-3.1.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/commons-exec-1.3.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-servlet-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/zookeeper-jute-3.5.6.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/httpclient-win-4.5.14.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jetty-http-9.2.16.v20160414.jar:/home/flyslice/yangxiao/cocalele/jconductor/lib/jna-4.5.2.jar
[2025/11/11 11:06:28.987] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=/home/flyslice/yangxiao/cocalele/PureFlash/build_deb/bin:/home/flyslice/yangxiao/cocalele/PureFlash/build_deb/bin:/home/flyslice/yangxiao/cocalele/PureFlash/build_deb/bin:/home/flyslice/yangxiao/cocalele/PureFlash/build_deb/bin:/home/flyslice/yangxiao/cocalele/PureFlash/build_deb/bin:/home/flyslice/yangxiao/cocalele/PureFlash/build_deb/bin::/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
[2025/11/11 11:06:28.987] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp
[2025/11/11 11:06:28.988] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
[2025/11/11 11:06:28.988] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Linux
[2025/11/11 11:06:28.988] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64
[2025/11/11 11:06:28.988] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=5.15.0-139-generic
[2025/11/11 11:06:28.989] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=root
[2025/11/11 11:06:28.989] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/root
[2025/11/11 11:06:28.989] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/home/flyslice/yangxiao/cocalele/jconductor
[2025/11/11 11:06:28.989] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.memory.free=1006MB
[2025/11/11 11:06:28.989] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.memory.max=16076MB
[2025/11/11 11:06:28.990] [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.memory.total=1016MB
[2025/11/11 11:06:28.993] [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.61.3:2181,192.168.61.195:2181,192.168.61.34:2181 sessionTimeout=50000 watcher=com.netbric.s5.cluster.ClusterManager$1@12bc6874
[2025/11/11 11:06:28.997] [main] INFO org.apache.zookeeper.common.X509Util - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
[2025/11/11 11:06:29.005] [main] INFO org.apache.zookeeper.ClientCnxnSocket - jute.maxbuffer value is 4194304 Bytes
[2025/11/11 11:06:29.012] [main] INFO org.apache.zookeeper.ClientCnxn - zookeeper.request.timeout value is 0. feature enabled=
[2025/11/11 11:06:29.391] [main-SendThread(192.168.61.195:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.61.195/192.168.61.195:2181. Will not attempt to authenticate using SASL (unknown error)
[2025/11/11 11:06:29.402] [main-SendThread(192.168.61.195:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established, initiating session, client: /192.168.61.3:46872, server: 192.168.61.195/192.168.61.195:2181
[2025/11/11 11:06:29.413] [main-SendThread(192.168.61.195:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.61.195/192.168.61.195:2181, sessionid = 0x20019a43d840004, negotiated timeout = 40000
[2025/11/11 11:06:29.437] [main] INFO com.netbric.s5.cluster.ClusterManager - Register on ZK as node:/pureflash/cluster1/conductors/conductor0000000013
[2025/11/11 11:06:29.486] [main] INFO com.netbric.s5.orm.S5Database - meta db:jdbc:mysql://127.0.0.1:3306/s5?useSSL=false&serverTimezone=UTC
[2025/11/11 11:06:29.490] [main] INFO com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Starting...
[2025/11/11 11:06:29.701] [main] INFO com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Start completed.
[2025/11/11 11:06:29.859] [main] INFO com.netbric.s5.cluster.ClusterManager - watchNewChild on: /pureflash/cluster1/stores/1/trays
[2025/11/11 11:06:29.867] [main] INFO com.netbric.s5.cluster.ClusterManager - a021d97d-1f3f-428d-b7ba-348b88fea37d OK
[2025/11/11 11:06:29.888] [main] INFO com.netbric.s5.cluster.ClusterManager - upsert port:192.168.61.3, purpose:0
[2025/11/11 11:06:29.892] [main] INFO com.netbric.s5.cluster.ClusterManager - upsert port:192.168.61.3, purpose:1
[2025/11/11 11:06:29.896] [main] INFO com.netbric.s5.cluster.ClusterManager - watchNewChild on: /pureflash/cluster1/stores/2/trays
[2025/11/11 11:06:29.908] [main] INFO com.netbric.s5.cluster.ClusterManager - 89693e7f-d02b-4aea-9e1e-654af3169b5a OK
[2025/11/11 11:06:29.916] [main] INFO com.netbric.s5.cluster.ClusterManager - upsert port:192.168.61.195, purpose:0
[2025/11/11 11:06:29.919] [main] INFO com.netbric.s5.cluster.ClusterManager - upsert port:192.168.61.195, purpose:1
[2025/11/11 11:06:29.924] [main] INFO com.netbric.s5.cluster.ClusterManager - watchNewChild on: /pureflash/cluster1/stores/3/trays
[2025/11/11 11:06:29.930] [main] INFO com.netbric.s5.cluster.ClusterManager - c7edb3c9-bc12-44bf-b4a5-18cae99a8644 OK
[2025/11/11 11:06:29.938] [main] INFO com.netbric.s5.cluster.ClusterManager - upsert port:192.168.61.34, purpose:0
[2025/11/11 11:06:29.941] [main] INFO com.netbric.s5.cluster.ClusterManager - upsert port:192.168.61.34, purpose:1
[2025/11/11 11:06:29.979] [main] INFO com.netbric.s5.conductor.Main - HTTP started on port 49180
zk日志:
# 重启前
[zk: 192.168.61.3:2181(CONNECTED) 22] ls /pureflash/cluster1/stores/1/trays/a021d97d-1f3f-428d-b7ba-348b88fea37d
[capacity, devname, object_size, state]
[zk: 192.168.61.3:2181(CONNECTED) 24] ls /pureflash/cluster1/stores/3/trays/c7edb3c9-bc12-44bf-b4a5-18cae99a8644
[capacity, devname, object_size, state]
# 重启后
[zk: 192.168.61.3:2181(CONNECTED) 26] ls /pureflash/cluster1/stores/2/trays/89693e7f-d02b-4aea-9e1e-654af3169b5a
[capacity, devname, object_size, online, state]
[zk: 192.168.61.3:2181(CONNECTED) 32] ls /pureflash/cluster1/stores/1/trays/a021d97d-1f3f-428d-b7ba-348b88fea37d
[capacity, devname, object_size, online, state]
[zk: 192.168.61.3:2181(CONNECTED) 33] ls /pureflash/cluster1/stores/3/trays/c7edb3c9-bc12-44bf-b4a5-18cae99a8644
[capacity, devname, object_size, online, state]
25.11.11虚拟机实验1
删除数据库内容后重新导入,但表内容为空;这种情况需要杀死 pfs 再重新启动。
pfcli 报错如下
root@flyslice-Standard-PC-i440FX-PIIX-1996:/home/flyslice/yangxiao/cocalele/jconductor# ./pfcli list_disk
java.lang.IllegalArgumentException: Please provide valid data : [[Ljava.lang.String;@3bb9a3ff
at com.bethecoder.ascii_table.impl.SimpleASCIITableImpl.getTable(SimpleASCIITableImpl.java:155)
at com.bethecoder.ascii_table.impl.SimpleASCIITableImpl.getTable(SimpleASCIITableImpl.java:74)
at com.bethecoder.ascii_table.impl.SimpleASCIITableImpl.printTable(SimpleASCIITableImpl.java:51)
at com.bethecoder.ascii_table.impl.SimpleASCIITableImpl.printTable(SimpleASCIITableImpl.java:41)
at com.bethecoder.ascii_table.ASCIITable.printTable(ASCIITable.java:71)
at com.netbric.s5.cli.CliMain.cmd_list_disk(CliMain.java:381)
at com.netbric.s5.cli.CliMain$4.run(CliMain.java:138)
at com.netbric.s5.cli.CliMain.main(CliMain.java:225)
[main] ERROR com.netbric.s5.cli.CliMain - Failed: Please provide valid data : [[Ljava.lang.String;@3bb9a3ff
pfc 日志打印如下
[2025/11/11 15:26:07.745] [pool-1-thread-12] INFO com.netbric.s5.conductor.handler.S5RestfulHandler - API called: op=/s5c/?op=list_disk
[2025/11/11 15:26:07.748] [pool-1-thread-12] INFO com.netbric.s5.conductor.handler.S5RestfulHandler - {
"trays": [],
"op": "list_disk_reply",
"ret_code": 0
}
原因是数据库中没有 tray 数据,list_disk 返回的 trays 列表为空,pfcli 用 ASCIITable 打印空表时抛出 IllegalArgumentException 报错。
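一个可能的规避方式(仅为示意,并非 CliMain 的现有代码):在交给表格组件打印之前先对数据判空。header、rows 为假设的变量名,ASCIITable 的具体调用形式以 CliMain 中现有写法为准:

class SafeTablePrint {
    // 对空数据直接提示,避免把空的 rows 交给 ASCIITable 导致 IllegalArgumentException
    static void printDiskTable(String[] header, String[][] rows) {
        if (rows == null || rows.length == 0) {
            System.out.println("No disk found.");
            return;
        }
        // 非空时再走 CliMain 原有的 ASCIITable 打印逻辑
        com.bethecoder.ascii_table.ASCIITable.getInstance().printTable(header, rows);
    }
}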