From 26a80cd94a614bf926392109993b02d63e9b131e Mon Sep 17 00:00:00 2001 From: Pascal Serrarens Date: Sat, 28 Jun 2025 09:15:16 +0200 Subject: [PATCH] Improvements --- Participants/ParticipantMQTT.cpp | 83 +++++++++-------------- Participants/ParticipantMQTT.h | 1 + Posix/PosixParticipant.cpp | 111 ++++++++++++++++++++++++++----- Posix/PosixParticipant.h | 4 ++ test/participant_test.cc | 2 +- 5 files changed, 135 insertions(+), 66 deletions(-) diff --git a/Participants/ParticipantMQTT.cpp b/Participants/ParticipantMQTT.cpp index 4917224..26e3675 100644 --- a/Participants/ParticipantMQTT.cpp +++ b/Participants/ParticipantMQTT.cpp @@ -12,6 +12,8 @@ namespace RoboidControl { #if defined(_WIN32) || defined(_WIN64) Windows::ParticipantUDP thisWindows = Windows::ParticipantUDP(); +#elif defined(__unix__) || defined(__APPLE__) +Posix::ParticipantUDP thisWindows = Posix::ParticipantUDP(); #endif ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) @@ -30,43 +32,9 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) thisWindows.SetupTCP(ipAddress, port); - std::cout << "Setting up MQTT" << std::endl; send_mqtt_connect("RoboidControl"); - - // Code for Posix -#if defined(__unix__) || defined(__APPLE__) - - // Create socket - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - std::cerr << "Error creating socket" << std::endl; - return; - } - - // Set up the server address structure - memset(&server_addr, 0, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); - if (inet_pton(AF_INET, ipAddress, &server_addr.sin_addr) <= 0) { - std::cerr << "Invalid address" << std::endl; - close(sock); - return; - } - - // Connect to the MQTT broker - if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { - std::cerr << "Error connecting to broker" << std::endl; - close(sock); - return; - } - // Set the socket to non-blocking mode - int flags = fcntl(sock, F_GETFL, 0); - fcntl(sock, F_SETFL, flags | O_NONBLOCK); - - // Send MQTT connect packet - send_mqtt_connect(sock, "RoboidControl"); - std::cout << "Connected to MQTT broker" << std::endl; -#endif + sendSubscribe("domoticz/out"); + // #endif } #define MQTT_CONNECT 0x10 @@ -75,13 +43,13 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) void ParticipantMQTT::send_mqtt_connect(const char* client_id) { MQTTConnectPacket packet; packet.fixed_header = MQTT_CONNECT; - packet.remaining_length = - 2 + strlen(client_id) + 3; // Protocol name + protocol level + connect - // flags + keep alive + client ID + packet.remaining_length = //14; + 2 + 4 + 2+ strlen(client_id) + 3; // Protocol name + protocol level + connect + // flags + keep alive + client ID packet.protocol_name_length = 4; // "MQTT" packet.protocol_name = "MQTT"; packet.protocol_level = 4; // MQTT version 3.1.1 - packet.connect_flags = 0; // Clean session + packet.connect_flags = 2; // Clean session packet.keep_alive = htons(60); // Keep alive time in seconds // Create the MQTT connect packet @@ -89,22 +57,31 @@ void ParticipantMQTT::send_mqtt_connect(const char* client_id) { this->buffer[index++] = packet.fixed_header; this->buffer[index++] = packet.remaining_length; + this->buffer[index++] = 0; // MSB protocol_name_length this->buffer[index++] = packet.protocol_name_length; memcpy(&this->buffer[index], packet.protocol_name, packet.protocol_name_length); index += packet.protocol_name_length; this->buffer[index++] = packet.protocol_level; this->buffer[index++] = packet.connect_flags; - this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB this->buffer[index++] = packet.keep_alive & 0xFF; // LSB - size_t client_id_length = strlen(client_id); - this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB - this->buffer[index++] = client_id_length & 0xFF; // LSB - memcpy(&this->buffer[index], client_id, client_id_length); - index += client_id_length; + this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB +// this->buffer[index++] = 0; // MSB: no client ID +// this->buffer[index++] = 0; // LSB: no client ID + size_t client_id_length = strlen(client_id); + this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB + this->buffer[index++] = client_id_length & 0xFF; // LSB + memcpy(&this->buffer[index], client_id, client_id_length); + index += client_id_length; + + for (int ix = 0; ix < index; ix++) + std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n"; // Send the MQTT connect packet thisWindows.SendTCP(index); + + std::cout << "Send connect, client ID = " << client_id << std::endl; + } void ParticipantMQTT::sendSubscribe(const char* topic) { @@ -117,14 +94,14 @@ void ParticipantMQTT::sendSubscribe(const char* topic) { // Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) + // Topic Name + QoS (1 byte) - size_t remainingLength = 2 + 2 + topicLength + 1; + size_t remainingLength = 1 + 2 + 2 + topicLength + 1; // Construct the SUBSCRIBE packet // unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed // header // // and packet ID this->buffer[0] = (char)0x82; // Packet type and flags - this->buffer[1] = (char)remainingLength; // Remaining length + this->buffer[1] = (char)remainingLength; // Remaining length this->buffer[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB this->buffer[3] = packetId & 0xFF; // Packet Identifier LSB this->buffer[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB @@ -134,8 +111,14 @@ void ParticipantMQTT::sendSubscribe(const char* topic) { memcpy(&this->buffer[6], topic, topicLength); this->buffer[6 + topicLength] = 0x00; // QoS level (0) + int index = 7 + topicLength; +// for (int ix = 0; ix < index; ix++) +// std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n"; + // Send the SUBSCRIBE packet - thisWindows.SendTCP(6 + topicLength); + thisWindows.SendTCP(7 + topicLength); + + std::cout << "Send subscribe to topic: " << topic << std::endl; } void ParticipantMQTT::Update() { @@ -150,9 +133,9 @@ int ParticipantMQTT::ReceiveTCP() { } void ParticipantMQTT::ReceiveData(unsigned char bufferSize) { + std::cout << " receivemsg\n"; // std::cout << "receive msg " << (int)msgId << "\n"; // std::cout << " buffer size = " <<(int) bufferSize << "\n"; - }; // void ParticipantMQTT::receiveMessages(int sock) { diff --git a/Participants/ParticipantMQTT.h b/Participants/ParticipantMQTT.h index 87c64a9..c8e992d 100644 --- a/Participants/ParticipantMQTT.h +++ b/Participants/ParticipantMQTT.h @@ -22,6 +22,7 @@ #if defined(_WIN32) || defined(_WIN64) #include "Windows/WindowsParticipant.h" #elif defined(__unix__) || defined(__APPLE__) +#include "Posix/PosixParticipant.h" #include #include #include diff --git a/Posix/PosixParticipant.cpp b/Posix/PosixParticipant.cpp index 011a759..27e3e4d 100644 --- a/Posix/PosixParticipant.cpp +++ b/Posix/PosixParticipant.cpp @@ -11,19 +11,20 @@ namespace RoboidControl { namespace Posix { -void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remotePort) { +void ParticipantUDP::Setup(int localPort, + const char* remoteIpAddress, + int remotePort) { #if defined(__unix__) || defined(__APPLE__) // Create a UDP socket this->sock = socket(AF_INET, SOCK_DGRAM, 0); - if (this->sock < 0) { std::cerr << "Error creating socket" << std::endl; return; } -// Set the socket to non-blocking mode + // Set the socket to non-blocking mode int flags = fcntl(this->sock, F_GETFL, 0); fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); @@ -50,7 +51,8 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot } // Bind the socket to the specified port - if (bind(this->sock, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + if (bind(this->sock, (const struct sockaddr*)&server_addr, + sizeof(server_addr)) < 0) { std::cerr << "Bind failed" << std::endl; close(sock); } @@ -58,23 +60,59 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot #endif } +void ParticipantUDP::SetupTCP(const char* remoteIpAddress, int remotePort) { +#if defined(__unix__) || defined(__APPLE__) + + // Create a UDP socket + + this->sock = socket(AF_INET, SOCK_STREAM, 0); + if (this->sock < 0) { + std::cerr << "Error creating socket" << std::endl; + return; + } + + remote_addr.sin_family = AF_INET; + remote_addr.sin_port = htons(remotePort); + inet_pton(AF_INET, remoteIpAddress, &remote_addr.sin_addr); + + int connect_result = + connect(this->sock, (struct sockaddr*)&remote_addr, sizeof(remote_addr)); + if (connect_result < 0) { //} && errno != EINPROGRESS) { + std::cerr << "Error connecting to server" << std::endl; + close(this->sock); + } + + std::cout << "TCP connection to " << remoteIpAddress << ":" << remotePort + << "\n"; + + // Set the socket to non-blocking mode + int flags = fcntl(this->sock, F_GETFL, 0); + fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); + +#endif +} + void ParticipantUDP::Receive() { #if defined(__unix__) || defined(__APPLE__) sockaddr_in client_addr; socklen_t len = sizeof(client_addr); - int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, (struct sockaddr*)&client_addr, &len); + int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, + (struct sockaddr*)&client_addr, &len); if (packetSize > 0) { char sender_ipAddress[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, + INET_ADDRSTRLEN); unsigned int sender_port = ntohs(client_addr.sin_port); ReceiveData(packetSize, sender_ipAddress, sender_port); - // RoboidControl::Participant* remoteParticipant = this->Get(sender_ipAddress, sender_port); - // if (remoteParticipant == nullptr) { + // RoboidControl::Participant* remoteParticipant = + // this->Get(sender_ipAddress, sender_port); if (remoteParticipant == + // nullptr) { // remoteParticipant = this->Add(sender_ipAddress, sender_port); // // std::cout << "New sender " << sender_ipAddress << ":" << sender_port // // << "\n"; - // // std::cout << "New remote participant " << remoteParticipant->ipAddress + // // std::cout << "New remote participant " << + // remoteParticipant->ipAddress // // << ":" << remoteParticipant->port << " " // // << (int)remoteParticipant->networkId << "\n"; // } @@ -85,12 +123,37 @@ void ParticipantUDP::Receive() { #endif } +int ParticipantUDP::ReceiveTCP() { +#if defined(__unix__) || defined(__APPLE__) + int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0); + if (bytesReceived > 0) { + buffer[bytesReceived] = '\0'; // Null-terminate the received data + std::cout << "Received: " << buffer << std::endl; + return bytesReceived; + } else if (bytesReceived == 0) { + // Connection has been gracefully closed + std::cout << "Connection closed by the server." << std::endl; + return 0; + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No data available to read, continue the loop + // std::cout << "No data available" << std::endl; + } else { + std::cerr << "Error receiving data: " << strerror(errno) << std::endl; + } + return 0; + } +#endif // _WIN32 || _WIN64 + return 0; +} + bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) { #if defined(__unix__) || defined(__APPLE__) - // std::cout << "Send to " << remoteParticipant->ipAddress << ":" << ntohs(remoteParticipant->port) + // std::cout << "Send to " << remoteParticipant->ipAddress << ":" << + // ntohs(remoteParticipant->port) // << "\n"; - // Set up the destination address + // Set up the destination address struct sockaddr_in dest_addr; memset(&dest_addr, 0, sizeof(dest_addr)); dest_addr.sin_family = AF_INET; @@ -98,9 +161,11 @@ bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) { dest_addr.sin_addr.s_addr = inet_addr(remoteParticipant->ipAddress); // Send the message - int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, + (struct sockaddr*)&dest_addr, sizeof(dest_addr)); if (sent_bytes < 0) { - std::cerr << "sendto failed with error: " << sent_bytes << " " << strerror(errno) << std::endl; + std::cerr << "sendto failed with error: " << sent_bytes << " " + << strerror(errno) << std::endl; close(sock); return false; } @@ -108,6 +173,19 @@ bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) { return true; } +bool ParticipantUDP::SendTCP(int bufferSize) { +#if defined(__unix__) || defined(__APPLE__) + // send(sock, this->buffer, bufferSize, 0); + ssize_t sent_bytes = send(this->sock, this->buffer, bufferSize, 0); + if (sent_bytes < 0) { + std::cerr << "Failed to send packet" << strerror(errno) << std::endl; + close(sock); + return -1; + } +#endif + return false; +} + bool ParticipantUDP::Publish(IMessage* msg) { #if defined(__unix__) || defined(__APPLE__) int bufferSize = msg->Serialize(this->buffer); @@ -116,8 +194,11 @@ bool ParticipantUDP::Publish(IMessage* msg) { char ip_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &(broadcast_addr.sin_addr), ip_str, INET_ADDRSTRLEN); - std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) << "\n"; - int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr)); + std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) + << "\n"; + int sent_bytes = + sendto(sock, this->buffer, bufferSize, 0, + (struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr)); if (sent_bytes < 0) { std::cerr << "sendto failed with error: " << sent_bytes << std::endl; close(sock); diff --git a/Posix/PosixParticipant.h b/Posix/PosixParticipant.h index 98302f3..7624e5d 100644 --- a/Posix/PosixParticipant.h +++ b/Posix/PosixParticipant.h @@ -8,8 +8,12 @@ namespace Posix { class ParticipantUDP : public RoboidControl::ParticipantUDP { public: void Setup(int localPort, const char* remoteIpAddress, int remotePort); + void SetupTCP(const char* remoteIpAddress, int remotePort); + void Receive(); + int ReceiveTCP(); bool Send(Participant* remoteParticipant, int bufferSize); + bool SendTCP(int bufferSize); bool Publish(IMessage* msg); protected: diff --git a/test/participant_test.cc b/test/participant_test.cc index d0bc3a7..cf9513c 100644 --- a/test/participant_test.cc +++ b/test/participant_test.cc @@ -81,7 +81,7 @@ TEST(Participant, MQTT) { unsigned long milliseconds = Thing::GetTimeMs(); unsigned long startTime = milliseconds; - while (milliseconds < startTime + 2000) { + while (milliseconds < startTime + 10000) { participant->Update(); std::this_thread::sleep_for(std::chrono::milliseconds(100));