/**
  @file RemoteAppInfoMng.cpp
  @brief Manager class for the application redundancy information of other (remote) nodes
  @author 曹顶法
*/
#include "RemoteAppInfoMng.h"
#include "pub_utility_api/TimeUtil.h"
#include "NodeMngMessage.pb.h"

using namespace std;
using namespace iot_idl;
using namespace iot_public;
using namespace iot_sys;

/**
  @brief Construct the manager.
  @param ptrSysInfo     system-information interface used to query node / deployment data
  @param ptrNodeInfoMng node-information manager that receives redundancy updates

  The heartbeat timeout (4000 ms) and check period (200 ms) set here are only
  placeholders; initialize() overwrites both from CNodeMngCfgParam.
*/
iot_sys::CRemoteAppInfoMng::CRemoteAppInfoMng(const iot_public::CSysInfoInterfacePtr &ptrSysInfo,
                                              const CNodeInfoMngPtr &ptrNodeInfoMng)
    :m_bLastNetworkIsActive(false),
      m_ptrSysInfo(ptrSysInfo),
      m_ptrNodeInfoMng(ptrNodeInfoMng),
      m_ptrNetworkCheck(NULL),
      m_nRecvHeartbeatTimeoutMsec(4000),
      m_nCheckPeriodMsec(200)
{
}

/**
  @brief Release the held interface pointers (network check, node info manager, system info).
*/
iot_sys::CRemoteAppInfoMng::~CRemoteAppInfoMng()
{
    m_ptrNetworkCheck.reset();
    m_ptrNodeInfoMng.reset();
    m_ptrSysInfo.reset();
}

/**
  @brief Load local node info and configuration, obtain the network checker and
         build the table of remote applications to supervise.
  @return iotSuccess on success, iotFailed as soon as any step fails.
*/
int iot_sys::CRemoteAppInfoMng::initialize()
{
    //1. Fetch the information of the local node
    if (iotSuccess != m_ptrSysInfo->getLocalNodeInfo(m_stLocalNodeInfo))
    {
        LOGERROR("获取本机信息失败");
        return iotFailed;
    }

    //2. Fetch the related configuration parameters
    m_nRecvHeartbeatTimeoutMsec = CNodeMngCfgParam::getRemoteHeartbeatTimeoutMsec();
    m_nCheckPeriodMsec = CNodeMngCfgParam::getHeartbeatCheckPeriod();
    m_mapRecvAppInfo.clear();

    //3. Initialize the network-check interface
    m_ptrNetworkCheck = getNetworkCheckInstance();
    if (m_ptrNetworkCheck == NULL)
    {
        LOGERROR("创建网络检测类失败");
        return iotFailed;
    }

    //4. Initialize the list of applications to manage
    if (iotSuccess != initializeAppInfoMAP())
    {
        return iotFailed;
    }

    //5. Fetch the application deployment of the local node
    if (iotSuccess != m_ptrSysInfo->getLocalAppDeploy(m_vecLocalAppDeploy))
    {
        LOGERROR("获取本机的应用部署信息失败");
        return iotFailed;
    }

    return iotSuccess;
}

/**
  @brief Initialize the list of applications to manage.

  Every application known to the node-info manager that does not live on the
  local node gets one heartbeat record, stamped with the current monotonic time
  and the two NIC addresses of its node.

  @return iotSuccess, or iotFailed when the node of some application cannot be looked up.
*/
int iot_sys::CRemoteAppInfoMng::initializeAppInfoMAP()
{
    int64 lCurTime = getMonotonicMsec();
    AppKeyToRedundancyInfoMAP mapAppInfo = m_ptrNodeInfoMng->getAllAppInfo();

    for (AppKeyToRedundancyInfoMAP::const_iterator pApp = mapAppInfo.begin(); pApp != mapAppInfo.end(); ++pApp)
    {
        const SAppInfoKey &stAppInfoKey = pApp->first;
        if (stAppInfoKey.strNodeName == m_stLocalNodeInfo.strName)  //< the local node is not managed by this class
        {
            continue;
        }

        SNodeInfo stNode;
        if (iotSuccess != m_ptrSysInfo->getNodeInfoByName(stAppInfoKey.strNodeName, stNode))
        {
            LOGERROR("获取节点[%s]信息失败", stAppInfoKey.strNodeName.c_str());
            return iotFailed;
        }

        SAppHeartbeatInfo stRecvAppInfo;
        stRecvAppInfo.stKey = stAppInfoKey;
        stRecvAppInfo.lLastTime = lCurTime;
        stRecvAppInfo.strIP1 = stNode.strNic1Addr;
        stRecvAppInfo.strIP2 = stNode.strNic2Addr;

        m_mapRecvAppInfo[stAppInfoKey] = stRecvAppInfo;  //< no duplicate check needed here, NodeInfoMng already did it

        LOGINFO("远程节点信息管理缓冲添加记录:域[%d],应用[%d],节点[%s]",
                stAppInfoKey.nDomainId, stAppInfoKey.nAppId, stAppInfoKey.strNodeName.c_str());
    }

    return iotSuccess;
}

