[ref]同步711

This commit is contained in:
shi_jq 2025-03-13 11:10:01 +08:00
parent 5ab8b987da
commit d508f04563
7 changed files with 1510 additions and 0 deletions

View File

@ -0,0 +1,292 @@
#include "DNP3Master.h"
#include "pub_utility_api/I18N.h"
#include "pub_utility_api/CharUtil.h"
using namespace iot_public;
DNP3Master g_dnp3master;
bool g_dnp3masterIsMainFes = false;
bool g_dnp3masterChanelRun = true;
int EX_SetBaseAddr(void *address)
{
g_dnp3master.SetBaseAddr(address);
return iotSuccess;
}
int EX_SetProperty(int FesStatus)
{
g_dnp3master.SetProperty(FesStatus);
return iotSuccess;
}
int EX_OpenChan(int MainChanNo,int ChanNo,int OpenFlag)
{
g_dnp3master.OpenChan(MainChanNo,ChanNo,OpenFlag);
return iotSuccess;
}
int EX_CloseChan(int MainChanNo,int ChanNo,int CloseFlag)
{
g_dnp3master.CloseChan(MainChanNo,ChanNo,CloseFlag);
return iotSuccess;
}
int EX_ChanTimer(int ChanNo)
{
g_dnp3master.ChanTimer(ChanNo);
return iotSuccess;
}
int EX_ExitSystem(int flag)
{
LOGDEBUG("dnp3master EX_ExitSystem() start");
boost::ignore_unused_variable_warning(flag);
g_dnp3masterChanelRun = false;//使所有的线程退出。
return iotSuccess;
}
DNP3Master::DNP3Master():m_ProtocolId(-1),m_ptrCFesBase(NULL)
{
}
DNP3Master::~DNP3Master()
{
m_vecDataThreadPtr.clear();
}
int DNP3Master::SetBaseAddr(void *address)
{
if (m_ptrCFesBase == NULL)
{
m_ptrCFesBase = (CFesBase *)address;
}
//规约映射表初始化
if(m_ptrCFesBase != NULL)
{
m_ptrCFesBase->ProtocolRtuInitByParam1((char*)"dnp3_master");
ReadConfigParam(); //加载配置文件中的RTU配置参数
}
return iotSuccess;
}
int DNP3Master::SetProperty(int IsMainFes)
{
g_dnp3masterIsMainFes = (IsMainFes == 1);
LOGDEBUG("dnp3master::SetProperty g_dnp3masterIsMainFes:%d",IsMainFes);
return iotSuccess;
}
/**
* @brief DNP3Master::OpenChan OpenFlag线线
* @param MainChanNo
* @param ChanNo
* @param OpenFlag 1线 2线 3线线
* @return
*/
int DNP3Master::OpenChan(int MainChanNo, int ChanNo, int OpenFlag)
{
CFesChanPtr ptrMainFesChan = GetChanDataByChanNo(MainChanNo);
CFesChanPtr ptrFesChan = GetChanDataByChanNo(ChanNo);
if(ptrMainFesChan == NULL || ptrFesChan == NULL)
{
return iotFailed;
}
if((OpenFlag==CN_FesDataThread_Flag)||(OpenFlag==CN_FesChanAndDataThread_Flag))
{
if (ptrMainFesChan->m_DataThreadRun == CN_FesStopFlag)
{
SDNP3MASTERAppConfParam stRtuConfParam;
auto iterConfig = m_mapConfMap.find(ptrMainFesChan->m_Param.ChanNo);
if(iterConfig != m_mapConfMap.end())
{
stRtuConfParam = iterConfig->second;
}
//open chan thread
DNP3MasterDataProcThreadPtr ptrThread =
boost::make_shared<DNP3MasterDataProcThread>(m_ptrCFesBase,ptrMainFesChan,stRtuConfParam);
if (ptrThread == NULL)
{
LOGERROR("dnp3_master EX_OpenChan() ChanNo:%d create DNP3MasterDataProcThread error!",ptrFesChan->m_Param.ChanNo);
return iotFailed;
}
if(iotSuccess != ptrThread->init())
{
LOGERROR("dnp3_master EX_OpenChan() ChanNo:%d init DNP3MasterDataProcThread error!",ptrFesChan->m_Param.ChanNo);
return iotFailed;
}
m_vecDataThreadPtr.push_back(ptrThread);
ptrThread->resume(); //start Data THREAD
}
}
//使用的是sdk连接
ptrMainFesChan->SetComThreadRunFlag(CN_FesRunFlag);
return iotSuccess;
}
/**
* @brief DNP3Master::CloseChan OpenFlag线线
* @param MainChanNo
* @param ChanNo
* @param CloseFlag 1线 2线 3线线
* @return iotSuccess iotFailed
*/
int DNP3Master::CloseChan(int MainChanNo, int ChanNo, int CloseFlag)
{
CFesChanPtr ptrMainFesChan = GetChanDataByChanNo(MainChanNo);
CFesChanPtr ptrFesChan = GetChanDataByChanNo(ChanNo);
if(ptrMainFesChan == NULL || ptrFesChan == NULL)
{
return iotFailed;
}
//虽然本协议使用sdk不是自己管理连接但是需要执行SetComThreadRunFlag(CN_FesStopFlag),否则冗余状态变化时,无法重新打开通道
if ((CloseFlag == CN_FesChanThread_Flag) || (CloseFlag == CN_FesChanAndDataThread_Flag))
{
ptrFesChan->SetComThreadRunFlag(CN_FesStopFlag);
LOGINFO("ChanNo=%d ptrFesChan->SetComThreadRunFlag(CN_FesStopFlag)", ptrFesChan->m_Param.ChanNo);
}
if((CloseFlag==CN_FesDataThread_Flag)||(CloseFlag==CN_FesChanAndDataThread_Flag))
{
if (ptrMainFesChan->m_DataThreadRun == CN_FesRunFlag)
{
//close data thread
ClearDataProcThreadByChanNo(MainChanNo);
}
}
return iotSuccess;
}
/**
* @brief DNP3Master::ChanTimer
* @param MainChanNo
* @return
*/
int DNP3Master::ChanTimer(int MainChanNo)
{
boost::ignore_unused_variable_warning(MainChanNo);
return iotSuccess;
}
/**
* @brief DNP3Master::ReadConfigParam
* @return
*/
int DNP3Master::ReadConfigParam()
{
if (m_ptrCFesBase == NULL)
return iotFailed;
m_ProtocolId = m_ptrCFesBase->GetProtocolID((char*)"dnp3_master");
if (m_ProtocolId == -1)
{
LOGERROR("ReadConfigParam() ProtoclID=dnp3_master error");
return iotFailed;
}
LOGINFO("dnp3_master ProtoclID=%d",m_ProtocolId);
CCommonConfigParse config;
SDNP3MASTERAppConfParam defaultRtuParam;
CFesChanPtr ptrChan = NULL; //CHAN数据区
CFesRtuPtr ptrRTU = NULL;
if (config.load("../../data/fes/", "dnp3_master.xml") == iotFailed)
{
LOGWARN("dnp3Master load dnp3_master.xml error");
return iotSuccess;
}
parseRtuConfig(config,"RTU?",defaultRtuParam);
LOGDEBUG("dnp3_master nchanidx=%d",m_ptrCFesBase->m_vectCFesChanPtr.size());
for (size_t nChanIdx = 0; nChanIdx < m_ptrCFesBase->m_vectCFesChanPtr.size(); nChanIdx++)
{
ptrChan = m_ptrCFesBase->m_vectCFesChanPtr[nChanIdx];
if(ptrChan->m_Param.Used != 1 || m_ProtocolId != ptrChan->m_Param.ProtocolId)
{
continue;
}
//found RTU
for (size_t nRtuIdx = 0; nRtuIdx < m_ptrCFesBase->m_vectCFesRtuPtr.size(); nRtuIdx++)
{
//LOGDEBUG("dnp3_master nRtuIdx=%d",nRtuIdx);
ptrRTU = m_ptrCFesBase->m_vectCFesRtuPtr[nRtuIdx];
//LOGDEBUG("dnp3_master Used=%d ptrRTU->m_Param.ChanNo=%d ptrChan->m_Param.ChanNo=%d",ptrRTU->m_Param.Used , ptrRTU->m_Param.ChanNo , ptrChan->m_Param.ChanNo);
if (!ptrRTU->m_Param.Used || (ptrRTU->m_Param.ChanNo != ptrChan->m_Param.ChanNo))
{
continue;
}
SDNP3MASTERAppConfParam param = defaultRtuParam;
string strRtuName = "RTU" + IntToString(ptrRTU->m_Param.RtuNo);
parseRtuConfig(config,strRtuName,param);
m_mapConfMap[ptrRTU->m_Param.ChanNo] = param;
}
}
return iotSuccess;
}
int DNP3Master::parseRtuConfig(CCommonConfigParse &configParse,const std::string &strRtuName,SDNP3MASTERAppConfParam &stParam)
{
LOGDEBUG("dnp_master:解析RTU参数,RTU=%s",strRtuName.c_str());
if(iotSuccess != configParse.getIntValue(strRtuName,"responseTimeout",stParam.responseTimeout))
{
//采用默认配置
return iotFailed;
}
configParse.getIntValue(strRtuName,"callAllClassTime",stParam.callAllClassTime);
configParse.getIntValue(strRtuName,"callClass0Time",stParam.callClass0Time);
configParse.getIntValue(strRtuName,"callClass1Time",stParam.callClass1Time);
configParse.getIntValue(strRtuName,"callClass2Time",stParam.callClass2Time);
configParse.getBoolValue(strRtuName,"disableUnsolOnStartup",stParam.disableUnsolOnStartup);
configParse.getBoolValue(strRtuName,"performTimeSync",stParam.performTimeSync);
configParse.getBoolValue(strRtuName,"integrityOnEventOverflowIIN",stParam.integrityOnEventOverflowIIN);
configParse.getIntValue(strRtuName,"controlQualifierMode",stParam.controlQualifierMode);
configParse.getIntValue(strRtuName,"maxTxFragSize",stParam.maxTxFragSize);
configParse.getIntValue(strRtuName,"maxRxFragSize",stParam.maxRxFragSize);
configParse.getIntValue(strRtuName,"minRetryDelay",stParam.minRetryDelay);
configParse.getIntValue(strRtuName,"maxRetryDelay",stParam.maxRetryDelay);
configParse.getIntValue(strRtuName,"taskRetryPeriod",stParam.taskRetryPeriod);
configParse.getIntValue(strRtuName,"maxTaskRetryPeriod",stParam.maxTaskRetryPeriod);
configParse.getIntValue(strRtuName,"taskStartTimeout",stParam.taskStartTimeout);
LOGDEBUG("dnp_master:解析RTU参数,RTU=%s end",strRtuName.c_str());
}
void DNP3Master::ClearDataProcThreadByChanNo(int nChanNo)
{
for(auto it = m_vecDataThreadPtr.begin(); it != m_vecDataThreadPtr.end();it++)
{
const DNP3MasterDataProcThreadPtr &ptrThread = *it;
if( ptrThread->getChannelNo() == nChanNo)
{
m_vecDataThreadPtr.erase(it);
LOGINFO("dnp3_master::ClearDataProcThreadByChanNo %d ok",nChanNo);
break;
}
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <vector>
#include "Export.h"
#include "FesDef.h"
#include "FesBase.h"
#include "ProtocolBase.h"
#include "DNP3MasterDataProcThread.h"
#include "pub_utility_api/CommonConfigParse.h"
#include "DNP3MasterCommon.h"
extern "C" PROTOCOLBASE_API int EX_SetBaseAddr(void *Address);
extern "C" PROTOCOLBASE_API int EX_SetProperty(int FesStatus);
extern "C" PROTOCOLBASE_API int EX_OpenChan(int MainChanNo,int ChanNo,int OpenFlag);
extern "C" PROTOCOLBASE_API int EX_CloseChan(int MainChanNo,int ChanNo,int CloseFlag);
extern "C" PROTOCOLBASE_API int EX_ChanTimer(int MainChanNo);
extern "C" PROTOCOLBASE_API int EX_ExitSystem(int flag);
class PROTOCOLBASE_API DNP3Master : public CProtocolBase
{
public:
DNP3Master();
virtual ~DNP3Master();
int SetBaseAddr(void *address);
int SetProperty(int IsMainFes);
int OpenChan(int MainChanNo,int ChanNo,int OpenFlag);
int CloseChan(int MainChanNo,int ChanNo,int CloseFlag);
int ChanTimer(int MainChanNo);
private:
int ReadConfigParam();
int parseRtuConfig(CCommonConfigParse &configParse, const std::string &strRtuName, SDNP3MASTERAppConfParam &stParam);
void ClearDataProcThreadByChanNo(int nChanNo);
private:
int m_ProtocolId; //本协议的ID
CFesBase* m_ptrCFesBase; //CProtocolBase类中定义
DNP3MasterDataProcThreadPtrSeq m_vecDataThreadPtr; //存放所有本协议的线程处理指针
boost::unordered_map<int,SDNP3MASTERAppConfParam> m_mapConfMap; //存储RTU对应的配置参数
};

View File

@ -0,0 +1,609 @@
#include "DNP3MasterDataProcThread.h"
#include <opendnp3/master/DefaultMasterApplication.h>
#include <opendnp3/master/PrintingCommandResultCallback.h>
#include <opendnp3/master/PrintingSOEHandler.h>
extern bool g_dnp3masterIsMainFes;
extern bool g_dnp3masterChanelRun;
DNP3MasterDataProcThread::DNP3MasterDataProcThread(CFesBase *ptrCFesBase, const CFesChanPtr &ptrChan, const SDNP3MASTERAppConfParam &stConfParam)
:CTimerThreadBase("DNP3MasterDataProcThread",CN_RunPeriodMsec,0,true),
m_ptrCFesBase(ptrCFesBase),
m_ptrFesChan(ptrChan),
m_ptrCurChan(NULL),
m_ptrCurRtu(NULL),
m_pDnpManager(NULL),
m_channelListener(std::make_shared<DNP3ChannelListener>()),
m_stRtuParam(stConfParam),
m_curIP(0),
m_pChannelRetry(NULL)
{
m_SignalPointList.clear();
m_DoublePointList.clear();
}
DNP3MasterDataProcThread::~DNP3MasterDataProcThread()
{
quit();//在调用quit()前系统会调用beforeQuit();
//虽然本协议使用sdk不是自己管理连接但是需要执行SetComThreadRunFlag(CN_FesStopFlag),否则冗余状态变化时,无法重新打开通道
m_ptrCurChan->SetLinkStatus(CN_FesChanDisconnect);
m_ptrFesChan->SetDataThreadRunFlag(CN_FesStopFlag);
if(m_ptrCurRtu != NULL)
{
m_ptrCFesBase->WriteRtuSatus(m_ptrCurRtu, CN_FesRtuComDown);
}
//waitadd
//需要处理dnp3连接的问题
if (m_pDnpManager)
{
m_pDnpManager->Shutdown();
delete m_pDnpManager;
}
m_pDnpManager = NULL;
if(m_pChannelRetry)
{
delete m_pChannelRetry;
}
m_pChannelRetry = NULL;
}
int DNP3MasterDataProcThread::beforeExecute()
{
return 0;
}
void DNP3MasterDataProcThread::execute()
{
if (!m_channel || !m_master)
{
LOGINFO("DNP3 execute m_channel not exist! try init..");
CreateConnection();
return;
}
if(!g_dnp3masterChanelRun) //收到线程退出标志不再往下执行
return;
if (m_channelListener->GetState() != ChannelState::OPEN)
{
m_ptrCurChan->SetLinkStatus(CN_FesChanConnect);
m_ptrCurRtu->WriteRtuPointsComStatus(CN_FesRtuComDown);
m_ptrCFesBase->WriteRtuSatus(m_ptrCurRtu, CN_FesRtuComDown);
LOGINFO("DNP3 channel not connected Wait AutoReconnect...");
return;
}else if( m_channelListener->GetState() == ChannelState::OPEN )
{
m_ptrCurChan->SetLinkStatus(CN_FesChanConnect);
m_ptrCurRtu->WriteRtuPointsComStatus(CN_FesRtuNormal);
m_ptrCFesBase->WriteRtuSatus(m_ptrCurRtu, CN_FesRtuNormal);
}
if (m_master)
{
//auto commandProcessor = m_master->GetCommandProcessor();
//m_master->ScanAllObjects(GroupVariationID(60, 1), PrintingSOEHandler::Create(), TaskConfig::Default());
// ControlRelayOutputBlock crob(ControlCode::LATCH_ON);
// commandProcessor->DirectOperate(crob, 0, TaskConfig::Default());
}
handleCommand();
}
void DNP3MasterDataProcThread::beforeQuit()
{
}
int DNP3MasterDataProcThread::getChannelNo()
{
return m_ptrFesChan->m_Param.ChanNo;
}
int DNP3MasterDataProcThread::getRtuNo()
{
return m_ptrCurRtu->m_Param.RtuNo;
}
int DNP3MasterDataProcThread::init()
{
LOGDEBUG("DNP3MasterDataProcThread init start!");
m_ptrCurChan = m_ptrFesChan;
m_ptrCurRtu = GetRtuDataByChanData(m_ptrFesChan);
if ((m_ptrFesChan == NULL) || (m_ptrCurRtu == NULL))
{
LOGERROR("DNP3MasterDataProcThread m_ptrCurRtu或m_ptrFesChan为空");
return iotFailed;
}
//waitadd
//这里是否要像61850一样处理双IP和连接时间 后续处理
m_curIP = 0;
//设置通讯状态
m_ptrFesChan->SetDataThreadRunFlag(CN_FesRunFlag);
m_ptrCFesBase->WriteRtuSatus(m_ptrCurRtu, CN_FesRtuComDown);
if(iotSuccess != initDNP3ConnPara())
{
LOGWARN("DNP3MasterDataProcThread m_pDnpManager init error! ChanNo=%d" , getChannelNo());
return iotFailed;
}
//形成单双点的映射
InitDigitalMapping();
return iotSuccess;
}
int DNP3MasterDataProcThread::initDNP3ConnPara()
{
//配置dnp管理器
if( m_pDnpManager != NULL)
{
LOGWARN("DNP3MasterDataProcThread m_pDnpManager already init! ChanNo=%d" , getChannelNo());
return iotSuccess;
}
m_pDnpManager = new DNP3Manager(CN_THREAD_SIZE , DNP3LoggHandler::Create());
if( NULL == m_pDnpManager)
{
LOGERROR("DNP3MasterDataProcThread m_pDnpManager new faliure!");
return iotFailed;
}
//配置dnp3的连接相关参数
m_pChannelRetry = new ChannelRetry(TimeDuration::Seconds(m_stRtuParam.minRetryDelay), TimeDuration::Seconds(m_stRtuParam.maxRetryDelay));
m_stackConfig.master.responseTimeout = TimeDuration::Seconds(m_stRtuParam.responseTimeout);
m_stackConfig.master.disableUnsolOnStartup = m_stRtuParam.disableUnsolOnStartup;
m_stackConfig.master.timeSyncMode = m_stRtuParam.performTimeSync?TimeSyncMode::NonLAN:TimeSyncMode::None;
m_stackConfig.master.maxTxFragSize = m_stRtuParam.maxTxFragSize;
m_stackConfig.master.maxRxFragSize = m_stRtuParam.maxRxFragSize;
m_stackConfig.master.integrityOnEventOverflowIIN = m_stRtuParam.integrityOnEventOverflowIIN;
m_stackConfig.master.taskRetryPeriod = TimeDuration::Seconds(m_stRtuParam.taskRetryPeriod);
m_stackConfig.master.maxTaskRetryPeriod = TimeDuration::Minutes(m_stRtuParam.maxTaskRetryPeriod);
m_stackConfig.master.taskStartTimeout = TimeDuration::Seconds(m_stRtuParam.taskStartTimeout);
m_stackConfig.master.startupIntegrityClassMask = ClassField::AllClasses();
m_stackConfig.master.controlQualifierMode = m_stRtuParam.controlQualifierMode == 1?IndexQualifierMode::allow_one_byte:IndexQualifierMode::always_two_bytes;
m_stackConfig.link.LocalAddr = m_ptrCurRtu->m_Param.ResParam1 <= 0?1: m_ptrCurRtu->m_Param.ResParam1; //主站地址若没配置默认1
m_stackConfig.link.RemoteAddr = m_ptrCurRtu->m_Param.RtuAddr;
return iotSuccess;
}
int DNP3MasterDataProcThread::CreateConnection()
{
if (!m_pDnpManager || !m_ptrFesChan)
{
LOGERROR("CreateConnection m_pDnpManager or m_ptrFesChan is NULL!");
return iotFailed;
}
if (m_channel)
{
m_channel->Shutdown();
m_channel.reset();
}
if (m_master)
{
m_master.reset();
}
LOGINFO("DNP3 CreateConnection CreateConnection!");
//日志等级
const auto logLevels = levels::NORMAL | levels::ALL_APP_COMMS;
std::string channelName = m_ptrFesChan->m_Param.ChanName;
std::string materName = m_ptrCurRtu->m_Param.RtuName;
IPEndpoint IpAndPort(m_ptrFesChan->m_Param.NetRoute[m_curIP].NetDesc ,m_ptrFesChan->m_Param.NetRoute[m_curIP].PortNo);
char ServerIp[CN_FesMaxNetDescSize]; //通道IP
memset(ServerIp, 0, CN_FesMaxNetDescSize);
m_channel = m_pDnpManager->AddTCPClient(
channelName.c_str(),
logLevels,
*m_pChannelRetry,
{IpAndPort},
ServerIp,
m_channelListener
);
m_master = m_channel->AddMaster(
materName.c_str(),
MasterSOEHandler::Create(this),
std::make_shared<DefaultMasterApplication>(),
m_stackConfig
);
// // 为 AddClassScan 创建 SOEHandler
// auto integritySOEHandler = std::make_shared<ScanSOEHandler>("完整性扫描");
// auto exceptionSOEHandler = std::make_shared<ScanSOEHandler>("异常扫描");
// 添加周期性扫描
m_master->AddClassScan(
ClassField::AllClasses(), // 每分钟完整性扫描
TimeDuration::Minutes(1),
MasterSOEHandler::Create(this)
);
// m_master->AddClassScan(
// ClassField(ClassField::CLASS_1), // 每5秒异常扫描
// TimeDuration::Seconds(5),
// PrintingSOEHandler::Create()
// );
m_master->Enable();
LOGINFO("DNP3 CreateConnection Start Connect!");
}
void DNP3MasterDataProcThread::handleCommand()
{
}
void DNP3MasterDataProcThread::procDiDataValue(const HeaderInfo &info, const opendnp3::ICollection<opendnp3::Indexed<Binary> > &values)
{
LOGDEBUG("DNP3MasterDataProcThread procDiDataValue Now! HeaderInfo group: %s , qualifier: %s , tsquality: %s , isEventVariation :%d , flagsValid:%d , headerIndex :%d!",
GroupVariationSpec::to_human_string(info.gv), QualifierCodeSpec::to_human_string(info.qualifier),
TimestampQualitySpec::to_human_string(info.tsquality), info.isEventVariation, info.flagsValid, info.headerIndex);
SFesRtuDiValue DiValue[256];
SFesChgDi ChgDi[256];
int changPointNum = 0, ChgCount = 0;
//处理分为单点以及双点处理
auto process = [this, &info, &DiValue, &ChgDi, &ChgCount , &changPointNum](const opendnp3::Indexed<opendnp3::Binary>& item)
{
int pointIndex = -1 , diVal;
try{
pointIndex = m_SignalPointList.at(item.index);
}catch(const std::out_of_range& e)
{
LOGERROR("signalPoint out of range!index:" , item.index);
return;
}
auto* pDi = GetFesDiByPointNo(m_ptrCurRtu, pointIndex);
if (pDi == NULL)
return;
if (changPointNum >= 256 || ChgCount >= 256)
{
LOGDEBUG("Warning: changPointNum or ChgCount exceeds array size!");
return;
}
//获取值、时间戳
byte status = CN_FesValueUpdate;
diVal = item.value.value;
uint64_t lTime = getUTCTimeMsec();
if (info.flagsValid)
{
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::BinaryQuality::RESTART))
{
status = CN_FesValueNotUpdate;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::BinaryQuality::COMM_LOST))
{
status |= CN_FesValueComDown;
}
if ( !(item.value.flags.value & static_cast<uint8_t>(opendnp3::BinaryQuality::ONLINE)))
{
status |= CN_FesValueInvaild;
}
};
if (info.tsquality == TimestampQuality::SYNCHRONIZED)
{
lTime = item.value.time.value;
}
if( diVal != pDi->Value && (g_dnp3masterIsMainFes == true))
{
memcpy(ChgDi[ChgCount].TableName,pDi->TableName,CN_FesMaxTableNameSize);
memcpy(ChgDi[ChgCount].ColumnName,pDi->ColumnName,CN_FesMaxColumnNameSize);
memcpy(ChgDi[ChgCount].TagName,pDi->TagName,CN_FesMaxTagSize);
ChgDi[ChgCount].Value = diVal;
ChgDi[ChgCount].Status = status;
ChgDi[ChgCount].time = lTime;
ChgDi[ChgCount].RtuNo = m_ptrCurRtu->m_Param.RtuNo;
ChgDi[ChgCount].PointNo = pDi->PointNo;
LOGTRACE("变化遥信 RtuNo:%d PointNo:%d %s value=%d status=%d ms=%" PRId64 ,
getRtuNo(), pDi->PointNo, ChgDi[ChgCount].TableName, ChgDi[ChgCount].Value,ChgDi[ChgCount].Status, ChgDi[ChgCount].time);
ChgCount++;
}
//waitadd
//在这里是否需要添加SOE的内容还是说在处理未请求事件的收做
//更新点值
DiValue[changPointNum].PointNo = pDi->PointNo;
DiValue[changPointNum].Value = diVal;
DiValue[changPointNum].Status = status;
DiValue[changPointNum].time = lTime;
changPointNum++;
};
values.ForeachItem(process);
if( ChgCount > 0 )
{
m_ptrCFesBase->WriteChgDiValue(m_ptrCurRtu,ChgCount,&ChgDi[0]);
}
if( changPointNum > 0 )
{
m_ptrCurRtu->WriteRtuDiValue(changPointNum, &DiValue[0]);
}
LOGDEBUG("procDiDataValue end! changPointNum:%d, ChgCount:%d", changPointNum, ChgCount);
}
void DNP3MasterDataProcThread::procDoubleDiDataValue(const HeaderInfo &info, const opendnp3::ICollection<opendnp3::Indexed<DoubleBitBinary> > &values)
{
LOGDEBUG("DNP3MasterDataProcThread procDoubleDiDataValue Now! HeaderInfo group: %s , qualifier: %s , tsquality: %s , isEventVariation :%d , flagsValid:%d , headerIndex :%d!",
GroupVariationSpec::to_human_string(info.gv), QualifierCodeSpec::to_human_string(info.qualifier),
TimestampQualitySpec::to_human_string(info.tsquality), info.isEventVariation, info.flagsValid, info.headerIndex);
SFesRtuDiValue DiValue[256];
SFesChgDi ChgDi[256];
int changPointNum = 0, ChgCount = 0;
InitDigitalMapping();
//处理分为单点以及双点处理
auto process = [this, &info, &DiValue, &ChgDi, &ChgCount ,&changPointNum](const opendnp3::Indexed<opendnp3::DoubleBitBinary>& item)
{
int pointIndex = -1 , diVal;
try{
pointIndex = m_DoublePointList.at(item.index);
}catch(const std::out_of_range& e)
{
LOGERROR("doublePoint out of range!index:" , item.index);
return;
}
auto* pDi = GetFesDiByPointNo(m_ptrCurRtu, pointIndex);
if (pDi == NULL)
return;
if (changPointNum >= 256 || ChgCount >= 256)
{
LOGDEBUG("Warning: changPointNum or ChgCount exceeds array size!");
return;
}
//获取值、时间戳
byte status = CN_FesValueUpdate;
diVal = DoubleBitSpec::to_type( item.value.value );
uint64_t lTime = getUTCTimeMsec();
if (info.flagsValid)
{
if ( !(item.value.flags.value & static_cast<uint8_t>(opendnp3::DoubleBitBinaryQuality::ONLINE)))
{
status |= CN_FesValueInvaild;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::DoubleBitBinaryQuality::RESTART))
{
status = CN_FesValueNotUpdate;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::DoubleBitBinaryQuality::COMM_LOST))
{
status |= CN_FesValueComDown;
}
};
if (info.tsquality == TimestampQuality::SYNCHRONIZED)
{
lTime = item.value.time.value;
}
if( diVal != pDi->Value && (g_dnp3masterIsMainFes == true))
{
memcpy(ChgDi[ChgCount].TableName,pDi->TableName,CN_FesMaxTableNameSize);
memcpy(ChgDi[ChgCount].ColumnName,pDi->ColumnName,CN_FesMaxColumnNameSize);
memcpy(ChgDi[ChgCount].TagName,pDi->TagName,CN_FesMaxTagSize);
ChgDi[ChgCount].Value = diVal;
ChgDi[ChgCount].Status = status;
ChgDi[ChgCount].time = lTime;
ChgDi[ChgCount].RtuNo = m_ptrCurRtu->m_Param.RtuNo;
ChgDi[ChgCount].PointNo = pDi->PointNo;
LOGTRACE("变化遥信 RtuNo:%d PointNo:%d %s value=%d status=%d ms=%" PRId64 ,
getRtuNo(), pDi->PointNo, ChgDi[ChgCount].TableName, ChgDi[ChgCount].Value,ChgDi[ChgCount].Status, ChgDi[ChgCount].time);
ChgCount++;
}
//waitadd
//在这里是否需要添加SOE的内容还是说在处理未请求事件的收做
//更新点值
DiValue[changPointNum].PointNo = pDi->PointNo;
DiValue[changPointNum].Value = diVal;
DiValue[changPointNum].Status = status;
DiValue[changPointNum].time = lTime;
changPointNum++;
};
values.ForeachItem(process);
if( ChgCount > 0 )
{
m_ptrCFesBase->WriteChgDiValue(m_ptrCurRtu,ChgCount,&ChgDi[0]);
}
if( changPointNum > 0 )
{
m_ptrCurRtu->WriteRtuDiValue(changPointNum, &DiValue[0]);
}
LOGDEBUG("procDiDataValue end! changPointNum:%d, ChgCount:%d", changPointNum, ChgCount);
}
void DNP3MasterDataProcThread::procAiDataValue(const HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Analog>>& values)
{
LOGDEBUG("DNP3MasterDataProcThread procAiDataValue Now! HeaderInfo group: %s , qualifier: %s , tsquality: %s , isEventVariation :%d , flagsValid:%d , headerIndex :%d!",
GroupVariationSpec::to_human_string(info.gv), QualifierCodeSpec::to_human_string(info.qualifier),
TimestampQualitySpec::to_human_string(info.tsquality), info.isEventVariation, info.flagsValid, info.headerIndex);
SFesRtuAiValue AiValue[256];
SFesChgAi ChgAi[256];
int changPointNum = 0, ChgCount = 0;
auto process = [this, &info, &AiValue, &changPointNum](const opendnp3::Indexed<opendnp3::Analog>& item) {
int pointIndex = item.index;
auto* pAi = GetFesAiByPointNo(m_ptrCurRtu, pointIndex);
if (pAi == NULL)
return;
if (changPointNum >= 256)
{
LOGDEBUG("Warning: changPointNum exceeds array size!");
return;
}
AiValue[changPointNum].Status = CN_FesValueUpdate;
float fval = static_cast<float>(item.value.value);
uint64_t lTime = getUTCTimeMsec();
if (info.flagsValid)
{
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::AnalogQuality::RESTART))
{
AiValue[changPointNum].Status = CN_FesValueNotUpdate;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::AnalogQuality::COMM_LOST))
{
AiValue[changPointNum].Status |= CN_FesValueComDown;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::AnalogQuality::OVERRANGE))
{
AiValue[changPointNum].Status |= CN_FesValueExceed;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::AnalogQuality::REFERENCE_ERR))
{
fval = 0.0f;
AiValue[changPointNum].Status |= CN_FesValueInvaild;
}
}
if (info.tsquality == TimestampQuality::SYNCHRONIZED)
{
lTime = item.value.time.value;
}
AiValue[changPointNum].PointNo = pAi->PointNo;
AiValue[changPointNum].Value = fval;
AiValue[changPointNum].time = lTime;
changPointNum++;
};
values.ForeachItem(process);
m_ptrCurRtu->WriteRtuAiValueAndRetChg(changPointNum, &AiValue[0], &ChgCount, &ChgAi[0]);
if (ChgCount > 0)
{
m_ptrCFesBase->WriteChgAiValue(m_ptrCurRtu, ChgCount, &ChgAi[0]);
}
LOGDEBUG("procAiDataValue end! changPointNum:%d, ChgCount:%d", changPointNum, ChgCount);
}
void DNP3MasterDataProcThread::procAccDataValue(const HeaderInfo &info, const opendnp3::ICollection<opendnp3::Indexed<Counter> > &values)
{
LOGDEBUG("DNP3MasterDataProcThread procAccDataValue Now! HeaderInfo group: %s , qualifier: %s , tsquality: %s , isEventVariation :%d , flagsValid:%d , headerIndex :%d!",
GroupVariationSpec::to_human_string(info.gv), QualifierCodeSpec::to_human_string(info.qualifier),
TimestampQualitySpec::to_human_string(info.tsquality), info.isEventVariation, info.flagsValid, info.headerIndex);
SFesRtuAccValue AccValue[256];
SFesChgAcc ChgAcc[256];
int changPointNum = 0, ChgCount = 0;
auto process = [this, &info, &AccValue, &changPointNum](const opendnp3::Indexed<opendnp3::Counter>& item) {
int pointIndex = item.index;
auto* pAi = GetFesAccByPointNo(m_ptrCurRtu, pointIndex);
if (pAi == NULL)
return;
if (changPointNum >= 256)
{
LOGDEBUG("Warning: changPointNum exceeds array size!");
return;
}
AccValue[changPointNum].Status = CN_FesValueUpdate;
uint32_t iValue32 = item.value.value;
uint64_t lTime = getUTCTimeMsec();
if (info.flagsValid)
{
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::CounterQuality::RESTART))
{
AccValue[changPointNum].Status = CN_FesValueNotUpdate;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::CounterQuality::COMM_LOST))
{
AccValue[changPointNum].Status |= CN_FesValueComDown;
}
if (item.value.flags.value & static_cast<uint8_t>(opendnp3::CounterQuality::DISCONTINUITY ))
{
AccValue[changPointNum].Status |= CN_FesValueInvaild;
}
}
if (info.tsquality == TimestampQuality::SYNCHRONIZED)
{
lTime = item.value.time.value;
}
AccValue[changPointNum].PointNo = pAi->PointNo;
AccValue[changPointNum].Value = static_cast<double>(iValue32);
AccValue[changPointNum].time = lTime;
changPointNum++;
};
values.ForeachItem(process);
m_ptrCurRtu->WriteRtuAccValueAndRetChg(changPointNum, &AccValue[0], &ChgCount, &ChgAcc[0]);
if (ChgCount > 0)
{
m_ptrCFesBase->WriteChgAccValue(m_ptrCurRtu, ChgCount, &ChgAcc[0]);
}
LOGDEBUG("procAiDataValue end! changPointNum:%d, ChgCount:%d", changPointNum, ChgCount);
}
void DNP3MasterDataProcThread::InitDigitalMapping()
{
//区分单双点遥信的映射
if( NULL == m_ptrCurRtu )
{
LOGERROR("InitDigitalMapping m_ptrCurRtu is NULL!");
return;
}
SFesDi *pDi = NULL;
m_SignalPointList.clear();
m_DoublePointList.clear();
for (int j = 0; j < m_ptrCurRtu->m_MaxDiPoints; j++)
{
pDi = (m_ptrCurRtu->m_pDi)+j;
if( pDi->Param1 == 1) //双点
{
m_DoublePointList.push_back(pDi->PointNo);
}else
{
m_SignalPointList.push_back(pDi->PointNo);
}
}
}

