[ref]同步711

This commit is contained in:
shi_jq 2025-03-13 10:46:26 +08:00
parent b8c1101693
commit 50296a077f
28 changed files with 3981 additions and 0 deletions

View File

@ -0,0 +1,56 @@
#include "CCreateInfluxDBUserDlg.h"
#include "ui_CCreateInfluxDBUserDlg.h"
CCreateInfluxDBUserDlg::CCreateInfluxDBUserDlg(QWidget *parent) :
CustomUiDialog(parent),
ui(new Ui::CCreateInfluxDBUserDlg)
{
ui->setupUi(this);
connect(ui->confirmBtn , &QPushButton::clicked , this , &CCreateInfluxDBUserDlg::accept);
setAutoLayout(true);
}
CCreateInfluxDBUserDlg::~CCreateInfluxDBUserDlg()
{
delete ui;
}
QString CCreateInfluxDBUserDlg::getUserInfo()
{
return ui->lineEdit_user->text();
}
QString CCreateInfluxDBUserDlg::getPassWordInfo()
{
return ui->lineEdit_password->text();
}
QString CCreateInfluxDBUserDlg::getIP()
{
return ui->lineEdit_ip->text();
}
QString CCreateInfluxDBUserDlg::getAdminUser()
{
return ui->lineEdit_admin->text();
}
void CCreateInfluxDBUserDlg::setIP(QString ipAddress)
{
ui->lineEdit_ip->setText(ipAddress);
}
void CCreateInfluxDBUserDlg::setAdminUser(QString admin)
{
ui->lineEdit_admin->setText(admin);
}
void CCreateInfluxDBUserDlg::setUserInfo(QString userInfo)
{
ui->lineEdit_user->setText(userInfo);
}
void CCreateInfluxDBUserDlg::setPassWord(QString passWord)
{
ui->lineEdit_password->setText(passWord);
}

View File

@ -0,0 +1,39 @@
#ifndef CCREATEINFLUXDBUSERDLG_H
#define CCREATEINFLUXDBUSERDLG_H
#include <QDialog>
#include "pub_widget/CustomDialog.h"
namespace Ui {
class CCreateInfluxDBUserDlg;
}
class CCreateInfluxDBUserDlg : public CustomUiDialog
{
Q_OBJECT
public:
explicit CCreateInfluxDBUserDlg(QWidget *parent = 0);
~CCreateInfluxDBUserDlg();
QString getUserInfo();
QString getPassWordInfo();
QString getIP();
QString getAdminUser();
void setIP(QString ipAddress);
void setAdminUser(QString admin);
void setUserInfo(QString userInfo);
void setPassWord(QString passWord);
private:
Ui::CCreateInfluxDBUserDlg *ui;
};
#endif // CCREATEINFLUXDBUSERDLG_H

View File

@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>CCreateInfluxDBUserDlg</class>
<widget class="QDialog" name="CCreateInfluxDBUserDlg">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>378</width>
<height>161</height>
</rect>
</property>
<property name="windowTitle">
<string>Dialog</string>
</property>
<layout class="QGridLayout" name="gridLayout_2">
<property name="leftMargin">
<number>6</number>
</property>
<property name="topMargin">
<number>6</number>
</property>
<property name="rightMargin">
<number>6</number>
</property>
<property name="bottomMargin">
<number>6</number>
</property>
<property name="spacing">
<number>9</number>
</property>
<item row="2" column="0" colspan="2">
<layout class="QGridLayout" name="gridLayout">
<item row="3" column="1">
<widget class="QLineEdit" name="lineEdit_admin">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="5" column="0">
<widget class="QLabel" name="label_user">
<property name="text">
<string>新建用户:</string>
</property>
</widget>
</item>
<item row="3" column="0">
<widget class="QLabel" name="label_4">
<property name="text">
<string>管理员用户:</string>
</property>
</widget>
</item>
<item row="5" column="1">
<widget class="QLineEdit" name="lineEdit_user">
<property name="clearButtonEnabled">
<bool>true</bool>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QLineEdit" name="lineEdit_ip"/>
</item>
<item row="2" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>IP地址</string>
</property>
</widget>
</item>
<item row="4" column="0">
<widget class="QLabel" name="label_password">
<property name="text">
<string>管理员密码:</string>
</property>
</widget>
</item>
<item row="4" column="1">
<widget class="QLineEdit" name="lineEdit_password">
<property name="echoMode">
<enum>QLineEdit::Password</enum>
</property>
<property name="cursorMoveStyle">
<enum>Qt::LogicalMoveStyle</enum>
</property>
</widget>
</item>
<item row="6" column="2">
<widget class="QPushButton" name="confirmBtn">
<property name="text">
<string>确认</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>请输入相关用户信息:</string>
</property>
</widget>
</item>
</layout>
</widget>
<resources/>
<connections/>
</ui>

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -0,0 +1,35 @@

/*********************************************************************************
* @file ETLCommon.cpp
* @brief
* @author caodingfa
* @version 1.0
* @date
**********************************************************************************/
#ifdef WIN32
#include <stdio.h>
#else
#include <stdio.h>
#include <stdlib.h>
#endif
#include "ETLCommon.h"
namespace iot_dbms
{
//< 是否需要退出程序
bool g_bNeedExit = false;
///////////////////////////////////////////////////////////////////////////////////////
void printHelp()
{
printf("Usage:\n");
printf(" -r \t Register as system service. \n");
printf(" -u \t Unregister system service. \n");
printf(" -s \t Run as system service. \n");
}
} //< namespace iot_net

View File

@ -0,0 +1,26 @@

/*********************************************************************************
* @file ETLCommon.h
* @brief
* @author caodingfa
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include <string>
#include <set>
#include "pub_utility_api/TimeUtil.h"
namespace iot_dbms
{
const int CN_SCAN_PERIOD = 3 * SEC_PER_MIN * MSEC_PER_SEC;
//< 是否需要退出程序
extern bool g_bNeedExit;
//< 输出命令行帮助
void printHelp();
} //< namespace iot_dbms

View File

@ -0,0 +1,357 @@

/*********************************************************************************
* @file ETLServer.cpp
* @brief
* @author caodingfa
* @version 1.0
* @date
**********************************************************************************/
#include "boost/property_tree/xml_parser.hpp"
#include "boost/typeof/typeof.hpp"
#include "boost/format.hpp"
#include "boost/make_shared.hpp"
#include "rapidjson/pointer.h"
#include "rapidjson/error/en.h"
#include "pub_logger_api/logger.h"
#include "pub_utility_api/FileUtil.h"
#include "ETLCommon.h"
#include "ETLServer.h"
#include "pub_sysinfo_api/SysInfoApi.h"
using namespace std;
using namespace iot_public;
namespace iot_dbms
{
//< 上一次 周期性更新 的时间开机后ms数
const std::string CN_CFG_NAME = "tsdb_etl.xml";
const std::string CN_TABLE_INCREMENT = "increment";
const int CN_GROUP_TIME = 15;
///////////////////////////////////////////////////////////////////////////////////////////
CETLServer::CETLServer():m_ptrTsdbConn(NULL)
{
m_nGroupTime = CN_GROUP_TIME;
m_nInvalidStatus = 4|8; //对应测点的状态码4:MENU_STATE_AI_INVALID 8:MENU_STATE_AI_GK_OFF
}
CETLServer::~CETLServer()
{
//release();
}
bool CETLServer::initialize()
{
// 初始化日志系统
iot_public::StartLogSystem( "default", "tsdb_etl" );
if(!loadConfig())
{
return false;
}
if(!initTSDB())
{
return false;
}
return true;
}
void CETLServer::release()
{
m_ptrTsdbConn.reset();
// 停止日志系统
iot_public::StopLogSystem();
}
void CETLServer::process()
{
if(!m_ptrTsdbConn->pingServer())
{
LOGERROR("ping TSDB失败本次不执行");
return;
}
for(auto iter = m_mapPntType2TagName.begin();iter != m_mapPntType2TagName.end();iter++)
{
TagName2TimeMAP mapTagName2Time;
getLastValue(iter->first,iter->second,mapTagName2Time);
extractValueToIncrement(iter->first,mapTagName2Time);
}
}
bool CETLServer::loadConfig()
{
string strCfgPath = CFileUtil::getPathOfCfgFile(CN_CFG_NAME,CN_DIR_PLATFORM);
if(strCfgPath.empty())
{
LOGERROR("加载配置文件%s失败",CN_CFG_NAME.c_str());
return false;
}
boost::property_tree::ptree pt;
namespace xml = boost::property_tree::xml_parser;
try
{
xml::read_xml(strCfgPath, pt, xml::no_comments);
BOOST_AUTO(pGroupTime, pt.get_child("root.energy.group_time"));
m_nGroupTime = pGroupTime.get<int>("<xmlattr>.period");
//存在无效位标记就是用配置文件
BOOST_AUTO(pInvalidStatus,pt.get_child_optional("root.invalid_status"));
if(pInvalidStatus)
{
m_nInvalidStatus = pInvalidStatus->get<int>("<xmlattr>.value");
}
LOGINFO("测点无效状态值=[%d]",m_nInvalidStatus);
BOOST_AUTO(module, pt.get_child("root.energy"));
for (BOOST_AUTO(pSum, module.begin()); pSum != module.end(); ++pSum)
{
if (pSum->first != "sum_pnt")
{
continue;
}
for(BOOST_AUTO(pPnt,pSum->second.begin()); pPnt != pSum->second.end();++pPnt )
{
if (pPnt->first == "pnt")
{
string strTableName = pPnt->second.get<string>("<xmlattr>.table_name");
string strTagName = pPnt->second.get<string>("<xmlattr>.tag_name");
if(strTableName.empty() || strTagName.empty())
{
LOGWARN("测点[%s.%s]格式不正确,忽略",strTableName.c_str(),strTagName.c_str());
}
else
{
m_mapPntType2TagName[strTableName].push_back(strTagName);
}
}
}
}
}
catch (std::exception &ex)
{
LOGERROR("解析配置文件[%s]失败.Msg=[%s]", strCfgPath.c_str(), ex.what());
return false;
}
catch (...)
{
LOGERROR("加载配置文件发生未知异常");
return false;
}
return true;
}
bool CETLServer::initTSDB()
{
if(!initTsdbApi())
{
LOGERROR("初始化tsdb api失败");
return false;
}
if(!initTsdbConn())
{
LOGERROR("初始化TsdbConn失败");
return false;
}
return true;
}
bool CETLServer::initTsdbConn()
{
iot_public::CSysInfoInterfacePtr ptrSysInfo;
if(!iot_public::createSysInfoInstance(ptrSysInfo))
{
LOGERROR( "createSysInfoInstance 返回失败" );
return false;
}
iot_public::SDatabaseInfo stFirstConnectInfo; //数据库连接信息
std::vector<iot_public::SDatabaseInfo> vecLocalDbInfo;
std::vector<iot_public::SDatabaseInfo> vecRemoteDbInfo;
int nRet = ptrSysInfo->getLocalDBInfo( stFirstConnectInfo, vecLocalDbInfo, vecRemoteDbInfo );
if ( iotFailed == nRet )
{
LOGERROR( "getLocalDBInfo 返回失败" );
return false;
}
//todo:暂时先只连接本机
STsdbConnParam connParam;
connParam.strUserName = stFirstConnectInfo.strServiceName;
connParam.strUserPassword = stFirstConnectInfo.strUserPassword;
connParam.strDbName = stFirstConnectInfo.strServiceName;
m_ptrTsdbConn = boost::make_shared<CTsdbConn>(connParam);
if(m_ptrTsdbConn == NULL)
{
LOGERROR("创建TsdbConn失败");
return false;
}
return true;
}
void CETLServer::getLastValue(const string &strTable, const TagNameSeq &vecTagName, TagName2TimeMAP &mapTagName2Time)
{
string strTagNameList = joinTagName(vecTagName);
if(strTagNameList.empty())
{
LOGWARN("表[%s]测点为空,忽略",strTagNameList.c_str());
return;
}
//主键为tag_name值为time因为当前测点没有历史数据时返回结果中没有这一项所以先初始化一下
for(size_t i = 0; i < vecTagName.size();i++)
{
mapTagName2Time[vecTagName[i]] = 0L;
}
//SELECT last(value) FROM "increment" WHERE tag_name='tag1' or tag_name='tag2' group by tag_name
string strSQL = boost::str(boost::format("SELECT last(value) FROM \"%1%\" WHERE %2% group by tag_name")
%CN_TABLE_INCREMENT %strTagNameList);
LOGDEBUG("获取表[%s]最新记录.sql=[%s]",strTable.c_str(),strSQL.c_str());
string strQueryResult;
if(!m_ptrTsdbConn->doQuery(strSQL.c_str(),&strQueryResult,0))
{
LOGERROR("执行查询失败.sql=[%s]",strSQL.c_str());
return;
}
LOGDEBUG("查询结果:%s",strQueryResult.c_str());
rapidjson::Document docRoot;
docRoot.Parse(strQueryResult.c_str());
if (docRoot.HasParseError())
{
LOGERROR("Parse JSON error(offset %u): %s , return false",
static_cast<unsigned>(docRoot.GetErrorOffset()),
GetParseError_En(docRoot.GetParseError()));
return;
}
if (!docRoot.HasMember("results"))
{
LOGERROR("parseJson(): Can't get results , return false");
return;
}
const rapidjson::Value &valResultsArray = docRoot["results"];
if (!(valResultsArray.IsArray()))
{
LOGERROR("parseJson(): results is not an array , return false !");
return;
}
if(valResultsArray.Size() != 1) //只有一条sql而且没有分块truncate所以结果只能有1个
{
LOGERROR("parseJosn(): results size is error. real_size=%d",static_cast<int>(valResultsArray.Size()));
return;
}
const rapidjson::Value &valResult = valResultsArray[0];
if (!(valResult.HasMember("series") && valResult["series"].IsArray() && valResult["series"].Size() > 0))
{
//< 不为错误,可能是没有查询到结果
LOGDEBUG("parseJosn(): series is empty or error");
return;
}
for(rapidjson::SizeType nSeriesIdx = 0; nSeriesIdx < valResult["series"].Size(); nSeriesIdx++)
{
const rapidjson::Value &valSerie = valResult["series"][nSeriesIdx];
if (!(valSerie.HasMember("tags") && valSerie["tags"].HasMember("tag_name")) )
{
LOGERROR("parseJson(): invalid tags, ignore !");
return;
}
if (!(valSerie.HasMember("values") && valSerie["values"].IsArray() &&
valSerie["values"].Size() > 0 && valSerie["values"][0].Size() > 0))
{
LOGERROR("parseJson(): invalid values, ignore !");
return;
}
string valTagName = valSerie["tags"]["tag_name"].GetString();
int64 lRetTime = valSerie["values"][0][0].GetInt64();
mapTagName2Time[valTagName] = lRetTime;
}
}
void CETLServer::extractValueToIncrement(const std::string &strTableName,const TagName2TimeMAP &mapTagName2Time)
{
string strStatusQuery = boost::str(boost::format(" status | %1% = %1%") % ~m_nInvalidStatus);
//将最后时间相同的测点聚合到一个sql中
std::map<int64,TagNameSeq> mapTime2TagName;
for(auto iterTag = mapTagName2Time.begin(); iterTag != mapTagName2Time.end(); iterTag++)
{
mapTime2TagName[iterTag->second].push_back(iterTag->first);
}
//select difference(first(value)) as value into "increment" from "test" WHERE (tag_name='tag1' or tag_name='tag2') and time>=1700611200000000000 and time<=1700697600000000000 group by tag_name ,time(15m) fill(linear)
string strAllSQL;
for(auto iter = mapTime2TagName.begin(); iter != mapTime2TagName.end(); iter++)
{
string strTagNameList = joinTagName(iter->second);
if(strTagNameList.empty())
{
continue;
}
string strQueryTime = boost::str(boost::format("time>=%1%ms and time<=now()") %iter->first);
string strOneSQL = boost::str(boost::format("SELECT DIFFERENCE(first(value)) as value into \"%1%\" FROM \"%2%\" WHERE (%3%) and (%4%) and %5% GROUP BY tag_name,time(%6%m);")
%CN_TABLE_INCREMENT %strTableName %strTagNameList %strStatusQuery %strQueryTime %m_nGroupTime );
strAllSQL += strOneSQL;
}
if(strAllSQL.empty())
{
LOGDEBUG("抽取%s数据当前没有可以执行的SQL",strTableName.c_str());
return;
}
LOGDEBUG("执行SQL=[%s]",strAllSQL.c_str());
string strQueryResult;
if(!m_ptrTsdbConn->doQuery(strAllSQL.c_str(),&strQueryResult,0))
{
LOGERROR("执行查询失败.sql=[%s]",strAllSQL.c_str());
return;
}
LOGDEBUG("查询结果:%s",strQueryResult.c_str());
}
string CETLServer::joinTagName(const TagNameSeq &vecTagName)
{
if(vecTagName.empty())
{
return "";
}
string strTagNameList = "tag_name='" + vecTagName[0] + "'";
for(size_t i = 1; i < vecTagName.size();i++)
{
strTagNameList += " or tag_name='" + vecTagName[i] + "'";
}
return strTagNameList;
}
} //< namespace iot_dbms

