combined site server & participant

This commit is contained in:
Pascal Serrarens 2025-01-01 15:13:13 +01:00
parent 5040464aab
commit 679a897b0d
3 changed files with 109 additions and 81 deletions

View File

@ -14,51 +14,61 @@ from Thing import Thing
class Participant: class Participant:
publishInterval = 3000 # 3 seconds publishInterval = 3000 # 3 seconds
buffer = bytearray(256) buffer = None
network_id = 0 network_id = None
nextPublishMe = 0 nextPublishMe = 0
others = [] others = None
thread = None thread = None
name = "Participant"
def __init__(self, ipAddress = "0.0.0.0", port=7681, remote=False): def __init__(self, ipAddress = "0.0.0.0", port=7681, remote=False, udp_socket =None):
# self.buffer = bytearray(256)
self.ip_address = ipAddress self.ip_address = ipAddress
self.port = port self.port = port
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.others = []
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.network_id = 0
# self.buffer = bytearray(256) self.buffer = bytearray(256)
if remote == False: if remote == False:
self.udp_socket.bind(("0.0.0.0", 7681)) self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# self.network_id = 0 self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# self.nextPublishMe = 0 self.udp_socket.bind(("0.0.0.0", 0))
# self.others = []
self.AddParticipant(self.ip_address, self.port) self.AddParticipant(self.ip_address, self.port)
self.thread = threading.Thread(target = self.Receiver) self.thread = threading.Thread(target = self.Receiver)
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()
else:
self.udp_socket = udp_socket
#region Update
def GetParticipant(self, ip_address, port): def GetParticipant(self, ip_address, port):
# print(f'{self.name} Get participant {ip_address} {port}')
# for item in self.others:
# print(f'- {item.ip_address} {item.port}')
found_participants = (item for item in self.others found_participants = (item for item in self.others
if item.ip_address == ip_address and item.port == port) if item.ip_address == ip_address and item.port == port)
participant = next(found_participants, None) participant = next(found_participants, None)
return participant return participant
def AddParticipant(self, ip_address, port): def AddParticipant(self, ip_address, port):
remote_participant = Participant(ip_address, port, remote=True) # print(f'{self.name} Add participant {ip_address} {port}')
remote_participant = Participant(ip_address, port, remote=True, udp_socket=self.udp_socket)
remote_participant.network_id = len(self.others) remote_participant.network_id = len(self.others)
self.others.append(remote_participant) self.others.append(remote_participant)
return remote_participant return remote_participant
def Update(self, currentTimeMs): def Update(self, currentTimeMs):
if (currentTimeMs > self.nextPublishMe): if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe:
self.Publish(ClientMsg(self.network_id)) self.Publish(ClientMsg(self.network_id))
print(f'Sent ClientMsg {self.network_id}') print(f'Publish ClientMsg {self.network_id}')
self.nextPublishMe = currentTimeMs + Participant.publishInterval self.nextPublishMe = currentTimeMs + self.publishInterval
Thing.UpdateAll(currentTimeMs) Thing.UpdateAll(currentTimeMs)
#endregion
#region Send #region Send
def SendThingInfo(self, thing): def SendThingInfo(self, thing):
@ -71,7 +81,7 @@ class Participant:
if buffer_size <= 0: if buffer_size <= 0:
return True return True
# print(f'send {self.buffer[0]} to {self.ip_address} {self.port}') # print(f'{self.name} send {self.buffer[0]} to {self.ip_address} {self.port}')
self.udp_socket.sendto(self.buffer[:buffer_size], (self.ip_address, self.port)) self.udp_socket.sendto(self.buffer[:buffer_size], (self.ip_address, self.port))
return True return True
@ -80,6 +90,7 @@ class Participant:
if buffer_size <= 0: if buffer_size <= 0:
return True return True
# print(f'publish {self.buffer[0]} to {self.port}')
self.udp_socket.sendto(self.buffer[:buffer_size], ('<broadcast>', self.port)) self.udp_socket.sendto(self.buffer[:buffer_size], ('<broadcast>', self.port))
return True return True
@ -95,17 +106,18 @@ class Participant:
# print(f'msg received from {remote_ip_address}:{remote_port}') # print(f'msg received from {remote_ip_address}:{remote_port}')
remote_participant = self.GetParticipant(remote_ip_address, remote_port) remote_participant = self.GetParticipant(remote_ip_address, remote_port)
if remote_participant is None: if remote_participant is None:
remote_participant = self.AddParticipant(remote_ip_address, self.port) # print(f'new participant')
remote_participant.ReceiveData(data) remote_participant = self.AddParticipant(remote_ip_address, remote_port)
self.ReceiveData(data, remote_participant)
def ReceiveData(self, data): def ReceiveData(self, data, remote_participant):
msgId = data[0] msgId = data[0]
# print(f'msg {msgId} ') # print(f'msg {msgId} ')
match msgId: match msgId:
case ClientMsg.id: case ClientMsg.id:
self.ProcessClientMsg(ClientMsg(data)) self.ProcessClientMsg(remote_participant, ClientMsg(data))
case NetworkIdMsg.id: case NetworkIdMsg.id:
self.ProcessNetworkIdMsg(NetworkIdMsg(data)) self.ProcessNetworkIdMsg(remote_participant, NetworkIdMsg(data))
# case InvestigateMsg.id: # case InvestigateMsg.id:
# self.ProcessInvestigateMsg(InvestigateMsg(data)) # self.ProcessInvestigateMsg(InvestigateMsg(data))
case ThingMsg.id: case ThingMsg.id:
@ -118,19 +130,16 @@ class Participant:
# msg = Messages.BinaryMsg(data) # msg = Messages.BinaryMsg(data)
# msg.thing.ProcessBinary(msg.data) # msg.thing.ProcessBinary(msg.data)
def ProcessClientMsg(self, msg: ClientMsg): def ProcessClientMsg(self, sender, msg: ClientMsg):
if msg.network_id == 0: pass
self.Send(NetworkIdMsg(self.network_id))
print(f'############################ New Client -> {self.network_id}')
else:
print(f'############################ Client')
def ProcessNetworkIdMsg(self, msg: NetworkIdMsg): def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg):
print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}')
if self.network_id != msg.network_id: if self.network_id != msg.network_id:
self.network_id = msg.network_id self.network_id = msg.network_id
print(f'receive network id {msg.network_id}') # print(f'sending all things {len(Thing.allThings)}')
for thing in Thing.allThings: for thing in Thing.allThings:
self.SendThingInfo(thing) sender.SendThingInfo(thing)
# self.Send(NameMsg(self.network_id, thing)) # self.Send(NameMsg(self.network_id, thing))
def ProcessInvestigateMsg(self, data: bytearray): def ProcessInvestigateMsg(self, data: bytearray):

