HM-SPMS/product/src/tools/mqtt_tool/QMqttClient.cpp
2025-03-13 15:21:23 +08:00

206 lines
6.5 KiB
C++

#include "QMqttClient.h"
#include <QDebug>
#include <QtConcurrent>
//--------------------------------------------------------------------
// Creates a client identified by mqtt_id without a broker address: m_host is
// left default-constructed, the port defaults to 1883 and the keep-alive to
// 60 seconds. The client starts in the "not connected" state.
//
// The id is passed to libmosquitto as UTF-8, the encoding MQTT mandates for
// client identifiers. The previous toLocal8Bit() conversion produced
// non-UTF-8 bytes for non-ASCII ids on systems whose locale codec is not
// UTF-8; for ASCII ids the bytes are unchanged.
QMqttClient::QMqttClient(const QString& mqtt_id, QObject *parent)
    : QObject(parent),
      mosquittopp(mqtt_id.toUtf8().constData()),
      m_id(mqtt_id),
      m_port(1883),
      m_keepAlive(60), // seconds
      m_connected(false),
      MID(0)
{
}
//--------------------------------------------------------------------
// Creates a client identified by mqtt_id that will connect to host:port.
// The keep-alive interval defaults to 60 seconds and the client starts in the
// "not connected" state; call connect() to open the session.
//
// The id is passed to libmosquitto as UTF-8, the encoding MQTT mandates for
// client identifiers. The previous toLocal8Bit() conversion produced
// non-UTF-8 bytes for non-ASCII ids on systems whose locale codec is not
// UTF-8; for ASCII ids the bytes are unchanged.
QMqttClient::QMqttClient(const QString& mqtt_id, const QHostAddress& host, int port, QObject* parent)
    : QObject(parent),
      mosquittopp(mqtt_id.toUtf8().constData()),
      m_id(mqtt_id),
      m_host(host),
      m_port(port),
      m_keepAlive(60), // seconds
      m_connected(false),
      MID(0)
{
}
//--------------------------------------------------------------------
// Requests a disconnect from the broker before the object goes away.
// disconnect() does nothing when m_connected is already false.
// NOTE(review): nothing here waits for runLoop(), which connect() starts via
// QtConcurrent::run -- confirm the loop cannot outlive this object.
QMqttClient::~QMqttClient()
{
    this->disconnect();
}
//--------------------------------------------------------------------
void QMqttClient::connect()
{
if(!m_connected)
{
mosqpp::lib_init();
if( !m_username.isEmpty())
{
username_pw_set(m_username.toUtf8().constData(), m_password.toUtf8().constData());
}
int connect_rslt = connect_async(m_host.toString().toLatin1().data(), m_port, m_keepAlive);
if( connect_rslt != MOSQ_ERR_SUCCESS)
{
QString errorMessage = mosquitto_strerror(connect_rslt);
emit error(errorMessage);
}else
{
m_connected = true;
QtConcurrent::run(this, &QMqttClient::runLoop);
}
}
}
//--------------------------------------------------------------------
void QMqttClient::disconnect()
{
if(m_connected) {
int rslt = mosquittopp::disconnect();
if(rslt != MOSQ_ERR_SUCCESS) {
QString errorMessage = mosquitto_strerror(rslt);
emit error(errorMessage);
}
}
}
//--------------------------------------------------------------------
void QMqttClient::on_connect(int rc)
{
if(rc != MOSQ_ERR_SUCCESS) {
m_connected = false;
QString errorMessage = mosquitto_strerror(rc);
emit error(errorMessage);
}else{
m_connected = true;
emit connected();
}
}
//--------------------------------------------------------------------
int QMqttClient::send_message(const QString& topic, const QString& data, bool retain, QMqttMessage::QoS qos)
{
return mosquittopp::publish(&MID,
topic.toLocal8Bit().data(),
strlen(data.toLocal8Bit().data()),
data.toLocal8Bit().data(),
qos,
retain);
}
//--------------------------------------------------------------------
void QMqttClient::on_disconnect(int rc)
{
Q_UNUSED(rc)
if( rc != MOSQ_ERR_SUCCESS )
{
QString errorMessage = QString("Disconnect failed with return code: %1").arg(rc);
emit error(errorMessage);
return;
}
m_connected = false;
loop_stop(true);
if (QThread::currentThread()->isRunning())
{
QThread::currentThread()->quit();
QThread::currentThread()->wait();
}
emit disconnected();
}
//--------------------------------------------------------------------
void QMqttClient::on_message(const mosquitto_message *mqtt_message)
{
if(mqtt_message->payloadlen) {
QByteArray message_received = (char*)mqtt_message->payload;
QMqttMessage message(QString(mqtt_message->topic), message_received, mqtt_message->retain);
emit messageReceived(message);
}
}
//--------------------------------------------------------------------
void QMqttClient::on_subscribe(int mid, int qos_count, const int *granted_qos)
{
if(mid != 0) {
QString subscribedTopic = m_subscribingTopics.value(mid);
if(!subscribedTopic.isEmpty()) {
m_subscribingTopics.remove(mid);
m_subscribedTopics.insert(mid ,subscribedTopic);
emit subscribed(subscribedTopic);
}
}
MID++;
}
//--------------------------------------------------------------------
void QMqttClient::on_publish(int mid)
{
MID++;
emit published();
}
void QMqttClient::on_unsubscribe(int mid)
{
if(mid != 0) {
QString unsubscribedTopic = m_unsubscribingTopics.value(mid);
if( !unsubscribedTopic.isEmpty()) {
m_unsubscribingTopics.remove(mid);
emit unsubscribed(unsubscribedTopic);
}
}
}
void QMqttClient::runLoop()
{
qDebug() << "runLoop started";
while (m_connected)
{
int loopResult = loop();
if (loopResult != MOSQ_ERR_SUCCESS) {
QString errorMessage = mosquitto_strerror(loopResult);
emit error(errorMessage);
}
QThread::msleep(10);
}
}
//--------------------------------------------------------------------
// Convenience overload: wraps topic, data and retain in a QMqttMessage and
// forwards to publish(QMqttMessage, QoS), which reports failures through
// error().
//
// The forwarding call had been commented out, which turned this overload into
// a silent no-op that dropped every message handed to it.
void QMqttClient::publish(QString topic, QByteArray data, QMqttMessage::QoS qos, bool retain)
{
    publish(QMqttMessage(topic, data, retain), qos);
}
//--------------------------------------------------------------------
// Publishes an already assembled message with the requested QoS by handing
// its topic, payload and retain flag to send_message(). A non-success return
// code is translated to text and reported through error().
void QMqttClient::publish(QMqttMessage message, QMqttMessage::QoS qos)
{
    const int status = send_message(message.topic(), message.payload(), message.hasRetainFlag(), qos);
    if (status == MOSQ_ERR_SUCCESS) {
        return;
    }

    QString failureText = mosquitto_strerror(status);
    emit error(failureText);
}
//--------------------------------------------------------------------
// Sends a SUBSCRIBE request for topic with the requested QoS.
//
// On success the topic is stored in m_subscribingTopics under the message id
// assigned by the library, so on_subscribe() can match the acknowledgement.
// On failure the library's error text is reported through error().
//
// Changes from the previous version:
//  - The message id is received in a local variable. The shared MID member is
//    also incremented by the on_publish()/on_subscribe() callbacks, so using
//    it as the map key could file the topic under the wrong id.
//  - Nothing is recorded when the request failed; previously a stale entry
//    was inserted under whatever value MID happened to hold.
//  - The topic is sent as UTF-8, as MQTT requires, instead of the local
//    8-bit encoding.
// NOTE(review): m_subscribingTopics is also accessed from on_subscribe();
// no locking is visible in this file -- confirm thread-safety.
void QMqttClient::subscribe(QString topic, QMqttMessage::QoS qos)
{
    int mid = 0;
    const int rslt = mosquittopp::subscribe(&mid, topic.toUtf8().constData(), qos);
    if (rslt != MOSQ_ERR_SUCCESS) {
        const QString errorMessage = mosquitto_strerror(rslt);
        emit error(errorMessage);
        return;
    }

    MID = mid; // keep the member in step with the id the library assigned
    m_subscribingTopics.insert(mid, topic);
}
//--------------------------------------------------------------------
// Sends an UNSUBSCRIBE request for topic.
//
// On success the topic is stored in m_unsubscribingTopics under the message
// id assigned by the library, so on_unsubscribe() can match the
// acknowledgement. On failure the library's error text is reported through
// error().
//
// Changes from the previous version:
//  - The message id is received in a local variable. The shared MID member is
//    also incremented by callbacks, so using it as the map key could file the
//    topic under the wrong id.
//  - Nothing is recorded when the request failed.
//  - The topic is sent as UTF-8, as MQTT requires, instead of the local
//    8-bit encoding.
void QMqttClient::unsubscribe(QString topic)
{
    int mid = 0;
    const int rslt = mosquittopp::unsubscribe(&mid, topic.toUtf8().constData());
    if (rslt == MOSQ_ERR_SUCCESS) {
        MID = mid; // keep the member in step with the id the library assigned
        m_unsubscribingTopics.insert(mid, topic);
    } else {
        const QString errorMessage = mosquitto_strerror(rslt);
        emit error(errorMessage);
    }

    // NOTE(review): kept from the original; its purpose is not evident from
    // this file.
    MID++;
}
// Registers the "last will" message through mosquittopp::will_set().
// A failure is reported through error().
//
// Changes from the previous version:
//  - The payload length is the byte count of the encoded payload. It used to
//    be willMsg.length(), the number of UTF-16 code units, which differs from
//    the encoded size for non-ASCII text and truncated the will message.
//  - The topic is sent as UTF-8, as MQTT requires. The payload keeps the
//    local 8-bit encoding that send_message() uses.
// NOTE(review): libmosquitto expects the will to be set before connecting --
// confirm callers invoke this before connect().
void QMqttClient::setWillMsg(QString willtopic, QString willMsg, int qos, bool retain)
{
    const QByteArray topicBytes = willtopic.toUtf8();
    const QByteArray payload = willMsg.toLocal8Bit();

    const int rslt = mosquittopp::will_set(topicBytes.constData(), payload.size(), payload.constData(), qos, retain);
    if (rslt != MOSQ_ERR_SUCCESS) {
        const QString errorMessage = mosquitto_strerror(rslt);
        emit error(errorMessage);
    }
}