RoboidControl-python/LocalParticipant.py

237 lines
8.4 KiB
Python

import socket
import threading
import time
from Participant import Participant
from Thing import Thing
from Messages.ParticipantMsg import ParticipantMsg
from Messages.NetworkIdMsg import NetworkIdMsg
from Messages.ThingMsg import ThingMsg
from Messages.NameMsg import NameMsg
from Messages.ModelUrlMsg import ModelUrlMsg
from Messages import *
import sys
micropython = 'micropython' in sys.modules
if micropython:
from MicroPython.uPythonParticipant import Bla
class LocalParticipant(Participant):
"""! A local participant is the local device which can communicate with other participants.
It manages all local things and communcation with other participants. Each application has a local participant which is usually explicit in the code.
An participant can be isolated. In that case it is standalong and does not communicate with other participants.
Currently, only UDP communication is supported
"""
buffer = None
## The network ID of the participant
network_id = None
nextPublishMe = 0
others = None
thread = None
name = "Participant"
def __init__(self, port=7681, ip_address="0.0.0.0", local_port=0): #, remote=False, udp_socket = None):
super().__init__(ip_address = "0.0.0.0", port = port)
if local_port == 0:
local_port = port
self.local_port = local_port
## True if the participant is running isolated.
# Isolated participants do not communicate with other participants
self.isolated = True
## The remote site when this participant is connected to a site
self.remote_site = None
if self.port != 0:
self.isolated = False
self.remote_site = Participant(ip_address, port)
self.others = []
self.network_id = 0
self.buffer = bytearray(256)
self.thing_msg_processors = {}
self.new_thing_handlers = []
self.publishInterval = 3000 # 3 seconds
#if remote == False:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udp_socket.bind(("0.0.0.0", self.local_port))
self.thread = threading.Thread(target = self.Receiver)
self.thread.daemon = True
self.thread.start()
# else:
# self.udp_socket = udp_socket
#region Update
def GetParticipant(self, ip_address, port):
# print(f'{self.name} Get participant {ip_address} {port}')
# for item in self.others:
# print(f'- {item.ip_address} {item.port}')
found_participants = (item for item in self.others
if item.ip_address == ip_address and item.port == port)
participant = next(found_participants, None)
return participant
def AddParticipant(self, ip_address, port):
print(f'{self.name} Add participant {ip_address} {port}')
remote_participant = Participant(ip_address = ip_address, port = port)
remote_participant.network_id = len(self.others)
self.others.append(remote_participant)
return remote_participant
def Update(self, currentTimeMs = None):
if currentTimeMs is None:
currentTimeMs = time.time() * 1000
if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe:
msg = ParticipantMsg(self.network_id)
if self.remote_site is None:
self.Publish(msg)
else:
self.Send(self.remote_site, msg)
print(f'Publish ParticipantMsg {self.network_id}')
self.nextPublishMe = currentTimeMs + self.publishInterval
for thing in self.things:
thing.Update(currentTimeMs)
# pose_msg = PoseMsg(self.network_id, thing)
# for other in self.others:
# self.Send(other, pose_msg)
super().Update(currentTimeMs)
#endregion
#region Send
def SendThingInfo(self, owner, thing, recursively: bool = False):
self.Send(owner, ThingMsg(self.network_id, thing))
self.Send(owner, NameMsg(self.network_id, thing))
self.Send(owner, ModelUrlMsg(self.network_id, thing))
self.Send(owner, PoseMsg(self.network_id, thing, True))
# self.Send(owner, BinaryMsg(self.network_id, thing))
if recursively:
for child in thing.children:
self.SendThingInfo(owner, child, recursively)
def Send(self, owner, msg):
buffer_size = msg.Serialize([self.buffer])
if buffer_size <= 0:
return True
print(f'{self.name} send {self.buffer[0]} to {owner.ip_address} {owner.port}')
self.udp_socket.sendto(self.buffer[:buffer_size], (owner.ip_address, owner.port))
return True
def Publish(self, msg):
buffer_size = msg.Serialize([self.buffer])
if buffer_size <= 0:
return True
# print(f'publish {self.buffer[0]} to {self.port}')
self.udp_socket.sendto(self.buffer[:buffer_size], ('<broadcast>', self.port))
return True
#endregion
#region Receive
def Receiver(self):
while True:
data, addr = self.udp_socket.recvfrom(1024)
remote_ip_address = addr[0]
remote_port = addr[1]
# print(f'msg received from {remote_ip_address}:{remote_port}')
remote_participant = self.GetParticipant(remote_ip_address, remote_port)
if remote_participant is None:
# print(f'new participant')
remote_participant = self.AddParticipant(remote_ip_address, remote_port)
self.ReceiveData(data, remote_participant)
def ReceiveData(self, data, sender):
msgId = data[0]
# print(f'msg {msgId} ')
match msgId:
case ParticipantMsg.id:
self.ProcessParticipantMsg(sender, ParticipantMsg(data))
case NetworkIdMsg.id:
self.ProcessNetworkIdMsg(sender, NetworkIdMsg(data))
# case InvestigateMsg.id:
# self.ProcessInvestigateMsg(InvestigateMsg(data))
case ThingMsg.id:
self.ProcessThingMsg(ThingMsg(data))
case NameMsg.id:
self.ProcessNameMsg(NameMsg(data))
case ModelUrlMsg.id:
self.ProcessModelUrlMsg(ModelUrlMsg(data))
# case Messages.BinaryMsg.id:
# msg = Messages.BinaryMsg(data)
# msg.thing.ProcessBinary(msg.data)
def ProcessParticipantMsg(self, sender, msg: ParticipantMsg):
pass
def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg):
print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}')
if self.network_id != msg.network_id:
self.network_id = msg.network_id
print(f'sending all things {len(self.things)}')
for thing in self.things:
if thing.parent is None:
self.SendThingInfo(sender, thing, recursively=True)
# self.Send(NameMsg(self.network_id, thing))
def ProcessInvestigateMsg(self, data: bytearray):
pass
def ProcessThingMsg(self, msg: ThingMsg):
print(f'received thing {msg.network_id} {msg.thing_id}')
if msg.thing_type in self.thing_msg_processors:
constructor = self.thing_msg_processors[msg.thing_type]
constructor(msg.network_id, msg.thing_id)
# not really 'new' thing, but it is a start
thing = Thing.Get(msg.network_id, msg.thing_id)
if thing is not None:
for handler in self.new_thing_handlers:
handler(thing)
def ProcessNameMsg(self, msg: NameMsg):
print(f'received name {msg.name}')
def ProcessModelUrlMsg(self, msg: ModelUrlMsg):
print(f'received model url: {msg.url}')
def ProcessBinary(self, msg: BinaryMsg):
print('received binary data')
if msg.thing != None:
msg.thing.ProcessBinary(msg.data)
def Register(self, constructor, thing_type):
self.thing_msg_processors[thing_type] = constructor
def OnNewThing(self, event_handler):
self.new_thing_handlers.append(event_handler)
def OnNewThingType(self, thing_type, event_handler):
def ConditionalHandler(thing):
if thing.type == thing_type:
event_handler(thing)
self.new_thing_handlers.append(ConditionalHandler)
#endregion