/**
    @file AnaWorkThread.cpp
    @brief Worker thread for analog value (ANA) processing
    @author 周正龙
*/
#include "boost/program_options.hpp"
#include "AnaWorkThread.h"
#include "pub_logger_api/logger.h"
#include "pub_utility_api/TimeUtil.h"

using namespace iot_net;
using namespace iot_public;
using namespace iot_service;
using namespace iot_idl;
using namespace iot_dbms;
using namespace std;

namespace
{
// Minimum spacing, in milliseconds of the monotonic clock, between two runs
// of the delayed-alarm processing step.
const int CN_DelayAlarmProcIntervalMs = 200;
}

/**
    @brief Construct the worker thread; no processing objects are created here.
    @param stRunAppInfo    run-time application info, stored for later use
    @param ptrPacketQueue  queue the thread pulls its packages from
    @note The second argument passed to CTimerThreadBase (15) is presumably the
          timer period; its unit is defined by CTimerThreadBase - TODO confirm.
*/
CAnaWorkThread::CAnaWorkThread(iot_public::SRunAppInfo stRunAppInfo, CPacketQueuePtr ptrPacketQueue)
    : CTimerThreadBase("CAnaWorkThread", 15)
{
    m_nMsgSeqNum = 0;
    m_lLastSendReqAllDataTime = 0;
    m_lLastProAlmTime = 0;
    m_lLastTime = 0;
    m_lLastAllDataTime = 0;

    // The processing objects are created in initialize().
    m_ptrAnaProcess = nullptr;
    m_ptrDataProcApi = nullptr;
    m_ptrDataPublish = nullptr;

    m_stRunAppInfo = stRunAppInfo;
    m_ptrPacketQueue = ptrPacketQueue;
    m_lCurTime = getMonotonicMsec();
}

CAnaWorkThread::~CAnaWorkThread()
{
}

/**
    @brief Create and initialize the data-processing API, the data publisher
           and the analog processor, in that order.
    @return true when all three objects were created and initialized,
            false as soon as one step fails (the failure is logged).
*/
bool CAnaWorkThread::initialize()
{
    // Data-processing interface library instance.
    m_ptrDataProcApi = getDataProcInstance(m_stRunAppInfo.strAppName);
    if (!m_ptrDataProcApi)
    {
        LOGERROR("CAnaWorkThread::init(), getDataProcInstance error");
        return false;
    }
    if (m_ptrDataProcApi->initialize() < 0)
    {
        LOGERROR("CAnaWorkThread::init(), data_process_api initialize() error");
        return false;
    }

    // Data publisher instance.
    m_ptrDataPublish = boost::make_shared<CSrvDataPublish>(m_stRunAppInfo);
    if (!m_ptrDataPublish)
    {
        LOGERROR("CAnaWorkThread ::init(), make_shared fail!\n");
        return false;
    }
    if (!m_ptrDataPublish->initialize())
    {
        LOGERROR("CAnaWorkThread::init(), CSrvDataPublish initialize() error");
        return false;
    }

    // Analog processor instance; it receives the two objects created above.
    m_ptrAnaProcess = boost::make_shared<CAnaProcess>(m_stRunAppInfo, m_ptrDataProcApi, m_ptrDataPublish);
    if (!m_ptrAnaProcess)
    {
        LOGERROR("CAnaWorkThread::init(), make_shared fail!\n");
        return false;
    }
    if (!m_ptrAnaProcess->initialize())
    {
        LOGERROR("CAnaWorkThread::init(), CAnaProcess initialize() error");
        return false;
    }

    return true;
}

/**
    @brief One processing cycle of the worker thread.

    Refreshes the cached monotonic time, handles front-end (FES) packages,
    runs the (currently empty) all-data request step, handles operation
    packages, runs the delayed-alarm step and finally asks the publisher to
    send pending changes and alarms.
*/
void CAnaWorkThread::execute()
{
    m_lCurTime = getMonotonicMsec();

    // 1. Message processing.
    processFesMessage();
    requestAllDataFromFes();
    processOptMessage();

    // 2. Delayed analog alarm processing.
    processDelayTimeAlarm();

    // 3. Publish what the steps above produced.
    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->sendChange();
        m_ptrDataPublish->sendAlarm();
    }
}

/**
    @brief Run the delayed-alarm processing of the analog processor, at most
           once every CN_DelayAlarmProcIntervalMs milliseconds.
    @return always 1; a failure of delayTimeProcess() is only logged.
    @note Does nothing when the analog processor has not been created.
*/
int CAnaWorkThread::processDelayTimeAlarm()
{
    if (!m_ptrAnaProcess)
    {
        return 1;
    }

    if ((m_lCurTime - m_lLastProAlmTime) >= CN_DelayAlarmProcIntervalMs)
    {
        m_lLastProAlmTime = m_lCurTime;

        const int nRetCode = m_ptrAnaProcess->delayTimeProcess(m_stRunAppInfo.nDomainId);
        if (nRetCode == -1)
        {
            LOGERROR("CAnaWorkThread::execute, process analog delayTimeProcessAlgo error ");
        }
        LOGDEBUG("CAnaWorkThread::execute, end process of delayTimeProcess");
    }

    return 1;
}