View File

@ -0,0 +1,51 @@

/*********************************************************************************
* @file ETLServer.h
* @brief
* @author caodingfa
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include <string>
#include <vector>
#include <map>
#include "DataType.h"
#include "ETLCommon.h"
#include "tsdb_api/TsdbApi.h"
#include "rapidjson/document.h"
namespace iot_dbms
{
typedef std::vector<std::string> TagNameSeq; //测点标签格式station.dev.pnt
typedef std::map<std::string,int64> TagName2TimeMAP; //测点标签->时间戳单位ms
typedef std::map<std::string,TagNameSeq> PntType2TagNameMAP; //测点类型->测点序列
class CETLServer
{
public:
CETLServer();
~CETLServer();
bool initialize();
void release();
void process();
private:
bool loadConfig();
bool initTSDB();
bool initTsdbConn();
void getLastValue(const std::string &strTable,const TagNameSeq &vecTagName,TagName2TimeMAP &mapTagName2Time);
void extractValueToIncrement(const std::string &strTableName, const TagName2TimeMAP &mapTagName2Time);
std::string joinTagName(const TagNameSeq &vecTagName);
private:
int m_nGroupTime;
int m_nInvalidStatus;
CTsdbConnPtr m_ptrTsdbConn;
PntType2TagNameMAP m_mapPntType2TagName;
};
} //< namespace iot_net

View File

@ -0,0 +1,213 @@

/*********************************************************************************
* @file Main.cpp
* @brief
* @author caodingfa
* @version 1.0
* @date 2023/11/28
**********************************************************************************/
//< 仅在Linux系统下编译
#ifdef OS_LINUX
#include <stdlib.h>
#include <signal.h>
#include <stdio.h>
#include "boost/thread.hpp"
#include "boost/property_tree/ptree.hpp"
#include "boost/property_tree/ini_parser.hpp"
#include "pub_utility_api/FileUtil.h"
#include "pub_utility_api/TimeUtil.h"
#include "ETLCommon.h"
#include "ETLServer.h"
using namespace iot_public;
using namespace iot_dbms;
static const char *szSystemdCfgFile = "/usr/lib/systemd/system/byd_tsdb_etl.service";
boost::mutex g_objMutex_; //< 互斥锁,配合信号量使用
boost::condition_variable g_objCond_; //< 信号量
static void handleSigno(int nSigno)
{
//< 用日志有死锁的风险
printf("\nhandleSigno(): nSigno == %d \n", nSigno);
if (false == iot_dbms::g_bNeedExit)
{
iot_dbms::g_bNeedExit = true;
g_objCond_.notify_all();
}
}
//< 注册系统服务,并设置自启动
static bool regSysService()
{
//< 判断服务是否已注册,普通用户权限可执行
if (0 == std::system("systemctl is-enabled byd_tsdb_etl.service"))
{
printf("\nSystem service byd_tsdb_etl already enabled, exit.\n");
return false;
}
// {
// FILE *pFile = fopen(szSystemdCfgFile, "w");
// if (NULL == pFile)
// {
// printf("\nCan not write file, exit.\nFile: %s\n", szSystemdCfgFile);
// return false;
// }
// else
// fclose(pFile);
// }
const std::string strExec = CFileUtil::getCurModuleDir() + "tsdb_etl -s";
//< 生成或修改systemd服务配置文件
try
{
using namespace boost::property_tree;
ptree objPtree;
objPtree.put<const char *>("Unit.Description", "byd_tsdb_etl");
objPtree.put<const char *>("Unit.After", "network.target");
objPtree.put<const char *>("Service.Type", "simple");
objPtree.put<std::string>("Service.ExecStart", strExec);
objPtree.put<const char *>("Service.KillMode", "process");
objPtree.put<const char *>("Service.Restart", "on-failure");
objPtree.put<const char *>("Service.RestartSec", "42s");
objPtree.put<const char *>("Install.WantedBy", "multi-user.target");
ini_parser::write_ini(szSystemdCfgFile, objPtree);
}
catch (std::exception &e)
{
printf("\nWrite file failed, exit.\nFile: %s\nErr: %s\n", szSystemdCfgFile, e.what());
return false;
}
//< systemd重新加载配置文件
if (0 != std::system("systemctl daemon-reload"))
{
printf("\nReload config file failed, exit.\n");
return false;
}
//< 设置服务开机自启动
if (0 != std::system("systemctl enable byd_tsdb_etl.service"))
{
printf("\nEnable service failed, exit.\n");
return false;
}
printf("\nSuccessfully registered system service byd_tsdb_etl.\n");
return true;
}
//< 注销系统服务
static bool unregSysService()
{
//< 为了消除gcc编译告警
//< warning: ignoring return value of int system(const char*)
int nRc;
(void) nRc;
{
FILE *pFile = fopen(szSystemdCfgFile, "r");
if (NULL == pFile)
{
//< 重新加载一次确保systemd配置与文件一致需需管理员权限
nRc = std::system("systemctl daemon-reload");
printf("\nSystem service byd_tsdb_etl has not been registered.\n");
return false;
}
else
fclose(pFile);
}
//< 停止服务
nRc = std::system("systemctl stop byd_tsdb_etl.service");
//< 取消服务开机自启动
nRc = std::system("systemctl disable byd_tsdb_etl.service");
//< 删除服务配置文件
remove(szSystemdCfgFile);
//< systemd重新加载配置文件
nRc = std::system("systemctl daemon-reload");
printf("\nSuccessfully unregistered system service byd_tsdb_etl.\n");
return true;
}
int main(int argc, char *argv[])
{
if (2 == argc)
{
const char *szArg = argv[1];
if (0 == strcmp(szArg, "-r"))
{
return regSysService() ? EXIT_SUCCESS : EXIT_FAILURE;
}
else if (0 == strcmp(szArg, "-u"))
{
return unregSysService() ? EXIT_SUCCESS : EXIT_FAILURE;
}
else if (0 == strcmp(szArg, "-s"))
{
if (1 != getppid())
{
printf("\n1 != getppid(), exit.\n");
return EXIT_FAILURE;
}
}
else
{
iot_dbms::printHelp();
return EXIT_FAILURE;
}
}
else if (argc > 2)
{
iot_dbms::printHelp();
return EXIT_FAILURE;
}
//< 注册系统信号处理
{
signal(SIGTERM, handleSigno);
signal(SIGINT, handleSigno);
signal(SIGQUIT, handleSigno);
}
//< 初始化
iot_dbms::CETLServer etlSvr;
if (etlSvr.initialize())
{
//< 处理业务
while (!iot_dbms::g_bNeedExit)
{
etlSvr.process();
boost::mutex::scoped_lock lock(g_objMutex_);
g_objCond_.timed_wait(lock, boost::posix_time::millisec(CN_SCAN_PERIOD));
}
}
else
printf("Initialize failed, exit.\n");
//< 释放
etlSvr.release();
return iot_dbms::g_bNeedExit ? EXIT_SUCCESS : EXIT_FAILURE;
}
#endif //< OS_LINUX

View File

@ -0,0 +1,344 @@