View File

@ -1,52 +1,46 @@
from .Participant import Participant from Participant import Participant
from .Thing import Thing from ClientMsg import ClientMsg
from . import Messages from NetworkIdMsg import NetworkIdMsg
import select import socket
import threading
class SiteServer(Participant): class SiteServer(Participant):
def __init__(self, ipAddress, port): name = "Site Server"
super().__init__(ipAddress, port)
self.udp_socket.setblocking(0)
def Update(self, currentTime): def __init__(self, ip_address="0.0.0.0", port=7681, remote=False, udp_socket = None):
ready_to_read, _, _ = select.select([self.udp_socket], [], [], 0.1) # Timeout of 0.1 seconds self.ip_address = ip_address
while ready_to_read: self.port = port
data, addr = self.udp_socket.recvfrom(1024) self.publishInterval = 0
self.ReceiveData(data) self.others = []
ready_to_read, _, _ = select.select([self.udp_socket], [], [], 0.1) # Timeout of 0.1 seconds self.network_id = 0
self.buffer = bytearray(256)
return super().Update(currentTime) if remote == False:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udp_socket.bind(("0.0.0.0", 7681))
self.AddParticipant(self.ip_address, self.port)
self.thread = threading.Thread(target = self.Receiver)
self.thread.daemon = True
self.thread.start()
else:
self.udp_socket = udp_socket
def AddParticipant(self, ip_address, port):
# print(f'{self.name} Add site participant {ip_address} {port}')
remote_participant = SiteServer(ip_address, port, remote=True, udp_socket=self.udp_socket)
remote_participant.network_id = len(self.others)
self.others.append(remote_participant)
return remote_participant
def ProcessClientMsg(self, sender, msg):
if msg.network_id == 0:
sender.Send(NetworkIdMsg(sender.network_id))
print(f'{self.name} New Client -> {sender.network_id}')
# else:
# print(f'{self.name} Client')
def ProcessNetworkIdMsg(self, thing_msg): def ProcessNetworkId(self, msg):
self.network_id = thing_msg.network_id pass
# HACK: send the root things first
for thing in Thing.allThings:
if thing is not None and thing.parent_id == 0:
self.SendThingInfo(thing)
# then sent the rest
for thing in Thing.allThings:
if thing is not None and thing.parent_id != 0:
self.SendThingInfo(thing)
def SendThingInfo(self, thing, recurse = False):
if thing is None:
return
thing_msg = Messages.ThingMsg(self.network_id, thing)
thing_msg.SendTo(self)
name_msg = Messages.NameMsg(self.network_id, thing)
name_msg.SendTo(self)
model_msg = Messages.ModelUrlMsg(self.network_id, thing)
model_msg.SendTo(self)
pose_msg = Messages.PoseMsg(self.network_id, thing)
pose_msg.SendTo(self)
if recurse:
for child in thing.children:
self.SendThingInfo(child, True)
def ProcessInvestigateMsg(self, msg: Messages.InvestigateMsg):
thing = Thing.Get(msg.network_id, msg.thing_id)
if thing is not None:
self.SendThingInfo(thing)