View File

@ -0,0 +1,266 @@
#ifndef DNP3MASTERDATAPROCTHREAD_H
#define DNP3MASTERDATAPROCTHREAD_H
#include "FesDef.h"
#include "FesBase.h"
#include "ProtocolBase.h"
#include "DNP3MasterCommon.h"
#include <opendnp3/DNP3Manager.h>
#include <sstream>
#include <string>
using namespace opendnp3;
using namespace std;
class DNP3ChannelListener;
class DNP3MasterDataProcThread : public CTimerThreadBase,CProtocolBase
{
public:
DNP3MasterDataProcThread(CFesBase *ptrCFesBase,const CFesChanPtr &ptrChan,const SDNP3MASTERAppConfParam &stConfParam);
virtual ~DNP3MasterDataProcThread();
/*
@brief execute函数前的处理
*/
virtual int beforeExecute();
/*
@brief
*/
virtual void execute();
virtual void beforeQuit();
int getChannelNo(); //获取当前通道号
int getRtuNo(); //获取当前RTU号
int init();
int initDNP3ConnPara(); //配置dnp3的连接参数
int CreateConnection(); //建立连接
void handleCommand(); //命令处理
void procDiDataValue(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Binary>>& values);
void procDoubleDiDataValue(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::DoubleBitBinary>>& values);
void procAiDataValue(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Analog>>& values);
void procMiDataValue();
void procAccDataValue(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Counter>>& values);
void InitDigitalMapping();
private:
CFesBase* m_ptrCFesBase;
CFesChanPtr m_ptrFesChan; //主通道数据区。如果存在备通道,每次发送接收数据时需要得到当前使用的通道数据
CFesChanPtr m_ptrCurChan; //当前使用通道数据区。如果存在备通,每次发送接收数据时需要得到当前使用的通道数据
CFesRtuPtr m_ptrCurRtu; //当前使用RTU数据区,本协议每个通道对应一个RTU数据所以不需要轮询处理。
DNP3Manager* m_pDnpManager; //dnp3库根管理器 管理主从站连接
std::shared_ptr<DNP3ChannelListener> m_channelListener; // 通道状态监听器
SDNP3MASTERAppConfParam m_stRtuParam; //Rtu配置参数
std::shared_ptr<IChannel> m_channel; //dnp3通道
std::shared_ptr<IMaster> m_master; //dnp3主站指针
MasterStackConfig m_stackConfig; //dnp3主站配置参数
ChannelRetry* m_pChannelRetry; //重连时间配置
int m_curIP; //0=主通道 1=备通道
std::vector<int> m_SignalPointList; //单点映射
std::vector<int> m_DoublePointList; //双点映射
};
typedef boost::shared_ptr<DNP3MasterDataProcThread> DNP3MasterDataProcThreadPtr;
typedef std::vector<DNP3MasterDataProcThreadPtr> DNP3MasterDataProcThreadPtrSeq;
//dnp3日志类实现
class DNP3LoggHandler final : public ILogHandler, private Uncopyable
{
public:
void log(ModuleId module, const char* id, LogLevel level, const char* location, const char* message) override {
// 输出日志信息到控制台
std::stringstream sstream;
sstream << "Module: " << module.value
<< ", ID: " << id
<< ", Location: " << location
<< ", Message: " << message;
std::string levelStr = levelToString(level);
// 检查 level 中的每个标志并调用对应的日志函数
if (level.value & opendnp3::flags::ERR.value) {
LOGERROR("DNP3LoggHandler logtype:%s logContext:%s",
"ERROR", sstream.str().c_str());
}
if (level.value & opendnp3::flags::WARN.value) {
LOGWARN("DNP3LoggHandler logtype:%s logContext:%s",
"WARNING", sstream.str().c_str());
}
if (level.value & opendnp3::flags::INFO.value) {
LOGINFO("DNP3LoggHandler logtype:%s logContext:%s",
"INFO", sstream.str().c_str());
}
if (level.value & opendnp3::flags::EVENT.value) {
LOGINFO("DNP3LoggHandler logtype:%s logContext:%s",
"EVENT", sstream.str().c_str()); // EVENT 可以映射到 INFO
}
// 调试级别:处理通信相关的日志(可选)
if (level.value & (opendnp3::flags::APP_HEX_RX.value | opendnp3::flags::APP_HEX_TX.value |
opendnp3::flags::LINK_RX.value | opendnp3::flags::LINK_TX.value)) {
LOGDEBUG("DNP3LoggHandler logtype:%s logContext:%s",
levelStr.c_str(), sstream.str().c_str());
}
}
static std::shared_ptr<opendnp3::ILogHandler> Create()
{
return std::make_shared<DNP3LoggHandler>();
}
private:
// 将 LogLevel 转换为字符串(可选,用于调试或显示)
std::string levelToString(opendnp3::LogLevel &level) {
std::string result;
if (level.value & opendnp3::flags::ERR.value) result += "ERROR ";
if (level.value & opendnp3::flags::WARN.value) result += "WARNING ";
if (level.value & opendnp3::flags::INFO.value) result += "INFO ";
if (level.value & opendnp3::flags::EVENT.value) result += "EVENT ";
if (level.value & opendnp3::flags::APP_HEX_RX.value) result += "APP_HEX_RX ";
if (level.value & opendnp3::flags::APP_HEX_TX.value) result += "APP_HEX_TX ";
if (level.value & opendnp3::flags::LINK_RX.value) result += "LINK_RX ";
return result.empty() ? "UNKNOWN" : result;
}
};
//dnp3通道监视器
class DNP3ChannelListener final : public opendnp3::IChannelListener
{
public:
virtual void OnStateChange(ChannelState state) override
{
std::stringstream sstr;
sstr<< "DNP3ChannelListener channel state change: " << ChannelStateSpec::to_human_string(state);
LOGDEBUG("%s" , sstr.str().c_str());
m_state = state;
}
static std::shared_ptr<opendnp3::IChannelListener> Create()
{
return std::make_shared<DNP3ChannelListener>();
}
DNP3ChannelListener() :m_state(opendnp3::ChannelState::CLOSED) {}
opendnp3::ChannelState GetState() const { return m_state; }
private:
opendnp3::ChannelState m_state;
};
//MasterSOEHandler 非特定扫描事件处理
class MasterSOEHandler final : public ISOEHandler
{
public:
MasterSOEHandler(DNP3MasterDataProcThread *pProc):m_pDataProcThread(pProc){}
static std::shared_ptr<ISOEHandler> Create(DNP3MasterDataProcThread *pProc)
{
return std::make_shared<MasterSOEHandler>(pProc);
}
// 处理二进制输入(数字量)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Binary>>& values) override
{
if(m_pDataProcThread)
{
m_pDataProcThread->procDiDataValue(info , values);
}
}
// 处理二进制输入(双点数字量)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::DoubleBitBinary>>& values) override
{
if(m_pDataProcThread)
{
m_pDataProcThread->procDoubleDiDataValue(info , values);
}
}
// 处理模拟量输入(模拟量)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Analog>>& values) override
{
if(m_pDataProcThread)
{
m_pDataProcThread->procAiDataValue(info , values);
}else
{
LOGERROR("MasterSOEHandler Process Analog m_pDataProcThread is NULL");
}
}
// 处理计数器(累积量)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::Counter>>& values) override
{
if(m_pDataProcThread)
{
m_pDataProcThread->procAccDataValue(info , values);
}else
{
LOGERROR("MasterSOEHandler Process Counter m_pDataProcThread is NULL");
}
}
// 处理二进制输出状态(控制响应中的数字量)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::BinaryOutputStatus>>& values) override
{
// values.Foreach([this, &info](const opendnp3::Indexed<opendnp3::BinaryOutputStatus>& item) {
// std::string source = "[控制响应]";
// std::cout << source << " BinaryOutputStatus - Index: " << item.index
// << ", Value: " << (item.value.value ? "true" : "false") << std::endl;
// //updateSystemPoint("数字量", item.index, item.value.value ? 1.0 : 0.0, info.isEvent);
// });
}
// 处理模拟量输出状态(控制响应中的模拟量)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::AnalogOutputStatus>>& values) override
{
// values.Foreach([this, &info](const opendnp3::Indexed<opendnp3::AnalogOutputStatus>& item) {
// std::string source = "[控制响应]";
// std::cout << source << " AnalogOutputStatus - Index: " << item.index
// << ", Value: " << item.value.value << std::endl;
// //updateSystemPoint("模拟量", item.index, item.value.value, info.isEvent);
// });
}
// 处理混合量(假设用 OctetString 表示)
void Process(const opendnp3::HeaderInfo& info, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::OctetString>>& values) override
{
// values.Foreach([this, &info](const opendnp3::Indexed<opendnp3::OctetString>& item) {
// std::string source = info.isEvent ? "[未请求事件]" : "[控制响应或手动扫描]";
// std::cout << source << " OctetString - Index: " << item.index
// << ", Value: " << item.value.ToString() << std::endl;
// // 混合量可能需要解析为特定格式,这里简化为字符串长度
// //updateSystemPoint("混合量", item.index, static_cast<double>(item.value.Size()), info.isEvent);
// });
}
// 其他必要方法(空实现)
void Process(const opendnp3::HeaderInfo&, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::FrozenCounter>>& values) override {}
void Process(const opendnp3::HeaderInfo&, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::TimeAndInterval>>& values) override {}
void Process(const opendnp3::HeaderInfo&, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::BinaryCommandEvent>>& values) override {}
void Process(const opendnp3::HeaderInfo&, const opendnp3::ICollection<opendnp3::Indexed<opendnp3::AnalogCommandEvent>>& values) override {}
void Process(const HeaderInfo& info, const ICollection<DNPTime>& values) override {}
void BeginFragment(const ResponseInfo& info) override {}
void EndFragment(const ResponseInfo& info) override {}
private:
DNP3MasterDataProcThread* m_pDataProcThread;
};
#endif // DNP3MASTERDATAPROCTHREAD_H