/*********************************************************************************
* @file Main.cpp
* @brief
* @author caodingfa
* @version 1.0
* @date 2023/11/28
**********************************************************************************/
//< 仅在Win系统下编译
#ifdef OS_WINDOWS
#include <stdlib.h>
#include <signal.h>
#include <stdio.h>
#include <thread>
#include <windows.h>
#include "boost/thread.hpp"
#include "pub_utility_api/FileUtil.h"
#include "ETLCommon.h"
#include "ETLServer.h"
#define CN_SERVICE_NAME "byd_tsdb_etl"
static const char *szServiceName = CN_SERVICE_NAME;
static SERVICE_STATUS_HANDLE g_hServiceStatus = NULL;
static LPSERVICE_STATUS g_pStatus = NULL;
using namespace iot_public;
using namespace iot_dbms;
boost::mutex g_objMutex_; //< 互斥锁,配合信号量使用
boost::condition_variable g_objCond_; //< 信号量
static void handleSigno(int nSigno)
{
//< 用日志有死锁的风险
printf("\nhandleSigno(): nSigno == %d \n", nSigno);
if (false == iot_dbms::g_bNeedExit)
{
iot_dbms::g_bNeedExit = true;
g_objCond_.notify_all();
}
}
//< 注册系统服务,并设置自启动
static bool regSysService()
{
bool bRet = false;
//< 打开服务控制管理器
SC_HANDLE hSCM = ::OpenSCManagerA(NULL, NULL, SC_MANAGER_ALL_ACCESS);
if (NULL == hSCM)
{
printf("OpenSCManager() failed. Need administrator rights.\n");
}
else
{
//< 判断服务是否已存在
SC_HANDLE hService = ::OpenServiceA(hSCM, szServiceName, SERVICE_QUERY_CONFIG);
if (NULL == hService)
{
const std::string strExec = CFileUtil::getCurModuleDir() + "tsdb_etl.exe -s";
//< 创建服务
//< 使用SERVICE_USER_XXX_PROCESS服务类型用户登录时启动不影响共享内存等的使用
//< 若使用SERVICE_WIN32_XXX_PROCESS等服务类型程序以system用户启动共享内存被隔离其他用户访问不到
hService = ::CreateServiceA(
hSCM, szServiceName, szServiceName,
SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS,
SERVICE_AUTO_START, SERVICE_ERROR_NORMAL,
strExec.c_str(), NULL, NULL, NULL, NULL, NULL);
if (hService == NULL)
{
printf("CreateService() failed. Need administrator rights.\n");
}
else
{
SERVICE_DESCRIPTIONA stSrvDesc;
char szDesc[] = "The TSDB ETL";
stSrvDesc.lpDescription = szDesc;
if (::ChangeServiceConfig2A(hService, SERVICE_CONFIG_DESCRIPTION, &stSrvDesc))
{
bRet = true;
}
::CloseServiceHandle(hService);
}
}
else
{
printf("System service byd_tsdb_etl already exists.\n");
::CloseServiceHandle(hService);
}
::CloseServiceHandle(hSCM);
}
if(bRet)
printf("\nSuccessfully registered system service byd_tsdb_etl.\n");
return bRet;
}
//< 注销系统服务
static bool unregSysService()
{
bool bRet = false;
//< 打开服务控制管理器
SC_HANDLE hSCM = ::OpenSCManagerA(NULL, NULL, SC_MANAGER_ALL_ACCESS);
if (hSCM == NULL)
{
printf("OpenSCManager() failed. Need administrator rights.\n");
}
else
{
SC_HANDLE hService = ::OpenServiceA(hSCM, szServiceName, SERVICE_QUERY_STATUS | SERVICE_STOP | DELETE);
if (NULL == hService)
{
//< 服务不存在
printf("System service byd_tsdb_etl not exists.\n");
}
else
{
SERVICE_STATUS objStatus;
if(TRUE == QueryServiceStatus(hService, &objStatus))
{
if(SERVICE_STOPPED != objStatus.dwCurrentState
&& TRUE == ControlService(hService, SERVICE_CONTROL_STOP, &objStatus))
{
int nWaitLoop = 0;
SERVICE_STATUS objStatusTemp;
while(TRUE == QueryServiceStatus(hService, &objStatusTemp))
{
if (SERVICE_STOPPED == objStatusTemp.dwCurrentState
|| nWaitLoop >= 30)
{
objStatus = objStatusTemp;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
++nWaitLoop;
}
}
if (SERVICE_STOPPED == objStatus.dwCurrentState)
{
//< 删除服务
if (::DeleteService(hService))
{
bRet = true;
}
else
{
printf("DeleteService() failed.\n");
}
}
else
{
printf("Stop service timeout.\n");
}
}
else
{
printf("QueryServiceStatus() failed.\n");
}
::CloseServiceHandle(hService);
}
::CloseServiceHandle(hSCM);
}
if(bRet)
printf("\nSuccessfully unregistered system service byd_tsdb_etl.\n");
return bRet;
}
static void WINAPI serviceStrl(DWORD dwOpcode)
{
switch (dwOpcode)
{
case SERVICE_CONTROL_STOP:
if (false == iot_dbms::g_bNeedExit)
{
iot_dbms::g_bNeedExit = true;
g_objCond_.notify_all();
}
g_pStatus->dwCurrentState = SERVICE_STOP_PENDING;
SetServiceStatus(g_hServiceStatus, g_pStatus);
break;
case SERVICE_CONTROL_PAUSE:
break;
case SERVICE_CONTROL_CONTINUE:
break;
case SERVICE_CONTROL_INTERROGATE:
break;
case SERVICE_CONTROL_SHUTDOWN:
break;
default:
printf("serviceStrl(): Invalid operation code !");
}
}
static void WINAPI serviceMain()
{
g_pStatus->dwCurrentState = SERVICE_START_PENDING;
g_pStatus->dwControlsAccepted = SERVICE_ACCEPT_STOP;
//< 注册服务控制
g_hServiceStatus = RegisterServiceCtrlHandlerA(szServiceName, serviceStrl);
if (g_hServiceStatus == NULL)
{
printf("serviceMain(): RegisterServiceCtrlHandler() return NULL !");
return;
}
SetServiceStatus(g_hServiceStatus, g_pStatus);
g_pStatus->dwWin32ExitCode = S_OK;
g_pStatus->dwCheckPoint = 0;
g_pStatus->dwWaitHint = 0;
g_pStatus->dwCurrentState = SERVICE_RUNNING;
SetServiceStatus(g_hServiceStatus, g_pStatus);
{
//< 初始化
iot_dbms::CETLServer etlSvr;
if (etlSvr.initialize())
{
//< 处理业务
while (!iot_dbms::g_bNeedExit)
{
etlSvr.process();
boost::mutex::scoped_lock lock(g_objMutex_);
g_objCond_.timed_wait(lock, boost::posix_time::millisec(CN_SCAN_PERIOD));
}
}
else
printf("Initialize failed, exit.\n");
//< 释放
etlSvr.release();
}
g_pStatus->dwCurrentState = SERVICE_STOPPED;
SetServiceStatus(g_hServiceStatus, g_pStatus);
}
int main(int argc, char *argv[])
{
if (2 == argc)
{
const char *szArg = argv[1];
if (0 == strcmp(szArg, "-r"))
{
return regSysService() ? EXIT_SUCCESS : EXIT_FAILURE;
}
else if (0 == strcmp(szArg, "-u"))
{
return unregSysService() ? EXIT_SUCCESS : EXIT_FAILURE;
}
else if (0 == strcmp(szArg, "-s"))
{
//g_hServiceStatus = NULL;
g_pStatus = new SERVICE_STATUS;
g_pStatus->dwServiceType = SERVICE_WIN32_OWN_PROCESS;
g_pStatus->dwCurrentState = SERVICE_STOPPED;
g_pStatus->dwControlsAccepted = SERVICE_ACCEPT_STOP;
g_pStatus->dwWin32ExitCode = 0;
g_pStatus->dwServiceSpecificExitCode = 0;
g_pStatus->dwCheckPoint = 0;
g_pStatus->dwWaitHint = 0;
char szSrvName[] = CN_SERVICE_NAME;
SERVICE_TABLE_ENTRYA st[] =
{
{ szSrvName, (LPSERVICE_MAIN_FUNCTIONA)serviceMain },
{ NULL, NULL }
};
if(!::StartServiceCtrlDispatcherA(st))
{
printf("StartServiceCtrlDispatcher() failed.ErrorCode=%d\n",GetLastError());
printf("May not be started by Service Manager.\n");
}
delete g_pStatus;
g_pStatus = NULL;
}
else
{
iot_dbms::printHelp();
return EXIT_FAILURE;
}
}
else if (argc > 2)
{
iot_dbms::printHelp();
return EXIT_FAILURE;
}
else
{
//< 注册系统信号处理
{
signal(SIGTERM, handleSigno);
signal(SIGINT, handleSigno);
signal(SIGBREAK, handleSigno);
}
//< 初始化
iot_dbms::CETLServer etlSvr;
if (etlSvr.initialize())
{
//< 处理业务
while (!iot_dbms::g_bNeedExit)
{
etlSvr.process();
boost::mutex::scoped_lock lock(g_objMutex_);
g_objCond_.timed_wait(lock, boost::posix_time::millisec(CN_SCAN_PERIOD));
}
}
else
{
printf("Initialize failed, exit.\n");
}
//< 释放
etlSvr.release();
}
return iot_dbms::g_bNeedExit ? EXIT_SUCCESS : EXIT_FAILURE;
}
#endif //< OS_WINDOWS

View File

@ -0,0 +1,40 @@
QT -= gui core
CONFIG -= qt
CONFIG += console
CONFIG -= app_bundle
TEMPLATE = app
TARGET = tsdb_etl
#DEFINES += _NO_LOGGING
#DEFINES += PRINT_TIME_DEBUG
HEADERS += ETLCommon.h \
ETLServer.h
SOURCES += MainLinux.cpp \
MainWindows.cpp \
ETLCommon.cpp \
ETLServer.cpp
win32{
LIBS += -ladvapi32
}
else{
LIBS += -lpthread
}
LIBS += -llog4cplus -lpub_logger_api -lpub_utility_api -lboost_thread -lboost_system
LIBS += -lpub_sysinfo_api -ltsdb_api
#-------------------------------------------------------------------
COMMON_PRI=$$PWD/../../common.pri
exists($$COMMON_PRI) {
include($$COMMON_PRI)
}else {
error("FATAL error: can not find common.pri")
}

View File

@ -0,0 +1,447 @@

/******************************************************************************//**
* @file CFrontThread.h
* @brief 线
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
//#include "boost/typeof/typeof.hpp"
#include "boost/lexical_cast.hpp"
//< 屏蔽Protobuff编译报警
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4100)
#endif
#include "google/protobuf/text_format.h"
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#include "pub_logger_api/logger.h"
#include "common/Common.h"
#include "MessageChannel.h"
#include "CNodeMng.h"
#include "CFrontThread.h"
namespace iot_dbms
{
CFrontThread::CFrontThread() :
CTimerThreadBase("CFrontThread", 0),
m_bIsRunning(false), m_nLoopCnt(0), m_nReleaseThreshold(0),
m_pMbComm(NULL), m_pMbMsgRcv(NULL)
{
}
CFrontThread::~CFrontThread()
{
suspendThread();
quit();
delete m_pMbComm;
delete m_pMbMsgRcv;
}
bool CFrontThread::resumeThread()
{
if (isThreadRunning())
{
return true;
}
if (NULL == m_pMbComm)
{
m_pMbComm = new iot_net::CMbCommunicator();
}
//< 订阅
if (!m_pMbComm->addSub(CNodeMng::getInstance().getAppId(), CH_TSS_APP_TO_SRV))
{
LOGERROR("添加订阅失败!");
delete m_pMbComm;
m_pMbComm = NULL;
return false;
}
if (NULL == m_pMbMsgRcv)
{
m_pMbMsgRcv = new iot_net::CMbMessage();
}
//< 注意先赋值后resume()
m_bIsRunning = true;
resume();
return true;
}
bool CFrontThread::suspendThread()
{
m_bIsRunning = false;
suspend();
//< 销毁通讯器
delete m_pMbComm;
m_pMbComm = NULL;
//< 销毁消息
delete m_pMbMsgRcv;
m_pMbMsgRcv = NULL;
//< 释放内存
{
std::string str1, str2;
m_strTags.swap(str1);
m_strFields.swap(str2);
iot_idl::STssInsert objNew;
m_objTssInsert.Swap(&objNew);
}
return true;
}
bool CFrontThread::isThreadRunning() const
{
return m_bIsRunning && isRunning();
}
void CFrontThread::execute()
{
for (int i = 0; m_bIsRunning && (i < 100); ++i)
{
//< 接收消息
if (!m_pMbComm->recvMsg(*m_pMbMsgRcv, 200))
break;
handleOneMbMsg();
}
if (m_nLoopCnt > 10)
{
m_nLoopCnt = 0;
if (m_nReleaseThreshold > 0)
m_nReleaseThreshold = static_cast<int>(m_nReleaseThreshold * 0.9);
else if (m_nReleaseThreshold < 0)
m_nReleaseThreshold = 0;
//< ProtoBuf 的 Clear() 不会清空内存,用此方法清空
if (m_objTssInsert.mutable_point()->Capacity() > m_nReleaseThreshold * 3)
{
std::string str1, str2;
m_strTags.swap(str1);
m_strFields.swap(str2);
iot_idl::STssInsert objNew;
m_objTssInsert.Swap(&objNew);
if (m_objTssInsert.mutable_point()->Capacity() > 0)
{
LOGERROR("与预期不一致Capacity() == %d",
m_objTssInsert.mutable_point()->Capacity());
}
}
}
else
++m_nLoopCnt;
}
void CFrontThread::handleOneMbMsg_bak()
{
/*
LOGDEBUG("MsgType = %d , Para1 = %d , Para2 = %d , DataSize = %llu",
m_pMbMsgRcv->getMsgType(),
m_pMbMsgRcv->getPara1(),
m_pMbMsgRcv->getPara2(),
(unsigned long long)m_pMbMsgRcv->getDataSize());
*/
//< 检查是否本域消息
if (m_pMbMsgRcv->getPara2() != CNodeMng::getInstance().getLocalDomainID())
{
LOGINFO("收到非本域消息,忽略!");
return;
}
//< 不管消息是否正确,先回复已收到,原因:
//< 1、防止应用端因一条错误消息而死循环
//< 2、提高速度
{
iot_net::CMbMessage objMbMSgRep;
objMbMSgRep.setMsgType(iot_idl::MT_TSS_SRV2APP_ADD_ACK);
objMbMSgRep.setSubject(CN_AppId_PUBLIC, CH_TSS_SRV_TO_APP);
objMbMSgRep.setData(std::string("TSS"));
objMbMSgRep.setPara1(m_pMbMsgRcv->getPara1());
objMbMSgRep.setPara2(CNodeMng::getInstance().getLocalDomainID());
//< 不管是否发送失败,应用端收不到确认会重发,对于重复数据时序库自动覆盖
m_pMbComm->replyMsg(objMbMSgRep, *m_pMbMsgRcv);
}
//< 当前只有一种消息
if (m_pMbMsgRcv->getMsgType() != iot_idl::MT_TSS_APP2SRV_ADD)
{
LOGINFO("收到非预期消息类型!");
return;
}
//< 反序列化
bool bRc = m_objTssInsert.ParseFromArray(m_pMbMsgRcv->getDataPtr(), (int) m_pMbMsgRcv->getDataSize());
if (!bRc)
{
LOGWARN("STssInsert 反序列化失败,忽略消息!");
return;
}
//< 为了调试,打印所有消息,生产环境不要打
//{
// std::string strPrint;
// google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint);
// LOGDEBUG("收到消息,内容:\n%s", strPrint.c_str());
//}
//< 当前不允许没有tag
if (m_objTssInsert.tag_name_size() <= 0)
{
LOGERROR("STssInsert未设置tag_name忽略消息");
return;
}
//< 不允许没有field
if (m_objTssInsert.field_name_size() <= 0)
{
LOGERROR("STssInsert未设置field_name忽略消息");
return;
}
//< 没有数据插入,无效
if (m_objTssInsert.point_size() <= 0)
{
LOGERROR("STssInsert未插入任何point无意义数据忽略消息");
return;
}
const int nPointCnt = m_objTssInsert.point_size();
if (nPointCnt > m_nReleaseThreshold)
m_nReleaseThreshold = nPointCnt;
//< 生成 InfluxDB 插入字串
//< 批量插入的语句中有错误语句InfluxDB会跳过错误语句不影响正确语句
StdStringPtr ptrStrInsert(new std::string);
for (int nPointIndex = 0; nPointIndex < nPointCnt; nPointIndex++)
{
const iot_idl::STsdbPoint &objPoint = m_objTssInsert.point(nPointIndex);
if (objPoint.tag_val_size() != m_objTssInsert.tag_name_size())
{
LOGERROR("STsdbPoint中tag_val数量不正确忽略此点");
continue;
}
if (objPoint.field_val_size() != m_objTssInsert.field_name_size())
{
LOGERROR("STsdbPoint中field_val数量不正确忽略此点");
continue;
}
//< 生成 Tag 字串
m_strTags.clear();
for (int nTagIndex = 0; nTagIndex < objPoint.tag_val_size(); nTagIndex++)
{
if (0 != nTagIndex)
{
m_strTags += ",";
}
m_strTags += m_objTssInsert.tag_name(nTagIndex);
m_strTags += "=";
//< dp为了查实时库经常将tag_name进行resize()操作,尾部会有大量的\0
//< std::string直接相加会带上这些\0,即按size()长度相加
//< 而influxdb解析语句时遇到\0就截断了
//< 为了防止这种问题使用c_str()的方式
const std::string &strTagVal = objPoint.tag_val(nTagIndex);
m_strTags += strTagVal;
//< 遇到此情况输出日志
if (strlen(strTagVal.c_str()) != strTagVal.size())
{
std::string strPrint;
google::protobuf::TextFormat::PrintToString(m_objTssInsert, &strPrint);
LOGWARN("收到的tag_val字串中包含\\0字符仅使用\\0前的内容以防字符串截断。请检查消息源程序消息内容如下\n%s\n",
strPrint.c_str());
}
}
//< 生成 Field 字串
m_strFields.clear();
for (int nFieldIndex = 0; nFieldIndex < objPoint.field_val_size(); nFieldIndex++)
{
if (0 != nFieldIndex)
{
m_strFields += ",";
}
m_strFields += m_objTssInsert.field_name(nFieldIndex);
m_strFields += "=";
m_strFields += toInfluxString(objPoint.field_val(nFieldIndex));
}
//< 预先分配空间避免string频繁扩充扩充时还需要复制内存
if (0 == nPointIndex)
{
const size_t nSizeOfOne = m_objTssInsert.meas_name().size() +
m_strTags.size() + m_strFields.size() + 30;
ptrStrInsert->reserve(nSizeOfOne * nPointCnt);
}
//< 添加到 ptrStrInsert
//*ptrStrInsert += boost::str(boost::format("%1%,%2% %3% %4% \n")
// % objTssInsert.meas_name()
// % strTags
// % strFields
// % objPoint.time_stamp());
//< 经测试lexical_cast比format快几十倍
*ptrStrInsert += m_objTssInsert.meas_name() + ",";
*ptrStrInsert += m_strTags + " ";
*ptrStrInsert += m_strFields + " ";
*ptrStrInsert += boost::lexical_cast<std::string>(objPoint.time_stamp()) + " \n";
}
//< 判断save_action
bool bToLocal = false;
bool bToRemote = false;
switch (m_objTssInsert.save_action())
{
case iot_idl::SA_TSS_DO_NOTHING:
break;
case iot_idl::SA_TSS_LOCAL_ONLY:
bToLocal = true;
break;
case iot_idl::SA_TSS_REMOTE_ONLY:
bToRemote = true;
break;
case iot_idl::SA_TSS_LOCAL_REMOTE:
bToLocal = true;
bToRemote = true;
break;
default: LOGERROR("Unknow save_action value !");
break;
}
//< 添加给NodeMngpushSaveStr()中有判是否为空字串
CNodeMng::getInstance().pushSaveStr(ptrStrInsert, bToLocal, bToRemote);
}
void CFrontThread::handleOneMbMsg()
{
/*
LOGDEBUG("MsgType = %d , Para1 = %d , Para2 = %d , DataSize = %llu",
m_pMbMsgRcv->getMsgType(),
m_pMbMsgRcv->getPara1(),
m_pMbMsgRcv->getPara2(),
(unsigned long long)m_pMbMsgRcv->getDataSize());
*/
//< 检查是否本域消息
if (m_pMbMsgRcv->getPara2() != CNodeMng::getInstance().getLocalDomainID())
{
LOGINFO("收到非本域消息,忽略!");
return;
}
//< 不管消息是否正确,先回复已收到,原因:
//< 1、防止应用端因一条错误消息而死循环
//< 2、提高速度
{
iot_net::CMbMessage objMbMSgRep;
objMbMSgRep.setMsgType(iot_idl::MT_TSS_SRV2APP_ADD_ACK);
objMbMSgRep.setSubject(CN_AppId_PUBLIC, CH_TSS_SRV_TO_APP);
objMbMSgRep.setData(std::string("TSS"));
objMbMSgRep.setPara1(m_pMbMsgRcv->getPara1());
objMbMSgRep.setPara2(CNodeMng::getInstance().getLocalDomainID());
//< 不管是否发送失败,应用端收不到确认会重发,对于重复数据时序库自动覆盖
m_pMbComm->replyMsg(objMbMSgRep, *m_pMbMsgRcv);
}
//< 当前只有一种消息
if (m_pMbMsgRcv->getMsgType() != iot_idl::MT_TSS_APP2SRV_ADD)
{
LOGINFO("收到非预期消息类型!");
return;
}
//< 反序列化
bool bRc = m_objTssInsert.ParseFromArray(m_pMbMsgRcv->getDataPtr(), (int) m_pMbMsgRcv->getDataSize());
if (!bRc)
{
LOGWARN("STssInsert 反序列化失败,忽略消息!");
return;
}
//< 为了调试,打印所有消息,生产环境不要打
//{
// std::string strPrint;
// google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint);
// LOGDEBUG("收到消息,内容:\n%s", strPrint.c_str());
//}
//< 当前不允许没有tag
if (m_objTssInsert.tag_name_size() <= 0)
{
LOGERROR("STssInsert未设置tag_name忽略消息");
return;
}
//< 不允许没有field
if (m_objTssInsert.field_name_size() <= 0)
{
LOGERROR("STssInsert未设置field_name忽略消息");
return;
}
//< 没有数据插入,无效
if (m_objTssInsert.point_size() <= 0)
{
LOGERROR("STssInsert未插入任何point无意义数据忽略消息");
return;
}
const int nPointCnt = m_objTssInsert.point_size();
if (nPointCnt > m_nReleaseThreshold)
m_nReleaseThreshold = nPointCnt;
//< 添加给NodeMngpushSaveStr()中有判是否为空字串
CNodeMng::getInstance().pushSaveMsg(m_objTssInsert);
}
} //< namespace iot_dbms

