From 67300f1c104793e3b4788d0e48280c5d9f46b1c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dr=2E=20Julian-Steffen=20M=C3=BCller?= Date: Wed, 25 Mar 2020 17:11:51 +0100 Subject: [PATCH] Initial MQTT explorer --- .gitignore | 1 + CMakeLists.txt | 19 ++++ src/.MQTT.cpp.swp | Bin 0 -> 16384 bytes src/.main.cpp.swp | Bin 0 -> 16384 bytes src/MQTT.cpp | 227 ++++++++++++++++++++++++++++++++++++++++++++++ src/MQTT.h | 54 +++++++++++ src/main.cpp | 27 ++++++ 7 files changed, 328 insertions(+) create mode 100644 .gitignore create mode 100644 CMakeLists.txt create mode 100644 src/.MQTT.cpp.swp create mode 100644 src/.main.cpp.swp create mode 100644 src/MQTT.cpp create mode 100644 src/MQTT.h create mode 100644 src/main.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..378eac2 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..3cd2e6e --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.10) +project(mqttExplorer) + +set(CMAKE_CXX_STANDARD 17) + +#PAHO-MQTT-C Library +#USUALLY INSTALLED TO /usr/local/include /usr/local/lib and /usr/local/bin +include_directories(/usr/local/include) +link_directories(/usr/local/lib) +set(LIBS ${LIBS} paho-mqtt3cs) + +find_package(spdlog CONFIG REQUIRED) +find_package(ZLIB REQUIRED) + +add_executable(mqttExplorer src/main.cpp src/MQTT.cpp) + +target_link_libraries(mqttExplorer ${LIBS} spdlog::spdlog ZLIB::ZLIB) + +install(TARGETS mqttExplorer DESTINATION bin) diff --git a/src/.MQTT.cpp.swp b/src/.MQTT.cpp.swp new file mode 100644 index 0000000000000000000000000000000000000000..c2c770c1662ac801de6fe96d29fd4588d013411f GIT binary patch literal 16384 zcmeI3U2Ggz6~`wj6kL+DqOB^mqPpH~?EUiY+M#N+j!jd?Y90Bbv7JaIDa&}~u6N?` z>@qVOCvjaQ1P?S6M5IDNfZ%~NB79XXsz6X4B2h#okWvZp0uL=JQThOg7Z9TGKlk4G zTCZ*L2GXqh%kIqFd+xdCocndgU8*6hii<)-2V_UjI z54Qgt73I8{M;_n4+|yEEWzzB2EMvv)ILnpQPx$`i*|yzsxm)pEqcU@Baj|T)+uMsu zpQaW_EpRgy==hDHgL}0DBg0jymd*6DJ3n~*W+bF#QwyXPNG*_BAhkehfz$%21yT#7 z7WhB6fZyJ&eG+}xt@`n0^}Va>`xbRQt-hb{daf?huhasm1yT#77Dz3SS|GJRYJt=O zsRdFCq!vgmkXqn>&;q)tX%9lL?@_RnKi{WmKLbAi-v`ftvtSK80`g!0><91e)wDOjHSh{}8N388gBQR>&;+yKD0p{| zrdV}N(6k%i58w@O37i9`z;W;>I1KIsw}R`pY1*6M7vQVl4EQKm z1pB~0`ZVqL;2L-Zd={(#11y3Wa1h)J?gF=g-@RYcu7VfASHP#hX<&jmFbs;I0RHqo zP5TYF3@(6AgEg=MI55G(APepXcZ0v(3Z1}<;A`MBzyVKyQSczBg8kq=a67p1UQPQg z_%YZ3Pk=G-5GVo$_JBXX2W7$A;4SbfxB@PNFM?$-1E#<@7zH)J!2RGu;2k82E`v+p z+h7UY0rr8tfa2`CL9*#)$AjhgjQ&2PR$KL!b=N&(=FYS%lNF4Xw^Yw+}6BCmQ3!|FWx1sg*3ELKnqxeaen~G1(o;W;BPpw8yls#E}l8ueA!Qvo$ z>M7}(;8u1pzpnMIH7%P*W&6iim2Q*tg3G;*?Xxk~&~1-HETkrjl2EU+L?T7apsG+# z{If~Lut!io)`ahNs0Dl=PxcT$+bu8nEv0P|7xq~6nJTbxR%HVNLa(7=q1a$W^c6@UZRtjvQ$&D4(Xt|7+m&^FkVyn7uG%aVDE-G18 zs8HF^vI)yU02zWhC0dha@WrL5C)|RlQ&GLWr6KKvZet#@oOTD3&=TeiN9jXq2%VL^ zVzHD6COLZ0UbjzU;Y6K@{5O~#41!9=R1g(%0I7n(@PTBgx!B|`qRorc(=k0ZaiiVB zOes~zmEYuKAan>5on?NcyBFO`?(B78ODK_uV{{=ej1%5h0k8ZV-lkC_CyiKfHL}E= z(9KdnRov_uCY2?u5Q$JJhQShfJ#fd(cQmLOpBd5b@b0t>`MqaJ61l}pNsZC0 zqD+ zNFf{PvZR|VgEq1micQaOt+rVGa%>7VQ&!=_uCox`>}u7VM#!YnC1K68d` zZtL$Z@mnv7y}g~7!QsMiRf$d9iCJa~J8T&eL3iZ@6N&HXmtuxtIwlVlD|$~KZRLlq z_HRBaiX!x=b-f~g_lOV`?cvB0&Wmose&K4^HRdq0vP75HEZa16*OU_p_hi%y>vqRj z*th@M*S+R%O}G&opK7r2>TV~DWeRoZA8m_h;(Q=nE6{z4lm`a$)PE=yUs7bT&`f0I1qy3_4Y!4>(8Jn#MiP82XwwLRd8@xQ*qAi%gI?wAqqm_G9Ti3$v zeO*lHIc#h_U!4M=6AlEi8$vm(ajllkRP%Waem&BH2X*Ov=}}z814yoaS(9TDP+@J&hozkKLQ)zVUPu{;@ti+ptJoq!8gFy!6|SO+zZ~pS^Wl}v;1Qq z19pR5;8~p0FMwx&2bMtt+zGDWO#U4B0{A@G0PEm9xCi_bXY#AyBG5q%+zsvm|Hj#y z&g*{%Z-YOAx4>)QdGIB00o()L#M%2h;4*jtTmq-S5D3oZ*KpRp0*->mz@y+Ha2ZvoZUs z{n--V&cdPRW9LP$%vaksBCfuq8!K{~T*AI+##^4}21hnGse07V4$3z=j!4HIqJGDA zqJ>}=8tdXfemvPEvd@)@h`3{O3#zxlCOAqCTi%3>Q>2iiD2KjcPb-7c^6F5Of*FNk zv8kk~-L@EGGTlu>Q9YEILh+35lAL<0?R#Y$cx(jQ0DX%)a*$G?@K!kp0Tc{=_9~{l zi&9i|Mkd#z_$8V2iR6ks&G7Qw)`fqD3?jIVA@Q)oH&W3{M`JL z>b;2ae2u9%!P@}`$#P`WO=yGH7pggB-Dl1g|i@=9zVjFEhv z4wl03(^HePi@}=$5rRS{Ud6aZB31*5u`rE!&g1@sZre0Wy+os)ultr!H{cmD*b3u0 zN@8Lbl|z8U((8@j8Z)JMTNGS}?_n@MA@*p?s$kvdap9ZhGm#U&sR8aAEsA6 zQ+VU#B0GvgBRG_kl$h#wp}YJw4Pj+=Yr{rlIj*bOv)Zgmm8umN-(VSKx=8Nd|TxQ8-jO zV@C;jRulcs#bj0?0J{V^6jaPAUqr(r`bE~qJY9~@;&Nu66>kA!V>XwQ?+Ej18X6jJ z>*ws2j&-*C4i@J;B7}s~hsK1Mie#>Vd(=tw2272z^noioTRu&8pDl!Zuyf(yjVqn&_M9N((PB=OG+GhlU4a@uX(WtMX&HqLzv04_j99@j^~F6eDgLay&1oTDM#%gs0-Q+yU-DGY%B3 z**p8&$=<%+9@;D_?h=0qy{IfIGk);0|yH{^Jf_4sZv!1OGw?q%mp^!I;3; z;Bzn!4uc8M3)X{QHV|?NdOUSR_BKQHUf>(hACc(Yn7I5hr zLe7IHK?%si!71<_cn3TO#z6|~2Rp&7U=!E~zG{PCUJxe%90FA@RC9)q9;-_*g;*>6xmWSR>icWnVf27m9(S85qk#o(&O~fD~6#PF>z4R zG~KE+6NX2}#|4bfO-PeO)L9|6ORx{mt%-Fw-g(&^(~@527^f|F6L|v|emFHcog5pR z4g^vYPeKQ)T!)WqOmGL^!B>(vCZR@&I-M#P@#&d!h&^n`(p6`{Qw z=y7NyVpInk>btrLoJuv+Oty#Y8%Z#a>kTRz%~&@_(1OsE)jhJMP4rc7ZtYobA`hL# zfP84=!2XBmWunf;0+sgp#g5XmF8su3q+OWCsS8ttotBNY3sVLA_o;nF=K?QZiY!y= z&S0}*x7+I8oxSQ9y^y?{gTE7|iW#);D|Ss1Y>cSMYJ=Q<#nOT@Y z*9>2suNXK1nwsvgL^u>Tm$!Al9NgWSs59rxVBOhVL(RDDGg}0M!H9D{)``~%r<^#m zmKm%)&^1Ji|O~ z%y|yO{W7%D%Ub*N;vRYO;AOiYszZtXv z+D8lc9{nwYx4@g=4ZuI#0qy{IfIGk);0|yHxC7h)?f`e-f8{{s=Q4Zct5m8ARPC&} zF4mA5JyiFxW?H`7_D?Lg*$aV|E-aMj>SaO40^&jmYmuUyRr8W!NVMGB-@n&cA+(1; r7x!YKB^MI$xF<_^8db9d`k(YMT9Ay4+(j4m%8Qza%uP +#include "MQTT.h" +#include "string.h" +#include +#include "spdlog/spdlog.h" + +#define MOSQ_ERR_SUCCESS 1 +#define MOSQ_ERR_INVAL 2 + +/* Does a topic match a subscription? */ +int topic_match(const char *sub, const char *topic, bool *result) +{ + size_t spos; + + if(!result) return MOSQ_ERR_INVAL; + *result = false; + + if(!sub || !topic || sub[0] == 0 || topic[0] == 0){ + return MOSQ_ERR_INVAL; + } + + if((sub[0] == '$' && topic[0] != '$') + || (topic[0] == '$' && sub[0] != '$')){ + + return MOSQ_ERR_SUCCESS; + } + + spos = 0; + + while(sub[0] != 0){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + if(sub[0] != topic[0] || topic[0] == 0){ /* Check for wildcard matches */ + if(sub[0] == '+'){ + /* Check for bad "+foo" or "a/+foo" subscription */ + if(spos > 0 && sub[-1] != '/'){ + return MOSQ_ERR_INVAL; + } + /* Check for bad "foo+" or "foo+/a" subscription */ + if(sub[1] != 0 && sub[1] != '/'){ + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + while(topic[0] != 0 && topic[0] != '/'){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + topic++; + } + if(topic[0] == 0 && sub[0] == 0){ + *result = true; + return MOSQ_ERR_SUCCESS; + } + }else if(sub[0] == '#'){ + /* Check for bad "foo#" subscription */ + if(spos > 0 && sub[-1] != '/'){ + return MOSQ_ERR_INVAL; + } + /* Check for # not the final character of the sub, e.g. "#foo" */ + if(sub[1] != 0){ + return MOSQ_ERR_INVAL; + }else{ + while(topic[0] != 0){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + topic++; + } + *result = true; + return MOSQ_ERR_SUCCESS; + } + }else{ + /* Check for e.g. foo/bar matching foo/+/# */ + if(topic[0] == 0 + && spos > 0 + && sub[-1] == '+' + && sub[0] == '/' + && sub[1] == '#') + { + *result = true; + return MOSQ_ERR_SUCCESS; + } + + /* There is no match at this point, but is the sub invalid? */ + while(sub[0] != 0){ + if(sub[0] == '#' && sub[1] != 0){ + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + } + + /* Valid input, but no match */ + return MOSQ_ERR_SUCCESS; + } + }else{ + /* sub[spos] == topic[tpos] */ + if(topic[1] == 0){ + /* Check for e.g. foo matching foo/# */ + if(sub[1] == '/' + && sub[2] == '#' + && sub[3] == 0){ + *result = true; + return MOSQ_ERR_SUCCESS; + } + } + spos++; + sub++; + topic++; + if(sub[0] == 0 && topic[0] == 0){ + *result = true; + return MOSQ_ERR_SUCCESS; + }else if(topic[0] == 0 && sub[0] == '+' && sub[1] == 0){ + if(spos > 0 && sub[-1] != '/'){ + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + *result = true; + return MOSQ_ERR_SUCCESS; + } + } + } + if((topic[0] != 0 || sub[0] != 0)){ + *result = false; + } + while(topic[0] != 0){ + if(topic[0] == '+' || topic[0] == '#'){ + return MOSQ_ERR_INVAL; + } + topic++; + } + + return MOSQ_ERR_SUCCESS; +} + +void MQTT::connlost(void *context, char *cause) { + std::cout << "Connection lost: cause: " << cause << std::endl; +} + +void MQTT::delivered(void *context, MQTTClient_deliveryToken dt) { + //std::cout << "Message with token value %d delivery confirmed" << std::endl; +} + +int MQTT::msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { + MQTT* mqttClient = static_cast(context); + + bool topic_matches; + for (const auto& sub : mqttClient->subscriptions) { + topic_match(sub.topic.c_str(), topicName, &topic_matches); + if (topic_matches) { + sub.onMsgReceive(topicName, std::string((char *)message->payload, message->payloadlen)); + } + } + return 1; +} + +MQTT::MQTT(const std::string& serverURI) : MQTT(serverURI, 0) {} + +MQTT::MQTT(const std::string& serverURI, int retained_default) { + this->retained_default = retained_default; + + conn_opts = MQTTClient_connectOptions_initializer; + MQTTClient_create(&mqttClient, serverURI.c_str(), "ClientC++-jdj", MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + MQTTClient_setCallbacks(mqttClient, static_cast(this), MQTT::connlost, MQTT::msgarrvd, MQTT::delivered); +} + +bool MQTT::connect() { + int rc; + + if ((rc = MQTTClient_connect(mqttClient, &conn_opts)) != MQTTCLIENT_SUCCESS) { + std::cout << "[ERROR] MQTT::connect: Connectionn failed with return code: " << rc << std::endl; + return false; + } + + return true; +} + +bool MQTT::last_will(const std::string& willTopic, const std::string& willMessage, int retained) { + if (! is_connected()) { + will_opts = MQTTClient_willOptions_initializer; + will_opts.topicName = willTopic.c_str(); + will_opts.message = willMessage.c_str(); + will_opts.retained = retained; + conn_opts.will = &will_opts; + + return true; + } + + return false; +} + +bool MQTT::is_connected() { + return MQTTClient_isConnected(mqttClient); +} +//TODO QOS +void MQTT::subscribe(const std::string& topic, std::function onMsgReceive) { + subscriptions.emplace_back(topic, onMsgReceive); + MQTTClient_subscribe(mqttClient, topic.c_str(), 0); +} + + +void MQTT::publish(const std::string& topic, std::string message) const { + publish(topic, message, retained_default); +} + +void MQTT::publish(const std::string &topic, std::string message, int retained) const { + const char *msg = message.c_str(); + MQTTClient_publish(mqttClient, topic.c_str(), strlen(msg), (void *) msg, 1, retained, NULL); +} + +MQTT::~MQTT() { + spdlog::trace("Closing mqtt connection '{}'", "TODO tcp://chimaera:1883"); + MQTTClient_disconnect(mqttClient, 10000); + MQTTClient_destroy(&mqttClient); + spdlog::trace("Closed"); + +} diff --git a/src/MQTT.h b/src/MQTT.h new file mode 100644 index 0000000..5b87766 --- /dev/null +++ b/src/MQTT.h @@ -0,0 +1,54 @@ +// +// Created by Julian-Steffen Müller on 24.01.20. +// + +#ifndef DECONZ2MQTT_MQTT_H +#define DECONZ2MQTT_MQTT_H + +#include "MQTTClient.h" +#include +#include +#include + +struct MQTT_Topic_Subscription { + std::string topic; + std::function onMsgReceive; + + MQTT_Topic_Subscription(std::string topic, std::function onMsgreceive) : topic(topic), onMsgReceive(onMsgreceive) {} +}; + +class MQTT { +private: + MQTTClient mqttClient; + MQTTClient_connectOptions conn_opts; + MQTTClient_willOptions will_opts; + + int retained_default; + + static void connlost(void *context, char *cause); + + static void delivered(void *context, MQTTClient_deliveryToken dt); + + static int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message); + +public: + MQTT(const std::string& serverURI); + MQTT(const std::string& serverURI, int retained_default); + + + std::vector subscriptions; + bool connect(); + bool is_connected(); + + bool last_will(const std::string& willTopic, const std::string& willMessage, int retained = 1); + + void publish(const std::string& topic, std::string message) const; + void publish(const std::string& topic, std::string message, int retained) const; + + void subscribe(const std::string& topic, std::function onMsgReceive); + + ~MQTT(); + +}; + +#endif //DECONZ2MQTT_MQTT_H diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..35e8870 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,27 @@ +#include "spdlog/spdlog.h" +#include "MQTT.h" +#include +#include + +int main(int argc, const char* argv[]) { + MQTT mqtt("tcp://chimaera:1883"); + + spdlog::info("Establishing mqtt connection to {} on port {}", "chimaera", "1883"); + + if (! mqtt.connect()) { + spdlog::error("Cannot establish MQTT connection"); + return 1; + } + + spdlog::info("Connection established"); + + spdlog::info("Start subscribing"); + + mqtt.subscribe("#", [](const char* topic, const std::string& msg) { + spdlog::info("Received on topic '{}' message '{}'", topic, msg); + }); + + while (true) { + } + return 0; +}