2025-03-12 14:17:01 +08:00

278 lines
8.0 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
@file AnaWorkThread.cpp
@brief 模拟量处理工作线程
@author 周正龙
*/
#include "boost/program_options.hpp"
#include "AnaWorkThread.h"
#include "pub_logger_api/logger.h"
#include "pub_utility_api/TimeUtil.h"
using namespace iot_net;
using namespace iot_public;
using namespace iot_service;
using namespace iot_idl;
using namespace iot_dbms;
using namespace std;
/**
 * @brief Constructs the analog worker thread object.
 *
 * Forwards the name "CAnaWorkThread" and the value 15 to CTimerThreadBase
 * (presumably the timer period -- confirm against CTimerThreadBase), keeps a
 * copy of the application run info and of the packet queue handle, and puts
 * every bookkeeping member into its start state. The helper objects stay
 * empty here; they are created by initialize().
 *
 * @param stRunAppInfo   Application run information, copied into m_stRunAppInfo.
 * @param ptrPacketQueue Packet queue handle, copied into m_ptrPacketQueue.
 */
CAnaWorkThread::CAnaWorkThread(iot_public::SRunAppInfo stRunAppInfo,CPacketQueuePtr ptrPacketQueue):
    CTimerThreadBase("CAnaWorkThread",15)
{
    // Inputs handed over by the creator of the thread.
    m_stRunAppInfo   = stRunAppInfo;
    m_ptrPacketQueue = ptrPacketQueue;

    // Helper objects are not available until initialize() has run.
    m_ptrDataProcApi = nullptr;
    m_ptrDataPublish = nullptr;
    m_ptrAnaProcess  = nullptr;

    // Counters and "last time" stamps all start from zero.
    m_nMsgSeqNum = 0;
    m_lLastSendReqAllDataTime = m_lLastProAlmTime = m_lLastTime = m_lLastAllDataTime = 0;

    // Current monotonic time in milliseconds; refreshed on every execute().
    m_lCurTime = getMonotonicMsec();
}
/**
 * @brief Destructor; performs no explicit work.
 *
 * Helper objects are torn down explicitly in release(), not here.
 */
CAnaWorkThread::~CAnaWorkThread() = default;
/**
 * @brief Creates and initializes the helper objects used by this thread.
 *
 * The helpers are set up in a fixed order: the data-processing API instance,
 * the data publisher, and finally the analog processor (which receives the
 * two former objects). The first failing step logs an error and aborts the
 * sequence; helpers created by earlier steps are left assigned.
 *
 * @return true when every helper was created and initialized, false otherwise.
 */
bool CAnaWorkThread::initialize()
{
    // Step 1: instance of the data-processing interface library.
    m_ptrDataProcApi = getDataProcInstance(m_stRunAppInfo.strAppName);
    if (!m_ptrDataProcApi)
    {
        LOGERROR("CAnaWorkThread::init(), getDataProcInstance error");
        return false;
    }
    if (m_ptrDataProcApi->initialize() < 0)
    {
        LOGERROR("CAnaWorkThread::init(), data_process_api initialize() error");
        return false;
    }

    // Step 2: data publishing instance.
    m_ptrDataPublish = boost::make_shared<CSrvDataPublish>(m_stRunAppInfo);
    if (!m_ptrDataPublish)
    {
        LOGERROR("CAnaWorkThread ::init(), make_shared<CSrvDataPublish> fail!\n");
        return false;
    }
    if (!m_ptrDataPublish->initialize())
    {
        LOGERROR("CAnaWorkThread::init(), CSrvDataPublish initialize() error");
        return false;
    }

    // Step 3: analog (AI) processing instance, built on top of the two helpers above.
    m_ptrAnaProcess = boost::make_shared<CAnaProcess>(m_stRunAppInfo, m_ptrDataProcApi, m_ptrDataPublish);
    if (!m_ptrAnaProcess)
    {
        LOGERROR("CAnaWorkThread::init(), make_shared<CAnaProcess> fail!\n");
        return false;
    }
    if (!m_ptrAnaProcess->initialize())
    {
        LOGERROR("CAnaWorkThread::init(), CAnaProcess initialize() error");
        return false;
    }

    return true;
}
/**
 * @brief One cycle of the worker thread's business processing.
 *
 * Refreshes m_lCurTime, then runs the processing stages in a fixed order:
 * front-end (FES) messages, the all-data request, operation messages and the
 * delayed analog alarm handling. Finally, if a publisher exists, the pending
 * change data and alarms are sent.
 */