View File

@ -0,0 +1,44 @@
# ARM板上资源有限,不会与云平台混用,不编译
message("Compile only in x86 environment")
# requires(contains(QMAKE_HOST.arch, x86_64))
requires(!contains(QMAKE_HOST.arch, aarch64):!linux-aarch64*)
QT -= core gui sql
CONFIG -= qt
TARGET = dnp3_master
TEMPLATE = lib
SOURCES += \
DNP3Master.cpp \
DNP3MasterDataProcThread.cpp
HEADERS += \
DNP3Master.h \
DNP3MasterDataProcThread.h \
DNP3MasterCommon.h
INCLUDEPATH += ../../include/
INCLUDEPATH += ../../include/libdnp3
LIBS += -lboost_system -lboost_thread -lboost_locale -lboost_chrono
LIBS += -lpub_utility_api -lpub_logger_api -llog4cplus -lprotocolbase -lrdb_api
LIBS += -lopendnp3
DEFINES += PROTOCOLBASE_API_EXPORTS
include($$PWD/../../../idl_files/idl_files.pri)
#-------------------------------------------------------------------
COMMON_PRI=$$PWD/../../../common.pri
exists($$COMMON_PRI) {
include($$COMMON_PRI)
}else {
error("FATAL error: can not find common.pri")
}