View File

@ -0,0 +1,76 @@

/******************************************************************************//**
* @file CFrontThread.h
* @brief 线
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#pragma once
//< 屏蔽Protobuff编译报警
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4100)
#endif
#include "TsdbSaveMessage.pb.h"
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#include "pub_utility_api/TimerThreadBase.h"
#include "net_msg_bus_api/MsgBusApi.h"
namespace iot_dbms
{
class CFrontThread final : private iot_public::CTimerThreadBase
{
public:
CFrontThread();
~CFrontThread() override;
//< 线程正在运行时调用安全返回true
bool resumeThread();
bool suspendThread();
bool isThreadRunning() const;
private:
void execute()override;
void handleOneMbMsg_bak();
void handleOneMbMsg();
private:
volatile bool m_bIsRunning;
//< 用于动态调节 m_objTssInsert 的释放
int m_nLoopCnt;
int m_nReleaseThreshold;
iot_net::CMbCommunicator *m_pMbComm; //< 消息总线通讯器
iot_net::CMbMessage *m_pMbMsgRcv; //< 接收的消息总线消息
//< handleOneMbMsg() 函数中使用,为了性能作为成员函数,避免频繁构造
std::string m_strTags;
std::string m_strFields;
iot_idl::STssInsert m_objTssInsert;
};
} //< namespace iot_dbms

View File

@ -0,0 +1,383 @@

/******************************************************************************//**
* @file CNodeMng.cpp
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include "boost/typeof/typeof.hpp"
#include "boost/lexical_cast.hpp"
#include "pub_logger_api/logger.h"
#include "pub_sysinfo_api/SysInfoApi.h"
#include "google/protobuf/text_format.h"
#include "CNodeMng.h"
namespace iot_dbms
{
static CNodeMng *g_pNodeMng = NULL;
static CNodeMng::GC gc;
CNodeMng::GC::GC()
{
}
CNodeMng::GC::~GC()
{
if (NULL != g_pNodeMng)
{
delete g_pNodeMng;
g_pNodeMng = NULL;
}
}
//<================================================
CNodeMng::CNodeMng()
{
m_nLocalDomainID = -1;
}
CNodeMng::~CNodeMng()
{
release();
}
CNodeMng& CNodeMng::getInstance()
{
if (NULL == g_pNodeMng)
{
g_pNodeMng = new CNodeMng();
}
return *g_pNodeMng;
}
bool CNodeMng::haveInstance()
{
return NULL != g_pNodeMng;
}
bool CNodeMng::init(const iot_public::SRunAppInfo &stRunAppInfo)
{
m_stRunAppInfo = stRunAppInfo;
iot_public::CSysInfoInterfacePtr ptrSysInfo;
if (iot_public::createSysInfoInstance(ptrSysInfo) == false)
{
LOGERROR("createSysInfoInstance() return false !");
return false;
}
//< 获取本机域ID和主机名
{
iot_public::SNodeInfo stLocalNodeInfo;
if (iotSuccess != ptrSysInfo->getLocalNodeInfo(stLocalNodeInfo))
{
LOGERROR("getLocalNodeInfo() failed !");
return false;
}
m_nLocalDomainID = stLocalNodeInfo.nDomainId;
m_strLocalNodeName = stLocalNodeInfo.strName;
}
//< 本地域数据库连接信息
iot_public::SDatabaseInfo stFirstDbInfo;
std::vector<iot_public::SDatabaseInfo> vecLocalDbInfo;
std::vector<iot_public::SDatabaseInfo> vecRemoteDbInfo;
int nRc = ptrSysInfo->getDBInfoByDomainId(m_nLocalDomainID, stFirstDbInfo,
vecLocalDbInfo, vecRemoteDbInfo);
if (iotSuccess != nRc)
{
LOGERROR("getDbInfoByNodeName failed , return false !");
return false;
}
//< 初始化 m_vecLocalThreads
for (size_t nDbInfoIndex = 0; nDbInfoIndex < vecLocalDbInfo.size(); nDbInfoIndex++)
{
iot_public::SNodeInfo stDbNodeInfo;
nRc = ptrSysInfo->getNodeInfoByName(vecLocalDbInfo[nDbInfoIndex].strNodeName, stDbNodeInfo);
if (iotSuccess != nRc)
{
LOGERROR("getNodeInfoByName failed, continue !");
continue;
}
std::string strIpA, strIpB;
for (int i = 0; i < stDbNodeInfo.nNicNum; i++)
{
switch (i + 1)
{
case 1:
strIpA = stDbNodeInfo.strNic1Addr;
break;
case 2:
strIpB = stDbNodeInfo.strNic2Addr;
break;
default:
break;
}
}
//< todo 端口号、数据库名、用户名、密码 暂时写死
/* 为以后搜索方便,保留此注释
* EMS_DEFAULT_DATABASE使
*
*/
CNodeThreadPtr ptrNodeThread(new CNodeThread(stDbNodeInfo.strName,
strIpA, strIpB,
8086, stFirstDbInfo.strServiceName,
stFirstDbInfo.strServiceName, stFirstDbInfo.strUserPassword));
m_vecLocalThreads.push_back(ptrNodeThread);
ptrNodeThread->resumeThread();
}
//< 初始化 m_vecRemoteThreads
for (size_t nDbInfoIndex = 0; nDbInfoIndex < vecRemoteDbInfo.size(); nDbInfoIndex++)
{
iot_public::SNodeInfo stDbNodeInfo;
nRc = ptrSysInfo->getNodeInfoByName(vecRemoteDbInfo[nDbInfoIndex].strNodeName, stDbNodeInfo);
if (iotSuccess != nRc)
{
LOGERROR("getNodeInfoByName failed, continue !");
continue;
}
std::string strIpA, strIpB;
for (int i = 0; i < stDbNodeInfo.nNicNum; i++)
{
switch (i + 1)
{
case 1:
strIpA = stDbNodeInfo.strNic1Addr;
break;
case 2:
strIpB = stDbNodeInfo.strNic2Addr;
break;
default:
break;
}
}
//< todo 端口号、数据库名、用户名、密码 暂时写死
/* 为以后搜索方便,保留此注释
* EMS_DEFAULT_DATABASE使
*
*/
CNodeThreadPtr ptrNodeThread(new CNodeThread(stDbNodeInfo.strName,
strIpA, strIpB,
8086, stFirstDbInfo.strServiceName,
stFirstDbInfo.strServiceName, stFirstDbInfo.strUserPassword));
m_vecRemoteThreads.push_back(ptrNodeThread);
ptrNodeThread->resumeThread();
}
return true;
}
void CNodeMng::release()
{
//< CTimerThreadBase 析构时会退出线程
m_vecLocalThreads.clear();
m_vecRemoteThreads.clear();
}
//< 获取本域ID
int CNodeMng::getLocalDomainID() const
{
return m_nLocalDomainID;
}
//< 获取本机主机名
const std::string& CNodeMng::getLocalNodeName() const
{
return m_strLocalNodeName;
}
std::string CNodeMng::getAppName() const
{
return m_stRunAppInfo.strAppName;
}
int CNodeMng::getAppId() const
{
return m_stRunAppInfo.nAppId;
}
void CNodeMng::pushSaveMsg(const iot_idl::STssInsert &objTssInsert)
{
//< 为了调试,打印所有消息,生产环境不要打
//{
// std::string strPrint;
// google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint);
// LOGDEBUG("收到消息,内容:\n%s", strPrint.c_str());
//}
std::string strTags;
std::string strFields;
const int nPointCnt = objTssInsert.point_size();
//< 生成 InfluxDB 插入字串
//< 批量插入的语句中有错误语句InfluxDB会跳过错误语句不影响正确语句
StdStringPtr ptrStrInsert(new std::string);
for (int nPointIndex = 0; nPointIndex < nPointCnt; nPointIndex++)
{
const iot_idl::STsdbPoint &objPoint = objTssInsert.point(nPointIndex);
if (objPoint.tag_val_size() != objTssInsert.tag_name_size())
{
LOGERROR("STsdbPoint中tag_val数量不正确忽略此点");
continue;
}
if (objPoint.field_val_size() != objTssInsert.field_name_size())
{
LOGERROR("STsdbPoint中field_val数量不正确忽略此点");
continue;
}
//< 生成 Tag 字串
strTags.clear();
for (int nTagIndex = 0; nTagIndex < objPoint.tag_val_size(); nTagIndex++)
{
if (0 != nTagIndex)
{
strTags += ",";
}
strTags += objTssInsert.tag_name(nTagIndex);
strTags += "=";
//< dp为了查实时库经常将tag_name进行resize()操作,尾部会有大量的\0
//< std::string直接相加会带上这些\0,即按size()长度相加
//< 而influxdb解析语句时遇到\0就截断了
//< 为了防止这种问题使用c_str()的方式
const std::string &strTagVal = objPoint.tag_val(nTagIndex);
strTags += strTagVal;
//< 遇到此情况输出日志
if (strlen(strTagVal.c_str()) != strTagVal.size())
{
std::string strPrint;
google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint);
LOGWARN("收到的tag_val字串中包含\\0字符仅使用\\0前的内容以防字符串截断。请检查消息源程序消息内容如下\n%s\n",
strPrint.c_str());
}
}
//< 生成 Field 字串
strFields.clear();
for (int nFieldIndex = 0; nFieldIndex < objPoint.field_val_size(); nFieldIndex++)
{
if (0 != nFieldIndex)
{
strFields += ",";
}
strFields += objTssInsert.field_name(nFieldIndex);
strFields += "=";
strFields += toInfluxString(objPoint.field_val(nFieldIndex));
}
//< 预先分配空间避免string频繁扩充扩充时还需要复制内存
if (0 == nPointIndex)
{
const size_t nSizeOfOne = objTssInsert.meas_name().size() +
strTags.size() + strFields.size() + 30;
ptrStrInsert->reserve(nSizeOfOne * nPointCnt);
}
//< 添加到 ptrStrInsert
//*ptrStrInsert += boost::str(boost::format("%1%,%2% %3% %4% \n")
// % objTssInsert.meas_name()
// % strTags
// % strFields
// % objPoint.time_stamp());
//< 经测试lexical_cast比format快几十倍
*ptrStrInsert += objTssInsert.meas_name() + ",";
*ptrStrInsert += strTags + " ";
*ptrStrInsert += strFields + " ";
*ptrStrInsert += boost::lexical_cast<std::string>(objPoint.time_stamp()) + " \n";
}
//< 判断save_action
bool bToLocal = false;
bool bToRemote = false;
switch (objTssInsert.save_action())
{
case iot_idl::SA_TSS_DO_NOTHING:
break;
case iot_idl::SA_TSS_LOCAL_ONLY:
bToLocal = true;
break;
case iot_idl::SA_TSS_REMOTE_ONLY:
bToRemote = true;
break;
case iot_idl::SA_TSS_LOCAL_REMOTE:
bToLocal = true;
bToRemote = true;
break;
default: LOGERROR("Unknow save_action value !");
break;
}
//< 添加给NodeMngpushSaveStr()中有判是否为空字串
pushSaveStr(ptrStrInsert, bToLocal, bToRemote);
}
void CNodeMng::pushSaveStr(StdStringPtr ptrStrSave, bool bToLocal, bool bToRemote)
{
if (!ptrStrSave)
{
LOGERROR("pushSaveStr(): ptrStrSave == NULL, return !");
return;
}
if (ptrStrSave->empty())
{
LOGINFO("pushSaveStr(): ptrStrSave->empty(), return !");
return;
}
if (bToLocal)
{
for (size_t i = 0; i < m_vecLocalThreads.size(); i++)
{
m_vecLocalThreads[i]->pushSaveStr(ptrStrSave);
}
}
if (bToRemote)
{
for (size_t i = 0; i < m_vecRemoteThreads.size(); i++)
{
m_vecRemoteThreads[i]->pushSaveStr(ptrStrSave);
}
}
}
} //< namespace iot_dbms

