diff --git a/Participants/ParticipantMQTT.cpp b/Participants/ParticipantMQTT.cpp index f23a1dc..4a3b6f3 100644 --- a/Participants/ParticipantMQTT.cpp +++ b/Participants/ParticipantMQTT.cpp @@ -24,13 +24,6 @@ MQTTParticipantBase::MQTTParticipantBase(const char* ipAddress, int port) this->Add(this->root); Participant::ReplaceLocalParticipant(*this); - std::cout << "Setup TCP connection" << std::endl; - - // thisWindows.SetupTCP(ipAddress, port); - - // send_mqtt_connect("RoboidControl"); - // sendSubscribe("domoticz/out"); - // #endif } void MQTTParticipantBase::SetupTCP() { @@ -51,7 +44,7 @@ void MQTTParticipantBase::send_mqtt_connect(const char* client_id) { packet.protocol_name = "MQTT"; packet.protocol_level = 4; // MQTT version 3.1.1 packet.connect_flags = 2; // Clean session - packet.keep_alive = htons(60); // Keep alive time in seconds + packet.keep_alive = htons(15); // Keep alive time in seconds // Create the MQTT connect packet int index = 0; @@ -67,9 +60,6 @@ void MQTTParticipantBase::send_mqtt_connect(const char* client_id) { this->buffer[index++] = packet.connect_flags; this->buffer[index++] = packet.keep_alive & 0xFF; // LSB this->buffer[index++] = (packet.keep_alive >> 8) & 0xFF; // MSB - // this->buffer[index++] = 0; // MSB: no - // client ID this->buffer[index++] = 0; // - // LSB: no client ID size_t client_id_length = strlen(client_id); this->buffer[index++] = (client_id_length >> 8) & 0xFF; // MSB this->buffer[index++] = client_id_length & 0xFF; // LSB @@ -77,7 +67,8 @@ void MQTTParticipantBase::send_mqtt_connect(const char* client_id) { index += client_id_length; for (int ix = 0; ix < index; ix++) - std::cout << std::hex << (int)this->buffer[ix] << std::dec << "\n"; + std::cout << std::hex << "0x" << (int)this->buffer[ix] << " "; + std::cout << std::dec << std::endl; // Send the MQTT connect packet SendTCP(index); @@ -131,7 +122,7 @@ void MQTTParticipantBase::Update(bool recurse) { void MQTTParticipantBase::SendTCP(int bufferSize) {} int MQTTParticipantBase::ReceiveTCP() { - return false; + return 0; } void MQTTParticipantBase::ReceiveData(unsigned char bufferSize) { diff --git a/Participants/ParticipantMQTT.h b/Participants/ParticipantMQTT.h index 2ed7fbb..76dd935 100644 --- a/Participants/ParticipantMQTT.h +++ b/Participants/ParticipantMQTT.h @@ -63,10 +63,6 @@ protected: RemoteParticipantUDP* remoteSite = nullptr; protected: -#if defined(__unix__) || defined(__APPLE__) - sockaddr_in server_addr; - int sock; -#endif public: // void Begin(); @@ -105,7 +101,7 @@ protected: #pragma region Receive protected: - int ReceiveTCP(); + virtual int ReceiveTCP(); void ReceiveData(unsigned char bufferSize); // void ReceiveData(unsigned char bufferSize, Participant* // remoteParticipant); @@ -130,3 +126,5 @@ protected: }; } // namespace RoboidControl + +#include "Posix/PosixMQTT.h" \ No newline at end of file diff --git a/Posix/PosixMQTT.cpp b/Posix/PosixMQTT.cpp index 5b1b2c6..dec21eb 100644 --- a/Posix/PosixMQTT.cpp +++ b/Posix/PosixMQTT.cpp @@ -1,2 +1,95 @@ #include "PosixMQTT.h" +#if defined(__unix__) || defined(__APPLE__) + +#include +#include // For fcntl +#include +#include +#include + +#include +#include + + +namespace RoboidControl { + +MQTTParticipant::MQTTParticipant(const char* remoteIpAddress, int port) + : MQTTParticipantBase(remoteIpAddress, port) { + SetupTCP(); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + send_mqtt_connect("RoboidControl1"); + // sendSubscribe("domoticz/out"); +} + +void MQTTParticipant::SetupTCP() { + // Create a TCP socket + this->sock = socket(AF_INET, SOCK_STREAM, 0); + if (this->sock < 0) { + std::cerr << "Error creating socket" << std::endl; + return; + } + + // Set the socket to non-blocking mode + int flags = fcntl(this->sock, F_GETFL, 0); + fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); + + this->remote_addr.sin_family = AF_INET; + this->remote_addr.sin_port = htons(this->remoteSite->port); + int result = + inet_pton(AF_INET, this->remoteSite->ipAddress, &remote_addr.sin_addr); + if (result <= 0) { + std::cerr << "Invalid address/ Address not supported" << std::endl; + close(this->sock); + return; + } + + int connect_result = + connect(this->sock, (struct sockaddr*)&remote_addr, sizeof(remote_addr)); + if (connect_result < 0 && errno != EINPROGRESS) { + std::cerr << "Error connecting to server:" << (int)errno << std::endl; + close(this->sock); + return; + } + + std::cout << "TCP connection to " << this->remoteSite->ipAddress << ":" + << this->remoteSite->port << "\n"; + + this->connected = true; +} + +void MQTTParticipant::SendTCP(int bufferSize) { + send(this->sock, this->buffer, bufferSize, 0); + std::cout << "Posix: sent TCP\n"; +} + +int MQTTParticipant::ReceiveTCP() { + if (this->connected == false) + return 0; + + int bytesReceived = recv(this->sock, this->buffer, sizeof(this->buffer) - 1, 0); + if (bytesReceived > 0) { + std::cout << " !\n"; + buffer[bytesReceived] = '\0'; // Null-terminate the received data + std::cout << "Received: " << this->buffer << std::endl; + return bytesReceived; + } else if (bytesReceived == 0) { + this->connected = false; + // Connection has been gracefully closed + std::cout << "Connection closed by the server." << std::endl; + return 0; + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No data available to read, continue the loop + // std::cout << "No data available" << std::endl; + } else { + std::cerr << "Error receiving data: " << strerror(errno) << std::endl; + } + return 0; + } + return 0; +} + +} // namespace RoboidControl +#endif \ No newline at end of file diff --git a/Posix/PoxisMQTT.h b/Posix/PosixMQTT.h similarity index 52% rename from Posix/PoxisMQTT.h rename to Posix/PosixMQTT.h index 05c447b..f1677a5 100644 --- a/Posix/PoxisMQTT.h +++ b/Posix/PosixMQTT.h @@ -6,18 +6,20 @@ namespace RoboidControl { class MQTTParticipant : public MQTTParticipantBase { -public: - MQTTParticipant(const char* ipAddress, int port = 1883); + public: + MQTTParticipant(const char* ipAddress, int port = 1883); + protected: - void SetupTCP() override; + void SetupTCP() override; - void SendTCP(int bufferSize) override; + void SendTCP(int bufferSize) override; + + int ReceiveTCP() override; -#if defined(__unix__) || defined(__APPLE__) sockaddr_in remote_addr; -#endif + int sock; }; -} +} // namespace RoboidControl #endif \ No newline at end of file diff --git a/Posix/PosixParticipant.cpp b/Posix/PosixParticipant.cpp index 89b7425..6b8d98d 100644 --- a/Posix/PosixParticipant.cpp +++ b/Posix/PosixParticipant.cpp @@ -128,29 +128,29 @@ void ParticipantUDP::ReceiveUDP() { #endif } -int ParticipantUDP::ReceiveTCP() { -#if defined(__unix__) || defined(__APPLE__) - int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0); - if (bytesReceived > 0) { - buffer[bytesReceived] = '\0'; // Null-terminate the received data - std::cout << "Received: " << buffer << std::endl; - return bytesReceived; - } else if (bytesReceived == 0) { - // Connection has been gracefully closed - std::cout << "Connection closed by the server." << std::endl; - return 0; - } else { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - // No data available to read, continue the loop - // std::cout << "No data available" << std::endl; - } else { - std::cerr << "Error receiving data: " << strerror(errno) << std::endl; - } - return 0; - } -#endif // _WIN32 || _WIN64 - return 0; -} +// int ParticipantUDP::ReceiveTCP() { +// #if defined(__unix__) || defined(__APPLE__) +// int bytesReceived = recv(sock, buffer, sizeof(buffer) - 1, 0); +// if (bytesReceived > 0) { +// buffer[bytesReceived] = '\0'; // Null-terminate the received data +// std::cout << "Received: " << buffer << std::endl; +// return bytesReceived; +// } else if (bytesReceived == 0) { +// // Connection has been gracefully closed +// std::cout << "Connection closed by the server." << std::endl; +// return 0; +// } else { +// if (errno == EAGAIN || errno == EWOULDBLOCK) { +// // No data available to read, continue the loop +// // std::cout << "No data available" << std::endl; +// } else { +// std::cerr << "Error receiving data: " << strerror(errno) << std::endl; +// } +// return 0; +// } +// #endif // _WIN32 || _WIN64 +// return 0; +// } bool ParticipantUDP::SendTo(RemoteParticipantUDP* remoteParticipant, int bufferSize) { diff --git a/Posix/PoxisMQTT.cpp b/Posix/PoxisMQTT.cpp deleted file mode 100644 index bd62531..0000000 --- a/Posix/PoxisMQTT.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "PoxisMQTT.h" -#if defined(__unix__) || defined(__APPLE__) - -#include -#include // For fcntl -#include -#include -#include - -namespace RoboidControl { - -RoboidControl::MQTTParticipant::MQTTParticipant(const char* remoteIpAddress, - int port) - : MQTTParticipantBase(remoteIpAddress, port) {} - -void MQTTParticipant::SetupTCP() { - // Create a TCP socket - this->sock = socket(AF_INET, SOCK_STREAM, 0); - if (this->sock < 0) { - std::cerr << "Error creating socket" << std::endl; - return; - } - - this->remote_addr.sin_family = AF_INET; - this->remote_addr.sin_port = htons(this->remoteSite->port); - inet_pton(AF_INET, this->remoteSite->ipAddress, &remote_addr.sin_addr); - - int connect_result = - connect(this->sock, (struct sockaddr*)&remote_addr, sizeof(remote_addr)); - if (connect_result < 0) { //} && errno != EINPROGRESS) { - std::cerr << "Error connecting to server" << std::endl; - close(this->sock); - } - - std::cout << "TCP connection to " << this->remoteSite->ipAddress << ":" << this->remoteSite->port - << "\n"; - - // Set the socket to non-blocking mode - int flags = fcntl(this->sock, F_GETFL, 0); - fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); -} -void MQTTParticipant::SendTCP(int bufferSize) { - send(this->sock, this->buffer, bufferSize, 0); -} - -} // namespace RoboidControl -#endif \ No newline at end of file diff --git a/test/participant_test.cc b/test/participant_test.cc index bf5bf35..c82d59e 100644 --- a/test/participant_test.cc +++ b/test/participant_test.cc @@ -77,7 +77,7 @@ TEST(Participant, ThingMsg) { } TEST(Participant, MQTT) { - MQTTParticipantBase* participant = new MQTTParticipantBase("192.168.77.11"); + MQTTParticipant* participant = new MQTTParticipant("192.168.77.11"); unsigned long milliseconds = Thing::GetTimeMs(); unsigned long startTime = milliseconds;