diff --git a/Participant.cpp b/Participant.cpp index b8e0d16..dc02a15 100644 --- a/Participant.cpp +++ b/Participant.cpp @@ -67,7 +67,7 @@ bool Participant::Send(IMessage* msg) { #if defined(_WIN32) || defined(_WIN64) Windows::ParticipantUDP* thisWindows = static_cast(this); - return thisWindows->Send(remoteParticipant, bufferSize); + return thisWindows->Send(this, bufferSize); #elif defined(__unix__) || defined(__APPLE__) Posix::ParticipantUDP* thisPosix = static_cast(this); return thisPosix->Send(this, bufferSize); diff --git a/Participants/ParticipantMQTT.cpp b/Participants/ParticipantMQTT.cpp index 1ab9843..4917224 100644 --- a/Participants/ParticipantMQTT.cpp +++ b/Participants/ParticipantMQTT.cpp @@ -10,6 +10,10 @@ namespace RoboidControl { +#if defined(_WIN32) || defined(_WIN64) +Windows::ParticipantUDP thisWindows = Windows::ParticipantUDP(); +#endif + ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) : Participant("127.0.0.1", port) { this->name = "ParticipantUDP"; @@ -22,13 +26,18 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) this->Add(this->root); Participant::ReplaceLocalParticipant(*this); + std::cout << "Setup TCP connection" << std::endl; + + thisWindows.SetupTCP(ipAddress, port); std::cout << "Setting up MQTT" << std::endl; + send_mqtt_connect("RoboidControl"); + // Code for Posix #if defined(__unix__) || defined(__APPLE__) - // Create socket - int sock = socket(AF_INET, SOCK_STREAM, 0); + // Create socket + int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { std::cerr << "Error creating socket" << std::endl; return; @@ -63,7 +72,7 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port) #define MQTT_CONNECT 0x10 #define MQTT_CONNECT_ACK 0x20 -void ParticipantMQTT::send_mqtt_connect(int sockfd, const char* client_id) { +void ParticipantMQTT::send_mqtt_connect(const char* client_id) { MQTTConnectPacket packet; packet.fixed_header = MQTT_CONNECT; packet.remaining_length = @@ -76,29 +85,29 @@ void ParticipantMQTT::send_mqtt_connect(int sockfd, const char* client_id) { packet.keep_alive = htons(60); // Keep alive time in seconds // Create the MQTT connect packet - uint8_t buffer[256]; int index = 0; - buffer[index++] = packet.fixed_header; - buffer[index++] = packet.remaining_length; - buffer[index++] = packet.protocol_name_length; - memcpy(&buffer[index], packet.protocol_name, packet.protocol_name_length); + this->buffer[index++] = packet.fixed_header; + this->buffer[index++] = packet.remaining_length; + this->buffer[index++] = packet.protocol_name_length; + memcpy(&this->buffer[index], packet.protocol_name, + packet.protocol_name_length); index += packet.protocol_name_length; - buffer[index++] = packet.protocol_level; - buffer[index++] = packet.connect_flags; - buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB - buffer[index++] = packet.keep_alive & 0xFF; // LSB - uint8_t client_id_length = strlen(client_id); - buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB - buffer[index++] = client_id_length & 0xFF; // LSB - memcpy(&buffer[index], client_id, client_id_length); + this->buffer[index++] = packet.protocol_level; + this->buffer[index++] = packet.connect_flags; + this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB + this->buffer[index++] = packet.keep_alive & 0xFF; // LSB + size_t client_id_length = strlen(client_id); + this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB + this->buffer[index++] = client_id_length & 0xFF; // LSB + memcpy(&this->buffer[index], client_id, client_id_length); index += client_id_length; // Send the MQTT connect packet - send(sockfd, buffer, index, 0); + thisWindows.SendTCP(index); } -void ParticipantMQTT::sendSubscribe(int sock, const char* topic) { +void ParticipantMQTT::sendSubscribe(const char* topic) { // Packet Identifier (2 bytes) static unsigned short packetId = 1; // Increment this for each new subscription @@ -108,43 +117,62 @@ void ParticipantMQTT::sendSubscribe(int sock, const char* topic) { // Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) + // Topic Name + QoS (1 byte) - unsigned char remainingLength = 2 + 2 + topicLength + 1; + size_t remainingLength = 2 + 2 + topicLength + 1; // Construct the SUBSCRIBE packet - unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed header - // and packet ID - subscribePacket[0] = 0x82; // Packet type and flags - subscribePacket[1] = remainingLength; // Remaining length - subscribePacket[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB - subscribePacket[3] = packetId & 0xFF; // Packet Identifier LSB - subscribePacket[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB - subscribePacket[5] = topicLength & 0xFF; // Topic Length LSB + // unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed + // header + // // and packet ID + this->buffer[0] = (char)0x82; // Packet type and flags + this->buffer[1] = (char)remainingLength; // Remaining length + this->buffer[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB + this->buffer[3] = packetId & 0xFF; // Packet Identifier LSB + this->buffer[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB + this->buffer[5] = topicLength & 0xFF; // Topic Length LSB // Copy the topic name into the packet - memcpy(&subscribePacket[6], topic, topicLength); - subscribePacket[6 + topicLength] = 0x00; // QoS level (0) + memcpy(&this->buffer[6], topic, topicLength); + this->buffer[6 + topicLength] = 0x00; // QoS level (0) // Send the SUBSCRIBE packet - send(sock, subscribePacket, sizeof(subscribePacket), 0); + thisWindows.SendTCP(6 + topicLength); } -void ParticipantMQTT::receiveMessages(int sock) { - unsigned char buffer[1024]; - // Check for incoming messages without blocking - int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT); - if (bytesRead > 0) { - // Process the incoming MQTT message - std::cout << "Received message: "; - for (int i = 0; i < bytesRead; ++i) { - std::cout << std::hex << (int)buffer[i] << " "; - } - std::cout << std::dec << std::endl; // Reset to decimal output - } else if (bytesRead < 0) { - // If no messages are available, check for errors - if (errno != EAGAIN && errno != EWOULDBLOCK) { - std::cerr << "Error receiving message: " << strerror(errno) << std::endl; - } +void ParticipantMQTT::Update() { + int packetSize = ReceiveTCP(); + if (packetSize > 0) { + ReceiveData(packetSize); } } +int ParticipantMQTT::ReceiveTCP() { + return thisWindows.ReceiveTCP(); +} + +void ParticipantMQTT::ReceiveData(unsigned char bufferSize) { + // std::cout << "receive msg " << (int)msgId << "\n"; + // std::cout << " buffer size = " <<(int) bufferSize << "\n"; + +}; + +// void ParticipantMQTT::receiveMessages(int sock) { +// unsigned char buffer[1024]; +// // Check for incoming messages without blocking +// int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT); +// if (bytesRead > 0) { +// // Process the incoming MQTT message +// std::cout << "Received message: "; +// for (int i = 0; i < bytesRead; ++i) { +// std::cout << std::hex << (int)buffer[i] << " "; +// } +// std::cout << std::dec << std::endl; // Reset to decimal output +// } else if (bytesRead < 0) { +// // If no messages are available, check for errors +// if (errno != EAGAIN && errno != EWOULDBLOCK) { +// std::cerr << "Error receiving message: " << strerror(errno) << +// std::endl; +// } +// } +// } + } // namespace RoboidControl \ No newline at end of file diff --git a/Participants/ParticipantMQTT.h b/Participants/ParticipantMQTT.h index f6729c8..87c64a9 100644 --- a/Participants/ParticipantMQTT.h +++ b/Participants/ParticipantMQTT.h @@ -18,8 +18,9 @@ // #include #endif + #if defined(_WIN32) || defined(_WIN64) -#include +#include "Windows/WindowsParticipant.h" #elif defined(__unix__) || defined(__APPLE__) #include #include @@ -73,7 +74,7 @@ class ParticipantMQTT : public Participant { #pragma region Update public: - // virtual void Update() override; + virtual void Update() override; #pragma endregion Update @@ -92,17 +93,15 @@ class ParticipantMQTT : public Participant { const char* client_id; }; - void send_mqtt_connect(int sockfd, const char* client_id); - void sendSubscribe(int sock, const char* topic); + void send_mqtt_connect(const char* client_id); + void sendSubscribe(const char* topic); #pragma endregion Send #pragma region Receive protected: - void receiveMessages(int sock); - // void ReceiveData(unsigned char bufferSize, - // char* senderIpAddress, - // unsigned int senderPort); + int ReceiveTCP(); + void ReceiveData(unsigned char bufferSize); // void ReceiveData(unsigned char bufferSize, Participant* // remoteParticipant); diff --git a/Windows/WindowsParticipant.cpp b/Windows/WindowsParticipant.cpp index c6e99db..ff26ce5 100644 --- a/Windows/WindowsParticipant.cpp +++ b/Windows/WindowsParticipant.cpp @@ -9,7 +9,11 @@ namespace RoboidControl { namespace Windows { -void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remotePort) { +ParticipantUDP::ParticipantUDP() {} + +void ParticipantUDP::Setup(int localPort, + const char* remoteIpAddress, + int remotePort) { #if defined(_WIN32) || defined(_WIN64) // Create a UDP socket @@ -19,19 +23,28 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot std::cerr << "WSAStartup failed" << std::endl; return; } - this->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (this->sock < 0) { - std::cerr << "Error creating socket" << std::endl; + // Create an UDP socket + this->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (this->sock == INVALID_SOCKET) { + std::cerr << "UDP Socket creation failed: " << WSAGetLastError() + << std::endl; + WSACleanup(); return; } -// Set the socket to non-blocking mode + // Set the socket to non-blocking mode u_long mode = 1; // 1 to enable non-blocking socket - ioctlsocket(this->sock, FIONBIO, &mode); + if (ioctlsocket(sock, FIONBIO, &mode) != NO_ERROR) { + std::cerr << "Failed to set non-blocking mode: " << WSAGetLastError() + << std::endl; + closesocket(sock); + WSACleanup(); + return; + } if (remotePort != 0) { - // Set up the address to send to + // Define the server address memset(&remote_addr, 0, sizeof(remote_addr)); remote_addr.sin_family = AF_INET; remote_addr.sin_port = htons((u_short)remotePort); @@ -55,13 +68,64 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot } // Bind the socket to the specified port - if (bind(this->sock, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + if (bind(this->sock, (const struct sockaddr*)&server_addr, + sizeof(server_addr)) < 0) { std::cerr << "Bind failed" << std::endl; closesocket(sock); WSACleanup(); } -#endif // _WIN32 || _WIN64 +#endif // _WIN32 || _WIN64 +} + +void ParticipantUDP::SetupTCP(const char* remoteIpAddress, int remotePort) { +#if defined(_WIN32) || defined(_WIN64) + // Initialize Winsock + WSADATA wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + std::cerr << "WSAStartup failed: " << std::endl; + return; + } + + // Create a TCP socket + this->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sock == INVALID_SOCKET) { + std::cerr << "TCP Socket creation failed: " << WSAGetLastError() + << std::endl; + WSACleanup(); + return; + } + + // Set the socket to non-blocking mode + u_long mode = 1; // 1 to enable non-blocking socket + if (ioctlsocket(sock, FIONBIO, &mode) != NO_ERROR) { + std::cerr << "Failed to set non-blocking mode: " << WSAGetLastError() + << std::endl; + closesocket(sock); + WSACleanup(); + return; + } + + // Define the server address + memset(&this->remote_addr, 0, sizeof(server_addr)); + this->remote_addr.sin_family = AF_INET; + this->remote_addr.sin_port = htons((u_short)remotePort); + if (inet_pton(AF_INET, remoteIpAddress, &this->remote_addr.sin_addr) <= 0) { + std::cerr << "Invalid address" << std::endl; + closesocket(sock); + WSACleanup(); + return; + } + + // Connect to the server + if (connect(sock, (sockaddr*)&this->remote_addr, sizeof(this->remote_addr)) == + SOCKET_ERROR) { + std::cerr << "Connection failed: " << WSAGetLastError() << std::endl; + closesocket(sock); + WSACleanup(); + return; + } +#endif // _WIN32 || _WIN64 } void ParticipantUDP::Receive() { @@ -73,7 +137,8 @@ void ParticipantUDP::Receive() { sockaddr_in client_addr; int len = sizeof(client_addr); - int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, (struct sockaddr*)&client_addr, &len); + int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, + (struct sockaddr*)&client_addr, &len); // std::cout << "received data " << packetSize << "\n"; if (packetSize < 0) { int error_code = WSAGetLastError(); // Get the error code on Windows @@ -81,12 +146,14 @@ void ParticipantUDP::Receive() { std::cerr << "recvfrom failed with error: " << error_code << std::endl; } else if (packetSize > 0) { char sender_ipAddress[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, + INET_ADDRSTRLEN); unsigned int sender_port = ntohs(client_addr.sin_port); ReceiveData(packetSize, sender_ipAddress, sender_port); - // RoboidControl::ParticipantUDP* remoteParticipant = this->Get(sender_ipAddress, sender_port); - // if (remoteParticipant == nullptr) { + // RoboidControl::ParticipantUDP* remoteParticipant = + // this->Get(sender_ipAddress, sender_port); if (remoteParticipant == + // nullptr) { // remoteParticipant = this->Add(sender_ipAddress, sender_port); // // std::cout << "New sender " << sender_ipAddress << ":" // // << sender_port << "\n"; @@ -99,15 +166,43 @@ void ParticipantUDP::Receive() { // ReceiveData(packetSize, remoteParticipant); } -#endif // _WIN32 || _WIN64 +#endif // _WIN32 || _WIN64 +} + +int ParticipantUDP::ReceiveTCP() { +#if defined(_WIN32) || defined(_WIN64) + int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0); + if (bytesReceived > 0) { + buffer[bytesReceived] = '\0'; // Null-terminate the received data + std::cout << "Received: " << buffer << std::endl; + return bytesReceived; + } else if (bytesReceived == 0) { + // Connection has been gracefully closed + std::cout << "Connection closed by the server." << std::endl; + return 0; + } else { + int error = WSAGetLastError(); + if (error == WSAEWOULDBLOCK) { + // No data available, continue with other tasks + std::cout << "No data available, continuing..." << std::endl; + // You can add a sleep or other logic here to avoid busy waiting + } else { + std::cerr << "Receive failed: " << error << std::endl; + } + return 0; + } +#endif // _WIN32 || _WIN64 + return 0; } bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) { #if defined(_WIN32) || defined(_WIN64) char ip_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &(remote_addr.sin_addr), ip_str, INET_ADDRSTRLEN); - std::cout << "Send to " << ip_str << ":" << ntohs(remote_addr.sin_port) << "\n"; - int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&remote_addr, sizeof(remote_addr)); + std::cout << "Send to " << ip_str << ":" << ntohs(remote_addr.sin_port) + << "\n"; + int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, + (struct sockaddr*)&remote_addr, sizeof(remote_addr)); if (sent_bytes <= SOCKET_ERROR) { int error_code = WSAGetLastError(); // Get the error code on Windows @@ -116,10 +211,17 @@ bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) { WSACleanup(); return false; } -#endif // _WIN32 || _WIN64 +#endif // _WIN32 || _WIN64 return true; } +bool ParticipantUDP::SendTCP(int bufferSize) { +#if defined(_WIN32) || defined(_WIN64) + send(sock, this->buffer, bufferSize, 0); +#endif + return false; +} + bool ParticipantUDP::Publish(IMessage* msg) { #if defined(_WIN32) || defined(_WIN64) int bufferSize = msg->Serialize(this->buffer); @@ -128,8 +230,11 @@ bool ParticipantUDP::Publish(IMessage* msg) { char ip_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &(broadcast_addr.sin_addr), ip_str, INET_ADDRSTRLEN); - std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) << "\n"; - int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr)); + std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) + << "\n"; + int sent_bytes = + sendto(sock, this->buffer, bufferSize, 0, + (struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr)); if (sent_bytes <= SOCKET_ERROR) { int error_code = WSAGetLastError(); // Get the error code on Windows @@ -138,7 +243,7 @@ bool ParticipantUDP::Publish(IMessage* msg) { WSACleanup(); return false; } -#endif // _WIN32 || _WIN64 +#endif // _WIN32 || _WIN64 return true; } diff --git a/Windows/WindowsParticipant.h b/Windows/WindowsParticipant.h index 9a747d9..9405c36 100644 --- a/Windows/WindowsParticipant.h +++ b/Windows/WindowsParticipant.h @@ -7,15 +7,20 @@ namespace Windows { class ParticipantUDP : public RoboidControl::ParticipantUDP { public: + ParticipantUDP(); void Setup(int localPort, const char* remoteIpAddress, int remotePort); - void Receive(); + void SetupTCP(const char* remoteIpAddres, int remotePort); bool Send(Participant* remoteParticipant, int bufferSize); + bool SendTCP(int bufferSize); bool Publish(IMessage* msg); + void Receive(); + int ReceiveTCP(); + protected: - #if defined(_WIN32) || defined(_WIN64) +#if defined(_WIN32) || defined(_WIN64) SOCKET sock; - #endif +#endif }; } // namespace Windows