/**
 * Sub-broker plugin that forwards every delivered message to an HTTP webhook as a JSON document.
 *
 * <p>The target URL is read from {@code WebhookConfig.getDataUrl()} on every delivery. Each message
 * is sent as its own signed POST request. Delivery is best-effort: a failed or rejected webhook
 * call is logged, but the deliverer still replies {@code OK} for every match info.
 */
@Extension
public class WebHookSubBroker implements ISubBroker {
    private static final Logger log = LoggerFactory.getLogger(WebHookSubBroker.class);
    /** Identifier returned by {@link #id()}; routes must be registered with this same value. */
    private static final int WEBHOOK_SUB_BROKER_ID = 1001;
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Returns the fixed identifier of this sub-broker.
     *
     * @return {@code 1001}
     */
    @Override
    public int id() {
        log.info("id() returning: {}", WEBHOOK_SUB_BROKER_ID);
        return WEBHOOK_SUB_BROKER_ID;
    }

    /**
     * Answers a subscription check by reporting {@code OK} for every match info in the request.
     *
     * @param request the check request
     * @return an already-completed future holding one {@code OK} code per match info
     */
    @Override
    public CompletableFuture<CheckReply> check(CheckRequest request) {
        log.info("check() called, tenantId={}, delivererKey={}, matchInfoCount={}",
            request.getTenantId(), request.getDelivererKey(), request.getMatchInfoCount());
        CheckReply.Builder replyBuilder = CheckReply.newBuilder();
        for (int i = 0; i < request.getMatchInfoCount(); i++) {
            replyBuilder.addCode(CheckReply.Code.OK);
        }
        CheckReply reply = replyBuilder.build();
        log.info("check() returning: {}", reply);
        return CompletableFuture.completedFuture(reply);
    }

    /**
     * Creates a new deliverer bound to the given key; the deliverer shares this broker's JSON mapper.
     *
     * @param delivererKey key identifying the deliverer; also sent as the {@code X-Deliverer-Key} header
     * @return a new {@code WebhookDeliverer}
     */
    @Override
    public IDeliverer open(String delivererKey) {
        log.info("open() called, delivererKey={}", delivererKey);
        WebhookDeliverer deliverer = new WebhookDeliverer(delivererKey, mapper);
        log.info("open() returning: {}", deliverer);
        return deliverer;
    }

    /** Logs the call; this sub-broker holds no resources of its own to release. */
    @Override
    public void close() {
        log.info("close() called on SubBroker");
    }

    /** Deliverer that converts each message into a JSON record and POSTs it to the webhook URL. */
    private static class WebhookDeliverer implements IDeliverer {
        private final String delivererKey;
        private final ObjectMapper mapper;

        WebhookDeliverer(String delivererKey, ObjectMapper mapper) {
            this.delivererKey = delivererKey;
            this.mapper = mapper;
        }

        /**
         * Pushes every message contained in the request to the configured webhook.
         *
         * <p>If no URL is configured, the whole request is rejected with {@code ERROR}. Otherwise one
         * POST is issued per message, and the returned future completes once all of them have
         * finished, successfully or not. The reply is always an accept-all reply: webhook failures
         * are only logged (best-effort forwarding).
         *
         * @param request the delivery request, grouped by tenant
         * @return a future holding the delivery reply
         */
        @Override
        public CompletableFuture<DeliveryReply> deliver(DeliveryRequest request) {
            log.info("WebhookDeliverer triggered! Request package count: {}, map size: {}",
                request.getPackageCount(), request.getPackageMap().size());
            log.info("deliver() called, delivererKey={}, tenants={}",
                delivererKey, request.getPackageMap().keySet());
            String url = WebhookConfig.getDataUrl();
            if (url == null || url.isEmpty()) {
                log.warn("Webhook data URL is not configured. Rejecting all data deliveries.");
                return rejectAll(request);
            }
            List<CompletableFuture<HttpResponse<String>>> futures = new ArrayList<>();
            for (DeliveryPackage pkg : request.getPackageMap().values()) {
                for (DeliveryPack pack : pkg.getPackList()) {
                    TopicMessagePack tmp = pack.getMessagePack();
                    String topic = tmp.getTopic();
                    for (TopicMessagePack.PublisherPack pubPack : tmp.getMessageList()) {
                        for (Message msg : pubPack.getMessageList()) {
                            Map<String, Object> record = buildRecord(topic, pubPack, msg);
                            try {
                                futures.add(post(url, record));
                            } catch (Exception e) {
                                // One bad message must not prevent the remaining ones from being sent.
                                log.error("Failed to build webhook data request for message", e);
                            }
                        }
                    }
                }
            }
            if (futures.isEmpty()) {
                return acceptAll(request);
            }
            log.info("Pushing data to webhook");
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .handle((v, t) -> {
                    // The failure (t) is deliberately ignored here; it was already logged per request.
                    DeliveryReply reply = createAcceptReply(request);
                    log.info("deliver() returning: {}", reply);
                    return reply;
                });
        }

