Added initial Windows code

This commit is contained in:
Pascal Serrarens 2025-06-09 15:35:09 +02:00
parent 19e4e70e97
commit 0066042e07
5 changed files with 215 additions and 78 deletions

View File

@ -67,7 +67,7 @@ bool Participant::Send(IMessage* msg) {
#if defined(_WIN32) || defined(_WIN64)
Windows::ParticipantUDP* thisWindows =
static_cast<Windows::ParticipantUDP*>(this);
return thisWindows->Send(remoteParticipant, bufferSize);
return thisWindows->Send(this, bufferSize);
#elif defined(__unix__) || defined(__APPLE__)
Posix::ParticipantUDP* thisPosix = static_cast<Posix::ParticipantUDP*>(this);
return thisPosix->Send(this, bufferSize);

View File

@ -10,6 +10,10 @@
namespace RoboidControl {
#if defined(_WIN32) || defined(_WIN64)
Windows::ParticipantUDP thisWindows = Windows::ParticipantUDP();
#endif
ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
: Participant("127.0.0.1", port) {
this->name = "ParticipantUDP";
@ -22,8 +26,13 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
this->Add(this->root);
Participant::ReplaceLocalParticipant(*this);
std::cout << "Setup TCP connection" << std::endl;
thisWindows.SetupTCP(ipAddress, port);
std::cout << "Setting up MQTT" << std::endl;
send_mqtt_connect("RoboidControl");
// Code for Posix
#if defined(__unix__) || defined(__APPLE__)
@ -63,7 +72,7 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
#define MQTT_CONNECT 0x10
#define MQTT_CONNECT_ACK 0x20
void ParticipantMQTT::send_mqtt_connect(int sockfd, const char* client_id) {
void ParticipantMQTT::send_mqtt_connect(const char* client_id) {
MQTTConnectPacket packet;
packet.fixed_header = MQTT_CONNECT;
packet.remaining_length =
@ -76,29 +85,29 @@ void ParticipantMQTT::send_mqtt_connect(int sockfd, const char* client_id) {
packet.keep_alive = htons(60); // Keep alive time in seconds
// Create the MQTT connect packet
uint8_t buffer[256];
int index = 0;
buffer[index++] = packet.fixed_header;
buffer[index++] = packet.remaining_length;
buffer[index++] = packet.protocol_name_length;
memcpy(&buffer[index], packet.protocol_name, packet.protocol_name_length);
this->buffer[index++] = packet.fixed_header;
this->buffer[index++] = packet.remaining_length;
this->buffer[index++] = packet.protocol_name_length;
memcpy(&this->buffer[index], packet.protocol_name,
packet.protocol_name_length);
index += packet.protocol_name_length;
buffer[index++] = packet.protocol_level;
buffer[index++] = packet.connect_flags;
buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB
buffer[index++] = packet.keep_alive & 0xFF; // LSB
uint8_t client_id_length = strlen(client_id);
buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB
buffer[index++] = client_id_length & 0xFF; // LSB
memcpy(&buffer[index], client_id, client_id_length);
this->buffer[index++] = packet.protocol_level;
this->buffer[index++] = packet.connect_flags;
this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB
this->buffer[index++] = packet.keep_alive & 0xFF; // LSB
size_t client_id_length = strlen(client_id);
this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB
this->buffer[index++] = client_id_length & 0xFF; // LSB
memcpy(&this->buffer[index], client_id, client_id_length);
index += client_id_length;
// Send the MQTT connect packet
send(sockfd, buffer, index, 0);
thisWindows.SendTCP(index);
}
void ParticipantMQTT::sendSubscribe(int sock, const char* topic) {
void ParticipantMQTT::sendSubscribe(const char* topic) {
// Packet Identifier (2 bytes)
static unsigned short packetId =
1; // Increment this for each new subscription
@ -108,43 +117,62 @@ void ParticipantMQTT::sendSubscribe(int sock, const char* topic) {
// Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) +
// Topic Name + QoS (1 byte)
unsigned char remainingLength = 2 + 2 + topicLength + 1;
size_t remainingLength = 2 + 2 + topicLength + 1;
// Construct the SUBSCRIBE packet
unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed header
// and packet ID
subscribePacket[0] = 0x82; // Packet type and flags
subscribePacket[1] = remainingLength; // Remaining length
subscribePacket[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB
subscribePacket[3] = packetId & 0xFF; // Packet Identifier LSB
subscribePacket[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB
subscribePacket[5] = topicLength & 0xFF; // Topic Length LSB
// unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed
// header
// // and packet ID
this->buffer[0] = (char)0x82; // Packet type and flags
this->buffer[1] = (char)remainingLength; // Remaining length
this->buffer[2] = (packetId >> 8) & 0xFF; // Packet Identifier MSB
this->buffer[3] = packetId & 0xFF; // Packet Identifier LSB
this->buffer[4] = (topicLength >> 8) & 0xFF; // Topic Length MSB
this->buffer[5] = topicLength & 0xFF; // Topic Length LSB
// Copy the topic name into the packet
memcpy(&subscribePacket[6], topic, topicLength);
subscribePacket[6 + topicLength] = 0x00; // QoS level (0)
memcpy(&this->buffer[6], topic, topicLength);
this->buffer[6 + topicLength] = 0x00; // QoS level (0)
// Send the SUBSCRIBE packet
send(sock, subscribePacket, sizeof(subscribePacket), 0);
thisWindows.SendTCP(6 + topicLength);
}
void ParticipantMQTT::receiveMessages(int sock) {
unsigned char buffer[1024];
// Check for incoming messages without blocking
int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT);
if (bytesRead > 0) {
// Process the incoming MQTT message
std::cout << "Received message: ";
for (int i = 0; i < bytesRead; ++i) {
std::cout << std::hex << (int)buffer[i] << " ";
}
std::cout << std::dec << std::endl; // Reset to decimal output
} else if (bytesRead < 0) {
// If no messages are available, check for errors
if (errno != EAGAIN && errno != EWOULDBLOCK) {
std::cerr << "Error receiving message: " << strerror(errno) << std::endl;
void ParticipantMQTT::Update() {
int packetSize = ReceiveTCP();
if (packetSize > 0) {
ReceiveData(packetSize);
}
}
int ParticipantMQTT::ReceiveTCP() {
return thisWindows.ReceiveTCP();
}
void ParticipantMQTT::ReceiveData(unsigned char bufferSize) {
// std::cout << "receive msg " << (int)msgId << "\n";
// std::cout << " buffer size = " <<(int) bufferSize << "\n";
};
// void ParticipantMQTT::receiveMessages(int sock) {
// unsigned char buffer[1024];
// // Check for incoming messages without blocking
// int bytesRead = recv(sock, buffer, sizeof(buffer), MSG_DONTWAIT);
// if (bytesRead > 0) {
// // Process the incoming MQTT message
// std::cout << "Received message: ";
// for (int i = 0; i < bytesRead; ++i) {
// std::cout << std::hex << (int)buffer[i] << " ";
// }
// std::cout << std::dec << std::endl; // Reset to decimal output
// } else if (bytesRead < 0) {
// // If no messages are available, check for errors
// if (errno != EAGAIN && errno != EWOULDBLOCK) {
// std::cerr << "Error receiving message: " << strerror(errno) <<
// std::endl;
// }
// }
// }
} // namespace RoboidControl

View File

@ -18,8 +18,9 @@
// #include <unordered_map>
#endif
#if defined(_WIN32) || defined(_WIN64)
#include <winsock2.h>
#include "Windows/WindowsParticipant.h"
#elif defined(__unix__) || defined(__APPLE__)
#include <arpa/inet.h>
#include <netinet/in.h>
@ -73,7 +74,7 @@ class ParticipantMQTT : public Participant {
#pragma region Update
public:
// virtual void Update() override;
virtual void Update() override;
#pragma endregion Update
@ -92,17 +93,15 @@ class ParticipantMQTT : public Participant {
const char* client_id;
};
void send_mqtt_connect(int sockfd, const char* client_id);
void sendSubscribe(int sock, const char* topic);
void send_mqtt_connect(const char* client_id);
void sendSubscribe(const char* topic);
#pragma endregion Send
#pragma region Receive
protected:
void receiveMessages(int sock);
// void ReceiveData(unsigned char bufferSize,
// char* senderIpAddress,
// unsigned int senderPort);
int ReceiveTCP();
void ReceiveData(unsigned char bufferSize);
// void ReceiveData(unsigned char bufferSize, Participant*
// remoteParticipant);

View File

@ -9,7 +9,11 @@
namespace RoboidControl {
namespace Windows {
void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remotePort) {
ParticipantUDP::ParticipantUDP() {}
void ParticipantUDP::Setup(int localPort,
const char* remoteIpAddress,
int remotePort) {
#if defined(_WIN32) || defined(_WIN64)
// Create a UDP socket
@ -19,19 +23,28 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot
std::cerr << "WSAStartup failed" << std::endl;
return;
}
this->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (this->sock < 0) {
std::cerr << "Error creating socket" << std::endl;
// Create an UDP socket
this->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (this->sock == INVALID_SOCKET) {
std::cerr << "UDP Socket creation failed: " << WSAGetLastError()
<< std::endl;
WSACleanup();
return;
}
// Set the socket to non-blocking mode
u_long mode = 1; // 1 to enable non-blocking socket
ioctlsocket(this->sock, FIONBIO, &mode);
if (ioctlsocket(sock, FIONBIO, &mode) != NO_ERROR) {
std::cerr << "Failed to set non-blocking mode: " << WSAGetLastError()
<< std::endl;
closesocket(sock);
WSACleanup();
return;
}
if (remotePort != 0) {
// Set up the address to send to
// Define the server address
memset(&remote_addr, 0, sizeof(remote_addr));
remote_addr.sin_family = AF_INET;
remote_addr.sin_port = htons((u_short)remotePort);
@ -55,7 +68,8 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot
}
// Bind the socket to the specified port
if (bind(this->sock, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
if (bind(this->sock, (const struct sockaddr*)&server_addr,
sizeof(server_addr)) < 0) {
std::cerr << "Bind failed" << std::endl;
closesocket(sock);
WSACleanup();
@ -64,6 +78,56 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot
#endif // _WIN32 || _WIN64
}
void ParticipantUDP::SetupTCP(const char* remoteIpAddress, int remotePort) {
#if defined(_WIN32) || defined(_WIN64)
// Initialize Winsock
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
std::cerr << "WSAStartup failed: " << std::endl;
return;
}
// Create a TCP socket
this->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock == INVALID_SOCKET) {
std::cerr << "TCP Socket creation failed: " << WSAGetLastError()
<< std::endl;
WSACleanup();
return;
}
// Set the socket to non-blocking mode
u_long mode = 1; // 1 to enable non-blocking socket
if (ioctlsocket(sock, FIONBIO, &mode) != NO_ERROR) {
std::cerr << "Failed to set non-blocking mode: " << WSAGetLastError()
<< std::endl;
closesocket(sock);
WSACleanup();
return;
}
// Define the server address
memset(&this->remote_addr, 0, sizeof(server_addr));
this->remote_addr.sin_family = AF_INET;
this->remote_addr.sin_port = htons((u_short)remotePort);
if (inet_pton(AF_INET, remoteIpAddress, &this->remote_addr.sin_addr) <= 0) {
std::cerr << "Invalid address" << std::endl;
closesocket(sock);
WSACleanup();
return;
}
// Connect to the server
if (connect(sock, (sockaddr*)&this->remote_addr, sizeof(this->remote_addr)) ==
SOCKET_ERROR) {
std::cerr << "Connection failed: " << WSAGetLastError() << std::endl;
closesocket(sock);
WSACleanup();
return;
}
#endif // _WIN32 || _WIN64
}
void ParticipantUDP::Receive() {
#if defined(_WIN32) || defined(_WIN64)
// char ip_str[INET_ADDRSTRLEN];
@ -73,7 +137,8 @@ void ParticipantUDP::Receive() {
sockaddr_in client_addr;
int len = sizeof(client_addr);
int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, (struct sockaddr*)&client_addr, &len);
int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0,
(struct sockaddr*)&client_addr, &len);
// std::cout << "received data " << packetSize << "\n";
if (packetSize < 0) {
int error_code = WSAGetLastError(); // Get the error code on Windows
@ -81,12 +146,14 @@ void ParticipantUDP::Receive() {
std::cerr << "recvfrom failed with error: " << error_code << std::endl;
} else if (packetSize > 0) {
char sender_ipAddress[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress,
INET_ADDRSTRLEN);
unsigned int sender_port = ntohs(client_addr.sin_port);
ReceiveData(packetSize, sender_ipAddress, sender_port);
// RoboidControl::ParticipantUDP* remoteParticipant = this->Get(sender_ipAddress, sender_port);
// if (remoteParticipant == nullptr) {
// RoboidControl::ParticipantUDP* remoteParticipant =
// this->Get(sender_ipAddress, sender_port); if (remoteParticipant ==
// nullptr) {
// remoteParticipant = this->Add(sender_ipAddress, sender_port);
// // std::cout << "New sender " << sender_ipAddress << ":"
// // << sender_port << "\n";
@ -102,12 +169,40 @@ void ParticipantUDP::Receive() {
#endif // _WIN32 || _WIN64
}
int ParticipantUDP::ReceiveTCP() {
#if defined(_WIN32) || defined(_WIN64)
int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (bytesReceived > 0) {
buffer[bytesReceived] = '\0'; // Null-terminate the received data
std::cout << "Received: " << buffer << std::endl;
return bytesReceived;
} else if (bytesReceived == 0) {
// Connection has been gracefully closed
std::cout << "Connection closed by the server." << std::endl;
return 0;
} else {
int error = WSAGetLastError();
if (error == WSAEWOULDBLOCK) {
// No data available, continue with other tasks
std::cout << "No data available, continuing..." << std::endl;
// You can add a sleep or other logic here to avoid busy waiting
} else {
std::cerr << "Receive failed: " << error << std::endl;
}
return 0;
}
#endif // _WIN32 || _WIN64
return 0;
}
bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) {
#if defined(_WIN32) || defined(_WIN64)
char ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(remote_addr.sin_addr), ip_str, INET_ADDRSTRLEN);
std::cout << "Send to " << ip_str << ":" << ntohs(remote_addr.sin_port) << "\n";
int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&remote_addr, sizeof(remote_addr));
std::cout << "Send to " << ip_str << ":" << ntohs(remote_addr.sin_port)
<< "\n";
int sent_bytes = sendto(sock, this->buffer, bufferSize, 0,
(struct sockaddr*)&remote_addr, sizeof(remote_addr));
if (sent_bytes <= SOCKET_ERROR) {
int error_code = WSAGetLastError(); // Get the error code on Windows
@ -120,6 +215,13 @@ bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) {
return true;
}
bool ParticipantUDP::SendTCP(int bufferSize) {
#if defined(_WIN32) || defined(_WIN64)
send(sock, this->buffer, bufferSize, 0);
#endif
return false;
}
bool ParticipantUDP::Publish(IMessage* msg) {
#if defined(_WIN32) || defined(_WIN64)
int bufferSize = msg->Serialize(this->buffer);
@ -128,8 +230,11 @@ bool ParticipantUDP::Publish(IMessage* msg) {
char ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(broadcast_addr.sin_addr), ip_str, INET_ADDRSTRLEN);
std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) << "\n";
int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr));
std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port)
<< "\n";
int sent_bytes =
sendto(sock, this->buffer, bufferSize, 0,
(struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr));
if (sent_bytes <= SOCKET_ERROR) {
int error_code = WSAGetLastError(); // Get the error code on Windows

View File

@ -7,11 +7,16 @@ namespace Windows {
class ParticipantUDP : public RoboidControl::ParticipantUDP {
public:
ParticipantUDP();
void Setup(int localPort, const char* remoteIpAddress, int remotePort);
void Receive();
void SetupTCP(const char* remoteIpAddres, int remotePort);
bool Send(Participant* remoteParticipant, int bufferSize);
bool SendTCP(int bufferSize);
bool Publish(IMessage* msg);
void Receive();
int ReceiveTCP();
protected:
#if defined(_WIN32) || defined(_WIN64)
SOCKET sock;