From 50296a077f894df35712e796f3df101a908bb7a3 Mon Sep 17 00:00:00 2001 From: shi_jq Date: Thu, 13 Mar 2025 10:46:26 +0800 Subject: [PATCH] =?UTF-8?q?[ref]=E5=90=8C=E6=AD=A5711?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../db_manager/CCreateInfluxDBUserDlg.cpp | 56 +++ .../dbms/db_manager/CCreateInfluxDBUserDlg.h | 39 ++ .../dbms/db_manager/CCreateInfluxDBUserDlg.ui | 109 +++++ .../src/dbms/db_studio/icons/db_studio.ico | Bin 0 -> 15086 bytes .../src/dbms/rdb_studio/icons/rdb_studio.ico | Bin 0 -> 15086 bytes platform/src/dbms/tsdb_etl/ETLCommon.cpp | 35 ++ platform/src/dbms/tsdb_etl/ETLCommon.h | 26 + platform/src/dbms/tsdb_etl/ETLServer.cpp | 357 ++++++++++++++ platform/src/dbms/tsdb_etl/ETLServer.h | 51 ++ platform/src/dbms/tsdb_etl/MainLinux.cpp | 213 +++++++++ platform/src/dbms/tsdb_etl/MainWindows.cpp | 344 ++++++++++++++ platform/src/dbms/tsdb_etl/tsdb_etl.pro | 40 ++ .../src/dbms/tsdb_local_save/CFrontThread.cpp | 447 ++++++++++++++++++ .../src/dbms/tsdb_local_save/CFrontThread.h | 76 +++ .../src/dbms/tsdb_local_save/CNodeMng.cpp | 383 +++++++++++++++ platform/src/dbms/tsdb_local_save/CNodeMng.h | 82 ++++ .../src/dbms/tsdb_local_save/CNodeThread.cpp | 433 +++++++++++++++++ .../src/dbms/tsdb_local_save/CNodeThread.h | 97 ++++ .../dbms/tsdb_local_save/CTsdbSaveRedunSw.cpp | 82 ++++ .../dbms/tsdb_local_save/CTsdbSaveRedunSw.h | 45 ++ .../src/dbms/tsdb_local_save/CTsdbSaveSrv.cpp | 372 +++++++++++++++ .../src/dbms/tsdb_local_save/CTsdbSaveSrv.h | 66 +++ platform/src/dbms/tsdb_local_save/Main.cpp | 17 + .../src/dbms/tsdb_local_save/SampleThread.cpp | 390 +++++++++++++++ .../src/dbms/tsdb_local_save/SampleThread.h | 56 +++ .../dbms/tsdb_local_save/TsdbSaveCommon.cpp | 80 ++++ .../src/dbms/tsdb_local_save/TsdbSaveCommon.h | 36 ++ .../dbms/tsdb_local_save/tsdb_local_save.pro | 49 ++ 28 files changed, 3981 insertions(+) create mode 100644 platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.cpp create mode 100644 platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.h create mode 100644 platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.ui create mode 100644 platform/src/dbms/db_studio/icons/db_studio.ico create mode 100644 platform/src/dbms/rdb_studio/icons/rdb_studio.ico create mode 100644 platform/src/dbms/tsdb_etl/ETLCommon.cpp create mode 100644 platform/src/dbms/tsdb_etl/ETLCommon.h create mode 100644 platform/src/dbms/tsdb_etl/ETLServer.cpp create mode 100644 platform/src/dbms/tsdb_etl/ETLServer.h create mode 100644 platform/src/dbms/tsdb_etl/MainLinux.cpp create mode 100644 platform/src/dbms/tsdb_etl/MainWindows.cpp create mode 100644 platform/src/dbms/tsdb_etl/tsdb_etl.pro create mode 100644 platform/src/dbms/tsdb_local_save/CFrontThread.cpp create mode 100644 platform/src/dbms/tsdb_local_save/CFrontThread.h create mode 100644 platform/src/dbms/tsdb_local_save/CNodeMng.cpp create mode 100644 platform/src/dbms/tsdb_local_save/CNodeMng.h create mode 100644 platform/src/dbms/tsdb_local_save/CNodeThread.cpp create mode 100644 platform/src/dbms/tsdb_local_save/CNodeThread.h create mode 100644 platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.cpp create mode 100644 platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.h create mode 100644 platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.cpp create mode 100644 platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.h create mode 100644 platform/src/dbms/tsdb_local_save/Main.cpp create mode 100644 platform/src/dbms/tsdb_local_save/SampleThread.cpp create mode 100644 platform/src/dbms/tsdb_local_save/SampleThread.h create mode 100644 platform/src/dbms/tsdb_local_save/TsdbSaveCommon.cpp create mode 100644 platform/src/dbms/tsdb_local_save/TsdbSaveCommon.h create mode 100644 platform/src/dbms/tsdb_local_save/tsdb_local_save.pro diff --git a/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.cpp b/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.cpp new file mode 100644 index 00000000..5c9fe65b --- /dev/null +++ b/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.cpp @@ -0,0 +1,56 @@ +#include "CCreateInfluxDBUserDlg.h" +#include "ui_CCreateInfluxDBUserDlg.h" + +CCreateInfluxDBUserDlg::CCreateInfluxDBUserDlg(QWidget *parent) : + CustomUiDialog(parent), + ui(new Ui::CCreateInfluxDBUserDlg) +{ + ui->setupUi(this); + connect(ui->confirmBtn , &QPushButton::clicked , this , &CCreateInfluxDBUserDlg::accept); + setAutoLayout(true); +} + +CCreateInfluxDBUserDlg::~CCreateInfluxDBUserDlg() +{ + delete ui; +} + +QString CCreateInfluxDBUserDlg::getUserInfo() +{ + return ui->lineEdit_user->text(); +} + +QString CCreateInfluxDBUserDlg::getPassWordInfo() +{ + return ui->lineEdit_password->text(); +} + +QString CCreateInfluxDBUserDlg::getIP() +{ + return ui->lineEdit_ip->text(); +} + +QString CCreateInfluxDBUserDlg::getAdminUser() +{ + return ui->lineEdit_admin->text(); +} + +void CCreateInfluxDBUserDlg::setIP(QString ipAddress) +{ + ui->lineEdit_ip->setText(ipAddress); +} + +void CCreateInfluxDBUserDlg::setAdminUser(QString admin) +{ + ui->lineEdit_admin->setText(admin); +} + +void CCreateInfluxDBUserDlg::setUserInfo(QString userInfo) +{ + ui->lineEdit_user->setText(userInfo); +} + +void CCreateInfluxDBUserDlg::setPassWord(QString passWord) +{ + ui->lineEdit_password->setText(passWord); +} diff --git a/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.h b/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.h new file mode 100644 index 00000000..65842ea3 --- /dev/null +++ b/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.h @@ -0,0 +1,39 @@ +#ifndef CCREATEINFLUXDBUSERDLG_H +#define CCREATEINFLUXDBUSERDLG_H + +#include +#include "pub_widget/CustomDialog.h" + +namespace Ui { +class CCreateInfluxDBUserDlg; +} + +class CCreateInfluxDBUserDlg : public CustomUiDialog +{ + Q_OBJECT + +public: + explicit CCreateInfluxDBUserDlg(QWidget *parent = 0); + ~CCreateInfluxDBUserDlg(); + + QString getUserInfo(); + + QString getPassWordInfo(); + + QString getIP(); + + QString getAdminUser(); + + void setIP(QString ipAddress); + + void setAdminUser(QString admin); + + void setUserInfo(QString userInfo); + + void setPassWord(QString passWord); + +private: + Ui::CCreateInfluxDBUserDlg *ui; +}; + +#endif // CCREATEINFLUXDBUSERDLG_H diff --git a/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.ui b/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.ui new file mode 100644 index 00000000..d5cb7e0f --- /dev/null +++ b/platform/src/dbms/db_manager/CCreateInfluxDBUserDlg.ui @@ -0,0 +1,109 @@ + + + CCreateInfluxDBUserDlg + + + + 0 + 0 + 378 + 161 + + + + Dialog + + + + 6 + + + 6 + + + 6 + + + 6 + + + 9 + + + + + + + + + + + + + + 新建用户: + + + + + + + 管理员用户: + + + + + + + true + + + + + + + + + + IP地址: + + + + + + + 管理员密码: + + + + + + + QLineEdit::Password + + + Qt::LogicalMoveStyle + + + + + + + 确认 + + + + + + + + + 请输入相关用户信息: + + + + + + + + diff --git a/platform/src/dbms/db_studio/icons/db_studio.ico b/platform/src/dbms/db_studio/icons/db_studio.ico new file mode 100644 index 0000000000000000000000000000000000000000..9dcb1b9b102e563e023c0dbe2b50dc0ee71287fc GIT binary patch literal 15086 zcmeI2d3=@Cna9t~LdbgG_a%WKNWdyu)M;?v+Mu{kr4_Y`I7Y#Xi(Pqbxn$AYQi8qoDI;t1?$ZT9V#zev{?cCyM$U6dD z4I03|(N1;a!L{IQa3Q#gww1urpiEzH5?BnDf;nI&?+x03qrLw`?D4v#`oXxnj2=G|rBb?_sQ-9L}A#$@Sczsod=2C>iP(bC0y zAbZNZQk`%)_*pdQRYXITCKfE;@;->&+x6h>MVot&=UeKNc_`E0rJr{lSO(?*=G#4n z^kl%8tld0JdUh=2pt~*WM|9FOsQ0Ani@2AV50oLBKf}7;G#YkHN662* z(HyyXZOGT2UTghz_~IR7TJw#WC-~AWlmZ`hxE^DQQmCQroeOA3xwXca-ek(pOyu=6 zApc~at%sMY+x6S82IBbJPWchmI%|3Y>A^tj9e?Q_8x8vlHK);lW7;mUwo-*XV_R;` z>P6)y5q7bE>l=C2h#6)?CUA84)lP}!Di|oiH7|65l40>KJpvS zfe*kRDW40pb~?fPfSJI`fl%kh9~x|{sISkRL%(|UOqdTeQi^$9F@= z`z6r4Oo=o*1=^RhAKt;=?tveX&W?meQN~|jTOC0c`L*X^OF{2w=4X^_3G7Z=_BxGS zbENm~Sn~3@-Jk=^g-$W)nczbZq1^)dR)|LCfi5wGH#^;CMl?_?o7#`D$Y(wQ65y|Z zd328?mAn^%kHP)aoe&9CXm7oPviM#@Ub4Ln&O=wyyJUD8{DnL;w&AaRNHSgvZU?d% zmEQ!vqWvw*?d7bG-6%`m8%W`$F=K~8*P;9Z2q$khls%U`I`uO>N@78+S??k6B6Ww7 zDjtx&u5AlcX`LdEdnI-6!tXnz8TJ~BApT#1mBdH~$6Nf^PXB8aIvS7W;lF|AI!p}c zF#mPrrLXq@>t6bHCotDFq?*T#;5q0|0{!vV*w1C&k;i4fa(w8#guMD6&by=IVaJp0 zbcUTgQr4b3lll#$7q>T$()!z%Jif<0ADPCsg=Nd$^+38fJx(9ip>GH6uZ#YYts7fn zZKmcft+xv5p98^-fihn_ql{z^O8H`d;1;~M7O#+kFxQ7L@<@VppY&)VFp zBQVOxn#}Inm(SwgXMgda>_&EOeYSl1K42O=WN+Um{{!apG}0r{!-Po4DOroZZpUv% zgQfUQ!KSQ>U{G*|LDrltc6`-~ADbO7CrqfSI zKL%Yuz74~6H0;VB;G^8zkgXKtvh6Ep@z?%+9_0&2Uk0B6$@D6C5G(}OG49Ku zi*9Y~T~B&1kT1}hkAmA+C$*&7i(jO@3;OuwX43r6r>(}b7HFKW!Gm>cBcChofaNH* z7;R!b1l>dEzklQmxrI`GZMK*OKj%N%hqog}apY$l(2+ z{A}dd1HV-o50$ZJZL$5;w;8kHF77x2oEw8~IqfEro=%@%(f1?JL4HPeSp5bbd$v^C zlUcXy5uW_{Y9M>i{;qX=2lVGalXb5++r65)d7v3Q1Jq8w2|vXcg5Ecx%{#V!Bn$mb zG(5^j|FPuH0e1oUpZ9>|(fSmhFVaw50!T*1WZ022a(nicJ?8D@Wi!O!_!dY0hkaD% z9CsExh&_ofv=;<*x8>}Py_Ln-W4O229?4zrax{=kKVSJ?sO$Banl7L5x7IlR6<;|; zs|rkhtHrSwwjO1suh?YrUn+3sw-y!qTZ?E{XZ$OY^J-R^3H2`+Q`ai^=hd|)d+Jsu zH`T2IN>?O%>bq)8eUCB8UX!0p8h?guQum>?U^eKxr`VIiu4k79{*N^fZ7eg9hEiWF zP*F@AS%sge!S754`*P+R&pmve?rU-%_5#I3+7rel*h>{7XP#)XlK7O^CggrCv822J zWL^zR+8M0m-oKPPD@XgD&gi!Qe7LJKNh{F4xQ6mg++nfLZ;@Z|Q`axKYfkOmA=>>H z?R4&*)V|0wO%EKYaZRIM`weGncP`}`7Bu%=Tj=cQKsuI`u;;M1f3@6Dc(~In~_)Xw8kNwuLRF>wtk$p_o35S+@Tf1+p^Mr3I3Hf znX|=Hw7VX>NuB6v92(EZ=;#r~`BU<5fX(1l&RkQn&(wp-Z(|MaNJy`xe+S6O%oyDN z01M!A9`%PYzVY#3xqm}*aY6EJW8PS-IALoJx-$UMujsr?-*GQqRHV8?K_hifgS(;2 zy?|Tw(vq=S2PaY|okid=6>Lp(fKr?}2Oj;y^i?dvT)K?Gt;Zga*Qs8&xI9lbG1LcI zlIJXFY0h4VhbzhfYssRR7`mGIE?d)*iUDPJfY`&vU8_0ojRS+SQ#||d!lT9Oa{9`? z|AyQx(Aql|DlhEF`o=LRv2=rV_v+iA)xYdV_P2a}OJO0fEjpUY4af&R<@LZ*sHLsu zUUPo@o2>utLB=2(1EcBl1YoY+Cpart_IIyf8ExlhBWHAw&su^ z7wTxyS;8BO{ai$S1GpKzod}AxR%4+GkGkdRgB&k{3-BTPtzTME0Q%)jU+I1y`n(S0 z1Ad%+7p6PccFwq@tWCZj1s(3noMY&7D)T>!c(W!Qrz=)3fv47w;$xkiBu4~H?NBTq zw0EeiCC#y27rfWqxH}(|$L|@#9N2q5o!j+Ju{8e4-h*p=+l&GJ$nWh(yS3m8z*#`| z`g{uxRTLm6cOmYH;AZrO-?MkHACoF>--yiI-DmDrbXVv74-jwU_MX7br+pcmsr!#n z$e}$$KIbL!O@Q@l?Oo%!641^YLH#*EbEosZbg&Fr_~x;M|2W+vi4N&e)S)w%yG8E^=B4DdIn}ajoj;GI?rjjI{=i6s zXOgS!z3?3BG^XbwAwTt8ff$<-6#s^LomE&<=gr`*PXCOI^qYWGli<@DcbE z{%MSSl%6Peo!IH2)N8J>ZF^tVCwC^3Bfs>k`z4K8bHSc&zj02-cUD2KIMGy^|K5CK z*4?R(ja!~HowWXC6YlwuJ=e$wQ_k(SO_}zzbN`99@_&-&dCJ#;apKE8 zHTG}sQDu8t-*hC@$@N$?;2*=<*WK`e)U^Wb36e|bL55$jzU`e5<2;By zx@*uot-E&gc=p0owI)8e{?VIz3o)lpacAxmX{q4ewcHm8m&=b|QbwetgsN%XSqbZAPy*#`$ir=3YmBLH-P1W4~eQJI5(}YbuR4m6+~E`8&lF zR`ynW!|{BVz;4|_kSmTY=jdNHf7w?4U$)!q>i*&Y+Mdh%Ti~sE|E<9HghL~Nt;f3l z4rAJz@jIz+II`d86G5j8^eclZ(Yvr4ZFO(^A^zdJ9gX-XV(INY{m6f*-H-77?4y&g z!%L9;acIh}WfPfsSNd0Ubh7+^?vq}#N7IH_zBSxjrw2n z{xYzfdhsL{a9HaD+goNLa`d0E4e46@mtxZWq3hTGZOCW34VrecmcdA5JBxm@Y0ab7 z!e)SfaeoM|BQO1^PGjl?&*8IAN-QbO?`$a>lutkMe+thKwzx-PDKY@*Ci?FF z#sdAf%^iNe2kCiF<1GZX)H0PC}gx>CM9 z=YwprtYu;#{of_vLDwsaG*{$U9eZ%D35cZ}~R^Jbqr_;eu zMYrwxk^c&8pZ~7tKOXjfL*gslegcrqeVTM3xPtr);u^)#hhsAbQa&IB?;z}J8kj}9 zdEiN)v1zQLrI_sjVy3CcFCR<1IxO<5^8I0ye>t?j)!D>%_kLp5p~${TefVeSYQ9OuxC@* z;cxMu6~h@nWb@19BLVKjno0{<%i6od_YL;R$sp5YNNKbgzwltS;wa8^)<5@<|52pb z9g+LL$RQCNLL>O_f3+&bBf2|D0DQIE5DS+Tf+0}Yx=ClS!hq4xNv|bW{&_Cv?NOcl zPeAd{a6HhIPhj7$|2eycy!IJ%@4iZ$qO;ddvyq>~_9jA0=L^L_+AHy+?4kVUVSQt1 zLB}9>!Tm8$-eAn~hbHOnp$t0vLGL2yzL@5rJ@OS|yrWhv8pru}$80B?pU$9bxg+2n zV|Qr&9J~u;mx^OJkGTuse;)b6xQiUEd*=7U?5_(`_gMOV5eKh?Mw&=d0efzR?)Piy zqyK+hOTT--TCf?2uFjR5>%0R^N79J@a6DSKpcHvefVTEX#gN%NW0YU!{Zq(w7b!8R z%YSOzCa^5s=l7(41Z_Zi$n=-o@|7#NH_>+&?qG&T?(DHcpSybIBH)$7<7j+pkUHkT z-v4Fq`I2WaNOp~3HF41WjP(rSpi%rEn{_b!vS*J&`FM+K5=%$(`>%pBxBg)_>;Xan&GcRFW!JHJkI<=fG6vyitUb|p8Jjl`XF_f(H8P&%*LDPU{pHr2Jy zlXjWvt5z-`_4U@~t6bM>-*?sb+V)+W#;5vs)hy_RUbm^awOVvdjpf&6l9qp$Kb_Vk zTYX91CzD^iL)zVAidHIZH9ni7@acL_ns3wkR-5{hn`~N_Oxl$3&)0j>mFk~PRkro@ usdhaz$xTJP?=4F9_(?aJ8%+)ASJ1B`MJiLTw9Eb>%Ot;|T4noeoBl6r9b^Ll literal 0 HcmV?d00001 diff --git a/platform/src/dbms/rdb_studio/icons/rdb_studio.ico b/platform/src/dbms/rdb_studio/icons/rdb_studio.ico new file mode 100644 index 0000000000000000000000000000000000000000..9dcb1b9b102e563e023c0dbe2b50dc0ee71287fc GIT binary patch literal 15086 zcmeI2d3=@Cna9t~LdbgG_a%WKNWdyu)M;?v+Mu{kr4_Y`I7Y#Xi(Pqbxn$AYQi8qoDI;t1?$ZT9V#zev{?cCyM$U6dD z4I03|(N1;a!L{IQa3Q#gww1urpiEzH5?BnDf;nI&?+x03qrLw`?D4v#`oXxnj2=G|rBb?_sQ-9L}A#$@Sczsod=2C>iP(bC0y zAbZNZQk`%)_*pdQRYXITCKfE;@;->&+x6h>MVot&=UeKNc_`E0rJr{lSO(?*=G#4n z^kl%8tld0JdUh=2pt~*WM|9FOsQ0Ani@2AV50oLBKf}7;G#YkHN662* z(HyyXZOGT2UTghz_~IR7TJw#WC-~AWlmZ`hxE^DQQmCQroeOA3xwXca-ek(pOyu=6 zApc~at%sMY+x6S82IBbJPWchmI%|3Y>A^tj9e?Q_8x8vlHK);lW7;mUwo-*XV_R;` z>P6)y5q7bE>l=C2h#6)?CUA84)lP}!Di|oiH7|65l40>KJpvS zfe*kRDW40pb~?fPfSJI`fl%kh9~x|{sISkRL%(|UOqdTeQi^$9F@= z`z6r4Oo=o*1=^RhAKt;=?tveX&W?meQN~|jTOC0c`L*X^OF{2w=4X^_3G7Z=_BxGS zbENm~Sn~3@-Jk=^g-$W)nczbZq1^)dR)|LCfi5wGH#^;CMl?_?o7#`D$Y(wQ65y|Z zd328?mAn^%kHP)aoe&9CXm7oPviM#@Ub4Ln&O=wyyJUD8{DnL;w&AaRNHSgvZU?d% zmEQ!vqWvw*?d7bG-6%`m8%W`$F=K~8*P;9Z2q$khls%U`I`uO>N@78+S??k6B6Ww7 zDjtx&u5AlcX`LdEdnI-6!tXnz8TJ~BApT#1mBdH~$6Nf^PXB8aIvS7W;lF|AI!p}c zF#mPrrLXq@>t6bHCotDFq?*T#;5q0|0{!vV*w1C&k;i4fa(w8#guMD6&by=IVaJp0 zbcUTgQr4b3lll#$7q>T$()!z%Jif<0ADPCsg=Nd$^+38fJx(9ip>GH6uZ#YYts7fn zZKmcft+xv5p98^-fihn_ql{z^O8H`d;1;~M7O#+kFxQ7L@<@VppY&)VFp zBQVOxn#}Inm(SwgXMgda>_&EOeYSl1K42O=WN+Um{{!apG}0r{!-Po4DOroZZpUv% zgQfUQ!KSQ>U{G*|LDrltc6`-~ADbO7CrqfSI zKL%Yuz74~6H0;VB;G^8zkgXKtvh6Ep@z?%+9_0&2Uk0B6$@D6C5G(}OG49Ku zi*9Y~T~B&1kT1}hkAmA+C$*&7i(jO@3;OuwX43r6r>(}b7HFKW!Gm>cBcChofaNH* z7;R!b1l>dEzklQmxrI`GZMK*OKj%N%hqog}apY$l(2+ z{A}dd1HV-o50$ZJZL$5;w;8kHF77x2oEw8~IqfEro=%@%(f1?JL4HPeSp5bbd$v^C zlUcXy5uW_{Y9M>i{;qX=2lVGalXb5++r65)d7v3Q1Jq8w2|vXcg5Ecx%{#V!Bn$mb zG(5^j|FPuH0e1oUpZ9>|(fSmhFVaw50!T*1WZ022a(nicJ?8D@Wi!O!_!dY0hkaD% z9CsExh&_ofv=;<*x8>}Py_Ln-W4O229?4zrax{=kKVSJ?sO$Banl7L5x7IlR6<;|; zs|rkhtHrSwwjO1suh?YrUn+3sw-y!qTZ?E{XZ$OY^J-R^3H2`+Q`ai^=hd|)d+Jsu zH`T2IN>?O%>bq)8eUCB8UX!0p8h?guQum>?U^eKxr`VIiu4k79{*N^fZ7eg9hEiWF zP*F@AS%sge!S754`*P+R&pmve?rU-%_5#I3+7rel*h>{7XP#)XlK7O^CggrCv822J zWL^zR+8M0m-oKPPD@XgD&gi!Qe7LJKNh{F4xQ6mg++nfLZ;@Z|Q`axKYfkOmA=>>H z?R4&*)V|0wO%EKYaZRIM`weGncP`}`7Bu%=Tj=cQKsuI`u;;M1f3@6Dc(~In~_)Xw8kNwuLRF>wtk$p_o35S+@Tf1+p^Mr3I3Hf znX|=Hw7VX>NuB6v92(EZ=;#r~`BU<5fX(1l&RkQn&(wp-Z(|MaNJy`xe+S6O%oyDN z01M!A9`%PYzVY#3xqm}*aY6EJW8PS-IALoJx-$UMujsr?-*GQqRHV8?K_hifgS(;2 zy?|Tw(vq=S2PaY|okid=6>Lp(fKr?}2Oj;y^i?dvT)K?Gt;Zga*Qs8&xI9lbG1LcI zlIJXFY0h4VhbzhfYssRR7`mGIE?d)*iUDPJfY`&vU8_0ojRS+SQ#||d!lT9Oa{9`? z|AyQx(Aql|DlhEF`o=LRv2=rV_v+iA)xYdV_P2a}OJO0fEjpUY4af&R<@LZ*sHLsu zUUPo@o2>utLB=2(1EcBl1YoY+Cpart_IIyf8ExlhBWHAw&su^ z7wTxyS;8BO{ai$S1GpKzod}AxR%4+GkGkdRgB&k{3-BTPtzTME0Q%)jU+I1y`n(S0 z1Ad%+7p6PccFwq@tWCZj1s(3noMY&7D)T>!c(W!Qrz=)3fv47w;$xkiBu4~H?NBTq zw0EeiCC#y27rfWqxH}(|$L|@#9N2q5o!j+Ju{8e4-h*p=+l&GJ$nWh(yS3m8z*#`| z`g{uxRTLm6cOmYH;AZrO-?MkHACoF>--yiI-DmDrbXVv74-jwU_MX7br+pcmsr!#n z$e}$$KIbL!O@Q@l?Oo%!641^YLH#*EbEosZbg&Fr_~x;M|2W+vi4N&e)S)w%yG8E^=B4DdIn}ajoj;GI?rjjI{=i6s zXOgS!z3?3BG^XbwAwTt8ff$<-6#s^LomE&<=gr`*PXCOI^qYWGli<@DcbE z{%MSSl%6Peo!IH2)N8J>ZF^tVCwC^3Bfs>k`z4K8bHSc&zj02-cUD2KIMGy^|K5CK z*4?R(ja!~HowWXC6YlwuJ=e$wQ_k(SO_}zzbN`99@_&-&dCJ#;apKE8 zHTG}sQDu8t-*hC@$@N$?;2*=<*WK`e)U^Wb36e|bL55$jzU`e5<2;By zx@*uot-E&gc=p0owI)8e{?VIz3o)lpacAxmX{q4ewcHm8m&=b|QbwetgsN%XSqbZAPy*#`$ir=3YmBLH-P1W4~eQJI5(}YbuR4m6+~E`8&lF zR`ynW!|{BVz;4|_kSmTY=jdNHf7w?4U$)!q>i*&Y+Mdh%Ti~sE|E<9HghL~Nt;f3l z4rAJz@jIz+II`d86G5j8^eclZ(Yvr4ZFO(^A^zdJ9gX-XV(INY{m6f*-H-77?4y&g z!%L9;acIh}WfPfsSNd0Ubh7+^?vq}#N7IH_zBSxjrw2n z{xYzfdhsL{a9HaD+goNLa`d0E4e46@mtxZWq3hTGZOCW34VrecmcdA5JBxm@Y0ab7 z!e)SfaeoM|BQO1^PGjl?&*8IAN-QbO?`$a>lutkMe+thKwzx-PDKY@*Ci?FF z#sdAf%^iNe2kCiF<1GZX)H0PC}gx>CM9 z=YwprtYu;#{of_vLDwsaG*{$U9eZ%D35cZ}~R^Jbqr_;eu zMYrwxk^c&8pZ~7tKOXjfL*gslegcrqeVTM3xPtr);u^)#hhsAbQa&IB?;z}J8kj}9 zdEiN)v1zQLrI_sjVy3CcFCR<1IxO<5^8I0ye>t?j)!D>%_kLp5p~${TefVeSYQ9OuxC@* z;cxMu6~h@nWb@19BLVKjno0{<%i6od_YL;R$sp5YNNKbgzwltS;wa8^)<5@<|52pb z9g+LL$RQCNLL>O_f3+&bBf2|D0DQIE5DS+Tf+0}Yx=ClS!hq4xNv|bW{&_Cv?NOcl zPeAd{a6HhIPhj7$|2eycy!IJ%@4iZ$qO;ddvyq>~_9jA0=L^L_+AHy+?4kVUVSQt1 zLB}9>!Tm8$-eAn~hbHOnp$t0vLGL2yzL@5rJ@OS|yrWhv8pru}$80B?pU$9bxg+2n zV|Qr&9J~u;mx^OJkGTuse;)b6xQiUEd*=7U?5_(`_gMOV5eKh?Mw&=d0efzR?)Piy zqyK+hOTT--TCf?2uFjR5>%0R^N79J@a6DSKpcHvefVTEX#gN%NW0YU!{Zq(w7b!8R z%YSOzCa^5s=l7(41Z_Zi$n=-o@|7#NH_>+&?qG&T?(DHcpSybIBH)$7<7j+pkUHkT z-v4Fq`I2WaNOp~3HF41WjP(rSpi%rEn{_b!vS*J&`FM+K5=%$(`>%pBxBg)_>;Xan&GcRFW!JHJkI<=fG6vyitUb|p8Jjl`XF_f(H8P&%*LDPU{pHr2Jy zlXjWvt5z-`_4U@~t6bM>-*?sb+V)+W#;5vs)hy_RUbm^awOVvdjpf&6l9qp$Kb_Vk zTYX91CzD^iL)zVAidHIZH9ni7@acL_ns3wkR-5{hn`~N_Oxl$3&)0j>mFk~PRkro@ usdhaz$xTJP?=4F9_(?aJ8%+)ASJ1B`MJiLTw9Eb>%Ot;|T4noeoBl6r9b^Ll literal 0 HcmV?d00001 diff --git a/platform/src/dbms/tsdb_etl/ETLCommon.cpp b/platform/src/dbms/tsdb_etl/ETLCommon.cpp new file mode 100644 index 00000000..5195a22f --- /dev/null +++ b/platform/src/dbms/tsdb_etl/ETLCommon.cpp @@ -0,0 +1,35 @@ + +/********************************************************************************* +* @file ETLCommon.cpp +* @brief 时序数据抽取通用定义文件 +* @author caodingfa +* @version 1.0 +* @date +**********************************************************************************/ +#ifdef WIN32 +#include +#else +#include +#include + +#endif + +#include "ETLCommon.h" + +namespace iot_dbms +{ +//< 是否需要退出程序 +bool g_bNeedExit = false; + + +/////////////////////////////////////////////////////////////////////////////////////// + +void printHelp() +{ + printf("Usage:\n"); + printf(" -r \t Register as system service. \n"); + printf(" -u \t Unregister system service. \n"); + printf(" -s \t Run as system service. \n"); +} + +} //< namespace iot_net diff --git a/platform/src/dbms/tsdb_etl/ETLCommon.h b/platform/src/dbms/tsdb_etl/ETLCommon.h new file mode 100644 index 00000000..a8518685 --- /dev/null +++ b/platform/src/dbms/tsdb_etl/ETLCommon.h @@ -0,0 +1,26 @@ + +/********************************************************************************* +* @file ETLCommon.h +* @brief 时序数据抽取通用定义文件 +* @author caodingfa +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +#include +#include +#include "pub_utility_api/TimeUtil.h" + +namespace iot_dbms +{ +const int CN_SCAN_PERIOD = 3 * SEC_PER_MIN * MSEC_PER_SEC; + +//< 是否需要退出程序 +extern bool g_bNeedExit; + +//< 输出命令行帮助 +void printHelp(); + +} //< namespace iot_dbms diff --git a/platform/src/dbms/tsdb_etl/ETLServer.cpp b/platform/src/dbms/tsdb_etl/ETLServer.cpp new file mode 100644 index 00000000..7ce43405 --- /dev/null +++ b/platform/src/dbms/tsdb_etl/ETLServer.cpp @@ -0,0 +1,357 @@ + +/********************************************************************************* +* @file ETLServer.cpp +* @brief +* @author caodingfa +* @version 1.0 +* @date +**********************************************************************************/ + +#include "boost/property_tree/xml_parser.hpp" +#include "boost/typeof/typeof.hpp" +#include "boost/format.hpp" +#include "boost/make_shared.hpp" +#include "rapidjson/pointer.h" +#include "rapidjson/error/en.h" +#include "pub_logger_api/logger.h" +#include "pub_utility_api/FileUtil.h" +#include "ETLCommon.h" +#include "ETLServer.h" +#include "pub_sysinfo_api/SysInfoApi.h" + +using namespace std; +using namespace iot_public; + +namespace iot_dbms +{ + +//< 上一次 周期性更新 的时间,开机后ms数 +const std::string CN_CFG_NAME = "tsdb_etl.xml"; +const std::string CN_TABLE_INCREMENT = "increment"; +const int CN_GROUP_TIME = 15; + +/////////////////////////////////////////////////////////////////////////////////////////// + + +CETLServer::CETLServer():m_ptrTsdbConn(NULL) +{ + m_nGroupTime = CN_GROUP_TIME; + m_nInvalidStatus = 4|8; //对应测点的状态码,4:MENU_STATE_AI_INVALID 8:MENU_STATE_AI_GK_OFF +} + +CETLServer::~CETLServer() +{ + //release(); +} + +bool CETLServer::initialize() +{ + // 初始化日志系统 + iot_public::StartLogSystem( "default", "tsdb_etl" ); + if(!loadConfig()) + { + return false; + } + + if(!initTSDB()) + { + return false; + } + + return true; +} + +void CETLServer::release() +{ + m_ptrTsdbConn.reset(); + // 停止日志系统 + iot_public::StopLogSystem(); +} + +void CETLServer::process() +{ + if(!m_ptrTsdbConn->pingServer()) + { + LOGERROR("ping TSDB失败,本次不执行"); + return; + } + + for(auto iter = m_mapPntType2TagName.begin();iter != m_mapPntType2TagName.end();iter++) + { + TagName2TimeMAP mapTagName2Time; + getLastValue(iter->first,iter->second,mapTagName2Time); + extractValueToIncrement(iter->first,mapTagName2Time); + } +} + +bool CETLServer::loadConfig() +{ + string strCfgPath = CFileUtil::getPathOfCfgFile(CN_CFG_NAME,CN_DIR_PLATFORM); + if(strCfgPath.empty()) + { + LOGERROR("加载配置文件%s失败",CN_CFG_NAME.c_str()); + return false; + } + + boost::property_tree::ptree pt; + namespace xml = boost::property_tree::xml_parser; + try + { + xml::read_xml(strCfgPath, pt, xml::no_comments); + BOOST_AUTO(pGroupTime, pt.get_child("root.energy.group_time")); + m_nGroupTime = pGroupTime.get(".period"); + + //存在无效位标记就是用配置文件 + BOOST_AUTO(pInvalidStatus,pt.get_child_optional("root.invalid_status")); + if(pInvalidStatus) + { + m_nInvalidStatus = pInvalidStatus->get(".value"); + } + + LOGINFO("测点无效状态值=[%d]",m_nInvalidStatus); + + BOOST_AUTO(module, pt.get_child("root.energy")); + + for (BOOST_AUTO(pSum, module.begin()); pSum != module.end(); ++pSum) + { + if (pSum->first != "sum_pnt") + { + continue; + } + + for(BOOST_AUTO(pPnt,pSum->second.begin()); pPnt != pSum->second.end();++pPnt ) + { + if (pPnt->first == "pnt") + { + string strTableName = pPnt->second.get(".table_name"); + string strTagName = pPnt->second.get(".tag_name"); + if(strTableName.empty() || strTagName.empty()) + { + LOGWARN("测点[%s.%s]格式不正确,忽略",strTableName.c_str(),strTagName.c_str()); + } + else + { + m_mapPntType2TagName[strTableName].push_back(strTagName); + } + } + } + } + + } + catch (std::exception &ex) + { + LOGERROR("解析配置文件[%s]失败.Msg=[%s]", strCfgPath.c_str(), ex.what()); + return false; + } + catch (...) + { + LOGERROR("加载配置文件发生未知异常"); + return false; + } + + return true; +} + +bool CETLServer::initTSDB() +{ + if(!initTsdbApi()) + { + LOGERROR("初始化tsdb api失败"); + return false; + } + + if(!initTsdbConn()) + { + LOGERROR("初始化TsdbConn失败"); + return false; + } + + return true; +} + +bool CETLServer::initTsdbConn() +{ + iot_public::CSysInfoInterfacePtr ptrSysInfo; + if(!iot_public::createSysInfoInstance(ptrSysInfo)) + { + LOGERROR( "createSysInfoInstance 返回失败" ); + return false; + } + + iot_public::SDatabaseInfo stFirstConnectInfo; //数据库连接信息 + std::vector vecLocalDbInfo; + std::vector vecRemoteDbInfo; + int nRet = ptrSysInfo->getLocalDBInfo( stFirstConnectInfo, vecLocalDbInfo, vecRemoteDbInfo ); + if ( iotFailed == nRet ) + { + LOGERROR( "getLocalDBInfo 返回失败" ); + return false; + } + + //todo:暂时先只连接本机 + STsdbConnParam connParam; + connParam.strUserName = stFirstConnectInfo.strServiceName; + connParam.strUserPassword = stFirstConnectInfo.strUserPassword; + connParam.strDbName = stFirstConnectInfo.strServiceName; + + m_ptrTsdbConn = boost::make_shared(connParam); + if(m_ptrTsdbConn == NULL) + { + LOGERROR("创建TsdbConn失败"); + return false; + } + + return true; +} + +void CETLServer::getLastValue(const string &strTable, const TagNameSeq &vecTagName, TagName2TimeMAP &mapTagName2Time) +{ + string strTagNameList = joinTagName(vecTagName); + if(strTagNameList.empty()) + { + LOGWARN("表[%s]测点为空,忽略",strTagNameList.c_str()); + return; + } + + //主键为tag_name,值为time,因为当前测点没有历史数据时,返回结果中没有这一项,所以先初始化一下 + for(size_t i = 0; i < vecTagName.size();i++) + { + mapTagName2Time[vecTagName[i]] = 0L; + } + + //SELECT last(value) FROM "increment" WHERE tag_name='tag1' or tag_name='tag2' group by tag_name + string strSQL = boost::str(boost::format("SELECT last(value) FROM \"%1%\" WHERE %2% group by tag_name") + %CN_TABLE_INCREMENT %strTagNameList); + LOGDEBUG("获取表[%s]最新记录.sql=[%s]",strTable.c_str(),strSQL.c_str()); + + string strQueryResult; + if(!m_ptrTsdbConn->doQuery(strSQL.c_str(),&strQueryResult,0)) + { + LOGERROR("执行查询失败.sql=[%s]",strSQL.c_str()); + return; + } + + LOGDEBUG("查询结果:%s",strQueryResult.c_str()); + + rapidjson::Document docRoot; + docRoot.Parse(strQueryResult.c_str()); + if (docRoot.HasParseError()) + { + LOGERROR("Parse JSON error(offset %u): %s , return false", + static_cast(docRoot.GetErrorOffset()), + GetParseError_En(docRoot.GetParseError())); + return; + } + + if (!docRoot.HasMember("results")) + { + LOGERROR("parseJson(): Can't get results , return false"); + return; + } + + const rapidjson::Value &valResultsArray = docRoot["results"]; + if (!(valResultsArray.IsArray())) + { + LOGERROR("parseJson(): results is not an array , return false !"); + return; + } + + if(valResultsArray.Size() != 1) //只有一条sql而且没有分块truncate,所以结果只能有1个 + { + LOGERROR("parseJosn(): results size is error. real_size=%d",static_cast(valResultsArray.Size())); + return; + } + + const rapidjson::Value &valResult = valResultsArray[0]; + if (!(valResult.HasMember("series") && valResult["series"].IsArray() && valResult["series"].Size() > 0)) + { + //< 不为错误,可能是没有查询到结果 + LOGDEBUG("parseJosn(): series is empty or error"); + return; + } + + for(rapidjson::SizeType nSeriesIdx = 0; nSeriesIdx < valResult["series"].Size(); nSeriesIdx++) + { + const rapidjson::Value &valSerie = valResult["series"][nSeriesIdx]; + if (!(valSerie.HasMember("tags") && valSerie["tags"].HasMember("tag_name")) ) + { + LOGERROR("parseJson(): invalid tags, ignore !"); + return; + } + + if (!(valSerie.HasMember("values") && valSerie["values"].IsArray() && + valSerie["values"].Size() > 0 && valSerie["values"][0].Size() > 0)) + { + LOGERROR("parseJson(): invalid values, ignore !"); + return; + } + + string valTagName = valSerie["tags"]["tag_name"].GetString(); + int64 lRetTime = valSerie["values"][0][0].GetInt64(); + mapTagName2Time[valTagName] = lRetTime; + } +} + +void CETLServer::extractValueToIncrement(const std::string &strTableName,const TagName2TimeMAP &mapTagName2Time) +{ + string strStatusQuery = boost::str(boost::format(" status | %1% = %1%") % ~m_nInvalidStatus); + //将最后时间相同的测点聚合到一个sql中 + std::map mapTime2TagName; + for(auto iterTag = mapTagName2Time.begin(); iterTag != mapTagName2Time.end(); iterTag++) + { + mapTime2TagName[iterTag->second].push_back(iterTag->first); + } + //select difference(first(value)) as value into "increment" from "test" WHERE (tag_name='tag1' or tag_name='tag2') and time>=1700611200000000000 and time<=1700697600000000000 group by tag_name ,time(15m) fill(linear) + + string strAllSQL; + for(auto iter = mapTime2TagName.begin(); iter != mapTime2TagName.end(); iter++) + { + string strTagNameList = joinTagName(iter->second); + if(strTagNameList.empty()) + { + continue; + } + + string strQueryTime = boost::str(boost::format("time>=%1%ms and time<=now()") %iter->first); + string strOneSQL = boost::str(boost::format("SELECT DIFFERENCE(first(value)) as value into \"%1%\" FROM \"%2%\" WHERE (%3%) and (%4%) and %5% GROUP BY tag_name,time(%6%m);") + %CN_TABLE_INCREMENT %strTableName %strTagNameList %strStatusQuery %strQueryTime %m_nGroupTime ); + strAllSQL += strOneSQL; + } + + if(strAllSQL.empty()) + { + LOGDEBUG("抽取%s数据:当前没有可以执行的SQL",strTableName.c_str()); + return; + } + + LOGDEBUG("执行SQL=[%s]",strAllSQL.c_str()); + + string strQueryResult; + if(!m_ptrTsdbConn->doQuery(strAllSQL.c_str(),&strQueryResult,0)) + { + LOGERROR("执行查询失败.sql=[%s]",strAllSQL.c_str()); + return; + } + + LOGDEBUG("查询结果:%s",strQueryResult.c_str()); + +} + +string CETLServer::joinTagName(const TagNameSeq &vecTagName) +{ + if(vecTagName.empty()) + { + return ""; + } + + string strTagNameList = "tag_name='" + vecTagName[0] + "'"; + for(size_t i = 1; i < vecTagName.size();i++) + { + strTagNameList += " or tag_name='" + vecTagName[i] + "'"; + } + + return strTagNameList; +} + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_etl/ETLServer.h b/platform/src/dbms/tsdb_etl/ETLServer.h new file mode 100644 index 00000000..828ce1e3 --- /dev/null +++ b/platform/src/dbms/tsdb_etl/ETLServer.h @@ -0,0 +1,51 @@ + +/********************************************************************************* +* @file ETLServer.h +* @brief +* @author caodingfa +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once +#include +#include +#include +#include "DataType.h" +#include "ETLCommon.h" +#include "tsdb_api/TsdbApi.h" +#include "rapidjson/document.h" + +namespace iot_dbms +{ +typedef std::vector TagNameSeq; //测点标签格式:station.dev.pnt +typedef std::map TagName2TimeMAP; //测点标签->时间戳,单位ms +typedef std::map PntType2TagNameMAP; //测点类型->测点序列 + +class CETLServer +{ +public: + CETLServer(); + ~CETLServer(); + + bool initialize(); + void release(); + + void process(); + +private: + bool loadConfig(); + bool initTSDB(); + bool initTsdbConn(); + void getLastValue(const std::string &strTable,const TagNameSeq &vecTagName,TagName2TimeMAP &mapTagName2Time); + void extractValueToIncrement(const std::string &strTableName, const TagName2TimeMAP &mapTagName2Time); + std::string joinTagName(const TagNameSeq &vecTagName); + +private: + int m_nGroupTime; + int m_nInvalidStatus; + CTsdbConnPtr m_ptrTsdbConn; + PntType2TagNameMAP m_mapPntType2TagName; +}; + +} //< namespace iot_net diff --git a/platform/src/dbms/tsdb_etl/MainLinux.cpp b/platform/src/dbms/tsdb_etl/MainLinux.cpp new file mode 100644 index 00000000..13a88144 --- /dev/null +++ b/platform/src/dbms/tsdb_etl/MainLinux.cpp @@ -0,0 +1,213 @@ + +/********************************************************************************* +* @file Main.cpp +* @brief 时序数据用能增量抽取入口 +* @author caodingfa +* @version 1.0 +* @date 2023/11/28 +**********************************************************************************/ + +//< 仅在Linux系统下编译 +#ifdef OS_LINUX + +#include +#include +#include +#include "boost/thread.hpp" +#include "boost/property_tree/ptree.hpp" +#include "boost/property_tree/ini_parser.hpp" +#include "pub_utility_api/FileUtil.h" +#include "pub_utility_api/TimeUtil.h" + +#include "ETLCommon.h" +#include "ETLServer.h" + + +using namespace iot_public; +using namespace iot_dbms; + +static const char *szSystemdCfgFile = "/usr/lib/systemd/system/byd_tsdb_etl.service"; +boost::mutex g_objMutex_; //< 互斥锁,配合信号量使用 +boost::condition_variable g_objCond_; //< 信号量 + +static void handleSigno(int nSigno) +{ + //< 用日志有死锁的风险 + printf("\nhandleSigno(): nSigno == %d \n", nSigno); + + if (false == iot_dbms::g_bNeedExit) + { + iot_dbms::g_bNeedExit = true; + g_objCond_.notify_all(); + } + +} + + +//< 注册系统服务,并设置自启动 +static bool regSysService() +{ + //< 判断服务是否已注册,普通用户权限可执行 + if (0 == std::system("systemctl is-enabled byd_tsdb_etl.service")) + { + printf("\nSystem service byd_tsdb_etl already enabled, exit.\n"); + return false; + } + +// { +// FILE *pFile = fopen(szSystemdCfgFile, "w"); +// if (NULL == pFile) +// { +// printf("\nCan not write file, exit.\nFile: %s\n", szSystemdCfgFile); +// return false; +// } +// else +// fclose(pFile); +// } + + const std::string strExec = CFileUtil::getCurModuleDir() + "tsdb_etl -s"; + + //< 生成或修改systemd服务配置文件 + try + { + using namespace boost::property_tree; + ptree objPtree; + + objPtree.put("Unit.Description", "byd_tsdb_etl"); + objPtree.put("Unit.After", "network.target"); + + objPtree.put("Service.Type", "simple"); + objPtree.put("Service.ExecStart", strExec); + objPtree.put("Service.KillMode", "process"); + objPtree.put("Service.Restart", "on-failure"); + objPtree.put("Service.RestartSec", "42s"); + + objPtree.put("Install.WantedBy", "multi-user.target"); + + ini_parser::write_ini(szSystemdCfgFile, objPtree); + } + catch (std::exception &e) + { + printf("\nWrite file failed, exit.\nFile: %s\nErr: %s\n", szSystemdCfgFile, e.what()); + return false; + } + + //< systemd重新加载配置文件 + if (0 != std::system("systemctl daemon-reload")) + { + printf("\nReload config file failed, exit.\n"); + return false; + } + + //< 设置服务开机自启动 + if (0 != std::system("systemctl enable byd_tsdb_etl.service")) + { + printf("\nEnable service failed, exit.\n"); + return false; + } + + printf("\nSuccessfully registered system service byd_tsdb_etl.\n"); + return true; +} + +//< 注销系统服务 +static bool unregSysService() +{ + //< 为了消除gcc编译告警 + //< warning: ignoring return value of ‘int system(const char*)’ + int nRc; + (void) nRc; + + { + FILE *pFile = fopen(szSystemdCfgFile, "r"); + if (NULL == pFile) + { + //< 重新加载一次,确保systemd配置与文件一致,需需管理员权限 + nRc = std::system("systemctl daemon-reload"); + + printf("\nSystem service byd_tsdb_etl has not been registered.\n"); + return false; + } + else + fclose(pFile); + } + + //< 停止服务 + nRc = std::system("systemctl stop byd_tsdb_etl.service"); + + //< 取消服务开机自启动 + nRc = std::system("systemctl disable byd_tsdb_etl.service"); + + //< 删除服务配置文件 + remove(szSystemdCfgFile); + + //< systemd重新加载配置文件 + nRc = std::system("systemctl daemon-reload"); + + printf("\nSuccessfully unregistered system service byd_tsdb_etl.\n"); + return true; +} + +int main(int argc, char *argv[]) +{ + if (2 == argc) + { + const char *szArg = argv[1]; + if (0 == strcmp(szArg, "-r")) + { + return regSysService() ? EXIT_SUCCESS : EXIT_FAILURE; + } + else if (0 == strcmp(szArg, "-u")) + { + return unregSysService() ? EXIT_SUCCESS : EXIT_FAILURE; + } + else if (0 == strcmp(szArg, "-s")) + { + if (1 != getppid()) + { + printf("\n1 != getppid(), exit.\n"); + return EXIT_FAILURE; + } + } + else + { + iot_dbms::printHelp(); + return EXIT_FAILURE; + } + } + else if (argc > 2) + { + iot_dbms::printHelp(); + return EXIT_FAILURE; + } + + //< 注册系统信号处理 + { + signal(SIGTERM, handleSigno); + signal(SIGINT, handleSigno); + signal(SIGQUIT, handleSigno); + } + + //< 初始化 + iot_dbms::CETLServer etlSvr; + + if (etlSvr.initialize()) + { + //< 处理业务 + while (!iot_dbms::g_bNeedExit) + { + etlSvr.process(); + boost::mutex::scoped_lock lock(g_objMutex_); + g_objCond_.timed_wait(lock, boost::posix_time::millisec(CN_SCAN_PERIOD)); + } + } + else + printf("Initialize failed, exit.\n"); + + //< 释放 + etlSvr.release(); + + return iot_dbms::g_bNeedExit ? EXIT_SUCCESS : EXIT_FAILURE; +} + +#endif //< OS_LINUX diff --git a/platform/src/dbms/tsdb_etl/MainWindows.cpp b/platform/src/dbms/tsdb_etl/MainWindows.cpp new file mode 100644 index 00000000..19aad4c3 --- /dev/null +++ b/platform/src/dbms/tsdb_etl/MainWindows.cpp @@ -0,0 +1,344 @@ + +/********************************************************************************* +* @file Main.cpp +* @brief 时序数据用能增量抽取入口 +* @author caodingfa +* @version 1.0 +* @date 2023/11/28 +**********************************************************************************/ + +//< 仅在Win系统下编译 +#ifdef OS_WINDOWS + +#include +#include +#include +#include +#include +#include "boost/thread.hpp" +#include "pub_utility_api/FileUtil.h" +#include "ETLCommon.h" +#include "ETLServer.h" + +#define CN_SERVICE_NAME "byd_tsdb_etl" +static const char *szServiceName = CN_SERVICE_NAME; +static SERVICE_STATUS_HANDLE g_hServiceStatus = NULL; +static LPSERVICE_STATUS g_pStatus = NULL; + +using namespace iot_public; +using namespace iot_dbms; + +boost::mutex g_objMutex_; //< 互斥锁,配合信号量使用 +boost::condition_variable g_objCond_; //< 信号量 + +static void handleSigno(int nSigno) +{ + //< 用日志有死锁的风险 + printf("\nhandleSigno(): nSigno == %d \n", nSigno); + + if (false == iot_dbms::g_bNeedExit) + { + iot_dbms::g_bNeedExit = true; + g_objCond_.notify_all(); + } +} + +//< 注册系统服务,并设置自启动 +static bool regSysService() +{ + bool bRet = false; + + //< 打开服务控制管理器 + SC_HANDLE hSCM = ::OpenSCManagerA(NULL, NULL, SC_MANAGER_ALL_ACCESS); + if (NULL == hSCM) + { + printf("OpenSCManager() failed. Need administrator rights.\n"); + } + else + { + + //< 判断服务是否已存在 + SC_HANDLE hService = ::OpenServiceA(hSCM, szServiceName, SERVICE_QUERY_CONFIG); + if (NULL == hService) + { + const std::string strExec = CFileUtil::getCurModuleDir() + "tsdb_etl.exe -s"; + + //< 创建服务 + + //< 使用SERVICE_USER_XXX_PROCESS服务类型,用户登录时启动,不影响共享内存等的使用 + //< 若使用SERVICE_WIN32_XXX_PROCESS等服务类型,程序以system用户启动,共享内存被隔离,其他用户访问不到 + + hService = ::CreateServiceA( + hSCM, szServiceName, szServiceName, + SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS, + SERVICE_AUTO_START, SERVICE_ERROR_NORMAL, + strExec.c_str(), NULL, NULL, NULL, NULL, NULL); + + if (hService == NULL) + { + printf("CreateService() failed. Need administrator rights.\n"); + } + else + { + SERVICE_DESCRIPTIONA stSrvDesc; + char szDesc[] = "The TSDB ETL"; + stSrvDesc.lpDescription = szDesc; + if (::ChangeServiceConfig2A(hService, SERVICE_CONFIG_DESCRIPTION, &stSrvDesc)) + { + bRet = true; + } + + ::CloseServiceHandle(hService); + } + } + else + { + printf("System service byd_tsdb_etl already exists.\n"); + ::CloseServiceHandle(hService); + } + + ::CloseServiceHandle(hSCM); + } + + if(bRet) + printf("\nSuccessfully registered system service byd_tsdb_etl.\n"); + return bRet; +} + +//< 注销系统服务 +static bool unregSysService() +{ + + bool bRet = false; + + //< 打开服务控制管理器 + SC_HANDLE hSCM = ::OpenSCManagerA(NULL, NULL, SC_MANAGER_ALL_ACCESS); + if (hSCM == NULL) + { + printf("OpenSCManager() failed. Need administrator rights.\n"); + } + else + { + SC_HANDLE hService = ::OpenServiceA(hSCM, szServiceName, SERVICE_QUERY_STATUS | SERVICE_STOP | DELETE); + + if (NULL == hService) + { + //< 服务不存在 + printf("System service byd_tsdb_etl not exists.\n"); + } + else + { + SERVICE_STATUS objStatus; + + if(TRUE == QueryServiceStatus(hService, &objStatus)) + { + if(SERVICE_STOPPED != objStatus.dwCurrentState + && TRUE == ControlService(hService, SERVICE_CONTROL_STOP, &objStatus)) + { + int nWaitLoop = 0; + SERVICE_STATUS objStatusTemp; + while(TRUE == QueryServiceStatus(hService, &objStatusTemp)) + { + if (SERVICE_STOPPED == objStatusTemp.dwCurrentState + || nWaitLoop >= 30) + { + objStatus = objStatusTemp; + break; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ++nWaitLoop; + } + } + + if (SERVICE_STOPPED == objStatus.dwCurrentState) + { + //< 删除服务 + if (::DeleteService(hService)) + { + bRet = true; + } + else + { + printf("DeleteService() failed.\n"); + } + } + else + { + printf("Stop service timeout.\n"); + } + } + else + { + printf("QueryServiceStatus() failed.\n"); + } + + ::CloseServiceHandle(hService); + } + + ::CloseServiceHandle(hSCM); + } + + if(bRet) + printf("\nSuccessfully unregistered system service byd_tsdb_etl.\n"); + return bRet; +} + +static void WINAPI serviceStrl(DWORD dwOpcode) +{ + switch (dwOpcode) + { + case SERVICE_CONTROL_STOP: + if (false == iot_dbms::g_bNeedExit) + { + iot_dbms::g_bNeedExit = true; + g_objCond_.notify_all(); + } + g_pStatus->dwCurrentState = SERVICE_STOP_PENDING; + SetServiceStatus(g_hServiceStatus, g_pStatus); + break; + case SERVICE_CONTROL_PAUSE: + break; + case SERVICE_CONTROL_CONTINUE: + break; + case SERVICE_CONTROL_INTERROGATE: + break; + case SERVICE_CONTROL_SHUTDOWN: + break; + default: + printf("serviceStrl(): Invalid operation code !"); + } +} + +static void WINAPI serviceMain() +{ + g_pStatus->dwCurrentState = SERVICE_START_PENDING; + g_pStatus->dwControlsAccepted = SERVICE_ACCEPT_STOP; + + //< 注册服务控制 + g_hServiceStatus = RegisterServiceCtrlHandlerA(szServiceName, serviceStrl); + if (g_hServiceStatus == NULL) + { + printf("serviceMain(): RegisterServiceCtrlHandler() return NULL !"); + return; + } + SetServiceStatus(g_hServiceStatus, g_pStatus); + + g_pStatus->dwWin32ExitCode = S_OK; + g_pStatus->dwCheckPoint = 0; + g_pStatus->dwWaitHint = 0; + g_pStatus->dwCurrentState = SERVICE_RUNNING; + SetServiceStatus(g_hServiceStatus, g_pStatus); + + { + //< 初始化 + iot_dbms::CETLServer etlSvr; + if (etlSvr.initialize()) + { + //< 处理业务 + while (!iot_dbms::g_bNeedExit) + { + etlSvr.process(); + boost::mutex::scoped_lock lock(g_objMutex_); + g_objCond_.timed_wait(lock, boost::posix_time::millisec(CN_SCAN_PERIOD)); + } + } + else + printf("Initialize failed, exit.\n"); + + //< 释放 + etlSvr.release(); + } + + g_pStatus->dwCurrentState = SERVICE_STOPPED; + SetServiceStatus(g_hServiceStatus, g_pStatus); +} + +int main(int argc, char *argv[]) +{ + if (2 == argc) + { + const char *szArg = argv[1]; + if (0 == strcmp(szArg, "-r")) + { + return regSysService() ? EXIT_SUCCESS : EXIT_FAILURE; + } + else if (0 == strcmp(szArg, "-u")) + { + return unregSysService() ? EXIT_SUCCESS : EXIT_FAILURE; + } + else if (0 == strcmp(szArg, "-s")) + { + //g_hServiceStatus = NULL; + + g_pStatus = new SERVICE_STATUS; + g_pStatus->dwServiceType = SERVICE_WIN32_OWN_PROCESS; + g_pStatus->dwCurrentState = SERVICE_STOPPED; + g_pStatus->dwControlsAccepted = SERVICE_ACCEPT_STOP; + g_pStatus->dwWin32ExitCode = 0; + g_pStatus->dwServiceSpecificExitCode = 0; + g_pStatus->dwCheckPoint = 0; + g_pStatus->dwWaitHint = 0; + + char szSrvName[] = CN_SERVICE_NAME; + SERVICE_TABLE_ENTRYA st[] = + { + { szSrvName, (LPSERVICE_MAIN_FUNCTIONA)serviceMain }, + { NULL, NULL } + }; + + if(!::StartServiceCtrlDispatcherA(st)) + { + printf("StartServiceCtrlDispatcher() failed.ErrorCode=%d\n",GetLastError()); + printf("May not be started by Service Manager.\n"); + } + + delete g_pStatus; + g_pStatus = NULL; + } + else + { + iot_dbms::printHelp(); + return EXIT_FAILURE; + } + } + else if (argc > 2) + { + iot_dbms::printHelp(); + return EXIT_FAILURE; + } + else + { + //< 注册系统信号处理 + { + signal(SIGTERM, handleSigno); + signal(SIGINT, handleSigno); + signal(SIGBREAK, handleSigno); + } + + //< 初始化 + iot_dbms::CETLServer etlSvr; + if (etlSvr.initialize()) + { + //< 处理业务 + while (!iot_dbms::g_bNeedExit) + { + etlSvr.process(); + boost::mutex::scoped_lock lock(g_objMutex_); + g_objCond_.timed_wait(lock, boost::posix_time::millisec(CN_SCAN_PERIOD)); + } + } + else + { + printf("Initialize failed, exit.\n"); + } + + //< 释放 + etlSvr.release(); + } + + return iot_dbms::g_bNeedExit ? EXIT_SUCCESS : EXIT_FAILURE; +} + + +#endif //< OS_WINDOWS diff --git a/platform/src/dbms/tsdb_etl/tsdb_etl.pro b/platform/src/dbms/tsdb_etl/tsdb_etl.pro new file mode 100644 index 00000000..e8cbca5a --- /dev/null +++ b/platform/src/dbms/tsdb_etl/tsdb_etl.pro @@ -0,0 +1,40 @@ +QT -= gui core +CONFIG -= qt + +CONFIG += console +CONFIG -= app_bundle + +TEMPLATE = app +TARGET = tsdb_etl + + +#DEFINES += _NO_LOGGING +#DEFINES += PRINT_TIME_DEBUG + +HEADERS += ETLCommon.h \ + ETLServer.h + +SOURCES += MainLinux.cpp \ + MainWindows.cpp \ + ETLCommon.cpp \ + ETLServer.cpp + +win32{ + LIBS += -ladvapi32 +} +else{ + LIBS += -lpthread +} + +LIBS += -llog4cplus -lpub_logger_api -lpub_utility_api -lboost_thread -lboost_system +LIBS += -lpub_sysinfo_api -ltsdb_api + + +#------------------------------------------------------------------- +COMMON_PRI=$$PWD/../../common.pri +exists($$COMMON_PRI) { + include($$COMMON_PRI) +}else { + error("FATAL error: can not find common.pri") +} + diff --git a/platform/src/dbms/tsdb_local_save/CFrontThread.cpp b/platform/src/dbms/tsdb_local_save/CFrontThread.cpp new file mode 100644 index 00000000..b84d29e0 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CFrontThread.cpp @@ -0,0 +1,447 @@ + +/******************************************************************************//** +* @file CFrontThread.h +* @brief 时序库存库服务,前端处理线程,负责接收应用发来的写库消息 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +//#include "boost/typeof/typeof.hpp" +#include "boost/lexical_cast.hpp" + +//< 屏蔽Protobuff编译报警 +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#endif + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable: 4100) +#endif + +#include "google/protobuf/text_format.h" + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#include "pub_logger_api/logger.h" +#include "common/Common.h" +#include "MessageChannel.h" + +#include "CNodeMng.h" +#include "CFrontThread.h" + +namespace iot_dbms +{ + +CFrontThread::CFrontThread() : + CTimerThreadBase("CFrontThread", 0), + m_bIsRunning(false), m_nLoopCnt(0), m_nReleaseThreshold(0), + m_pMbComm(NULL), m_pMbMsgRcv(NULL) +{ +} + + +CFrontThread::~CFrontThread() +{ + suspendThread(); + quit(); + + delete m_pMbComm; + delete m_pMbMsgRcv; +} + + +bool CFrontThread::resumeThread() +{ + if (isThreadRunning()) + { + return true; + } + + if (NULL == m_pMbComm) + { + m_pMbComm = new iot_net::CMbCommunicator(); + } + + //< 订阅 + if (!m_pMbComm->addSub(CNodeMng::getInstance().getAppId(), CH_TSS_APP_TO_SRV)) + { + LOGERROR("添加订阅失败!"); + + delete m_pMbComm; + m_pMbComm = NULL; + + return false; + } + + if (NULL == m_pMbMsgRcv) + { + m_pMbMsgRcv = new iot_net::CMbMessage(); + } + + //< 注意:先赋值,后resume() + m_bIsRunning = true; + + resume(); + + return true; +} + + +bool CFrontThread::suspendThread() +{ + m_bIsRunning = false; + suspend(); + + //< 销毁通讯器 + delete m_pMbComm; + m_pMbComm = NULL; + + //< 销毁消息 + delete m_pMbMsgRcv; + m_pMbMsgRcv = NULL; + + //< 释放内存 + { + std::string str1, str2; + m_strTags.swap(str1); + m_strFields.swap(str2); + + iot_idl::STssInsert objNew; + m_objTssInsert.Swap(&objNew); + } + + return true; +} + + +bool CFrontThread::isThreadRunning() const +{ + return m_bIsRunning && isRunning(); +} + + +void CFrontThread::execute() +{ + for (int i = 0; m_bIsRunning && (i < 100); ++i) + { + //< 接收消息 + if (!m_pMbComm->recvMsg(*m_pMbMsgRcv, 200)) + break; + + handleOneMbMsg(); + } + + if (m_nLoopCnt > 10) + { + m_nLoopCnt = 0; + + if (m_nReleaseThreshold > 0) + m_nReleaseThreshold = static_cast(m_nReleaseThreshold * 0.9); + else if (m_nReleaseThreshold < 0) + m_nReleaseThreshold = 0; + + //< ProtoBuf 的 Clear() 不会清空内存,用此方法清空 + if (m_objTssInsert.mutable_point()->Capacity() > m_nReleaseThreshold * 3) + { + std::string str1, str2; + m_strTags.swap(str1); + m_strFields.swap(str2); + + iot_idl::STssInsert objNew; + m_objTssInsert.Swap(&objNew); + + if (m_objTssInsert.mutable_point()->Capacity() > 0) + { + LOGERROR("与预期不一致,Capacity() == %d", + m_objTssInsert.mutable_point()->Capacity()); + } + } + } + else + ++m_nLoopCnt; +} + +void CFrontThread::handleOneMbMsg_bak() +{ + /* + LOGDEBUG("MsgType = %d , Para1 = %d , Para2 = %d , DataSize = %llu", + m_pMbMsgRcv->getMsgType(), + m_pMbMsgRcv->getPara1(), + m_pMbMsgRcv->getPara2(), + (unsigned long long)m_pMbMsgRcv->getDataSize()); + */ + + //< 检查是否本域消息 + if (m_pMbMsgRcv->getPara2() != CNodeMng::getInstance().getLocalDomainID()) + { + LOGINFO("收到非本域消息,忽略!"); + return; + } + + //< 不管消息是否正确,先回复已收到,原因: + //< 1、防止应用端因一条错误消息而死循环 + //< 2、提高速度 + { + iot_net::CMbMessage objMbMSgRep; + objMbMSgRep.setMsgType(iot_idl::MT_TSS_SRV2APP_ADD_ACK); + objMbMSgRep.setSubject(CN_AppId_PUBLIC, CH_TSS_SRV_TO_APP); + objMbMSgRep.setData(std::string("TSS")); + objMbMSgRep.setPara1(m_pMbMsgRcv->getPara1()); + objMbMSgRep.setPara2(CNodeMng::getInstance().getLocalDomainID()); + + //< 不管是否发送失败,应用端收不到确认会重发,对于重复数据时序库自动覆盖 + m_pMbComm->replyMsg(objMbMSgRep, *m_pMbMsgRcv); + } + + //< 当前只有一种消息 + if (m_pMbMsgRcv->getMsgType() != iot_idl::MT_TSS_APP2SRV_ADD) + { + LOGINFO("收到非预期消息类型!"); + return; + } + + //< 反序列化 + bool bRc = m_objTssInsert.ParseFromArray(m_pMbMsgRcv->getDataPtr(), (int) m_pMbMsgRcv->getDataSize()); + if (!bRc) + { + LOGWARN("STssInsert 反序列化失败,忽略消息!"); + return; + } + + //< 为了调试,打印所有消息,生产环境不要打 + //{ + // std::string strPrint; + // google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint); + // LOGDEBUG("收到消息,内容:\n%s", strPrint.c_str()); + //} + + //< 当前不允许没有tag + if (m_objTssInsert.tag_name_size() <= 0) + { + LOGERROR("STssInsert未设置tag_name,忽略消息!"); + return; + } + + //< 不允许没有field + if (m_objTssInsert.field_name_size() <= 0) + { + LOGERROR("STssInsert未设置field_name,忽略消息!"); + return; + } + + //< 没有数据插入,无效 + if (m_objTssInsert.point_size() <= 0) + { + LOGERROR("STssInsert未插入任何point,无意义数据,忽略消息!"); + return; + } + + const int nPointCnt = m_objTssInsert.point_size(); + if (nPointCnt > m_nReleaseThreshold) + m_nReleaseThreshold = nPointCnt; + + //< 生成 InfluxDB 插入字串 + //< 批量插入的语句中,有错误语句,InfluxDB会跳过错误语句,不影响正确语句 + StdStringPtr ptrStrInsert(new std::string); + for (int nPointIndex = 0; nPointIndex < nPointCnt; nPointIndex++) + { + const iot_idl::STsdbPoint &objPoint = m_objTssInsert.point(nPointIndex); + + if (objPoint.tag_val_size() != m_objTssInsert.tag_name_size()) + { + LOGERROR("STsdbPoint中tag_val数量不正确,忽略此点!"); + continue; + } + + if (objPoint.field_val_size() != m_objTssInsert.field_name_size()) + { + LOGERROR("STsdbPoint中field_val数量不正确,忽略此点!"); + continue; + } + + //< 生成 Tag 字串 + m_strTags.clear(); + for (int nTagIndex = 0; nTagIndex < objPoint.tag_val_size(); nTagIndex++) + { + if (0 != nTagIndex) + { + m_strTags += ","; + } + + m_strTags += m_objTssInsert.tag_name(nTagIndex); + m_strTags += "="; + + //< dp为了查实时库,经常将tag_name进行resize()操作,尾部会有大量的\0 + //< std::string直接相加会带上这些\0,即按size()长度相加 + //< 而influxdb解析语句时遇到\0就截断了 + //< 为了防止这种问题,使用c_str()的方式 + const std::string &strTagVal = objPoint.tag_val(nTagIndex); + m_strTags += strTagVal; + + //< 遇到此情况输出日志 + if (strlen(strTagVal.c_str()) != strTagVal.size()) + { + std::string strPrint; + google::protobuf::TextFormat::PrintToString(m_objTssInsert, &strPrint); + + LOGWARN("收到的tag_val字串中包含\\0字符,仅使用\\0前的内容,以防字符串截断。请检查消息源程序,消息内容如下:\n%s\n", + strPrint.c_str()); + } + } + + //< 生成 Field 字串 + m_strFields.clear(); + for (int nFieldIndex = 0; nFieldIndex < objPoint.field_val_size(); nFieldIndex++) + { + if (0 != nFieldIndex) + { + m_strFields += ","; + } + + m_strFields += m_objTssInsert.field_name(nFieldIndex); + m_strFields += "="; + m_strFields += toInfluxString(objPoint.field_val(nFieldIndex)); + } + + //< 预先分配空间,避免string频繁扩充,扩充时还需要复制内存 + if (0 == nPointIndex) + { + const size_t nSizeOfOne = m_objTssInsert.meas_name().size() + + m_strTags.size() + m_strFields.size() + 30; + ptrStrInsert->reserve(nSizeOfOne * nPointCnt); + } + + //< 添加到 ptrStrInsert + //*ptrStrInsert += boost::str(boost::format("%1%,%2% %3% %4% \n") + // % objTssInsert.meas_name() + // % strTags + // % strFields + // % objPoint.time_stamp()); + + //< 经测试,lexical_cast比format快几十倍 + *ptrStrInsert += m_objTssInsert.meas_name() + ","; + *ptrStrInsert += m_strTags + " "; + *ptrStrInsert += m_strFields + " "; + *ptrStrInsert += boost::lexical_cast(objPoint.time_stamp()) + " \n"; + } + + //< 判断save_action + bool bToLocal = false; + bool bToRemote = false; + switch (m_objTssInsert.save_action()) + { + case iot_idl::SA_TSS_DO_NOTHING: + break; + case iot_idl::SA_TSS_LOCAL_ONLY: + bToLocal = true; + break; + case iot_idl::SA_TSS_REMOTE_ONLY: + bToRemote = true; + break; + case iot_idl::SA_TSS_LOCAL_REMOTE: + bToLocal = true; + bToRemote = true; + break; + default: LOGERROR("Unknow save_action value !"); + break; + } + + //< 添加给NodeMng,pushSaveStr()中有判是否为空字串 + CNodeMng::getInstance().pushSaveStr(ptrStrInsert, bToLocal, bToRemote); +} + + +void CFrontThread::handleOneMbMsg() +{ + /* + LOGDEBUG("MsgType = %d , Para1 = %d , Para2 = %d , DataSize = %llu", + m_pMbMsgRcv->getMsgType(), + m_pMbMsgRcv->getPara1(), + m_pMbMsgRcv->getPara2(), + (unsigned long long)m_pMbMsgRcv->getDataSize()); + */ + + //< 检查是否本域消息 + if (m_pMbMsgRcv->getPara2() != CNodeMng::getInstance().getLocalDomainID()) + { + LOGINFO("收到非本域消息,忽略!"); + return; + } + + //< 不管消息是否正确,先回复已收到,原因: + //< 1、防止应用端因一条错误消息而死循环 + //< 2、提高速度 + { + iot_net::CMbMessage objMbMSgRep; + objMbMSgRep.setMsgType(iot_idl::MT_TSS_SRV2APP_ADD_ACK); + objMbMSgRep.setSubject(CN_AppId_PUBLIC, CH_TSS_SRV_TO_APP); + objMbMSgRep.setData(std::string("TSS")); + objMbMSgRep.setPara1(m_pMbMsgRcv->getPara1()); + objMbMSgRep.setPara2(CNodeMng::getInstance().getLocalDomainID()); + + //< 不管是否发送失败,应用端收不到确认会重发,对于重复数据时序库自动覆盖 + m_pMbComm->replyMsg(objMbMSgRep, *m_pMbMsgRcv); + } + + //< 当前只有一种消息 + if (m_pMbMsgRcv->getMsgType() != iot_idl::MT_TSS_APP2SRV_ADD) + { + LOGINFO("收到非预期消息类型!"); + return; + } + + //< 反序列化 + bool bRc = m_objTssInsert.ParseFromArray(m_pMbMsgRcv->getDataPtr(), (int) m_pMbMsgRcv->getDataSize()); + if (!bRc) + { + LOGWARN("STssInsert 反序列化失败,忽略消息!"); + return; + } + + //< 为了调试,打印所有消息,生产环境不要打 + //{ + // std::string strPrint; + // google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint); + // LOGDEBUG("收到消息,内容:\n%s", strPrint.c_str()); + //} + + //< 当前不允许没有tag + if (m_objTssInsert.tag_name_size() <= 0) + { + LOGERROR("STssInsert未设置tag_name,忽略消息!"); + return; + } + + //< 不允许没有field + if (m_objTssInsert.field_name_size() <= 0) + { + LOGERROR("STssInsert未设置field_name,忽略消息!"); + return; + } + + //< 没有数据插入,无效 + if (m_objTssInsert.point_size() <= 0) + { + LOGERROR("STssInsert未插入任何point,无意义数据,忽略消息!"); + return; + } + + const int nPointCnt = m_objTssInsert.point_size(); + if (nPointCnt > m_nReleaseThreshold) + m_nReleaseThreshold = nPointCnt; + + //< 添加给NodeMng,pushSaveStr()中有判是否为空字串 + CNodeMng::getInstance().pushSaveMsg(m_objTssInsert); +} + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CFrontThread.h b/platform/src/dbms/tsdb_local_save/CFrontThread.h new file mode 100644 index 00000000..6320adbc --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CFrontThread.h @@ -0,0 +1,76 @@ + +/******************************************************************************//** +* @file CFrontThread.h +* @brief 时序库存库服务,前端处理线程,负责接收应用发来的写库消息 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +//< 屏蔽Protobuff编译报警 +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#endif + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable: 4100) +#endif + +#include "TsdbSaveMessage.pb.h" + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#include "pub_utility_api/TimerThreadBase.h" +#include "net_msg_bus_api/MsgBusApi.h" + +namespace iot_dbms +{ + +class CFrontThread final : private iot_public::CTimerThreadBase +{ +public: + CFrontThread(); + ~CFrontThread() override; + + //< 线程正在运行时调用,安全,返回true + bool resumeThread(); + + bool suspendThread(); + + bool isThreadRunning() const; + +private: + void execute()override; + + void handleOneMbMsg_bak(); + void handleOneMbMsg(); + +private: + volatile bool m_bIsRunning; + + //< 用于动态调节 m_objTssInsert 的释放 + int m_nLoopCnt; + int m_nReleaseThreshold; + + iot_net::CMbCommunicator *m_pMbComm; //< 消息总线通讯器 + iot_net::CMbMessage *m_pMbMsgRcv; //< 接收的消息总线消息 + + //< handleOneMbMsg() 函数中使用,为了性能作为成员函数,避免频繁构造 + std::string m_strTags; + std::string m_strFields; + iot_idl::STssInsert m_objTssInsert; + +}; + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CNodeMng.cpp b/platform/src/dbms/tsdb_local_save/CNodeMng.cpp new file mode 100644 index 00000000..21b8cb4f --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CNodeMng.cpp @@ -0,0 +1,383 @@ + +/******************************************************************************//** +* @file CNodeMng.cpp +* @brief 时序库存库服务,时序库服务器节点管理类 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#include "boost/typeof/typeof.hpp" +#include "boost/lexical_cast.hpp" + +#include "pub_logger_api/logger.h" +#include "pub_sysinfo_api/SysInfoApi.h" +#include "google/protobuf/text_format.h" +#include "CNodeMng.h" + +namespace iot_dbms +{ + +static CNodeMng *g_pNodeMng = NULL; + +static CNodeMng::GC gc; + +CNodeMng::GC::GC() +{ + +} + +CNodeMng::GC::~GC() +{ + if (NULL != g_pNodeMng) + { + delete g_pNodeMng; + g_pNodeMng = NULL; + } +} + +//<================================================ + +CNodeMng::CNodeMng() +{ + m_nLocalDomainID = -1; +} + + +CNodeMng::~CNodeMng() +{ + release(); +} + + +CNodeMng& CNodeMng::getInstance() +{ + if (NULL == g_pNodeMng) + { + g_pNodeMng = new CNodeMng(); + } + + return *g_pNodeMng; +} + + +bool CNodeMng::haveInstance() +{ + return NULL != g_pNodeMng; +} + + +bool CNodeMng::init(const iot_public::SRunAppInfo &stRunAppInfo) +{ + m_stRunAppInfo = stRunAppInfo; + + iot_public::CSysInfoInterfacePtr ptrSysInfo; + if (iot_public::createSysInfoInstance(ptrSysInfo) == false) + { + LOGERROR("createSysInfoInstance() return false !"); + return false; + } + + //< 获取本机域ID和主机名 + { + iot_public::SNodeInfo stLocalNodeInfo; + if (iotSuccess != ptrSysInfo->getLocalNodeInfo(stLocalNodeInfo)) + { + LOGERROR("getLocalNodeInfo() failed !"); + return false; + } + + m_nLocalDomainID = stLocalNodeInfo.nDomainId; + m_strLocalNodeName = stLocalNodeInfo.strName; + } + + //< 本地域数据库连接信息 + iot_public::SDatabaseInfo stFirstDbInfo; + std::vector vecLocalDbInfo; + std::vector vecRemoteDbInfo; + int nRc = ptrSysInfo->getDBInfoByDomainId(m_nLocalDomainID, stFirstDbInfo, + vecLocalDbInfo, vecRemoteDbInfo); + if (iotSuccess != nRc) + { + LOGERROR("getDbInfoByNodeName failed , return false !"); + return false; + } + + //< 初始化 m_vecLocalThreads + for (size_t nDbInfoIndex = 0; nDbInfoIndex < vecLocalDbInfo.size(); nDbInfoIndex++) + { + iot_public::SNodeInfo stDbNodeInfo; + + nRc = ptrSysInfo->getNodeInfoByName(vecLocalDbInfo[nDbInfoIndex].strNodeName, stDbNodeInfo); + if (iotSuccess != nRc) + { + LOGERROR("getNodeInfoByName failed, continue !"); + continue; + } + + std::string strIpA, strIpB; + for (int i = 0; i < stDbNodeInfo.nNicNum; i++) + { + switch (i + 1) + { + case 1: + strIpA = stDbNodeInfo.strNic1Addr; + break; + case 2: + strIpB = stDbNodeInfo.strNic2Addr; + break; + default: + break; + } + } + + //< todo 端口号、数据库名、用户名、密码 暂时写死 + /* 为以后搜索方便,保留此注释 + * 取消默认EMS_DEFAULT_DATABASE,时序库使用系统建模中关系库的首链接的数据库配置 + * 时序库用户名用的数据库名称 + */ + CNodeThreadPtr ptrNodeThread(new CNodeThread(stDbNodeInfo.strName, + strIpA, strIpB, + 8086, stFirstDbInfo.strServiceName, + stFirstDbInfo.strServiceName, stFirstDbInfo.strUserPassword)); + + m_vecLocalThreads.push_back(ptrNodeThread); + + ptrNodeThread->resumeThread(); + } + + //< 初始化 m_vecRemoteThreads + for (size_t nDbInfoIndex = 0; nDbInfoIndex < vecRemoteDbInfo.size(); nDbInfoIndex++) + { + iot_public::SNodeInfo stDbNodeInfo; + + nRc = ptrSysInfo->getNodeInfoByName(vecRemoteDbInfo[nDbInfoIndex].strNodeName, stDbNodeInfo); + if (iotSuccess != nRc) + { + LOGERROR("getNodeInfoByName failed, continue !"); + continue; + } + + std::string strIpA, strIpB; + for (int i = 0; i < stDbNodeInfo.nNicNum; i++) + { + switch (i + 1) + { + case 1: + strIpA = stDbNodeInfo.strNic1Addr; + break; + case 2: + strIpB = stDbNodeInfo.strNic2Addr; + break; + default: + break; + } + } + + //< todo 端口号、数据库名、用户名、密码 暂时写死 + /* 为以后搜索方便,保留此注释 + * 取消默认EMS_DEFAULT_DATABASE,时序库使用系统建模中关系库的首链接的数据库配置 + * 时序库用户名用的数据库名称 + */ + CNodeThreadPtr ptrNodeThread(new CNodeThread(stDbNodeInfo.strName, + strIpA, strIpB, + 8086, stFirstDbInfo.strServiceName, + stFirstDbInfo.strServiceName, stFirstDbInfo.strUserPassword)); + + m_vecRemoteThreads.push_back(ptrNodeThread); + + ptrNodeThread->resumeThread(); + } + + return true; +} + + +void CNodeMng::release() +{ + //< CTimerThreadBase 析构时会退出线程 + m_vecLocalThreads.clear(); + m_vecRemoteThreads.clear(); +} + + +//< 获取本域ID +int CNodeMng::getLocalDomainID() const +{ + return m_nLocalDomainID; +} + +//< 获取本机主机名 +const std::string& CNodeMng::getLocalNodeName() const +{ + return m_strLocalNodeName; +} + +std::string CNodeMng::getAppName() const +{ + return m_stRunAppInfo.strAppName; +} + +int CNodeMng::getAppId() const +{ + return m_stRunAppInfo.nAppId; +} + +void CNodeMng::pushSaveMsg(const iot_idl::STssInsert &objTssInsert) +{ + //< 为了调试,打印所有消息,生产环境不要打 + //{ + // std::string strPrint; + // google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint); + // LOGDEBUG("收到消息,内容:\n%s", strPrint.c_str()); + //} + + std::string strTags; + std::string strFields; + + const int nPointCnt = objTssInsert.point_size(); + + //< 生成 InfluxDB 插入字串 + //< 批量插入的语句中,有错误语句,InfluxDB会跳过错误语句,不影响正确语句 + StdStringPtr ptrStrInsert(new std::string); + for (int nPointIndex = 0; nPointIndex < nPointCnt; nPointIndex++) + { + const iot_idl::STsdbPoint &objPoint = objTssInsert.point(nPointIndex); + + if (objPoint.tag_val_size() != objTssInsert.tag_name_size()) + { + LOGERROR("STsdbPoint中tag_val数量不正确,忽略此点!"); + continue; + } + + if (objPoint.field_val_size() != objTssInsert.field_name_size()) + { + LOGERROR("STsdbPoint中field_val数量不正确,忽略此点!"); + continue; + } + + //< 生成 Tag 字串 + strTags.clear(); + for (int nTagIndex = 0; nTagIndex < objPoint.tag_val_size(); nTagIndex++) + { + if (0 != nTagIndex) + { + strTags += ","; + } + + strTags += objTssInsert.tag_name(nTagIndex); + strTags += "="; + + //< dp为了查实时库,经常将tag_name进行resize()操作,尾部会有大量的\0 + //< std::string直接相加会带上这些\0,即按size()长度相加 + //< 而influxdb解析语句时遇到\0就截断了 + //< 为了防止这种问题,使用c_str()的方式 + const std::string &strTagVal = objPoint.tag_val(nTagIndex); + strTags += strTagVal; + + //< 遇到此情况输出日志 + if (strlen(strTagVal.c_str()) != strTagVal.size()) + { + std::string strPrint; + google::protobuf::TextFormat::PrintToString(objTssInsert, &strPrint); + + LOGWARN("收到的tag_val字串中包含\\0字符,仅使用\\0前的内容,以防字符串截断。请检查消息源程序,消息内容如下:\n%s\n", + strPrint.c_str()); + } + } + + //< 生成 Field 字串 + strFields.clear(); + for (int nFieldIndex = 0; nFieldIndex < objPoint.field_val_size(); nFieldIndex++) + { + if (0 != nFieldIndex) + { + strFields += ","; + } + + strFields += objTssInsert.field_name(nFieldIndex); + strFields += "="; + strFields += toInfluxString(objPoint.field_val(nFieldIndex)); + } + + //< 预先分配空间,避免string频繁扩充,扩充时还需要复制内存 + if (0 == nPointIndex) + { + const size_t nSizeOfOne = objTssInsert.meas_name().size() + + strTags.size() + strFields.size() + 30; + ptrStrInsert->reserve(nSizeOfOne * nPointCnt); + } + + //< 添加到 ptrStrInsert + //*ptrStrInsert += boost::str(boost::format("%1%,%2% %3% %4% \n") + // % objTssInsert.meas_name() + // % strTags + // % strFields + // % objPoint.time_stamp()); + + //< 经测试,lexical_cast比format快几十倍 + *ptrStrInsert += objTssInsert.meas_name() + ","; + *ptrStrInsert += strTags + " "; + *ptrStrInsert += strFields + " "; + *ptrStrInsert += boost::lexical_cast(objPoint.time_stamp()) + " \n"; + } + + //< 判断save_action + bool bToLocal = false; + bool bToRemote = false; + switch (objTssInsert.save_action()) + { + case iot_idl::SA_TSS_DO_NOTHING: + break; + case iot_idl::SA_TSS_LOCAL_ONLY: + bToLocal = true; + break; + case iot_idl::SA_TSS_REMOTE_ONLY: + bToRemote = true; + break; + case iot_idl::SA_TSS_LOCAL_REMOTE: + bToLocal = true; + bToRemote = true; + break; + default: LOGERROR("Unknow save_action value !"); + break; + } + + //< 添加给NodeMng,pushSaveStr()中有判是否为空字串 + pushSaveStr(ptrStrInsert, bToLocal, bToRemote); +} + +void CNodeMng::pushSaveStr(StdStringPtr ptrStrSave, bool bToLocal, bool bToRemote) +{ + if (!ptrStrSave) + { + LOGERROR("pushSaveStr(): ptrStrSave == NULL, return !"); + return; + } + + if (ptrStrSave->empty()) + { + LOGINFO("pushSaveStr(): ptrStrSave->empty(), return !"); + return; + } + + if (bToLocal) + { + for (size_t i = 0; i < m_vecLocalThreads.size(); i++) + { + m_vecLocalThreads[i]->pushSaveStr(ptrStrSave); + } + } + + if (bToRemote) + { + for (size_t i = 0; i < m_vecRemoteThreads.size(); i++) + { + m_vecRemoteThreads[i]->pushSaveStr(ptrStrSave); + } + } +} + + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CNodeMng.h b/platform/src/dbms/tsdb_local_save/CNodeMng.h new file mode 100644 index 00000000..1c3f4710 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CNodeMng.h @@ -0,0 +1,82 @@ + +/******************************************************************************//** +* @file CNodeMng.h +* @brief 时序库存库服务,时序库服务器节点管理类 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +#include + +#include "CNodeThread.h" +#include "TsdbSaveCommon.h" +#include "TsdbSaveMessage.pb.h" +#include "pub_sysinfo_api/SysInfoApi.h" + +namespace iot_dbms +{ + +class CNodeThread; + +class CNodeMng final +{ +public: + //< 单例释放类 + class GC final + { + public: + GC(); + ~GC(); + }; + +public: + //< 单例 + static CNodeMng& getInstance(); + + //< 判断单例是否已实例化 + static bool haveInstance(); + + ~CNodeMng(); + + //< 初始化 + bool init(const iot_public::SRunAppInfo &stRunAppInfo); + + //< 释放资源 + void release(); + + //< 获取本域ID + int getLocalDomainID() const; + + //< 获取本机主机名 + const std::string& getLocalNodeName() const; + + std::string getAppName() const; + int getAppId() const; + + //< 添加存库消息 + void pushSaveMsg(const iot_idl::STssInsert &objTssInsert); + void pushSaveStr(StdStringPtr ptrStrSave, bool bToLocal, bool bToRemote); + +private: + CNodeMng(); + +private: + //< 本域ID + int m_nLocalDomainID; + iot_public::SRunAppInfo m_stRunAppInfo; + //< 本机主机名 + std::string m_strLocalNodeName; + + //< 本地域时序库服务器节点线程 + std::vector m_vecLocalThreads; + + //< 远程域时序库服务器节点线程 + std::vector m_vecRemoteThreads; + +}; + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CNodeThread.cpp b/platform/src/dbms/tsdb_local_save/CNodeThread.cpp new file mode 100644 index 00000000..dc3fc082 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CNodeThread.cpp @@ -0,0 +1,433 @@ + +/******************************************************************************//** +* @file CNodeThread.h +* @brief 时序库存库服务,负责对单个时序库服务器节点的(写库,缓存文件等)处理线程 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#include "boost/filesystem/operations.hpp" + +#include "pub_logger_api/logger.h" +#include "pub_utility_api/TimeUtil.h" +#include "pub_utility_api/FileUtil.h" +#include "tsdb_api/CTsdbConn.h" + +#include "CNodeThread.h" +#include "CNodeMng.h" + +namespace iot_dbms +{ + +//< 1s运行一次,死锁超时 1天 +CNodeThread::CNodeThread(const std::string &strNodeName, + const std::string &strIpA, const std::string &strIpB, + int nPort, const std::string &strDbName, + const std::string &strUser, const std::string &strPasswd) : + CTimerThreadBase("CNodeThread", 1000, 1000 * 60 * 60 * 24) +{ + m_bDbCreated = false; + + m_nLastCheckConnTime = 0; + + m_pConnInUse = NULL; + + //< m_pConnIpA + { + if (strIpA.empty()) + { + m_pConnIpA = NULL; + } + else + { + m_pConnIpA = new CTsdbConn(strIpA.c_str(), nPort, strDbName.c_str(), + strUser.c_str(), strPasswd.c_str()); + } + } + + //< m_pConnIpB + { + if (strIpB.empty()) + { + m_pConnIpB = NULL; + } + else + { + m_pConnIpB = new CTsdbConn(strIpB.c_str(), nPort, strDbName.c_str(), + strUser.c_str(), strPasswd.c_str()); + } + } + + m_strNodeName = strNodeName; + m_strDbName = strDbName; +} + + +CNodeThread::~CNodeThread() +{ + suspendThread(); + quit(); + + m_pConnInUse = NULL; + + if (m_pConnIpA) + { + delete m_pConnIpA; + m_pConnIpA = NULL; + } + + if (m_pConnIpB) + { + delete m_pConnIpB; + m_pConnIpB = NULL; + } + + m_objReadStream.close(); + m_objWriteStream.close(); +} + + +bool CNodeThread::resumeThread() +{ + resume(); + return true; +} + +bool CNodeThread::suspendThread() +{ + suspend(); + return true; +} + + +bool CNodeThread::isThreadRunning() const +{ + return isRunning(); +} + + +void CNodeThread::pushSaveStr(StdStringPtr ptrStrSave) +{ + boost::mutex::scoped_lock locker(m_mutexQue); + m_queStrSave.push(ptrStrSave); +} + + +void CNodeThread::afterResume() +{ + updateConnInUse(true); +} + + +void CNodeThread::execute() +{ + //< 非强制性更新连接状态 + updateConnInUse(false); + + //< 自动创建数据库,无需工程人员手动创建 + //< 已测试:若数据库已存在,再创建同名数据库,创建无效,不影响原有数据 + { + if (!m_bDbCreated && NULL != m_pConnInUse) + { + //< 已测试:若数据库已存在,再创建同名数据库,HTTP返回200,createDatabase()函数返回true + if (m_pConnInUse->createDatabase(m_strDbName.c_str())) + { + m_bDbCreated = true; + } + else + { + LOGERROR("createDatabase %s failed !", m_strDbName.c_str()); + } + } + } + + //< 从文件处理 + dealFromFile(); + + //< 从队列处理 + dealFromQueue(); + +} + + +void CNodeThread::dealFromFile() +{ + //< 一次处理完全部缓存 + + //< 提交成功,则清空,不成功下个循环继续 + std::string strToDeal; + + //< 注意防止死循环 + while (m_pConnInUse) + { + //< strToDeal为空,则读取文件 + //< 不为空,则是上次循环未提交成功,切换网络后继续执行到此处 + if (strToDeal.empty()) + { + //< 16MiB + while (strToDeal.size() < 0x00FFFFFF) + { + std::string strTemp; + if (readFromFile(strTemp)) + { + strToDeal += strTemp; + } + else + { + break; + } + } + } + + if (strToDeal.empty()) + { + delFile(); + break; + } + + //< 超时时间1分钟 + const bool bRc = m_pConnInUse->doInsert(strToDeal.c_str(), 60 * 1000); + + if (!bRc) + { + //< 插入不成功,强制更新连接状态 + //< 返回true表示当前m_pConnInUse不可用,发生了变化 + if (updateConnInUse(true)) + { + //< 继续大循环 + continue; + } + else + { + //< 连接状态正常,可能是提交语句有问题,跳过 + + //< 从文件读取的内容可能很长,不输出日志 + //LOGERROR("提交TSDB失败,但连接状态正常,放弃提交该语句,语句如下:\n%s", strToDeal.c_str()); + } + } + + //< 成功,或者语句有问题,清空 + strToDeal.clear(); + } +} + + +void CNodeThread::dealFromQueue() +{ + //< 从 m_queStrSave 获取全部 + std::string strToDeal; + { + std::queue queToDeal; + { + //< 加锁 + boost::mutex::scoped_lock locker(m_mutexQue); + + if (m_queStrSave.empty()) + { + return; + } + + std::swap(m_queStrSave, queToDeal); + } + + while (!(queToDeal.empty())) + { + //< 批量一次性处理,效率更高 + strToDeal += *(queToDeal.front()); + + //< 前端已经加了\n + //strToDeal += "\n"; + + queToDeal.pop(); + } + } + + if (m_pConnInUse) + { + //< 超时时间1分钟 + bool bRc = m_pConnInUse->doInsert(strToDeal.c_str(), 60 * 1000); + if (!bRc) + { + //< 插入不成功,强制更新连接状态 + //< 返回true表示当前m_pConnInUse不可用,发生了变化 + if (updateConnInUse(true)) + { + //< 写文件 + writeToFile(strToDeal); + } + else + { + //< 连接状态正常,可能是提交语句有问题,跳过 + LOGERROR("提交TSDB失败,但连接状态正常,提交语句如下:\n%s", strToDeal.c_str()); + } + } + } + else + { + //< 写文件 + writeToFile(strToDeal); + } +} + + +bool CNodeThread::updateConnInUse(bool bForce) +{ + bool bRet = false; + if (NULL == m_pConnInUse) + { + if (m_pConnIpA + && m_pConnIpA->pingServer(1000)) + { + m_pConnInUse = m_pConnIpA; + bRet = true; + } + else if (m_pConnIpB + && m_pConnIpB->pingServer(1000)) + { + m_pConnInUse = m_pConnIpB; + bRet = true; + } + + //< 更新时间 + m_nLastCheckConnTime = iot_public::getMonotonicMsec(); + } + else + { + //< 强制更新,或者已到周期性检查时间(防止频繁检查) + if (bForce + || m_nLastCheckConnTime + 5000 < iot_public::getMonotonicMsec()) + { + //< 检查当前 m_pConnInUse 是否可用 + if (!m_pConnInUse->pingServer(1000)) + { + m_pConnInUse = NULL; + + bRet = true; + + if (m_pConnIpA + && m_pConnIpA->pingServer(1000)) + { + m_pConnInUse = m_pConnIpA; + } + else if (m_pConnIpB + && m_pConnIpB->pingServer(1000)) + { + m_pConnInUse = m_pConnIpB; + } + } + + //< 更新时间 + m_nLastCheckConnTime = iot_public::getMonotonicMsec(); + } + } + + return bRet; +} + + +bool CNodeThread::readFromFile(std::string &strOutput) +{ + if (m_objWriteStream.is_open()) + { + m_objWriteStream.close(); + } + + if (!m_objReadStream.is_open()) + { + //std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, m_strNodeName); + std::string strFullPath = getCacheFilePath(); + if (strFullPath.empty()) + return false; + + if (boost::filesystem::exists(strFullPath) + && boost::filesystem::is_regular_file(strFullPath)) + { + m_objReadStream.open(strFullPath, std::ios::in); + } + else + return false; + } + + if (!std::getline(m_objReadStream, strOutput)) + return false; + + strOutput.append("\n"); + + return true; +} + + +void CNodeThread::writeToFile(const std::string &strInput) +{ + if (m_objReadStream.is_open()) + { + m_objReadStream.close(); + } + + if (!m_objWriteStream.is_open()) + { + //std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, ""); + std::string strFullPath = getCachePath(); + + if (!boost::filesystem::is_directory(strFullPath)) + { + if (!boost::filesystem::create_directories(strFullPath)) + { + LOGERROR("create directories %s failed.", strFullPath.c_str()); + } + } + + //strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, m_strNodeName); + strFullPath = getCacheFilePath(); + if (strFullPath.empty()) + return; + + //< append模式,在文件末尾添加,没有文件则自动创建 + m_objWriteStream.open(strFullPath, std::ios::app); + } + + m_objWriteStream << strInput; +} + +void CNodeThread::delFile() +{ + m_objReadStream.close(); + m_objWriteStream.close(); + + //std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, m_strNodeName); + std::string strFullPath = getCacheFilePath(); + if (strFullPath.empty()) + return; + + if (boost::filesystem::exists(strFullPath)) + { + if (!boost::filesystem::remove_all(strFullPath)) + { + LOGERROR("删除文件失败."); + } + } +} + +std::string CNodeThread::getCachePath() +{ + std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, ""); + boost::filesystem::path objFullPath = strFullPath; + objFullPath /= CNodeMng::getInstance().getAppName(); + + return objFullPath.string(); +} + +std::string CNodeThread::getCacheFilePath() +{ + std::string strFullPath = iot_public::CFileUtil::getAbsolutePath(DATA_DIRECTORY, ""); + boost::filesystem::path objFullPath = strFullPath; + objFullPath /= CNodeMng::getInstance().getAppName(); + + strFullPath = iot_public::CFileUtil::getAbsolutePath(objFullPath.string(), m_strNodeName); + + return strFullPath; +} + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CNodeThread.h b/platform/src/dbms/tsdb_local_save/CNodeThread.h new file mode 100644 index 00000000..b4ab8bf3 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CNodeThread.h @@ -0,0 +1,97 @@ + +/******************************************************************************//** +* @file CNodeThread.h +* @brief 时序库存库服务,负责对单个时序库服务器节点的(写库,缓存文件等)处理线程 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +#include + +#include "boost/shared_ptr.hpp" +#include "boost/thread/mutex.hpp" +#include "boost/filesystem/fstream.hpp" + +#include "pub_utility_api/TimerThreadBase.h" + +#include "TsdbSaveCommon.h" + + +namespace iot_dbms +{ + +class CTsdbConn; + +class CNodeThread final : private iot_public::CTimerThreadBase +{ +public: + CNodeThread(const std::string &strNodeName, + const std::string &strIpA, const std::string &strIpB, + int nPort, const std::string &strDbName, + const std::string &strUser, const std::string &strPasswd); + + ~CNodeThread() override; + + bool resumeThread(); + + bool suspendThread(); + + bool isThreadRunning() const; + + void pushSaveStr(StdStringPtr ptrStrSave); + +private: + void afterResume()override; + void execute()override; + + //< 从缓存文件处理 + void dealFromFile(); + + //< 从内存队列处理 + void dealFromQueue(); + + //< 更新 m_pConnInUse,如果当前m_pConnInUse可用,则不变化 + //< 返回值:m_pConnInUse 是否发生了变化 + bool updateConnInUse(bool bForce); + + bool readFromFile(std::string &strOutput); + void writeToFile(const std::string &strInput); + + void delFile(); + + //< 获取时序数据缓存目录 + std::string getCachePath(); + //< 获取时序数据缓存文件路径 + std::string getCacheFilePath(); + +private: + + //< 是否已执行创建数据库命令 + bool m_bDbCreated; + + //< 上一次检查连接的时间,防止频繁检查 + boost::int64_t m_nLastCheckConnTime; + + CTsdbConn *m_pConnInUse; //< 正在使用的连接 + CTsdbConn *m_pConnIpA; //< A网连接 + CTsdbConn *m_pConnIpB; //< B网连接 + + std::string m_strNodeName; //< 目标节点主机名 + std::string m_strDbName; //< 数据库名 + + //< 指针数据,各线程共享,不应修改 + std::queue m_queStrSave; //< 插入数据队列 + boost::mutex m_mutexQue; //< m_queStrSave的锁 + + boost::filesystem::ifstream m_objReadStream; + boost::filesystem::ofstream m_objWriteStream; + +}; + +typedef boost::shared_ptr CNodeThreadPtr; + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.cpp b/platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.cpp new file mode 100644 index 00000000..7480f0d4 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.cpp @@ -0,0 +1,82 @@ + +/******************************************************************************//** +* @file CTsdbSaveRedunSw.cpp +* @brief 时序库存库服务,冗余切换类 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#include "CTsdbSaveSrv.h" +#include "CNodeMng.h" +#include "CTsdbSaveRedunSw.h" +#include "SampleThread.h" + +namespace iot_dbms +{ + +CTsdbSaveRedunSw::CTsdbSaveRedunSw( CTsdbSaveSrv *pParent, const iot_public::SRunAppInfo &stRunAppInfo) + : m_pParent( pParent ), + m_stRunAppInfo(stRunAppInfo), + m_ptrSampleThread(NULL) +{ + assert(m_pParent); +} + +CTsdbSaveRedunSw::~CTsdbSaveRedunSw() +{ + //< m_objFrontThread析构时会 quit() + m_ptrSampleThread.reset(); +} + +int CTsdbSaveRedunSw::redundantSwitch(bool bMaster, bool bSlave) +{ + //< 当前逻辑 bSlave 无需使用 + + if (bMaster) + { + m_objFrontThread.resumeThread(); + } + else + { + m_objFrontThread.suspendThread(); + } + + if(m_ptrSampleThread != NULL) + { + m_ptrSampleThread->redundantSwitch(bMaster); + } + + m_pParent->updateProcInfo(true, m_objFrontThread.isThreadRunning(), bSlave); + + return iotSuccess; +} + +int CTsdbSaveRedunSw::initialize() +{ + //PUBLIC应用不加载定时存盘功能 + if(m_stRunAppInfo.nAppId == CN_AppId_PUBLIC) + { + return iotSuccess; + } + + //初始化历史采样接口 + //================================================================================================ + m_ptrSampleThread = boost::make_shared(m_stRunAppInfo); + if (m_ptrSampleThread == NULL) + { + LOGERROR("CTsdbSaveRedunSw::initialize(), create SampleInstance fail!"); + return iotFailed; + } + + if(!m_ptrSampleThread->initialize()) + { + LOGERROR("CTsdbSaveRedunSw::initialize(), init SampleInstance fail!"); + return iotFailed; + } + + return iotSuccess; +} + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.h b/platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.h new file mode 100644 index 00000000..cb0c5398 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CTsdbSaveRedunSw.h @@ -0,0 +1,45 @@ + +/******************************************************************************//** +* @file CTsdbSaveRedunSw.h +* @brief 时序库存库服务,冗余切换类 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +#include "sys_node_mng_api/NodeMngInterface.h" +#include "CFrontThread.h" +#include "SampleThread.h" + +namespace iot_dbms +{ + +class CTsdbSaveSrv; + +class CTsdbSaveRedunSw final : public ::iot_sys::CRedundantSwitchInterface +{ +public: + explicit CTsdbSaveRedunSw( CTsdbSaveSrv *pParent,const iot_public::SRunAppInfo &stRunAppInfo ); + ~CTsdbSaveRedunSw(); + +public: + + //< 见父类CRedundantSwitchInterface说明 + int redundantSwitch( bool bMaster, bool bSlave ) override; + + int initialize(); + +private: + CTsdbSaveSrv *const m_pParent; + iot_public::SRunAppInfo m_stRunAppInfo; + + CFrontThread m_objFrontThread; + CSampleThreadPtr m_ptrSampleThread; +}; + +typedef boost::shared_ptr CTsdbSaveRedunSwPtr; + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.cpp b/platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.cpp new file mode 100644 index 00000000..30a6e63a --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.cpp @@ -0,0 +1,372 @@ + +/******************************************************************************//** +* @file CTsdbSaveSrv.cpp +* @brief 时序库存库服务,,服务框架 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#include "boost/program_options.hpp" +#include "boost/algorithm/string/predicate.hpp" + +#include "pub_logger_api/logger.h" +#include "pub_utility_api/SingleProcInstance.h" +#include "net_msg_bus_api/MsgBusApi.h" +#include "tsdb_api/TsdbApi.h" + +#include "CNodeMng.h" + +#include "CTsdbSaveSrv.h" + + +#define TSDB_LOCAL_SAVE_SRV_PROC_NAME "tsdb_local_save" +using namespace std; +using namespace iot_public; + +namespace iot_dbms +{ + +CTsdbSaveSrv::CTsdbSaveSrv() +{ + m_enRunModel = RM_NORMAL; +} + + +CTsdbSaveSrv::~CTsdbSaveSrv() +{ + stop(); + + iot_public::StopLogSystem(); +} + + +//< @param int & nStatus 错误码 +bool CTsdbSaveSrv::start(int argc, char *argv[], int& /*nStatus*/) +{ + assert(!m_ptrRedunSw); //< NULL + + string strAppName; + string strStartArgs; + //< 参数解析 + if (!parseCommandLine(argc, argv, strAppName,strStartArgs)) + { + std::cerr << "参数解析失败" << std::endl; + return false; + } + + //< 启动日志 + iot_public::StartLogSystem(strAppName.c_str(), TSDB_LOCAL_SAVE_SRV_PROC_NAME); + + if(!getSysAppInfo(strAppName)) + { + LOGERROR("获取应用信息失败"); + return false; + } + + //< 判断是否已启动 + if (isAlreadyRunning(strStartArgs)) + { + LOGFATAL(TSDB_LOCAL_SAVE_SRV_PROC_NAME" 已启动,不可重复启动,本实例退出!"); + return false; + } + + //< 消息总线 + if (!iot_net::initMsgBus(TSDB_LOCAL_SAVE_SRV_PROC_NAME, "", true)) + { + LOGFATAL("初始化消息总线失败,程序启动失败!"); + return false; + } + + //< 时序库接口库 + if (!initTsdbApi()) + { + LOGFATAL("初始化时序库接口库失败,程序启动失败!"); + return false; + } + + //< 初始化 CNodeMng 加载配置等 + if (!CNodeMng::getInstance().init(m_stRunAppInfo)) + { + LOGFATAL("CNodeMng 初始化失败,程序启动失败!"); + return false; + } + + //< CFrontThread 由 m_ptrRedunSw 管理 + + //< 初始化 m_ptrRedunSw + m_ptrRedunSw.reset(new CTsdbSaveRedunSw(this,m_stRunAppInfo)); + if(m_ptrRedunSw == NULL || m_ptrRedunSw->initialize() != iotSuccess) + { + LOGFATAL("CNodeMng 初始化失败,程序启动失败!"); + return false; + } + + switch (m_enRunModel) + { + case RM_NORMAL: + { + //< 进程管理 + { + iot_sys::SProcessInfoKey objProcInfo; + objProcInfo.nAppId = m_stRunAppInfo.nAppId; + objProcInfo.nDomainId = CNodeMng::getInstance().getLocalDomainID(); + objProcInfo.strNodeName = CNodeMng::getInstance().getLocalNodeName(); + objProcInfo.strProcName = TSDB_LOCAL_SAVE_SRV_PROC_NAME; + objProcInfo.strProcParam = strStartArgs; + + m_ptrProcMng = iot_sys::getProcMngInstance(objProcInfo); + if (!m_ptrProcMng) + { + LOGFATAL("getProcMngInstance return NULL"); + return false; + } + + m_ptrProcMng->setCallback(this); + } + + //< 冗余管理 + { + m_ptrRedundantMng = iot_sys::getRedundantMngInstance(CNodeMng::getInstance().getLocalDomainID(), + m_stRunAppInfo.nAppId, + CNodeMng::getInstance().getLocalNodeName()); + if (!m_ptrRedundantMng) + { + LOGERROR("getRedundantMngInstance return NULL"); + return false; + } + + m_ptrRedundantMng->setCallback(m_ptrRedunSw); + } + + //< 更新进程管理状态 + updateProcInfo(true, false, false); + } + break; + case RM_NO_PROC_MNG_MASTER: + { + if (iotSuccess != m_ptrRedunSw->redundantSwitch(true, false)) + { + LOGFATAL("以主机模式启动失败!"); + return false; + } + } + break; + case RM_NO_PROC_MNG_SLAVE: + { + if (iotSuccess != m_ptrRedunSw->redundantSwitch(false, true)) + { + LOGFATAL("以备机模式启动失败!"); + return false; + } + } + break; + default: + { + LOGFATAL("非预期的启动模式,程序启动失败!"); + return false; + } + break; + } + + LOGINFO(TSDB_LOCAL_SAVE_SRV_PROC_NAME" is now running ..."); + + return true; +} + + +bool CTsdbSaveSrv::stop() +{ + LOGINFO(TSDB_LOCAL_SAVE_SRV_PROC_NAME" is now exiting ..."); + + //< 取消冗余切换,防止正在退出时发生冗余切换 + if (m_ptrRedundantMng) + { + //LOGDEBUG("Release m_ptrRedundantMng ..."); + + m_ptrRedundantMng->unsetCallback(); + m_ptrRedundantMng.reset(); + + //LOGDEBUG("Release m_ptrRedundantMng complete !"); + } + + //< 释放 m_ptrRedunSw + if (m_ptrRedunSw) + { + //LOGDEBUG("Release m_ptrRedunSw ..."); + + m_ptrRedunSw.reset(); + + //LOGDEBUG("Release m_ptrRedunSw complete !"); + } + + //< 取消进程管理回调 + //if (m_ptrProcMng) + //{ + // m_ptrProcMng->unsetCallback(); + //} + + //< 清理业务线程 + if (CNodeMng::haveInstance()) + { + //LOGDEBUG("Release CNodeMng ..."); + + //< CFrontThread 由 m_ptrRedunSw 管理 + CNodeMng::getInstance().release(); + + //LOGDEBUG("Release CNodeMng complete !"); + } + + //< 更新进程管理状态 + if (m_ptrProcMng) + { + //LOGDEBUG("Release m_ptrProcMng ..."); + + updateProcInfo(false, false, false); + m_ptrProcMng.reset(); + + //LOGDEBUG("Release m_ptrProcMng complete !"); + } + + //< 释放时序库接口库 + releaseTsdbApi(); + + //< 停止消息总线 + iot_net::releaseMsgBus(); + + //< 停止日志系统 + //< 移到析构函数中,防止日志库停止后,又写日志,从而使log4cplus提示找不到logger + //iot_public::StopLogSystem(); + + return true; +} + + +int CTsdbSaveSrv::toQuit() +{ + shutdown(); + return iotSuccess; +} + + +int CTsdbSaveSrv::updateProcInfo(bool bActive, bool bMaster, bool bSlave) +{ + if (m_ptrProcMng) + { + return m_ptrProcMng->updateProcessInfo(bActive, bMaster, bSlave); + } + + return iotFailed; +} + + +bool CTsdbSaveSrv::isAlreadyRunning(const std::string &strStartArgs ) +{ + std::string strUniqueName = TSDB_LOCAL_SAVE_SRV_PROC_NAME; + strUniqueName += strStartArgs; + return iot_public::CSingleProcInstance::hasInstanceRunning( strUniqueName ); +} + + +bool CTsdbSaveSrv::parseCommandLine(int argc, char *argv[],std::string &strAppName, std::string &strStartArgs) +{ + //< 拼接启动参数,用于向进程管理注册 + for ( int i = 1; i < argc; ++i ) + { + if ( i != 1 ) + { + strStartArgs += " "; + } + strStartArgs += argv[i]; + } + + namespace po = boost::program_options; + po::options_description desc("usage"); + po::variables_map vm; + try + { + desc.add_options() + ("app"",a", po::value(), "\t""The APP name, can only ran as PUBLIC") + ("no_proc_mng_master"",m", "\t""Run as master without ProcMng and RedundantMng") + ("no_proc_mng_slave"",s", "\t""Run as slave without ProcMng and RedundantMng") + ("help"",h", "\t""Print this info"); + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) + { + std::cout << desc << std::endl; + return false; + } + + if (vm.count("no_proc_mng_master") && vm.count("no_proc_mng_slave")) + { + std::cout << "no_proc_mng_master and no_proc_mng_slave can not use at the same time !" << std::endl; + return false; + } + + if (0 == vm.count("app")) + { + std::cout << "Must set app !" << std::endl; + return false; + } + else + { + strAppName = vm["app"].as(); + } + + if (vm.count("no_proc_mng_master")) + { + m_enRunModel = RM_NO_PROC_MNG_MASTER; + } + else if (vm.count("no_proc_mng_slave")) + { + m_enRunModel = RM_NO_PROC_MNG_SLAVE; + } + else + { + m_enRunModel = RM_NORMAL; + } + } + catch (std::exception &ex) + { + std::cerr << ex.what() << std::endl; + std::cout << desc << std::endl; + return false; + } + catch (...) + { + std::cerr << "未知错误" << std::endl; + std::cout << desc << std::endl; + return false; + } + + return true; +} + +bool CTsdbSaveSrv::getSysAppInfo(const std::string &strAppName) +{ + CSysInfoInterfacePtr sysInfoPtr; + if(createSysInfoInstance(sysInfoPtr) == false) + { + LOGERROR("AppName=%s ,createSysInfoInstance fail!\n", strAppName.c_str()); + return false; + } + if(sysInfoPtr == NULL) + { + LOGERROR("AppName=%s ,Get System Info fail!\n", strAppName.c_str()); + return false; + } + + + if(iotSuccess != sysInfoPtr->getLocalRunAppInfoByName(strAppName,m_stRunAppInfo)) + { + LOGERROR("getLocalRunAppInfoByName fail"); + return false; + } + + return true; +} + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.h b/platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.h new file mode 100644 index 00000000..e86dabfc --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/CTsdbSaveSrv.h @@ -0,0 +1,66 @@ + +/******************************************************************************//** +* @file CTsdbSaveSrv.h +* @brief 时序库存库服务,服务框架 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +#include "pub_utility_api/BaseService.h" +#include "sys_proc_mng_api/ProcMngInterface.h" +#include "pub_sysinfo_api/SysInfoApi.h" +#include "CTsdbSaveRedunSw.h" + +namespace iot_dbms +{ + +class CTsdbSaveSrv final : public iot_public::CBaseService, iot_sys::CProcessQuitInterface +{ +public: + CTsdbSaveSrv(); + ~CTsdbSaveSrv() override; + + //< 见父类CBaseService说明 + bool start(int argc, char *argv[], int &nStatus)override; + + //< 见父类CBaseService说明 + bool stop()override; + + //< 见父类CProcessQuitInterface说明 + int toQuit()override; + + //< 设置进程状态 + int updateProcInfo(bool bActive, bool bMaster, bool bSlave); + +private: + + bool isAlreadyRunning(const std::string &strStartArgs ); + + bool parseCommandLine(int argc, char *argv[],std::string &strAppName, std::string &strStartArgs); + + /** + @brief 获取进程系统信息 + @return + */ + bool getSysAppInfo(const std::string &strAppName); + + enum enRunModel + { + RM_NORMAL = 0, //< 正常模式 + RM_NO_PROC_MNG_MASTER, //< 不注册进程管理、冗余管理,主机模式 + RM_NO_PROC_MNG_SLAVE, //< 不注册进程管理、冗余管理,备机模式 + }; + +private: + iot_public::SRunAppInfo m_stRunAppInfo; + enRunModel m_enRunModel; + CTsdbSaveRedunSwPtr m_ptrRedunSw; + iot_sys::CProcMngInterfacePtr m_ptrProcMng; + iot_sys::CRedundantMngInterfacePtr m_ptrRedundantMng; +}; + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/Main.cpp b/platform/src/dbms/tsdb_local_save/Main.cpp new file mode 100644 index 00000000..28ca3064 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/Main.cpp @@ -0,0 +1,17 @@ + +/******************************************************************************//** +* @file Main.cpp +* @brief 时序库存库服务程序入口 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#include "CTsdbSaveSrv.h" + +int main(int argc, char *argv[]) +{ + iot_dbms::CTsdbSaveSrv objApp; + return objApp.main(argc, argv); +} + diff --git a/platform/src/dbms/tsdb_local_save/SampleThread.cpp b/platform/src/dbms/tsdb_local_save/SampleThread.cpp new file mode 100644 index 00000000..763008a6 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/SampleThread.cpp @@ -0,0 +1,390 @@ +/** + @file OptMainThread.cpp + @brief 进程主模块,负责消息的订阅,消息接收,消息的发送,消息的读取和写入通过消息队列m_pMsgBuffer实现; + @author 周正龙 +*/ + +#include "boost/program_options.hpp" +#include "SampleThread.h" +#include "pub_logger_api/logger.h" +#include "pub_utility_api/TimeUtil.h" +#include "service/sample_server_api/SampleDefine.h" +#include "service/common/CommonDefine.h" + +#include "../service/data_process/AccStructDefine.h" +#include "../service/data_process/AnaStructDefine.h" +#include "../service/data_process/DigStructDefine.h" +#include "../service/data_process/MixStructDefine.h" +#include "CNodeMng.h" + +using namespace iot_public; +using namespace iot_service; +using namespace iot_idl; +using namespace iot_dbms; +using namespace std; + +#define MAX_SAVE_POINT_NUM 10000 + + +CSampleThread::CSampleThread(iot_public::SRunAppInfo &stRunAppInfo): + CTimerThreadBase("CSampleThread",20), + m_bMaster(false), + m_stRunAppInfo(stRunAppInfo) +{ + m_lLastTime = m_lCurTime = getUTCTimeMsec(); +} + +CSampleThread::~CSampleThread() +{ + this->quit(); + + if(m_ptrRdbTableMng != NULL) + { + m_ptrRdbTableMng.reset(); + m_ptrRdbTableMng = NULL; + } +} + +/** +@brief 主模块业务处理执行体 +@param 无 +@return 无 +@retval +*/ +void CSampleThread::execute() +{ + if(!m_bMaster) + { + //备机不保存测点历史数据 + return; + } + + m_lCurTime = getUTCTimeMsec(); //读取当前时间 MS + + //if(m_lLastTime > m_lCurTime ) m_lLastTime = m_lCurTime ; //系统时间调整处理,防止跳过时间点 + + //1.存配置了是否存盘的点,没有配置的点不会存盘,包括5分钟; + /* + //================================================================================================= + if ((m_lCurTime/60000 - m_lLastTime/60000) >0 )//分钟数发生改变;(每1 min 判断一次固定存盘) + { + m_lLastTime = m_lCurTime ; + saveAllCfgPoint();//周期提交 + LOGDEBUG("CSampleThread::execute, doEveryPoint() Time=[%lld] ",m_lCurTime); + }*/ + + //1.所有点存盘周期为固定5分钟 + //================================================================================================= + int64 mMin = m_lCurTime/(60000) ;//转换为分钟 + if ( (mMin % (SAMPLE_CYC_MIN) == 0) && (m_lCurTime/60000 - m_lLastTime/60000) >0 )//每5 min判一次 + { + m_lLastTime = m_lCurTime ; + saveAllPoint() ; + LOGDEBUG("CSampleThread::execute, saveAllPoint() Time=[%" PRId64 "] ", m_lCurTime); + } + + return; +} + + +bool CSampleThread::initialize() +{ + //RDB管理实例 + //============================================================================================= + m_ptrRdbTableMng = boost::make_shared(m_stRunAppInfo.strAppName); + if (m_ptrRdbTableMng == NULL) + { + LOGERROR("CSampleProcess ::initialize(), make_shared fail!\n"); + return false; + } + + resume(); + + return true; +} + +void CSampleThread::redundantSwitch(bool bMaster) +{ + m_bMaster = bMaster; +} + +//调用INFLUEXDB API存盘数据到TSDB 中 +void CSampleThread::addToTsdbSever(iot_idl::STssInsert &stTsdbData,const string&strTable) +{ + if(stTsdbData.point_size() >0 ) + { + LOGDEBUG("表[%s]插入记录数:%d",strTable.c_str(),stTsdbData.point_size()); + + stTsdbData.set_save_action(SA_TSS_LOCAL_REMOTE); + stTsdbData.set_meas_name(strTable); + stTsdbData.add_tag_name("tag_name"); + stTsdbData.add_field_name("status"); + stTsdbData.add_field_name("value"); + + //m_ptrTsdbSaveApi->addInsertMsg(stTsdbData) ; + CNodeMng::getInstance().pushSaveMsg(stTsdbData); + stTsdbData.Clear(); + } + return; +} + +//增加一个测点到TSDB队列 +void CSampleThread::addOneTsdbData(iot_idl::STssInsert& stTsdbData,const string&strTagName,\ + const int64 nStatus,const int64 nValue, const int64 nTimeStamp) +{ + STsdbPoint stPoint; + + stPoint.set_time_stamp(nTimeStamp); + stPoint.add_tag_val(strTagName); + + SVariable *varStatus; + varStatus = stPoint.add_field_val(); + varStatus->set_edatatype(CN_DATATYPE_INT64); + varStatus->set_lvalue(nStatus); + + SVariable *varValue; + varValue = stPoint.add_field_val(); + varValue->set_edatatype(CN_DATATYPE_INT64); + varValue->set_lvalue(nValue); + + stTsdbData.add_point()->CopyFrom(stPoint); + + return; +} + +//增加一个测点到TSDB队列 +void CSampleThread::addOneTsdbData(iot_idl::STssInsert& stTsdbData,const string&strTagName,\ + const int64 nStatus,const double fValue, const int64 nTimeStamp) +{ + STsdbPoint stPoint; + + stPoint.set_time_stamp(nTimeStamp); + stPoint.add_tag_val(strTagName); + + SVariable *varStatus; + varStatus = stPoint.add_field_val(); + varStatus->set_edatatype(CN_DATATYPE_INT64); + varStatus->set_lvalue(nStatus); + + SVariable *varValue; + varValue = stPoint.add_field_val(); + varValue->set_edatatype(CN_DATATYPE_DOUBLE); + varValue->set_dvalue(fValue); + + //LOGINFO("addOneTsdbData:strTagName=%s nStatus=%lld fValue=%f !",strTagName.c_str(),nStatus,fValue); + + stTsdbData.add_point()->CopyFrom(stPoint); + + return; +} + + +void CSampleThread::saveAllCfgPoint() +{ + int nRetCode = 0; + + vector vecSampleDefine; + nRetCode = m_ptrRdbTableMng->selectAllColumnNoCondition("sample_define",vecSampleDefine); + if(nRetCode == false) + { + LOGERROR("doEveryPoint:strTableName=sample_define 获取参数错误 !"); + return; + } + + iot_idl::STssInsert stAnaTsdbData; + iot_idl::STssInsert stDigTsdbData; + iot_idl::STssInsert stMixTsdbData; + iot_idl::STssInsert stAccTsdbData; + + stAnaTsdbData.Clear(); + stDigTsdbData.Clear(); + stMixTsdbData.Clear(); + stAccTsdbData.Clear(); + + for(size_t nLoop=0;nLoop getTableRecordCount(RT_ANA_TBL); + for(int nIndex=0;nIndexgetRecordAllColumnByIndex(RT_ANA_TBL,nIndex); + if(NULL != pAnaPoint) + { + //caodingfa:存盘周期不等于SAMPLE_CYC_MIN时,按存盘周期定时存储,可以是一个小时内的一个分钟数,比如0、15、30、45 + if((pAnaPoint->sample_period != SAMPLE_CYC_MIN) && + (pAnaPoint->sample_period == 0 || minOfHour % pAnaPoint->sample_period != 0)) + { + continue; + } + + addOneTsdbData(stTsdbData, pAnaPoint->tag_name,pAnaPoint->status,pAnaPoint->value,nTimeStamp); + } + else + { + LOGDEBUG("saveAllAnaPoint:: 获取记录失败!"); + continue; + } + if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0) + { + addToTsdbSever(stTsdbData,ANA_SAMPLE_RESULT); + stTsdbData.Clear(); + } + } + addToTsdbSever(stTsdbData,ANA_SAMPLE_RESULT); + + return; +} + +//数字量周期存盘 +void CSampleThread::saveAllDigPoint(const int64 nTimeStamp) +{ + STssInsert stTsdbData; + stTsdbData.Clear(); + + SDigPointAll *pDigPoint = NULL; + int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_DIG_TBL); + for(int nIndex=0;nIndexgetRecordAllColumnByIndex(RT_DIG_TBL,nIndex); + if(NULL != pDigPoint) + { + addOneTsdbData(stTsdbData, pDigPoint->tag_name,pDigPoint->status,(int64)pDigPoint->value,nTimeStamp); + } + else + { + LOGDEBUG("saveAllDigPoint:: 获取记录失败!"); + continue; + } + if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0) + { + addToTsdbSever(stTsdbData,DIG_SAMPLE_RESULT); + stTsdbData.Clear(); + } + } + addToTsdbSever(stTsdbData,DIG_SAMPLE_RESULT); + + return; +} + +//混合量周期存盘 +void CSampleThread::saveAllMixPoint(const int64 nTimeStamp) +{ + STssInsert stTsdbData; + stTsdbData.Clear(); + + SMixPointAll *pMixPoint = NULL; + int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_MIX_TBL); + for(int nIndex=0;nIndexgetRecordAllColumnByIndex(RT_MIX_TBL,nIndex); + if(NULL != pMixPoint) + { + addOneTsdbData(stTsdbData, pMixPoint->tag_name,pMixPoint->status,(int64)pMixPoint->value,nTimeStamp); + } + else + { + LOGDEBUG("saveAllMixPoint:: 获取记录失败!"); + continue; + } + if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0) + { + addToTsdbSever(stTsdbData,MIX_SAMPLE_RESULT); + stTsdbData.Clear(); + } + } + addToTsdbSever(stTsdbData,MIX_SAMPLE_RESULT); + + return; +} + +//累计量周期存盘 +void CSampleThread::saveAllAccPoint(const int64 nTimeStamp) +{ + STssInsert stTsdbData; + stTsdbData.Clear(); + + SAccPointAll *pAccPoint = NULL; + int nCount = m_ptrRdbTableMng->getTableRecordCount(RT_ACC_TBL); + for(int nIndex=0;nIndexgetRecordAllColumnByIndex(RT_ACC_TBL,nIndex); + if(NULL != pAccPoint) + { + addOneTsdbData(stTsdbData, pAccPoint->tag_name,pAccPoint->status,pAccPoint->value,nTimeStamp); + } + else + { + LOGDEBUG("saveAllAccPoint:: 获取记录失败!"); + continue; + } + if( ((nIndex+1)% MAX_SAVE_POINT_NUM) == 0) + { + addToTsdbSever(stTsdbData,ACC_SAMPLE_RESULT); + stTsdbData.Clear(); + } + } + addToTsdbSever(stTsdbData,ACC_SAMPLE_RESULT); + + return; +} + diff --git a/platform/src/dbms/tsdb_local_save/SampleThread.h b/platform/src/dbms/tsdb_local_save/SampleThread.h new file mode 100644 index 00000000..54f85ad6 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/SampleThread.h @@ -0,0 +1,56 @@ +/** + @file MainWorkThread.h + @brief 进程主模块,负责消息的接收订阅,以及消息的对外读取接口 + @author 周正龙 +*/ +#pragma once + +#include "boost/thread.hpp" +#include "pub_utility_api/TimerThreadBase.h" + +#include +#include + +//工作线程实现类 +//=============================================================== +#include "pub_sysinfo_api/SysInfoApi.h" +#include "sample_server_api/SampleDefine.h" +#include "rdb_api/RdbTableMng.h" +#include "TsdbSaveMessage.pb.h" + +namespace iot_dbms +{ +class CSampleThread : public iot_public::CTimerThreadBase +{ +public: + CSampleThread(iot_public::SRunAppInfo &stRunAppInfo); + virtual ~CSampleThread(); + + virtual void execute()override;//业务处理函数,必须继承实现自己的业务逻辑 + bool initialize(); // 初始化工作,得到定义的采样时间 + void redundantSwitch( bool bMaster); + +private: + //全数据存盘 + void saveAllCfgPoint(); //保存所有配置的点全数据 + void saveAllPoint(); //存盘全数据 + void saveAllAnaPoint(const int64 uTimeStamp,const int &minOfHour); + void saveAllDigPoint(const int64 uTimeStamp); + void saveAllAccPoint(const int64 uTimeStamp); + void saveAllMixPoint(const int64 uTimeStamp); + + void addOneTsdbData(iot_idl::STssInsert& stTsdbData,const std::string&strKeyIdTag,\ + const int64 nStatus,const int64 nValue, const int64 uTimeStamp); + void addOneTsdbData(iot_idl::STssInsert& stTsdbData,const std::string&strKeyIdTag,\ + const int64 nStatus,const double fValue, const int64 uTimeStamp); + void addToTsdbSever(iot_idl::STssInsert&stTsdbData,const std::string&strTable=""); //提交数据到 tsdb Server + +private: + volatile bool m_bMaster; + int64 m_lCurTime; + int64 m_lLastTime; + iot_public::SRunAppInfo m_stRunAppInfo; + iot_dbms::CRdbTableMngPtr m_ptrRdbTableMng; +}; +typedef boost::shared_ptr CSampleThreadPtr; +} diff --git a/platform/src/dbms/tsdb_local_save/TsdbSaveCommon.cpp b/platform/src/dbms/tsdb_local_save/TsdbSaveCommon.cpp new file mode 100644 index 00000000..8da77489 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/TsdbSaveCommon.cpp @@ -0,0 +1,80 @@ + +/******************************************************************************//** +* @file TsdbSaveCommon.cpp +* @brief 时序库存库服务,通用定义等 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#include "boost/filesystem.hpp" +#include "boost/lexical_cast.hpp" + +//< 屏蔽Protobuff编译报警 +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#endif + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable: 4100) +#endif + +#include "Public.pb.h" + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#include "pub_logger_api/logger.h" + +#include "TsdbSaveCommon.h" + +namespace iot_dbms +{ + +std::string toInfluxString(const iot_idl::SVariable &objInput) +{ + //< 经测试,lexical_cast比format快几十倍 + + std::string strOutput; + switch (objInput.edatatype()) + { + case iot_idl::DataType::CN_DATATYPE_BOOL: + strOutput = boost::lexical_cast(objInput.bvalue()); + break; + case iot_idl::DataType::CN_DATATYPE_DOUBLE: + strOutput = boost::lexical_cast(objInput.dvalue()); + break; + case iot_idl::DataType::CN_DATATYPE_FLOAT: + strOutput = boost::lexical_cast(objInput.fvalue()); + break; + case iot_idl::DataType::CN_DATATYPE_INT32: + strOutput = boost::lexical_cast(objInput.nvalue()) + "i"; + break; + case iot_idl::DataType::CN_DATATYPE_INT64: + strOutput = boost::lexical_cast(objInput.lvalue()) + "i"; + break; + case iot_idl::DataType::CN_DATATYPE_STRING: + strOutput = objInput.strvalue(); + break; + case iot_idl::DataType::CN_DATATYPE_UINT32: + strOutput = boost::lexical_cast(objInput.uvalue()) + "i"; + break; + case iot_idl::DataType::CN_DATATYPE_UINT64: + strOutput = boost::lexical_cast(objInput.ulvalue()) + "i"; + break; + default: + LOGERROR("toInfluxString(): Unknow DataType !"); + break; + } + return strOutput; +} + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/TsdbSaveCommon.h b/platform/src/dbms/tsdb_local_save/TsdbSaveCommon.h new file mode 100644 index 00000000..4bb89428 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/TsdbSaveCommon.h @@ -0,0 +1,36 @@ + +/******************************************************************************//** +* @file TsdbSaveCommon.h +* @brief 时序库存库服务,通用定义等 +* @author yikenan +* @version 1.0 +* @date +**********************************************************************************/ + +#pragma once + +#include + +#include "boost/cstdint.hpp" +#include "boost/shared_ptr.hpp" + +typedef boost::shared_ptr StdStringPtr; + +namespace iot_idl +{ +class SVariable; +} + +namespace iot_dbms +{ + +#ifdef OS_WINDOWS +#define DATA_DIRECTORY "..\\..\\data\\cache\\tsdb_save_cache\\" +#else +#define DATA_DIRECTORY "../../data/cache/tsdb_save_cache/" +#endif + +std::string toInfluxString(const iot_idl::SVariable &objInput); + +} //< namespace iot_dbms + diff --git a/platform/src/dbms/tsdb_local_save/tsdb_local_save.pro b/platform/src/dbms/tsdb_local_save/tsdb_local_save.pro new file mode 100644 index 00000000..2b8d9d19 --- /dev/null +++ b/platform/src/dbms/tsdb_local_save/tsdb_local_save.pro @@ -0,0 +1,49 @@ +#本服务与tsdb_save的主要区别是:本服务只处理本应用的数据(包括接收变化数据和定时断面) +QT -= gui core +CONFIG -= qt + +CONFIG += c++11 console +CONFIG -= app_bundle + +TEMPLATE = app +TARGET = tsdb_local_save + +LIBS += -lboost_chrono -lboost_system -lboost_program_options -lboost_filesystem -lboost_date_time \ + -lprotobuf -llog4cplus \ + -lpub_logger_api -lpub_utility_api -lpub_sysinfo_api \ + -lsys_proc_mng_api -lsys_node_mng_api \ + -lnet_msg_bus_api -ltsdb_api \ + -lrdb_api + + +HEADERS += $$PWD/../../idl_files/TsdbSaveMessage.pb.h \ + $$PWD/../../idl_files/Public.pb.h \ + CFrontThread.h \ + CNodeMng.h \ + CNodeThread.h \ + CTsdbSaveRedunSw.h \ + CTsdbSaveSrv.h \ + TsdbSaveCommon.h \ + SampleThread.h + +SOURCES += Main.cpp \ +# $$PWD/../../idl_files/TsdbSaveMessage.pb.cc \ +# $$PWD/../../idl_files/Public.pb.cc \ + CFrontThread.cpp \ + CNodeMng.cpp \ + CNodeThread.cpp \ + CTsdbSaveRedunSw.cpp \ + CTsdbSaveSrv.cpp \ + TsdbSaveCommon.cpp \ + SampleThread.cpp + +include($$PWD/../../idl_files/idl_files.pri) + +#------------------------------------------------------------------- +COMMON_PRI=$$PWD/../../common.pri +exists($$COMMON_PRI) { + include($$COMMON_PRI) +}else { + error("FATAL error: can not find common.pri") +} +