        /** Logs the call; the deliverer holds no resources of its own to release. */
        @Override
        public void close() {
            log.info("close() called on Deliverer, delivererKey={}", delivererKey);
        }

        /**
         * Builds the JSON-serializable record describing a single published message.
         *
         * @param topic   topic the message was published to
         * @param pubPack publisher pack the message belongs to; supplies the publisher metadata
         * @param msg     the message itself
         * @return a mutable map with the webhook payload fields
         */
        private Map<String, Object> buildRecord(String topic,
                                                TopicMessagePack.PublisherPack pubPack,
                                                Message msg) {
            String clientId = pubPack.getPublisher().getMetadataOrDefault(
                org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_ID_KEY, "");
            String username = pubPack.getPublisher().getMetadataOrDefault(
                org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_USER_ID_KEY, "");
            String peerhost = pubPack.getPublisher().getMetadataOrDefault(
                org.apache.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_ADDRESS_KEY, "");

            Map<String, Object> record = new HashMap<>();
            record.put("username", username);
            record.put("topic", topic);
            record.put("timestamp", msg.getTimestamp());
            record.put("qos", msg.getPubQoS().getNumber());
            record.put("publishReceivedAt", msg.getTimestamp());
            record.put("pubProps", new HashMap<String, Object>());
            record.put("peerhost", peerhost);
            record.put("metadata", pubPack.getPublisher().getMetadataMap());
            record.put("id", String.valueOf(msg.getMessageId()));
            Map<String, Object> flags = new HashMap<>();
            flags.put("dup", false);
            flags.put("retain", msg.getIsRetain());
            record.put("flags", flags);
            record.put("event", "message.publish");
            record.put("clientid", clientId);
            // The payload is a protobuf ByteString, whose toStringUtf8() never throws: malformed
            // bytes are silently replaced with U+FFFD. Check validity explicitly so that binary
            // payloads are Base64-encoded instead of being corrupted.
            if (msg.getPayload().isValidUtf8()) {
                record.put("payload", msg.getPayload().toStringUtf8());
            } else {
                record.put("payload",
                    java.util.Base64.getEncoder().encodeToString(msg.getPayload().toByteArray()));
            }
            return record;
        }

        /**
         * Serializes the record, signs the request and sends it asynchronously to the webhook.
         *
         * @param url    webhook endpoint
         * @param record payload fields to serialize as JSON
         * @return the in-flight request; its outcome is logged when it completes
         * @throws Exception if serialization, URI parsing, signing or request construction fails
         */
        private CompletableFuture<HttpResponse<String>> post(String url, Map<String, Object> record)
            throws Exception {
            String payloadStr = mapper.writeValueAsString(record);
            log.info("Data webhook request -> url={}, delivererKey={}, payload={}",
                url, delivererKey, payloadStr);
            HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/json")
                .header("X-Deliverer-Key", delivererKey)
                .POST(HttpRequest.BodyPublishers.ofString(payloadStr))
                .timeout(Duration.ofSeconds(10));
            SignatureUtils.sign(requestBuilder, payloadStr);
            HttpRequest httpRequest = requestBuilder.build();
            return WebhookHttpClient.get()
                .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
                .whenComplete((res, ex) -> {
                    if (ex != null) {
                        log.error("Failed to push data to webhook", ex);
                    } else if (res.statusCode() < 200 || res.statusCode() >= 300) {
                        log.warn("Webhook rejected data delivery with status {}, body={}",
                            res.statusCode(), res.body());
                    } else {
                        log.info("Data webhook response <- status={}, body={}",
                            res.statusCode(), res.body());
                    }
                });
        }

        /** Returns an already-completed future holding an accept-all reply for the request. */
        private CompletableFuture<DeliveryReply> acceptAll(DeliveryRequest request) {
            DeliveryReply reply = createAcceptReply(request);
            log.info("deliver() returning: {}", reply);
            return CompletableFuture.completedFuture(reply);
        }

