[华为北向网管NCE开发教程(6)消息订阅
1.作用
之前介绍的都是我们向网管NCE发起请求获取数据,消息订阅则反过来,是网管NCE系统给我们推送信息。其原理和MQ,JMS这些差不多,这里不过多累述。
2.场景
所支持订阅的场景有如下,以告警通知为例,当我们订阅告警通知以后,如果NCE网管有告警通知产生以后,就会给订阅的人发送一个通知(也就是实时告警推送)。那么我们就可以接收到如下的通知。
2024-06-06 00:09:30c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160140.0Z, X.733::EventType=securityAlarm, emsTime=20240605160142.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784751, isClearable=true, affectedTPList=21}
2024-06-06 00:09:36c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160147.0Z, X.733::EventType=securityAlarm, emsTime=20240605160149.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784752, isClearable=true, affectedTPList=21}
2024-06-06 00:09:43c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160155.0Z, X.733::EventType=securityAlarm, emsTime=20240605160156.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784753, isClearable=true, affectedTPList=21}
2024-06-06 00:09:50c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160202.0Z, X.733::EventType=securityAlarm, emsTime=20240605160203.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784755, isClearable=true, affectedTPList=21}
2024-06-06 00:10:01c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160213.0Z, X.733::EventType=securityAlarm, emsTime=20240605160214.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784756, isClearable=true, affectedTPList=21}
同理,如果我们订阅了文件传输状态通知,当存在文件传输完成的时候会收到如下通知,通知信息中包含了,文件传输完成后,文件的存储地址。
2024-06-06 10:15:26c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786334, fileName=pm/sdh/0605-0606/3145740.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:39c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786335, fileName=pm/sdh/0605-0606/3145734.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:42c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786336, fileName=pm/sdh/0605-0606/3145739.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
通知类型 | 说明 |
---|---|
NT_ALARM | 告警通知 |
NT_ALARM_UPDATED | 告警更新通知 |
NT_TCA | 性能越限告警通知 |
NT_OBJECT_CREATION | 对象创建通知 |
NT_OBJECT_DELETION | 对象删除通知 |
NT_ATTRIBUTE_VALUE_CHANGE | 属性改变通知 |
NT_STATE_CHANGE | 状态改变通知 |
NT_ROUTE_CHANGE | 路由改变通知 |
NT_PROTECTION_SWITCH | 保护倒换通知 |
NT_FILE_TRANSFER_STATUS | 文件传输状态通知 |
NT_EPROTECTION_SWITCH | 设备保护倒换通知事件 |
NT_ASON_RESOURCE_CHANGE | 智能资源改变通知 |
NT_PRBSTEST_STATUS | 伪随机码测试状态通知 |
NT_WDMPROTECTION_SWITCH | 波分保护倒换通知 |
NT_ATMPROTECTION_SWITCH ATM | 保护倒换通知 |
NT_RPRPROTECTION_SWITCH RPR | 保护组倒换通知事件格式 |
NT_IPPROTECTION_SWITCH Tunnel | 保护组倒换通知事件格式 |
3.如何开订阅(SpringBoot为例)
3.1登录NCE
3.1.1CorbaLoginReq
配置文件的登录参数如下
huawei: nce: login: corba:host: 127.0.0.1port: 12001userName: 111111passWord: 111111
配置文件参数注入Spring Bean
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;import lombok.Data;@Data
@SpringBootConfiguration
@ConfigurationProperties(prefix = "huawei.nce.login.corba")
public class CorbaLoginReq {private String host;private String port;private String userName;private String passWord;
}
3.1.2CorbaLoginRes
登录返回参数
import org.omg.DynamicAny.DynAnyFactory;import lombok.Data;
import mtnm.tmforum.org.emsSession.EmsSession_I;@Data
public class CorbaLoginRes {private org.omg.CORBA.ORB orb;private org.omg.PortableServer.POA rootPOA ;private EmsSession_I emsSession;private DynAnyFactory dynAnyFactory;
}
3.1.3TANmsSession_IImpl
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.session.Session_I;
/*** NmsSession_IPOA for EMS(NCE) invoking. * @author**/
public class TANmsSession_IImpl extends NmsSession_IPOA {public void eventLossCleared(String endTime) {log("TANmsSession_IImpl.eventLossCleared(String endTime) is invoked by EMS(NCE).");log("endTime:"+endTime);}public void eventLossOccurred(String startTime, String notificationId) {log("TANmsSession_IImpl.eventLossOccurred(String startTime, String notificationId) is invoked by EMS.");log("startTime:"+startTime+", notificationId:"+notificationId);}public Session_I associatedSession() {log("TANmsSession_IImpl.associatedSession() is invoked by EMS(NCE).");return null;}public void endSession() {log("TANmsSession_IImpl.endSession() is invoked by EMS(NCE).");}public void ping() {log("TANmsSession_IImpl.ping() is invoked by EMS(NCE).");}private static void log(String str){System.out.println(str);}
}
3.1.4BaseCorbaService
public interface BaseCorbaService {/*** @description:登录华为nce-corba* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年3月1日 下午4:19:59*/CorbaLoginRes login();/*** @description:清空登录* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年6月7日 下午3:24:02*/void clearLogin();
}
import java.util.Arrays;
import java.util.List;import org.omg.CosNaming.NameComponent;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynAnyFactoryHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import com.collect.sdh.module.corba.entity.CorbaLoginReq;
import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.entity.TANmsSession_IImpl;
import com.collect.sdh.module.corba.service.BaseCorbaService;import mtnm.tmforum.org.common.Common_IHolder;
import mtnm.tmforum.org.emsMgr.EMSMgr_I;
import mtnm.tmforum.org.emsMgr.EMSMgr_IHelper;
import mtnm.tmforum.org.emsSession.EmsSession_I;
import mtnm.tmforum.org.emsSession.EmsSession_IHolder;
import mtnm.tmforum.org.emsSession.EmsSession_IPackage.managerNames_THolder;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_I;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_IHelper;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_I;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_IHelper;
import mtnm.tmforum.org.equipment.EquipmentOrHolderIterator_IHolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolderList_THolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolder_T;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfoList_THolder;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfo_T;
import mtnm.tmforum.org.equipment.PhysicalLocationInfoList_THolder;
import mtnm.tmforum.org.equipment.PhysicalLocationInfo_T;
import mtnm.tmforum.org.globaldefs.NameAndStringValue_T;
import mtnm.tmforum.org.globaldefs.NamingAttributesIterator_IHolder;
import mtnm.tmforum.org.globaldefs.NamingAttributesList_THolder;
import mtnm.tmforum.org.globaldefs.ProcessingFailureException;
import mtnm.tmforum.org.managedElement.ManagedElementIterator_IHolder;
import mtnm.tmforum.org.managedElement.ManagedElementList_THolder;
import mtnm.tmforum.org.managedElement.ManagedElement_T;
import mtnm.tmforum.org.managedElement.ManagedElement_THolder;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_I;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_I;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetwork_T;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkIterator_IHolder;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkList_THolder;
import mtnm.tmforum.org.nmsSession.NmsSession_I;
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.subnetworkConnection.CCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnectList_THolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnect_T;
import mtnm.tmforum.org.subnetworkConnection.Route_THolder;
import mtnm.tmforum.org.subnetworkConnection.SNCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnectionList_THolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_T;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointIterator_IHolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointList_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPoint_T;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkIterator_IHolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkList_THolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLink_T;@Service
public class BaseCorbaServiceImpl implements BaseCorbaService {@Autowiredprivate CorbaLoginReq loginReq;private CorbaLoginRes login;/*** @description:清空登录* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年6月7日 下午3:24:02*/@Overridepublic void clearLogin() {login = null;}/*** @description:登录华为nce-corba* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年3月1日 下午4:19:59*/@Overridepublic CorbaLoginRes login() {if(login != null) {/*本应该检测登录是否可用,如果可用,则返回登录信息,不可用则重新登录,(不知道是否可以使用emsSession.ping()来判断)但是没找到华为有这个接口,因此如果出现不可抗力因素导致登录无效,例如网络中断则通过com.collect.sdh.module.test.TestCorbaController.cleanLogin()清空登录*/ return login;}try {login = new CorbaLoginRes();String[] argv = new String[2];argv[0] = "-ORBInitRef";argv[1] = "NameService=corbaloc::" + loginReq.getHost() + ":" + loginReq.getPort() + "/NameService";org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, null);org.omg.PortableServer.POA rootPOA = org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references("RootPOA"));rootPOA.the_POAManager().activate();DynAnyFactory dynAnyFactory = DynAnyFactoryHelper.narrow(orb.resolve_initial_references("DynAnyFactory"));org.omg.CosNaming.NamingContextExt nc = org.omg.CosNaming.NamingContextExtHelper.narrow(orb.resolve_initial_references("NameService"));org.omg.CosNaming.NameComponent[] name;name = new NameComponent[5];name[0] = new NameComponent("TMF_MTNM", "Class");name[1] = new NameComponent("HUAWEI", "Vendor");name[2] = new NameComponent("Huawei/NCE", "EmsInstance");name[3] = new NameComponent("2.0", "Version");name[4] = new NameComponent("Huawei/NCE", "EmsSessionFactory_I");EmsSessionFactory_I emsSessionFactory = EmsSessionFactory_IHelper.narrow(nc.resolve(name));NmsSession_IPOA pNmsSessionServant = new TANmsSession_IImpl();NmsSession_I nmsSession = pNmsSessionServant._this(orb);EmsSession_IHolder emsSessionInterfaceHolder = new EmsSession_IHolder();emsSessionFactory.getEmsSession(loginReq.getUserName(), loginReq.getPassWord(), nmsSession, emsSessionInterfaceHolder);EmsSession_I emsSession = emsSessionInterfaceHolder.value;login.setDynAnyFactory(dynAnyFactory);login.setOrb(orb);login.setRootPOA(rootPOA);login.setEmsSession(emsSession);return login;} catch (Exception e) {e.printStackTrace();return null;}}
}
3.2定制通知
3.2.1ConsumerNotice
需要实现接口:org.omg.CosNotifyComm.StructuredPushConsumerPOA
import java.util.HashMap;
import java.util.Map;import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.AnyUtil;import lombok.extern.log4j.Log4j2;/*** @description:消费通知* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:57:26*/
@Log4j2
public class ConsumerNotice extends StructuredPushConsumerPOA{private CorbaLoginRes loginRes;public ConsumerNotice(CorbaLoginRes loginRes) {super();this.loginRes = loginRes;}private static Map<String, String> noticeTypes = new HashMap<>();static {noticeTypes.put("NT_ALARM", "告警通知");noticeTypes.put("NT_ALARM_UPDATED", "告警更新通知");noticeTypes.put("NT_TCA", "性能越限告警通知");noticeTypes.put("NT_OBJECT_CREATION", "对象创建通知");noticeTypes.put("NT_OBJECT_DELETION", "对象删除通知");noticeTypes.put("NT_ATTRIBUTE_VALUE_CHANGE", "属性改变通知");noticeTypes.put("NT_STATE_CHANGE", "状态改变通知");noticeTypes.put("NT_ROUTE_CHANGE", "路由改变通知");noticeTypes.put("NT_PROTECTION_SWITCH", "保护倒换通知");noticeTypes.put("NT_FILE_TRANSFER_STATUS", "文件传输状态通知");noticeTypes.put("NT_EPROTECTION_SWITCH", "设备保护倒换通知事件");noticeTypes.put("NT_ASON_RESOURCE_CHANGE", "智能资源改变通知");noticeTypes.put("NT_PRBSTEST_STATUS", "伪随机码测试状态通知");noticeTypes.put("NT_WDMPROTECTION_SWITCH", "波分保护倒换通知");noticeTypes.put("NT_ATMPROTECTION_SWITCH", "ATM保护倒换通知");noticeTypes.put("NT_RPRPROTECTION_SWITCH", "RPR保护组倒换通知事件格式");noticeTypes.put("NT_IPPROTECTION_SWITCH", "Tunnel保护组倒换通知事件格式");}@Overridepublic void disconnect_structured_push_consumer() {log.info("Consumer disconnect_structured_push_consumer");}@Overridepublic void push_structured_event(StructuredEvent event) throws Disconnected {String eventType = event.header.fixed_header.event_type.type_name;Map<String, Object> eventData = new HashMap<>(event.filterable_data.length);for (int i = 0; i < event.filterable_data.length; i++) {if (!ObjectUtils.isEmpty(event.filterable_data[i])) {eventData.put(event.filterable_data[i].name, AnyUtil.parseAny( event.filterable_data[i].value, loginRes.getDynAnyFactory()));}}log.info("收到事件通知:{}<{}>,通知参数:{}",eventType, noticeTypes.get(eventType), eventData);}@Overridepublic void offer_change(EventType[] arg0, EventType[] arg1) throws InvalidEventType {}}
3.2.2AnyUtil
用于解析返回的信息。
import org.omg.CORBA.Any;
import org.omg.CORBA.TCKind;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynArray;
import org.omg.DynamicAny.DynEnum;
import org.omg.DynamicAny.DynSequence;
import org.omg.DynamicAny.DynStruct;
import org.omg.DynamicAny.DynUnion;/*** @description:org.omg.DynamicAny格式化工具* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午11:33:17*/
public class AnyUtil {/*** @description:格式化数据* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午11:34:17*/public static String parseAny(Any any, DynAnyFactory factory){if( null==any ){return null;}StringBuilder result = new StringBuilder();try {switch (any.type().kind().value()) {case TCKind._tk_char:result.append(any.extract_char());break;case TCKind._tk_null:break;case TCKind._tk_boolean:result.append(any.extract_boolean());break;case TCKind._tk_short:result.append(any.extract_short());break;case TCKind._tk_long:result.append(any.extract_long());break;case TCKind._tk_double:result.append(any.extract_double());break;case TCKind._tk_float:result.append(any.extract_float());break;case TCKind._tk_octet:result.append(any.extract_octet());break;case TCKind._tk_ulong:result.append(any.extract_ulong());break;case TCKind._tk_string:result.append(any.extract_string());break;case TCKind._tk_enum:{DynEnum dynEnum = (DynEnum) factory.create_dyn_any(any);result.append(dynEnum.get_as_string());break;}case TCKind._tk_any:{any=factory.create_dyn_any(any).get_any();result.append(any);break;}case TCKind._tk_objref:{result.append(any.extract_Object());break;}case TCKind._tk_struct:case TCKind._tk_except:{DynStruct dynstruct = (DynStruct) factory.create_dyn_any(any);org.omg.DynamicAny.NameValuePair[] members = dynstruct.get_members();result.append("{");for (int i = 0; i < members.length; i++) {if(i>0){result.append(" ");}result.append(members[i].id).append(" ").append(parseAny(members[i].value, factory));}result.append("}");break;}case TCKind._tk_union:DynUnion dynunion = (DynUnion) factory.create_dyn_any(any);result.append(dynunion.member_name()).append(" ");result.append(parseAny(dynunion.member().to_any(), factory));break;case TCKind._tk_sequence:DynSequence dynseq = (DynSequence) factory.create_dyn_any(any);Any[] contents = dynseq.get_elements();result.append("{");for (int i = 0; i < contents.length; i++){result.append(parseAny(contents[i], factory));}result.append("}");break;case TCKind._tk_array:DynArray dynarray = (DynArray) factory.create_dyn_any(any);Any[] arrayContents = dynarray.get_elements();result.append("{");for (int i = 0; i < arrayContents.length; i++){result.append(parseAny(arrayContents[i], factory)).append("");}result.append("}");break;default:result.append(any.type().kind().value());}} catch (Exception ex) {ex.printStackTrace();}return new String(result.toString().getBytes(StandardCharsets.ISO_8859_1));}
}
3.3订阅通知
SubscribeNotice 实现 Runnable,即订阅的时候,另起一个线程来订阅。该线程负责订阅。
3.3.1SubscribeNotice
import org.omg.CORBA.IntHolder;
import org.omg.CORBA.Object;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannelHolder;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyComm.StructuredPushConsumerHelper;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.JsonUtils;import lombok.extern.log4j.Log4j2;/*** @description:订阅消费通知* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午9:33:18*/
@Log4j2
public class SubscribeNotice implements Runnable{/*** 登录corba成功后的参数*/private CorbaLoginRes loginRes;/*** 记录订阅通知的通道ID的存储文件地址*/private String poxyIdPath;public SubscribeNotice(CorbaLoginRes loginRes, String poxyIdPath) {super();this.loginRes = loginRes;this.poxyIdPath = poxyIdPath;}@Overridepublic void run() {try {//获取通道IntHolder poxyId = new IntHolder();poxyId.value = getPoxyId(poxyIdPath);EventChannelHolder eventChannel = new EventChannelHolder();loginRes.getEmsSession().getEventChannel(eventChannel);//ConsumerNotice extends StructuredPushConsumerPOA 为消费者ConsumerNotice consumerNotice = new ConsumerNotice(loginRes);ConsumerAdmin defaultConsumerAdmin = eventChannel.value.default_consumer_admin();//连接通道,如果发现通道已经打开,则先关闭之前的通道(已经打开的通道即使不可以,北向接口并未释放该接口的资源,但是会限制连接通道(数量 < 3))try {if (poxyId.value > 0){log.info("释放旧的消费通道:{}", poxyId.value);ProxySupplier oldSupplier = defaultConsumerAdmin.get_proxy_supplier(poxyId.value);assert (oldSupplier != null);StructuredProxyPushSupplier myOldPoxy = StructuredProxyPushSupplierHelper.narrow(oldSupplier);myOldPoxy.disconnect_structured_push_supplier();}}catch (Exception e) {e.printStackTrace();}ProxySupplier tmpSupplier = defaultConsumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, poxyId);StructuredProxyPushSupplier proxyPushSupplier = StructuredProxyPushSupplierHelper.narrow(tmpSupplier);Object servant = loginRes.getRootPOA().servant_to_reference(consumerNotice);proxyPushSupplier.connect_structured_push_consumer(StructuredPushConsumerHelper.narrow(servant));savePoxyId(poxyIdPath, poxyId.value);log.info("保存此次的消费通道:{}", poxyId.value);loginRes.getOrb().run();} catch (Exception e) {e.printStackTrace();}}/*** @description:获取已经连接的消费通道ID* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:40:57*/public int getPoxyId(String path) {int poxyId = -1;//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库String str = JsonUtils.readStringFromSystemPath(path);if(!ObjectUtils.isEmpty(str)) {poxyId = Integer.parseInt(str);}return poxyId;}/*** @description:保存已经连接的消费通道ID* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:41:33*/public void savePoxyId(String path, int poxyId) {//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库JsonUtils.writeStringToSystemPath(path, String.valueOf(poxyId));}
}
3.3.2JsonUtils
为了保证代码完整性,如果你完全抄上面的代码,这里提供了代码需要的两个文件操作示例
public static String readStringFromSystemPath(String path) {String data = "";try {InputStream inputStream = new FileInputStream(path);byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);data = new String(bdata, StandardCharsets.UTF_8);} catch (FileNotFoundException e) {log.info("文件不存在,文件地址:{}", path);} catch (Exception e) {log.info("读取文件失败,文件地址:{},失败原因:{}", path,e.getMessage());} return data;}public static void writeStringToSystemPath(String filePath, String str) {Writer write = null;try {File file = new File(filePath);if(file.exists()) {file.delete();}if (!file.getParentFile().exists()) {file.getParentFile().mkdirs();}if(file.createNewFile()) {write = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);write.write(str);write.flush();}} catch (Exception e) {e.printStackTrace();} finally {if(write !=null ) {try {write.close();} catch (IOException e) {e.printStackTrace();}}}}
3.4启动订阅
这里我们使用SpringBoot启动的时候启动订阅,即实现ApplicationRunner,然后使用线程池的单线程来启动上面我们编写的线程。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.service.BaseCorbaService;/*** @description:启动订阅corba的消费* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 下午4:18:16*/
@Component
public class SubscribeRunner implements ApplicationRunner {@Value(value = "${file-save-path}")private String poxyIdPath;@Autowiredprivate BaseCorbaService baseCorbaService;@Overridepublic void run(ApplicationArguments args) throws Exception {poxyIdPath = poxyIdPath + "poxyId";CorbaLoginRes login = baseCorbaService.login();ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(new SubscribeNotice(login, poxyIdPath));}
}
4.效果展示
相关文章:

