Vert.x 学习笔记4 EventBus
EventBus(类似服务中内置了一个messageQueue)
- 每一个 Vert.x 实例都有一个单独的 Event Bus 实例。您可以通过 Vertx 实例的 eventBus方法来获得对应的 EventBus 实例。
- 应用中的不同组成部分可以通过 Event Bus 相互通信,您无需关心它们由哪一种语言实现, 也无需关心它们是否在同一个 Vert.x 实例中。
- 您甚至可以通过桥接的方式让浏览器中运行的多个JavaScript客户端在同一个 Event Bus 上相互通信。
todo 这句话目前还不太理解,让js 通过sdk 和服务端的vertx 交互吗?
- Event Bus构建了一个跨越多个服务器节点和多个浏览器的分布式点对点消息系统。
- Event Bus支持发布/订阅、点对点、请求-响应的消息传递方式。
- Event Bus的API很简单。基本上只涉及注册处理器、 注销处理器以及发送和发布(publish)消息。
基本概念
寻址
- 消息的发送目标被称作 地址(address) 。
- Vert.x的地址格式并不花哨。Vert.x中的地址就是一个简单的字符串,任何字符串都合法。 不过还是建议使用某种规范来进行地址的命名。 例如 使用点号(.)来划分命名空间。
- 一些合法的地址形如:europe.news.feed1、acme.games.pacman、sausages 以及 X 。
处理器(handler)
- 消息需由处理器(Handler)来接收。您需要将处理器注册在某个地址上。
- 同一个地址可以注册许多不同的处理器。
- 一个处理器也可以注册在多个不同的地址上。
发布/订阅消息
- Event Bus支持 发布(publish)消息 功能。
- 消息将被发布到一个地址上。 发布意味着信息会被传递给所有注册在该地址上的处理器。
- 即我们熟悉的 发布/订阅 消息传递模式。
点对点消息传递 与 请求-响应消息传递
- Event Bus也支持 点对点 消息模式。
- 消息将被发送到一个地址上,Vert.x仅会把消息发给注册在该地址上的处理器中的其中一个。
- 若这个地址上注册有不止一个处理器,那么Vert.x将使用 不严格的轮询算法 选择其中一个。
- 点对点消息传递模式下,可在消息发送的时候指定一个应答处理器(可选)。
获取Event Bus
EventBus eb = vertx.eventBus();
注册处理器
EventBus eb = vertx.eventBus();
eb.consumer("news.uk.sport", message -> {
System.out.println("I have received a message: " + message.body());
});
// 复杂的写法
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
});
// 若您希望在完成注册后收到通知,您可以在 MessageConsumer 对象上注册 一个 completionhandler
consumer.completionHandler(res -> {
if (res.succeeded()) {
System.out.println("The handler registration has reached all nodes");
} else {
System.out.println("Registration failed!");
}
});
注销处理器
consumer.unregister(res -> {
if (res.succeeded()) {
System.out.println("The handler un-registration has reached all nodes");
} else {
System.out.println("Un-registration failed!");
}
});
发布消息
eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");
发送消息
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");
设置消息头
DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);
应答消息/发送回复
MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
message.reply("how interesting!");
});
带超时的发送
- 当发送带有应答处理器的消息时,可以在
DeliveryOptions中指定一个超时时间。如果在这个时间之内没有收到应答,则会以“失败的结果”为参数调用应答处理器。默认超时是 30 秒。
发送失败
- 没有可用的处理器来接收消息
- 接收者调用了 fail方法显式声明失败
- 发生这些情况时,应答处理器将会以这些异常失败结果为参数进行调用。
转载自 vertx 中文翻译文档