RoboidControl-python/Participant.py
Pascal Serrarens d4d67e8233 Merged changes
2025-01-21 11:05:19 +01:00

197 lines
6.9 KiB
Python

import socket
import threading
import time
from Messages import *
from Thing import Thing
## A participant is device which can communicate with other participants
#
## Currently only UDP communication is supported
class Participant:
publishInterval = 3000 # 3 seconds
buffer = None
network_id = None
nextPublishMe = 0
others = None
thread = None
name = "Participant"
def __init__(self, ipAddress = "0.0.0.0", port=7681, remote=False, udp_socket =None):
self.ip_address = ipAddress
self.port = port
self.others = []
self.network_id = 0
self.buffer = bytearray(256)
self.thing_msg_processors = {}
self.new_thing_handlers = []
if remote == False:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udp_socket.bind(("0.0.0.0", port))
self.AddParticipant(self.ip_address, self.port)
self.thread = threading.Thread(target = self.Receiver)
self.thread.daemon = True
self.thread.start()
else:
self.udp_socket = udp_socket
#region Update
def GetParticipant(self, ip_address, port):
# print(f'{self.name} Get participant {ip_address} {port}')
# for item in self.others:
# print(f'- {item.ip_address} {item.port}')
found_participants = (item for item in self.others
if item.ip_address == ip_address and item.port == port)
participant = next(found_participants, None)
return participant
def AddParticipant(self, ip_address, port):
# print(f'{self.name} Add participant {ip_address} {port}')
remote_participant = Participant(ip_address, port, remote=True, udp_socket=self.udp_socket)
remote_participant.network_id = len(self.others)
self.others.append(remote_participant)
return remote_participant
def Update(self, currentTimeMs = None):
if currentTimeMs is None:
currentTimeMs = time.time() * 1000
if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe:
self.Publish(ClientMsg(self.network_id))
print(f'Publish ClientMsg {self.network_id}')
self.nextPublishMe = currentTimeMs + self.publishInterval
Thing.UpdateAll(currentTimeMs)
#endregion
#region Send
def SendThingInfo(self, thing):
self.Send(ThingMsg(self.network_id, thing))
self.Send(NameMsg(self.network_id, thing))
self.Send(ModelUrlMsg(self.network_id, thing))
def Send(self, msg):
buffer_size = msg.Serialize([self.buffer])
if buffer_size <= 0:
return True
# print(f'{self.name} send {self.buffer[0]} to {self.ip_address} {self.port}')
self.udp_socket.sendto(self.buffer[:buffer_size], (self.ip_address, self.port))
return True
def Publish(self, msg):
buffer_size = msg.Serialize([self.buffer])
if buffer_size <= 0:
return True
# print(f'publish {self.buffer[0]} to {self.port}')
self.udp_socket.sendto(self.buffer[:buffer_size], ('<broadcast>', self.port))
return True
#endregion
#region Receive
def Receiver(self):
while True:
data, addr = self.udp_socket.recvfrom(1024)
remote_ip_address = addr[0]
remote_port = addr[1]
# print(f'msg received from {remote_ip_address}:{remote_port}')
remote_participant = self.GetParticipant(remote_ip_address, remote_port)
if remote_participant is None:
# print(f'new participant')
remote_participant = self.AddParticipant(remote_ip_address, remote_port)
self.ReceiveData(data, remote_participant)
def ReceiveData(self, data, remote_participant):
msgId = data[0]
# print(f'msg {msgId} ')
match msgId:
case ClientMsg.id:
self.ProcessClientMsg(remote_participant, ClientMsg(data))
case NetworkIdMsg.id:
self.ProcessNetworkIdMsg(remote_participant, NetworkIdMsg(data))
# case InvestigateMsg.id:
# self.ProcessInvestigateMsg(InvestigateMsg(data))
case ThingMsg.id:
self.ProcessThingMsg(ThingMsg(data))
case NameMsg.id:
self.ProcessNameMsg(NameMsg(data))
case ModelUrlMsg.id:
self.ProcessModelUrlMsg(ModelUrlMsg(data))
case BinaryMsg.id:
self.ProcessBinary(BinaryMsg(data))
# msg = BinaryMsg(data)
# if msg.thing != None:
# msg.thing.ProcessBinary(msg.data)
# For consistency with the C# and C++ versions
def Process(self, msg):
if isinstance(ThingMsg):
self.ProcessThingMsg(msg)
elif isinstance(NameMsg):
self.ProcessNameMsg(msg)
elif isinstance(ModelUrlMsg):
self.ProcessModelUrlMsg(msg)
def ProcessClientMsg(self, sender, msg: ClientMsg):
pass
def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg):
print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}')
if self.network_id != msg.network_id:
self.network_id = msg.network_id
# print(f'sending all things {len(Thing.allThings)}')
for thing in Thing.allThings:
sender.SendThingInfo(thing)
# self.Send(NameMsg(self.network_id, thing))
def ProcessInvestigateMsg(self, data: bytearray):
pass
def ProcessThingMsg(self, msg: ThingMsg):
print(f'received thing {msg.network_id} {msg.thing_id}')
if msg.thing_type in self.thing_msg_processors:
constructor = self.thing_msg_processors[msg.thing_type]
constructor(msg.network_id, msg.thing_id)
# not really 'new' thing, but it is a start
thing = Thing.Get(msg.network_id, msg.thing_id)
if thing is not None:
for handler in self.new_thing_handlers:
handler(thing)
def ProcessNameMsg(self, msg: NameMsg):
print(f'received name {msg.name}')
def ProcessModelUrlMsg(self, msg: ModelUrlMsg):
print(f'received model url: {msg.url}')
def ProcessBinary(self, msg: BinaryMsg):
print('received binary data')
if msg.thing != None:
msg.thing.ProcessBinary(msg.data)
def Register(self, constructor, thing_type):
self.thing_msg_processors[thing_type] = constructor
def OnNewThing(self, event_handler):
self.new_thing_handlers.append(event_handler)
def OnNewThingType(self, thing_type, event_handler):
def ConditionalHandler(thing):
if thing.type == thing_type:
event_handler(thing)
self.new_thing_handlers.append(ConditionalHandler)
#endregion