From 19e4e70e97199a60865dd7c96ba7df98ee60aafd Mon Sep 17 00:00:00 2001 From: Pascal Serrarens Date: Mon, 9 Jun 2025 13:09:20 +0200 Subject: [PATCH] Initial Posix MQTT implementation --- CMakeLists.txt | 1 - Messages/TextMsg.h | 2 + Messages/ThingMsg.h | 2 + Participants/ParticipantMQTT.cpp | 150 +++++++++++++++++++++++++++++++ Participants/ParticipantMQTT.h | 128 ++++++++++++++++++++++++++ Participants/ParticipantUDP.cpp | 4 +- Posix/PosixParticipant.cpp | 5 -- test/participant_test.cc | 15 ++++ 8 files changed, 300 insertions(+), 7 deletions(-) create mode 100644 Participants/ParticipantMQTT.cpp create mode 100644 Participants/ParticipantMQTT.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e4a9a1b..a15671c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,4 +61,3 @@ else() include(GoogleTest) gtest_discover_tests(RoboidControlTest) endif() - diff --git a/Messages/TextMsg.h b/Messages/TextMsg.h index a5763e6..2d4ae93 100644 --- a/Messages/TextMsg.h +++ b/Messages/TextMsg.h @@ -1,3 +1,5 @@ +#pragma once + #include "IMessage.h" namespace RoboidControl { diff --git a/Messages/ThingMsg.h b/Messages/ThingMsg.h index a26b60d..ce9f9e8 100644 --- a/Messages/ThingMsg.h +++ b/Messages/ThingMsg.h @@ -1,3 +1,5 @@ +#pragma once + #include "IMessage.h" #include "Thing.h" diff --git a/Participants/ParticipantMQTT.cpp b/Participants/ParticipantMQTT.cpp new file mode 100644 index 0000000..1ab9843 --- /dev/null +++ b/Participants/ParticipantMQTT.cpp @@ -0,0 +1,150 @@ +#include "ParticipantMQTT.h" + +#if defined(__unix__) || defined(__APPLE__) +#include +#include // For fcntl +#include +#include +#include +#endif + +namespace RoboidControl { + +ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) + : Participant("127.0.0.1", port) { + this->name = "ParticipantUDP"; + this->remoteSite = new Participant(ipAddress, port); + Participant::registry.Add(this); + + this->root = Thing::LocalRoot(); // Participant::LocalParticipant->root; + this->root->owner = this; + this->root->name = "UDP Root"; + this->Add(this->root); + + Participant::ReplaceLocalParticipant(*this); + + std::cout << "Setting up MQTT" << std::endl; + // Code for Posix +#if defined(__unix__) || defined(__APPLE__) + + // Create socket + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + std::cerr << "Error creating socket" << std::endl; + return; + } + + // Set up the server address structure + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + if (inet_pton(AF_INET, ipAddress, &server_addr.sin_addr) <= 0) { + std::cerr << "Invalid address" << std::endl; + close(sock); + return; + } + + // Connect to the MQTT broker + if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + std::cerr << "Error connecting to broker" << std::endl; + close(sock); + return; + } + // Set the socket to non-blocking mode + int flags = fcntl(sock, F_GETFL, 0); + fcntl(sock, F_SETFL, flags | O_NONBLOCK); + + // Send MQTT connect packet + send_mqtt_connect(sock, "RoboidControl"); + std::cout << "Connected to MQTT broker" << std::endl; +#endif +} + +#define MQTT_CONNECT 0x10 +#define MQTT_CONNECT_ACK 0x20 + +void ParticipantMQTT::send_mqtt_connect(int sockfd, const char* client_id) { + MQTTConnectPacket packet; + packet.fixed_header = MQTT_CONNECT; + packet.remaining_length = + 2 + strlen(client_id) + 3; // Protocol name + protocol level + connect + // flags + keep alive + client ID + packet.protocol_name_length = 4; // "MQTT" + packet.protocol_name = "MQTT"; + packet.protocol_level = 4; // MQTT version 3.1.1 + packet.connect_flags = 0; // Clean session + packet.keep_alive = htons(60); // Keep alive time in seconds + + // Create the MQTT connect packet + uint8_t buffer[256]; + int index = 0; + + buffer[index++] = packet.fixed_header; + buffer[index++] = packet.remaining_length; + buffer[index++] = packet.protocol_name_length; + memcpy(&buffer[index], packet.protocol_name, packet.protocol_name_length); + index += packet.protocol_name_length; + buffer[index++] = packet.protocol_level; + buffer[index++] = packet.connect_flags; + buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB + buffer[index++] = packet.keep_alive & 0xFF; // LSB + uint8_t client_id_length = strlen(client_id); + buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB + buffer[index++] = client_id_length & 0xFF; // LSB + memcpy(&buffer[index], client_id, client_id_length); + index += client_id_length; + + // Send the MQTT connect packet + send(sockfd, buffer, index, 0); +} + +void ParticipantMQTT::sendSubscribe(int sock, const char* topic) { + // Packet Identifier (2 bytes) + static unsigned short packetId = + 1; // Increment this for each new subscription + + // Calculate the length of the topic name + size_t topicLength = strlen(topic); + + // Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) + + // Topic Name + QoS (1 byte) + unsigned char remainingLength = 2 + 2 + topicLength + 1; + + // Construct the SUBSCRIBE packet + unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed header + // and packet ID + subscribePacket[0] = 0x82; // Packet type and flags + subscribePacket[1] = remainingLength; // Remaining length + subscribePacket[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB + subscribePacket[3] = packetId & 0xFF; // Packet Identifier LSB + subscribePacket[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB + subscribePacket[5] = topicLength & 0xFF; // Topic Length LSB + + // Copy the topic name into the packet + memcpy(&subscribePacket[6], topic, topicLength); + subscribePacket[6 + topicLength] = 0x00; // QoS level (0) + + // Send the SUBSCRIBE packet + send(sock, subscribePacket, sizeof(subscribePacket), 0); +} + +void ParticipantMQTT::receiveMessages(int sock) { + unsigned char buffer[1024]; + // Check for incoming messages without blocking + int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT); + if (bytesRead > 0) { + // Process the incoming MQTT message + std::cout << "Received message: "; + for (int i = 0; i < bytesRead; ++i) { + std::cout << std::hex << (int)buffer[i] << " "; + } + std::cout << std::dec << std::endl; // Reset to decimal output + } else if (bytesRead < 0) { + // If no messages are available, check for errors + if (errno != EAGAIN && errno != EWOULDBLOCK) { + std::cerr << "Error receiving message: " << strerror(errno) << std::endl; + } + } +} + +} // namespace RoboidControl \ No newline at end of file diff --git a/Participants/ParticipantMQTT.h b/Participants/ParticipantMQTT.h new file mode 100644 index 0000000..f6729c8 --- /dev/null +++ b/Participants/ParticipantMQTT.h @@ -0,0 +1,128 @@ +#pragma once + +#include "Messages/BinaryMsg.h" +#include "Messages/DestroyMsg.h" +#include "Messages/InvestigateMsg.h" +#include "Messages/ModelUrlMsg.h" +#include "Messages/NameMsg.h" +#include "Messages/NetworkIdMsg.h" +#include "Messages/ParticipantMsg.h" +#include "Messages/PoseMsg.h" +#include "Messages/TextMsg.h" +#include "Messages/ThingMsg.h" +#include "Participant.h" + +#if !defined(NO_STD) +#include +#include +// #include +#endif + +#if defined(_WIN32) || defined(_WIN64) +#include +#elif defined(__unix__) || defined(__APPLE__) +#include +#include +#include +#include +#endif + +namespace RoboidControl { + +/// @brief A participant using UDP communication +/// A local participant is the local device which can communicate with +/// other participants It manages all local things and communcation with other +/// participants. Each application has a local participant which is usually +/// explicit in the code. An participant can be isolated. In that case it is +/// standalong and does not communicate with other participants. +/// +/// It is possible to work with an hidden participant by creating things without +/// specifying a participant in the constructor. In that case an hidden isolated +/// participant is created which can be obtained using +/// RoboidControl::IsolatedParticipant::Isolated(). +/// @sa RoboidControl::Thing::Thing() +class ParticipantMQTT : public Participant { +#pragma region Init + + public: + /// @brief Create a participant which will try to connect to a MQTT server. + /// @param ipAddress The IP address of the MQTT server + /// @param port The port used by the MQTT server + ParticipantMQTT(const char* ipAddress, int port = 1883); + +#pragma endregion Init + +#pragma region Properties + + public: + /// @brief The remote site when this participant is connected to a site + Participant* remoteSite = nullptr; + + protected: +#if defined(__unix__) || defined(__APPLE__) + sockaddr_in server_addr; + int sock; +#endif + + public: + // void Begin(); + bool connected = false; + +#pragma endregion Properties + +#pragma region Update + + public: + // virtual void Update() override; + +#pragma endregion Update + +#pragma region Send + + protected: + // MQTT Connect Packet Structure + struct MQTTConnectPacket { + uint8_t fixed_header; + uint8_t remaining_length; + uint8_t protocol_name_length; + const char* protocol_name; + uint8_t protocol_level; + uint8_t connect_flags; + uint16_t keep_alive; + const char* client_id; + }; + + void send_mqtt_connect(int sockfd, const char* client_id); + void sendSubscribe(int sock, const char* topic); +#pragma endregion Send + +#pragma region Receive + + protected: + void receiveMessages(int sock); + // void ReceiveData(unsigned char bufferSize, + // char* senderIpAddress, + // unsigned int senderPort); + // void ReceiveData(unsigned char bufferSize, Participant* + // remoteParticipant); + + // void SetupUDP(int localPort, const char* remoteIpAddress, int + // remotePort); + + // void ReceiveUDP(); + + // virtual void Process(Participant* sender, ParticipantMsg* msg); + // virtual void Process(Participant* sender, NetworkIdMsg* msg); + // virtual void Process(Participant* sender, InvestigateMsg* msg); + // virtual void Process(Participant* sender, ThingMsg* msg); + // virtual void Process(Participant* sender, NameMsg* msg); + // virtual void Process(Participant* sender, ModelUrlMsg* msg); + // virtual void Process(Participant* sender, PoseMsg* msg); + // virtual void Process(Participant* sender, BinaryMsg* msg); + // virtual void Process(Participant* sender, TextMsg* msg); + // virtual void Process(Participant* sender, DestroyMsg* msg); + +#pragma endregion Receive +}; + +} // namespace RoboidControl diff --git a/Participants/ParticipantUDP.cpp b/Participants/ParticipantUDP.cpp index fc99aae..3289e79 100644 --- a/Participants/ParticipantUDP.cpp +++ b/Participants/ParticipantUDP.cpp @@ -232,7 +232,9 @@ void ParticipantUDP::SendThingInfo(Participant* remoteParticipant, bool ParticipantUDP::Send(IMessage* msg) { if (this->remoteSite != nullptr) - this->remoteSite->Send(msg); + return this->remoteSite->Send(msg); + + return false; } void ParticipantUDP::PublishThingInfo(Thing* thing) { diff --git a/Posix/PosixParticipant.cpp b/Posix/PosixParticipant.cpp index a01a059..011a759 100644 --- a/Posix/PosixParticipant.cpp +++ b/Posix/PosixParticipant.cpp @@ -24,13 +24,8 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot } // Set the socket to non-blocking mode -#if defined(_WIN32) || defined(_WIN64) - u_long mode = 1; // 1 to enable non-blocking socket - ioctlsocket(this->sock, FIONBIO, &mode); -#elif defined(__unix__) || defined(__APPLE__) int flags = fcntl(this->sock, F_GETFL, 0); fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); -#endif if (remotePort != 0) { // Set up the address to send to diff --git a/test/participant_test.cc b/test/participant_test.cc index 13831bd..d0bc3a7 100644 --- a/test/participant_test.cc +++ b/test/participant_test.cc @@ -5,6 +5,7 @@ #include #include "Participants/SiteServer.h" +#include "Participants/ParticipantMQTT.h" #include "Thing.h" #include @@ -75,4 +76,18 @@ TEST(Participant, ThingMsg) { SUCCEED(); } +TEST(Participant, MQTT) { + ParticipantMQTT* participant = new ParticipantMQTT("192.168.77.11"); + + unsigned long milliseconds = Thing::GetTimeMs(); + unsigned long startTime = milliseconds; + while (milliseconds < startTime + 2000) { + participant->Update(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + milliseconds = Thing::GetTimeMs(); + } + SUCCEED(); +} + #endif