View File

@ -0,0 +1,82 @@

/******************************************************************************//**
* @file CNodeMng.h
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include <vector>
#include "CNodeThread.h"
#include "TsdbSaveCommon.h"
#include "TsdbSaveMessage.pb.h"
#include "pub_sysinfo_api/SysInfoApi.h"
namespace iot_dbms
{
class CNodeThread;
class CNodeMng final
{
public:
//< 单例释放类
class GC final
{
public:
GC();
~GC();
};
public:
//< 单例
static CNodeMng& getInstance();
//< 判断单例是否已实例化
static bool haveInstance();
~CNodeMng();
//< 初始化
bool init(const iot_public::SRunAppInfo &stRunAppInfo);
//< 释放资源
void release();
//< 获取本域ID
int getLocalDomainID() const;
//< 获取本机主机名
const std::string& getLocalNodeName() const;
std::string getAppName() const;
int getAppId() const;
//< 添加存库消息
void pushSaveMsg(const iot_idl::STssInsert &objTssInsert);
void pushSaveStr(StdStringPtr ptrStrSave, bool bToLocal, bool bToRemote);
private:
CNodeMng();
private:
//< 本域ID
int m_nLocalDomainID;
iot_public::SRunAppInfo m_stRunAppInfo;
//< 本机主机名
std::string m_strLocalNodeName;
//< 本地域时序库服务器节点线程
std::vector<CNodeThreadPtr> m_vecLocalThreads;
//< 远程域时序库服务器节点线程
std::vector<CNodeThreadPtr> m_vecRemoteThreads;
};
} //< namespace iot_dbms

View File

@ -0,0 +1,433 @@

/******************************************************************************//**
* @file CNodeThread.h
* @brief 线
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include "boost/filesystem/operations.hpp"
#include "pub_logger_api/logger.h"
#include "pub_utility_api/TimeUtil.h"
#include "pub_utility_api/FileUtil.h"
#include "tsdb_api/CTsdbConn.h"
#include "CNodeThread.h"
#include "CNodeMng.h"
namespace iot_dbms
{
//< 1s运行一次死锁超时 1天
CNodeThread::CNodeThread(const std::string &strNodeName,
const std::string &strIpA, const std::string &strIpB,
int nPort, const std::string &strDbName,
const std::string &strUser, const std::string &strPasswd) :
CTimerThreadBase("CNodeThread", 1000, 1000 * 60 * 60 * 24)
{
m_bDbCreated = false;
m_nLastCheckConnTime = 0;
m_pConnInUse = NULL;
//< m_pConnIpA
{
if (strIpA.empty())
{
m_pConnIpA = NULL;
}
else
{
m_pConnIpA = new CTsdbConn(strIpA.c_str(), nPort, strDbName.c_str(),
strUser.c_str(), strPasswd.c_str());
}
}
//< m_pConnIpB
{
if (strIpB.empty())
{
m_pConnIpB = NULL;
}
else
{
m_pConnIpB = new CTsdbConn(strIpB.c_str(), nPort, strDbName.c_str(),
strUser.c_str(), strPasswd.c_str());
}
}
m_strNodeName = strNodeName;
m_strDbName = strDbName;
}
CNodeThread::~CNodeThread()
{
suspendThread();
quit();
m_pConnInUse = NULL;
if (m_pConnIpA)
{
delete m_pConnIpA;
m_pConnIpA = NULL;
}
if (m_pConnIpB)
{
delete m_pConnIpB;
m_pConnIpB = NULL;
}
m_objReadStream.close();
m_objWriteStream.close();
}
bool CNodeThread::resumeThread()
{
resume();
return true;
}
bool CNodeThread::suspendThread()
{
suspend();
return true;
}
bool CNodeThread::isThreadRunning() const
{
return isRunning();
}
void CNodeThread::pushSaveStr(StdStringPtr ptrStrSave)
{
boost::mutex::scoped_lock locker(m_mutexQue);
m_queStrSave.push(ptrStrSave);
}
void CNodeThread::afterResume()
{
updateConnInUse(true);
}
void CNodeThread::execute()
{
//< 非强制性更新连接状态
updateConnInUse(false);
//< 自动创建数据库,无需工程人员手动创建
//< 已测试:若数据库已存在,再创建同名数据库,创建无效,不影响原有数据
{
if (!m_bDbCreated && NULL != m_pConnInUse)
{
//< 已测试若数据库已存在再创建同名数据库HTTP返回200createDatabase()函数返回true
if (m_pConnInUse->createDatabase(m_strDbName.c_str()))
{
m_bDbCreated = true;
}
else
{
LOGERROR("createDatabase %s failed !", m_strDbName.c_str());
}
}
}
//< 从文件处理
dealFromFile();
//< 从队列处理
dealFromQueue();
}
void CNodeThread::dealFromFile()
{
//< 一次处理完全部缓存
//< 提交成功,则清空,不成功下个循环继续
std::string strToDeal;
//< 注意防止死循环
while (m_pConnInUse)
{
//< strToDeal为空则读取文件
//< 不为空,则是上次循环未提交成功,切换网络后继续执行到此处
if (strToDeal.empty())
{
//< 16MiB
while (strToDeal.size() < 0x00FFFFFF)
{
std::string strTemp;
if (readFromFile(strTemp))
{
strToDeal += strTemp;
}
else
{
break;
}
}
}
if (strToDeal.empty())
{
delFile();
break;
}
//< 超时时间1分钟
const bool bRc = m_pConnInUse->doInsert(strToDeal.c_str(), 60 * 1000);
if (!bRc)
{
//< 插入不成功,强制更新连接状态
//< 返回true表示当前m_pConnInUse不可用发生了变化
if (updateConnInUse(true))
{
//< 继续大循环
continue;
}
else
{
//< 连接状态正常,可能是提交语句有问题,跳过
//< 从文件读取的内容可能很长,不输出日志
//LOGERROR("提交TSDB失败但连接状态正常放弃提交该语句语句如下\n%s", strToDeal.c_str());
}
}
//< 成功,或者语句有问题,清空
strToDeal.clear();
}
}
void CNodeThread::dealFromQueue()
{
//< 从 m_queStrSave 获取全部
std::string strToDeal;
{
std::queue<StdStringPtr> queToDeal;
{
//< 加锁
boost::mutex::scoped_lock locker(m_mutexQue);
if (m_queStrSave.empty())
{
return;
}
std::swap(m_queStrSave, queToDeal);
}
while (!(queToDeal.empty()))
{
//< 批量一次性处理,效率更高
strToDeal += *(queToDeal.front());
//< 前端已经加了\n
//strToDeal += "\n";
queToDeal.pop();
}
}
if (m_pConnInUse)
{
//< 超时时间1分钟
bool bRc = m_pConnInUse->doInsert(strToDeal.c_str(), 60 * 1000);
if (!bRc)
{
//< 插入不成功,强制更新连接状态
//< 返回true表示当前m_pConnInUse不可用发生了变化
if (updateConnInUse(true))
{
//< 写文件
writeToFile(strToDeal);
}
else
{
//< 连接状态正常,可能是提交语句有问题,跳过
LOGERROR("提交TSDB失败但连接状态正常提交语句如下\n%s", strToDeal.c_str());
}
}
}
else
{
//< 写文件
writeToFile(strToDeal);
}
}
bool CNodeThread::updateConnInUse(bool bForce)
{
bool bRet = false;
if (NULL == m_pConnInUse)
{
if (m_pConnIpA
&& m_pConnIpA->pingServer(1000))
{
m_pConnInUse = m_pConnIpA;
bRet = true;
}
else if (m_pConnIpB
&& m_pConnIpB->pingServer(1000))
{
m_pConnInUse = m_pConnIpB;
bRet = true;
}
//< 更新时间
m_nLastCheckConnTime = iot_public::getMonotonicMsec();
}
else
{
//< 强制更新,或者已到周期性检查时间(防止频繁检查)
if (bForce
|| m_nLastCheckConnTime + 5000 < iot_public::getMonotonicMsec())
{
//< 检查当前 m_pConnInUse 是否可用
if (!m_pConnInUse->pingServer(1000))
{
m_pConnInUse = NULL;
bRet = true;
if (m_pConnIpA
&& m_pConnIpA->pingServer(1000))
{
m_pConnInUse = m_pConnIpA;
}
else if (m_pConnIpB
&& m_pConnIpB->pingServer(1000))
{
m_pConnInUse = m_pConnIpB;
}
}
//< 更新时间
m_nLastCheckConnTime = iot_public::getMonotonicMsec();
}
}
return bRet;
}
bool CNodeThread::readFromFile(std::string &strOutput)
{
if (m_objWriteStream.is_open())
{
m_objWriteStream.close();
}
if (!m_objReadStream.is_open())
{
//std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, m_strNodeName);
std::string strFullPath = getCacheFilePath();
if (strFullPath.empty())
return false;
if (boost::filesystem::exists(strFullPath)
&& boost::filesystem::is_regular_file(strFullPath))
{
m_objReadStream.open(strFullPath, std::ios::in);
}
else
return false;
}
if (!std::getline(m_objReadStream, strOutput))
return false;
strOutput.append("\n");
return true;
}
void CNodeThread::writeToFile(const std::string &strInput)
{
if (m_objReadStream.is_open())
{
m_objReadStream.close();
}
if (!m_objWriteStream.is_open())
{
//std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, "");
std::string strFullPath = getCachePath();
if (!boost::filesystem::is_directory(strFullPath))
{
if (!boost::filesystem::create_directories(strFullPath))
{
LOGERROR("create directories %s failed.", strFullPath.c_str());
}
}
//strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, m_strNodeName);
strFullPath = getCacheFilePath();
if (strFullPath.empty())
return;
//< append模式在文件末尾添加没有文件则自动创建
m_objWriteStream.open(strFullPath, std::ios::app);
}
m_objWriteStream << strInput;
}
void CNodeThread::delFile()
{
m_objReadStream.close();
m_objWriteStream.close();
//std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, m_strNodeName);
std::string strFullPath = getCacheFilePath();
if (strFullPath.empty())
return;
if (boost::filesystem::exists(strFullPath))
{
if (!boost::filesystem::remove_all(strFullPath))
{
LOGERROR("删除文件失败.");
}
}
}
std::string CNodeThread::getCachePath()
{
std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, "");
boost::filesystem::path objFullPath = strFullPath;
objFullPath /= CNodeMng::getInstance().getAppName();
return objFullPath.string();
}
std::string CNodeThread::getCacheFilePath()
{
std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, "");
boost::filesystem::path objFullPath = strFullPath;
objFullPath /= CNodeMng::getInstance().getAppName();
strFullPath = iot_public::CFileUtil::getAbsolutePath(objFullPath.string(), m_strNodeName);
return strFullPath;
}
} //< namespace iot_dbms

View File

@ -0,0 +1,97 @@

/******************************************************************************//**
* @file CNodeThread.h
* @brief 线
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include <queue>
#include "boost/shared_ptr.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/filesystem/fstream.hpp"
#include "pub_utility_api/TimerThreadBase.h"
#include "TsdbSaveCommon.h"
namespace iot_dbms
{
class CTsdbConn;
class CNodeThread final : private iot_public::CTimerThreadBase
{
public:
CNodeThread(const std::string &strNodeName,
const std::string &strIpA, const std::string &strIpB,
int nPort, const std::string &strDbName,
const std::string &strUser, const std::string &strPasswd);
~CNodeThread() override;
bool resumeThread();
bool suspendThread();
bool isThreadRunning() const;
void pushSaveStr(StdStringPtr ptrStrSave);
private:
void afterResume()override;
void execute()override;
//< 从缓存文件处理
void dealFromFile();
//< 从内存队列处理
void dealFromQueue();
//< 更新 m_pConnInUse如果当前m_pConnInUse可用则不变化
//< 返回值m_pConnInUse 是否发生了变化
bool updateConnInUse(bool bForce);
bool readFromFile(std::string &strOutput);
void writeToFile(const std::string &strInput);
void delFile();
//< 获取时序数据缓存目录
std::string getCachePath();
//< 获取时序数据缓存文件路径
std::string getCacheFilePath();
private:
//< 是否已执行创建数据库命令
bool m_bDbCreated;
//< 上一次检查连接的时间,防止频繁检查
boost::int64_t m_nLastCheckConnTime;
CTsdbConn *m_pConnInUse; //< 正在使用的连接
CTsdbConn *m_pConnIpA; //< A网连接
CTsdbConn *m_pConnIpB; //< B网连接
std::string m_strNodeName; //< 目标节点主机名
std::string m_strDbName; //< 数据库名
//< 指针数据,各线程共享,不应修改
std::queue<StdStringPtr> m_queStrSave; //< 插入数据队列
boost::mutex m_mutexQue; //< m_queStrSave的锁
boost::filesystem::ifstream m_objReadStream;
boost::filesystem::ofstream m_objWriteStream;
};
typedef boost::shared_ptr<CNodeThread> CNodeThreadPtr;
} //< namespace iot_dbms

View File

@ -0,0 +1,82 @@

/******************************************************************************//**
* @file CTsdbSaveRedunSw.cpp
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include "CTsdbSaveSrv.h"
#include "CNodeMng.h"
#include "CTsdbSaveRedunSw.h"
#include "SampleThread.h"
namespace iot_dbms
{
CTsdbSaveRedunSw::CTsdbSaveRedunSw( CTsdbSaveSrv *pParent, const iot_public::SRunAppInfo &stRunAppInfo)
: m_pParent( pParent ),
m_stRunAppInfo(stRunAppInfo),
m_ptrSampleThread(NULL)
{
assert(m_pParent);
}
CTsdbSaveRedunSw::~CTsdbSaveRedunSw()
{
//< m_objFrontThread析构时会 quit()
m_ptrSampleThread.reset();
}
int CTsdbSaveRedunSw::redundantSwitch(bool bMaster, bool bSlave)
{
//< 当前逻辑 bSlave 无需使用
if (bMaster)
{
m_objFrontThread.resumeThread();
}
else
{
m_objFrontThread.suspendThread();
}
if(m_ptrSampleThread != NULL)
{
m_ptrSampleThread->redundantSwitch(bMaster);
}
m_pParent->updateProcInfo(true, m_objFrontThread.isThreadRunning(), bSlave);
return iotSuccess;
}
int CTsdbSaveRedunSw::initialize()
{
//PUBLIC应用不加载定时存盘功能
if(m_stRunAppInfo.nAppId == CN_AppId_PUBLIC)
{
return iotSuccess;
}
//初始化历史采样接口
//================================================================================================
m_ptrSampleThread = boost::make_shared<CSampleThread>(m_stRunAppInfo);
if (m_ptrSampleThread == NULL)
{
LOGERROR("CTsdbSaveRedunSw::initialize(), create SampleInstance fail!");
return iotFailed;
}
if(!m_ptrSampleThread->initialize())
{
LOGERROR("CTsdbSaveRedunSw::initialize(), init SampleInstance fail!");
return iotFailed;
}
return iotSuccess;
}
} //< namespace iot_dbms

View File

@ -0,0 +1,45 @@

/******************************************************************************//**
* @file CTsdbSaveRedunSw.h
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include "sys_node_mng_api/NodeMngInterface.h"
#include "CFrontThread.h"
#include "SampleThread.h"
namespace iot_dbms
{
class CTsdbSaveSrv;
class CTsdbSaveRedunSw final : public ::iot_sys::CRedundantSwitchInterface
{
public:
explicit CTsdbSaveRedunSw( CTsdbSaveSrv *pParent,const iot_public::SRunAppInfo &stRunAppInfo );
~CTsdbSaveRedunSw();
public:
//< 见父类CRedundantSwitchInterface说明
int redundantSwitch( bool bMaster, bool bSlave ) override;
int initialize();
private:
CTsdbSaveSrv *const m_pParent;
iot_public::SRunAppInfo m_stRunAppInfo;
CFrontThread m_objFrontThread;
CSampleThreadPtr m_ptrSampleThread;
};
typedef boost::shared_ptr<CTsdbSaveRedunSw> CTsdbSaveRedunSwPtr;
} //< namespace iot_dbms

View File

@ -0,0 +1,372 @@

/******************************************************************************//**
* @file CTsdbSaveSrv.cpp
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include "boost/program_options.hpp"
#include "boost/algorithm/string/predicate.hpp"
#include "pub_logger_api/logger.h"
#include "pub_utility_api/SingleProcInstance.h"
#include "net_msg_bus_api/MsgBusApi.h"
#include "tsdb_api/TsdbApi.h"
#include "CNodeMng.h"
#include "CTsdbSaveSrv.h"
#define TSDB_LOCAL_SAVE_SRV_PROC_NAME "tsdb_local_save"
using namespace std;
using namespace iot_public;
namespace iot_dbms
{
CTsdbSaveSrv::CTsdbSaveSrv()
{
m_enRunModel = RM_NORMAL;
}
CTsdbSaveSrv::~CTsdbSaveSrv()
{
stop();
iot_public::StopLogSystem();
}
//< @param int & nStatus 错误码
bool CTsdbSaveSrv::start(int argc, char *argv[], int& /*nStatus*/)
{
assert(!m_ptrRedunSw); //< NULL
string strAppName;
string strStartArgs;
//< 参数解析
if (!parseCommandLine(argc, argv, strAppName,strStartArgs))
{
std::cerr << "参数解析失败" << std::endl;
return false;
}
//< 启动日志
iot_public::StartLogSystem(strAppName.c_str(), TSDB_LOCAL_SAVE_SRV_PROC_NAME);
if(!getSysAppInfo(strAppName))
{
LOGERROR("获取应用信息失败");
return false;
}
//< 判断是否已启动
if (isAlreadyRunning(strStartArgs))
{
LOGFATAL(TSDB_LOCAL_SAVE_SRV_PROC_NAME" 已启动,不可重复启动,本实例退出!");
return false;
}
//< 消息总线
if (!iot_net::initMsgBus(TSDB_LOCAL_SAVE_SRV_PROC_NAME, "", true))
{
LOGFATAL("初始化消息总线失败,程序启动失败!");
return false;
}
//< 时序库接口库
if (!initTsdbApi())
{
LOGFATAL("初始化时序库接口库失败,程序启动失败!");
return false;
}
//< 初始化 CNodeMng 加载配置等
if (!CNodeMng::getInstance().init(m_stRunAppInfo))
{
LOGFATAL("CNodeMng 初始化失败,程序启动失败!");
return false;
}
//< CFrontThread 由 m_ptrRedunSw 管理
//< 初始化 m_ptrRedunSw
m_ptrRedunSw.reset(new CTsdbSaveRedunSw(this,m_stRunAppInfo));
if(m_ptrRedunSw == NULL || m_ptrRedunSw->initialize() != iotSuccess)
{
LOGFATAL("CNodeMng 初始化失败,程序启动失败!");
return false;
}
switch (m_enRunModel)
{
case RM_NORMAL:
{
//< 进程管理
{
iot_sys::SProcessInfoKey objProcInfo;
objProcInfo.nAppId = m_stRunAppInfo.nAppId;
objProcInfo.nDomainId = CNodeMng::getInstance().getLocalDomainID();
objProcInfo.strNodeName = CNodeMng::getInstance().getLocalNodeName();
objProcInfo.strProcName = TSDB_LOCAL_SAVE_SRV_PROC_NAME;
objProcInfo.strProcParam = strStartArgs;
m_ptrProcMng = iot_sys::getProcMngInstance(objProcInfo);
if (!m_ptrProcMng)
{
LOGFATAL("getProcMngInstance return NULL");
return false;
}
m_ptrProcMng->setCallback(this);
}
//< 冗余管理
{
m_ptrRedundantMng = iot_sys::getRedundantMngInstance(CNodeMng::getInstance().getLocalDomainID(),
m_stRunAppInfo.nAppId,
CNodeMng::getInstance().getLocalNodeName());
if (!m_ptrRedundantMng)
{
LOGERROR("getRedundantMngInstance return NULL");
return false;
}
m_ptrRedundantMng->setCallback(m_ptrRedunSw);
}
//< 更新进程管理状态
updateProcInfo(true, false, false);
}
break;
case RM_NO_PROC_MNG_MASTER:
{
if (iotSuccess != m_ptrRedunSw->redundantSwitch(true, false))
{
LOGFATAL("以主机模式启动失败!");
return false;
}
}
break;
case RM_NO_PROC_MNG_SLAVE:
{
if (iotSuccess != m_ptrRedunSw->redundantSwitch(false, true))
{
LOGFATAL("以备机模式启动失败!");
return false;
}
}
break;
default:
{
LOGFATAL("非预期的启动模式,程序启动失败!");
return false;
}
break;
}
LOGINFO(TSDB_LOCAL_SAVE_SRV_PROC_NAME" is now running ...");
return true;
}
bool CTsdbSaveSrv::stop()
{
LOGINFO(TSDB_LOCAL_SAVE_SRV_PROC_NAME" is now exiting ...");
//< 取消冗余切换,防止正在退出时发生冗余切换
if (m_ptrRedundantMng)
{
//LOGDEBUG("Release m_ptrRedundantMng ...");
m_ptrRedundantMng->unsetCallback();
m_ptrRedundantMng.reset();
//LOGDEBUG("Release m_ptrRedundantMng complete !");
}
//< 释放 m_ptrRedunSw
if (m_ptrRedunSw)
{
//LOGDEBUG("Release m_ptrRedunSw ...");
m_ptrRedunSw.reset();
//LOGDEBUG("Release m_ptrRedunSw complete !");
}
//< 取消进程管理回调
//if (m_ptrProcMng)
//{
// m_ptrProcMng->unsetCallback();
//}
//< 清理业务线程
if (CNodeMng::haveInstance())
{
//LOGDEBUG("Release CNodeMng ...");
//< CFrontThread 由 m_ptrRedunSw 管理
CNodeMng::getInstance().release();
//LOGDEBUG("Release CNodeMng complete !");
}
//< 更新进程管理状态
if (m_ptrProcMng)
{
//LOGDEBUG("Release m_ptrProcMng ...");
updateProcInfo(false, false, false);
m_ptrProcMng.reset();
//LOGDEBUG("Release m_ptrProcMng complete !");
}
//< 释放时序库接口库
releaseTsdbApi();
//< 停止消息总线
iot_net::releaseMsgBus();
//< 停止日志系统
//< 移到析构函数中防止日志库停止后又写日志从而使log4cplus提示找不到logger
//iot_public::StopLogSystem();
return true;
}
int CTsdbSaveSrv::toQuit()
{
shutdown();
return iotSuccess;
}
int CTsdbSaveSrv::updateProcInfo(bool bActive, bool bMaster, bool bSlave)
{
if (m_ptrProcMng)
{
return m_ptrProcMng->updateProcessInfo(bActive, bMaster, bSlave);
}
return iotFailed;
}
bool CTsdbSaveSrv::isAlreadyRunning(const std::string &strStartArgs )
{
std::string strUniqueName = TSDB_LOCAL_SAVE_SRV_PROC_NAME;
strUniqueName += strStartArgs;
return iot_public::CSingleProcInstance::hasInstanceRunning( strUniqueName );
}
bool CTsdbSaveSrv::parseCommandLine(int argc, char *argv[],std::string &strAppName, std::string &strStartArgs)
{
//< 拼接启动参数,用于向进程管理注册
for ( int i = 1; i < argc; ++i )
{
if ( i != 1 )
{
strStartArgs += " ";
}
strStartArgs += argv[i];
}
namespace po = boost::program_options;
po::options_description desc("usage");
po::variables_map vm;
try
{
desc.add_options()
("app"",a", po::value<std::string>(), "\t""The APP name, can only ran as PUBLIC")
("no_proc_mng_master"",m", "\t""Run as master without ProcMng and RedundantMng")
("no_proc_mng_slave"",s", "\t""Run as slave without ProcMng and RedundantMng")
("help"",h", "\t""Print this info");
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help"))
{
std::cout << desc << std::endl;
return false;
}
if (vm.count("no_proc_mng_master") && vm.count("no_proc_mng_slave"))
{
std::cout << "no_proc_mng_master and no_proc_mng_slave can not use at the same time !" << std::endl;
return false;
}
if (0 == vm.count("app"))
{
std::cout << "Must set app !" << std::endl;
return false;
}
else
{
strAppName = vm["app"].as<std::string>();
}
if (vm.count("no_proc_mng_master"))
{
m_enRunModel = RM_NO_PROC_MNG_MASTER;
}
else if (vm.count("no_proc_mng_slave"))
{
m_enRunModel = RM_NO_PROC_MNG_SLAVE;
}
else
{
m_enRunModel = RM_NORMAL;
}
}
catch (std::exception &ex)
{
std::cerr << ex.what() << std::endl;
std::cout << desc << std::endl;
return false;
}
catch (...)
{
std::cerr << "未知错误" << std::endl;
std::cout << desc << std::endl;
return false;
}
return true;
}
bool CTsdbSaveSrv::getSysAppInfo(const std::string &strAppName)
{
CSysInfoInterfacePtr sysInfoPtr;
if(createSysInfoInstance(sysInfoPtr) == false)
{
LOGERROR("AppName=%s ,createSysInfoInstance fail!\n", strAppName.c_str());
return false;
}
if(sysInfoPtr == NULL)
{
LOGERROR("AppName=%s ,Get System Info fail!\n", strAppName.c_str());
return false;
}
if(iotSuccess != sysInfoPtr->getLocalRunAppInfoByName(strAppName,m_stRunAppInfo))
{
LOGERROR("getLocalRunAppInfoByName fail");
return false;
}
return true;
}
} //< namespace iot_dbms

