[add]添加opc-ua的客户端采集逻辑

This commit is contained in:
liang-ys 2026-01-13 17:48:19 +08:00
parent c5da95eb37
commit 045bfec418
6 changed files with 2178 additions and 0 deletions

View File

@ -0,0 +1,265 @@
#include "IEC62541.h"
#include "pub_utility_api/I18N.h"
#include "pub_utility_api/CharUtil.h"
#include "pub_utility_api/CommonConfigParse.h"
using namespace iot_public;
CIEC62541 g_iec62541;
bool g_IEC62541IsMainFes = false;
bool g_IEC62541ChanelRun = true;
int EX_SetBaseAddr(void *address)
{
g_iec62541.SetBaseAddr(address);
return iotSuccess;
}
int EX_SetProperty(int FesStatus)
{
g_iec62541.SetProperty(FesStatus);
return iotSuccess;
}
int EX_OpenChan(int MainChanNo,int ChanNo,int OpenFlag)
{
g_iec62541.OpenChan(MainChanNo,ChanNo,OpenFlag);
return iotSuccess;
}
int EX_CloseChan(int MainChanNo,int ChanNo,int CloseFlag)
{
g_iec62541.CloseChan(MainChanNo,ChanNo,CloseFlag);
return iotSuccess;
}
int EX_ChanTimer(int ChanNo)
{
g_iec62541.ChanTimer(ChanNo);
return iotSuccess;
}
int EX_ExitSystem(int flag)
{
boost::ignore_unused_variable_warning(flag);
g_IEC62541ChanelRun = false;//使所有的线程退出。
return iotSuccess;
}
CIEC62541::CIEC62541():m_ProtocolId(-1),m_ptrCFesBase(NULL)
{
}
CIEC62541::~CIEC62541()
{
m_vecDataThreadPtr.clear();
}
int CIEC62541::SetBaseAddr(void *address)
{
if (m_ptrCFesBase == NULL)
{
m_ptrCFesBase = (CFesBase *)address;
}
//规约映射表初始化
if(m_ptrCFesBase != NULL)
{
m_ptrCFesBase->ProtocolRtuInitByParam1((char*)"iec62541_client");
ReadConfigParam(); //加载配置文件中的RTU配置参数
}
return iotSuccess;
}
int CIEC62541::SetProperty(int IsMainFes)
{
g_IEC62541IsMainFes = (IsMainFes == 1);
LOGDEBUG("CIEC61850::SetProperty g_IEC62541IsMainFes:%d",IsMainFes);
return iotSuccess;
}
/**
* @brief CIEC62541::OpenChan
* OpenFlag线线
* @param MainChanNo
* @param ChanNo
* @param OpenFlag 1线 2线 3线线
* @return iotSuccess iotFailed
*/
int CIEC62541::OpenChan(int MainChanNo, int ChanNo, int OpenFlag)
{
CFesChanPtr ptrMainFesChan = GetChanDataByChanNo(MainChanNo);
CFesChanPtr ptrFesChan = GetChanDataByChanNo(ChanNo);
if(ptrMainFesChan == NULL || ptrFesChan == NULL)
{
return iotFailed;
}
// if((OpenFlag==CN_FesChanThread_Flag)||(OpenFlag==CN_FesChanAndDataThread_Flag))
// {
// }
if((OpenFlag==CN_FesDataThread_Flag)||(OpenFlag==CN_FesChanAndDataThread_Flag))
{
if (ptrMainFesChan->m_DataThreadRun == CN_FesStopFlag)
{
//找到就用配置文件中配置,找不到就用构造函数默认参数
fes_iec62541_client::SIEC62541AppConfParam stRtuConfParam;
auto iterConfig = m_mapConfMap.find(ptrMainFesChan->m_Param.ChanNo);
if(iterConfig != m_mapConfMap.end())
{
stRtuConfParam = iterConfig->second;
}
//open chan thread
fes_iec62541_client::CIEC62541DataProcThreadPtr ptrThread =
boost::make_shared<fes_iec62541_client::CIEC62541DataProcThread>(m_ptrCFesBase,ptrMainFesChan,stRtuConfParam);
if (ptrThread == NULL)
{
LOGERROR("CIEC62541 EX_OpenChan() ChanNo:%d create CIEC62541DataProcThread error!",ptrFesChan->m_Param.ChanNo);
return iotFailed;
}
m_vecDataThreadPtr.push_back(ptrThread);
ptrThread->resume(); //start Data THREAD
}
}
// 因为使用的sdk的连接所以直接将通信状态设置为run
ptrMainFesChan->SetComThreadRunFlag(CN_FesRunFlag);
return iotSuccess;
}
/**
* @brief CIEC62541::CloseChan
* OpenFlag线线
* @param MainChanNo
* @param ChanNo
* @param OpenFlag 1线 2线 3线线
* @return iotSuccess iotFailed
*/
int CIEC62541::CloseChan(int MainChanNo, int ChanNo, int CloseFlag)
{
CFesChanPtr ptrMainFesChan = GetChanDataByChanNo(MainChanNo);
CFesChanPtr ptrFesChan = GetChanDataByChanNo(ChanNo);
if(ptrMainFesChan == NULL || ptrFesChan == NULL)
{
return iotFailed;
}
//虽然本协议使用sdk不是自己管理连接但是需要执行SetComThreadRunFlag(CN_FesStopFlag),否则冗余状态变化时,无法重新打开通道
if ((CloseFlag == CN_FesChanThread_Flag) || (CloseFlag == CN_FesChanAndDataThread_Flag))
{
ptrFesChan->SetComThreadRunFlag(CN_FesStopFlag);
LOGINFO("ChanNo=%d ptrFesChan->SetComThreadRunFlag(CN_FesStopFlag)", ptrFesChan->m_Param.ChanNo);
}
if((CloseFlag==CN_FesDataThread_Flag)||(CloseFlag==CN_FesChanAndDataThread_Flag))
{
if (ptrMainFesChan->m_DataThreadRun == CN_FesRunFlag)
{
//close data thread
ClearDataProcThreadByChanNo(MainChanNo);
}
}
return iotSuccess;
}
/**
* @brief CIEC62541::ChanTimer
*
* @param MainChanNo
* @return
*/
int CIEC62541::ChanTimer(int MainChanNo)
{
boost::ignore_unused_variable_warning(MainChanNo);
//把命令缓冲时间间隔到的命放到发送命令缓冲区
return iotSuccess;
}
void CIEC62541::ClearDataProcThreadByChanNo(int nChanNo)
{
for(auto it = m_vecDataThreadPtr.begin(); it != m_vecDataThreadPtr.end();it++)
{
const fes_iec62541_client::CIEC62541DataProcThreadPtr &ptrThread = *it;
if(ptrThread->getChannelNo() == nChanNo)
{
m_vecDataThreadPtr.erase(it);
LOGINFO("CIEC62541::ClearDataProcThreadByChanNo %d ok",nChanNo);
break;
}
}
}
int CIEC62541::ReadConfigParam()
{
if (m_ptrCFesBase == NULL)
return iotFailed;
m_ProtocolId = m_ptrCFesBase->GetProtocolID((char*)"iec62541_client");
if (m_ProtocolId == -1)
{
LOGERROR("ReadConfigParam() ProtoclID=iec62541_client error");
return iotFailed;
}
LOGINFO("iec62541_client ProtoclID=%d",m_ProtocolId);
CCommonConfigParse config;
fes_iec62541_client::SIEC62541AppConfParam defaultRtuParam;
CFesChanPtr ptrChan = NULL; //CHAN数据区
CFesRtuPtr ptrRTU = NULL;
if (config.load("../../data/fes/", "iec62541_client.xml") == iotFailed)
{
LOGWARN("iec62541 load iec62541_client.xml error");
return iotSuccess;
}
LOGINFO("iec62541 load iec62541_client.xml ok");
for (size_t nChanIdx = 0; nChanIdx < m_ptrCFesBase->m_vectCFesChanPtr.size(); nChanIdx++)
{
ptrChan = m_ptrCFesBase->m_vectCFesChanPtr[nChanIdx];
if(ptrChan->m_Param.Used != 1 || m_ProtocolId != ptrChan->m_Param.ProtocolId)
{
continue;
}
//found RTU
for (size_t nRtuIdx = 0; nRtuIdx < m_ptrCFesBase->m_vectCFesRtuPtr.size(); nRtuIdx++)
{
ptrRTU = m_ptrCFesBase->m_vectCFesRtuPtr[nRtuIdx];
if (!ptrRTU->m_Param.Used || (ptrRTU->m_Param.ChanNo != ptrChan->m_Param.ChanNo))
{
continue;
}
fes_iec62541_client::SIEC62541AppConfParam param = defaultRtuParam;
string strRtuName = "RTU" + IntToString(ptrRTU->m_Param.RtuNo);
parseRtuConfig(config,strRtuName,param);
m_mapConfMap[ptrRTU->m_Param.ChanNo] = param;
}
}
return iotSuccess;
}
int CIEC62541::parseRtuConfig(CCommonConfigParse &configParse,const std::string &strRtuName,fes_iec62541_client::SIEC62541AppConfParam &stParam)
{
LOGDEBUG("CIEC62541:解析RTU参数,RTU=%s",strRtuName.c_str());
//找到就用配置文件,找不到就用构造函数中的值
return iotSuccess;
}

