HM-SPMS/platform/src/net/net_msg_bus_api/CInProcBroker.cpp
2025-03-12 14:17:01 +08:00

1166 lines
38 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/******************************************************************************//**
* @file CInProcBroker.cpp
* @brief 消息总线接口库中的,进程内代理
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include <cstring>

#include "boost/typeof/typeof.hpp"
#include "boost/asio/ip/host_name.hpp"
#include "boost/interprocess/detail/os_thread_functions.hpp"
#include "lz4.h"
#include "pub_logger_api/logger.h"
#include "../net_msg_bus_base_api/kmbp_cnt.h"
#include "../net_msg_bus_base_api/CMbSubEngine.h"
#include "../net_msg_bus_base_api/CommonDef.h"
#include "CInProcBroker.h"
namespace iot_net
{
//< Whether communicators are currently allowed to send data.
//< Within this file the worker sets it to true once it is running and whenever the outbound
//< socket is writable, and to false on "$TERM" or when the outbound socket is not writable.
//< NOTE(review): presumably read by communicator code on other threads; `volatile` provides
//< neither atomicity nor ordering between threads - confirm whether an atomic is needed.
volatile bool g_bCommSendEnable = false;
/**
 * @brief Allocate and initialise the worker state used by the actor thread.
 *
 * Creates the subscription engine, the communicator set, the reusable kmbp message,
 * the two inbound ROUTER sockets, the outbound DEALER socket plus its monitor socket,
 * and the zloop with all readers and the 1 s cyclic timer registered.
 *
 * @param pActorPipe  Actor pipe handed in by zactor; stored, not owned.
 * @param pArgs       Process name, instance name and the "append pid" flag.
 * @return The initialised worker, or NULL when the arguments are missing.
 *
 * @note Every other failure is only checked with assert(), i.e. it is not detected
 *       in builds where assertions are compiled out.
 */
CInProcBroker::SActWorker* CInProcBroker::SActWorker::newActWorker(zsock_t *pActorPipe, SActWorkerInitArg *pArgs)
{
    //< The names are copied with strncpy() below, which must never see a NULL source
    if (NULL == pArgs || NULL == pArgs->m_pchProcName || NULL == pArgs->m_pchInstName)
    {
        LOGERROR("newActWorker(): invalid init arguments !");
        return NULL;
    }

    //< zmalloc uses calloc internally, so the whole block starts out zero-filled
    SActWorker *self = (SActWorker *)zmalloc(sizeof(SActWorker));
    self->m_bConnected = false;
    self->m_nSendSeq = 0;

    {
        //< The host name used to come from zsys_hostname() and now comes from boost, because:
        //< after gethostname(), zsys_hostname() also validates the name with gethostbyname();
        //< it has been verified that CentOS 7 resolves the host name through DNS by default,
        //< so an invalid DNS server address makes that call wait for a time-out.
        //< boost::asio::ip::host_name() does not show the problem; it appears to return the
        //< name directly without further validation.
        try
        {
            //memset(self->m_chHostName, 0, sizeof(self->m_chHostName));
            //< size - 1 keeps the terminating NUL provided by the zero-filled allocation
            strncpy(self->m_chHostName, boost::asio::ip::host_name().c_str(), sizeof(self->m_chHostName) - 1);
        }
        catch (std::exception &e)
        {
            LOGERROR("newActWorker(): %s", e.what());
            assert(false);
        }
    }

    //memset(self->m_chProcName, 0, sizeof(self->m_chProcName));
    strncpy(self->m_chProcName, pArgs->m_pchProcName, sizeof(self->m_chProcName) - 1);
    //memset(self->m_chInstName, 0, sizeof(self->m_chInstName));
    strncpy(self->m_chInstName, pArgs->m_pchInstName, sizeof(self->m_chInstName) - 1);

    self->m_pActorPipe = pActorPipe;

    //< Subscription engine
    self->m_pSubEng = new CMbSubEngine;
    assert(self->m_pSubEng);

    //< Set of registered communicator names
    self->m_pCommSet = new boost::unordered_set<std::string>;
    assert(self->m_pCommSet);

    //< Reusable protocol message
    self->m_pKmbpMsg = kmbp_new();
    assert(self->m_pKmbpMsg);

    //< Command socket facing the communicators
    self->m_pSockInCmd = zsock_new_router(MB_P2C_CMD_ENDPOINT);
    assert(self->m_pSockInCmd);

    //< Data socket facing the communicators
    {
        self->m_pSockInData = zsock_new(ZMQ_ROUTER);
        assert(self->m_pSockInData);
        //< With zmq 4.1.x a high-water mark only affects the next bind/connect;
        //< with 4.2.x it may be changed after bind/connect.
        //< Use a large send high-water mark on the router so nothing gets dropped.
        zsock_set_sndhwm(self->m_pSockInData, 1000000);
        //< Pointers are sent over this socket, so the router must not drop messages
        //< silently, otherwise the pointed-to memory would leak.
        zsock_set_router_mandatory(self->m_pSockInData, 1);
        //< Send/receive time-outs (ms) keep this thread from blocking indefinitely
        zsock_set_sndtimeo(self->m_pSockInData, 100);
        zsock_set_rcvtimeo(self->m_pSockInData, 100);
        int nRc = zsock_bind(self->m_pSockInData, MB_P2C_DATA_ENDPOINT);
        assert(0 == nRc);
    }

    //< Outbound socket towards the local service
    {
        self->m_pSockOut = zsock_new(ZMQ_DEALER);
        assert(self->m_pSockOut);
        if (pArgs->m_bAddPid)
        {
            self->m_nPid = boost::interprocess::ipcdetail::get_current_process_id();
        }
        else
            self->m_nPid = -1;
        //< pchIdentity must be released
        char *pchIdentity = zsys_sprintf("%s@%s@%d",
                                         self->m_chProcName,
                                         self->m_chInstName,
                                         self->m_nPid);
        zsock_set_identity(self->m_pSockOut, pchIdentity);
        zstr_free(&pchIdentity);
        zsock_set_sndhwm(self->m_pSockOut, 10000);
        zsock_set_rcvhwm(self->m_pSockOut, 100000);
        //< Send/receive time-outs (ms) keep this thread from blocking indefinitely
        zsock_set_sndtimeo(self->m_pSockOut, 100);
        zsock_set_rcvtimeo(self->m_pSockOut, 100);
        zsock_set_reconnect_ivl(self->m_pSockOut, 500);
        zsock_set_reconnect_ivl_max(self->m_pSockOut, 1000);
    }

    //< Monitor socket observing m_pSockOut
    {
        //< pchEndpoint must be released
        char *pchEndpoint = zsys_sprintf("inproc://zmonitor-%p", self->m_pSockOut);
        assert(pchEndpoint);
        const int nEvent = ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_MONITOR_STOPPED;
        int nRc = zmq_socket_monitor(zsock_resolve(self->m_pSockOut), pchEndpoint, nEvent);
        assert(0 == nRc);
        self->m_pSockOutMon = zsock_new(ZMQ_PAIR);
        assert(self->m_pSockOutMon);
        nRc = zsock_connect(self->m_pSockOutMon, pchEndpoint);
        assert(0 == nRc);
        //< Release
        zstr_free(&pchEndpoint);
    }

    //< Connect m_pSockOut only after the monitor has been attached
    {
        const int nRc = zsock_connect(self->m_pSockOut, MB_H2P_ENDPOINT);
        assert(0 == nRc);
    }

    //< Event loop
    {
        self->m_pLoop = zloop_new();
        assert(self->m_pLoop);
        //< Event priority inside the loop follows registration order.
        //< Cyclic task timer, once per second; timer IDs start at 0.
        int nRc = zloop_timer(self->m_pLoop, 1000, 0, handleCycTaskTimer, self);
        assert(nRc >= 0);
        bool bRc = setSocketHandle(self, self->m_pActorPipe, handleActorPipe);
        assert(bRc);
        bRc = setSocketHandle(self, self->m_pSockOutMon, handleSockOutMon);
        assert(bRc);
        bRc = setSocketHandle(self, self->m_pSockInCmd, handleSockInCmd);
        assert(bRc);
        bRc = setSocketHandle(self, self->m_pSockInData, handleSockInData);
        assert(bRc);
        bRc = setSocketHandle(self, self->m_pSockOut, handleSockOut);
        assert(bRc);
        //< Do not leave the loop when interrupted by a system signal
        zloop_set_nonstop(self->m_pLoop, true);
    }
    return self;
}
/**
 * @brief Release everything owned by a worker and reset the caller's pointer to NULL.
 *
 * Does nothing when *pSelf is already NULL. The actor pipe is not destroyed here.
 *
 * @param pSelf  Address of the worker pointer.
 */