View File

@ -0,0 +1,66 @@

/******************************************************************************//**
* @file CTsdbSaveSrv.h
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include "pub_utility_api/BaseService.h"
#include "sys_proc_mng_api/ProcMngInterface.h"
#include "pub_sysinfo_api/SysInfoApi.h"
#include "CTsdbSaveRedunSw.h"
namespace iot_dbms
{
class CTsdbSaveSrv final : public iot_public::CBaseService, iot_sys::CProcessQuitInterface
{
public:
CTsdbSaveSrv();
~CTsdbSaveSrv() override;
//< 见父类CBaseService说明
bool start(int argc, char *argv[], int &nStatus)override;
//< 见父类CBaseService说明
bool stop()override;
//< 见父类CProcessQuitInterface说明
int toQuit()override;
//< 设置进程状态
int updateProcInfo(bool bActive, bool bMaster, bool bSlave);
private:
bool isAlreadyRunning(const std::string &strStartArgs );
bool parseCommandLine(int argc, char *argv[],std::string &strAppName, std::string &strStartArgs);
/**
@brief
@return
*/
bool getSysAppInfo(const std::string &strAppName);
enum enRunModel
{
RM_NORMAL = 0, //< 正常模式
RM_NO_PROC_MNG_MASTER, //< 不注册进程管理、冗余管理,主机模式
RM_NO_PROC_MNG_SLAVE, //< 不注册进程管理、冗余管理,备机模式
};
private:
iot_public::SRunAppInfo m_stRunAppInfo;
enRunModel m_enRunModel;
CTsdbSaveRedunSwPtr m_ptrRedunSw;
iot_sys::CProcMngInterfacePtr m_ptrProcMng;
iot_sys::CRedundantMngInterfacePtr m_ptrRedundantMng;
};
} //< namespace iot_dbms