[华为北向网管NCE开发教程(6)消息订阅
1.作用 之前介绍的都是我们向网管NCE发起请求获取数据,消息订阅则反过来,是网管NCE系统给我们推送信息。其原理和MQ,JMS这些差不多,这里不过多累述。 2.场景 所支持订阅的场景有如下,以告警通知为例,当我…...
2024.6.15 英语六级 经验与复盘
文章目录 英语六级 经验与复盘2024年上半年六级考试(2024 6.8 - 6.15)前情提要:经验:作文:(30min)听力:(25min)SectionC(精细阅读) (30min)SectionB(段落匹配) (15min)SectionA(选词填空) (5min / 舍弃)翻译(20min&…...
计算机专业的未来展望
身份角度一:一名曾经的计算机专业学生 作为一名曾经的计算机专业学生,我认为计算机相关专业仍然是一个值得考虑的选择。随着科技的飞速发展,计算机行业的需求只会越来越高,因此,无论是在就业前景还是个人发展方面&a…...
Shell变量的高级用法
在Shell编程中,变量的使用是至关重要的。初学者可能只使用最基本的变量赋值和调用,但Shell变量实际上有很多高级用法,可以极大地提升脚本的灵活性和效率。本文将介绍几种Shell变量的高级用法,帮助您更好地利用Shell脚本。 1. 参数…...

【Python/Pytorch - 网络模型】-- SVD算法
文章目录 文章目录 00 写在前面01 基于Pytorch版本的SVD算代码02 理论知识 00 写在前面 (1)矩阵的奇异值分解在最优化问题、特征值问题、最小二乘方问题、广义逆矩阵问题及统计学等方面都有重要应用; (2)应用&#…...

全光万兆时代来临:信而泰如何推动F5G-A(50PONFTTR)技术发展
技术背景 F5G-A(Fifth Generation Fixed Network-Advanced,第五代固定网络接入)是固定网络技术的一次重大升级,代表了光纤网络技术的最新发展。F5G-A旨在提供更高的带宽、更低的延迟、更可靠的连接以及更广泛的应用场景。 F5G-A六…...

港科夜闻 | 香港科大与香港科大(广州)合推红鸟跨校园学习计划,共享教学资源,促进港穗学生交流学习...
关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科大与香港科大(广州)合推“红鸟跨校园学习计划”,共享教学资源,促进港穗学生交流学习。香港科大与香港科大(广州)6月14日共同宣布推出“红鸟跨校园学习计划”,以进一步加强两校学…...

基于Wireshark实现对FTP的抓包分析
基于Wireshark实现对FTP的抓包分析 前言一、虚拟机Win10环境配置二、FileZilla客户端的安装配置下载FileZilla客户端安装FileZilla 三、FileZilla Server安装下载FileZilla Server安装 四、实现对FTP的抓包前置工作实现抓包完成抓包 前言 推荐一个网站给想要了解或者学习人工智…...

Vue54-浏览器的本地存储webStorage
一、本地存储localStorage的作用 二、本地存储的代码实现 2-1、存储数据 注意: localStorage是window上的函数,所以,可以把window.localStorage直接写成localStorage(直接调用!) 默认调了p.toString()方…...

Linux下Shell脚本基础知识
主要参考视频: 这可能是B站讲的最好的Linux Shell脚本教程,3h打通Linux-shell全套教程,从入门到精通完整版_哔哩哔哩_bilibili 主要参考文档: Shell 教程 | 菜鸟教程 (runoob.com) Bash Shell教程 (yiibai.com) 先用视频入门&…...

爬虫初学篇——看完这些还怕自己入门不了?
初次学习爬虫,知识笔记小分享 学scrapy框架可看:孤寒者博主的【Python爬虫必备—>Scrapy框架快速入门篇——上】 目录🌟 一、🍉基础知识二、🍉http协议:三、🍉解析网页(1) xpath的用…...

[数据集][目标检测]减速区域检测数据集VOC+YOLO格式1654张1类别
数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):1654 标注数量(xml文件个数):1654 标注数量(txt文件个数):1654 标注…...

OpenGL3.3_C++_Windows(8)
材质&&漫反射,光照贴图 使用struct为材质建立结构体,以便方便管理漫反射贴图是物体的颜色值(纹理)(通过 UV 坐标映射到渲染物体的表面),材质是物体的属性(物体对光的交互&a…...

GPU的工作原理
location: Beijing 1. why is GPU CPU的存储单元和计算单元的互通过慢直接促进了GPU的发展 先介绍一个概念:FLOPS(Floating Point Operations Per Second,浮点运算每秒)是一个衡量其执行浮点运算的能力,可以作为计算…...

Linux常⽤服务器构建-samba
目录 1. 介绍 2. 安装 3. 配置 3.1 创建存放共享⽂件的路径 3.2 创建samba账户 4 重启samba 5. 访问共享⽂件 5.1 mac下访问⽅式 5.2 windows下访问⽅式 1. 介绍 Samba 是在 Linux 和 UNIX 系统上实现 SMB 协议的⼀个免费软件,能够完成在 windows 、 mac 操作系统…...

【Java】已解决java.lang.UnsupportedOperationException异常
文章目录 问题背景可能出错的原因错误代码示例正确代码示例注意事项 已解决java.lang.UnsupportedOperationException异常 在Java编程中,java.lang.UnsupportedOperationException是一个运行时异常,通常表示尝试执行一个不支持的操作。这种异常经常发生…...

在ubuntu中恢复误删除的文件
1、安装 TestDisk 在 Ubuntu 上,可以使用以下命令安装 TestDisk: sudo apt-get install testdisk2、查询你删除的文件所在那个分区 #查询分区 df -h #我这里是/dev/sda2 #也可以使用下面命令查看具体哪个分区 lsblk3、查询该分区是什么系统类型 sudo …...

Sklearn中逻辑回归建模
分类模型的评估 回归模型的评估方法,主要有均方误差MSE,R方得分等指标,在分类模型中,我们主要应用的是准确率这个评估指标,除此之外,常用的二分类模型的模型评估指标还有召回率(Recallÿ…...

【ARM】MDK出现报错error: A\L3903U的解决方法
【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决MDK出现报错error: A\L3903U这样类型的报错 2、 问题场景 电脑或者软件因为意外情况导致崩溃,无法正常关闭,强制电脑重启之后,打开工程去编译出现下面的报错信息(…...
0018__字体的kerning是什么意思
泰山OFFICE技术讲座:字体的kerning是什么意思-CSDN博客 了解CSS属性font-kerning,font-smoothing,font-variant-CSDN博客...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...

基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...

Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...

使用Spring AI和MCP协议构建图片搜索服务
目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式(本地调用) SSE模式(远程调用) 4. 注册工具提…...

人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...