View File

@ -0,0 +1,73 @@
#ifndef DNP3MASTERCOMMON_H
#define DNP3MASTERCOMMON_H
#include "FesDef.h"
#include "FesBase.h"
#include <boost/unordered_map.hpp>
#define CN_RunPeriodMsec (10) //dnp3线程运行周期
const int CN_RESPONSETIMEOUT = 2; //以秒为单位
const int CN_CALLALLCALSETIME = 1; //以分钟为单位
const int CN_CALLOTHERCLASS = 10; //以秒为单位
const int CN_MAX_APDU_SIZE = 2048;
const int CN_CONTROL_QUALIFIER_SIZE = 2;
const int CN_THREAD_SIZE = 1; //DNP3管理器线程数
const int CN_MIN_RETRY_DELAY = 3;
const int CN_MAX_RETRY_DELAY = 40;
const int CN_TASK_RETR_PERIOD = 5;
const int CN_MAX_TASK_RETR_PERIOD = 1;
const int CN_TASK_START_TIMEOUT = 10;
//主站配置信息
typedef struct _SDNP3MASTERAppConfParam
{
int responseTimeout; //应用层响应超时时间
int callAllClassTime; //定期召唤所有类的数据时间
int callClass0Time; //定期召唤类0的数据时间
int callClass1Time; //定期召唤类1的数据时间
int callClass2Time; //定期召唤类2的数据时间
int callClass3Time; //定期召唤类3的数据时间
bool disableUnsolOnStartup; // 启动时禁用非请求响应
bool performTimeSync; // 是否自动时间同步
int maxTxFragSize; // 最大APDU传输单元
int maxRxFragSize; //最大APDU接受单位
bool integrityOnEventOverflowIIN; //定义当检测到事件缓冲区溢出IIN时是否执行完整性扫描
int controlQualifierMode; //主站请求时限定词的字节大小(可配置 1、2)
int taskRetryPeriod; //任务执行失败后,再次尝试执行该任务之前需要等待的时间间隔
int maxTaskRetryPeriod; //任务失败后重试的最大时间间隔
int taskStartTimeout; //判定任务失败之前所等待的时间
int minRetryDelay; //通道最小重连时间
int maxRetryDelay; //通道最大重连时间
_SDNP3MASTERAppConfParam()
{
responseTimeout = CN_RESPONSETIMEOUT;
callAllClassTime = CN_CALLALLCALSETIME;
callClass0Time = CN_CALLOTHERCLASS;
callClass1Time = CN_CALLOTHERCLASS;
callClass2Time = CN_CALLOTHERCLASS;
callClass3Time = CN_CALLOTHERCLASS;
disableUnsolOnStartup = true;
performTimeSync = false;
maxTxFragSize = CN_MAX_APDU_SIZE;
maxRxFragSize = CN_MAX_APDU_SIZE;
controlQualifierMode = CN_CONTROL_QUALIFIER_SIZE;
integrityOnEventOverflowIIN = true;
taskRetryPeriod = CN_TASK_RETR_PERIOD;
maxTaskRetryPeriod = CN_MAX_TASK_RETR_PERIOD;
taskStartTimeout = CN_TASK_START_TIMEOUT;
minRetryDelay = CN_MIN_RETRY_DELAY;
maxRetryDelay = CN_MAX_RETRY_DELAY;
}
}SDNP3MASTERAppConfParam , *P_SDNP3MASTERAppConfParam;
#endif // DNP3MASTERCOMMON_H