void CAnaWorkThread::execute()
{
    // Take one timestamp per cycle; processDelayTimeAlarm() reads it.
    m_lCurTime = getMonotonicMsec();

    // Message processing.
    processFesMessage();
    requestAllDataFromFes();
    processOptMessage();

    // Delayed handling of analog alarms.
    processDelayTimeAlarm();

    // Send the data produced by value changes: point events, SOE events, over-limit events.
    if (!m_ptrDataPublish)
    {
        return;
    }
    m_ptrDataPublish->sendChange();
    m_ptrDataPublish->sendAlarm();
}
/**
 * @brief Delayed alarm handling for analog points.
 *
 * Does nothing until at least 200 ms (measured with m_lCurTime) have passed
 * since the previous run. When due, it records the run time and calls
 * CAnaProcess::delayTimeProcess() with the domain id of this application;
 * a result of -1 is logged as an error.
 *
 * @return Always 1.
 */
int CAnaWorkThread::processDelayTimeAlarm()
{
    const bool bIntervalElapsed = (m_lCurTime - m_lLastProAlmTime) >= 200;
    if (!bIntervalElapsed)
    {
        return 1;
    }

    m_lLastProAlmTime = m_lCurTime;

    const int nResult = m_ptrAnaProcess->delayTimeProcess(m_stRunAppInfo.nDomainId);
    if (-1 == nResult)
    {
        LOGERROR("CAnaWorkThread::execute, process analog delayTimeProcessAlgo error ");
    }
    LOGDEBUG("CAnaWorkThread::execute, end process of delayTimeProcess");

    return 1;
}
int CAnaWorkThread::processOptMessage()
{
//处理标签操作
//=============================================================================================================
std::deque<iot_idl::SOptSetDataPkg> deqOptManSetPkg;
if(m_ptrPacketQueue->getOptManSetPkg(POINT_TYPE_ANA,deqOptManSetPkg))
{
std::deque<iot_idl::SOptSetDataPkg>::iterator it = deqOptManSetPkg.begin();
for( ;it!=deqOptManSetPkg.end();++it)
{
iot_idl::SOptSetDataPkg &objOptManSetPkg = *it ;
m_ptrAnaProcess->processOperate(objOptManSetPkg,OPERATE_TYPE );
LOGINFO("processOptMessage:处理标签操作 num=%d .",objOptManSetPkg.seq_set_data_info_size());
}
}
//处理计算点
//=============================================================================================================
std::deque<iot_idl::SOptSetDataPkg> deqOptCalSetPkg;
if(m_ptrPacketQueue->getOptCalSetPkg(POINT_TYPE_ANA,deqOptCalSetPkg))
{
std::deque<iot_idl::SOptSetDataPkg>::iterator it = deqOptCalSetPkg.begin();
for( ;it!=deqOptCalSetPkg.end();++it)
{
iot_idl::SOptSetDataPkg &objOptCalSetPkg = *it ;
m_ptrAnaProcess->processOperate(objOptCalSetPkg,OPT_TYPE_CAL_SET);
//LOGINFO("processOptMessage:处理计算点 num=%d .",objOptCalSetPkg.seq_set_data_info_size());
}
}
return true ;
}
/**
@brief 前置实时数据处理(变化数据/全数据/报警/SOE/各种事件)
@param 无
@return 无
@retval
*/
int CAnaWorkThread::processFesMessage()
{
int64 t1,t2;
//处理变化模拟量
//=============================================================================================================
std::deque<iot_idl::SFesChangeAiPkg> deqFesChgAiPkg;
if(m_ptrPacketQueue->getFesChangeAnaPkg(deqFesChgAiPkg)) //获得变化数据包队列
{
std::deque<iot_idl::SFesChangeAiPkg>::iterator it = deqFesChgAiPkg.begin();
for( ;it!=deqFesChgAiPkg.end();++it)
{
t1 = getMonotonicMsec() ;
iot_idl::SFesChangeAiPkg &objFesChangeAiPkg = *it ;
m_ptrAnaProcess->procAnaChange(objFesChangeAiPkg);
t2 = getMonotonicMsec() ;
LOGDEBUG("processFesMessage:处理变化模拟量 num=%d用时=%lld(ms)",objFesChangeAiPkg.staidata_size(),(long long)(t2-t1));
}
}
//处理全数据
//=============================================================================================================
iot_idl::SFesUpdateAiPkg objFesUpdAiPkg;
if(m_ptrPacketQueue->getFesUpdateAnaPkg(objFesUpdAiPkg)) //获得全数据
{
t1 = getMonotonicMsec() ;
m_ptrAnaProcess->procAnaUpdate(objFesUpdAiPkg);
t2 = getMonotonicMsec() ;
LOGDEBUG("processFesMessage:处理模拟量全数据 num=%d用时=%lld(ms)",objFesUpdAiPkg.staidata_size(),(long long)(t2-t1));
}
return true;
}
/**
 * @brief Hook executed after the thread has been suspended.
 *
 * Suspends the data publisher and writes an info log entry.
 *
 * m_ptrDataPublish is empty until initialize() has created it and again after
 * release(), so it is checked before use -- the same guard execute() and
 * release() apply -- instead of being dereferenced unconditionally.
 */
