#include "ParticipantMQTT.h" #if defined(__unix__) || defined(__APPLE__) #include #include // For fcntl #include #include #include #endif #include "Participants/ParticipantUDP.h" namespace RoboidControl { MQTTParticipantBase::MQTTParticipantBase(const char* ipAddress, int port) : ParticipantSocket("127.0.0.1", port) { this->name = "ParticipantUDP"; this->remoteSite = new ParticipantSocket(ipAddress, port); // Participant::registry.Add(this); this->root = Thing::LocalRoot(); // Participant::LocalParticipant->root; this->root->owner = this; this->root->name = "UDP Root"; this->Add(this->root); Participant::ReplaceLocalParticipant(*this); } void MQTTParticipantBase::SetupTCP() { this->connected = false; } #define MQTT_CONNECT 0x10 #define MQTT_CONNECT_ACK 0x20 // MQTT CONNECT packet construction std::vector createConnectPacket(const std::string& clientId) { std::vector packet; // Fixed header packet.push_back(0x10); // CONNECT packet type // Remaining length calculation int remainingLength = 2 + 4 + // Protocol name length and name (MQTT) 1 + // Protocol level 1 + // Connect flags 2 + // Keep alive 2 + clientId.length(); // Client ID // Encode remaining length do { uint8_t encodedByte = remainingLength % 128; remainingLength /= 128; if (remainingLength > 0) { encodedByte |= 128; } packet.push_back(encodedByte); } while (remainingLength > 0); // Protocol name (MQTT) packet.push_back(0x00); packet.push_back(0x04); packet.push_back('M'); packet.push_back('Q'); packet.push_back('T'); packet.push_back('T'); // Protocol level (v3.1.1) packet.push_back(0x04); // Connect flags packet.push_back(0x02); // Clean session // Keep alive (10 seconds) packet.push_back(0x00); packet.push_back(0x0A); // Client ID packet.push_back(0x00); packet.push_back(static_cast(clientId.length())); packet.insert(packet.end(), clientId.begin(), clientId.end()); return packet; } void MQTTParticipantBase::send_mqtt_connect(const char* clientId) { std::vector packet; // Fixed header packet.push_back(0x10); // CONNECT packet type // Calculate client ID length size_t clientIdLength = strlen(clientId); // Remaining length calculation int remainingLength = 2 + 4 + // Protocol name length and name (MQTT) 1 + // Protocol level 1 + // Connect flags 2 + // Keep alive 2 + clientIdLength; // Client ID // Encode remaining length do { uint8_t encodedByte = remainingLength % 128; remainingLength /= 128; if (remainingLength > 0) { encodedByte |= 128; } packet.push_back(encodedByte); } while (remainingLength > 0); // Protocol name (MQTT) packet.push_back(0x00); packet.push_back(0x04); packet.push_back('M'); packet.push_back('Q'); packet.push_back('T'); packet.push_back('T'); // Protocol level (v3.1.1) packet.push_back(0x04); // Connect flags packet.push_back(0x02); // Clean session // Keep alive (10 seconds) packet.push_back(0x00); packet.push_back(0x0A); // Client ID packet.push_back(0x00); packet.push_back(static_cast(clientIdLength)); packet.insert(packet.end(), clientId, clientId + clientIdLength); for (int i = 0; i < packet.size(); i++) this->buffer[i] = packet.data()[i]; int buffersize = packet.size(); SendTCP(buffersize); //send(sock, (const char*)connectPacket.data(), connectPacket.size(), 0); /* MQTTConnectPacket packet; packet.fixed_header = MQTT_CONNECT; packet.remaining_length = // 14; 2 + 4 + 2 + strlen(clientId) + 3; // Protocol name + protocol level + connect // flags + keep alive + client ID packet.protocol_name_length = 4; // "MQTT" packet.protocol_name = "MQTT"; packet.protocol_level = 4; // MQTT version 3.1.1 packet.connect_flags = 2; // Clean session packet.keep_alive = htons(15); // Keep alive time in seconds // Create the MQTT connect packet int index = 0; this->buffer[index++] = packet.fixed_header; this->buffer[index++] = packet.remaining_length; this->buffer[index++] = 0; // MSB protocol_name_length this->buffer[index++] = packet.protocol_name_length; memcpy(&this->buffer[index], packet.protocol_name, packet.protocol_name_length); index += packet.protocol_name_length; this->buffer[index++] = packet.protocol_level; this->buffer[index++] = packet.connect_flags; this->buffer[index++] = packet.keep_alive & 0xFF; // LSB this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB size_t client_id_length = strlen(clientId); this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB this->buffer[index++] = client_id_length & 0xFF; // LSB memcpy(&this->buffer[index], clientId, client_id_length); index += client_id_length; for (int ix = 0; ix < index; ix++) std::cout << std::hex << "0x" << (int)this->buffer[ix] << " "; std::cout << std::dec << std::endl; // Send the MQTT connect packet SendTCP(index); */ std::cout << "Send connect, client ID = " << clientId << std::endl; } uint16_t packetId; // Generate packet ID (incremental) uint16_t getNextPacketId() { return ++packetId; } std::vector createSubscribePacket(const char* topic, uint8_t qos) { std::vector packet; // Fixed header packet.push_back(0x82); // SUBSCRIBE packet type with flags // Calculate packet length size_t topicLength = strlen(topic); int remainingLength = 2 + // Packet ID 2 + // Topic length topicLength + 1; // QoS byte // Encode remaining length do { uint8_t encodedByte = remainingLength % 128; remainingLength /= 128; if (remainingLength > 0) { encodedByte |= 128; } packet.push_back(encodedByte); } while (remainingLength > 0); // Packet ID (2 bytes) uint16_t packetIdentifier = getNextPacketId(); packet.push_back((packetIdentifier >> 8) & 0xFF); // High byte packet.push_back(packetIdentifier & 0xFF); // Low byte // Topic length (2 bytes) packet.push_back((topicLength >> 8) & 0xFF); packet.push_back(topicLength & 0xFF); // Topic string packet.insert(packet.end(), topic, topic + topicLength); // QoS (1 byte) packet.push_back(qos); return packet; } void MQTTParticipantBase::sendSubscribe(const char* topic) { std::vector packet = createSubscribePacket(topic, 0); for (int i = 0; i < packet.size(); i++) this->buffer[i] = packet.data()[i]; int buffersize = packet.size(); SendTCP(buffersize); /* // Packet Identifier (2 bytes) static unsigned short packetId = 1; // Increment this for each new subscription // Calculate the length of the topic name size_t topicLength = strlen(topic); // Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) + // Topic Name + QoS (1 byte) size_t remainingLength = 1 + 2 + 2 + topicLength + 1; // Construct the SUBSCRIBE packet // unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed // header // // and packet ID this->buffer[0] = (char)0x82; // Packet type and flags this->buffer[1] = (char)remainingLength; // Remaining length this->buffer[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB this->buffer[3] = packetId & 0xFF; // Packet Identifier LSB this->buffer[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB this->buffer[5] = topicLength & 0xFF; // Topic Length LSB // Copy the topic name into the packet memcpy(&this->buffer[6], topic, topicLength); this->buffer[6 + topicLength] = 0x00; // QoS level (0) int index = 7 + topicLength; // for (int ix = 0; ix < index; ix++) // std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n"; // Send the SUBSCRIBE packet SendTCP(7 + topicLength); */ std::cout << "Send subscribe to topic: " << topic << std::endl; } void MQTTParticipantBase::Update(bool recurse) { int packetSize = ReceiveTCP(); if (packetSize > 0) { ReceiveData(packetSize); } } void MQTTParticipantBase::SendTCP(int bufferSize) {} int MQTTParticipantBase::ReceiveTCP() { return 0; } void MQTTParticipantBase::ReceiveData(unsigned char bufferSize) { std::cout << " receivemsg\n"; // std::cout << "receive msg " << (int)msgId << "\n"; std::cout << " buffer size = " <<(int) bufferSize << "\n"; }; // void ParticipantMQTT::receiveMessages(int sock) { // unsigned char buffer[1024]; // // Check for incoming messages without blocking // int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT); // if (bytesRead > 0) { // // Process the incoming MQTT message // std::cout << "Received message: "; // for (int i = 0; i < bytesRead; ++i) { // std::cout << std::hex << (int)buffer[i] << " "; // } // std::cout << std::dec << std::endl; // Reset to decimal output // } else if (bytesRead < 0) { // // If no messages are available, check for errors // if (errno != EAGAIN && errno != EWOULDBLOCK) { // std::cerr << "Error receiving message: " << strerror(errno) << // std::endl; // } // } // } } // namespace RoboidControl