#include "Participant.h" // #include #define BUF_SIZE 1024 #if defined(_WIN32) || defined(_WIN64) #include #include #pragma comment(lib, "ws2_32.lib") #elif defined(__unix__) || defined(__APPLE__) #include #include #include #include #include // For fcntl #endif Participant::Participant() {} Participant::Participant(const char *ipAddress, int port) { this->ipAddress = ipAddress; this->port = port; this->participants.push_back(this); int randomPort = (rand() % (65535 - 49152 + 1)) + 49152; SetupUDP(randomPort, ipAddress, port); } void Passer::Control::Participant::SetupUDP(int localPort, const char *remoteIpAddress, int remotePort) { // Create a UDP socket #if defined(_WIN32) || defined(_WIN64) // Windows-specific Winsock initialization WSADATA wsaData; if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { std::cerr << "WSAStartup failed" << std::endl; return; } #endif #if defined(_WIN32) || defined(_WIN64) this->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); #elif defined(__unix__) || defined(__APPLE__) this->sock = socket(AF_INET, SOCK_DGRAM, 0); #endif if (this->sock < 0) { std::cerr << "Error creating socket" << std::endl; return; } // Set the socket to non-blocking mode #if defined(_WIN32) || defined(_WIN64) u_long mode = 1; // 1 to enable non-blocking socket ioctlsocket(this->sock, FIONBIO, &mode); #elif defined(__unix__) || defined(__APPLE__) int flags = fcntl(this->sock, F_GETFL, 0); fcntl(this->sock, F_SETFL, flags | O_NONBLOCK); #endif if (remotePort != 0) { // Set up the address to send to memset(&remote_addr, 0, sizeof(remote_addr)); remote_addr.sin_family = AF_INET; remote_addr.sin_port = htons((u_short)remotePort); if (inet_pton(AF_INET, remoteIpAddress, &remote_addr.sin_addr) <= 0) { std::cerr << "Invalid address" << std::endl; #if defined(_WIN32) || defined(_WIN64) closesocket(sock); WSACleanup(); #elif defined(__unix__) || defined(__APPLE__) close(sock); #endif return; } } // Set up the receiving address memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons((u_short)localPort); if (inet_pton(AF_INET, "0.0.0.0", &server_addr.sin_addr) <= 0) { std::cerr << "Invalid address" << std::endl; #if defined(_WIN32) || defined(_WIN64) closesocket(sock); WSACleanup(); #elif defined(__unix__) || defined(__APPLE__) close(sock); #endif return; } // Bind the socket to the specified port if (bind(this->sock, (const struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { std::cerr << "Bind failed" << std::endl; #if defined(_WIN32) || defined(_WIN64) closesocket(sock); WSACleanup(); #elif defined(__unix__) || defined(__APPLE__) close(sock); #endif return; } // Bind the socket to the specified port // if (bind(this->sock, (const struct sockaddr *)&server_addr, // sizeof(server_addr)) < 0) { // std::cerr << "Bind failed" << std::endl; // #if defined(_WIN32) || defined(_WIN64) // closesocket(sock); // WSACleanup(); // #elif defined(__unix__) || defined(__APPLE__) // close(sock); // #endif // return; // } // Set up the broadcast address memset(&broadcast_addr, 0, sizeof(broadcast_addr)); broadcast_addr.sin_family = AF_INET; broadcast_addr.sin_port = htons((u_short)remotePort); if (inet_pton(AF_INET, "255.255.255.255", &broadcast_addr.sin_addr) <= 0) { std::cerr << "Invalid address" << std::endl; #if defined(_WIN32) || defined(_WIN64) closesocket(sock); WSACleanup(); #elif defined(__unix__) || defined(__APPLE__) close(sock); #endif return; } #if defined(_WIN32) || defined(_WIN64) BOOL broadcast = TRUE; if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char *)&broadcast, sizeof(broadcast)) == SOCKET_ERROR) { std::cerr << "Setting socket option for broadcast failed" << std::endl; } #elif defined(__unix__) || defined(__APPLE__) int broadcastEnable = 1; if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable)) < 0) { std::cerr << "Failed to set socket options" << std::endl; } #endif } void Participant::Update(unsigned long currentTimeMs) { if (currentTimeMs > this->nextPublishMe) { ClientMsg* msg = new ClientMsg(this->networkId); this->Publish(msg); delete msg; std::cout << this->name << " published ClientMsg\n"; this->nextPublishMe = currentTimeMs + this->publishInterval; } this->ReceiveUDP(); Thing::UpdateAll(currentTimeMs); } void Participant::ReceiveUDP() { // char ip_str[INET_ADDRSTRLEN]; // inet_ntop(AF_INET, &(server_addr.sin_addr), ip_str, INET_ADDRSTRLEN); // std::cout << this->name << " Receive on " << ip_str << ":" // << ntohs(server_addr.sin_port) << "\n"; sockaddr_in client_addr; #if defined(_WIN32) || defined(_WIN64) int len = sizeof(client_addr); #elif defined(__unix__) || defined(__APPLE__) socklen_t len = sizeof(client_addr); #endif int packetSize = recvfrom(this->sock, buffer, sizeof(buffer), 0, (struct sockaddr *)&client_addr, &len); // std::cout << "received data " << packetSize << "\n"; if (packetSize < 0) { #if defined(_WIN32) || defined(_WIN64) int error_code = WSAGetLastError(); // Get the error code on Windows if (error_code != WSAEWOULDBLOCK) std::cerr << "recvfrom failed with error: " << error_code << std::endl; #else //std::cerr << "recvfrom failed with error: " << packetSize << std::endl; #endif } else if (packetSize > 0) { char sender_ipAddress[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &(client_addr.sin_addr), sender_ipAddress, INET_ADDRSTRLEN); int sender_port = ntohs(client_addr.sin_port); Participant *remoteParticipant = this->GetParticipant(sender_ipAddress, sender_port); if (remoteParticipant == nullptr) { remoteParticipant = this->AddParticipant(sender_ipAddress, sender_port); // std::cout << "New sender " << sender_ipAddress << ":" // << sender_port << "\n"; // std::cout << "New remote participant " << remoteParticipant->ipAddress << ":" // << remoteParticipant->port << " " // << (int)remoteParticipant->networkId << "\n"; } ReceiveData(packetSize, remoteParticipant); } } Participant *Participant::GetParticipant(const char *ipAddress, int port) { for (Participant *participant : this->participants) { if (participant->ipAddress == ipAddress && participant->port == port) return participant; } return nullptr; } Participant *Participant::AddParticipant(const char *ipAddress, int port) { Participant *participant = new Participant(ipAddress, port); participant->networkId = (unsigned char)this->participants.size(); this->participants.push_back(participant); return participant; } #pragma region Send void Participant::SendThingInfo(Thing *thing) { std::cout << "Send thing info\n"; IMessage *msg = new ThingMsg(this->networkId, thing); this->Send(msg); delete msg; msg = new NameMsg(this->networkId, thing); this->Send(msg); delete msg; msg = new ModelUrlMsg(this->networkId, thing); this->Send(msg); delete msg; // this->Send(&ModelUrlMsg(this->networkId, thing)); } bool Participant::Send(IMessage *msg) { // Send the message to the specified address and port int bufferSize = msg->Serialize(this->buffer); // std::cout << "buffer size " << bufferSize << "\n"; if (bufferSize <= 0) return true; char ip_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &(remote_addr.sin_addr), ip_str, INET_ADDRSTRLEN); std::cout << "Send to " << ip_str << ":" << ntohs(remote_addr.sin_port) << "\n"; int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr *)&remote_addr, sizeof(remote_addr)); #if defined(_WIN32) || defined(_WIN64) if (sent_bytes <= SOCKET_ERROR) { int error_code = WSAGetLastError(); // Get the error code on Windows std::cerr << "sendto failed with error: " << error_code << std::endl; closesocket(sock); WSACleanup(); return false; } #elif defined(__unix__) || defined(__APPLE__) if (sent_bytes < 0) { std::cerr << "sendto failed with error: " << sent_bytes << std::endl; close(sock); return false; } #endif return true; } bool Participant::Publish(IMessage *msg) { int bufferSize = msg->Serialize(this->buffer); if (bufferSize <= 0) return true; char ip_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &(broadcast_addr.sin_addr), ip_str, INET_ADDRSTRLEN); std::cout << "Publish to " << ip_str << ":" << ntohs(broadcast_addr.sin_port) << "\n"; int sent_bytes = sendto(sock, this->buffer, bufferSize, 0, (struct sockaddr *)&broadcast_addr, sizeof(broadcast_addr)); #if defined(_WIN32) || defined(_WIN64) if (sent_bytes <= SOCKET_ERROR) { int error_code = WSAGetLastError(); // Get the error code on Windows std::cerr << "sendto failed with error: " << error_code << std::endl; closesocket(sock); WSACleanup(); return false; } #elif defined(__unix__) || defined(__APPLE__) if (sent_bytes < 0) { std::cerr << "sendto failed with error: " << sent_bytes << std::endl; close(sock); return false; } #endif return true; } // Send #pragma endregion #pragma region Receive void Participant::ReceiveData(unsigned char bufferSize, Participant *remoteParticipant) { unsigned char msgId = this->buffer[0]; switch (msgId) { case ClientMsg::id: { ClientMsg* msg = new ClientMsg(this->buffer); Process(remoteParticipant, msg); delete msg; } case NetworkIdMsg::id: { NetworkIdMsg* msg = new NetworkIdMsg(this->buffer); Process(remoteParticipant, msg); delete msg; } break; case InvestigateMsg::id: { InvestigateMsg msg = InvestigateMsg(this->buffer); ProcessInvestigateMsg(msg); } break; case ThingMsg::id: { ThingMsg msg = ThingMsg(this->buffer); ProcessThingMsg(msg); } break; case PoseMsg::id: { PoseMsg msg = PoseMsg(this->buffer); ProcessPoseMsg(msg); } break; case CustomMsg::id: { CustomMsg msg = CustomMsg(this->buffer); ProcessCustomMsg(msg); } break; }; } void Participant::Process(Participant *sender, ClientMsg *msg) {} void Participant::Process(Participant *sender, NetworkIdMsg *msg) { std::cout << this->name << " receive network id " << (int)this->networkId << " " << (int)msg->networkId << "\n"; if (this->networkId != msg->networkId) { this->networkId = msg->networkId; Thing **allThings = Thing::GetAllThings(); for (uint16_t ix = 0; ix < THING_STORE_SIZE; ix++) { Thing *thing = allThings[ix]; if (thing == nullptr) continue; sender->SendThingInfo(thing); } } } void Participant::ProcessInvestigateMsg(InvestigateMsg msg) {} void Participant::ProcessThingMsg(ThingMsg msg) {} void Passer::Control::Participant::ProcessPoseMsg(PoseMsg msg) {} void Participant::ProcessCustomMsg(CustomMsg msg) {} // Receive #pragma endregion