server still rejects this

This commit is contained in:
Pascal Serrarens 2025-06-29 13:01:31 +02:00
parent ff8570e2aa
commit 5b582fe9b8
7 changed files with 133 additions and 96 deletions

View File

@ -24,13 +24,6 @@ MQTTParticipantBase::MQTTParticipantBase(const char* ipAddress, int port)
this->Add(this->root);
Participant::ReplaceLocalParticipant(*this);
std::cout << "Setup TCP connection" << std::endl;
// thisWindows.SetupTCP(ipAddress, port);
// send_mqtt_connect("RoboidControl");
// sendSubscribe("domoticz/out");
// #endif
}
void MQTTParticipantBase::SetupTCP() {
@ -51,7 +44,7 @@ void MQTTParticipantBase::send_mqtt_connect(const char* client_id) {
packet.protocol_name = "MQTT";
packet.protocol_level = 4; // MQTT version 3.1.1
packet.connect_flags = 2; // Clean session
packet.keep_alive = htons(60); // Keep alive time in seconds
packet.keep_alive = htons(15); // Keep alive time in seconds
// Create the MQTT connect packet
int index = 0;
@ -67,9 +60,6 @@ void MQTTParticipantBase::send_mqtt_connect(const char* client_id) {
this->buffer[index++] = packet.connect_flags;
this->buffer[index++] = packet.keep_alive & 0xFF; // LSB
this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB
// this->buffer[index++] = 0; // MSB: no
// client ID this->buffer[index++] = 0; //
// LSB: no client ID
size_t client_id_length = strlen(client_id);
this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB
this->buffer[index++] = client_id_length & 0xFF; // LSB
@ -77,7 +67,8 @@ void MQTTParticipantBase::send_mqtt_connect(const char* client_id) {
index += client_id_length;
for (int ix = 0; ix < index; ix++)
std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n";
std::cout << std::hex << "0x" << (int)this->buffer[ix] << " ";
std::cout << std::dec << std::endl;
// Send the MQTT connect packet
SendTCP(index);
@ -131,7 +122,7 @@ void MQTTParticipantBase::Update(bool recurse) {
void MQTTParticipantBase::SendTCP(int bufferSize) {}
int MQTTParticipantBase::ReceiveTCP() {
return false;
return 0;
}
void MQTTParticipantBase::ReceiveData(unsigned char bufferSize) {

View File

@ -63,10 +63,6 @@ protected:
RemoteParticipantUDP* remoteSite = nullptr;
protected:
#if defined(__unix__) || defined(__APPLE__)
sockaddr_in server_addr;
int sock;
#endif
public:
// void Begin();
@ -105,7 +101,7 @@ protected:
#pragma region Receive
protected:
int ReceiveTCP();
virtual int ReceiveTCP();
void ReceiveData(unsigned char bufferSize);
// void ReceiveData(unsigned char bufferSize, Participant*
// remoteParticipant);
@ -130,3 +126,5 @@ protected:
};
} // namespace RoboidControl
#include "Posix/PosixMQTT.h"

View File

@ -1,2 +1,95 @@
#include "PosixMQTT.h"
#if defined(__unix__) || defined(__APPLE__)
#include <arpa/inet.h>
#include <fcntl.h> // For fcntl
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <chrono>
#include <thread>
namespace RoboidControl {
MQTTParticipant::MQTTParticipant(const char* remoteIpAddress, int port)
: MQTTParticipantBase(remoteIpAddress, port) {
SetupTCP();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
send_mqtt_connect("RoboidControl1");
// sendSubscribe("domoticz/out");
}
void MQTTParticipant::SetupTCP() {
// Create a TCP socket
this->sock = socket(AF_INET, SOCK_STREAM, 0);
if (this->sock < 0) {
std::cerr << "Error creating socket" << std::endl;
return;
}
// Set the socket to non-blocking mode
int flags = fcntl(this->sock, F_GETFL, 0);
fcntl(this->sock, F_SETFL, flags | O_NONBLOCK);
this->remote_addr.sin_family = AF_INET;
this->remote_addr.sin_port = htons(this->remoteSite->port);
int result =
inet_pton(AF_INET, this->remoteSite->ipAddress, &remote_addr.sin_addr);
if (result <= 0) {
std::cerr << "Invalid address/ Address not supported" << std::endl;
close(this->sock);
return;
}
int connect_result =
connect(this->sock, (struct sockaddr*)&remote_addr, sizeof(remote_addr));
if (connect_result < 0 && errno != EINPROGRESS) {
std::cerr << "Error connecting to server:" << (int)errno << std::endl;
close(this->sock);
return;
}
std::cout << "TCP connection to " << this->remoteSite->ipAddress << ":"
<< this->remoteSite->port << "\n";
this->connected = true;
}
void MQTTParticipant::SendTCP(int bufferSize) {
send(this->sock, this->buffer, bufferSize, 0);
std::cout << "Posix: sent TCP\n";
}
int MQTTParticipant::ReceiveTCP() {
if (this->connected == false)
return 0;
int bytesReceived = recv(this->sock, this->buffer, sizeof(this->buffer) - 1, 0);
if (bytesReceived > 0) {
std::cout << " !\n";
buffer[bytesReceived] = '\0'; // Null-terminate the received data
std::cout << "Received: " << this->buffer << std::endl;
return bytesReceived;
} else if (bytesReceived == 0) {
this->connected = false;
// Connection has been gracefully closed
std::cout << "Connection closed by the server." << std::endl;
return 0;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No data available to read, continue the loop
// std::cout << "No data available" << std::endl;
} else {
std::cerr << "Error receiving data: " << strerror(errno) << std::endl;
}
return 0;
}
return 0;
}
} // namespace RoboidControl
#endif

View File

@ -6,18 +6,20 @@
namespace RoboidControl {
class MQTTParticipant : public MQTTParticipantBase {
public:
MQTTParticipant(const char* ipAddress, int port = 1883);
public:
MQTTParticipant(const char* ipAddress, int port = 1883);
protected:
void SetupTCP() override;
void SetupTCP() override;
void SendTCP(int bufferSize) override;
void SendTCP(int bufferSize) override;
int ReceiveTCP() override;
#if defined(__unix__) || defined(__APPLE__)
sockaddr_in remote_addr;
#endif
int sock;
};
}
} // namespace RoboidControl
#endif

View File

@ -128,29 +128,29 @@ void ParticipantUDP::ReceiveUDP() {
#endif
}
int ParticipantUDP::ReceiveTCP() {
#if defined(__unix__) || defined(__APPLE__)
int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (bytesReceived > 0) {
buffer[bytesReceived] = '\0'; // Null-terminate the received data
std::cout << "Received: " << buffer << std::endl;
return bytesReceived;
} else if (bytesReceived == 0) {
// Connection has been gracefully closed
std::cout << "Connection closed by the server." << std::endl;
return 0;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No data available to read, continue the loop
// std::cout << "No data available" << std::endl;
} else {
std::cerr << "Error receiving data: " << strerror(errno) << std::endl;
}
return 0;
}
#endif // _WIN32 || _WIN64
return 0;
}
// int ParticipantUDP::ReceiveTCP() {
// #if defined(__unix__) || defined(__APPLE__)
// int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0);
// if (bytesReceived > 0) {
// buffer[bytesReceived] = '\0'; // Null-terminate the received data
// std::cout << "Received: " << buffer << std::endl;
// return bytesReceived;
// } else if (bytesReceived == 0) {
// // Connection has been gracefully closed
// std::cout << "Connection closed by the server." << std::endl;
// return 0;
// } else {
// if (errno == EAGAIN || errno == EWOULDBLOCK) {
// // No data available to read, continue the loop
// // std::cout << "No data available" << std::endl;
// } else {
// std::cerr << "Error receiving data: " << strerror(errno) << std::endl;
// }
// return 0;
// }
// #endif // _WIN32 || _WIN64
// return 0;
// }
bool ParticipantUDP::SendTo(RemoteParticipantUDP* remoteParticipant,
int bufferSize) {

View File

@ -1,47 +0,0 @@
#include "PoxisMQTT.h"
#if defined(__unix__) || defined(__APPLE__)
#include <arpa/inet.h>
#include <fcntl.h> // For fcntl
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
namespace RoboidControl {
RoboidControl::MQTTParticipant::MQTTParticipant(const char* remoteIpAddress,
int port)
: MQTTParticipantBase(remoteIpAddress, port) {}
void MQTTParticipant::SetupTCP() {
// Create a TCP socket
this->sock = socket(AF_INET, SOCK_STREAM, 0);
if (this->sock < 0) {
std::cerr << "Error creating socket" << std::endl;
return;
}
this->remote_addr.sin_family = AF_INET;
this->remote_addr.sin_port = htons(this->remoteSite->port);
inet_pton(AF_INET, this->remoteSite->ipAddress, &remote_addr.sin_addr);
int connect_result =
connect(this->sock, (struct sockaddr*)&remote_addr, sizeof(remote_addr));
if (connect_result < 0) { //} && errno != EINPROGRESS) {
std::cerr << "Error connecting to server" << std::endl;
close(this->sock);
}
std::cout << "TCP connection to " << this->remoteSite->ipAddress << ":" << this->remoteSite->port
<< "\n";
// Set the socket to non-blocking mode
int flags = fcntl(this->sock, F_GETFL, 0);
fcntl(this->sock, F_SETFL, flags | O_NONBLOCK);
}
void MQTTParticipant::SendTCP(int bufferSize) {
send(this->sock, this->buffer, bufferSize, 0);
}
} // namespace RoboidControl
#endif

View File

@ -77,7 +77,7 @@ TEST(Participant, ThingMsg) {
}
TEST(Participant, MQTT) {
MQTTParticipantBase* participant = new MQTTParticipantBase("192.168.77.11");
MQTTParticipant* participant = new MQTTParticipant("192.168.77.11");
unsigned long milliseconds = Thing::GetTimeMs();
unsigned long startTime = milliseconds;