初识 zenoh
前言
在分布式系统和机器人通信领域,数据如何高效、可靠地在节点之间流动,一直是核心问题。传统的客户端-服务器模型在某些场景下显得笨重,而纯粹的点对点模型又难以扩展。
在接触了 ROS 2 和 Cyclone DDS 之后,我开始思考一个问题:是否存在一种更加灵活、统一的数据通信模型,既能适应局域网内的高性能通信,又能扩展到跨网络的分布式环境?
在 ROS 中,两个节点在通信前需要先通过 ROS MASTER 进行注册和发现,但这种做法容易遇到以下这几个比较典型的问题:
- 单点故障:
ROS MASTER是整个系统的核心,一旦它出现故障,整个通信系统就会瘫痪。 - 操作复杂:在分布式环境中,配置和维护
ROS MASTER需要额外的工作,尤其是在动态变化的网络环境中。 - 跨网络通信麻烦:
ROS MASTER的设计主要针对局域网环境,对于跨网络通信可能面临各种问题,不限于防火墙、NAT 等网络配置问题。
ROS2 的出现,去除了 ROS MASTER 的依赖,引入了基于 DDS 的分布式通信模型,虽然解决了上述问题,但在某些场景下,DDS 的复杂性和性能开销又显得过于沉重。
首先,基于 DDS 的通信使用起来会更加方便,因为节点间通信可以自动互相发现,自动匹配,但是方便必然会带来性能上的损失,节点间通信需要依靠广播来发现对方,这就会带来额外的网络开销,尤其是在节点数量较多的情况下,广播会带来大量的网络流量,影响通信效率。每有一个新的节点加入,其余节点都需要进行一次发现,理论上复杂度为 O(N),当节点数量较多时,性能损失会非常明显,以至于出现 discovery storm 的现象。且在跨网络通信时,广播的效率会更低,同一个 domain 内的节点依旧通过广播来发现对方,这就会带来更高的网络开销。
因此,不论是 ROS 中的 ROS MASTER 模型,还是 ROS2 中的 DDS 模型,都存在一些性能和可用性上的挑战。对于需要高性能、低延迟通信的场景,或者需要跨网络通信的分布式系统,这些模型可能并不是最佳选择。而他们存在缺陷最重要的问题就在于他们的通信方式过于单一,无法适应不同场景下的多样化需求。
ROS2 官方通信架构基于 DDS,但通过 RMW 抽象层允许替换具体中间件实现。而 Zenoh 虽说不是专门为 ROS2 而生,但它为解决 DDS 在大规模和跨网络场景中的局限提供了一种替代思路,且现在逐渐变得越来越流行,尤其在需要高性能、低延迟通信的场景中。因其配置上的灵活性,我们可以针对不同场景使用不同的配置策略。
在将 zenoh 运用到 ROS2 之前,需要先对 zenoh 的底层使用有一定了解
zenoh start
了解 zenoh 的基本使用,肯定是先从 python 入手
# 安装 zenoh-python
pip install eclipse-zenoh
Session
The Session is the main component of Zenoh. It holds the zenoh runtime object, which maintains the state of the connection of the node to the Zenoh network.
The session allows declaring other zenoh entities like Publisher, Subscriber, Querier, Queryable, and obtaining Liveliness instances, and keeps them functioning. Closing the session will undeclare all objects declared by it.
这是官方对 Session 的解释,通俗来说,Session 是 zenoh 中的核心组件,所有的通信实体(如发布者、订阅者、查询者等)都需要通过 Session 来创建和管理。Session 负责维护节点与 zenoh 网络的连接状态,确保通信实体能够正常工作。
当我们通过 Session 创建不论是 Publisher、Subscriber 还是 Querier、Queryable,它们都是基于Session的一个个通信实体,这些通信实体的生命周期和 Session 是绑定在一起的,也就是说,当我们关闭 Session 时,这些通信实体也会被撤销,无法再使用了。
Session是声明容器,我们可以将其理解为:
session.declare_xxx(...)
Session 的两种 API 风格
我们可以显示声明一个 publisher,或者直接在 Session 上调用 put 方法来发布数据:
# 显式声明 publisher
pub = session.declare_publisher("demo/key")
pub.put("hello")
# 直接在 Session 上调用 put 方法
session.put("demo/key", "hello")
针对于直接在 Session 上调用 put 方法来发布数据的方式,官方文档中是这样解释的:
This is a shortcut for declaring a Publisher and calling put on it.
也就是说,直接在 Session 上调用 put 方法来发布数据的方式,本质上是一个快捷方式,它会在内部自动声明一个 Publisher,并调用这个 Publisher 的 put 方法来发布数据。这种方式适合于一些简单的场景(一般用来调试),可以减少代码量,但如果需要更复杂的功能,比如设置 QoS、使用不同的通信模式等,还是需要显式声明一个 Publisher 来进行配置。
没有 Session 就没有一切,zenoh 中一切通信都是基于 Session 来进行的,所以说,每一个节点都需要创建一个 Session 来进行通信的说法是错误的,并不是节点创建了 Session,而是 Session 创建了节点。在下面的示例中,我们不难发现,任何的通信测试都需要先创建一个 Session,而后通过这个 Session 来创建通信组件。
Selector
A selector is the combination of a KeyExpr, which defines the set of keys that are relevant to an operation, and a set of Parameters with a few intended uses.
也就是说,Selector 是 zenoh 中用于定义通信操作相关的键集合和参数的一种机制。它由一个 KeyExpr 和一组参数组成,KeyExpr 定义了与通信操作相关的键集合,而参数则可以用于指定一些特定的通信行为或属性。
KeyExpr 用于匹配通信中的键,比如在发布-订阅模式中,订阅者可以使用 KeyExpr 来指定它感兴趣的键集合,而发布者则可以使用 KeyExpr 来指定它发布数据的键集合。参数则可以用来在 Query 模式中指定参数。
ZBytes
数据传输中,序列化和反序列化是尤为重要的,ZBytes 是 zenoh 中用于表示二进制数据的一种类型,它提供了一种高效的方式来处理和传输二进制数据。在 zenoh 中,所有的数据传输都是基于 ZBytes 来进行的,无论是发布-订阅模式还是查询模式,数据都会被序列化成 ZBytes 进行传输,而接收方则需要将 ZBytes 反序列化成原始数据格式来使用。
我们可以利用 z_serialize 和 z_deserialize 方法来进行数据的序列化和反序列化:
# Numeric: UInt8, UInt16, Uint32, UInt64, UInt128, Int8, Int16, Int32, Int64,
# Int128, int (handled as int32), Float32, Float64, float (handled as Float64)
input = UInt32(1234)
payload = z_serialize(input)
output = z_deserialize(UInt32, payload)
assert input == output
# list
input = [0.0, 1.5, 42.0] # all items must have the same type
payload = z_serialize(input)
output = z_deserialize(list[float], payload)
assert input == output
# dict
input = {0: "abc", 1: "def"}
payload = z_serialize(input)
output = z_deserialize(dict[int, str], payload)
assert input == output
# tuple
input = (0.42, "string")
payload = z_serialize(input)
output = z_deserialize(tuple[float, str], payload)
assert input == output
这是官方 z_bytes.py 中的示例(z_bytes),我们可以看到,zenoh 中的 ZBytes 支持多种数据类型的序列化和反序列化,包括数值类型、列表、字典和元组等,这些数据类型在 zenoh 中都可以被高效地序列化成 ZBytes 进行传输,并且在接收方可以方便地反序列化回原始数据格式来使用。
peer 模式下的基础通信
下面是一个最简单的 zenoh 发布者和订阅者示例:
#!/usr/bin/env python3
import zenoh, random, time
# This example simulates a sensor that periodically publishes random values.
random.seed() # Initialize the random number generator with the current system time
def read_temp():
return random.randint(0, 100)
#
if __name__ == "__main__":
with zenoh.open(zenoh.Config()) as session:
key = 'myhome/kitchen/temp'
pub = session.declare_publisher(key)
while True:
temp = read_temp()
buf = f"{temp}"
print(f"Putting Data ('{key}: '{buf})...")
pub.put(buf)
time.sleep(1)
#!/usr/bin/env python3
import zenoh, time
def listener(sample):
print(f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.to_string()}')")
if __name__ == "__main__":
with zenoh.open(zenoh.Config()) as session:
sub = session.declare_subscriber("myhome/kitchen/temp", listener)
time.sleep(60)
pub 负责生成随机温度数据并发布到 myhome/kitchen/temp 这个 key 上,而 sub 则订阅了这个 key,并在收到数据时打印出来。
这里的 pub 和 sub 使用的配置文件都是默认的 zenoh.Config(),这意味着它们会使用默认的通信方式来发现对方并进行通信,通常使用的是 peer 模式,这种模式下节点通过广播来发现对方。这种通信方式更像是 ROS2 中的 DDS 模式,节点之间通过广播来发现对方,适用于局域网环境。也就是说,想要完全模拟 ROS2 中的 DDS 模式,我们只需要使用默认的 zenoh.Config() 配置即可。
基于插件的数据持久化存储
zenoh 还支持基于插件的数据持久化存储,这对于需要长期保存数据的应用非常有用。通过配置 zenoh,我们可以将数据存储在本地文件系统、数据库或者云存储中。
首先,官方提供了最基础的内存存储,我们只需要配置 router的配置文件,开启 store 插件即可:
{
plugins: {
rest: { // activate and configure the REST plugin
http_port: 8000 // with HTTP server listening on port 8000
},
storage_manager: { // activate and configure the storage_manager plugin
storages: {
myhome: { // configure a "myhome" storage
key_expr: "myhome/**", // which subscribes and replies to query on myhome/**
volume: { // and using the "memory" volume (always present by default)
id: "memory"
}
}
}
}
}
}
利用 -c 选项指定 zenohd 启动的配置文件:
╭─root@ubuntu-s-2vcpu-2gb-sfo2-01 ~/project/zenoh ‹main*›
╰─➤ ./target/debug/zenohd -c /zenoh/project/study/myhome.json5
这样一来,pub 发布的数据只要在 myhome/ 路径下,就会被 zenohd 的 store 插件自动存储起来,之后我们可以通过查询接口来获取这些 key 的最后一次发布的数据:
# z_get.py
import zenoh
if __name__ == "__main__":
with zenoh.open(zenoh.Config()) as session:
replies = session.get('myhome/kitchen/temp')
for reply in replies:
try:
print("Received ('{}': '{}')"
.format(reply.ok.key_expr, reply.ok.payload.to_string()))
except:
print("Received (ERROR: '{}')"
.format(reply.err.payload.to_string()))
我们让 pub 发布一些数据后,而后关闭 pub,再运行 z_get.py,就会发现我们依然能够获取到 myhome/kitchen/temp 这个 key 的最后一次发布的数据:
(.venv) ╭─root@ubuntu-s-2vcpu-2gb-sfo2-01 /zenoh/project/study
╰─➤ ./z_sensor.py
Putting Data ('myhome/kitchen/temp: '89)...
^CTraceback (most recent call last):
File "/zenoh/project/study/./z_sensor.py", line 21, in <module>
time.sleep(1)
KeyboardInterrupt
(.venv) ╭─root@ubuntu-s-2vcpu-2gb-sfo2-01 /zenoh/project/study
╰─➤ ./z_get.py
Received ('myhome/kitchen/temp': '89')
client 模式下的基础通信
相比 peer 模式,client 模式下的通信方式更像是 ROS 中的 ROS MASTER 模式,节点需要先连接到一个中心服务器(即 zenohd)进行注册和发现,然后才能进行通信。这种模式适用于需要跨网络通信或者需要更高性能的场景,因为它避免了广播带来的网络开销。
在我们让 pub 和 sub 使用 client 模式之前,必须先启动一个 zenohd 服务器,而后我们在之前代码基础上稍微修改一下配置文件即可,我们在 pub 和 sub 的代码前面加上;
conf = zenoh.Config()
conf.insert_json5("mode", '"client"')
而后使用这个配置,就可以正常通信,但是需要注意的是,client 之前通信是强依赖于 zenohd 的,如果 zenohd 出现故障或者网络问题导致 pub 和 sub 无法连接到 zenohd,那么通信就会中断,这也是 client 模式的一个潜在风险点。因此 client 模式并不适合运用到 ROS2 通信的节点当中,这也是为什么默认配置文件使用的是 peer 模式的原因。
基于 router 的 peer 模式通信
在 ROS 中,如果我们没有 ROS MASTER,节点之间是无法通信的,而 zenoh 中的 peer 模式如果我们把多播给关闭,那么他们就必须依靠 router 或者手动配置监听 endpoint 来发现对方,为了测试 zenoh 中的 peer 底层通信的运作机制,我们来做一些测试。
首先,我们需要修改配置文件使得节点间通信基于 router,我们在 pub 和 sub 的代码前面加上:
# 设置日志打印,便于后面调试
zenoh.init_log_from_env_or("debug")
conf = zenoh.Config()
# 设置模式
conf.insert_json5("mode", '"peer"')
# 关闭 multicast
conf.insert_json5("scouting/multicast/enabled", "false")
# 连接到 zenohd
conf.insert_json5("connect/endpoints", '["tcp/127.0.0.1:7447"]')
先不启动 zenohd ,我们直接在两边启动 pub 和 sub,我们会发现他们无法通信,但他们并不会因此而报错,相反,它们会每隔 4s 尝试重新连接到 zenohd(重连策略也是可以通过配置文件调整的):
2026-02-22T10:45:18.217351Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Unable to connect to configured peer tcp/127.0.0.1:7447! Can not create a new TCP link bound to tcp/127.0.0.1:7447: [127.0.0.1:7447: Connection refused (os error 111) at /root/.cargo/git/checkouts/zenoh-9c599d5ef3e0480e/790faad/io/zenoh-link-commons/src/tcp.rs:96.] at /root/.cargo/git/checkouts/zenoh-9c599d5ef3e0480e/790faad/io/zenoh-links/zenoh-link-tcp/src/unicast.rs:280.. Retry in 4s.
2026-02-22T10:45:22.218744Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Try to connect: tcp/127.0.0.1:7447: global timeout: 18446744073709551615.999999999s, retry: ConnectionRetryConf { exit_on_failure: false, period_init_ms: 1000, period_max_ms: 4000, period_increase_factor: 2.0 }
2026-02-22T10:45:22.219047Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Unable to connect to configured peer tcp/127.0.0.1:7447! Can not create a new TCP link bound to tcp/127.0.0.1:7447: [127.0.0.1:7447: Connection refused (os error 111) at /root/.cargo/git/checkouts/zenoh-9c599d5ef3e0480e/790faad/io/zenoh-link-commons/src/tcp.rs:96.] at /root/.cargo/git/checkouts/zenoh-9c599d5ef3e0480e/790faad/io/zenoh-links/zenoh-link-tcp/src/unicast.rs:280.. Retry in 4s.
2026-02-22T10:45:26.219434Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Try to connect: tcp/127.0.0.1:7447: global timeout: 18446744073709551615.999999999s, retry: ConnectionRetryConf { exit_on_failure: false, period_init_ms: 1000, period_max_ms: 4000, period_increase_factor: 2.0 }
2026-02-22T10:45:26.219664Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Unable to connect to configured peer tcp/127.0.0.1:7447! Can not create a new TCP link bound to tcp/127.0.0.1:7447: [127.0.0.1:7447: Connection refused (os error 111) at /root/.cargo/git/checkouts/zenoh-9c599d5ef3e0480e/790faad/io/zenoh-link-commons/src/tcp.rs:96.] at /root/.cargo/git/checkouts/zenoh-9c599d5ef3e0480e/790faad/io/zenoh-links/zenoh-link-tcp/src/unicast.rs:280.. Retry in 4s.
两边节点都在尝试连接到 zenohd,而后我们启动 zenohd,我们会发现两边节点都成功连接到了 zenohd,并且开始通信:
下面是 sub 端的日志:
...
2026-02-22T10:46:06.541916Z DEBUG net-0 ThreadId(03) zenoh_transport::unicast::establishment::open: New transport link opened from 5046faa1314124cf71499466e61834b6 to eea2adfe2983e58bbaf16ded47a5441a: TransportLinkUnicast { link: Link { src: tcp/127.0.0.1:45330, dst: tcp/127.0.0.1:7447, mtu: 65480, is_reliable: true, is_streamed: true }, config: TransportLinkUnicastConfig { direction: Outbound, batch: BatchConfig { mtu: 49152, is_streamed: true, is_compression: false }, priorities: None, reliability: None } }.
2026-02-22T10:46:06.541973Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Successfully connected to configured peer tcp/127.0.0.1:7447
2026-02-22T10:46:06.542030Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Try to connect to peer 3af4ff56e88870bf66fd923e07017679 via any of [tcp/138.197.209.236:38299, tcp/[fe80::43b:dbff:fe0c:3308]:38299, tcp/[fe80::d895:7dff:feb7:4753]:38299, tcp/10.46.0.5:38299, tcp/10.120.0.2:38299]
2026-02-22T10:46:06.544279Z DEBUG net-0 ThreadId(03) zenoh_shm::posix_shm::segment: Opened SHM segment, id: 4179599025
2026-02-22T10:46:06.544839Z DEBUG acc-0 ThreadId(04) zenoh_link_tcp::unicast: Accepted TCP connection on [::ffff:138.197.209.236]:36221: [::ffff:138.197.209.236]:54192
2026-02-22T10:46:06.545011Z DEBUG acc-0 ThreadId(04) zenoh_shm::posix_shm::segment: Opened SHM segment, id: 4179599025
2026-02-22T10:46:06.545079Z DEBUG net-0 ThreadId(03) zenoh_transport::unicast::manager: Will use Universal transport!
2026-02-22T10:46:06.545250Z DEBUG net-0 ThreadId(03) zenoh::net::routing::router: New Face{2, 3af4ff56e88870bf66fd923e07017679}
2026-02-22T10:46:06.545409Z DEBUG net-0 ThreadId(03) zenoh::net::protocol::gossip: [Gossip] Add node (link) 3af4ff56e88870bf66fd923e07017679
2026-02-22T10:46:06.546056Z DEBUG net-0 ThreadId(03) zenoh_transport::unicast::manager: New transport opened between 5046faa1314124cf71499466e61834b6 and 3af4ff56e88870bf66fd923e07017679 - whatami: peer, sn resolution: U32, initial sn: 51080727, qos: true, shm: Some(TransportShmConfig { partner_protocols: [0] }), multilink: false, lowlatency: false
2026-02-22T10:46:06.546318Z DEBUG net-0 ThreadId(03) zenoh_transport::unicast::establishment::open: New transport link opened from 5046faa1314124cf71499466e61834b6 to 3af4ff56e88870bf66fd923e07017679: TransportLinkUnicast { link: Link { src: tcp/138.197.209.236:52558, dst: tcp/138.197.209.236:38299, mtu: 65480, is_reliable: true, is_streamed: true }, config: TransportLinkUnicastConfig { direction: Outbound, batch: BatchConfig { mtu: 49152, is_streamed: true, is_compression: false }, priorities: None, reliability: None } }.
2026-02-22T10:46:06.547289Z DEBUG net-0 ThreadId(03) zenoh::net::runtime::orchestrator: Successfully connected to newly scouted peer: Transport Unicast { zid: 3af4ff56e88870bf66fd923e07017679, whatami: Peer, is_qos: true, is_shm: true, links: [Link { src: tcp/138.197.209.236:52558, dst: tcp/138.197.209.236:38299, group: None, mtu: 65480, is_streamed: true, interfaces: ["eth0"], auth_identifier: Tcp, priorities: None, reliability: None }, Link { src: tcp/[::ffff:138.197.209.236]:36221, dst: tcp/[::ffff:138.197.209.236]:54192, group: None, mtu: 49152, is_streamed: true, interfaces: ["eth0"], auth_identifier: Tcp, priorities: None, reliability: None }], sn_resolution: U32 }
2026-02-22T10:46:06.547481Z DEBUG acc-0 ThreadId(04) zenoh_transport::unicast::establishment::accept: New transport link accepted from 3af4ff56e88870bf66fd923e07017679 to 5046faa1314124cf71499466e61834b6: TransportLinkUnicast { link: Link { src: tcp/[::ffff:138.197.209.236]:36221, dst: tcp/[::ffff:138.197.209.236]:54192, mtu: 49152, is_reliable: true, is_streamed: true }, config: TransportLinkUnicastConfig { direction: Inbound, batch: BatchConfig { mtu: 49152, is_streamed: true, is_compression: false }, priorities: None, reliability: None } }
Received SampleKind.PUT ('demo/example/hello': 'Hello, World!')
所以说,zenoh 中的 peer 模式并不完全依赖于广播来发现对方,他们也可以通过连接到 router 来发现对方,这种通信方式更像是 ROS 中的 ROS MASTER 模式,节点需要先连接到一个中心服务器进行注册和发现,然后才能进行通信,但是与 ROS MASTER 模式不同的是,zenoh 中的 peer 模式有重连机制,即使中心服务器出现故障或者网络问题导致节点无法连接到中心服务器,节点也会每隔一段时间尝试重新连接,从而提高通信的可靠性。
query 模式下的通信
zenoh 除了支持 pub-sub、storage-get 的通信模式之外,还支持一种基于 query 的通信模式,这种调用模式更像是 rpc,即客户端发送一个请求,服务器处理请求并返回结果。这种模式适用于需要请求-响应通信的场景,比如服务调用或者数据查询。
query 通信和 pub/sub 最大的区别在于,query 发送请求给后需要等待获得结果,而 pub/sub则是纯粹的发送和接收消息。
下面是一个简单的使用示例:
z_querier.py
#!/usr/bin/env python3
import zenoh
import itertools
import time
import random
def on_matching_status_update(status: zenoh.MatchingStatus):
if status.matching:
print("Querier has matching queryables.")
else:
print("Querier has NO MORE matching queryables")
if __name__ == "__main__":
zenoh.init_log_from_env_or("error")
with zenoh.open(zenoh.Config()) as session:
key = f"myhome/kitchen/**"
querier = session.declare_querier(key, target=zenoh.QueryTarget.ALL)
querier.declare_matching_listener(on_matching_status_update)
for idx in itertools.count():
x = random.randint(0, 100)
y = random.randint(0, 100)
params = {"x": str(x), "y": str(y)}
print(f"{x} + {y} = ?")
query_selector = zenoh.Selector(key, parameters=params)
payload = f"Hello {idx}"
buf = f"{idx:04d}, {payload if payload else ''}"
replies = querier.get(parameters=query_selector.parameters, payload=buf)
for reply in replies:
try:
print("Received ('{}': '{}')"
.format(reply.ok.key_expr, reply.ok.payload.to_string()))
except:
print("Received (ERROR: '{}')"
.format(reply.err.payload.to_string()))
time.sleep(1)
z_queryable.py
#! /usr/bin/env python3
import zenoh, time
if __name__ == "__main__":
zenoh.init_log_from_env_or("error")
with zenoh.open(zenoh.Config()) as session:
key = "myhome/kitchen/temp"
queryable = session.declare_queryable("myhome/kitchen/temp")
while True:
with queryable.recv() as query:
if query.payload is not None:
print(
f">> [Queryable ] Received Query '{query.selector}'"
f" with payload: '{query.payload.to_string()}'"
)
else:
print(f">> [Queryable ] Received Query '{query.selector}'")
x = int(query.selector.parameters.get("x", "0"))
y = int(query.selector.parameters.get("y", "0"))
payload = f"{x} + {y} = {x + y}"
query.reply(key, payload)
这个示例使用 querier 实现了一个远程加法服务,querier 发送一个包含两个参数 x 和 y 的查询请求,而 queryable 则接收这个请求,解析出参数 x 和 y,计算它们的和,并将结果作为响应返回给 querier。
我们可以通过 target 选项来决定 querier 会将请求递送给哪些 queryable,如果我们设置 target=zenoh.QueryTarget.ALL,那么 querier 会将请求递送给所有匹配的 queryable,具体可参考:
除了声明 querier 以外,我们还通过 Session 的 declare_matching_listener 方法来注册了一个匹配状态监听器,这个监听器会在 querier 的匹配状态发生变化时被调用,当 querier 有匹配的 queryable 时,监听器会打印出 “Querier has matching queryables.”,当 querier 没有匹配的 queryable 时,监听器会打印出 “Querier has NO MORE matching queryables”。
listener 的回调函数会在 querier 的匹配状态发生变化时被调用,这个状态包括 querier 是否有匹配的 queryable,以及匹配的 queryable 的数量等信息。通过这个监听器,我们可以实时了解 querier 的匹配状态,从而做出相应的处理,比如在没有匹配的 queryable 时进行重试或者报警等操作。
因此,我们可以看到预期中的结果:
# z_queryable 启动
Querier has matching queryables.
95 + 95 = ?
Received ('myhome/kitchen/temp': '95 + 95 = 190')
41 + 26 = ?
Received ('myhome/kitchen/temp': '41 + 26 = 67')
76 + 55 = ?
Received ('myhome/kitchen/temp': '76 + 55 = 131')
56 + 48 = ?
Received ('myhome/kitchen/temp': '56 + 48 = 104')
71 + 6 = ?
Received ('myhome/kitchen/temp': '71 + 6 = 77')
86 + 28 = ?
Received ('myhome/kitchen/temp': '86 + 28 = 114')
59 + 50 = ?
Received ('myhome/kitchen/temp': '59 + 50 = 109')
67 + 3 = ?
Received ('myhome/kitchen/temp': '67 + 3 = 70')
65 + 81 = ?
Received ('myhome/kitchen/temp': '65 + 81 = 146')
75 + 67 = ?
Received ('myhome/kitchen/temp': '75 + 67 = 142')
60 + 23 = ?
Received ('myhome/kitchen/temp': '60 + 23 = 83')
100 + 9 = ?
Received ('myhome/kitchen/temp': '100 + 9 = 109')
47 + 25 = ?
Received ('myhome/kitchen/temp': '47 + 25 = 72')
# 关闭 z_queryable
Querier has NO MORE matching queryables