View File

@ -0,0 +1,59 @@
#pragma once
#include "FesDef.h"
#include "FesBase.h"
#include "ProtocolBase.h"
#include "IEC62541DataProcThread.h"
#include "pub_utility_api/CommonConfigParse.h"
extern "C" PROTOCOLBASE_API int EX_SetBaseAddr(void *Address);
extern "C" PROTOCOLBASE_API int EX_SetProperty(int FesStatus);
extern "C" PROTOCOLBASE_API int EX_OpenChan(int MainChanNo,int ChanNo,int OpenFlag);
extern "C" PROTOCOLBASE_API int EX_CloseChan(int MainChanNo,int ChanNo,int CloseFlag);
extern "C" PROTOCOLBASE_API int EX_ChanTimer(int MainChanNo);
extern "C" PROTOCOLBASE_API int EX_ExitSystem(int flag);
class PROTOCOLBASE_API CIEC62541 : public CProtocolBase
{
public:
CIEC62541();
virtual ~CIEC62541();
int SetBaseAddr(void *address);
int SetProperty(int IsMainFes);
int OpenChan(int MainChanNo,int ChanNo,int OpenFlag);
int CloseChan(int MainChanNo,int ChanNo,int CloseFlag);
int ChanTimer(int MainChanNo);
private:
/**
* @brief
*
* @param nChanNo
*/
void ClearDataProcThreadByChanNo(int nChanNo);
/**
* @brief data/fes/iec61850_clientV3
*
* @return iotSuccessiotFailed
*/
int ReadConfigParam();
/**
* @brief RTU对应的参数使
*
* @param configParse
* @param strRtuName RtuNameRTU+
* @param stParam
* @return iotSuccessiotFailed
*/
int parseRtuConfig(CCommonConfigParse &configParse,const std::string &strRtuName, fes_iec62541_client::SIEC62541AppConfParam &stParam);
private:
int m_ProtocolId; //本协议的ID
CFesBase* m_ptrCFesBase; //CProtocolBase类中定义
fes_iec62541_client::CIEC62541DataProcThreadPtrSeq m_vecDataThreadPtr; //存放所有本协议的线程处理指针
boost::unordered_map<int, fes_iec62541_client::SIEC62541AppConfParam> m_mapConfMap; //存储RTU对应的配置参数
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,122 @@
#pragma once
#include "FesDef.h"
#include "FesBase.h"
#include "ProtocolBase.h"
#include "IEC62541common.h"
#include "open62541/client.h"
#include "open62541/client_highlevel.h"
#include "open62541/client_config_default.h"
#include "open62541/config.h"
#include "boost/container/map.hpp"
namespace fes_iec62541_client {
class CIEC62541DataProcThread : public CTimerThreadBase,CProtocolBase
{
public:
CIEC62541DataProcThread(CFesBase *ptrCFesBase,const CFesChanPtr &ptrChan,const SIEC62541AppConfParam &stConfParam);
virtual ~CIEC62541DataProcThread();
/*
@brief execute函数前的处理
*/
virtual int beforeExecute();
/*
@brief
*/
virtual void execute();
virtual void beforeQuit();
public:
/**
* @brief
*
* @return iotSuccess
*/
int init();
/**
* @brief
*
* @return
*/
int getChannelNo();
int getRtuNo();
private:
CFesBase* m_ptrCFesBase;
CFesChanPtr m_ptrFesChan; //主通道数据区。如果存在备通道,每次发送接收数据时需要得到当前使用的通道数据
CFesChanPtr m_ptrCurChan; //当前使用通道数据区。如果存在备通,每次发送接收数据时需要得到当前使用的通道数据
CFesRtuPtr m_ptrCurRtu; //当前使用RTU数据区,本协议每个通道对应一个RTU数据所以不需要轮询处理。
SIEC62541AppData m_AppData; //协议内部数据结构
SIEC62541AppConfParam m_stRtuParam; //Rtu配置参数
UA_Client *m_ptrUAClient;
bool m_bReady;
bool m_bConnected;
size_t m_cntReadNodeId;
boost::shared_ptr<UA_ReadValueId[]> m_ptrReadNodeIds;
boost::container::map<int64,int64> m_indexMapDI;//<采集表索引m_readNodeIds索引>
boost::container::map<int64,int64> m_indexMapAI;//<采集表索引m_readNodeIds索引>
boost::container::map<int64,int64> m_indexMapMI;//<采集表索引m_readNodeIds索引>
boost::container::map<int64,int64> m_indexMapACC;//<采集表索引m_readNodeIds索引>
private:
int initClient();
std::string getStatusMessage(UA_StatusCode status);
//先处理命令
int handleCommand();
//批量请求数据
int batchRequestData();
void initReadNodeIds();
int getNodeIdByString(const string& tagStr,std::string& namespaceIndex,std::string& nodeId);
void initNumericNodeId(int64 indexReadNodeId,int64 pointNo,string &strNsNode,std::string& nsStr,std::string& nodeIdStr,boost::container::map<std::string,bool>& mapNumNodeIdInit,
boost::container::map<std::string,int64>& mapNumNodeIdIndex,boost::container::map<int64,int64>& indexMap);
void initStringNodeId(int64 indexReadNodeId,int64 pointNo,string &strNsNode,std::string& nsStr,std::string& nodeIdStr,boost::container::map<std::string,bool>& mapStringNodeIdInit,
boost::container::map<std::string,int64>& mapStringNodeIdIndex,boost::container::map<int64,int64>& indexMap);
void initGUIDNodeId(int64 indexReadNodeId,int64 pointNo,string &strNsNode,std::string& nsStr,std::string& nodeIdStr,boost::container::map<std::string,bool>& mapStringNodeIdInit,
boost::container::map<std::string,int64>& mapStringNodeIdIndex,boost::container::map<int64,int64>& indexMap);
void procDiData(UA_ReadResponse& response);
void procAiData(UA_ReadResponse& response);
void procMiData(UA_ReadResponse& response);
void procAccData(UA_ReadResponse& response);
int getValue(const UA_DataValue* value,const std::vector<int64>& indexVec,DataType dataType,double& result);
int calculateIndex(const UA_DataValue* value,const std::vector<int64>& indexVec,int64& resultOffset);
bool getDataIndex(const string& tagStr,std::vector<int64>& indexVec);
void AoCmdProcess(CFesRtuPtr ptrRtu);
void DoCmdProcess(CFesRtuPtr ptrRtu);
void MoCmdProcess(CFesRtuPtr ptrRtu);
bool hasDataIndex(const string &tagStr, string &resultIndex);
int buildNodeId(int nodeIdType, const string &tagStr,UA_NodeId& nodeId);
int buildValue(int dataType, double curValue, UA_Variant &value);
void BatchRequest(UA_WriteValue *ptrWriteValue, size_t writeSize, std::string strFlag);
};
typedef boost::shared_ptr<CIEC62541DataProcThread> CIEC62541DataProcThreadPtr;
typedef std::vector<CIEC62541DataProcThreadPtr> CIEC62541DataProcThreadPtrSeq;
}

View File

@ -0,0 +1,52 @@
#pragma once
#include "FesDef.h"
#include "FesBase.h"
#include "open62541/client.h"
#include <boost/unordered_map.hpp>
const int CN_IEC62541_MaxProcDataNum = 500;
namespace fes_iec62541_client {
enum DataType
{
SByte=1,
Int16,
Int32,
Int64,
Byte,
UInt16,
UInt32,
UInt64,
Float,
Double,
Boolean,
//String,
//Datetime
};
class CIEC62541DataProcThread;
typedef struct _SIEC62541AppConfParam
{
_SIEC62541AppConfParam()
{
}
}SIEC62541AppConfParam,*P_SIEC62541AppConfParam;
typedef struct _SIEC62541AppData
{
_SIEC62541AppData()
{
}
}SIEC62541AppData,*P_SIEC62541AppData;
}

View File

@ -0,0 +1,41 @@
# ARM板上资源有限,不会与云平台混用,不编译
message("Compile only in x86 environment")
# requires(contains(QMAKE_HOST.arch, x86_64))
requires(!contains(QMAKE_HOST.arch, aarch64):!linux-aarch64*)
QT -= core gui
CONFIG -= qt
TARGET = iec62541_client
TEMPLATE = lib
SOURCES += \
IEC62541.cpp \
IEC62541DataProcThread.cpp
HEADERS += \
IEC62541.h \
IEC62541DataProcThread.h \
IEC62541common.h
INCLUDEPATH += ../../include/
INCLUDEPATH += ../../include/open62541
LIBS += -lboost_system -lboost_thread -lboost_locale -lboost_chrono
LIBS += -lpub_utility_api -lpub_logger_api -llog4cplus -lprotocolbase -lrdb_api
LIBS += -lopen62541 -lhal-shared
DEFINES += PROTOCOLBASE_API_EXPORTS
include($$PWD/../../../idl_files/idl_files.pri)
#-------------------------------------------------------------------
COMMON_PRI=$$PWD/../../../common.pri
exists($$COMMON_PRI) {
include($$COMMON_PRI)
}else {
error("FATAL error: can not find common.pri")
}