diff --git a/Messages.cpp b/Messages.cpp index e9d7358..64c1663 100644 --- a/Messages.cpp +++ b/Messages.cpp @@ -33,22 +33,6 @@ unsigned char *IMessage::ReceiveMsg(unsigned char packetSize) { // IMessage #pragma endregion -#pragma region Network Id - -NetworkIdMsg::NetworkIdMsg(char *buffer) { this->networkId = buffer[1]; } - -// void NetworkIdMsg::Deserialize(unsigned char *buffer) { -// this->networkId = buffer[1]; -// } - -NetworkIdMsg NetworkIdMsg::Receive(char *buffer, unsigned char bufferSize) { - NetworkIdMsg msg = NetworkIdMsg(buffer); - return msg; -} - -// Network Id -#pragma endregion - #pragma region Investigate InvestigateMsg::InvestigateMsg(char *buffer) { diff --git a/Messages.h b/Messages.h index 37da6c1..b9ba569 100644 --- a/Messages.h +++ b/Messages.h @@ -21,17 +21,6 @@ public: // bool SendTo(Participant *participant); }; -class NetworkIdMsg : public IMessage { -public: - static const unsigned char id = 0xA1; - static const unsigned char length = 2; - unsigned char networkId; - - NetworkIdMsg(char *buffer); - - static NetworkIdMsg Receive(char *buffer, unsigned char bufferSize); -}; - class InvestigateMsg : public IMessage { public: static const unsigned char id = 0x81; diff --git a/NetworkIdMsg.cpp b/NetworkIdMsg.cpp new file mode 100644 index 0000000..69234ef --- /dev/null +++ b/NetworkIdMsg.cpp @@ -0,0 +1,25 @@ +#include "NetworkIdMsg.h" + +namespace Passer { +namespace Control { + +NetworkIdMsg::NetworkIdMsg(const char *buffer) { this->networkId = buffer[1]; } + +NetworkIdMsg::NetworkIdMsg(unsigned char networkId) { + this->networkId = networkId; +} + +unsigned char NetworkIdMsg::Serialize(char *buffer) { + unsigned char ix = 0; + buffer[ix++] = this->id; + buffer[ix++] = this->networkId; + return NetworkIdMsg::length; +} + +// NetworkIdMsg NetworkIdMsg::Receive(char *buffer, unsigned char bufferSize) { +// NetworkIdMsg msg = NetworkIdMsg(buffer); +// return msg; +// } + +} // namespace Control +} // namespace Passer \ No newline at end of file diff --git a/NetworkIdMsg.h b/NetworkIdMsg.h new file mode 100644 index 0000000..81cb4f0 --- /dev/null +++ b/NetworkIdMsg.h @@ -0,0 +1,20 @@ +#include "Messages.h" + +namespace Passer { +namespace Control { + +class NetworkIdMsg : public IMessage { +public: + static const unsigned char id = 0xA1; + static const unsigned char length = 2; + unsigned char networkId; + + NetworkIdMsg(const char *buffer); + NetworkIdMsg(unsigned char networkId); + + virtual unsigned char Serialize(char *buffer) override; + // static NetworkIdMsg Receive(char *buffer, unsigned char bufferSize); +}; + +} // namespace Control +} // namespace Passer diff --git a/Participant.cpp b/Participant.cpp index dce6631..28cc070 100644 --- a/Participant.cpp +++ b/Participant.cpp @@ -21,6 +21,15 @@ Participant::Participant(const char *ipAddress, int port) { this->ipAddress = ipAddress; this->port = port; + this->participants.push_back(this); + + int randomPort = (rand() % (65535 - 49152 + 1)) + 49152; + SetupUDP(randomPort, ipAddress, port); +} + +void Passer::Control::Participant::SetupUDP(int localPort, + const char *remoteIpAddress, + int remotePort) { // Create a UDP socket #if defined(_WIN32) || defined(_WIN64) // Windows-specific Winsock initialization @@ -42,27 +51,58 @@ Participant::Participant(const char *ipAddress, int port) { return; } - // Set up the server address +// Set the socket to non-blocking mode +#if defined(_WIN32) || defined(_WIN64) + u_long mode = 1; // 1 to enable non-blocking socket + ioctlsocket(this->sock, FIONBIO, &mode); +#elif defined(__unix__) || defined(__APPLE__) + int flags = fcntl(this->sock, F_GETFL, 0); + fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); +#endif + + if (remotePort != 0) { + // Set up the address to send to + memset(&remote_addr, 0, sizeof(remote_addr)); + remote_addr.sin_family = AF_INET; + remote_addr.sin_port = htons((u_short)remotePort); + if (inet_pton(AF_INET, remoteIpAddress, &remote_addr.sin_addr) <= 0) { + std::cerr << "Invalid address" << std::endl; + closesocket(sock); + WSACleanup(); + return; + } + } + + // Set up the receiving address memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; - server_addr.sin_port = htons((u_short)port); // Port to send the packet to - if (inet_pton(AF_INET, ipAddress, &server_addr.sin_addr) <= 0) { + server_addr.sin_port = htons((u_short)localPort); + if (inet_pton(AF_INET, "0.0.0.0", &server_addr.sin_addr) <= 0) { std::cerr << "Invalid address" << std::endl; closesocket(sock); WSACleanup(); return; } + // Bind the socket to the specified port + if (bind(this->sock, (const struct sockaddr *)&server_addr, + sizeof(server_addr)) < 0) { + std::cerr << "Bind failed" << std::endl; + closesocket(this->sock); + return; + } + // Set up the broadcast address memset(&broadcast_addr, 0, sizeof(broadcast_addr)); broadcast_addr.sin_family = AF_INET; - broadcast_addr.sin_port = htons((u_short)port); // Port to send the packet to + broadcast_addr.sin_port = htons((u_short)remotePort); if (inet_pton(AF_INET, "255.255.255.255", &broadcast_addr.sin_addr) <= 0) { std::cerr << "Invalid address" << std::endl; closesocket(sock); WSACleanup(); return; } + BOOL broadcast = TRUE; if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char *)&broadcast, sizeof(broadcast)) == SOCKET_ERROR) { @@ -71,14 +111,65 @@ Participant::Participant(const char *ipAddress, int port) { } void Participant::Update(unsigned long currentTimeMs) { - // std::cout << "update\n"; if (currentTimeMs > this->nextPublishMe) { - ClientMsg msg = ClientMsg(this->networkId); - this->Publish(&msg); - // Console.WriteLine($"{this.name} Sent ClientMsg {this.networkId}"); - std::cout << this->name << " sent ClientMsg\n"; + this->Publish(&ClientMsg(this->networkId)); + std::cout << this->name << " published ClientMsg\n"; this->nextPublishMe = currentTimeMs + this->publishInterval; } + this->ReceiveUDP(); + + Thing::UpdateAll(currentTimeMs); +} + +void Participant::ReceiveUDP() { + // char ip_str[INET_ADDRSTRLEN]; + // inet_ntop(AF_INET, &(server_addr.sin_addr), ip_str, INET_ADDRSTRLEN); + // std::cout << this->name << " Receive on " << ip_str << ":" + // << ntohs(server_addr.sin_port) << "\n"; + + sockaddr_in client_addr; + int len = sizeof(client_addr); + int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, + (struct sockaddr *)&client_addr, &len); + // std::cout << "received data " << packetSize << "\n"; + if (packetSize < 0) { + int error_code = WSAGetLastError(); // Get the error code on Windows + if (error_code != WSAEWOULDBLOCK) + std::cerr << "recvfrom failed with error: " << error_code << std::endl; + + } else if (packetSize > 0) { + char sender_ipAddress[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, + INET_ADDRSTRLEN); + int sender_port = ntohs(client_addr.sin_port); + + Participant *remoteParticipant = + this->GetParticipant(sender_ipAddress, sender_port); + if (remoteParticipant == nullptr) { + remoteParticipant = this->AddParticipant(sender_ipAddress, sender_port); + std::cout << "New remote participant " << sender_ipAddress << ":" + << sender_port << " " + << (unsigned char)remoteParticipant->networkId << "|\n"; + } + + ReceiveData(packetSize, remoteParticipant); + } +} + +Participant *Participant::GetParticipant(const char *ipAddress, int port) { + for (Participant *participant : this->participants) { + if (participant->ipAddress == ipAddress && participant->port == port) + return participant; + } + return nullptr; +} + +Participant *Participant::AddParticipant(const char *ipAddress, int port) { + Participant *participant = new Participant(ipAddress, port); + participant->networkId = (unsigned char)this->participants.size(); + std::cout << "nw id " << (unsigned char)participant->networkId << "\n"; + this->participants.push_back(participant); + return participant; } #pragma region Send @@ -100,16 +191,16 @@ void Participant::SendThingInfo(Thing *thing) { bool Participant::Send(IMessage *msg) { // Send the message to the specified address and port int bufferSize = msg->Serialize(this->buffer); - std::cout << "buffer size " << bufferSize << "\n"; + // std::cout << "buffer size " << bufferSize << "\n"; if (bufferSize <= 0) return true; - // char ip_str[INET_ADDRSTRLEN]; - // inet_ntop(AF_INET, &(server_addr.sin_addr), ip_str, INET_ADDRSTRLEN); - // std::cout << "Publish to " << ip_str << ":" << ntohs(server_addr.sin_port) - // << "\n"; + char ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(server_addr.sin_addr), ip_str, INET_ADDRSTRLEN); + std::cout << "Publish to " << ip_str << ":" << ntohs(server_addr.sin_port) + << "\n"; int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, - (struct sockaddr *)&server_addr, sizeof(server_addr)); + (struct sockaddr *)&remote_addr, sizeof(remote_addr)); if (sent_bytes == SOCKET_ERROR) { int error_code = WSAGetLastError(); // Get the error code on Windows std::cerr << "sendto failed with error: " << error_code << std::endl; @@ -121,16 +212,14 @@ bool Participant::Send(IMessage *msg) { } bool Participant::Publish(IMessage *msg) { - // Send the message to the specified address and port int bufferSize = msg->Serialize(this->buffer); - std::cout << "buffer size " << bufferSize << "\n"; if (bufferSize <= 0) return true; - // char ip_str[INET_ADDRSTRLEN]; - // inet_ntop(AF_INET, &(server_addr.sin_addr), ip_str, INET_ADDRSTRLEN); - // std::cout << "Publish to " << ip_str << ":" << ntohs(server_addr.sin_port) - // << "\n"; + char ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(broadcast_addr.sin_addr), ip_str, INET_ADDRSTRLEN); + std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) + << "\n"; int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr *)&broadcast_addr, sizeof(broadcast_addr)); @@ -181,8 +270,8 @@ void Participant::ReceiveData(unsigned char bufferSize, void Participant::Process(Participant *sender, ClientMsg *msg) {} void Participant::Process(Participant *sender, NetworkIdMsg *msg) { - std::cout << this->name << " receive network id " << this->networkId << " " - << msg->networkId << "\n"; + std::cout << this->name << " receive network id " << (int)this->networkId + << " " << msg->networkId << "\n"; if (this->networkId != msg->networkId) { this->networkId = msg->networkId; Thing **allThings = Thing::GetAllThings(); diff --git a/Participant.h b/Participant.h index 83f927e..1f99129 100644 --- a/Participant.h +++ b/Participant.h @@ -4,8 +4,11 @@ #include "Messages.h" #include "ModelUrlMsg.h" #include "NameMsg.h" +#include "NetworkIdMsg.h" #include "ThingMsg.h" +#include + #if defined(_WIN32) || defined(_WIN64) #include #endif @@ -26,6 +29,7 @@ public: int port = 0; SOCKET sock; + sockaddr_in remote_addr; sockaddr_in server_addr; sockaddr_in broadcast_addr; @@ -42,8 +46,17 @@ public: void ReceiveData(unsigned char bufferSize, Participant *remoteParticipant); protected: + std::list participants; + unsigned long nextPublishMe = 0; + void SetupUDP(int localPort, const char *remoteIpAddress, int remotePort); + + Participant *GetParticipant(const char *ipAddress, int port); + Participant *AddParticipant(const char *ipAddress, int port); + + void ReceiveUDP(); + virtual void Process(Participant *sender, ClientMsg *msg); virtual void Process(Participant *sender, NetworkIdMsg *msg); virtual void ProcessInvestigateMsg(InvestigateMsg msg); diff --git a/SiteServer.cpp b/SiteServer.cpp index 15721ac..e75e076 100644 --- a/SiteServer.cpp +++ b/SiteServer.cpp @@ -5,16 +5,22 @@ Passer::Control::SiteServer::SiteServer(int port) { this->ipAddress = "0.0.0.0"; this->port = port; + + this->participants.push_back(this); + + SetupUDP(port, ipAddress, 0); } void Passer::Control::SiteServer::Update(unsigned long currentTimeMs) { + this->ReceiveUDP(); Thing::UpdateAll(currentTimeMs); } void Passer::Control::SiteServer::Process(Participant *sender, ClientMsg *msg) { if (msg->networkId == 0) { - std::cout << this->name << " received New Client -> " << sender->networkId - << "\n"; + std::cout << this->name << " received New Client -> " + << (unsigned char)sender->networkId << "\n"; + sender->Send(&NetworkIdMsg(sender->networkId)); } } diff --git a/test/thing_test.cc b/test/thing_test.cc index d38fad4..e261fb5 100644 --- a/test/thing_test.cc +++ b/test/thing_test.cc @@ -5,6 +5,7 @@ #include #include +#include #include #include "Participant.h" @@ -60,5 +61,21 @@ TEST_F(ControlCoreSuite, SiteServer) { ASSERT_EQ(1, 1); } +TEST_F(ControlCoreSuite, SiteParticipant) { + SiteServer site = SiteServer(7681); + Participant participant = Participant("127.0.0.1", 7681); + + unsigned long milliseconds = get_time_ms(); + unsigned long startTime = milliseconds; + while (milliseconds < startTime + 1000) { + site.Update(milliseconds); + participant.Update(milliseconds); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + milliseconds = get_time_ms(); + } + ASSERT_EQ(1, 1); +} + } // namespace Passer #endif