ThingsBoard源码解析-数据订阅与规则链数据处理
前言
结合本篇对规则链的执行过程进行探讨
根据之前对MQTT
源码的学习,我们由消息的处理入手
//org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {switch (msg.fixedHeader().messageType()) {case PUBLISH:processPublish(ctx, (MqttPublishMessage) msg);break;case SUBSCRIBE:processSubscribe(ctx, (MqttSubscribeMessage) msg);break;case UNSUBSCRIBE:processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);break;case PINGREQ:if (checkConnected(ctx, msg)) {ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));transportService.reportActivity(deviceSessionCtx.getSessionInfo());}break;case DISCONNECT:ctx.close();break;case PUBACK:int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);if (rpcRequest != null) {transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);}break;default:break;}}
接着看对发布消息的处理
//org.thingsboard.server.transport.mqtt.MqttTransportHandlerprivate void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {if (!checkConnected(ctx, mqttMsg)) {return;}String topicName = mqttMsg.variableHeader().topicName();int msgId = mqttMsg.variableHeader().packetId();log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {//消息来源为网关主题if (gatewaySessionHandler != null) {handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);transportService.reportActivity(deviceSessionCtx.getSessionInfo());}} else {//处理设备的消息,重点processDevicePublish(ctx, mqttMsg, topicName, msgId);}
}
继续
//org.thingsboard.server.transport.mqtt.MqttTransportHandlerprivate void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {try {Matcher fwMatcher;MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V1;} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V1;} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));} else if ((fwMatcher = FW_REQUEST_PATTERN.matcher(topicName)).find()) {getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);} else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V2_JSON;} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V2_PROTO;} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V2;} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V2_JSON;} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V2_PROTO;} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V2;} else {transportService.reportActivity(deviceSessionCtx.getSessionInfo());ack(ctx, msgId);}} catch (AdaptorException e) {log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);ctx.close();}
}
这里根据消息来源主题的不同,进行对应的处理
顺着属性消息处理继续往下看
//org.thingsboard.server.common.transport.service.DefaultTransportService@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {//更新活动时间reportActivityInternal(sessionInfo);TenantId tenantId = getTenantId(sessionInfo);DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));//获取键值对JsonObject json = JsonUtils.getJsonObject(msg.getKvList());//构造元数据TbMsgMetaData metaData = new TbMsgMetaData();metaData.putValue("deviceName", sessionInfo.getDeviceName());metaData.putValue("deviceType", sessionInfo.getDeviceType());metaData.putValue("notifyDevice", "false");CustomerId customerId = getCustomerId(sessionInfo);//发送至规则引擎sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));}
}
继续
//org.thingsboard.server.common.transport.service.DefaultTransportServiceprivate void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {//创建设备配置标识DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));//从缓存中获取设备配置DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);RuleChainId ruleChainId;String queueName;if (deviceProfile == null) {//无设备配置,使用默认的规则链和队列名log.warn("[{}] Device profile is null!", deviceProfileId);ruleChainId = null;queueName = ServiceQueue.MAIN;} else {//获取规则链标识ruleChainId = deviceProfile.getDefaultRuleChainId();//获取队列名String defaultQueueName = deviceProfile.getDefaultQueueName();queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;}//创建消息TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);//发送至规则引擎sendToRuleEngine(tenantId, tbMsg, callback);
}
继续
//org.thingsboard.server.common.transport.service.DefaultTransportServiceprivate void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {//获取分区信息TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());if (log.isTraceEnabled()) {log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);}//创建消息ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)).setTenantIdMSB(tenantId.getId().getMostSignificantBits()).setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();//统计数增加ruleEngineProducerStats.incrementTotal();//统计相关回调StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);//发送至规则消息引擎队列ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
}
接下来,我们需要去消费端查看后续的处理
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
先找到TbQueueProducer的位置
接着从TbQueueConsumer入手
package org.thingsboard.server.queue;import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;import java.util.List;
import java.util.Set;public interface TbQueueConsumer<T extends TbQueueMsg> {String getTopic();void subscribe();void subscribe(Set<TopicPartitionInfo> partitions);void unsubscribe();List<T> poll(long durationInMillis);void commit();boolean isStopped();}
重点关注拉取方法List<T> poll(long durationInMillis);
,看看在哪些地方被调用
发现目标DefaultTbRuleEngineConsumerService
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerServicevoid consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {updateCurrentThreadName(threadSuffix);while (!stopped && !consumer.isStopped()) {try {//拉取消息List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);if (msgs.isEmpty()) {continue;}//获取提交策略final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);//获取确认策略final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);//初始化提交策略submitStrategy.init(msgs);while (!stopped) {//创建处理上下文TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());//提交,重点为 submitMessage 方法submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));//超时等待final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);//创建结果TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);if (timeout) {//超时处理printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");}if (!ctx.getFailedMap().isEmpty()) {//失败处理printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");}//打印统计信息ctx.printProfilerStats();//根据结果获取决策TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);if (statsEnabled) {//记录是否提交完成stats.log(result, decision.isCommit());}//清理上下文ctx.cleanup();//判断决策if (decision.isCommit()) {//已提交//停止提交策略submitStrategy.stop();//退出循环break;} else {//未提交完毕//将决策的重试消息集合更新至提交策略,继续提交submitStrategy.update(decision.getReprocessMap());}}//消费端提交确认consumer.commit();} catch (Exception e) {if (!stopped) {log.warn("Failed to process messages from queue.", e);try {Thread.sleep(pollDuration);} catch (InterruptedException e2) {log.trace("Failed to wait until the server has capacity to handle new requests", e2);}}}}log.info("TB Rule Engine Consumer stopped.");
}
查看关键方法
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerServicevoid submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) {log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue());//获取原始消息ToRuleEngineMsg toRuleEngineMsg = msg.getValue();//获取租户标识TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));//创建回调TbMsgCallback callback = prometheusStatsEnabled ? new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : new TbMsgPackCallback(id, tenantId, ctx);try {if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {//转发至规则引擎 ActorforwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);} else {//消息为空直接回调成功方法callback.onSuccess();}} catch (Exception e) {//回调失败方法callback.onFailure(new RuleEngineException(e.getMessage()));}
}
继续
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerServiceprivate void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {//构建消息TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);QueueToRuleEngineMsg msg;//获取关联类型列表ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();Set<String> relationTypes = null;if (relationTypesList != null) {if (relationTypesList.size() == 1) {relationTypes = Collections.singleton(relationTypesList.get(0));} else {relationTypes = new HashSet<>(relationTypesList);}}//创建消息msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());//使用 Actor 系统上下文发送消息actorContext.tell(msg);
}
先看一下QueueToRuleEngineMsg
package org.thingsboard.server.common.msg.queue;import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;import java.util.Set;/*** Created by ashvayka on 15.03.18.*/
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {@Getterprivate final TenantId tenantId;@Getterprivate final Set<String> relationTypes;@Getterprivate final String failureMessage;public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {super(tbMsg);this.tenantId = tenantId;this.relationTypes = relationTypes;this.failureMessage = failureMessage;}@Overridepublic MsgType getMsgType() {return MsgType.QUEUE_TO_RULE_ENGINE_MSG;}@Overridepublic void onTbActorStopped(TbActorStopReason reason) {String message;if (msg.getRuleChainId() != null) {message = reason == TbActorStopReason.STOPPED ?String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId()) :String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId());} else {message = reason == TbActorStopReason.STOPPED ? "Rule chain stopped" : "Failed to initialize rule chain!";}msg.getCallback().onFailure(new RuleEngineException(message));}public boolean isTellNext() {return relationTypes != null && !relationTypes.isEmpty();}}
得知消息类型为MsgType.QUEUE_TO_RULE_ENGINE_MSG,后面会用到
接着我们看Actor
系统对消息的处理
//org.thingsboard.server.actors.ActorSystemContextpublic void tell(TbActorMsg tbActorMsg) {appActor.tell(tbActorMsg);
}
appActor 为应用Actor
,是整个Actor
系统的根Actor
,感兴趣可以自行阅读
根据之前的学习,我们了解到Actor
的处理方法为boolean process(TbActorMsg msg)
//org.thingsboard.server.actors.service.ContextAwareActor@Override
public boolean process(TbActorMsg msg) {if (log.isDebugEnabled()) {log.debug("Processing msg: {}", msg);}//处理消息if (!doProcess(msg)) {log.warn("Unprocessed message: {}!", msg);}return false;
}
可见,正真执行的方法为protected abstract boolean doProcess(TbActorMsg msg)
//org.thingsboard.server.actors.app.AppActor@Override
protected boolean doProcess(TbActorMsg msg) {if (!ruleChainsInitialized) {//规则链未初始化//初始化租户 ActorinitTenantActors();ruleChainsInitialized = true;if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {log.warn("Rule Chains initialized by unexpected message: {}", msg);}}//判断消息类型switch (msg.getMsgType()) {case APP_INIT_MSG:break;case PARTITION_CHANGE_MSG:ctx.broadcastToChildren(msg);break;case COMPONENT_LIFE_CYCLE_MSG:onComponentLifecycleMsg((ComponentLifecycleMsg) msg);break;case QUEUE_TO_RULE_ENGINE_MSG:onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);break;case TRANSPORT_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((TenantAwareMsg) msg, false);break;case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((TenantAwareMsg) msg, true);break;case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:onToTenantActorMsg((EdgeEventUpdateMsg) msg);break;case SESSION_TIMEOUT_MSG:ctx.broadcastToChildrenByType(msg, EntityType.TENANT);break;default:return false;}return true;
}
回忆之前消息类型为MsgType.QUEUE_TO_RULE_ENGINE_MSG
//org.thingsboard.server.actors.app.AppActorprivate void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {//消息来自系统,视为异常msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));} else {if (!deletedTenants.contains(msg.getTenantId())) {//租户未被删除//获取或创建租户 Actor 并发送消息getOrCreateTenantActor(msg.getTenantId()).tell(msg);} else {//租户已删除,直接回调成功方法msg.getMsg().getCallback().onSuccess();}}
}
deletedTenants 用于记录删除的用户标识
//org.thingsboard.server.actors.app.AppActorprivate TbActorRef getOrCreateTenantActor(TenantId tenantId) {return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId));
}
直接查看TenantActor的doProcess
方法
//org.thingsboard.server.actors.tenant.TenantActor@Override
protected boolean doProcess(TbActorMsg msg) {if (cantFindTenant) {//找不到租户log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;//直接回调成功方法queueMsg.getMsg().getCallback().onSuccess();} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;//直接回调成功方法transportMsg.getCallback().onSuccess();}return true;}switch (msg.getMsgType()) {case PARTITION_CHANGE_MSG:PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;ServiceType serviceType = partitionChangeMsg.getServiceQueueKey().getServiceType();if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {//To Rule Chain Actorsbroadcast(msg);} else if (ServiceType.TB_CORE.equals(serviceType)) {List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {@Overrideprotected boolean testEntityId(EntityId entityId) {return super.testEntityId(entityId) && !isMyPartition(entityId);}});deviceActorIds.forEach(id -> ctx.stop(id));}break;case COMPONENT_LIFE_CYCLE_MSG:onComponentLifecycleMsg((ComponentLifecycleMsg) msg);break;case QUEUE_TO_RULE_ENGINE_MSG:onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);break;case TRANSPORT_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((DeviceAwareMsg) msg, false);break;case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((DeviceAwareMsg) msg, true);break;case SESSION_TIMEOUT_MSG:ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);break;case RULE_CHAIN_INPUT_MSG:case RULE_CHAIN_OUTPUT_MSG:case RULE_CHAIN_TO_RULE_CHAIN_MSG:onRuleChainMsg((RuleChainAwareMsg) msg);break;case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);break;default:return false;}return true;
}
查看MsgType.QUEUE_TO_RULE_ENGINE_MSG类型的处理方法
//org.thingsboard.server.actors.tenant.TenantActorprivate void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {//检查当前服务是否为规则引擎服务if (!isRuleEngine) {log.warn("RECEIVED INVALID MESSAGE: {}", msg);return;}TbMsg tbMsg = msg.getMsg();//检查规则引擎是否启用状态if (getApiUsageState().isReExecEnabled()) {//判断规则链标识是否为空if (tbMsg.getRuleChainId() == null) {//获取根链 Actor 并判断是否为空if (getRootChainActor() != null) {//向根链 Actor 发送消息getRootChainActor().tell(msg);} else {//无根链 Actor ,回调失败方法tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));log.info("[{}] No Root Chain: {}", tenantId, msg);}} else {try {//向指定规则链发送消息ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);} catch (TbActorNotRegisteredException ex) {log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());//TODO: 3.1 Log it to dead letters queue;tbMsg.getCallback().onSuccess();}}} else {log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);tbMsg.getCallback().onSuccess();}
}
跳过中间的步骤,直接看规则链Actor
的doProcess
方法即可
//org.thingsboard.server.actors.ruleChain.RuleChainActor@Override
protected boolean doProcess(TbActorMsg msg) {switch (msg.getMsgType()) {case COMPONENT_LIFE_CYCLE_MSG:onComponentLifecycleMsg((ComponentLifecycleMsg) msg);break;case QUEUE_TO_RULE_ENGINE_MSG:processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);break;case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);break;case RULE_CHAIN_TO_RULE_CHAIN_MSG:processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);break;case RULE_CHAIN_INPUT_MSG:processor.onRuleChainInputMsg((RuleChainInputMsg) msg);break;case RULE_CHAIN_OUTPUT_MSG:processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg);break;case PARTITION_CHANGE_MSG:processor.onPartitionChangeMsg((PartitionChangeMsg) msg);break;case STATS_PERSIST_TICK_MSG:onStatsPersistTick(id);break;default:return false;}return true;
}
查看MsgType.QUEUE_TO_RULE_ENGINE_MSG类型的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorvoid onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {TbMsg msg = envelope.getMsg();//验证消息if (!checkMsgValid(msg)) {return;}log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);//判断是否包含关联类型if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {onTellNext(msg, true);} else {onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());}
}
此处的关联类型是什么?
结合规则链的结构联想一下,规则链本质是从上个节点传递到下个节点,节点传递之间有什么是可有可无的?
显然关联类型就是节点流转的条件
由于首节点是没有条件的,因此在构造消息时没有设置关联类型
注:每条规则链仅有一个首节点,且除首节点外的其他节点至少存在一个关联类型(TbRelationTypes.FAILURE),这部分感兴趣自行研究
先看没有关联类型的处理
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {try {//检查组件(节点)状态是否正常checkComponentStateActive(msg);//获取节点标识RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;RuleNodeCtx targetCtx;if (targetId == null) {//未指定目标节点//将当前规则链的首节点作为目标targetCtx = firstNode;//拷贝消息,entityId即当前规则链的标识msg = msg.copyWithRuleChainId(entityId);} else {//已指定目标节点,获取节点上下文targetCtx = nodeActors.get(targetId);}//判断上下文是否存在if (targetCtx != null) {log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);//推送至节点pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);} else {log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);msg.getCallback().onSuccess();}} catch (RuleNodeException rne) {msg.getCallback().onFailure(rne);} catch (Exception e) {msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));}
}
查看下一个方法前,我们先了解一下节点上下文的结构
package org.thingsboard.server.actors.ruleChain;import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;/*** Created by ashvayka on 19.03.18.*/
@Data
@AllArgsConstructor
final class RuleNodeCtx {private final TenantId tenantId;private final TbActorRef chainActor;private final TbActorRef selfActor;private RuleNode self;
}
很简单的结构,记录了租户标识,规则链Actor
,自身节点Actor
和自身节点
继续
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {if (nodeCtx != null) {//创建消息并告知自身节点nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));} else {log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));}
}
与QueueToRuleEngineMsg类似,查看可知RuleChainToRuleNodeMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_MSG,后面同样以此为处理条件
终于到节点的Actor
了
//org.thingsboard.server.actors.ruleChain.RuleNodeActor@Override
protected boolean doProcess(TbActorMsg msg) {switch (msg.getMsgType()) {case COMPONENT_LIFE_CYCLE_MSG:case RULE_NODE_UPDATED_MSG:onComponentLifecycleMsg((ComponentLifecycleMsg) msg);break;case RULE_CHAIN_TO_RULE_MSG:onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);break;case RULE_TO_SELF_MSG:onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);break;case STATS_PERSIST_TICK_MSG:onStatsPersistTick(id);break;case PARTITION_CHANGE_MSG:onClusterEventMsg((PartitionChangeMsg) msg);break;default:return false;}return true;
}
查看MsgType.RULE_CHAIN_TO_RULE_MSG处理方法
//org.thingsboard.server.actors.ruleChain.RuleNodeActorprivate void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg envelope) {TbMsg msg = envelope.getMsg();//验证消息if (!msg.isValid()) {if (log.isTraceEnabled()) {log.trace("Skip processing of message: {} because it is no longer valid!", msg);}return;}if (log.isDebugEnabled()) {log.debug("[{}][{}][{}] Going to process rule engine msg: {}", ruleChainId, id, processor.getComponentName(), msg);}try {//处理消息processor.onRuleChainToRuleNodeMsg(envelope);//增加处理计数increaseMessagesProcessedCount();} catch (Exception e) {logAndPersist("onRuleMsg", e);}
}
继续
//org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessorvoid onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {//回调处理开始通知msg.getMsg().getCallback().onProcessingStart(info);//检查组件状态可用checkComponentStateActive(msg.getMsg());TbMsg tbMsg = msg.getMsg();//获取规则节点计数int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();//获取消息的最大规则节点执行次数int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();//判断执行次数是否超限if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {//上报规则引擎执行计数apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);if (ruleNode.isDebugMode()) {systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());}try {//执行节点处理方法tbNode.onMsg(msg.getCtx(), msg.getMsg());} catch (Exception e) {msg.getCtx().tellFailure(msg.getMsg(), e);}} else {tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));}
}
这里仅调用了tbNode的onMsg
方法,那么节点的流转呢?
猜想所有节点继承自一个公共的父抽象类,该类中实现了节点的流转
那么看一下继承关系
这些节点并没由继承公共的父类,流转方法没有抽离出来?
随便找一个节点看看,这里我看的是TbMsgTypeFilterNode
//org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode@Override
public void onMsg(TbContext ctx, TbMsg msg) {ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
}
验证了猜想,接着看下去
//org.thingsboard.server.actors.ruleChain.DefaultTbContext@Override
public void tellSuccess(TbMsg msg) {tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
}@Override
public void tellNext(TbMsg msg, String relationType) {tellNext(msg, Collections.singleton(relationType), null);
}@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {tellNext(msg, relationTypes, null);
}
这里贴出了tellSuccess
方法和另一个tellNext
方法,它们有被其他节点使用
//org.thingsboard.server.actors.ruleChain.DefaultTbContextprivate void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {if (nodeCtx.getSelf().isDebugMode()) {relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));}//回调处理结束通知msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());//像规则链 Actor 发送消息nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
和前面一样,查看RuleNodeToRuleChainTellNextMsg的消息类型为MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG
接下来又回到了规则链Actor
的doProcess
方法,找到对应的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorcase RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);break;
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorvoid onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {var msg = envelope.getMsg();if (checkMsgValid(msg)) {onTellNext(msg, envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage());}
}
这里调用的onTellNext
方法即前面onQueueToRuleEngineMsg
方法中根据关联类型通知的方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {try {checkComponentStateActive(msg);EntityId entityId = msg.getOriginator();//获取主题分区信息TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);//根据来源节点标识获取关联(后续节点的指向)列表List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymorelog.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId());ruleNodeRelations = Collections.emptyList();}//根据指定的关联类型筛选关联List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream().filter(r -> contains(relationTypes, r.getType())).collect(Collectors.toList());//获取关联个数int relationsCount = relationsByTypes.size();if (relationsCount == 0) {//没有后续节点了log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());//判断当前关联是否包含失败(上个节点是否执行失败)if (relationTypes.contains(TbRelationTypes.FAILURE)) {//获取规则节点上下文RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);if (ruleNodeCtx != null) {//回调消息的失败方法msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));} else {log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());//回调消息的失败方法msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));}} else {//回调消息的成功方法msg.getCallback().onSuccess();}} else if (relationsCount == 1) {//后续仅一个节点//此处循环仅执行一次for (RuleNodeRelation relation : relationsByTypes) {log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());//推送给目标pushToTarget(tpi, msg, relation.getOut(), relation.getType());}} else {//后续多个节点MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes);//遍历关联列表for (RuleNodeRelation relation : relationsByTypes) {EntityId target = relation.getOut();//推送至队列putToQueue(tpi, msg, callbackWrapper, target);}}} catch (RuleNodeException rne) {msg.getCallback().onFailure(rne);} catch (Exception e) {log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));}
}
我们先看后续多个节点的推送
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) {switch (target.getEntityType()) {case RULE_NODE:putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper);break;case RULE_CHAIN:putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper);break;}
}
根据实体类型拷贝消息,发送至队列
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) {//构建消息ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder().setTenantIdMSB(tenantId.getId().getMostSignificantBits()).setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).setTbMsg(TbMsg.toByteString(newMsg)).build();//推送至规则引擎clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
}
继续
//org.thingsboard.server.service.queue.DefaultTbClusterService@Override
public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {log.trace("PUSHING msg: {} to:{}", msg, tpi);//获取规则引擎消息生产端,发送消息producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);toRuleEngineMsgs.incrementAndGet();
}
消息发送至队列,消费端接收到消息后,根据关联类型再次调用onTellNext
方法处理,此时的消息仅指向单个后续节点
最后查看单个后续节点的处理
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {//判断是否为当前服务负责的分区if (tpi.isMyPartition()) {//判断实体类型switch (target.getEntityType()) {case RULE_NODE://推送至节点pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);break;case RULE_CHAIN://通知父 Actorparent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));break;}} else {//放入队列,交给负责的服务putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);}
}
注:集群部署下,每个服务负责若干分区pushMsgToNode
方法和putToQueue
方法前面都已经看过了,重点看跳转规则链
和前面一样,查看RuleChainToRuleChainMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG
接着查看租户Actor
中对应的处理方法
//org.thingsboard.server.actors.tenant.TenantActorprivate void onRuleChainMsg(RuleChainAwareMsg msg) {if (getApiUsageState().isReExecEnabled()) {getOrCreateActor(msg.getRuleChainId()).tell(msg);}
}
再查看规则链Actor
中对应的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorvoid onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {var msg = envelope.getMsg();if (!checkMsgValid(msg)) {return;}try {checkComponentStateActive(envelope.getMsg());if (firstNode != null) {pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());} else {envelope.getMsg().getCallback().onSuccess();}} catch (RuleNodeException e) {log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);}
}
pushMsgToNode
方法前面已经看过了,至此,我们已阅读完整个规则链的执行逻辑
总结
最后,画一下规则链的执行逻辑
相关文章:

ThingsBoard源码解析-数据订阅与规则链数据处理
前言 结合本篇对规则链的执行过程进行探讨 根据之前对MQTT源码的学习,我们由消息的处理入手 //org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {switch (msg.fixedHeade…...

探究Transformer模型中不同的池化技术
❤️觉得内容不错的话,欢迎点赞收藏加关注😊😊😊,后续会继续输入更多优质内容❤️👉有问题欢迎大家加关注私戳或者评论(包括但不限于NLP算法相关,linux学习相关,读研读博…...

Android 9.0 设置讯飞语音引擎为默认tts语音播报引擎
1.前言 在9.0的系统rom定制化开发中,在产品开发中,一些内置的app需要用到tts语音播报功能,所以需要用到讯飞语音引擎作为默认的系统tts语音引擎功能,所以就需要 了解系统关于tts语音引擎默认的设置方法,然后在设置讯飞语音引擎为默认的tts语音引擎来实现tts语音播报功能的…...

直流无刷电机驱动的PWM频率
以下来源:Understanding the effect of PWM when controlling a brushless dc motorhttps://www.controleng.com/articles/understanding-the-effect-of-pwm-when-controlling-a-brushless-dc-motor/ Brushless dc motors have an electrical time constant τ of a…...

机房动环监控4大价值,轻松解决学校解决问题
不管是政府机构、学校、企业还是医院均有配备机房。机房一般配备服务器、计算机、存储设备、机柜组、UPS、精密空调等关键设备。 传统的机房在事故发生时,无法及时发现并处理,影响范围大,造成严重的损失。因此,一套智慧机房动环监…...

用于平抑可再生能源功率波动的储能电站建模及评价(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

Burpsuite详细教程
Burpsuite是一种功能强大的Web应用程序安全测试工具。它提供了许多有用的功能和工具,可以帮助用户分析和评估Web应用程序的安全性。在本教程中,我们将介绍如何安装、配置和使用Burpsuite,并提供一些常用的命令。 第一步:安装Burp…...

目标检测:FP(误检)和FN(漏检)统计
1. 介绍 目标检测,检测结果分为三类:TP(正确检测),FP(误检),FN(漏检), 尤其是针对复杂场景或者小目标检测场景中,会存在一些FP(误检),FN(漏检)。 如何对检测的效果进行可视化,以帮助我们改进模型,提高模型recall值。 步骤 (1): 数据需要准备为yolo格式(2) 训练数据获得…...
【MySQL专题】04、性能优化之读写分离(MyCat)
1、MyCat概述 从定义和分类来看,它是一个开源的分布式数据库系统,是一个实现了MySQL协议的Server,前端用户可以把它看做是一个数据库代理,用MySQL客户端工具和命令行访问,而其后端可以用MySQL原生(Native&…...

信息系统项目管理师第四版知识摘编:第5章 信息系统工程
第5章 信息系统工程信息系统工程是用系统工程的原理、方法来指导信息系统建设与管理的一门工程技术学科,它是信息科学、管理科学、系统科学、计算机科学与通信技术相结合的综合性、交叉性、具有独特风格的应用学科。5.1软件工程软件工程是指应用计算机科学、数学及管…...

【2023春招】西山居游戏研发岗笔试AK
120min,一共三道算法、两道填空、10道不定项选择 算法题部分 T1-二叉树后序遍历 题面 一个节点数据为整数的二叉搜索树,它的遍历结果可以在内存中用一个整数数组来表示。比如,以下二叉树,它每个节点的左子节点都比自己小,右子节点都比自己大,对它进行后序遍历,结果可以…...

什么是分布式,分布式和集群的区别又是什么?
1. 什么是分布式 ? 分布式系统一定是由多个节点组成的系统。 其中,节点指的是计算机服务器,而且这些节点一般不是孤立的,而是互通的。 这些连通的节点上部署了我们的节点,并且相互的操作会有协同。 分布式系统对于用户而言&a…...

Cellchat和Cellphonedb细胞互作一些问题的解决(error和可视化)
今日的内容主要解决两个问题,一个是cellchat的代码报错问题,因为已经有很多人提出这个问题了。第二个是Cellphonedb结果的可视化,这里提供一种免费的很实用的快捷可视化方法。其实这些问题只要自己思考都是能明白的。 Cellchat和cellphonedb细…...

大文件分片上传的实现【前后台完整版】
在一般的产品开发过程中,大家多少会遇到上传视频功能的需求,往往我们采用的都是对视频大小进行限制等方法,来防止上传请求超时,导致上传失败。这时候可能将视频分片上传可以对你的项目有一个小小的体验优化。 本片文章前端是vue&…...

Java序列化面试总结
Java序列化与反序列化是什么? Java序列化是指把Java对象转换为字节流的过程,而Java反序列化是指把字节流恢复为Java对象的过程。 序列化: 序列化是把对象转换成有序字节流,以便在网络上传输或者保存在本地文件中。核心作用是对象…...

fs的常用方法
以下是fs模块的一些常用方法: 1. 读取文件内容 使用fs.readFile()方法读取文件内容。该方法接收两个参数:文件路径和回调函数。回调函数的参数包括错误信息和文件内容。 javascript const fs require(fs); fs.readFile(/path/to/file, (err, data)…...

【华为OD机试 2023最新 】字符串重新排列、字符串重新排序(C++ 100%)
文章目录 题目描述输入描述输出描述用例题目解析C++题目描述 给定一个字符串s,s包括以空格分隔的若干个单词,请对s进行如下处理后输出: 1、单词内部调整:对每个单词字母重新按字典序排序 2、单词间顺序调整: 1)统计每个单词出现的次数,并按次数降序排列 2)次数相同,按…...

Matlab自动消除论文插图白边的7种方法
通过Matlab所绘制的插图,如不进行一定的调整,其四周往往存在一定范围的白边。 白边的存在会影响数据展示效果,有时也会给论文的排版造成一定麻烦。 要想消除白边,一种简单的方法是,在导出插图后,用其它软…...

Python每日一练(20230330)
目录 1. 存在重复元素 🌟 2. 矩阵置零 🌟🌟 3. 回文对 🌟🌟🌟 🌟 每日一练刷题专栏 🌟 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 专栏 1…...

面试官:Tomcat 在 SpringBoot 中是如何启动的(二)
文章目录 总结彩蛋我们再看看Tomcat类的源码: //部分源码,其余部分省略。 public class Tomcat {//设置连接器public void setConnector(Connector connector) {Service service = getService(...

软件测试岗位中,如何顺利拿下50K+?送你一份涨薪秘籍
随着科技发展以及5G时代的到来,IT行业早已发生翻天覆地的变化。已不是当初你认为只要有好点子就能立马起盘做项目的时代了。在IT行业高速发展的时期中“软件测试行业”仍然是热门行业之一。软件行业的高速发展必然带来更多的岗位,正如IT行业发展需要有开…...

java webflux函数式实现数据结构
我前面写的文章 java webflux注解方式写一个可供web端访问的数据接口 带大家写了个注解方式实现的webflux 首先 使用函数式时 您需要自己初始化服务器 使用函数式需要两个接口 分别是 RouterFunction 和 HandlerFuncion RouterFunction主要的作用就是分别一下任务 例如 添加 直…...

百度文心一言可以完胜ChatGPT的4点可能性
文心一言,百度全新一代知识增强大语言模型,文心大模型家族的新成员,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。但说实话,很多人拿他与ChatGPT相对比&#x…...

大型分布式架构设计
大型网站的特点 大型网站架构技术 大型网站架构技术-架构模式 大型网站架构技术-高性能架构 以用户为中心,提供快速的访问体验。主要体现在:响应快、并发能力高和性能稳定。 大型网站架构技术-高可用架构 大型网站在任何时候都应该可以正常访问&#…...

基于springboot实现校园在线拍卖电商系统【源码】
基于springboot实现校园在线拍卖系统演示开发语言:Java 框架:springboot JDK版本:JDK1.8 服务器:tomcat7 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/myeclipse/idea Maven包&…...

SaaS智慧校园源码,电子班牌管理系统 人脸考勤、综合评价系统、请假管理、校务管理
Java智慧校园系统源码 智慧学校源码 小程序电子班牌,前后端分离架构 智慧校园全套源码包含:电子班牌管理系统、成绩管理系统、考勤人脸刷卡管理系统、综合素养评价系统、请假管理系统、电子班牌发布系统、校务管理系统、小程序移动端、教师后台管理系统…...

MONGODB mongodb 一般人不知道的数据类型与使用
开头还是介绍一下群,如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请联系 liuaustin3 ,在新加的朋友会分到2群(共…...

蚁群算法优化
%%%%%%%%%%%%蚁群算法解决 TSP 问题%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%初始化%%%%%%%%%%%%%%%%%%% clear all; %清除所有变量 close all; %清图 clc; %清屏 m 50; %蚂蚁个数 Alpha 1; %信息素重要程度参数 Beta 5; %启发式因子重要程度参数 Rho 0.1; %信息素蒸发系数 G 20…...

山东首版次申报的材料
首版次软件基本知识: 1、 首版次高端软件的定义: 首版次高端软件是指省内企事业单位通过自主开发或者合作开发其功能或性能有重大突破 在该领域具有技术领先优势或者打破市场垄断拥有自主知识产权 尚未取得重大市场业绩的 同产品名称、同一版本号的软件产品。 2、主管部门:山…...

个人时间管理网站—首页的前端实现【源码】
🌟所属专栏:献给榕榕 🐔作者简介:rchjr——五带信管菜只因一枚 😮前言:该专栏系为女友准备的,里面会不定时发一些讨好她的技术作品,感兴趣的小伙伴可以关注一下~👉文章简…...