View File

@ -0,0 +1,17 @@

/******************************************************************************//**
* @file Main.cpp
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include "CTsdbSaveSrv.h"
int main(int argc, char *argv[])
{
iot_dbms::CTsdbSaveSrv objApp;
return objApp.main(argc, argv);
}

View File

@ -0,0 +1,390 @@
/**
@file OptMainThread.cpp
@brief m_pMsgBuffer实现
@author
*/
#include "boost/program_options.hpp"
#include "SampleThread.h"
#include "pub_logger_api/logger.h"
#include "pub_utility_api/TimeUtil.h"
#include "service/sample_server_api/SampleDefine.h"
#include "service/common/CommonDefine.h"
#include "../service/data_process/AccStructDefine.h"
#include "../service/data_process/AnaStructDefine.h"
#include "../service/data_process/DigStructDefine.h"
#include "../service/data_process/MixStructDefine.h"
#include "CNodeMng.h"
using namespace iot_public;
using namespace iot_service;
using namespace iot_idl;
using namespace iot_dbms;
using namespace std;
#define MAX_SAVE_POINT_NUM 10000
CSampleThread::CSampleThread(iot_public::SRunAppInfo &stRunAppInfo):
CTimerThreadBase("CSampleThread",20),
m_bMaster(false),
m_stRunAppInfo(stRunAppInfo)
{
m_lLastTime = m_lCurTime = getUTCTimeMsec();
}
CSampleThread::~CSampleThread()
{
this->quit();
if(m_ptrRdbTableMng != NULL)
{
m_ptrRdbTableMng.reset();
m_ptrRdbTableMng = NULL;
}
}
/**
@brief
@param
@return
@retval
*/
void CSampleThread::execute()
{
if(!m_bMaster)
{
//备机不保存测点历史数据
return;
}
m_lCurTime = getUTCTimeMsec(); //读取当前时间 MS
//if(m_lLastTime > m_lCurTime ) m_lLastTime = m_lCurTime ; //系统时间调整处理,防止跳过时间点
//1.存配置了是否存盘的点没有配置的点不会存盘包括5分钟
/*
//=================================================================================================
if ((m_lCurTime/60000 - m_lLastTime/60000) >0 )//分钟数发生改变每1 min 判断一次固定存盘)
{
m_lLastTime = m_lCurTime ;
saveAllCfgPoint();//周期提交
LOGDEBUG("CSampleThread::execute, doEveryPoint() Time=[%lld] ",m_lCurTime);
}*/
//1.所有点存盘周期为固定5分钟
//=================================================================================================
int64 mMin = m_lCurTime/(60000) ;//转换为分钟
if ( (mMin % (SAMPLE_CYC_MIN) == 0) && (m_lCurTime/60000 - m_lLastTime/60000) >0 )//每5 min判一次
{
m_lLastTime = m_lCurTime ;
saveAllPoint() ;
LOGDEBUG("CSampleThread::execute, saveAllPoint() Time=[%" PRId64 "] ", m_lCurTime);
}
return;
}
bool CSampleThread::initialize()
{
//RDB管理实例
//=============================================================================================
m_ptrRdbTableMng = boost::make_shared<CRdbTableMng>(m_stRunAppInfo.strAppName);
if (m_ptrRdbTableMng == NULL)
{
LOGERROR("CSampleProcess ::initialize(), make_shared<CRdbTableMng> fail!\n");
return false;
}
resume();
return true;
}
void CSampleThread::redundantSwitch(bool bMaster)
{
m_bMaster = bMaster;
}
//调用INFLUEXDB API存盘数据到TSDB 中
void CSampleThread::addToTsdbSever(iot_idl::STssInsert &stTsdbData,const string&strTable)
{
if(stTsdbData.point_size() >0 )
{
LOGDEBUG("表[%s]插入记录数:%d",strTable.c_str(),stTsdbData.point_size());
stTsdbData.set_save_action(SA_TSS_LOCAL_REMOTE);
stTsdbData.set_meas_name(strTable);
stTsdbData.add_tag_name("tag_name");
stTsdbData.add_field_name("status");
stTsdbData.add_field_name("value");
//m_ptrTsdbSaveApi->addInsertMsg(stTsdbData) ;
CNodeMng::getInstance().pushSaveMsg(stTsdbData);
stTsdbData.Clear();
}
return;
}
//增加一个测点到TSDB队列
void CSampleThread::addOneTsdbData(iot_idl::STssInsert& stTsdbData,const string&strTagName,\
const int64 nStatus,const int64 nValue, const int64 nTimeStamp)
{
STsdbPoint stPoint;
stPoint.set_time_stamp(nTimeStamp);
stPoint.add_tag_val(strTagName);
SVariable *varStatus;
varStatus = stPoint.add_field_val();
varStatus->set_edatatype(CN_DATATYPE_INT64);
varStatus->set_lvalue(nStatus);
SVariable *varValue;
varValue = stPoint.add_field_val();
varValue->set_edatatype(CN_DATATYPE_INT64);
varValue->set_lvalue(nValue);
stTsdbData.add_point()->CopyFrom(stPoint);
return;
}
//增加一个测点到TSDB队列
void CSampleThread::addOneTsdbData(iot_idl::STssInsert& stTsdbData,const string&strTagName,\
const int64 nStatus,const double fValue, const int64 nTimeStamp)
{
STsdbPoint stPoint;
stPoint.set_time_stamp(nTimeStamp);
stPoint.add_tag_val(strTagName);
SVariable *varStatus;
varStatus = stPoint.add_field_val();
varStatus->set_edatatype(CN_DATATYPE_INT64);
varStatus->set_lvalue(nStatus);
SVariable *varValue;
varValue = stPoint.add_field_val();
varValue->set_edatatype(CN_DATATYPE_DOUBLE);
varValue->set_dvalue(fValue);
//LOGINFO("addOneTsdbData:strTagName=%s nStatus=%lld fValue=%f !",strTagName.c_str(),nStatus,fValue);
stTsdbData.add_point()->CopyFrom(stPoint);
return;
}
void CSampleThread::saveAllCfgPoint()
{
int nRetCode = 0;
vector<SSampleDefine> vecSampleDefine;
nRetCode = m_ptrRdbTableMng->selectAllColumnNoCondition("sample_define",vecSampleDefine);
if(nRetCode == false)
{
LOGERROR("doEveryPoint:strTableName=sample_define 获取参数错误 !");
return;
}
iot_idl::STssInsert stAnaTsdbData;
iot_idl::STssInsert stDigTsdbData;
iot_idl::STssInsert stMixTsdbData;
iot_idl::STssInsert stAccTsdbData;
stAnaTsdbData.Clear();
stDigTsdbData.Clear();
stMixTsdbData.Clear();
stAccTsdbData.Clear();
for(size_t nLoop=0;nLoop <vecSampleDefine.size();nLoop++)
{
int nPointType = vecSampleDefine[nLoop].point_type ;//得到点类型
string strTagName = vecSampleDefine[nLoop].tag_name;
int64 nStatus = vecSampleDefine[nLoop].status;
double dValue = vecSampleDefine[nLoop].dValue ;
int nPeriod = vecSampleDefine[nLoop].sample_period; //存盘周期
int64 nTimeStamp = (getUTCTimeMsec()/(60*1000)) * 60000; //得到固定的分钟
if(nPeriod<=0) nPeriod = 1;//
int64 mMin = m_lCurTime/(60*1000) ;//转换为分钟
if( (mMin % nPeriod == 0))
{
if(POINT_TYPE_ANA == nPointType)
{
addOneTsdbData(stAnaTsdbData, strTagName,nStatus,(double)dValue,nTimeStamp);
}
else if(POINT_TYPE_DIG == nPointType)
{
addOneTsdbData(stDigTsdbData, strTagName,nStatus,(int64)dValue,nTimeStamp);
}
else if(POINT_TYPE_MIX == nPointType)
{
addOneTsdbData(stMixTsdbData, strTagName,nStatus,(int64)dValue,nTimeStamp);
}
else if(POINT_TYPE_ACC == nPointType)
{
addOneTsdbData(stAccTsdbData, strTagName,nStatus,(double)dValue,nTimeStamp);
}
}
}
addToTsdbSever(stAnaTsdbData,ANA_SAMPLE_RESULT);
addToTsdbSever(stDigTsdbData,DIG_SAMPLE_RESULT);
addToTsdbSever(stMixTsdbData,MIX_SAMPLE_RESULT);
addToTsdbSever(stAccTsdbData,ACC_SAMPLE_RESULT);
return;
}
//测点5分钟周期存盘入口
void CSampleThread::saveAllPoint()
{
int64 nTimeStamp = (getUTCTimeMsec()/(60*1000)) * 60000; //得到固定的分钟
LOCALTIME localTime = convertUTCMsecToLocalTime(nTimeStamp);
saveAllAnaPoint(nTimeStamp,localTime.wMinute);
saveAllDigPoint(nTimeStamp);
saveAllMixPoint(nTimeStamp);
saveAllAccPoint(nTimeStamp);
return;
}
//模拟量周期存盘
void CSampleThread::saveAllAnaPoint(const int64 nTimeStamp,const int &minOfHour)
{
STssInsert stTsdbData;
stTsdbData.Clear();
SAnaPointAll *pAnaPoint = NULL;
int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_ANA_TBL);
for(int nIndex=0;nIndex<nCount;nIndex++)
{
pAnaPoint = (SAnaPointAll*)m_ptrRdbTableMng->getRecordAllColumnByIndex(RT_ANA_TBL,nIndex);
if(NULL != pAnaPoint)
{
//caodingfa存盘周期不等于SAMPLE_CYC_MIN时按存盘周期定时存储可以是一个小时内的一个分钟数比如0、15、30、45
if((pAnaPoint->sample_period != SAMPLE_CYC_MIN) &&
(pAnaPoint->sample_period == 0 || minOfHour % pAnaPoint->sample_period != 0))
{
continue;
}
addOneTsdbData(stTsdbData, pAnaPoint->tag_name,pAnaPoint->status,pAnaPoint->value,nTimeStamp);
}
else
{
LOGDEBUG("saveAllAnaPoint:: 获取记录失败!");
continue;
}
if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0)
{
addToTsdbSever(stTsdbData,ANA_SAMPLE_RESULT);
stTsdbData.Clear();
}
}
addToTsdbSever(stTsdbData,ANA_SAMPLE_RESULT);
return;
}
//数字量周期存盘
void CSampleThread::saveAllDigPoint(const int64 nTimeStamp)
{
STssInsert stTsdbData;
stTsdbData.Clear();
SDigPointAll *pDigPoint = NULL;
int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_DIG_TBL);
for(int nIndex=0;nIndex<nCount;nIndex++)
{
pDigPoint = (SDigPointAll*)m_ptrRdbTableMng->getRecordAllColumnByIndex(RT_DIG_TBL,nIndex);
if(NULL != pDigPoint)
{
addOneTsdbData(stTsdbData, pDigPoint->tag_name,pDigPoint->status,(int64)pDigPoint->value,nTimeStamp);
}
else
{
LOGDEBUG("saveAllDigPoint:: 获取记录失败!");
continue;
}
if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0)
{
addToTsdbSever(stTsdbData,DIG_SAMPLE_RESULT);
stTsdbData.Clear();
}
}
addToTsdbSever(stTsdbData,DIG_SAMPLE_RESULT);
return;
}
//混合量周期存盘
void CSampleThread::saveAllMixPoint(const int64 nTimeStamp)
{
STssInsert stTsdbData;
stTsdbData.Clear();
SMixPointAll *pMixPoint = NULL;
int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_MIX_TBL);
for(int nIndex=0;nIndex<nCount;nIndex++)
{
pMixPoint = (SMixPointAll*)m_ptrRdbTableMng->getRecordAllColumnByIndex(RT_MIX_TBL,nIndex);
if(NULL != pMixPoint)
{
addOneTsdbData(stTsdbData, pMixPoint->tag_name,pMixPoint->status,(int64)pMixPoint->value,nTimeStamp);
}
else
{
LOGDEBUG("saveAllMixPoint:: 获取记录失败!");
continue;
}
if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0)
{
addToTsdbSever(stTsdbData,MIX_SAMPLE_RESULT);
stTsdbData.Clear();
}
}
addToTsdbSever(stTsdbData,MIX_SAMPLE_RESULT);
return;
}
//累计量周期存盘
void CSampleThread::saveAllAccPoint(const int64 nTimeStamp)
{
STssInsert stTsdbData;
stTsdbData.Clear();
SAccPointAll *pAccPoint = NULL;
int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_ACC_TBL);
for(int nIndex=0;nIndex<nCount;nIndex++)
{
pAccPoint = (SAccPointAll*)m_ptrRdbTableMng->getRecordAllColumnByIndex(RT_ACC_TBL,nIndex);
if(NULL != pAccPoint)
{
addOneTsdbData(stTsdbData, pAccPoint->tag_name,pAccPoint->status,pAccPoint->value,nTimeStamp);
}
else
{
LOGDEBUG("saveAllAccPoint:: 获取记录失败!");
continue;
}
if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0)
{
addToTsdbSever(stTsdbData,ACC_SAMPLE_RESULT);
stTsdbData.Clear();
}
}
addToTsdbSever(stTsdbData,ACC_SAMPLE_RESULT);
return;
}