        /**
         * Builds a reply with code {@code OK} that reports {@code OK} for every match info of
         * every pack, grouped under the same tenant keys as the request.
         */
        private DeliveryReply createAcceptReply(DeliveryRequest request) {
            DeliveryReply.Builder replyBuilder = DeliveryReply.newBuilder().setCode(DeliveryReply.Code.OK);
            for (Map.Entry<String, DeliveryPackage> entry : request.getPackageMap().entrySet()) {
                DeliveryResults.Builder resultsBuilder = DeliveryResults.newBuilder();
                for (DeliveryPack pack : entry.getValue().getPackList()) {
                    for (int i = 0; i < pack.getMatchInfoCount(); i++) {
                        resultsBuilder.addResult(DeliveryResult.newBuilder()
                            .setMatchInfo(pack.getMatchInfo(i))
                            .setCode(DeliveryResult.Code.OK)
                            .build());
                    }
                }
                replyBuilder.putResult(entry.getKey(), resultsBuilder.build());
            }
            return replyBuilder.build();
        }

        /**
         * Returns an already-completed future holding a reply with code {@code ERROR} and no
         * per-tenant results. The request is only accepted as a parameter for symmetry with
         * {@code acceptAll}; its contents are not read.
         */
        private CompletableFuture<DeliveryReply> rejectAll(DeliveryRequest request) {
            DeliveryReply reply = DeliveryReply.newBuilder()
                .setCode(DeliveryReply.Code.ERROR)
                .build();
            log.warn("deliver() returning: {}", reply);
            return CompletableFuture.completedFuture(reply);
        }
    }
}
/**
 * Registers the catch-all ({@code "#"}) webhook route in the background, retrying every five
 * seconds until the dist client reports {@code MatchResult.OK} or the worker thread is interrupted.
 *
 * <p>Does nothing unless both the dist server and the dist worker are enabled in the configuration.
 * The method returns immediately; registration runs on the executor that
 * {@code CompletableFuture.runAsync} uses by default.
 */
private void registerWebhookRoute() {
    if (!config.getDistServiceConfig().getServer().isEnable() ||
        !config.getDistServiceConfig().getWorker().isEnable()) {
        return;
    }
    final long retryDelayMillis = 5000L;
    java.util.concurrent.CompletableFuture.runAsync(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                log.info("Attempting to register webhook route...");
                // NOTE(review): the arguments are presumably request id, tenant id, topic filter,
                // receiver id, deliverer key, sub-broker id and incarnation - confirm against the
                // dist client API. 1001 must match the id() of the webhook sub-broker.
                MatchResult result = distClient.addRoute(
                    System.nanoTime(),
                    "default",
                    TopicUtil.from("#"),
                    "webhook-receiver",
                    "webhook-deliverer",
                    1001,
                    0L
                ).join();
                if (result == MatchResult.OK) {
                    log.info("Webhook route registered successfully.");
                    return;
                }
                log.warn("Webhook route registration returned: {}. Retrying in 5 seconds...", result);
            } catch (Exception e) {
                // Pass the exception as the last argument so the cause and stack trace are logged too.
                log.warn("Webhook route registration failed: {}. Retrying in 5 seconds...",
                    e.getMessage(), e);
            }
            try {
                Thread.sleep(retryDelayMillis);
            } catch (InterruptedException ie) {
                // Restore the flag and stop: an interrupt is a request to abandon the retry loop,
                // not a registration failure to be retried.
                Thread.currentThread().interrupt();
                log.warn("Webhook route registration interrupted; giving up.");
                return;
            }
        }
    });
}
备注: 我看其他 issue 提到 bifromq-data-integration 项目可转发数据流(理论上也支持自定义 http 转发), 想请教下最佳实践方式, 以及我目前的方式是否可行. 感谢~
背景
设备通过 MQTT 上报数据, 期望能将这些数据通过 HTTP 转发到其他服务。
插件实现方式(单独工程)
源码修改
StandaloneStarter
问题1: 插件的实现方式是否正确? 目前插件作为单独项目构建, 然后将生成的 jar 包放到源码工程的 plugins 目录下。
问题2: 设备上报数据的转发方式是否正确? 目前为了注册路由改动了源码启动类 (StandaloneStarter), 而我的预期是不对源码做任何改动。
备注: 我看其他 issue 提到 bifromq-data-integration 项目可转发数据流(理论上也支持自定义 http 转发), 想请教下最佳实践方式, 以及我目前的方式是否可行. 感谢~