void CAnaWorkThread::afterSuspend()
{
    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->suspend();
    }
    LOGINFO("CAnaWorkThread::Thread afterSuspend");
}
/**
 * @brief Hook executed after the thread has been resumed.
 *
 * Resumes the data publisher and writes an info log entry.
 *
 * m_ptrDataPublish is empty until initialize() has created it and again after
 * release(), so it is checked before use -- the same guard execute() and
 * release() apply -- instead of being dereferenced unconditionally.
 */
void CAnaWorkThread::afterResume()
{
    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->resume();
    }
    LOGINFO("CAnaWorkThread::Thread afterResume");
}
/**
 * @brief Releases the helper objects created by initialize().
 *
 * The helpers are dropped in this order: analog processor, data publisher
 * (its own release() is called first), data-processing API instance. A helper
 * that is already empty is skipped; each actual release is reported with an
 * info log entry.
 */
void CAnaWorkThread::release()
{
    // Analog processor.
    if (m_ptrAnaProcess)
    {
        m_ptrAnaProcess.reset();    // leaves the pointer empty
        LOGINFO("CAnaWorkThread::release(), 释放模拟量处理类成功!");
    }

    // Data publisher: let it release itself before the reference is dropped.
    if (m_ptrDataPublish)
    {
        m_ptrDataPublish->release();
        m_ptrDataPublish.reset();
        LOGINFO("CAnaWorkThread::release(), 释放数据发布处理类成功!");
    }

    // Data-processing API instance.
    if (m_ptrDataProcApi)
    {
        m_ptrDataProcApi.reset();
        LOGINFO("CAnaWorkThread::release(), 释放数据处理动态库成功!");
    }
}
/**
 * @brief Placeholder for requesting a full data set from the front end (FES).
 *
 * The request logic is not active: it only exists as the commented-out sketch
 * inside the body, so this function currently performs no work.
 *
 * @return Always kbdSuccess.
 */
int iot_service::CAnaWorkThread::requestAllDataFromFes()
{
/*
int64 lCurTimeMs = getMonotonicMsec();
if (lCurTimeMs - m_lLastSendReqAllDataTime > CN_MinReqFesAllDataIntervalMs)
{
//TODO: send the request
//m_ptrDataPublish->SendFesDataRequest();
}*/
return kbdSuccess;
}