BB2B_test does not crash

This commit is contained in:
Pascal Serrarens 2025-03-19 17:55:35 +01:00
parent 0b7bbd0497
commit 2065789576
3 changed files with 71 additions and 43 deletions

View File

@ -59,8 +59,6 @@ class LocalParticipant(Participant):
self.new_thing_handlers = [] self.new_thing_handlers = []
self.publishInterval = 3000 # 3 seconds self.publishInterval = 3000 # 3 seconds
#if remote == False:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udp_socket.bind(("0.0.0.0", self.local_port)) self.udp_socket.bind(("0.0.0.0", self.local_port))
@ -68,8 +66,6 @@ class LocalParticipant(Participant):
self.thread = threading.Thread(target = self.Receiver) self.thread = threading.Thread(target = self.Receiver)
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()
# else:
# self.udp_socket = udp_socket
#region Update #region Update

View File

@ -13,62 +13,37 @@ class SiteServer(LocalParticipant):
name = "Site Server" name = "Site Server"
def __init__(self, port=7681, remote=False, udp_socket = None): def __init__(self, port=7681):
"""! Create a new site server """! Create a new site server
@param port The UDP port on which communication is received @param port The UDP port on which communication is received
""" """
self.ip_address = "0.0.0.0" self.ip_address = "0.0.0.0"
self.port = port self.port = port
self.publishInterval = 0 self.local_port = port
self.others = [] self.others = []
self.network_id = 0 self.network_id = 0
self.buffer = bytearray(256) self.buffer = bytearray(256)
self.thing_msg_constructors = {} self.thing_msg_processors = {}
self.new_thing_handlers = []
self.publishInterval = 0
if remote == False:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udp_socket.bind(("0.0.0.0", port)) self.udp_socket.bind(("0.0.0.0", self.local_port))
self.AddParticipant(self.ip_address, self.port)
self.thread = threading.Thread(target = self.Receiver) self.thread = threading.Thread(target = self.Receiver)
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()
else:
self.udp_socket = udp_socket
self.Register(TemperatureSensor, Thing.Type.TemperatureSensor) self.Register(TemperatureSensor, Thing.Type.TemperatureSensor)
def AddParticipant(self, ip_address, port):
# print(f'{self.name} Add site participant {ip_address} {port}')
remote_participant = SiteServer(ip_address, port, remote=True, udp_socket=self.udp_socket)
remote_participant.network_id = len(self.others)
self.others.append(remote_participant)
return remote_participant
def ProcessParticipantMsg(self, sender, msg): def ProcessParticipantMsg(self, sender, msg):
if msg.network_id == 0: if msg.network_id == 0:
print(f'{self.name} received New Client -> {sender.network_id}') print(f'{self.name} received New Client -> {sender.network_id}')
sender.Send(NetworkIdMsg(sender.network_id)) self.Send(sender, NetworkIdMsg(sender.network_id))
# else: # else:
# print(f'{self.name} Client') # print(f'{self.name} Client')
def ProcessNetworkId(self, msg): def ProcessNetworkId(self, msg):
pass pass
# Register function
def Register(self, ThingClass, thing_type):
self.thing_msg_constructors[thing_type] = lambda network_id, thing_id: ThingClass(network_id, thing_id)
def Process(self, msg):
if isinstance(ThingMsg):
self.ProcessThingMsg(msg)
else:
super().Process(msg)
def ProcessThingMsg(self, msg):
if msg.thingType in self.thing_msg_constructors:
self.thing_msg_constructors[msg.thing_type](msg.network_id, msg.thing_id)
else:
Thing(msg.network_id, msg.thing_id, msg.thing_type)

57
test/BB2B_test.py Normal file
View File

@ -0,0 +1,57 @@
import sys
from pathlib import Path
# Add the project root to sys.path
sys.path.append(str(Path(__file__).resolve().parent.parent))
import time
from SiteServer import SiteServer
from LocalParticipant import LocalParticipant
from Thing import *
from Things.DifferentialDrive import DifferentialDrive
from Things.TouchSensor import TouchSensor
# Start a site server
site = SiteServer(port=7691)
# Create a local participant for handling communcation
# using default settings (UDP communciation over port 7681)
participant = LocalParticipant(port=7691, local_port=7683, ip_address="127.0.0.1")
# The robot's propulsion is a differential drive
bb2b = DifferentialDrive(participant)
bb2b.name = "BB2B"
bb2b.SetDriveDimensions(0.064, 0.128)
bb2b.SetOrientation(SwingTwist.Degrees(40, 0, 0))
bb2b.SetPosition(Spherical(0.15, Direction.up))
model = Thing(parent=bb2b)
model.model_url = "https://passer.life/extras/Roller1anim1.png"
model.SetOrientation(SwingTwist.Degrees(90, 0, 0))
# It has a touch sensor at the front left of the roboid
touch_left = TouchSensor(parent=bb2b)
touch_left.name = "Touch left"
touch_left.SetPosition(Spherical(0.12, Direction.Degrees(-30, 0)))
# and other one on the right
touch_right = TouchSensor(parent=bb2b)
touch_right.name = "Touch right"
touch_right.SetPosition(Spherical(0.12, Direction.Degrees(30, 0)))
# Do forever:
while True:
# The left wheel turns forward when nothing is touched on the right side
# and turn backward when the roboid hits something on the right
wheel_speed_left = -600 if touch_right.touched_something else 600
# The right wheel does the same, but instead is controlled by
# touches on the left side
wheel_speed_right = -600 if touch_left.touched_something else 600
# When both sides are touching something, both wheels will turn backward
# and the roboid will move backwards
bb2b.SetWheelVelocity(wheel_speed_left, wheel_speed_right)
if touch_left.touched_something or touch_right.touched_something:
print(f'{wheel_speed_left} {wheel_speed_right} {bb2b.linear_velocity.distance} {bb2b.angular_velocity.distance}')
# Update the roboid state
participant.Update()
# and sleep for 100ms
time.sleep(0.1)