commit 67300f1c104793e3b4788d0e48280c5d9f46b1c8 Author: Dr. Julian-Steffen Müller Date: Wed Mar 25 17:11:51 2020 +0100 Initial MQTT explorer diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..378eac2 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..3cd2e6e --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.10) +project(mqttExplorer) + +set(CMAKE_CXX_STANDARD 17) + +#PAHO-MQTT-C Library +#USUALLY INSTALLED TO /usr/local/include /usr/local/lib and /usr/local/bin +include_directories(/usr/local/include) +link_directories(/usr/local/lib) +set(LIBS ${LIBS} paho-mqtt3cs) + +find_package(spdlog CONFIG REQUIRED) +find_package(ZLIB REQUIRED) + +add_executable(mqttExplorer src/main.cpp src/MQTT.cpp) + +target_link_libraries(mqttExplorer ${LIBS} spdlog::spdlog ZLIB::ZLIB) + +install(TARGETS mqttExplorer DESTINATION bin) diff --git a/src/.MQTT.cpp.swp b/src/.MQTT.cpp.swp new file mode 100644 index 0000000..c2c770c Binary files /dev/null and b/src/.MQTT.cpp.swp differ diff --git a/src/.main.cpp.swp b/src/.main.cpp.swp new file mode 100644 index 0000000..37b70db Binary files /dev/null and b/src/.main.cpp.swp differ diff --git a/src/MQTT.cpp b/src/MQTT.cpp new file mode 100644 index 0000000..fdf216e --- /dev/null +++ b/src/MQTT.cpp @@ -0,0 +1,227 @@ +// +// Created by Julian-Steffen Müller on 24.01.20. +// + +#include +#include "MQTT.h" +#include "string.h" +#include +#include "spdlog/spdlog.h" + +#define MOSQ_ERR_SUCCESS 1 +#define MOSQ_ERR_INVAL 2 + +/* Does a topic match a subscription? */ +int topic_match(const char *sub, const char *topic, bool *result) +{ + size_t spos; + + if(!result) return MOSQ_ERR_INVAL; + *result = false; + + if(!sub || !topic || sub[0] == 0 || topic[0] == 0){ + return MOSQ_ERR_INVAL; + } + + if((sub[0] == '$' && topic[0] != '$') + || (topic[0] == '$' && sub[0] != '$')){ + + return MOSQ_ERR_SUCCESS; + } + + spos = 0; + + while(sub[0] != 0){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + if(sub[0] != topic[0] || topic[0] == 0){ /* Check for wildcard matches */ + if(sub[0] == '+'){ + /* Check for bad "+foo" or "a/+foo" subscription */ + if(spos > 0 && sub[-1] != '/'){ + return MOSQ_ERR_INVAL; + } + /* Check for bad "foo+" or "foo+/a" subscription */ + if(sub[1] != 0 && sub[1] != '/'){ + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + while(topic[0] != 0 && topic[0] != '/'){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + topic++; + } + if(topic[0] == 0 && sub[0] == 0){ + *result = true; + return MOSQ_ERR_SUCCESS; + } + }else if(sub[0] == '#'){ + /* Check for bad "foo#" subscription */ + if(spos > 0 && sub[-1] != '/'){ + return MOSQ_ERR_INVAL; + } + /* Check for # not the final character of the sub, e.g. "#foo" */ + if(sub[1] != 0){ + return MOSQ_ERR_INVAL; + }else{ + while(topic[0] != 0){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + topic++; + } + *result = true; + return MOSQ_ERR_SUCCESS; + } + }else{ + /* Check for e.g. foo/bar matching foo/+/# */ + if(topic[0] == 0 + && spos > 0 + && sub[-1] == '+' + && sub[0] == '/' + && sub[1] == '#') + { + *result = true; + return MOSQ_ERR_SUCCESS; + } + + /* There is no match at this point, but is the sub invalid? */ + while(sub[0] != 0){ + if(sub[0] == '#' && sub[1] != 0){ + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + } + + /* Valid input, but no match */ + return MOSQ_ERR_SUCCESS; + } + }else{ + /* sub[spos] == topic[tpos] */ + if(topic[1] == 0){ + /* Check for e.g. foo matching foo/# */ + if(sub[1] == '/' + && sub[2] == '#' + && sub[3] == 0){ + *result = true; + return MOSQ_ERR_SUCCESS; + } + } + spos++; + sub++; + topic++; + if(sub[0] == 0 && topic[0] == 0){ + *result = true; + return MOSQ_ERR_SUCCESS; + }else if(topic[0] == 0 && sub[0] == '+' && sub[1] == 0){ + if(spos > 0 && sub[-1] != '/'){ + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + *result = true; + return MOSQ_ERR_SUCCESS; + } + } + } + if((topic[0] != 0 || sub[0] != 0)){ + *result = false; + } + while(topic[0] != 0){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + topic++; + } + + return MOSQ_ERR_SUCCESS; +} + +void MQTT::connlost(void *context, char *cause) { + std::cout << "Connection lost: cause: " << cause << std::endl; +} + +void MQTT::delivered(void *context, MQTTClient_deliveryToken dt) { + //std::cout << "Message with token value %d delivery confirmed" << std::endl; +} + +int MQTT::msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { + MQTT* mqttClient = static_cast(context); + + bool topic_matches; + for (const auto& sub : mqttClient->subscriptions) { + topic_match(sub.topic.c_str(), topicName, &topic_matches); + if (topic_matches) { + sub.onMsgReceive(topicName, std::string((char *)message->payload, message->payloadlen)); + } + } + return 1; +} + +MQTT::MQTT(const std::string& serverURI) : MQTT(serverURI, 0) {} + +MQTT::MQTT(const std::string& serverURI, int retained_default) { + this->retained_default = retained_default; + + conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_create(&mqttClient, serverURI.c_str(), "ClientC++-jdj", MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + MQTTClient_setCallbacks(mqttClient, static_cast(this), MQTT::connlost, MQTT::msgarrvd, MQTT::delivered); +} + +bool MQTT::connect() { + int rc; + + if ((rc = MQTTClient_connect(mqttClient, &conn_opts)) != MQTTCLIENT_SUCCESS) { + std::cout << "[ERROR] MQTT::connect: Connectionn failed with return code: " << rc << std::endl; + return false; + } + + return true; +} + +bool MQTT::last_will(const std::string& willTopic, const std::string& willMessage, int retained) { + if (! is_connected()) { + will_opts = MQTTClient_willOptions_initializer; + will_opts.topicName = willTopic.c_str(); + will_opts.message = willMessage.c_str(); + will_opts.retained = retained; + conn_opts.will = &will_opts; + + return true; + } + + return false; +} + +bool MQTT::is_connected() { + return MQTTClient_isConnected(mqttClient); +} +//TODO QOS +void MQTT::subscribe(const std::string& topic, std::function onMsgReceive) { + subscriptions.emplace_back(topic, onMsgReceive); + MQTTClient_subscribe(mqttClient, topic.c_str(), 0); +} + + +void MQTT::publish(const std::string& topic, std::string message) const { + publish(topic, message, retained_default); +} + +void MQTT::publish(const std::string &topic, std::string message, int retained) const { + const char *msg = message.c_str(); + MQTTClient_publish(mqttClient, topic.c_str(), strlen(msg), (void *) msg, 1, retained, NULL); +} + +MQTT::~MQTT() { + spdlog::trace("Closing mqtt connection '{}'", "TODO tcp://chimaera:1883"); + MQTTClient_disconnect(mqttClient, 10000); + MQTTClient_destroy(&mqttClient); + spdlog::trace("Closed"); + +} diff --git a/src/MQTT.h b/src/MQTT.h new file mode 100644 index 0000000..5b87766 --- /dev/null +++ b/src/MQTT.h @@ -0,0 +1,54 @@ +// +// Created by Julian-Steffen Müller on 24.01.20. +// + +#ifndef DECONZ2MQTT_MQTT_H +#define DECONZ2MQTT_MQTT_H + +#include "MQTTClient.h" +#include +#include +#include + +struct MQTT_Topic_Subscription { + std::string topic; + std::function onMsgReceive; + + MQTT_Topic_Subscription(std::string topic, std::function onMsgreceive) : topic(topic), onMsgReceive(onMsgreceive) {} +}; + +class MQTT { +private: + MQTTClient mqttClient; + MQTTClient_connectOptions conn_opts; + MQTTClient_willOptions will_opts; + + int retained_default; + + static void connlost(void *context, char *cause); + + static void delivered(void *context, MQTTClient_deliveryToken dt); + + static int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message); + +public: + MQTT(const std::string& serverURI); + MQTT(const std::string& serverURI, int retained_default); + + + std::vector subscriptions; + bool connect(); + bool is_connected(); + + bool last_will(const std::string& willTopic, const std::string& willMessage, int retained = 1); + + void publish(const std::string& topic, std::string message) const; + void publish(const std::string& topic, std::string message, int retained) const; + + void subscribe(const std::string& topic, std::function onMsgReceive); + + ~MQTT(); + +}; + +#endif //DECONZ2MQTT_MQTT_H diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..35e8870 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,27 @@ +#include "spdlog/spdlog.h" +#include "MQTT.h" +#include +#include + +int main(int argc, const char* argv[]) { + MQTT mqtt("tcp://chimaera:1883"); + + spdlog::info("Establishing mqtt connection to {} on port {}", "chimaera", "1883"); + + if (! mqtt.connect()) { + spdlog::error("Cannot establish MQTT connection"); + return 1; + } + + spdlog::info("Connection established"); + + spdlog::info("Start subscribing"); + + mqtt.subscribe("#", [](const char* topic, const std::string& msg) { + spdlog::info("Received on topic '{}' message '{}'", topic, msg); + }); + + while (true) { + } + return 0; +}