void CInProcBroker::SActWorker::destroyActWorker(SActWorker **pSelf)
{
    SActWorker *pWorker = *pSelf;
    if (!pWorker)
        return;

    delete pWorker->m_pSubEng;
    pWorker->m_pSubEng = NULL;

    if (pWorker->m_pCommSet)
    {
        const bool bStillRegistered = !(pWorker->m_pCommSet->empty());
        if (bStillRegistered)
        {
            //< Some communicators were never unregistered
            LOGERROR("destroyActWorker(): please deconstruct all communicator befor releaseMsgBus() !");
        }
        delete pWorker->m_pCommSet;
        pWorker->m_pCommSet = NULL;
    }

    zloop_destroy(&(pWorker->m_pLoop));
    zsock_destroy(&(pWorker->m_pSockInCmd));
    zsock_destroy(&(pWorker->m_pSockInData));

    //< Monitoring has to be cancelled before the ZMQ_PAIR socket that receives the
    //< monitor messages (m_pSockOutMon) is destroyed; otherwise zeromq may block while
    //< sending the monitor status message during cancellation (sndtimeo defaults to
    //< waiting forever).
    //< Destroying the monitored socket cancels monitoring automatically, so without an
    //< explicit cancel the order would have to be: m_pSockOut first, m_pSockOutMon second.
    zmq_socket_monitor(zsock_resolve(pWorker->m_pSockOut), NULL, 0);
    zsock_destroy(&(pWorker->m_pSockOut));
    zsock_destroy(&(pWorker->m_pSockOutMon));

    kmbp_destroy(&(pWorker->m_pKmbpMsg));

    free(pWorker);
    *pSelf = NULL;
}
/**
 * @brief Entry function of the zactor thread.
 *
 * Signals the pipe, builds the worker, reports the start-up result as an integer
 * (1 = started, 0 = failed) on the pipe, then runs the zloop until a handler
 * returns -1 and finally tears the worker down.
 *
 * @param pActorPipe  Pipe connecting the actor with its creator.
 * @param pArgs       Points to a SActWorkerInitArg.
 */
