Initial MQTT explorer
This commit is contained in:
BIN
src/.MQTT.cpp.swp
Normal file
BIN
src/.MQTT.cpp.swp
Normal file
Binary file not shown.
BIN
src/.main.cpp.swp
Normal file
BIN
src/.main.cpp.swp
Normal file
Binary file not shown.
227
src/MQTT.cpp
Normal file
227
src/MQTT.cpp
Normal file
@@ -0,0 +1,227 @@
|
||||
//
|
||||
// Created by Julian-Steffen Müller on 24.01.20.
|
||||
//
|
||||
|
||||
#include <iostream>
|
||||
#include "MQTT.h"
|
||||
#include "string.h"
|
||||
#include <string>
|
||||
#include "spdlog/spdlog.h"
|
||||
|
||||
#define MOSQ_ERR_SUCCESS 1
|
||||
#define MOSQ_ERR_INVAL 2
|
||||
|
||||
/* Does a topic match a subscription? */
|
||||
int topic_match(const char *sub, const char *topic, bool *result)
|
||||
{
|
||||
size_t spos;
|
||||
|
||||
if(!result) return MOSQ_ERR_INVAL;
|
||||
*result = false;
|
||||
|
||||
if(!sub || !topic || sub[0] == 0 || topic[0] == 0){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
|
||||
if((sub[0] == '$' && topic[0] != '$')
|
||||
|| (topic[0] == '$' && sub[0] != '$')){
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
spos = 0;
|
||||
|
||||
while(sub[0] != 0){
|
||||
if(topic[0] == '+' || topic[0] == '#'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
if(sub[0] != topic[0] || topic[0] == 0){ /* Check for wildcard matches */
|
||||
if(sub[0] == '+'){
|
||||
/* Check for bad "+foo" or "a/+foo" subscription */
|
||||
if(spos > 0 && sub[-1] != '/'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
/* Check for bad "foo+" or "foo+/a" subscription */
|
||||
if(sub[1] != 0 && sub[1] != '/'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
spos++;
|
||||
sub++;
|
||||
while(topic[0] != 0 && topic[0] != '/'){
|
||||
if(topic[0] == '+' || topic[0] == '#'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
topic++;
|
||||
}
|
||||
if(topic[0] == 0 && sub[0] == 0){
|
||||
*result = true;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}else if(sub[0] == '#'){
|
||||
/* Check for bad "foo#" subscription */
|
||||
if(spos > 0 && sub[-1] != '/'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
/* Check for # not the final character of the sub, e.g. "#foo" */
|
||||
if(sub[1] != 0){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}else{
|
||||
while(topic[0] != 0){
|
||||
if(topic[0] == '+' || topic[0] == '#'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
topic++;
|
||||
}
|
||||
*result = true;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}else{
|
||||
/* Check for e.g. foo/bar matching foo/+/# */
|
||||
if(topic[0] == 0
|
||||
&& spos > 0
|
||||
&& sub[-1] == '+'
|
||||
&& sub[0] == '/'
|
||||
&& sub[1] == '#')
|
||||
{
|
||||
*result = true;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
/* There is no match at this point, but is the sub invalid? */
|
||||
while(sub[0] != 0){
|
||||
if(sub[0] == '#' && sub[1] != 0){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
spos++;
|
||||
sub++;
|
||||
}
|
||||
|
||||
/* Valid input, but no match */
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}else{
|
||||
/* sub[spos] == topic[tpos] */
|
||||
if(topic[1] == 0){
|
||||
/* Check for e.g. foo matching foo/# */
|
||||
if(sub[1] == '/'
|
||||
&& sub[2] == '#'
|
||||
&& sub[3] == 0){
|
||||
*result = true;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}
|
||||
spos++;
|
||||
sub++;
|
||||
topic++;
|
||||
if(sub[0] == 0 && topic[0] == 0){
|
||||
*result = true;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}else if(topic[0] == 0 && sub[0] == '+' && sub[1] == 0){
|
||||
if(spos > 0 && sub[-1] != '/'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
spos++;
|
||||
sub++;
|
||||
*result = true;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
if((topic[0] != 0 || sub[0] != 0)){
|
||||
*result = false;
|
||||
}
|
||||
while(topic[0] != 0){
|
||||
if(topic[0] == '+' || topic[0] == '#'){
|
||||
return MOSQ_ERR_INVAL;
|
||||
}
|
||||
topic++;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
void MQTT::connlost(void *context, char *cause) {
|
||||
std::cout << "Connection lost: cause: " << cause << std::endl;
|
||||
}
|
||||
|
||||
void MQTT::delivered(void *context, MQTTClient_deliveryToken dt) {
|
||||
//std::cout << "Message with token value %d delivery confirmed" << std::endl;
|
||||
}
|
||||
|
||||
int MQTT::msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
|
||||
MQTT* mqttClient = static_cast<MQTT *>(context);
|
||||
|
||||
bool topic_matches;
|
||||
for (const auto& sub : mqttClient->subscriptions) {
|
||||
topic_match(sub.topic.c_str(), topicName, &topic_matches);
|
||||
if (topic_matches) {
|
||||
sub.onMsgReceive(topicName, std::string((char *)message->payload, message->payloadlen));
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
MQTT::MQTT(const std::string& serverURI) : MQTT(serverURI, 0) {}
|
||||
|
||||
MQTT::MQTT(const std::string& serverURI, int retained_default) {
|
||||
this->retained_default = retained_default;
|
||||
|
||||
conn_opts = MQTTClient_connectOptions_initializer;
|
||||
MQTTClient_create(&mqttClient, serverURI.c_str(), "ClientC++-jdj", MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
||||
conn_opts.keepAliveInterval = 20;
|
||||
conn_opts.cleansession = 1;
|
||||
|
||||
MQTTClient_setCallbacks(mqttClient, static_cast<void*>(this), MQTT::connlost, MQTT::msgarrvd, MQTT::delivered);
|
||||
}
|
||||
|
||||
bool MQTT::connect() {
|
||||
int rc;
|
||||
|
||||
if ((rc = MQTTClient_connect(mqttClient, &conn_opts)) != MQTTCLIENT_SUCCESS) {
|
||||
std::cout << "[ERROR] MQTT::connect: Connectionn failed with return code: " << rc << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MQTT::last_will(const std::string& willTopic, const std::string& willMessage, int retained) {
|
||||
if (! is_connected()) {
|
||||
will_opts = MQTTClient_willOptions_initializer;
|
||||
will_opts.topicName = willTopic.c_str();
|
||||
will_opts.message = willMessage.c_str();
|
||||
will_opts.retained = retained;
|
||||
conn_opts.will = &will_opts;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool MQTT::is_connected() {
|
||||
return MQTTClient_isConnected(mqttClient);
|
||||
}
|
||||
//TODO QOS
|
||||
void MQTT::subscribe(const std::string& topic, std::function<void(const char*, const std::string&)> onMsgReceive) {
|
||||
subscriptions.emplace_back(topic, onMsgReceive);
|
||||
MQTTClient_subscribe(mqttClient, topic.c_str(), 0);
|
||||
}
|
||||
|
||||
|
||||
void MQTT::publish(const std::string& topic, std::string message) const {
|
||||
publish(topic, message, retained_default);
|
||||
}
|
||||
|
||||
void MQTT::publish(const std::string &topic, std::string message, int retained) const {
|
||||
const char *msg = message.c_str();
|
||||
MQTTClient_publish(mqttClient, topic.c_str(), strlen(msg), (void *) msg, 1, retained, NULL);
|
||||
}
|
||||
|
||||
MQTT::~MQTT() {
|
||||
spdlog::trace("Closing mqtt connection '{}'", "TODO tcp://chimaera:1883");
|
||||
MQTTClient_disconnect(mqttClient, 10000);
|
||||
MQTTClient_destroy(&mqttClient);
|
||||
spdlog::trace("Closed");
|
||||
|
||||
}
|
||||
54
src/MQTT.h
Normal file
54
src/MQTT.h
Normal file
@@ -0,0 +1,54 @@
|
||||
//
|
||||
// Created by Julian-Steffen Müller on 24.01.20.
|
||||
//
|
||||
|
||||
#ifndef DECONZ2MQTT_MQTT_H
|
||||
#define DECONZ2MQTT_MQTT_H
|
||||
|
||||
#include "MQTTClient.h"
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
#include<string>
|
||||
|
||||
struct MQTT_Topic_Subscription {
|
||||
std::string topic;
|
||||
std::function<void(const char*, const std::string& )> onMsgReceive;
|
||||
|
||||
MQTT_Topic_Subscription(std::string topic, std::function<void(const char*, std::string)> onMsgreceive) : topic(topic), onMsgReceive(onMsgreceive) {}
|
||||
};
|
||||
|
||||
class MQTT {
|
||||
private:
|
||||
MQTTClient mqttClient;
|
||||
MQTTClient_connectOptions conn_opts;
|
||||
MQTTClient_willOptions will_opts;
|
||||
|
||||
int retained_default;
|
||||
|
||||
static void connlost(void *context, char *cause);
|
||||
|
||||
static void delivered(void *context, MQTTClient_deliveryToken dt);
|
||||
|
||||
static int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message);
|
||||
|
||||
public:
|
||||
MQTT(const std::string& serverURI);
|
||||
MQTT(const std::string& serverURI, int retained_default);
|
||||
|
||||
|
||||
std::vector<MQTT_Topic_Subscription> subscriptions;
|
||||
bool connect();
|
||||
bool is_connected();
|
||||
|
||||
bool last_will(const std::string& willTopic, const std::string& willMessage, int retained = 1);
|
||||
|
||||
void publish(const std::string& topic, std::string message) const;
|
||||
void publish(const std::string& topic, std::string message, int retained) const;
|
||||
|
||||
void subscribe(const std::string& topic, std::function<void(const char*, const std::string&)> onMsgReceive);
|
||||
|
||||
~MQTT();
|
||||
|
||||
};
|
||||
|
||||
#endif //DECONZ2MQTT_MQTT_H
|
||||
27
src/main.cpp
Normal file
27
src/main.cpp
Normal file
@@ -0,0 +1,27 @@
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "MQTT.h"
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
int main(int argc, const char* argv[]) {
|
||||
MQTT mqtt("tcp://chimaera:1883");
|
||||
|
||||
spdlog::info("Establishing mqtt connection to {} on port {}", "chimaera", "1883");
|
||||
|
||||
if (! mqtt.connect()) {
|
||||
spdlog::error("Cannot establish MQTT connection");
|
||||
return 1;
|
||||
}
|
||||
|
||||
spdlog::info("Connection established");
|
||||
|
||||
spdlog::info("Start subscribing");
|
||||
|
||||
mqtt.subscribe("#", [](const char* topic, const std::string& msg) {
|
||||
spdlog::info("Received on topic '{}' message '{}'", topic, msg);
|
||||
});
|
||||
|
||||
while (true) {
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user