View File

@ -0,0 +1,56 @@
/**
@file MainWorkThread.h
@brief
@author
*/
#pragma once
#include "boost/thread.hpp"
#include "pub_utility_api/TimerThreadBase.h"
#include <vector>
#include <string>
//工作线程实现类
//===============================================================
#include "pub_sysinfo_api/SysInfoApi.h"
#include "sample_server_api/SampleDefine.h"
#include "rdb_api/RdbTableMng.h"
#include "TsdbSaveMessage.pb.h"
namespace iot_dbms
{
class CSampleThread : public iot_public::CTimerThreadBase
{
public:
CSampleThread(iot_public::SRunAppInfo &stRunAppInfo);
virtual ~CSampleThread();
virtual void execute()override;//业务处理函数,必须继承实现自己的业务逻辑
bool initialize(); // 初始化工作,得到定义的采样时间
void redundantSwitch( bool bMaster);
private:
//全数据存盘
void saveAllCfgPoint(); //保存所有配置的点全数据
void saveAllPoint(); //存盘全数据
void saveAllAnaPoint(const int64 uTimeStamp,const int &minOfHour);
void saveAllDigPoint(const int64 uTimeStamp);
void saveAllAccPoint(const int64 uTimeStamp);
void saveAllMixPoint(const int64 uTimeStamp);
void addOneTsdbData(iot_idl::STssInsert& stTsdbData,const std::string&strKeyIdTag,\
const int64 nStatus,const int64 nValue, const int64 uTimeStamp);
void addOneTsdbData(iot_idl::STssInsert& stTsdbData,const std::string&strKeyIdTag,\
const int64 nStatus,const double fValue, const int64 uTimeStamp);
void addToTsdbSever(iot_idl::STssInsert&stTsdbData,const std::string&strTable=""); //提交数据到 tsdb Server
private:
volatile bool m_bMaster;
int64 m_lCurTime;
int64 m_lLastTime;
iot_public::SRunAppInfo m_stRunAppInfo;
iot_dbms::CRdbTableMngPtr m_ptrRdbTableMng;
};
typedef boost::shared_ptr<CSampleThread> CSampleThreadPtr;
}

View File

@ -0,0 +1,80 @@

/******************************************************************************//**
* @file TsdbSaveCommon.cpp
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include "boost/filesystem.hpp"
#include "boost/lexical_cast.hpp"
//< 屏蔽Protobuff编译报警
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4100)
#endif
#include "Public.pb.h"
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#include "pub_logger_api/logger.h"
#include "TsdbSaveCommon.h"
namespace iot_dbms
{
std::string toInfluxString(const iot_idl::SVariable &objInput)
{
//< 经测试lexical_cast比format快几十倍
std::string strOutput;
switch (objInput.edatatype())
{
case iot_idl::DataType::CN_DATATYPE_BOOL:
strOutput = boost::lexical_cast<std::string>(objInput.bvalue());
break;
case iot_idl::DataType::CN_DATATYPE_DOUBLE:
strOutput = boost::lexical_cast<std::string>(objInput.dvalue());
break;
case iot_idl::DataType::CN_DATATYPE_FLOAT:
strOutput = boost::lexical_cast<std::string>(objInput.fvalue());
break;
case iot_idl::DataType::CN_DATATYPE_INT32:
strOutput = boost::lexical_cast<std::string>(objInput.nvalue()) + "i";
break;
case iot_idl::DataType::CN_DATATYPE_INT64:
strOutput = boost::lexical_cast<std::string>(objInput.lvalue()) + "i";
break;
case iot_idl::DataType::CN_DATATYPE_STRING:
strOutput = objInput.strvalue();
break;
case iot_idl::DataType::CN_DATATYPE_UINT32:
strOutput = boost::lexical_cast<std::string>(objInput.uvalue()) + "i";
break;
case iot_idl::DataType::CN_DATATYPE_UINT64:
strOutput = boost::lexical_cast<std::string>(objInput.ulvalue()) + "i";
break;
default:
LOGERROR("toInfluxString(): Unknow DataType !");
break;
}
return strOutput;
}
} //< namespace iot_dbms

View File

@ -0,0 +1,36 @@

/******************************************************************************//**
* @file TsdbSaveCommon.h
* @brief
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#pragma once
#include <string>
#include "boost/cstdint.hpp"
#include "boost/shared_ptr.hpp"
typedef boost::shared_ptr<std::string> StdStringPtr;
namespace iot_idl
{
class SVariable;
}
namespace iot_dbms
{
#ifdef OS_WINDOWS
#define DATA_DIRECTORY "..\\..\\data\\cache\\tsdb_save_cache\\"
#else
#define DATA_DIRECTORY "../../data/cache/tsdb_save_cache/"
#endif
std::string toInfluxString(const iot_idl::SVariable &objInput);
} //< namespace iot_dbms

View File

@ -0,0 +1,49 @@
#本服务与tsdb_save的主要区别是本服务只处理本应用的数据包括接收变化数据和定时断面
QT -= gui core
CONFIG -= qt
CONFIG += c++11 console
CONFIG -= app_bundle
TEMPLATE = app
TARGET = tsdb_local_save
LIBS += -lboost_chrono -lboost_system -lboost_program_options -lboost_filesystem -lboost_date_time \
-lprotobuf -llog4cplus \
-lpub_logger_api -lpub_utility_api -lpub_sysinfo_api \
-lsys_proc_mng_api -lsys_node_mng_api \
-lnet_msg_bus_api -ltsdb_api \
-lrdb_api
HEADERS += $$PWD/../../idl_files/TsdbSaveMessage.pb.h \
$$PWD/../../idl_files/Public.pb.h \
CFrontThread.h \
CNodeMng.h \
CNodeThread.h \
CTsdbSaveRedunSw.h \
CTsdbSaveSrv.h \
TsdbSaveCommon.h \
SampleThread.h
SOURCES += Main.cpp \
# $$PWD/../../idl_files/TsdbSaveMessage.pb.cc \
# $$PWD/../../idl_files/Public.pb.cc \
CFrontThread.cpp \
CNodeMng.cpp \
CNodeThread.cpp \
CTsdbSaveRedunSw.cpp \
CTsdbSaveSrv.cpp \
TsdbSaveCommon.cpp \
SampleThread.cpp
include($$PWD/../../idl_files/idl_files.pri)
#-------------------------------------------------------------------
COMMON_PRI=$$PWD/../../common.pri
exists($$COMMON_PRI) {
include($$COMMON_PRI)
}else {
error("FATAL error: can not find common.pri")
}