void CInProcBroker::SActWorker::actorMain(zsock_t *pActorPipe, void *pArgs)
{
    /* Required by zactor:
    An actor function MUST call zsock_signal (pipe) when initialized
    and MUST listen to pipe and exit on $TERM command.
    */
    zsock_signal(pActorPipe, 0);

    SActWorker *pWorker = newActWorker(pActorPipe, static_cast<SActWorkerInitArg *>(pArgs));
    if (NULL == pWorker)
    {
        LOGERROR("newActWorker() failed !");
        zsock_send(pActorPipe, "i", 0);
        return;
    }

    zsock_send(pActorPipe, "i", 1);

    //< Communicators may send from now on
    g_bCommSendEnable = true;

    //< Runs until any zloop handler returns -1
    zloop_start(pWorker->m_pLoop);

    destroyActWorker(&pWorker);
}
/**
 * @brief Register or remove the zloop reader of a socket.
 *
 * @param self        Worker owning the loop.
 * @param pSock       Socket to watch.
 * @param pHandleFuc  Reader callback; NULL removes the existing reader.
 * @return false only when zloop_reader() reports a failure, true otherwise.
 */
bool CInProcBroker::SActWorker::setSocketHandle(SActWorker *self, zsock_t *pSock, zloop_reader_fn pHandleFuc)
{
    //if (NULL == self || NULL == pSock)
    //    return false;

    if (NULL == pHandleFuc)
    {
        //< No callback: stop watching the socket
        zloop_reader_end(self->m_pLoop, pSock);
        return true;
    }

    if (0 != zloop_reader(self->m_pLoop, pSock, pHandleFuc, self))
        return false;

    zloop_reader_set_tolerant(self->m_pLoop, pSock);
    return true;
}
/**
 * @brief Stamp a message with the next sequence number and send it on m_pSockOut.
 *
 * An unexpected message ID is logged, but the message is still sent.
 * A failed kmbp_send() is logged as a warning.
 *
 * @param self   Worker owning the outbound socket and the sequence counter.
 * @param pKmbp  Message to send.
 * @return Always true.
 */
bool CInProcBroker::SActWorker::sendOut(SActWorker *self, kmbp_t *pKmbp)
{
    //if (NULL == self)
    //    return false;

    const int nId = kmbp_id(pKmbp);
    const bool bExpectedId = (KMBP_C2H_DATA_DOMAIN == nId)
                             || (KMBP_C2H_DATA_PEER == nId)
                             || (KMBP_C2H_DATA_HOST == nId)
                             || (KMBP_P2H_HEARTBEAT == nId)
                             || (KMBP_P2H_SUB_ALL == nId)
                             || (KMBP_P2H_SUB_ADD == nId)
                             || (KMBP_P2H_SUB_DEL == nId);
    if (!bExpectedId)
    {
        LOGERROR("sendOut(): Unexpected message ID = %d ", nId);
    }

    self->m_nSendSeq += 1;
    kmbp_set_seqno(pKmbp, self->m_nSendSeq);

    const int nSendRc = kmbp_send(pKmbp, self->m_pSockOut, NULL, NULL);
    if (0 != nSendRc)
    {
        LOGWARN("sendOut(): kmbp_send() failed !");
    }
    return true;
}
/**
 * @brief Replace the LZ4-compressed data content of a message with its decompressed form.
 *
 * A message whose unzip size is <= 0 is treated as uncompressed and left alone.
 * On any failure the message is left unchanged and an error is logged.
 * On success the unzip size is reset to 0 and the data content is replaced.
 *
 * @param pKmbp  Message to decompress in place.
 */
void CInProcBroker::SActWorker::decompressData(kmbp_t *pKmbp)
{
    const int nPlainSize = kmbp_data_unzipsize(pKmbp);
    if (nPlainSize <= 0)
    {
        //< Not compressed
        return;
    }

    zchunk_t *pPacked = kmbp_data_content(pKmbp);
    if (NULL == pPacked)
    {
        //< Compression flagged but no data at all: must not happen
        LOGERROR("decompressData(): NULL == pChunkSrc , some thing wrong !");
        return;
    }

    const size_t nPackedSize = zchunk_size(pPacked);
    if (0 == nPackedSize)
    {
        //< Compression flagged but zero-length data: must not happen
        LOGERROR("decompressData(): 0 == nSizeSrc , some thing wrong !");
        return;
    }
    if (nPackedSize >= 2147483647)
    {
        //< The LZ4 call below takes the source length as int
        LOGERROR("decompressData(): nSizeSrc >= 2147483647 , too big to decompress !");
        return;
    }

    //< pPlain must be destroyed on every failure path below
    zchunk_t *pPlain = zchunk_new(NULL, nPlainSize);
    const int nProduced = LZ4_decompress_safe(reinterpret_cast<const char *>(zchunk_data(pPacked)),
                                              reinterpret_cast<char *>(zchunk_data(pPlain)),
                                              static_cast<int>(nPackedSize), nPlainSize);
    if (nProduced <= 0 || nProduced != nPlainSize)
    {
        if (nProduced <= 0)
        {
            LOGERROR("LZ4_decompress_safe() failed !");
        }
        else
        {
            LOGERROR("decompressData(): nSizeDst != nUnzipSize , some thing wrong !");
        }
        zchunk_destroy(&pPlain);
        return;
    }

    //< Set the actual length of pPlain; the size given to zchunk_new() was the maximum
    zchunk_set(pPlain, NULL, nProduced);
    kmbp_set_data_unzipsize(pKmbp, 0);
    kmbp_set_data_content(pKmbp, &pPlain);
}
/**
 * @brief zloop reader for the actor pipe; handles "$TERM" and "$CONNECTED".
 *
 * At most 10 commands are processed per invocation.
 *  - "$TERM": disables communicator sending, waits 300 ms, releases every message
 *    still queued on m_pSockInData and makes the loop exit.
 *  - "$CONNECTED": replies with the current connection flag as an integer.
 *
 * @param reader  The actor pipe.
 * @param arg     The SActWorker.
 * @return -1 (stop the zloop) once "$TERM" was seen, otherwise 0.
 */
