RoboidControl-cpp/Participants/ParticipantMQTT.cpp
2025-06-09 13:09:20 +02:00

150 lines
5.1 KiB
C++

#include "ParticipantMQTT.h"
#if defined(__unix__) || defined(__APPLE__)
#include <arpa/inet.h>
#include <fcntl.h> // For fcntl
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
namespace RoboidControl {
ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
: Participant("127.0.0.1", port) {
this->name = "ParticipantUDP";
this->remoteSite = new Participant(ipAddress, port);
Participant::registry.Add(this);
this->root = Thing::LocalRoot(); // Participant::LocalParticipant->root;
this->root->owner = this;
this->root->name = "UDP Root";
this->Add(this->root);
Participant::ReplaceLocalParticipant(*this);
std::cout << "Setting up MQTT" << std::endl;
// Code for Posix
#if defined(__unix__) || defined(__APPLE__)
// Create socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
std::cerr << "Error creating socket" << std::endl;
return;
}
// Set up the server address structure
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
if (inet_pton(AF_INET, ipAddress, &server_addr.sin_addr) <= 0) {
std::cerr << "Invalid address" << std::endl;
close(sock);
return;
}
// Connect to the MQTT broker
if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
std::cerr << "Error connecting to broker" << std::endl;
close(sock);
return;
}
// Set the socket to non-blocking mode
int flags = fcntl(sock, F_GETFL, 0);
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
// Send MQTT connect packet
send_mqtt_connect(sock, "RoboidControl");
std::cout << "Connected to MQTT broker" << std::endl;
#endif
}
#define MQTT_CONNECT 0x10
#define MQTT_CONNECT_ACK 0x20
void ParticipantMQTT::send_mqtt_connect(int sockfd, const char* client_id) {
MQTTConnectPacket packet;
packet.fixed_header = MQTT_CONNECT;
packet.remaining_length =
2 + strlen(client_id) + 3; // Protocol name + protocol level + connect
// flags + keep alive + client ID
packet.protocol_name_length = 4; // "MQTT"
packet.protocol_name = "MQTT";
packet.protocol_level = 4; // MQTT version 3.1.1
packet.connect_flags = 0; // Clean session
packet.keep_alive = htons(60); // Keep alive time in seconds
// Create the MQTT connect packet
uint8_t buffer[256];
int index = 0;
buffer[index++] = packet.fixed_header;
buffer[index++] = packet.remaining_length;
buffer[index++] = packet.protocol_name_length;
memcpy(&buffer[index], packet.protocol_name, packet.protocol_name_length);
index += packet.protocol_name_length;
buffer[index++] = packet.protocol_level;
buffer[index++] = packet.connect_flags;
buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB
buffer[index++] = packet.keep_alive & 0xFF; // LSB
uint8_t client_id_length = strlen(client_id);
buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB
buffer[index++] = client_id_length & 0xFF; // LSB
memcpy(&buffer[index], client_id, client_id_length);
index += client_id_length;
// Send the MQTT connect packet
send(sockfd, buffer, index, 0);
}
void ParticipantMQTT::sendSubscribe(int sock, const char* topic) {
// Packet Identifier (2 bytes)
static unsigned short packetId =
1; // Increment this for each new subscription
// Calculate the length of the topic name
size_t topicLength = strlen(topic);
// Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) +
// Topic Name + QoS (1 byte)
unsigned char remainingLength = 2 + 2 + topicLength + 1;
// Construct the SUBSCRIBE packet
unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed header
// and packet ID
subscribePacket[0] = 0x82; // Packet type and flags
subscribePacket[1] = remainingLength; // Remaining length
subscribePacket[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB
subscribePacket[3] = packetId & 0xFF; // Packet Identifier LSB
subscribePacket[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB
subscribePacket[5] = topicLength & 0xFF; // Topic Length LSB
// Copy the topic name into the packet
memcpy(&subscribePacket[6], topic, topicLength);
subscribePacket[6 + topicLength] = 0x00; // QoS level (0)
// Send the SUBSCRIBE packet
send(sock, subscribePacket, sizeof(subscribePacket), 0);
}
void ParticipantMQTT::receiveMessages(int sock) {
unsigned char buffer[1024];
// Check for incoming messages without blocking
int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT);
if (bytesRead > 0) {
// Process the incoming MQTT message
std::cout << "Received message: ";
for (int i = 0; i < bytesRead; ++i) {
std::cout << std::hex << (int)buffer[i] << " ";
}
std::cout << std::dec << std::endl; // Reset to decimal output
} else if (bytesRead < 0) {
// If no messages are available, check for errors
if (errno != EAGAIN && errno != EWOULDBLOCK) {
std::cerr << "Error receiving message: " << strerror(errno) << std::endl;
}
}
}
} // namespace RoboidControl