View File

@ -0,0 +1,181 @@

/*
* Copyright 2013-2022 Step Function I/O, LLC
*
* Licensed to Green Energy Corp (www.greenenergycorp.com) and Step Function I/O
* LLC (https://stepfunc.io) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Green Energy Corp and Step Function I/O LLC license
* this file to you under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may obtain
* a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iostream>
#include <opendnp3/ConsoleLogger.h>
#include <opendnp3/DNP3Manager.h>
#include <opendnp3/channel/PrintingChannelListener.h>
#include <opendnp3/logging/LogLevels.h>
#include <opendnp3/master/DefaultMasterApplication.h>
#include <opendnp3/master/PrintingCommandResultCallback.h>
#include <opendnp3/master/PrintingSOEHandler.h>
using namespace std;
using namespace opendnp3;
class TestSOEHandler : public ISOEHandler
{
virtual void BeginFragment(const ResponseInfo& info){
}
virtual void EndFragment(const ResponseInfo& info){}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<Binary>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<DoubleBitBinary>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<Analog>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<Counter>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<FrozenCounter>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<BinaryOutputStatus>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<AnalogOutputStatus>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<OctetString>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<TimeAndInterval>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<BinaryCommandEvent>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<Indexed<AnalogCommandEvent>>& values) {}
virtual void Process(const HeaderInfo& info, const ICollection<DNPTime>& values) {}
};
int main(int argc, char* argv[])
{
// Specify what log levels to use. NORMAL is warning and above
// You can add all the comms logging by uncommenting below
const auto logLevels = levels::NORMAL | levels::ALL_APP_COMMS;
// This is the main point of interaction with the stack
DNP3Manager manager(1, ConsoleLogger::Create());
// Connect via a TCPClient socket to a outstation
auto channel = manager.AddTCPClient("tcpclient", logLevels, ChannelRetry::Default(), {IPEndpoint("192.168.3.126", 20000)},
"0.0.0.0", PrintingChannelListener::Create());
// The master config object for a master. The default are
// useable, but understanding the options are important.
MasterStackConfig stackConfig;
// you can override application layer settings for the master here
// in this example, we've change the application layer timeout to 2 seconds
stackConfig.master.responseTimeout = TimeDuration::Seconds(2);
stackConfig.master.disableUnsolOnStartup = true;
// You can override the default link layer settings here
// in this example we've changed the default link layer addressing
stackConfig.link.LocalAddr = 1;
stackConfig.link.RemoteAddr = 10;
// Create a new master on a previously declared port, with a
// name, log level, command acceptor, and config info. This
// returns a thread-safe interface used for sending commands.
auto master = channel->AddMaster("master", // id for logging
PrintingSOEHandler::Create(), // callback for data processing
DefaultMasterApplication::Create(), // master application instance
stackConfig // stack configuration
);
auto test_soe_handler = std::make_shared<TestSOEHandler>();
// do an integrity poll (Class 3/2/1/0) once per minute
auto integrityScan = master->AddClassScan(ClassField::AllClasses(), TimeDuration::Minutes(1), test_soe_handler);
// do a Class 1 exception poll every 5 seconds
auto exceptionScan = master->AddClassScan(ClassField(ClassField::CLASS_1), TimeDuration::Seconds(5), test_soe_handler);
// Enable the master. This will start communications.
master->Enable();
bool channelCommsLoggingEnabled = true;
bool masterCommsLoggingEnabled = true;
while (true)
{
std::cout << "Enter a command" << std::endl;
std::cout << "x - exits program" << std::endl;
std::cout << "a - performs an ad-hoc range scan" << std::endl;
std::cout << "i - integrity demand scan" << std::endl;
std::cout << "e - exception demand scan" << std::endl;
std::cout << "d - disable unsolicited" << std::endl;
std::cout << "r - cold restart" << std::endl;
std::cout << "c - send crob" << std::endl;
std::cout << "t - toggle channel logging" << std::endl;
std::cout << "u - toggle master logging" << std::endl;
char cmd;
std::cin >> cmd;
switch (cmd)
{
case ('a'):
master->ScanRange(GroupVariationID(1, 2), 0, 3, test_soe_handler);
break;
case ('d'):
master->PerformFunction("disable unsol", FunctionCode::DISABLE_UNSOLICITED,
{Header::AllObjects(60, 2), Header::AllObjects(60, 3), Header::AllObjects(60, 4)});
break;
case ('r'):
{
auto print = [](const RestartOperationResult& result) {
if (result.summary == TaskCompletion::SUCCESS)
{
std::cout << "Success, Time: " << result.restartTime.ToString() << std::endl;
}
else
{
std::cout << "Failure: " << TaskCompletionSpec::to_string(result.summary) << std::endl;
}
};
master->Restart(RestartType::COLD, print);
break;
}
case ('x'):
// C++ destructor on DNP3Manager cleans everything up for you
return 0;
case ('i'):
integrityScan->Demand();
break;
case ('e'):
exceptionScan->Demand();
break;
case ('c'):
{
ControlRelayOutputBlock crob(OperationType::LATCH_ON);
master->SelectAndOperate(crob, 0, PrintingCommandResultCallback::Get());
break;
}
case ('t'):
{
channelCommsLoggingEnabled = !channelCommsLoggingEnabled;
auto levels = channelCommsLoggingEnabled ? levels::ALL_COMMS : levels::NORMAL;
channel->SetLogFilters(levels);
std::cout << "Channel logging set to: " << levels.get_value() << std::endl;
break;
}
case ('u'):
{
masterCommsLoggingEnabled = !masterCommsLoggingEnabled;
auto levels = masterCommsLoggingEnabled ? levels::ALL_COMMS : levels::NORMAL;
master->SetLogFilters(levels);
std::cout << "Master logging set to: " << levels.get_value() << std::endl;
break;
}
default:
std::cout << "Unknown action: " << cmd << std::endl;
break;
}
}
return 0;
}