/**
  @brief Scan the received redundancy records and detect heartbeat timeouts.

  Holds m_objRecvBufMutex for the whole scan. For each remote application it
  (1) clears the per-NIC "active" flags whose local NIC is reported down,
  (2) pushes pending state changes to the node-info manager, and
  (3) reports a heartbeat timeout when no heartbeat arrived within
      m_nRecvHeartbeatTimeoutMsec.

  @return always iotSuccess.
*/
int iot_sys::CRemoteAppInfoMng::checkRemoteBuffer()
{
    boost::mutex::scoped_lock lock(m_objRecvBufMutex);

    bool bNetworkIsActive = false;
    SNetworkState stNetSta;
    // NOTE(review): when getLocalNetStatus() fails, stNetSta is still read in the loop
    // below; this relies on how SNetworkState default-constructs its flags - confirm.
    if(iotSuccess == m_ptrNetworkCheck->getLocalNetStatus(stNetSta))
    {
        bNetworkIsActive = (stNetSta.bIP1State || stNetSta.bIP2State);
    }

    int64 lCurTime = getMonotonicMsec();
    for (AppKeyToHeartbeatMAP::iterator pNode = m_mapRecvAppInfo.begin(); pNode != m_mapRecvAppInfo.end(); ++pNode)
    {
        SAppHeartbeatInfo &stAppInfo = pNode->second;

        /* @brief Check whether the redundancy-related state has changed */
        if(!stNetSta.bIP1State && stAppInfo.bIP1Active)
        {
            stAppInfo.bIP1Active = false;
            stAppInfo.bNetworkStateChanged = true;
        }

        if(!stNetSta.bIP2State && stAppInfo.bIP2Active)
        {
            stAppInfo.bIP2Active = false;
            stAppInfo.bNetworkStateChanged = true;
        }

        if (stAppInfo.bRedundancyStateChanged || stAppInfo.bNetworkStateChanged)
        {
            m_ptrNodeInfoMng->updateAppRedundancyInfo(stAppInfo);
            stAppInfo.bRedundancyStateChanged = false;
            stAppInfo.bNetworkStateChanged = false;
        }

        /* @brief Decide whether the heartbeat has timed out */
        if(bNetworkIsActive && !m_bLastNetworkIsActive)
        {
            //< If the network has just recovered (inactive last round, active now), restart
            //< the timer; otherwise the following problem may occur:
            //< right after recovery (the gateway answers ping) the message bus is not yet
            //< connected and no heartbeat of other machines has arrived, so the domain looks
            //< master-less although a master may actually exist.
            //< The check below would then report a timeout, and heartbeatTimeout() would get an
            //< nTimeoutCount greater than redundancy_stable_threshold from the xml configuration.
            //< The local node might promote itself to master at once; when the message bus
            //< connects and heartbeats are exchanged, dual-master arbitration runs.
            //< That arbitration picks the master by node order, so the earlier node may take
            //< over from the original master and some data (e.g. alarms) may be lost.
            //< todo: dual-master arbitration could be improved by using the holding time as
            //< priority, avoiding unnecessary takeovers
            stAppInfo.lLastTime = lCurTime;
        }
        else
        {
            int64 lTimeInterval = lCurTime - stAppInfo.lLastTime;
            if (lTimeInterval > m_nRecvHeartbeatTimeoutMsec)
            {
                /* @brief To avoid logging on every scan while timed out, log only when the
                          previous state was not already the timed-out state */
                if (stAppInfo.bActive || stAppInfo.bMaster || stAppInfo.bHasMaster)
                {
                    LOGERROR("远程节点心跳超时.域[%d],应用[%d],节点[%s],当前时间[%" PRId64 "],最后更新时间[%" PRId64 "]",
                             stAppInfo.stKey.nDomainId, stAppInfo.stKey.nAppId, stAppInfo.stKey.strNodeName.c_str(),
                             lCurTime, stAppInfo.lLastTime);
                }

                /* @brief Heartbeat timed out: mark the redundancy state of this application as unavailable */
                stAppInfo.bActive = false;
                stAppInfo.bMaster = false;
                stAppInfo.bSlave = false;
                stAppInfo.bHasMaster = false;
                stAppInfo.bIP1Active = false;
                stAppInfo.bIP2Active = false;

                int nTimeoutCount = 0;
                if (bNetworkIsActive)
                {
                    //TODO: the timeout-count computation needs more thought: add a variable to
                    //      SLocalNodeInfo, or keep dividing the interval by the period and take the quotient
                    // Guard against a non-positive configured period to avoid dividing by zero;
                    // the count then stays 0.
                    if (m_nCheckPeriodMsec > 0)
                    {
                        nTimeoutCount = static_cast<int>(lTimeInterval / m_nCheckPeriodMsec);
                    }
                }
                else
                {
                    /* @brief Keep the persist count at 0 so that a network recovery does not shock the system */
                    stAppInfo.lLastTime = lCurTime;
                    nTimeoutCount = 0;
                }

                m_ptrNodeInfoMng->heartbeatTimeout(stAppInfo, nTimeoutCount);
            }
        }
    }

    m_bLastNetworkIsActive = bNetworkIsActive;

    return iotSuccess;
}

