Improvements

This commit is contained in:
Pascal Serrarens 2025-06-28 09:15:16 +02:00
parent 0066042e07
commit 26a80cd94a
5 changed files with 135 additions and 66 deletions

View File

@ -12,6 +12,8 @@ namespace RoboidControl {
#if defined(_WIN32) || defined(_WIN64)
Windows::ParticipantUDP thisWindows = Windows::ParticipantUDP();
#elif defined(__unix__) || defined(__APPLE__)
Posix::ParticipantUDP thisWindows = Posix::ParticipantUDP();
#endif
ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
@ -30,43 +32,9 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
thisWindows.SetupTCP(ipAddress, port);
std::cout << "Setting up MQTT" << std::endl;
send_mqtt_connect("RoboidControl");
// Code for Posix
#if defined(__unix__) || defined(__APPLE__)
// Create socket
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
std::cerr << "Error creating socket" << std::endl;
return;
}
// Set up the server address structure
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
if (inet_pton(AF_INET, ipAddress, &server_addr.sin_addr) <= 0) {
std::cerr << "Invalid address" << std::endl;
close(sock);
return;
}
// Connect to the MQTT broker
if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
std::cerr << "Error connecting to broker" << std::endl;
close(sock);
return;
}
// Set the socket to non-blocking mode
int flags = fcntl(sock, F_GETFL, 0);
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
// Send MQTT connect packet
send_mqtt_connect(sock, "RoboidControl");
std::cout << "Connected to MQTT broker" << std::endl;
#endif
sendSubscribe("domoticz/out");
// #endif
}
#define MQTT_CONNECT 0x10
@ -75,13 +43,13 @@ ParticipantMQTT::ParticipantMQTT(const char* ipAddress, int port)
void ParticipantMQTT::send_mqtt_connect(const char* client_id) {
MQTTConnectPacket packet;
packet.fixed_header = MQTT_CONNECT;
packet.remaining_length =
2 + strlen(client_id) + 3; // Protocol name + protocol level + connect
packet.remaining_length = //14;
2 + 4 + 2+ strlen(client_id) + 3; // Protocol name + protocol level + connect
// flags + keep alive + client ID
packet.protocol_name_length = 4; // "MQTT"
packet.protocol_name = "MQTT";
packet.protocol_level = 4; // MQTT version 3.1.1
packet.connect_flags = 0; // Clean session
packet.connect_flags = 2; // Clean session
packet.keep_alive = htons(60); // Keep alive time in seconds
// Create the MQTT connect packet
@ -89,22 +57,31 @@ void ParticipantMQTT::send_mqtt_connect(const char* client_id) {
this->buffer[index++] = packet.fixed_header;
this->buffer[index++] = packet.remaining_length;
this->buffer[index++] = 0; // MSB protocol_name_length
this->buffer[index++] = packet.protocol_name_length;
memcpy(&this->buffer[index], packet.protocol_name,
packet.protocol_name_length);
index += packet.protocol_name_length;
this->buffer[index++] = packet.protocol_level;
this->buffer[index++] = packet.connect_flags;
this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB
this->buffer[index++] = packet.keep_alive & 0xFF; // LSB
this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB
// this->buffer[index++] = 0; // MSB: no client ID
// this->buffer[index++] = 0; // LSB: no client ID
size_t client_id_length = strlen(client_id);
this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB
this->buffer[index++] = client_id_length & 0xFF; // LSB
memcpy(&this->buffer[index], client_id, client_id_length);
index += client_id_length;
for (int ix = 0; ix < index; ix++)
std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n";
// Send the MQTT connect packet
thisWindows.SendTCP(index);
std::cout << "Send connect, client ID = " << client_id << std::endl;
}
void ParticipantMQTT::sendSubscribe(const char* topic) {
@ -117,7 +94,7 @@ void ParticipantMQTT::sendSubscribe(const char* topic) {
// Remaining length = Packet Identifier (2 bytes) + Topic Length (2 bytes) +
// Topic Name + QoS (1 byte)
size_t remainingLength = 2 + 2 + topicLength + 1;
size_t remainingLength = 1 + 2 + 2 + topicLength + 1;
// Construct the SUBSCRIBE packet
// unsigned char subscribePacket[3 + topicLength]; // 3 bytes for fixed
@ -134,8 +111,14 @@ void ParticipantMQTT::sendSubscribe(const char* topic) {
memcpy(&this->buffer[6], topic, topicLength);
this->buffer[6 + topicLength] = 0x00; // QoS level (0)
int index = 7 + topicLength;
// for (int ix = 0; ix < index; ix++)
// std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n";
// Send the SUBSCRIBE packet
thisWindows.SendTCP(6 + topicLength);
thisWindows.SendTCP(7 + topicLength);
std::cout << "Send subscribe to topic: " << topic << std::endl;
}
void ParticipantMQTT::Update() {
@ -150,9 +133,9 @@ int ParticipantMQTT::ReceiveTCP() {
}
void ParticipantMQTT::ReceiveData(unsigned char bufferSize) {
std::cout << " receivemsg\n";
// std::cout << "receive msg " << (int)msgId << "\n";
// std::cout << " buffer size = " <<(int) bufferSize << "\n";
};
// void ParticipantMQTT::receiveMessages(int sock) {

View File

@ -22,6 +22,7 @@
#if defined(_WIN32) || defined(_WIN64)
#include "Windows/WindowsParticipant.h"
#elif defined(__unix__) || defined(__APPLE__)
#include "Posix/PosixParticipant.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

View File

@ -11,19 +11,20 @@
namespace RoboidControl {
namespace Posix {
void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remotePort) {
void ParticipantUDP::Setup(int localPort,
const char* remoteIpAddress,
int remotePort) {
#if defined(__unix__) || defined(__APPLE__)
// Create a UDP socket
this->sock = socket(AF_INET, SOCK_DGRAM, 0);
if (this->sock < 0) {
std::cerr << "Error creating socket" << std::endl;
return;
}
// Set the socket to non-blocking mode
// Set the socket to non-blocking mode
int flags = fcntl(this->sock, F_GETFL, 0);
fcntl(this->sock, F_SETFL, flags | O_NONBLOCK);
@ -50,7 +51,8 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot
}
// Bind the socket to the specified port
if (bind(this->sock, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
if (bind(this->sock, (const struct sockaddr*)&server_addr,
sizeof(server_addr)) < 0) {
std::cerr << "Bind failed" << std::endl;
close(sock);
}
@ -58,23 +60,59 @@ void ParticipantUDP::Setup(int localPort, const char* remoteIpAddress, int remot
#endif
}
void ParticipantUDP::SetupTCP(const char* remoteIpAddress, int remotePort) {
#if defined(__unix__) || defined(__APPLE__)
// Create a UDP socket
this->sock = socket(AF_INET, SOCK_STREAM, 0);
if (this->sock < 0) {
std::cerr << "Error creating socket" << std::endl;
return;
}
remote_addr.sin_family = AF_INET;
remote_addr.sin_port = htons(remotePort);
inet_pton(AF_INET, remoteIpAddress, &remote_addr.sin_addr);
int connect_result =
connect(this->sock, (struct sockaddr*)&remote_addr, sizeof(remote_addr));
if (connect_result < 0) { //} && errno != EINPROGRESS) {
std::cerr << "Error connecting to server" << std::endl;
close(this->sock);
}
std::cout << "TCP connection to " << remoteIpAddress << ":" << remotePort
<< "\n";
// Set the socket to non-blocking mode
int flags = fcntl(this->sock, F_GETFL, 0);
fcntl(this->sock, F_SETFL, flags | O_NONBLOCK);
#endif
}
void ParticipantUDP::Receive() {
#if defined(__unix__) || defined(__APPLE__)
sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, (struct sockaddr*)&client_addr, &len);
int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0,
(struct sockaddr*)&client_addr, &len);
if (packetSize > 0) {
char sender_ipAddress[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress,
INET_ADDRSTRLEN);
unsigned int sender_port = ntohs(client_addr.sin_port);
ReceiveData(packetSize, sender_ipAddress, sender_port);
// RoboidControl::Participant* remoteParticipant = this->Get(sender_ipAddress, sender_port);
// if (remoteParticipant == nullptr) {
// RoboidControl::Participant* remoteParticipant =
// this->Get(sender_ipAddress, sender_port); if (remoteParticipant ==
// nullptr) {
// remoteParticipant = this->Add(sender_ipAddress, sender_port);
// // std::cout << "New sender " << sender_ipAddress << ":" << sender_port
// // << "\n";
// // std::cout << "New remote participant " << remoteParticipant->ipAddress
// // std::cout << "New remote participant " <<
// remoteParticipant->ipAddress
// // << ":" << remoteParticipant->port << " "
// // << (int)remoteParticipant->networkId << "\n";
// }
@ -85,9 +123,34 @@ void ParticipantUDP::Receive() {
#endif
}
int ParticipantUDP::ReceiveTCP() {
#if defined(__unix__) || defined(__APPLE__)
int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (bytesReceived > 0) {
buffer[bytesReceived] = '\0'; // Null-terminate the received data
std::cout << "Received: " << buffer << std::endl;
return bytesReceived;
} else if (bytesReceived == 0) {
// Connection has been gracefully closed
std::cout << "Connection closed by the server." << std::endl;
return 0;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No data available to read, continue the loop
// std::cout << "No data available" << std::endl;
} else {
std::cerr << "Error receiving data: " << strerror(errno) << std::endl;
}
return 0;
}
#endif // _WIN32 || _WIN64
return 0;
}
bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) {
#if defined(__unix__) || defined(__APPLE__)
// std::cout << "Send to " << remoteParticipant->ipAddress << ":" << ntohs(remoteParticipant->port)
// std::cout << "Send to " << remoteParticipant->ipAddress << ":" <<
// ntohs(remoteParticipant->port)
// << "\n";
// Set up the destination address
@ -98,9 +161,11 @@ bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) {
dest_addr.sin_addr.s_addr = inet_addr(remoteParticipant->ipAddress);
// Send the message
int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr));
int sent_bytes = sendto(sock, this->buffer, bufferSize, 0,
(struct sockaddr*)&dest_addr, sizeof(dest_addr));
if (sent_bytes < 0) {
std::cerr << "sendto failed with error: " << sent_bytes << " " << strerror(errno) << std::endl;
std::cerr << "sendto failed with error: " << sent_bytes << " "
<< strerror(errno) << std::endl;
close(sock);
return false;
}
@ -108,6 +173,19 @@ bool ParticipantUDP::Send(Participant* remoteParticipant, int bufferSize) {
return true;
}
bool ParticipantUDP::SendTCP(int bufferSize) {
#if defined(__unix__) || defined(__APPLE__)
// send(sock, this->buffer, bufferSize, 0);
ssize_t sent_bytes = send(this->sock, this->buffer, bufferSize, 0);
if (sent_bytes < 0) {
std::cerr << "Failed to send packet" << strerror(errno) << std::endl;
close(sock);
return -1;
}
#endif
return false;
}
bool ParticipantUDP::Publish(IMessage* msg) {
#if defined(__unix__) || defined(__APPLE__)
int bufferSize = msg->Serialize(this->buffer);
@ -116,8 +194,11 @@ bool ParticipantUDP::Publish(IMessage* msg) {
char ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(broadcast_addr.sin_addr), ip_str, INET_ADDRSTRLEN);
std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) << "\n";
int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr));
std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port)
<< "\n";
int sent_bytes =
sendto(sock, this->buffer, bufferSize, 0,
(struct sockaddr*)&broadcast_addr, sizeof(broadcast_addr));
if (sent_bytes < 0) {
std::cerr << "sendto failed with error: " << sent_bytes << std::endl;
close(sock);

View File

@ -8,8 +8,12 @@ namespace Posix {
class ParticipantUDP : public RoboidControl::ParticipantUDP {
public:
void Setup(int localPort, const char* remoteIpAddress, int remotePort);
void SetupTCP(const char* remoteIpAddress, int remotePort);
void Receive();
int ReceiveTCP();
bool Send(Participant* remoteParticipant, int bufferSize);
bool SendTCP(int bufferSize);
bool Publish(IMessage* msg);
protected:

View File

@ -81,7 +81,7 @@ TEST(Participant, MQTT) {
unsigned long milliseconds = Thing::GetTimeMs();
unsigned long startTime = milliseconds;
while (milliseconds < startTime + 2000) {
while (milliseconds < startTime + 10000) {
participant->Update();
std::this_thread::sleep_for(std::chrono::milliseconds(100));