HM-SPMS/platform/src/net/net_msg_bus_api/CInProcBroker.cpp

1166 lines
38 KiB
C++
Raw Normal View History

2025-03-12 11:08:50 +08:00

/******************************************************************************//**
* @file CInProcBroker.cpp
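* @brief In-process message bus broker: a zactor worker thread that relays messages between the communicators of this process and the local message bus service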
* @author yikenan
* @version 1.0
* @date
**********************************************************************************/
#include <cstring>
#include "boost/typeof/typeof.hpp"
#include "boost/asio/ip/host_name.hpp"
#include "boost/interprocess/detail/os_thread_functions.hpp"
#include "lz4.h"
#include "pub_logger_api/logger.h"
#include "../net_msg_bus_base_api/kmbp_cnt.h"
#include "../net_msg_bus_base_api/CMbSubEngine.h"
#include "../net_msg_bus_base_api/CommonDef.h"
#include "CInProcBroker.h"
namespace kbd_net
{
//< Whether communicators are currently allowed to send data.
//< Within this file it is set to true by actorMain() once the worker is initialised and by
//< handleCycTaskTimer() when the outbound socket is writable; it is set to false when "$TERM"
//< is received and whenever the outbound socket is found not writable.
//< NOTE(review): presumably read by communicator threads outside this file; volatile alone gives
//< no inter-thread ordering guarantee - confirm whether an atomic is required.
volatile bool g_bCommSendEnable = false;
/**
 * @brief Allocates and fully initialises the worker state used by the broker actor thread.
 *
 * Creates the subscription engine, the communicator set, the reusable kmbp message, the
 * command/data ROUTER sockets facing the communicators, the DEALER socket facing the local
 * service together with its monitor PAIR socket, and the zloop with all handlers registered.
 *
 * @param pActorPipe Actor pipe handed over by zactor; stored in the worker, not owned here.
 * @param pArgs      Initialisation arguments (process name, instance name, whether to add the PID).
 * @return The new worker. Failures are only checked with assert(), so in builds where assert is
 *         compiled out no failure is reported to the caller.
 */
CInProcBroker::SActWorker* CInProcBroker::SActWorker::newActWorker(zsock_t *pActorPipe, SActWorkerInitArg *pArgs)
{
//< zmalloc uses calloc internally, so the allocated memory is zero-initialised
SActWorker *self = (SActWorker *)zmalloc(sizeof(SActWorker));
self->m_bConnected = false;
self->m_nSendSeq = 0;
{
//< The host name used to be obtained with zsys_hostname(); it is now obtained with boost because:
//< zsys_hostname(), after gethostname(), also validates the name with gethostbyname().
//< On some systems (verified on CentOS 7) this resolves the host name through DNS by default,
//< so an invalid DNS server address makes the call wait for a timeout and take far too long.
//< boost::asio::ip::host_name() does not show this problem; it presumably returns the name without further validation.
try
{
//< No memset needed: the buffer is already zeroed, and copying at most size-1 bytes keeps it terminated
//memset(self->m_chHostName, 0, sizeof(self->m_chHostName));
strncpy(self->m_chHostName, boost::asio::ip::host_name().c_str(), sizeof(self->m_chHostName) - 1);
}
catch (std::exception &e)
{
LOGERROR("newActWorker(): %s", e.what());
assert(false);
}
}
//memset(self->m_chProcName, 0, sizeof(self->m_chProcName));
strncpy(self->m_chProcName, pArgs->m_pchProcName, sizeof(self->m_chProcName) - 1);
//memset(self->m_chInstName, 0, sizeof(self->m_chInstName));
strncpy(self->m_chInstName, pArgs->m_pchInstName, sizeof(self->m_chInstName) - 1);
self->m_pActorPipe = pActorPipe;
//< Initialise m_pSubEng (subscription engine)
self->m_pSubEng = new CMbSubEngine;
assert(self->m_pSubEng);
//< Initialise m_pCommSet (names of the registered communicators)
self->m_pCommSet = new boost::unordered_set<std::string>;
assert(self->m_pCommSet);
//< Initialise m_pKmbpMsg (message object reused by the handlers)
self->m_pKmbpMsg = kmbp_new();
assert(self->m_pKmbpMsg);
//< Initialise m_pSockInCmd (ROUTER for communicator commands)
self->m_pSockInCmd = zsock_new_router(MB_P2C_CMD_ENDPOINT);
assert(self->m_pSockInCmd);
//< Initialise m_pSockInData (ROUTER for communicator data)
{
self->m_pSockInData = zsock_new(ZMQ_ROUTER);
assert(self->m_pSockInData);
//< In zmq 4.1.x a high-water-mark setting only applies to the next bind/connect;
//< in 4.2.x it may be changed after bind/connect.
//< The router send high-water mark is set large to avoid dropping messages.
zsock_set_sndhwm(self->m_pSockInData, 1000000);
//< Pointers are sent over this socket, so the router must not drop messages silently, otherwise memory would leak
zsock_set_router_mandatory(self->m_pSockInData, 1);
//< Send/receive timeouts (ms) keep this thread from blocking indefinitely
zsock_set_sndtimeo(self->m_pSockInData, 100);
zsock_set_rcvtimeo(self->m_pSockInData, 100);
int nRc = zsock_bind(self->m_pSockInData, MB_P2C_DATA_ENDPOINT);
assert(0 == nRc);
}
//< Initialise m_pSockOut (DEALER towards the local service)
{
self->m_pSockOut = zsock_new(ZMQ_DEALER);
assert(self->m_pSockOut);
if (pArgs->m_bAddPid)
{
self->m_nPid = boost::interprocess::ipcdetail::get_current_process_id();
}
else
self->m_nPid = -1;
//< pchIdentity must be freed; the socket identity is "<proc>@<inst>@<pid>"
char *pchIdentity = zsys_sprintf("%s@%s@%d",
self->m_chProcName,
self->m_chInstName,
self->m_nPid);
zsock_set_identity(self->m_pSockOut, pchIdentity);
zstr_free(&pchIdentity);
zsock_set_sndhwm(self->m_pSockOut, 10000);
zsock_set_rcvhwm(self->m_pSockOut, 100000);
//< Send/receive timeouts (ms) keep this thread from blocking indefinitely
zsock_set_sndtimeo(self->m_pSockOut, 100);
zsock_set_rcvtimeo(self->m_pSockOut, 100);
zsock_set_reconnect_ivl(self->m_pSockOut, 500);
zsock_set_reconnect_ivl_max(self->m_pSockOut, 1000);
}
//< Initialise m_pSockOutMon (PAIR receiving monitor events of m_pSockOut)
{
//< pchEndpoint must be freed
char *pchEndpoint = zsys_sprintf("inproc://zmonitor-%p", self->m_pSockOut);
assert(pchEndpoint);
const int nEvent = ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_MONITOR_STOPPED;
int nRc = zmq_socket_monitor(zsock_resolve(self->m_pSockOut), pchEndpoint, nEvent);
assert(0 == nRc);
self->m_pSockOutMon = zsock_new(ZMQ_PAIR);
assert(self->m_pSockOutMon);
nRc = zsock_connect(self->m_pSockOutMon, pchEndpoint);
assert(0 == nRc);
//< Free the endpoint string
zstr_free(&pchEndpoint);
}
//< Connect m_pSockOut (only after the monitor has been installed)
{
const int nRc = zsock_connect(self->m_pSockOut, MB_H2P_ENDPOINT);
assert(0 == nRc);
}
//< Initialise m_pLoop
{
self->m_pLoop = zloop_new();
assert(self->m_pLoop);
//< Handler priority inside m_pLoop depends on the registration order below.
//< Periodic task timer: fires every 1 s (times = 0); timer IDs start at 0.
int nRc = zloop_timer(self->m_pLoop, 1000, 0, handleCycTaskTimer, self);
assert(nRc >= 0);
bool bRc = setSocketHandle(self, self->m_pActorPipe, handleActorPipe);
assert(bRc);
bRc = setSocketHandle(self, self->m_pSockOutMon, handleSockOutMon);
assert(bRc);
bRc = setSocketHandle(self, self->m_pSockInCmd, handleSockInCmd);
assert(bRc);
bRc = setSocketHandle(self, self->m_pSockInData, handleSockInData);
assert(bRc);
bRc = setSocketHandle(self, self->m_pSockOut, handleSockOut);
assert(bRc);
//< Do not leave the loop when interrupted by a system signal
zloop_set_nonstop(self->m_pLoop, true);
}
return self;
}
/**
 * @brief Releases everything owned by a worker and resets the caller's pointer to NULL.
 *
 * Does nothing when *pSelf is already NULL.
 *
 * @param pSelf Address of the worker pointer to destroy.
 */
void CInProcBroker::SActWorker::destroyActWorker(SActWorker **pSelf)
{
    SActWorker *pWorker = *pSelf;
    if (NULL == pWorker)
    {
        return;
    }

    delete pWorker->m_pSubEng;
    pWorker->m_pSubEng = NULL;

    if (NULL != pWorker->m_pCommSet)
    {
        const bool bAllUnregistered = pWorker->m_pCommSet->empty();
        if (!bAllUnregistered)
        {
            //< Some communicators were never unregistered
            LOGERROR("destroyActWorker(): please deconstruct all communicator befor releaseMsgBus() !");
        }
        delete pWorker->m_pCommSet;
        pWorker->m_pCommSet = NULL;
    }

    zloop_destroy(&(pWorker->m_pLoop));
    zsock_destroy(&(pWorker->m_pSockInCmd));
    zsock_destroy(&(pWorker->m_pSockInData));

    //< Monitoring must be cancelled before the PAIR socket that receives the monitor
    //< events (m_pSockOutMon) is destroyed. Otherwise zeromq may block while emitting
    //< the monitor-state message during cancellation (sndtimeo defaults to "wait forever").
    //< Destroying the monitored socket cancels monitoring automatically, so without an
    //< explicit cancel the monitored socket m_pSockOut has to go first and the receiving
    //< socket m_pSockOutMon second.
    zmq_socket_monitor(zsock_resolve(pWorker->m_pSockOut), NULL, 0);
    zsock_destroy(&(pWorker->m_pSockOut));
    zsock_destroy(&(pWorker->m_pSockOutMon));

    kmbp_destroy(&(pWorker->m_pKmbpMsg));

    free(pWorker);
    *pSelf = NULL;
}
/**
 * @brief Entry point of the broker actor thread.
 *
 * zactor requires an actor function to call zsock_signal(pipe) once it is initialised and to
 * listen on the pipe and exit on the "$TERM" command. After the signal, one integer is sent
 * on the pipe: 1 when the worker was created, 0 otherwise.
 *
 * @param pActorPipe Pipe to the thread that created the actor.
 * @param pArgs      Pointer to a SActWorkerInitArg.
 */
void CInProcBroker::SActWorker::actorMain(zsock_t *pActorPipe, void *pArgs)
{
    zsock_signal(pActorPipe, 0);

    SActWorker *pWorker = newActWorker(pActorPipe, (SActWorkerInitArg *)pArgs);
    if (NULL == pWorker)
    {
        LOGERROR("newActWorker() failed !");
        zsock_send(pActorPipe, "i", 0);
        return;
    }

    zsock_send(pActorPipe, "i", 1);

    //< Communicators may send from now on
    g_bCommSendEnable = true;

    //< Run the reactor; it stops as soon as any handler returns -1
    zloop_start(pWorker->m_pLoop);

    destroyActWorker(&pWorker);
}
/**
 * @brief Registers or removes the zloop reader of a socket.
 *
 * @param self       Worker whose loop is modified (not checked for NULL).
 * @param pSock      Socket to watch (not checked for NULL).
 * @param pHandleFuc Handler to install; NULL removes the socket's current reader.
 * @return false only when zloop_reader() fails, true otherwise.
 */
bool CInProcBroker::SActWorker::setSocketHandle(SActWorker *self, zsock_t *pSock, zloop_reader_fn pHandleFuc)
{
    if (NULL == pHandleFuc)
    {
        //< No handler supplied: stop watching this socket
        zloop_reader_end(self->m_pLoop, pSock);
        return true;
    }

    if (0 != zloop_reader(self->m_pLoop, pSock, pHandleFuc, self))
    {
        return false;
    }

    zloop_reader_set_tolerant(self->m_pLoop, pSock);
    return true;
}
/**
 * @brief Stamps a message with the next send sequence number and sends it on m_pSockOut.
 *
 * An unexpected message ID is logged as an error, but the message is still sent. A failing
 * kmbp_send() is logged as a warning only.
 *
 * @param self  Worker owning the outbound socket and the sequence counter (not checked for NULL).
 * @param pKmbp Message to send; its sequence number is overwritten.
 * @return Always true.
 */
bool CInProcBroker::SActWorker::sendOut(SActWorker *self, kmbp_t *pKmbp)
{
    const int nMsgID = kmbp_id(pKmbp);

    const bool bKnownID = (KMBP_C2H_DATA_DOMAIN == nMsgID)
                          || (KMBP_C2H_DATA_PEER == nMsgID)
                          || (KMBP_C2H_DATA_HOST == nMsgID)
                          || (KMBP_P2H_HEARTBEAT == nMsgID)
                          || (KMBP_P2H_SUB_ALL == nMsgID)
                          || (KMBP_P2H_SUB_ADD == nMsgID)
                          || (KMBP_P2H_SUB_DEL == nMsgID);
    if (!bKnownID)
    {
        LOGERROR("sendOut(): Unexpected message ID = %d ", nMsgID);
    }

    //< The sequence number advances for every attempt, successful or not
    ++(self->m_nSendSeq);
    kmbp_set_seqno(pKmbp, self->m_nSendSeq);

    const int nSendRc = kmbp_send(pKmbp, self->m_pSockOut, NULL, NULL);
    if (0 != nSendRc)
    {
        LOGWARN("sendOut(): kmbp_send() failed !");
    }

    return true;
}
/**
 * @brief Replaces the LZ4-compressed payload of a message with its decompressed form.
 *
 * A message whose "unzip size" is <= 0 is treated as uncompressed and left untouched.
 * On any failure (missing/empty payload, oversized payload, allocation failure, LZ4 error,
 * size mismatch) an error is logged and the message is left unchanged.
 * On success the payload is replaced and the unzip size is reset to 0.
 *
 * @param pKmbp Message to decompress in place.
 */
void CInProcBroker::SActWorker::decompressData(kmbp_t *pKmbp)
{
    const int nSizeDst = kmbp_data_unzipsize(pKmbp);
    if (nSizeDst <= 0)
    {
        //< Message is not compressed
        return;
    }

    zchunk_t *pChunkSrc = kmbp_data_content(pKmbp);
    if (NULL == pChunkSrc)
    {
        //< Compression flagged but no payload present: should never happen
        LOGERROR("decompressData(): NULL == pChunkSrc , some thing wrong !");
        return;
    }

    const size_t nSizeSrc = zchunk_size(pChunkSrc);
    if (0 == nSizeSrc)
    {
        //< Compression flagged but payload is empty: should never happen
        LOGERROR("decompressData(): 0 == nSizeSrc , some thing wrong !");
        return;
    }

    //< LZ4_decompress_safe() takes int sizes, so the source must fit into an int
    if (nSizeSrc >= 2147483647)
    {
        LOGERROR("decompressData(): nSizeSrc >= 2147483647 , too big to decompress !");
        return;
    }

    //< NOTE: pChunkDst must be destroyed on every failure path below
    zchunk_t *pChunkDst = zchunk_new(NULL, nSizeDst);
    if (NULL == pChunkDst)
    {
        //< nSizeDst comes from the received message; refuse to continue when the
        //< destination buffer could not be allocated instead of dereferencing NULL
        LOGERROR("decompressData(): zchunk_new() failed , nSizeDst = %d !", nSizeDst);
        return;
    }

    const int nUnzipSize = LZ4_decompress_safe((const char *)zchunk_data(pChunkSrc),
                                               (char *)zchunk_data(pChunkDst),
                                               (int)nSizeSrc, nSizeDst);
    if (nUnzipSize <= 0)
    {
        LOGERROR("LZ4_decompress_safe() failed !");
        zchunk_destroy(&pChunkDst);
        return;
    }

    if (nSizeDst != nUnzipSize)
    {
        LOGERROR("decompressData(): nSizeDst != nUnzipSize , some thing wrong !");
        zchunk_destroy(&pChunkDst);
        return;
    }

    //< zchunk_new() was given the maximum size; record the actual length of the data
    zchunk_set(pChunkDst, NULL, nUnzipSize);

    //< Mark the message as uncompressed and hand pChunkDst over to it
    kmbp_set_data_unzipsize(pKmbp, 0);
    kmbp_set_data_content(pKmbp, &pChunkDst);
}
/**
 * @brief zloop reader for the actor pipe: handles "$TERM" and "$CONNECTED" commands.
 *
 * Processes at most 10 commands per invocation.
 * - "$TERM": disables communicator sending, waits 300 ms, releases every reference still
 *   queued on m_pSockInData and makes the loop exit.
 * - "$CONNECTED": replies with the current connection state as an integer.
 *
 * @param reader The actor pipe (must equal self->m_pActorPipe).
 * @param arg    The SActWorker.
 * @return -1 to stop the zloop after "$TERM", otherwise 0.
 */
int CInProcBroker::SActWorker::handleActorPipe(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pActorPipe);

    bool bTerm = false;
    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        //< NOTE: pchCmd must be freed
        char *pchCmd = zstr_recv(reader);
        if (NULL == pchCmd)
        {
            //< Possibly interrupted by a system signal: keep going
            continue;
        }

        if (streq(pchCmd, "$TERM"))
        {
            //< zactor requires the actor to accept "$TERM" and exit
            g_bCommSendEnable = false;
            zclock_sleep(300);

            //< Release every message that has not been received yet.
            //< m_pSockInData is a ROUTER, so each message is [routing ID][pointer]
            //< (see handleSockInData()). The routing-ID frame has to be consumed first;
            //< otherwise zsock_recv("p") would parse the routing ID as the pointer and
            //< the real reference would be leaked (or a bogus pointer dereferenced).
            while (zsock_events(self->m_pSockInData) & ZMQ_POLLIN)
            {
                zframe_t *pFrameRoutingID = zframe_recv(self->m_pSockInData);
                const bool bHasPayload = (NULL != pFrameRoutingID)
                                         && zsock_rcvmore(self->m_pSockInData);
                zframe_destroy(&pFrameRoutingID);
                if (!bHasPayload)
                    continue;

                kmbp_cnt_t *pKmbpCnt = NULL;
                if (0 != zsock_recv(self->m_pSockInData, "p", &pKmbpCnt))
                    continue;

                //< The pointer may legitimately be NULL
                if (NULL != pKmbpCnt)
                {
                    //< Drop the reference carried by the message
                    kmbp_cnt_dec(&pKmbpCnt);
                }
            }

            bTerm = true;
        }
        else if (streq(pchCmd, "$CONNECTED"))
        {
            zsock_send(reader, "i", self->m_bConnected);
        }
        else
        {
            //< Unknown command
            LOGWARN("handleActorPipe(): malformed message !");
        }

        //< Clean up trailing frames; normally there should be none
        if (zsock_rcvmore(reader))
        {
            LOGWARN("handleActorPipe(): garbage message !");
            zmsg_t *more = zmsg_recv(reader);
            zmsg_print(more);
            zmsg_destroy(&more);
        }

        zstr_free(&pchCmd);
    }

    //< Any zloop handler returning -1 makes the zloop exit
    return bTerm ? -1 : 0;
}
/**
 * @brief zloop reader for the communicator command socket (m_pSockInCmd).
 *
 * Processes at most 10 commands per invocation: register / unregister a communicator,
 * add / delete a subscription and query all subscriptions of a communicator. Every recognised
 * command is answered on the same socket; subscription changes that must be propagated are
 * forwarded to the local service through sendOut() when m_pSockOut is writable.
 * The shared message object m_pKmbpMsg is reused for both the upstream message and the reply.
 *
 * @param reader The command socket (must equal self->m_pSockInCmd).
 * @param arg    The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockInCmd(zloop_t *, zsock_t *reader, void *arg)
{
SActWorker *self = (SActWorker *)arg;
assert(reader == self->m_pSockInCmd);
for (int i = 0;
(i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
i++)
{
const int nRc = kmbp_recv(self->m_pKmbpMsg, reader);
if (0 != nRc)
{
//< Interrupted by a system signal, or malformed message: continue
LOGWARN("kmbp_recv() failed, nRc == %d", nRc);
continue;
}
//< The routing ID is the communicator name
const std::string strRoutingID((char *)zframe_data(kmbp_routing_id(self->m_pKmbpMsg)),
zframe_size(kmbp_routing_id(self->m_pKmbpMsg)));
switch (kmbp_id(self->m_pKmbpMsg))
{
case KMBP_C2P_REG:
{
self->m_pCommSet->insert(strRoutingID);
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
break;
case KMBP_C2P_UNREG:
{
self->m_pCommSet->erase(strRoutingID);
//< Remove the subscriber
{
const SubInfoVecPtr ptrVecSub =
self->m_pSubEng->delSuber(strRoutingID);
if (ptrVecSub)
{
//< Forward each returned subscription to the local service as a deletion
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
if (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT)
{
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_DEL);
kmbp_set_sub_appid(self->m_pKmbpMsg, (*ptrVecSub)[i].m_nAppID);
kmbp_set_sub_chanid(self->m_pKmbpMsg, (*ptrVecSub)[i].m_nChannelID);
sendOut(self, self->m_pKmbpMsg);
}
}
}
}
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
break;
case KMBP_C2P_SUB_ADD:
{
CMbSubInfoImp objSubAdd;
objSubAdd.m_nAppID = kmbp_sub_appid(self->m_pKmbpMsg);
objSubAdd.m_nChannelID = kmbp_sub_chanid(self->m_pKmbpMsg);
if (!(objSubAdd.isValid(false)))
{
//< Reply false to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_FALSE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
//< Nothing is sent upstream
}
else
{
const bool bUpSend =
self->m_pSubEng->addSubtionOfSuber(strRoutingID, objSubAdd);
if (bUpSend && (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT))
{
//< Forward to the local service
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_ADD);
sendOut(self, self->m_pKmbpMsg);
}
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
}
break;
case KMBP_C2P_SUB_DEL:
{
CMbSubInfoImp objSubDel;
objSubDel.m_nAppID = kmbp_sub_appid(self->m_pKmbpMsg);
objSubDel.m_nChannelID = kmbp_sub_chanid(self->m_pKmbpMsg);
if (!(objSubDel.isValid(false)))
{
//< Reply false to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_FALSE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
//< Nothing is sent upstream
}
else
{
//< NOTE: the interface library has to support wildcards here; the service program does not.
//< Example (punctuation of the original comment was lost, presumably): with subscriptions
//< (1,1), (2,1) and (1,2), cancelling (0,1) removes (1,1) and (2,1) while (1,2) is kept.
bool bRet = false;
const SubInfoVecPtr ptrVecSub =
self->m_pSubEng->getSubtionOfSuber(strRoutingID);
if (ptrVecSub)
{
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
const CMbSubInfoImp &Sub = (*ptrVecSub)[i];
if (objSubDel.contain(Sub))
{
bRet = true;
const bool bUpSend =
self->m_pSubEng->delSubtionOfSuber(strRoutingID, Sub);
if (bUpSend && (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT))
{
//< Forward to the local service
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_DEL);
kmbp_set_sub_appid(self->m_pKmbpMsg, Sub.m_nAppID);
kmbp_set_sub_chanid(self->m_pKmbpMsg, Sub.m_nChannelID);
sendOut(self, self->m_pKmbpMsg);
}
}
}
}
//< Reply to the communicator: true when at least one subscription matched
if (bRet)
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_TRUE);
else
kmbp_set_id(self->m_pKmbpMsg, KMBP_COMMON_RETURN_FALSE);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
}
break;
case KMBP_C2P_SUB_ALL_REQ:
{
zlist_t *plistSub = zlist_new();
zlist_autofree(plistSub);
const SubInfoVecPtr ptrVecSub = self->m_pSubEng->getSubtionOfSuber(strRoutingID);
if (ptrVecSub)
{
for (size_t i = 0; i < ptrVecSub->size(); i++)
{
const CMbSubInfoImp &Sub = (*ptrVecSub)[i];
//< The list has autofree set, so it copies the string
zlist_append(plistSub, (char *)Sub.toString().c_str());
}
}
//< Ownership of plistSub moves to the kmbp message, which releases it internally
kmbp_set_sublist(self->m_pKmbpMsg, &plistSub);
//< Reply to the communicator
kmbp_set_id(self->m_pKmbpMsg, KMBP_P2C_SUB_ALL);
kmbp_send(self->m_pKmbpMsg, reader, NULL, NULL);
}
break;
default:
{
LOGERROR("handleSockInCmd(): invalid kmbp_id = %d ", kmbp_id(self->m_pKmbpMsg));
}
break;
}
//< Nothing to clean up
}
return 0;
}
/**
 * @brief zloop reader for the communicator data socket (m_pSockInData).
 *
 * Each message is [routing ID][pointer to kmbp_cnt_t]. At most 10 messages are processed per
 * invocation.
 * - A NULL pointer is echoed back to the sending communicator.
 * - A data message (PEER / HOST / DOMAIN) is stamped with this process' source fields and
 *   forwarded through sendOut() when m_pSockOut is writable; otherwise it is dropped and
 *   communicator sending is disabled.
 * The reference carried by a non-NULL pointer is always released here.
 *
 * @param reader The data socket (must equal self->m_pSockInData).
 * @param arg    The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockInData(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pSockInData);

    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        //< m_pSockInData is a ROUTER: the first frame is the routing ID
        zframe_t *pFrameRoutingID = zframe_recv(reader); //< must be destroyed
        if (!pFrameRoutingID || !zsock_rcvmore(reader))
        {
            LOGWARN("handleSockInData(): no routing ID , ignore !");
            zframe_destroy(&pFrameRoutingID);
            continue;
        }

        //< The routing ID is the communicator name
        const std::string strRoutingID((char *)zframe_data(pFrameRoutingID),
                                       zframe_size(pFrameRoutingID));
        zframe_destroy(&pFrameRoutingID);

        kmbp_cnt_t *pKmbpCnt = NULL;
        //< The reference is released in one place at the end
        if (0 != zsock_recv(reader, "p", &pKmbpCnt))
            continue;

        if (NULL == pKmbpCnt)
        {
            //< Echo NULL back to the communicator.
            //< Build the routing frame from the full byte range so that an ID containing
            //< NUL bytes is not truncated.
            zframe_t *pDstRoutingID = zframe_new(strRoutingID.data(), strRoutingID.size());
            if (NULL != pDstRoutingID)
            {
                //< zframe_send() destroys the frame only on success
                if (0 == zframe_send(&pDstRoutingID, self->m_pSockInData, ZFRAME_MORE))
                {
                    zsock_send(self->m_pSockInData, "p", pKmbpCnt);
                }
                else
                {
                    //< Send failed: the frame is still ours, release it
                    zframe_destroy(&pDstRoutingID);
                }
            }
            //< pKmbpCnt is NULL: no reference to release
        }
        else
        {
            kmbp_t *pKmbp = kmbp_cnt_get(pKmbpCnt);
            switch (kmbp_id(pKmbp))
            {
            case KMBP_C2H_DATA_PEER:
            case KMBP_C2H_DATA_HOST:
            case KMBP_C2H_DATA_DOMAIN:
            {
                if (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT)
                {
                    kmbp_set_src_host(pKmbp, self->m_chHostName);
                    kmbp_set_src_proc(pKmbp, self->m_chProcName);
                    kmbp_set_src_inst(pKmbp, self->m_chInstName);
                    kmbp_set_src_pid(pKmbp, self->m_nPid);
                    kmbp_set_src_comm(pKmbp, strRoutingID.c_str());
                    sendOut(self, pKmbp);
                }
                else
                {
                    //< TODO: consider an additional cache here, depending on need
                    if (g_bCommSendEnable)
                    {
                        g_bCommSendEnable = false;
                    }
                    LOGWARN("handleSockInData(): outbound queue is full , drop message !");
                }
            }
            break;
            default:
            {
                LOGERROR("handleSockInData(): invalid kmbp_id = %d ", kmbp_id(pKmbp));
            }
            break;
            }

            //< Release the reference carried by the message
            kmbp_cnt_dec(&pKmbpCnt);
        }
    }

    return 0;
}
/**
 * @brief zloop reader for the socket connected to the local service (m_pSockOut).
 *
 * Processes at most 10 messages per invocation.
 * - KMBP_H2P_DATA:
 *   - process-level pings are answered directly;
 *   - otherwise the destination communicators are determined (explicit dst_comm, or a
 *     subscription lookup when dst_comm is empty); subscriber pings are answered once per
 *     registered destination, any other data is decompressed and a reference-counted pointer
 *     to it is delivered to every registered destination through m_pSockInData.
 * - KMBP_H2P_SUB_ALL_REQ: replies with the complete subscription list.
 *
 * @param reader The outbound socket (must equal self->m_pSockOut).
 * @param arg    The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockOut(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pSockOut);

    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        const int nRc = kmbp_recv(self->m_pKmbpMsg, reader);
        if (0 != nRc)
        {
            //< Interrupted by a system signal, or malformed message: continue
            LOGWARN("kmbp_recv() failed, nRc == %d", nRc);
            continue;
        }

        switch (kmbp_id(self->m_pKmbpMsg))
        {
        case KMBP_H2P_DATA:
        {
            const int nDataType = kmbp_data_type(self->m_pKmbpMsg);
            if (nDataType == MB_MT_PING_PROC
                || nDataType == MB_MT_PING_PROC_IN_HOST)
            {
                //< Answer the ping by turning the received message into the reply
                kmbp_t *pKmbpSend = self->m_pKmbpMsg;
                {
                    kmbp_set_id(pKmbpSend, KMBP_C2H_DATA_PEER);
                    kmbp_set_timestamp(pKmbpSend, zclock_mono());
                    kmbp_set_relaystat(pKmbpSend, MB_RS_DEFAULT);
                    //< Destination = original source (must be copied before src is overwritten)
                    kmbp_set_dst_comm(pKmbpSend, kmbp_src_comm(pKmbpSend));
                    kmbp_set_dst_proc(pKmbpSend, kmbp_src_proc(pKmbpSend));
                    kmbp_set_dst_inst(pKmbpSend, kmbp_src_inst(pKmbpSend));
                    kmbp_set_dst_pid(pKmbpSend, kmbp_src_pid(pKmbpSend));
                    kmbp_set_dst_host(pKmbpSend, kmbp_src_host(pKmbpSend));
                    kmbp_set_src_host(pKmbpSend, self->m_chHostName);
                    kmbp_set_src_proc(pKmbpSend, self->m_chProcName);
                    kmbp_set_src_inst(pKmbpSend, self->m_chInstName);
                    kmbp_set_src_pid(pKmbpSend, self->m_nPid);
                    kmbp_set_src_comm(pKmbpSend, "");
                    kmbp_set_data_type(pKmbpSend, MB_MT_PING_PROC_REP);
                }
                sendOut(self, pKmbpSend);
            }
            else
            {
                //< Peer-to-peer messages (C2H_DATA_PEER) carry a non-empty Dst_Comm and go
                //< straight to that communicator. For every other message Dst_Comm is empty
                //< and the subject is matched against the in-process subscriptions.
                StrVecPtr ptrVecComm;
                {
                    const char *pchComm = kmbp_dst_comm(self->m_pKmbpMsg);
                    if (strlen(pchComm) > 0)
                    {
                        ptrVecComm.reset(new StrVec);
                        ptrVecComm->push_back(std::string(pchComm));
                    }
                    else
                    {
                        const CMbSubInfoImp objSub(kmbp_sub_appid(self->m_pKmbpMsg),
                                                   kmbp_sub_chanid(self->m_pKmbpMsg));
                        ptrVecComm = self->m_pSubEng->lookupSuberOfSubject(objSub);
                    }
                }

                if (ptrVecComm)
                {
                    if (nDataType == MB_MT_PING_SUBER_IN_DOMAIN
                        || nDataType == MB_MT_PING_SUBER_IN_HOST
                        || nDataType == MB_MT_PING_SUBER_IN_PROC)
                    {
                        //< Answer the subscriber ping once per registered destination
                        kmbp_t *pKmbpSend = self->m_pKmbpMsg;
                        {
                            kmbp_set_id(pKmbpSend, KMBP_C2H_DATA_PEER);
                            kmbp_set_timestamp(pKmbpSend, zclock_mono());
                            kmbp_set_relaystat(pKmbpSend, MB_RS_DEFAULT);
                            kmbp_set_dst_comm(pKmbpSend, kmbp_src_comm(pKmbpSend));
                            kmbp_set_dst_proc(pKmbpSend, kmbp_src_proc(pKmbpSend));
                            kmbp_set_dst_inst(pKmbpSend, kmbp_src_inst(pKmbpSend));
                            kmbp_set_dst_pid(pKmbpSend, kmbp_src_pid(pKmbpSend));
                            kmbp_set_dst_host(pKmbpSend, kmbp_src_host(pKmbpSend));
                            kmbp_set_src_host(pKmbpSend, self->m_chHostName);
                            kmbp_set_src_proc(pKmbpSend, self->m_chProcName);
                            kmbp_set_src_inst(pKmbpSend, self->m_chInstName);
                            kmbp_set_src_pid(pKmbpSend, self->m_nPid);
                            //< src_comm is filled in per destination below
                            kmbp_set_data_type(pKmbpSend, MB_MT_PING_SUBER_REP);
                        }

                        for (size_t nIdx = 0; nIdx < ptrVecComm->size(); nIdx++)
                        {
                            const std::string *pStrDstComm = &((*ptrVecComm)[nIdx]);

                            //< Skip communicators that are no longer registered
                            BOOST_AUTO(itComm, self->m_pCommSet->find(*pStrDstComm));
                            if (self->m_pCommSet->end() == itComm)
                            {
                                continue;
                            }

                            kmbp_set_src_comm(pKmbpSend, pStrDstComm->c_str());
                            sendOut(self, pKmbpSend);
                        }
                    }
                    else
                    {
                        kmbp_set_id(self->m_pKmbpMsg, KMBP_P2C_DATA);

                        //< Decompress the payload once for all receivers
                        decompressData(self->m_pKmbpMsg);

                        kmbp_cnt_t *pKmbpCnt = kmbp_cnt_new();
                        kmbp_cnt_swap(pKmbpCnt, &(self->m_pKmbpMsg));

                        for (size_t nIdx = 0; nIdx < ptrVecComm->size(); nIdx++)
                        {
                            const std::string *pStrDstComm = &((*ptrVecComm)[nIdx]);

                            //< Skip communicators that are no longer registered
                            BOOST_AUTO(itComm, self->m_pCommSet->find(*pStrDstComm));
                            if (self->m_pCommSet->end() == itComm)
                            {
                                continue;
                            }

                            //< m_pSockInData is a ROUTER: the first frame is the routing ID.
                            //< Use the full byte range so an ID containing NUL bytes is not truncated.
                            zframe_t *pFrameRoutingID = zframe_new(pStrDstComm->data(), pStrDstComm->size());
                            if (NULL == pFrameRoutingID)
                            {
                                //< zframe_send() on a NULL frame would report success without sending
                                LOGWARN("handleSockOut(): send to m_pSockInData failed , unexpected !");
                                continue;
                            }

                            //< zframe_send() destroys the frame only on success
                            if (0 == zframe_send(&pFrameRoutingID, self->m_pSockInData, ZFRAME_MORE))
                            {
                                //< Add the receiver's reference first and take it back on failure,
                                //< so the receiver cannot release the object before the count is raised
                                kmbp_cnt_inc(pKmbpCnt);
                                if (0 != zsock_send(self->m_pSockInData, "p", pKmbpCnt))
                                {
                                    //< Release the receiver's reference through a copy of the pointer:
                                    //< kmbp_cnt_dec() takes the pointer by address and may clear it
                                    //< (NOTE(review): confirm), which must not affect the reference
                                    //< this loop still holds in pKmbpCnt.
                                    kmbp_cnt_t *pPeerRef = pKmbpCnt;
                                    kmbp_cnt_dec(&pPeerRef);
                                    LOGWARN("handleSockOut(): send to m_pSockInData failed , unexpected !");
                                }
                            }
                            else
                            {
                                //< Send failed: the frame is still ours, release it
                                zframe_destroy(&pFrameRoutingID);
                                LOGWARN("handleSockOut(): send to m_pSockInData failed , unexpected !");
                            }
                        }

                        //< Release the local reference
                        kmbp_cnt_dec(&pKmbpCnt);
                    }
                }
            }
        }
        break;
        case KMBP_H2P_SUB_ALL_REQ:
        {
            zlist_t *plistSub = zlist_new();
            zlist_autofree(plistSub);

            const SubInfoVecPtr ptrVecSub = self->m_pSubEng->getSubtionOfAll();
            if (ptrVecSub)
            {
                for (size_t nIdx = 0; nIdx < ptrVecSub->size(); nIdx++)
                {
                    const CMbSubInfoImp &Sub = (*ptrVecSub)[nIdx];
                    //< The list has autofree set, so it copies the string
                    zlist_append(plistSub, (char *)(Sub.toString().c_str()));
                }
            }

            //< Ownership of the list moves to the kmbp message, which releases it internally
            kmbp_set_sublist(self->m_pKmbpMsg, &plistSub);
            kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_SUB_ALL);
            sendOut(self, self->m_pKmbpMsg);
        }
        break;
        default:
        {
            LOGERROR("handleSockOut(): invalid kmbp_id = %d ", kmbp_id(self->m_pKmbpMsg));
        }
        break;
        }
        //< Nothing to clean up
    }

    return 0;
}
/**
 * @brief Periodic zloop timer task: tracks whether sending is possible and emits heartbeats.
 *
 * The global send-enable flag follows the writability of m_pSockOut. While the socket is
 * writable and the connection is up, a heartbeat is sent to the local service.
 *
 * @param arg The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleCycTaskTimer(zloop_t * /*loop*/, int /*timer_id*/, void *arg)
{
    SActWorker *self = (SActWorker *)arg;

    //< True when a message can be sent without blocking
    const bool bWritable = (zsock_events(self->m_pSockOut) & ZMQ_POLLOUT) != 0;

    //< Only write the flag when its value actually changes
    if (g_bCommSendEnable != bWritable)
    {
        g_bCommSendEnable = bWritable;
    }

    if (bWritable && self->m_bConnected)
    {
        //< Send heartbeat
        kmbp_set_id(self->m_pKmbpMsg, KMBP_P2H_HEARTBEAT);
        sendOut(self, self->m_pKmbpMsg);
    }

    return 0;
}
/**
 * @brief zloop reader for the monitor socket of m_pSockOut: tracks the connection state.
 *
 * Processes at most 10 monitor events per invocation. Each event consists of two frames
 * (ZMQ v4 layout, which differs from older versions): frame 1 holds a 16-bit event number
 * followed by a 32-bit value, frame 2 holds the endpoint address. CONNECTED / DISCONNECTED
 * update m_bConnected; every event is logged.
 *
 * @param reader The monitor socket (must equal self->m_pSockOutMon).
 * @param arg    The SActWorker.
 * @return Always 0.
 */
int CInProcBroker::SActWorker::handleSockOutMon(zloop_t *, zsock_t *reader, void *arg)
{
    SActWorker *self = (SActWorker *)arg;
    assert(reader == self->m_pSockOutMon);

    for (int i = 0;
         (i < 10) && (zsock_events(reader) & ZMQ_POLLIN);
         i++)
    {
        //< Frame 1: event number + value
        int nEvent = 0, nValue = 0;
        {
            zframe_t *pFrame = zframe_recv(reader); //< must be destroyed
            if (NULL == pFrame)
            {
                //< Interrupted by a system signal
                continue;
            }

            if (!zsock_rcvmore(reader))
            {
                //< No following frame: the message is malformed
                zframe_destroy(&pFrame);
                LOGWARN("handleSockOutMon(): malformed zmonitor msg !");
                continue;
            }

            uint16_t nRawEvent = 0;
            uint32_t nRawValue = 0;
            if (zframe_size(pFrame) < sizeof(nRawEvent) + sizeof(nRawValue))
            {
                //< Frame too short to hold event + value: discard the rest of the message
                zframe_destroy(&pFrame);
                zmsg_t *pRest = zmsg_recv(reader);
                zmsg_destroy(&pRest);
                LOGWARN("handleSockOutMon(): malformed zmonitor msg !");
                continue;
            }

            //< Copy instead of casting: the 32-bit value sits at offset 2 and is not
            //< suitably aligned for a direct uint32_t read
            std::memcpy(&nRawEvent, zframe_data(pFrame), sizeof(nRawEvent));
            std::memcpy(&nRawValue, zframe_data(pFrame) + sizeof(nRawEvent), sizeof(nRawValue));
            nEvent = nRawEvent;
            nValue = nRawValue;
            zframe_destroy(&pFrame);
        }

        //< Frame 2: endpoint address
        std::string strAddr;
        {
            char *pchAddress = zstr_recv(reader); //< must be freed
            if (NULL == pchAddress)
            {
                //< Interrupted by a system signal: try once more
                pchAddress = zstr_recv(reader);
                if (NULL == pchAddress)
                {
                    LOGWARN("handleSockOutMon(): recv address failed !");
                }
            }

            if (NULL == pchAddress)
                strAddr = "UNKNOWN";
            else
                strAddr = pchAddress;

            //< Freeing a NULL string is safe
            zstr_free(&pchAddress);
        }

        const char *pchName;
        switch (nEvent)
        {
        case ZMQ_EVENT_ACCEPTED:
            pchName = "ACCEPTED";
            break;
        case ZMQ_EVENT_ACCEPT_FAILED:
            pchName = "ACCEPT_FAILED";
            break;
        case ZMQ_EVENT_BIND_FAILED:
            pchName = "BIND_FAILED";
            break;
        case ZMQ_EVENT_CLOSED:
            pchName = "CLOSED";
            break;
        case ZMQ_EVENT_CLOSE_FAILED:
            pchName = "CLOSE_FAILED";
            break;
        case ZMQ_EVENT_DISCONNECTED:
        {
            pchName = "DISCONNECTED";
            self->m_bConnected = false;
        }
        break;
        case ZMQ_EVENT_CONNECTED:
        {
            pchName = "CONNECTED";
            self->m_bConnected = true;
        }
        break;
        case ZMQ_EVENT_CONNECT_DELAYED:
            pchName = "CONNECT_DELAYED";
            break;
        case ZMQ_EVENT_CONNECT_RETRIED:
            pchName = "CONNECT_RETRIED";
            break;
        case ZMQ_EVENT_LISTENING:
            pchName = "LISTENING";
            break;
        case ZMQ_EVENT_MONITOR_STOPPED:
        {
            pchName = "MONITOR_STOPPED";
            LOGERROR("ZMQ_EVENT_MONITOR_STOPPED , unexpected while running !");
        }
        break;
        default:
            pchName = "UNKNOWN";
            break;
        }

        LOGINFO("Message bus api connection with local service : \n Event = %s ; Value = %d ; Addr = %s ;",
                pchName, nValue, strAddr.c_str());
    }

    return 0;
}
/*========================= CInProcBroker =========================*/
/**
 * @brief Creates a broker that has not been started yet.
 */
CInProcBroker::CInProcBroker()
{
    //< No actor exists until start() succeeds
    m_pActor = NULL;

    //< Stop CZMQ from intercepting system signals
    zsys_handler_set(NULL);
}
/**
 * @brief Stops the broker actor and shuts CZMQ down.
 */
CInProcBroker::~CInProcBroker()
{
//< zactor_destroy() sends "$TERM" to the zactor thread and waits for its reply.
//< The thread releases its resources after receiving that message, see handleActorPipe().
zactor_destroy(&m_pActor);
//< Call zsys_shutdown() explicitly and early instead of letting it run automatically at program
//< exit (where the order cannot be controlled). This prevents the Windows error
//< "Assertion failed: Successful WSASTARTUP not yet performed".
zsys_shutdown();
}
bool CInProcBroker::start(const char *pchProcName, const char *pchInstName,
bool bAddPid)
{
if (NULL != m_pActor)
return false;
SActWorkerInitArg stArgs;
stArgs.m_pchProcName = pchProcName;
stArgs.m_pchInstName = pchInstName;
stArgs.m_bAddPid = bAddPid;
zactor_t *pActor = zactor_new(SActWorker::actorMain, &stArgs);
if (NULL == pActor)
return false;
int nStarted = 0;
zsock_recv(pActor, "i", &nStarted);
if (nStarted)
{
m_pActor = pActor;
return true;
}
zactor_destroy(&pActor);
return false;
}
/**
 * @brief Asks the broker actor whether it is connected to the local service.
 *
 * Sends "$CONNECTED" to the actor and blocks until the actor replies.
 *
 * @return true only when the actor replied with 1; false when the broker is not started,
 *         when the request could not be sent or when no valid reply was received.
 */
bool CInProcBroker::isConnected()
{
    if (NULL == m_pActor)
        return false;

    if (0 != zsock_send(m_pActor, "s", "$CONNECTED"))
        return false;

    //< Initialised so that a failed receive can never yield an indeterminate value
    int nConnected = 0;

    //< Blocks until the actor replies
    if (0 != zsock_recv(m_pActor, "i", &nConnected))
        return false;

    return nConnected == 1;
}
} //< namespace kbd_net