int CInProcBroker::SActWorker::handleActorPipe(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pActorPipe);

    bool bTerm = false;
    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        //< pchCmd must be released
        char *pchCmd = zstr_recv(reader);
        if (NULL == pchCmd)
        {
            //< Possibly interrupted by a system signal: keep going
            continue;
        }

        if (streq(pchCmd, "$TERM"))
        {
            //< zactor requires the actor to accept "$TERM" and exit
            g_bCommSendEnable = false;
            //< NOTE(review): presumably gives senders time to observe the flag - confirm
            zclock_sleep(300);

            //< Release every message that has not been received yet
            while (zsock_events(self->m_pSockInData) & ZMQ_POLLIN)
            {
                //< m_pSockInData is a ROUTER, so the first frame is the routing ID and
                //< the pointer follows in the next frame (same layout as handled in
                //< handleSockInData()). The routing ID has to be taken off first,
                //< otherwise it would be parsed as the pointer frame.
                zframe_t *pFrameRoutingID = zframe_recv(self->m_pSockInData);
                if (!pFrameRoutingID || !zsock_rcvmore(self->m_pSockInData))
                {
                    zframe_destroy(&pFrameRoutingID);
                    continue;
                }
                zframe_destroy(&pFrameRoutingID);

                kmbp_cnt_t *pKmbpCnt = NULL;
                if (0 != zsock_recv(self->m_pSockInData, "p", &pKmbpCnt))
                    continue;
                //< May legitimately be NULL
                if (NULL != pKmbpCnt)
                {
                    //< Drop the reference
                    kmbp_cnt_dec(&pKmbpCnt);
                }
            }
            bTerm = true;
        }
        else if (streq(pchCmd, "$CONNECTED"))
        {
            zsock_send(reader, "i", self->m_bConnected);
        }
        else
        {
            //< Not a valid command
            LOGWARN("handleActorPipe(): malformed message !");
        }

        //< Clean up; normally there are no trailing frames
        if (zsock_rcvmore(reader))
        {
            LOGWARN("handleActorPipe(): garbage message !");
            zmsg_t *more = zmsg_recv(reader);
            zmsg_print(more);
            zmsg_destroy(&more);
        }

        //< Release
        zstr_free(&pchCmd);
    }

    //< Any zloop handler returning -1 ends the zloop
    return bTerm ? -1 : 0;
}
/**
 * @brief zloop reader for the command socket used by the communicators.
 *
 * Processes at most 10 messages per invocation. Each message is received into the
 * shared m_pKmbpMsg, handled according to its ID (register, unregister, add/delete
 * subscription, list subscriptions) and answered on the same socket by re-sending
 * m_pKmbpMsg with a changed ID. Subscription changes are additionally forwarded to
 * the local service through sendOut() when m_pSockOut is writable.
 *
 * @param reader  m_pSockInCmd.
 * @param arg     The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockInCmd(zloop_t *, zsock_t *reader, void *arg)
{
SActWorker *self = (SActWorker *)arg;
assert(reader == self->m_pSockInCmd);
for (int i = 0;
(i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
i++)
{
const int nRc = kmbp_recv(self->m_pKmbpMsg, reader);
if (0 != nRc)
{
//< Interrupted by a system signal or malformed message: keep going
LOGWARN("kmbp_recv() failed, nRc == %d", nRc);
continue;
}
//< The routing ID is the communicator name
const std::string strRoutingID((char *)zframe_data(kmbp_routing_id(self->m_pKmbpMsg)),
zframe_size(kmbp_routing_id(self->m_pKmbpMsg)));
switch (kmbp_id(self->m_pKmbpMsg))
{
case KMBP_C2P_REG:
{
self->m_pCommSet->insert(strRoutingID);
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
break;
case KMBP_C2P_UNREG:
{
self->m_pCommSet->erase(strRoutingID);
//< Remove the subscriber
{
const SubInfoVecPtr ptrVecSub =
self->m_pSubEng->delSuber(strRoutingID);
if (ptrVecSub)
{
//< Forward to the local service; an entry is skipped when m_pSockOut is not writable
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
if (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT)
{
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_DEL);
kmbp_set_sub_appid(self->m_pKmbpMsg, (*ptrVecSub)[i].m_nAppID);
kmbp_set_sub_chanid(self->m_pKmbpMsg, (*ptrVecSub)[i].m_nChannelID);
sendOut(self, self->m_pKmbpMsg);
}
}
}
}
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
break;
case KMBP_C2P_SUB_ADD:
{
CMbSubInfoImp objSubAdd;
objSubAdd.m_nAppID = kmbp_sub_appid(self->m_pKmbpMsg);
objSubAdd.m_nChannelID = kmbp_sub_chanid(self->m_pKmbpMsg);
if (!(objSubAdd.isValid(false)))
{
//< Reply "false" to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_FALSE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
//< Nothing is forwarded upstream
}
else
{
const bool bUpSend =
self->m_pSubEng->addSubtionOfSuber(strRoutingID, objSubAdd);
if (bUpSend && (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT))
{
//< Forward to the local service
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_ADD);
sendOut(self, self->m_pKmbpMsg);
}
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
}
break;
case KMBP_C2P_SUB_DEL:
{
CMbSubInfoImp objSubDel;
objSubDel.m_nAppID = kmbp_sub_appid(self->m_pKmbpMsg);
objSubDel.m_nChannelID = kmbp_sub_chanid(self->m_pKmbpMsg);
if (!(objSubDel.isValid(false)))
{
//< Reply "false" to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_FALSE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
//< Nothing is forwarded upstream
}
else
{
//< Note: the interface library has to support wildcards here; the service program needs no such handling.
//< Every existing subscription of this communicator that objSubDel contains is removed,
//< subscriptions it does not contain are kept.
//< NOTE(review): the original comment gave a numeric example that is garbled in this copy.
bool bRet = false;
const SubInfoVecPtr ptrVecSub =
self->m_pSubEng->getSubtionOfSuber(strRoutingID);
if (ptrVecSub)
{
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
const CMbSubInfoImp &Sub = (*ptrVecSub)[i];
if (objSubDel.contain(Sub))
{
bRet = true;
const bool bUpSend =
self->m_pSubEng->delSubtionOfSuber(strRoutingID, Sub);
if (bUpSend && (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT))
{
//< Forward to the local service
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_DEL);
kmbp_set_sub_appid(self->m_pKmbpMsg, Sub.m_nAppID);
kmbp_set_sub_chanid(self->m_pKmbpMsg, Sub.m_nChannelID);
sendOut(self, self->m_pKmbpMsg);
}
}
}
}
//< Reply to the communicator: true when at least one subscription matched
if (bRet)
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
else
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_FALSE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
}
break;
case KMBP_C2P_SUB_ALL_REQ:
{
zlist_t *plistSub = zlist_new();
zlist_autofree(plistSub);
const SubInfoVecPtr ptrVecSub = self->m_pSubEng->getSubtionOfSuber(strRoutingID);
if (ptrVecSub)
{
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
const CMbSubInfoImp &Sub = (*ptrVecSub)[i];
//< The list is in autofree mode, so it copies the string
zlist_append(plistSub, (char *)Sub.toString().c_str());
}
}
//< Ownership of plistSub moves to the kmbp message, which releases it internally
kmbp_set_sublist(self->m_pKmbpMsg, &plistSub);
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2C_SUB_ALL);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
break;
default:
{
LOGERROR("handleSockInCmd(): invalid kmbp_id = %d ", kmbp_id(self->m_pKmbpMsg));
}
break;
}
//< Nothing to clean up
}
return 0;
}
/**
 * @brief zloop reader for the data socket used by the communicators.
 *
 * Processes at most 10 messages per invocation. Every message consists of the
 * routing ID frame followed by a frame carrying a kmbp_cnt_t pointer.
 *  - A NULL pointer is echoed back to the sending communicator.
 *  - C2H data messages are stamped with this process' source fields and passed to
 *    sendOut() when m_pSockOut is writable; otherwise the message is dropped and
 *    communicator sending is disabled.
 * The received reference is released in every non-NULL case.
 *
 * @param reader  m_pSockInData.
 * @param arg     The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockInData(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pSockInData);

    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        //< self->m_pSockInData is a router: the first frame is the routing ID
        zframe_t *pFrameRoutingID = zframe_recv(reader);    //< must be released
        if (!pFrameRoutingID || !zsock_rcvmore(reader))
        {
            LOGWARN("handleSockInData(): no routing ID , ignore !");
            zframe_destroy(&pFrameRoutingID);
            continue;
        }

        //< The routing ID is the communicator name
        const std::string strRoutingID((char *)zframe_data(pFrameRoutingID),
                                       zframe_size(pFrameRoutingID));
        zframe_destroy(&pFrameRoutingID);

        kmbp_cnt_t *pKmbpCnt = NULL;
        //< The reference is released once at the end
        if (0 != zsock_recv(reader, "p", &pKmbpCnt))
            continue;

        if (NULL == pKmbpCnt)
        {
            //< Echo NULL back to the communicator.
            //< Build the frame from data + size rather than from a C string, so a
            //< routing ID containing NUL bytes is reproduced exactly.
            zframe_t *pDstRoutingID = zframe_new(strRoutingID.data(), strRoutingID.size());
            //< m_pSockInData is a router: the first frame is the ID.
            //< zframe_send takes over pDstRoutingID, no release needed afterwards.
            if (0 == zframe_send(&pDstRoutingID, self->m_pSockInData, ZFRAME_MORE))
            {
                zsock_send(self->m_pSockInData, "p", pKmbpCnt);
            }
            //< pKmbpCnt is NULL, there is no reference to release
        }
        else    //< != NULL
        {
            kmbp_t *pKmbp = kmbp_cnt_get(pKmbpCnt);
            switch (kmbp_id(pKmbp))
            {
            case KMBP_C2H_DATA_PEER:
            case KMBP_C2H_DATA_HOST:
            case KMBP_C2H_DATA_DOMAIN:
            {
                if (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT)
                {
                    kmbp_set_src_host(pKmbp, self->m_chHostName);
                    kmbp_set_src_proc(pKmbp, self->m_chProcName);
                    kmbp_set_src_inst(pKmbp, self->m_chInstName);
                    kmbp_set_src_pid(pKmbp, self->m_nPid);
                    kmbp_set_src_comm(pKmbp, strRoutingID.c_str());
                    sendOut(self, pKmbp);
                }
                else
                {
                    //< todo: add an extra buffer if it turns out to be needed
                    if (g_bCommSendEnable)
                    {
                        g_bCommSendEnable = false;
                    }
                    LOGWARN("handleSockInData(): outbound queue is full , drop message !");
                }
            }
            break;
            default:
            {
                LOGERROR("handleSockInData(): invalid kmbp_id = %d ", kmbp_id(pKmbp));
            }
            break;
            }

            //< Release the reference
            kmbp_cnt_dec(&pKmbpCnt);
        }
    }
    return 0;
}
/**
 * @brief zloop reader for the outbound socket, i.e. messages arriving from the local service.
 *
 * Processes at most 10 messages per invocation, each received into the shared m_pKmbpMsg.
 *  - KMBP_H2P_DATA: process-level pings are answered directly; subscriber-level pings are
 *    answered once per registered target communicator; any other data is decompressed,
 *    wrapped in a reference-counted container and its pointer is sent to every registered
 *    target communicator through m_pSockInData.
 *  - KMBP_H2P_SUB_ALL_REQ: replies with the complete subscription list.
 *
 * @param reader  m_pSockOut.
 * @param arg     The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockOut(zloop_t *, zsock_t *reader, void *arg)
{
SActWorker *self = (SActWorker *)arg;
assert(reader == self->m_pSockOut);
for (int i = 0;
(i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
i++)
{
const int nRc = kmbp_recv(self->m_pKmbpMsg, reader);
if (0 != nRc)
{
//< Interrupted by a system signal or malformed message: keep going
LOGWARN("kmbp_recv() failed, nRc == %d", nRc);
continue;
}
switch (kmbp_id(self->m_pKmbpMsg))
{
case KMBP_H2P_DATA:
{
const int nDataType = kmbp_data_type(self->m_pKmbpMsg);
if (nDataType == MB_MT_PING_PROC
|| nDataType == MB_MT_PING_PROC_IN_HOST)
{
//< Answer the ping, reusing the received message as the reply
kmbp_t *pKmbpSend = self->m_pKmbpMsg;
//< Fill in the reply: the old source becomes the destination
{
kmbp_set_id(pKmbpSend, KMBP_C2H_DATA_PEER);
kmbp_set_timestamp(pKmbpSend, zclock_mono());
kmbp_set_relaystat(pKmbpSend, MB_RS_DEFAULT);
kmbp_set_dst_comm(pKmbpSend, kmbp_src_comm(pKmbpSend));
kmbp_set_dst_proc(pKmbpSend, kmbp_src_proc(pKmbpSend));
kmbp_set_dst_inst(pKmbpSend, kmbp_src_inst(pKmbpSend));
kmbp_set_dst_pid(pKmbpSend, kmbp_src_pid(pKmbpSend));
kmbp_set_dst_host(pKmbpSend, kmbp_src_host(pKmbpSend));
kmbp_set_src_host(pKmbpSend, self->m_chHostName);
kmbp_set_src_proc(pKmbpSend, self->m_chProcName);
kmbp_set_src_inst(pKmbpSend, self->m_chInstName);
kmbp_set_src_pid(pKmbpSend, self->m_nPid);
kmbp_set_src_comm(pKmbpSend, "");
kmbp_set_data_type(pKmbpSend, MB_MT_PING_PROC_REP);
}
//< Send it
sendOut(self, pKmbpSend);
}
else
{
//< Point-to-point messages (C2H_DATA_PEER) carry a non-empty Dst_Comm and are delivered straight to that communicator.
//< Other messages have an empty Dst_Comm; topic/subscription matching is then done inside the process.
StrVecPtr ptrVecComm;
{
//< NOTE(review): assumes kmbp_dst_comm() never returns NULL - confirm
const char *pchComm = kmbp_dst_comm(self->m_pKmbpMsg);
if (strlen(pchComm) > 0)
{
ptrVecComm.reset(new StrVec);
ptrVecComm->push_back(std::string(pchComm));
}
else
{
const CMbSubInfoImp objSub(kmbp_sub_appid(self->m_pKmbpMsg),
kmbp_sub_chanid(self->m_pKmbpMsg));
ptrVecComm = self->m_pSubEng->lookupSuberOfSubject(objSub);
}
}
if (ptrVecComm)
{
if (nDataType == MB_MT_PING_SUBER_IN_DOMAIN
|| nDataType == MB_MT_PING_SUBER_IN_HOST
|| nDataType == MB_MT_PING_SUBER_IN_PROC)
{
//< Answer the ping, reusing the received message as the reply
kmbp_t *pKmbpSend = self->m_pKmbpMsg;
//< Fill in the reply: the old source becomes the destination
{
kmbp_set_id(pKmbpSend, KMBP_C2H_DATA_PEER);
kmbp_set_timestamp(pKmbpSend, zclock_mono());
kmbp_set_relaystat(pKmbpSend, MB_RS_DEFAULT);
kmbp_set_dst_comm(pKmbpSend, kmbp_src_comm(pKmbpSend));
kmbp_set_dst_proc(pKmbpSend, kmbp_src_proc(pKmbpSend));
kmbp_set_dst_inst(pKmbpSend, kmbp_src_inst(pKmbpSend));
kmbp_set_dst_pid(pKmbpSend, kmbp_src_pid(pKmbpSend));
kmbp_set_dst_host(pKmbpSend, kmbp_src_host(pKmbpSend));
kmbp_set_src_host(pKmbpSend, self->m_chHostName);
kmbp_set_src_proc(pKmbpSend, self->m_chProcName);
kmbp_set_src_inst(pKmbpSend, self->m_chInstName);
kmbp_set_src_pid(pKmbpSend, self->m_nPid);
//kmbp_set_src_comm(pKmbpSend, "");
kmbp_set_data_type(pKmbpSend, MB_MT_PING_SUBER_REP);
}
//< One reply per target communicator, each carrying that communicator as the source
for (size_t i = 0; i < ptrVecComm->size(); i++)
{
const std::string *pStrDstComm = &((*ptrVecComm)[i]);
//< Skip communicators that are no longer registered
BOOST_AUTO(itComm, self->m_pCommSet->find(*pStrDstComm));
if (self->m_pCommSet->end() == itComm)
{
continue;
}
kmbp_set_src_comm(pKmbpSend, pStrDstComm->c_str());
//< Send it
sendOut(self, pKmbpSend);
}
}
else
{
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2C_DATA);
//< Decompress the message data
decompressData(self->m_pKmbpMsg);
//< Exchange the received message with the one inside a fresh counted container
kmbp_cnt_t *pKmbpCnt = kmbp_cnt_new();
kmbp_cnt_swap(pKmbpCnt, &(self->m_pKmbpMsg));
for (size_t i = 0; i < ptrVecComm->size(); i++)
{
const std::string *pStrDstComm = &((*ptrVecComm)[i]);
//< Skip communicators that are no longer registered
BOOST_AUTO(itComm, self->m_pCommSet->find(*pStrDstComm));
if (self->m_pCommSet->end() == itComm)
{
continue;
}
zframe_t *pFrameRoutingID = zframe_from(pStrDstComm->c_str());
//< m_pSockInData is a router: the first frame is the ID
//< zframe_send takes over pFrameRoutingID, no release needed afterwards
if (0 == zframe_send(&pFrameRoutingID, self->m_pSockInData, ZFRAME_MORE))
{
//< Add the peer's reference first and drop it again if sending fails,
//< so the peer cannot receive and release the object before its reference is counted
kmbp_cnt_inc(pKmbpCnt);
if (0 != zsock_send(self->m_pSockInData, "p", pKmbpCnt))
{
//< Failed: drop the peer's reference
//< NOTE(review): if kmbp_cnt_dec() resets the caller's pointer to NULL, later iterations would send NULL - confirm
kmbp_cnt_dec(&pKmbpCnt);
LOGWARN("handleSockOut(): send to m_pSockInData failed , unexpected !");
}
}
else
{
LOGWARN("handleSockOut(): send to m_pSockInData failed , unexpected !");
}
}
//< Drop the local reference
kmbp_cnt_dec(&pKmbpCnt);
}
}
}
}
break;
case KMBP_H2P_SUB_ALL_REQ:
{
zlist_t *plistSub = zlist_new();
zlist_autofree(plistSub);
const SubInfoVecPtr ptrVecSub = self->m_pSubEng->getSubtionOfAll();
if (ptrVecSub)
{
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
const CMbSubInfoImp &Sub = (*ptrVecSub)[i];
//< The zlist is in autofree mode, so it copies the data
zlist_append(plistSub, (char *)(Sub.toString().c_str()));
}
}
//< Ownership of the zlist moves to the kmbp message, which releases it internally
kmbp_set_sublist(self->m_pKmbpMsg, &plistSub);
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_ALL);
sendOut(self, self->m_pKmbpMsg);
}
break;
default:
{
LOGERROR("handleSockOut(): invalid kmbp_id = %d ", kmbp_id(self->m_pKmbpMsg));
}
break;
}
//< Nothing to clean up
}
return 0;
}
/**
 * @brief Cyclic timer task of the zloop.
 *
 * Mirrors the writability of m_pSockOut into g_bCommSendEnable and, while the
 * socket is writable and the connection is up, sends a heartbeat.
 *
 * @param arg  The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleCycTaskTimer(zloop_t * /*loop*/, int /*timer_id*/, void *arg)
{
    SActWorker *pWorker = static_cast<SActWorker *>(arg);

    //< Can a message be sent without blocking?
    const bool bWritable = (zsock_events(pWorker->m_pSockOut) & ZMQ_POLLOUT) != 0;
    if (!bWritable)
    {
        if (g_bCommSendEnable)
        {
            g_bCommSendEnable = false;
        }
        return 0;
    }

    if (!g_bCommSendEnable)
    {
        g_bCommSendEnable = true;
    }

    if (pWorker->m_bConnected)
    {
        //< Heartbeat
        kmbp_set_id(pWorker->m_pKmbpMsg, KMBP_P2H_HEARTBEAT);
        sendOut(pWorker, pWorker->m_pKmbpMsg);
    }
    return 0;
}
/**
 * @brief zloop reader for the monitor socket attached to m_pSockOut.
 *
 * Processes at most 10 monitor messages per invocation. Each message has two frames:
 * a 16-bit event number followed by a 32-bit value, then the endpoint address.
 * CONNECTED / DISCONNECTED events update m_bConnected; every event is logged.
 * A first frame that is too short to hold event and value is reported as malformed
 * and the message is skipped.
 *
 * @param reader  m_pSockOutMon.
 * @param arg     The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockOutMon(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pSockOutMon);

    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        //< This is the ZMQ v4 layout; older versions use a different one
        //< First frame
        int nEvent = 0, nValue = 0;
        bool bMalformed = false;
        {
            zframe_t *pFrame = zframe_recv(reader);     //< must be released
            if (NULL == pFrame)
            {
                //< Interrupted by a system signal
                continue;
            }
            if (!zsock_rcvmore(reader))
            {
                //< No following frame, so the layout is wrong
                zframe_destroy(&pFrame);    //< release
                LOGWARN("handleSockOutMon(): malformed zmonitor msg !");
                continue;
            }

            uint16_t nRawEvent = 0;
            uint32_t nRawValue = 0;
            if (zframe_size(pFrame) >= sizeof(nRawEvent) + sizeof(nRawValue))
            {
                //< memcpy instead of pointer casts: the 32-bit value sits at offset 2,
                //< which is not suitably aligned for a direct uint32_t access
                memcpy(&nRawEvent, zframe_data(pFrame), sizeof(nRawEvent));
                memcpy(&nRawValue, zframe_data(pFrame) + sizeof(nRawEvent), sizeof(nRawValue));
                nEvent = nRawEvent;
                nValue = (int)nRawValue;
            }
            else
            {
                //< Too short; the second frame is still consumed below to keep the
                //< socket aligned on message boundaries
                bMalformed = true;
            }
            zframe_destroy(&pFrame);    //< release
        }

        //< Second frame
        std::string strAddr;
        {
            char *pchAddress = zstr_recv(reader);   //< must be released
            if (NULL == pchAddress)
            {
                //< Interrupted by a system signal, try once more
                pchAddress = zstr_recv(reader);
                if (NULL == pchAddress)
                {
                    LOGWARN("handleSockOutMon(): recv address failed !");
                }
            }
            if (NULL == pchAddress)
                strAddr = "UNKNOWN";
            else
                strAddr = pchAddress;
            //< Release; passing NULL is safe
            zstr_free(&pchAddress);
        }

        if (bMalformed)
        {
            LOGWARN("handleSockOutMon(): malformed zmonitor msg !");
            continue;
        }

        const char *pchName;
        switch (nEvent)
        {
        case ZMQ_EVENT_ACCEPTED:
            pchName = "ACCEPTED";
            break;
        case ZMQ_EVENT_ACCEPT_FAILED:
            pchName = "ACCEPT_FAILED";
            break;
        case ZMQ_EVENT_BIND_FAILED:
            pchName = "BIND_FAILED";
            break;
        case ZMQ_EVENT_CLOSED:
            pchName = "CLOSED";
            break;
        case ZMQ_EVENT_CLOSE_FAILED:
            pchName = "CLOSE_FAILED";
            break;
        case ZMQ_EVENT_DISCONNECTED:
        {
            pchName = "DISCONNECTED";
            self->m_bConnected = false;
        }
        break;
        case ZMQ_EVENT_CONNECTED:
        {
            pchName = "CONNECTED";
            self->m_bConnected = true;
        }
        break;
        case ZMQ_EVENT_CONNECT_DELAYED:
            pchName = "CONNECT_DELAYED";
            break;
        case ZMQ_EVENT_CONNECT_RETRIED:
            pchName = "CONNECT_RETRIED";
            break;
        case ZMQ_EVENT_LISTENING:
            pchName = "LISTENING";
            break;
        case ZMQ_EVENT_MONITOR_STOPPED:
        {
            pchName = "MONITOR_STOPPED";
            LOGERROR("ZMQ_EVENT_MONITOR_STOPPED , unexpected while running !");
        }
        break;
        default:
            pchName = "UNKNOWN";
            break;
        }

        LOGINFO("Message bus api connection with local service : \n Event = %s ; Value = %d ; Addr = %s ;",
                pchName, nValue, strAddr.c_str());
    }
    return 0;
}
/*========================= CInProcBroker =========================*/
/**
 * @brief Create a broker that has no running actor yet.
 */