View File

@ -3,19 +3,43 @@ import time
from Thing import Thing from Thing import Thing
from Participant import Participant from Participant import Participant
from SiteServer import SiteServer
class ThingTest(unittest.TestCase): class ThingTest(unittest.TestCase):
def test_client_msg(self): def test_participant(self):
participant: Participant = Participant(ipAddress="127.0.0.1", port=7681) participant: Participant = Participant(ipAddress="127.0.0.1", port=7681)
milliseconds = time.time() * 1000 milliseconds = time.time() * 1000
start_time = milliseconds start_time = milliseconds
while milliseconds < start_time + 5000: while milliseconds < start_time + 5000:
milliseconds = time.time() * 1000 milliseconds = time.time() * 1000
participant.Update(milliseconds) participant.Update(milliseconds)
def test_site_server(self):
site = SiteServer(port=7681)
milliseconds = time.time() * 1000
start_time = milliseconds
while milliseconds < start_time + 5000:
milliseconds = time.time() * 1000
site.Update(milliseconds)
def test_site_participant(self):
site = SiteServer(7681)
participant = Participant("127.0.0.1", 7681)
milliseconds = time.time() * 1000
start_time = milliseconds
while milliseconds < start_time + 7000:
milliseconds = time.time() * 1000
site.Update(milliseconds)
participant.Update(milliseconds)
time.sleep(0.1)
def test_thing_msg(self): def test_thing_msg(self):
site = SiteServer()
participant = Participant("127.0.0.1") participant = Participant("127.0.0.1")
thing = Thing() thing = Thing()
thing.name = "First thing" thing.name = "First thing"
@ -25,6 +49,7 @@ class ThingTest(unittest.TestCase):
start_time = milliseconds start_time = milliseconds
while milliseconds < start_time + 7000: while milliseconds < start_time + 7000:
milliseconds = time.time() * 1000 milliseconds = time.time() * 1000
site.Update(milliseconds)
participant.Update(milliseconds) participant.Update(milliseconds)
if __name__ == '__main__': if __name__ == '__main__':