236 lines
8.4 KiB
Python
236 lines
8.4 KiB
Python
import socket
|
|
import threading
|
|
import time
|
|
|
|
from Participant import Participant
|
|
from Thing import Thing
|
|
from Messages.ParticipantMsg import ParticipantMsg
|
|
from Messages.NetworkIdMsg import NetworkIdMsg
|
|
from Messages.ThingMsg import ThingMsg
|
|
from Messages.NameMsg import NameMsg
|
|
from Messages.ModelUrlMsg import ModelUrlMsg
|
|
from Messages import *
|
|
|
|
import sys
|
|
# True when a module named 'micropython' is present in sys.modules at the
# moment this file is imported.
# NOTE(review): presumably meant to detect a MicroPython runtime - confirm that
# the 'micropython' module is already loaded at this point on the target.
micropython = 'micropython' in sys.modules

if micropython:
    # Platform-specific import; Bla is not referenced in the code visible here.
    from MicroPython.uPythonParticipant import Bla
|
|
|
|
class LocalParticipant(Participant):
|
|
"""! A local participant is the local device which can communicate with other participants.
|
|
|
|
It manages all local things and communcation with other participants. Each application has a local participant which is usually explicit in the code.
|
|
An participant can be isolated. In that case it is standalong and does not communicate with other participants.
|
|
|
|
Currently, only UDP communication is supported
|
|
"""
|
|
|
|
buffer = None
|
|
## The network ID of the participant
|
|
network_id = None
|
|
nextPublishMe = 0
|
|
others = None
|
|
thread = None
|
|
name = "Participant"
|
|
|
|
def __init__(self, port=7681, ip_address="0.0.0.0", local_port=0): #, remote=False, udp_socket = None):
|
|
super().__init__(ip_address = "0.0.0.0", port = port)
|
|
|
|
if local_port == 0:
|
|
local_port = port
|
|
self.local_port = local_port
|
|
|
|
## True if the participant is running isolated.
|
|
# Isolated participants do not communicate with other participants
|
|
self.isolated = True
|
|
|
|
## The remote site when this participant is connected to a site
|
|
self.remote_site = None
|
|
|
|
if self.port != 0:
|
|
self.isolated = False
|
|
self.remote_site = Participant(ip_address, port)
|
|
|
|
self.others = []
|
|
self.network_id = 0
|
|
self.buffer = bytearray(256)
|
|
self.thing_msg_processors = {}
|
|
self.new_thing_handlers = []
|
|
self.publishInterval = 3000 # 3 seconds
|
|
|
|
|
|
#if remote == False:
|
|
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
self.udp_socket.bind(("0.0.0.0", self.local_port))
|
|
|
|
self.thread = threading.Thread(target = self.Receiver)
|
|
self.thread.daemon = True
|
|
self.thread.start()
|
|
# else:
|
|
# self.udp_socket = udp_socket
|
|
|
|
#region Update
|
|
|
|
def GetParticipant(self, ip_address, port):
|
|
# print(f'{self.name} Get participant {ip_address} {port}')
|
|
# for item in self.others:
|
|
# print(f'- {item.ip_address} {item.port}')
|
|
|
|
found_participants = (item for item in self.others
|
|
if item.ip_address == ip_address and item.port == port)
|
|
participant = next(found_participants, None)
|
|
return participant
|
|
|
|
def AddParticipant(self, ip_address, port):
|
|
print(f'{self.name} Add participant {ip_address} {port}')
|
|
remote_participant = Participant(ip_address = ip_address, port = port)
|
|
remote_participant.network_id = len(self.others)
|
|
self.others.append(remote_participant)
|
|
return remote_participant
|
|
|
|
def Update(self, currentTimeMs = None):
|
|
if currentTimeMs is None:
|
|
currentTimeMs = time.time() * 1000
|
|
|
|
if self.publishInterval > 0 and currentTimeMs > self.nextPublishMe:
|
|
msg = ParticipantMsg(self.network_id)
|
|
if self.remote_site is None:
|
|
self.Publish(msg)
|
|
else:
|
|
self.Send(self.remote_site, msg)
|
|
|
|
print(f'Publish ParticipantMsg {self.network_id}')
|
|
self.nextPublishMe = currentTimeMs + self.publishInterval
|
|
|
|
for thing in self.things:
|
|
thing.Update(currentTimeMs)
|
|
# pose_msg = PoseMsg(self.network_id, thing)
|
|
# for other in self.others:
|
|
# self.Send(other, pose_msg)
|
|
|
|
super().Update(currentTimeMs)
|
|
|
|
#endregion
|
|
|
|
#region Send
|
|
|
|
def SendThingInfo(self, owner, thing, recursively: bool = False):
|
|
self.Send(owner, ThingMsg(self.network_id, thing))
|
|
self.Send(owner, NameMsg(self.network_id, thing))
|
|
self.Send(owner, ModelUrlMsg(self.network_id, thing))
|
|
self.Send(owner, PoseMsg(self.network_id, thing, True))
|
|
self.Send(owner, BinaryMsg(self.network_id, thing))
|
|
|
|
if recursively:
|
|
for child in thing.children:
|
|
self.SendThingInfo(owner, child, recursively)
|
|
|
|
|
|
def Send(self, owner, msg):
|
|
buffer_size = msg.Serialize([self.buffer])
|
|
if buffer_size <= 0:
|
|
return True
|
|
|
|
# print(f'{self.name} send {self.buffer[0]} to {owner.ip_address} {owner.port}')
|
|
self.udp_socket.sendto(self.buffer[:buffer_size], (owner.ip_address, owner.port))
|
|
return True
|
|
|
|
def Publish(self, msg):
|
|
buffer_size = msg.Serialize([self.buffer])
|
|
if buffer_size <= 0:
|
|
return True
|
|
|
|
# print(f'publish {self.buffer[0]} to {self.port}')
|
|
self.udp_socket.sendto(self.buffer[:buffer_size], ('<broadcast>', self.port))
|
|
return True
|
|
|
|
#endregion
|
|
|
|
#region Receive
|
|
|
|
def Receiver(self):
|
|
while True:
|
|
data, addr = self.udp_socket.recvfrom(1024)
|
|
remote_ip_address = addr[0]
|
|
remote_port = addr[1]
|
|
# print(f'msg received from {remote_ip_address}:{remote_port}')
|
|
remote_participant = self.GetParticipant(remote_ip_address, remote_port)
|
|
if remote_participant is None:
|
|
# print(f'new participant')
|
|
remote_participant = self.AddParticipant(remote_ip_address, remote_port)
|
|
self.ReceiveData(data, remote_participant)
|
|
|
|
def ReceiveData(self, data, sender):
|
|
msgId = data[0]
|
|
# print(f'msg {msgId} ')
|
|
match msgId:
|
|
case ParticipantMsg.id:
|
|
self.ProcessParticipantMsg(sender, ParticipantMsg(data))
|
|
case NetworkIdMsg.id:
|
|
self.ProcessNetworkIdMsg(sender, NetworkIdMsg(data))
|
|
# case InvestigateMsg.id:
|
|
# self.ProcessInvestigateMsg(InvestigateMsg(data))
|
|
case ThingMsg.id:
|
|
self.ProcessThingMsg(ThingMsg(data))
|
|
case NameMsg.id:
|
|
self.ProcessNameMsg(NameMsg(data))
|
|
case ModelUrlMsg.id:
|
|
self.ProcessModelUrlMsg(ModelUrlMsg(data))
|
|
case BinaryMsg.id:
|
|
self.ProcessBinaryMsg(BinaryMsg(data))
|
|
|
|
def ProcessParticipantMsg(self, sender, msg: ParticipantMsg):
|
|
pass
|
|
|
|
def ProcessNetworkIdMsg(self, sender, msg: NetworkIdMsg):
|
|
print(f'{self.name} receive network id {self.network_id} -> {msg.network_id}')
|
|
if self.network_id != msg.network_id:
|
|
self.network_id = msg.network_id
|
|
print(f'sending all things {len(self.things)}')
|
|
for thing in self.things:
|
|
if thing.parent is None:
|
|
self.SendThingInfo(sender, thing, recursively=True)
|
|
# self.Send(NameMsg(self.network_id, thing))
|
|
|
|
def ProcessInvestigateMsg(self, data: bytearray):
|
|
pass
|
|
|
|
def ProcessThingMsg(self, msg: ThingMsg):
|
|
print(f'received thing {msg.network_id} {msg.thing_id}')
|
|
if msg.thing_type in self.thing_msg_processors:
|
|
constructor = self.thing_msg_processors[msg.thing_type]
|
|
constructor(msg.network_id, msg.thing_id)
|
|
|
|
# not really 'new' thing, but it is a start
|
|
thing = Thing.Get(msg.network_id, msg.thing_id)
|
|
if thing is not None:
|
|
for handler in self.new_thing_handlers:
|
|
handler(thing)
|
|
|
|
def ProcessNameMsg(self, msg: NameMsg):
|
|
print(f'received name {msg.name}')
|
|
|
|
def ProcessModelUrlMsg(self, msg: ModelUrlMsg):
|
|
print(f'received model url: {msg.url}')
|
|
|
|
def ProcessBinaryMsg(self, msg: BinaryMsg):
|
|
# print('received binary data')
|
|
thing: Thing = self.Get(msg.network_id, msg.thing_id)
|
|
if thing != None:
|
|
thing.ProcessBinary(msg.data)
|
|
|
|
def Register(self, constructor, thing_type):
|
|
self.thing_msg_processors[thing_type] = constructor
|
|
|
|
def OnNewThing(self, event_handler):
|
|
self.new_thing_handlers.append(event_handler)
|
|
|
|
def OnNewThingType(self, thing_type, event_handler):
|
|
def ConditionalHandler(thing):
|
|
if thing.type == thing_type:
|
|
event_handler(thing)
|
|
self.new_thing_handlers.append(ConditionalHandler)
|
|
|
|
#endregion |