/**
  @brief Set the "redundancy switch failed" flag.
  @param stKey application key; when stKey.nAppId is CN_AppId_All the flag is set
               once per entry of the local application deployment list, using
               that entry's application id with the domain / node of stKey.
  @return iotSuccess, or iotFailed if setting the flag for any key fails.
*/
int iot_sys::CRemoteAppInfoMng::setSwitchFailedFlag(const SAppInfoKey &stKey)
{
    if (stKey.nAppId == CN_AppId_All)
    {
        for (size_t i = 0; i < m_vecLocalAppDeploy.size(); ++i)
        {
            SAppInfoKey stKeyTemp = stKey;
            stKeyTemp.nAppId = m_vecLocalAppDeploy[i].nAppId;
            // NOTE(review): the first failing key aborts the loop, leaving later
            // applications unflagged - confirm this is intended.
            if (iotSuccess != setSwitchFailedFlagByKey(stKeyTemp))
            {
                return iotFailed;
            }
        }
    }
    else
    {
        return setSwitchFailedFlagByKey(stKey);
    }

    return iotSuccess;
}

//TODO: consider updating nodemngbuf directly instead of having this class's thread scan and then update
/**
  @brief Update the stored record with a received application redundancy heartbeat.

  Holds m_objRecvBufMutex while updating. Redundancy flags and NIC flags are
  merged into the stored record, the heartbeat timestamp is refreshed and any
  change is forwarded to the node-info manager.

  @param stHeartbeat heartbeat received from a remote application
  @return iotSuccess, or iotFailed when no record exists for stHeartbeat.stKey.
*/
int iot_sys::CRemoteAppInfoMng::updateRecvHeartbeatInfo(const SAppHeartbeatInfo &stHeartbeat)
{
    boost::mutex::scoped_lock lock(m_objRecvBufMutex);

    AppKeyToHeartbeatMAP::iterator pIter = m_mapRecvAppInfo.find(stHeartbeat.stKey);
    if (pIter != m_mapRecvAppInfo.end())
    {
        // Disabled debug trace of every received heartbeat:
        // LOGDEBUG("接收到心跳信息.域[%d],应用[%d],节点[%s],可用[%d],为主[%d],为备[%d],有主[%d],切换失败[%d]",
        //          stHeartbeat.stKey.nDomainId, stHeartbeat.stKey.nAppId, stHeartbeat.stKey.strNodeName.c_str(),
        //          stHeartbeat.bActive, stHeartbeat.bMaster, stHeartbeat.bSlave,
        //          stHeartbeat.bHasMaster, stHeartbeat.bSwitchNG);

        SAppHeartbeatInfo &stAppInfo = pIter->second;

        /* @brief Update the redundancy flags */
        if (stAppInfo.bActive != stHeartbeat.bActive ||
            stAppInfo.bMaster != stHeartbeat.bMaster ||
            stAppInfo.bSlave != stHeartbeat.bSlave ||
            stAppInfo.bHasMaster != stHeartbeat.bHasMaster ||
            stAppInfo.bSwitchNG != stHeartbeat.bSwitchNG)
        {
            LOGINFO("节点状态发生变化.域[%d],应用[%d],节点[%s],可用[%d->%d],为主[%d->%d],为备[%d->%d],有主[%d->%d],切换失败[%d->%d]",
                    stAppInfo.stKey.nDomainId, stAppInfo.stKey.nAppId, stAppInfo.stKey.strNodeName.c_str(),
                    stAppInfo.bActive, stHeartbeat.bActive,
                    stAppInfo.bMaster, stHeartbeat.bMaster,
                    stAppInfo.bSlave, stHeartbeat.bSlave,
                    stAppInfo.bHasMaster, stHeartbeat.bHasMaster,
                    stAppInfo.bSwitchNG, stHeartbeat.bSwitchNG);

            stAppInfo.bActive = stHeartbeat.bActive;
            stAppInfo.bMaster = stHeartbeat.bMaster;
            stAppInfo.bSlave = stHeartbeat.bSlave;
            stAppInfo.bHasMaster = stHeartbeat.bHasMaster;
            stAppInfo.bSwitchNG = stHeartbeat.bSwitchNG;
            stAppInfo.bRedundancyStateChanged = true;
            stAppInfo.nRedundancyStatePersistCount = 1;
        }
        else
        {
            stAppInfo.nRedundancyStatePersistCount =
                    (std::min)(stAppInfo.nRedundancyStatePersistCount + 1, CN_MaxPersistCount);

            /**
              @brief Update the persist count in the node-info management buffer.
              Enabling the (disabled) bActive check below would reduce the number of updates
              sent to m_ptrNodeInfoMng, because users of nRedundancyStatusPersistCount always
              test bActive first. For the sake of uniform logic no filtering is applied.
            */
            m_ptrNodeInfoMng->increasePersistCount(stAppInfo.stKey);
            // if (!stAppInfo.bActive)
            // {
            //     m_ptrNodeInfoMng->increasePersistCount(stAppInfo.stKey);
            // }
        }

        /* @brief Update the access (NIC reachability) fields */
        if (stAppInfo.bIP1Active != stHeartbeat.bIP1Active || stAppInfo.bIP2Active != stHeartbeat.bIP2Active)
        {
            bool bIP1Active = stHeartbeat.bIP1Active;
            bool bIP2Active = stHeartbeat.bIP2Active;

            SNetworkState stNetSta;
            if(iotSuccess == m_ptrNetworkCheck->getLocalNetStatus(stNetSta))
            {
                // Addresses this case: the sender has both network A and network B healthy, the
                // receiver has only network A healthy, yet the receiver's memory recorded both
                // networks as healthy.
                // Adjusted so that when a local network is down, the local state wins regardless
                // of what the peer reports.
                // To limit how often the network state is queried here in the normal case, it is
                // not fetched on every heartbeat; this works together with checkRemoteBuffer.
                if(!stNetSta.bIP1State && bIP1Active)
                {
                    bIP1Active = false;
                }

                if(!stNetSta.bIP2State && bIP2Active)
                {
                    bIP2Active = false;
                }
            }

            // Compare and store the locally adjusted values: a change exists only when the
            // adjusted state of either NIC differs from the stored one. Storing the raw
            // heartbeat values would undo the adjustment above and make the flags flap
            // against checkRemoteBuffer while a local NIC is down.
            if(stAppInfo.bIP1Active != bIP1Active || stAppInfo.bIP2Active != bIP2Active)
            {
                stAppInfo.bIP1Active = bIP1Active;
                stAppInfo.bIP2Active = bIP2Active;
                stAppInfo.bNetworkStateChanged = true;
            }
        }

        /* @brief Refresh the heartbeat timestamp */
        stAppInfo.lLastTime = getMonotonicMsec();

        if (stAppInfo.bRedundancyStateChanged || stAppInfo.bNetworkStateChanged)
        {
            m_ptrNodeInfoMng->updateAppRedundancyInfo(stAppInfo);
            stAppInfo.bRedundancyStateChanged = false;
            stAppInfo.bNetworkStateChanged = false;
        }
    }
    else
    {
        LOGERROR("记录不存在.域[%d],应用[%d],节点[%s]",
                 stHeartbeat.stKey.nDomainId, stHeartbeat.stKey.nAppId, stHeartbeat.stKey.strNodeName.c_str());
        return iotFailed;
    }

    return iotSuccess;
}

/**
  @brief Set the "redundancy switch failed" flag of one specific application.

  Holds m_objRecvBufMutex while looking up and modifying the record.

  @param stKey key (domain, application, node) of the record to flag
  @return iotSuccess when the record exists, iotFailed otherwise.
*/
int iot_sys::CRemoteAppInfoMng::setSwitchFailedFlagByKey(const SAppInfoKey &stKey)
{
    boost::mutex::scoped_lock lock(m_objRecvBufMutex);

    AppKeyToHeartbeatMAP::iterator pIter = m_mapRecvAppInfo.find(stKey);
    if (pIter != m_mapRecvAppInfo.end())
    {
        SAppHeartbeatInfo &stAppInfo = pIter->second;
        stAppInfo.bSwitchNG = true;

        LOGINFO("设置应用切换失败标识.域[%d],应用[%d],节点[%s]",
                stKey.nDomainId, stKey.nAppId, stKey.strNodeName.c_str());
        return iotSuccess;
    }
    else
    {
        LOGERROR("记录不存在.域[%d],应用[%d],节点[%s]",
                 stKey.nDomainId, stKey.nAppId, stKey.strNodeName.c_str());
        return iotFailed;
    }
}