/**
    @brief Drain the analog operation packages from the packet queue and hand
           each one to the analog processor.

    Two kinds of packages are handled: manual/tag set operations
    (processed with OPERATE_TYPE) and calculated-point sets (processed with
    OPT_TYPE_CAL_SET).
    @return always 1.
    @note Does nothing when the packet queue or the analog processor is missing.
*/
int CAnaWorkThread::processOptMessage()
{
    if (!m_ptrPacketQueue || !m_ptrAnaProcess)
    {
        return 1;
    }

    // Tag (manual set) operations.
    std::deque<iot_idl::SOptSetDataPkg> deqOptManSetPkg;
    if (m_ptrPacketQueue->getOptManSetPkg(POINT_TYPE_ANA, deqOptManSetPkg))
    {
        for (iot_idl::SOptSetDataPkg &objOptManSetPkg : deqOptManSetPkg)
        {
            m_ptrAnaProcess->processOperate(objOptManSetPkg, OPERATE_TYPE);
            LOGINFO("processOptMessage:处理标签操作 num=%d .", objOptManSetPkg.seq_set_data_info_size());
        }
    }

    // Calculated points. No per-package log on this path (the corresponding
    // LOGINFO was disabled in the original code).
    std::deque<iot_idl::SOptSetDataPkg> deqOptCalSetPkg;
    if (m_ptrPacketQueue->getOptCalSetPkg(POINT_TYPE_ANA, deqOptCalSetPkg))
    {
        for (iot_idl::SOptSetDataPkg &objOptCalSetPkg : deqOptCalSetPkg)
        {
            m_ptrAnaProcess->processOperate(objOptCalSetPkg, OPT_TYPE_CAL_SET);
        }
    }

    return 1;
}

/**
    @brief Process front-end (FES) real-time analog data: first every queued
           change package, then one all-data (update) package if available.

    The time spent on each package is measured with the monotonic clock and
    written to the debug log.
    @return always 1.
    @note Does nothing when the packet queue or the analog processor is missing.
*/
int CAnaWorkThread::processFesMessage()
{
    if (!m_ptrPacketQueue || !m_ptrAnaProcess)
    {
        return 1;
    }

    // Changed analog values.
    std::deque<iot_idl::SFesChangeAiPkg> deqFesChgAiPkg;
    if (m_ptrPacketQueue->getFesChangeAnaPkg(deqFesChgAiPkg))
    {
        for (iot_idl::SFesChangeAiPkg &objFesChangeAiPkg : deqFesChgAiPkg)
        {
            const int64 lBeginMs = getMonotonicMsec();
            m_ptrAnaProcess->procAnaChange(objFesChangeAiPkg);
            const int64 lEndMs = getMonotonicMsec();
            LOGDEBUG("processFesMessage:处理变化模拟量 num=%d,用时=%lld(ms)", objFesChangeAiPkg.staidata_size(), (long long)(lEndMs - lBeginMs));
        }
    }

    // All-data (full update) package.
    iot_idl::SFesUpdateAiPkg objFesUpdAiPkg;
    if (m_ptrPacketQueue->getFesUpdateAnaPkg(objFesUpdAiPkg))
    {
        const int64 lBeginMs = getMonotonicMsec();
        m_ptrAnaProcess->procAnaUpdate(objFesUpdAiPkg);
        const int64 lEndMs = getMonotonicMsec();
        LOGDEBUG("processFesMessage:处理模拟量全数据 num=%d,用时=%lld(ms)", objFesUpdAiPkg.staidata_size(), (long long)(lEndMs - lBeginMs));
    }

    return 1;
}

/**
    @brief Hook run after the thread has been suspended: suspends the data
           publisher (if it exists) and logs the event.
*/
void CAnaWorkThread::afterSuspend()
{
    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->suspend();
    }
    LOGINFO("CAnaWorkThread::Thread afterSuspend");
}

/**
    @brief Hook run after the thread has been resumed: resumes the data
           publisher (if it exists) and logs the event.
*/
void CAnaWorkThread::afterResume()
{
    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->resume();
    }
    LOGINFO("CAnaWorkThread::Thread afterResume");
}

/**
    @brief Drop the thread's references to the analog processor, the data
           publisher and the data-processing API, in that order.

    The publisher's own release() is called before its reference is dropped.
    Objects that were never created are skipped, so calling this repeatedly
    is harmless.
*/
void CAnaWorkThread::release()
{
    if (m_ptrAnaProcess)
    {
        m_ptrAnaProcess.reset();
        LOGINFO("CAnaWorkThread::release(), 释放模拟量处理类成功!");
    }

    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->release();
        m_ptrDataPublish.reset();
        LOGINFO("CAnaWorkThread::release(), 释放数据发布处理类成功!");
    }

    if (m_ptrDataProcApi)
    {
        m_ptrDataProcApi.reset();
        LOGINFO("CAnaWorkThread::release(), 释放数据处理动态库成功!");
    }
}

/**
    @brief Placeholder for requesting a full data set from the front end.
    @return iotSuccess; the request itself is not implemented yet.
*/
int iot_service::CAnaWorkThread::requestAllDataFromFes()
{
    /*
    int64 lCurTimeMs = getMonotonicMsec();
    if (lCurTimeMs - m_lLastSendReqAllDataTime > CN_MinReqFesAllDataIntervalMs)
    {
        //TODO: send the request
        //m_ptrDataPublish->SendFesDataRequest();
    }*/
    return iotSuccess;
}