CInProcBroker::CInProcBroker()
{
    //< No actor until start() succeeds
    m_pActor = NULL;

    //< Stop CZMQ from intercepting system signals
    zsys_handler_set(NULL);
}
/**
 * @brief Stop the worker actor, then shut CZMQ down. The order of the two calls matters.
 */
CInProcBroker::~CInProcBroker()
{
//< zactor_destroy() sends "$TERM" to the zactor thread and waits for its reply.
//< The thread reacts to that message in handleActorPipe() and then releases its resources.
zactor_destroy(&m_pActor);
//< Call zsys_shutdown() explicitly and early instead of letting it run automatically at program exit (order not controllable),
//< which avoids the Windows error: Assertion failed: Successful WSASTARTUP not yet performed
zsys_shutdown();
}
/**
 * @brief Start the worker actor and wait for its start-up result.
 *
 * @param pchProcName  Process name; must not be NULL.
 * @param pchInstName  Instance name; must not be NULL.
 * @param bAddPid      Whether the worker uses the real process ID (otherwise -1).
 * @return true when the actor reported a successful start; false when the broker is
 *         already started, an argument is NULL, the actor could not be created, or
 *         the actor reported a failure.
 */
bool CInProcBroker::start(const char *pchProcName, const char *pchInstName,
                          bool bAddPid)
{
    if (NULL != m_pActor)
        return false;

    //< The worker copies both names with strncpy(); reject NULL before it gets that far
    if (NULL == pchProcName || NULL == pchInstName)
        return false;

    //< stArgs lives on this stack frame; it stays valid because this function blocks
    //< below until the actor has finished its initialisation and reported the result
    SActWorkerInitArg stArgs;
    stArgs.m_pchProcName = pchProcName;
    stArgs.m_pchInstName = pchInstName;
    stArgs.m_bAddPid = bAddPid;

    zactor_t *pActor = zactor_new(SActWorker::actorMain, &stArgs);
    if (NULL == pActor)
        return false;

    //< The actor reports 1 on success and 0 on failure; a failed receive counts as failure
    int nStarted = 0;
    if (0 != zsock_recv(pActor, "i", &nStarted))
        nStarted = 0;

    if (nStarted)
    {
        m_pActor = pActor;
        return true;
    }

    zactor_destroy(&pActor);
    return false;
}
/**
 * @brief Ask the worker actor whether its outbound socket is currently connected.
 *
 * Sends "$CONNECTED" to the actor and waits for the integer reply.
 *
 * @return true only when the actor is running and replied with 1; false when there is
 *         no actor or when sending the request or receiving the reply failed.
 */
bool CInProcBroker::isConnected()
{
    if (NULL == m_pActor)
        return false;

    //< Do not wait for a reply to a request that never went out
    if (0 != zsock_send(m_pActor, "s", "$CONNECTED"))
        return false;

    //< Blocks until the reply arrives; initialised so a failed receive never exposes garbage
    int nConnected = 0;
    if (0 != zsock_recv(m_pActor, "i", &nConnected))
        return false;

    return nConnected == 1;
}
} //< namespace iot_net