Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.
Open

init #43

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 113 additions & 84 deletions python-engine/src/communications/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import serial
import constants
import numpy as np
import pickle


class Radio:
_instance = None
Expand All @@ -15,8 +17,9 @@ def __new__(cls, *args, **kwargs):
return cls._instance

def __init__(self):
self.r_queue = {'r0': [], 'r1': [], 'r2': [], 'r3': [], 'r4': [], 'r5': []}
self.r_id = {'r0': 0, 'r1': 1, 'r2': 2, 'r3': 3, 'r4': 4, 'r5': 5}
self.robotQueue = {'r0': [], 'r1': [], 'r2': [], 'r3': [], 'r4': [], 'r5': []}
self.robotIds = {'r0': 0, 'r1': 1, 'r2': 2, 'r3': 3, 'r4': 4, 'r5': 5}
self.verbose = False
# Format for the queue
'''
r_queue = {
Expand All @@ -31,98 +34,124 @@ def __init__(self):
self.serial_port = serial.Serial(port= constants.SERIAL_PORT, baudrate=constants.BAUDRATE, timeout=constants.TIMEOUT)
if not self.serial_port.is_open:
raise ValueError("No se pudo abrir el puerto serial")


#print data
def send_data(self):
packet = b''
for rIndex in self.robotQueue.keys():
robot_packet = np.zeros(5, dtype=np.uint8)
if self.robotQueue[rIndex] != []:
# Byte 0
"""
3 bits para el ID del robot a la izquierda del byte
3 bits para la fuerza del dribbler a la derecha del ID del robot
1 bit para el kicker a la derecha de la fuerza del dribbler
1 bit para el callback a la derecha del kicker
"""
robot_packet[0] = self.robotIds[rIndex] << 5
robot_packet[0] = robot_packet[0] | (self.robotQueue[rIndex][0]["drb"] << 2)
robot_packet[0] = robot_packet[0] | (self.robotQueue[rIndex][0]["kick"] << 1)
robot_packet[0] = robot_packet[0] | self.robotQueue[rIndex][0]["cb"]

#print data every 1 second
def send_loop(self):
while True:
packet = b''
for k in self.r_queue.keys():
robot_packet = np.zeros(5, dtype=np.uint8)
if self.r_queue[k] != []:
# Byte 0
"""
3 bits para el ID del robot a la izquierda del byte
3 bits para la fuerza del dribbler a la derecha del ID del robot
1 bit para el kicker a la derecha de la fuerza del dribbler
1 bit para el callback a la derecha del kicker
"""
robot_packet[0] = self.r_id[k] << 5
robot_packet[0] = robot_packet[0] | (self.r_queue[k][0]["drb"] << 2)
robot_packet[0] = robot_packet[0] | (self.r_queue[k][0]["kick"] << 1)
robot_packet[0] = robot_packet[0] | self.r_queue[k][0]["cb"]

# Byte 1
"""
1 bit para el signo de la velocia x del robot a la izquierda del byte
7 bits para la velocidad x del robot
"""
if self.r_queue[k][0]["x"] < 0:
self.r_queue[k][0]["x"] = -self.r_queue[k][0]["x"]
robot_packet[1] = robot_packet[1] | (self.r_queue[k][0]["x"] & 0x7F)
else:
robot_packet[1] = 1 << 7
robot_packet[1] = robot_packet[1] | (self.r_queue[k][0]["x"] & 0x7F)
# Byte 2
"""
1 bit para el signo de la velocia y del robot a la izquierda del byte
7 bits para la velocidad (solo parte entera) y del robot
"""
if self.r_queue[k][0]["y"] < 0:
robot_packet[2] = 1 << 7
self.r_queue[k][0]["y"] = -self.r_queue[k][0]["y"]
robot_packet[2] = robot_packet[2] | (self.r_queue[k][0]["y"] & 0x7F)
else:
robot_packet[2] = robot_packet[2] | (self.r_queue[k][0]["y"] & 0x7F)

# Byte 3
"""
1 bit para el signo de la velocia rotacional del robot a la izquierda del byte
7 bits para la velocidad rotacional del robot
"""
if self.r_queue[k][0]["r"] < 0:
robot_packet[3] = 1 << 7
self.r_queue[k][0]["r"] = -self.r_queue[k][0]["r"]
robot_packet[3] = robot_packet[3] | (self.r_queue[k][0]["r"] & 0x7F)
else:
robot_packet[3] = robot_packet[3] | (self.r_queue[k][0]["r"] & 0x7F)

# Byte 4
"""
2 bits de digitos significativos de la velocidad x del robot a la izquierda del byte
2 bits de digitos significativos de la velocidad y del robot a la derecha de los digitos significativos de la velocidad x
4 bits de digitos significativos de la velocidad rotacional del robot a la derecha de los digitos significativos de la velocidad y
"""
# Convertir los digitos significativos a enteros
robot_packet[4] = robot_packet[4] | ((self.r_queue[k][0]["x"] >> 7) << 6)
robot_packet[4] = robot_packet[4] | ((self.r_queue[k][0]["y"] >> 7) << 4)
robot_packet[4] = robot_packet[4] | ((self.r_queue[k][0]["r"] >> 7) & 0x0F)
# print (robot_packet[4]) in binary
robot_packet_bytes = bytearray(robot_packet.tobytes())
self.r_queue[k].pop(0)
# Byte 1
"""
1 bit para el signo de la velocidad x del robot a la izquierda del byte
7 bits para la velocidad x del robot
"""
if self.robotQueue[rIndex][0]["x"] < 0:
self.robotQueue[rIndex][0]["x"] = -self.robotQueue[rIndex][0]["x"]
robot_packet[1] = robot_packet[1] | (self.robotQueue[rIndex][0]["x"] & 0x7F)
else:
# Si el robot no esta activo, enviar un paquete de ceros
robot_packet[0] = self.r_id[k] << 5
robot_packet[1] = 0
robot_packet[2] = 0
robot_packet[3] = 0
robot_packet[4] = 0
robot_packet_bytes = bytearray(robot_packet.tobytes())
robot_packet[1] = 1 << 7
robot_packet[1] = robot_packet[1] | (self.robotQueue[rIndex][0]["x"] & 0x7F)
# Byte 2
"""
1 bit para el signo de la velocia y del robot a la izquierda del byte
7 bits para la velocidad (solo parte entera) y del robot
"""
if self.robotQueue[rIndex][0]["y"] < 0:
robot_packet[2] = 1 << 7
self.robotQueue[rIndex][0]["y"] = -self.robotQueue[rIndex][0]["y"]
robot_packet[2] = robot_packet[2] | (self.robotQueue[rIndex][0]["y"] & 0x7F)
else:
robot_packet[2] = robot_packet[2] | (self.robotQueue[rIndex][0]["y"] & 0x7F)

# Convertir el arreglo de bytes a un string
# Convert each number in the array to its corresponding Unicode character
# Encode the characters using UTF-8
packet += robot_packet_bytes
# Enviar el string al puerto serial
self.serial_port.write(packet)
# Byte 3
"""
1 bit para el signo de la velocia rotacional del robot a la izquierda del byte
7 bits para la velocidad rotacional del robot
"""
if self.robotQueue[rIndex][0]["r"] < 0:
robot_packet[3] = 1 << 7
self.robotQueue[rIndex][0]["r"] = -self.robotQueue[rIndex][0]["r"]
robot_packet[3] = robot_packet[3] | (self.robotQueue[rIndex][0]["r"] & 0x7F)
else:
robot_packet[3] = robot_packet[3] | (self.robotQueue[rIndex][0]["r"] & 0x7F)

# Byte 4
"""
2 bits de digitos significativos de la velocidad x del robot a la izquierda del byte
2 bits de digitos significativos de la velocidad y del robot a la derecha de los digitos significativos de la velocidad x
4 bits de digitos significativos de la velocidad rotacional del robot a la derecha de los digitos significativos de la velocidad y
"""
# Convertir los digitos significativos a enteros
robot_packet[4] = robot_packet[4] | ((self.robotQueue[rIndex][0]["x"] >> 7) << 6)
robot_packet[4] = robot_packet[4] | ((self.robotQueue[rIndex][0]["y"] >> 7) << 4)
robot_packet[4] = robot_packet[4] | ((self.robotQueue[rIndex][0]["r"] >> 7) & 0x0F)
# print (robot_packet[4]) in binary
robot_packet_bytes = bytearray(robot_packet.tobytes())
self.robotQueue[rIndex].pop(0)
else:
# Si el robot no esta activo, enviar un paquete de ceros
robot_packet[0] = self.robotIds[rIndex] << 5
robot_packet[1] = 0
robot_packet[2] = 0
robot_packet[3] = 0
robot_packet[4] = 0
robot_packet_bytes = bytearray(robot_packet.tobytes())

# Convertir el arreglo de bytes a un string
# Convert each number in the array to its corresponding Unicode character
# Encode the characters using UTF-8
packet += robot_packet_bytes
# Enviar el string al puerto serial
self.serial_port.write(packet)
if self.verbose:
print(chr(27) + "[2J")

for i in range(0, len(packet), 5):
print("robot", i//5, "packet: ", end='')
for j in range(0, 5):
print(format(packet[i+j], '08b'), end=' ')
print()




#print data every 1 second
def send_loop(self)-> None:
while True:
self.send_data()
time.sleep(.016)

def recive_message(self, x,y,r,drb,kick,cb, robot_id):

def add_to_robot_queue(self, robot_id, drb, kick, cb, x, y, r)-> None:
message = {"x": x, "y": y, "r": r, "drb": drb, "kick": kick, "cb": cb}
self.r_queue[robot_id].append(message)
id = "r" + str(robot_id)
self.robotQueue[id].append(message)


def receive_data(self):
# Recibir datos del puerto serial
# NOTE: Aun no se aplica un uso definido para esta funcion
serialized_data = self.serial_port.read(64)
data = pickle.loads(serialized_data)
return data


def list_serial_ports(self)-> None:
# Listar los puertos seriales disponibles
ports = serial.tools.list_ports.comports()
for port in ports:
print(port.device)
78 changes: 66 additions & 12 deletions python-engine/src/communications/wrapper.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,79 @@
# Interfaz de la radio para comunicarse con sim y cancha .
#Esto es para evitar la divergencia entre codigo simulador y cancha
from communications.grsim import Grsim
from communications.radio import Radio
from constants import *
import math

class CommandSender:
def __init__(self) -> None:
self.data_packet = [[0] * 7 for _ in range(6)] # Lista de listas 5x6 para enviar a los robots

if COMMUNICATION_MODE == 1:
self.grsim = Grsim()

def send_robot_data(self, id = 0, is_blue = 0,
velangular = 0, kickspeedx = 0, kickspeedz = 0,
veltangent = 0, velnormal = 0, spinner = 0,
wheelsspeed = False) -> None:
self.kickspeedx = 0
self.kickspeedz = 0
self.veltangent = 0
self.wheelsspeed = False
else:
self.radio = Radio()


# LLena el paquete de datos para enviar a los robots
def add_robot_data(self, id=0, velangular=0, veltangent=0,
velnormal=0, dribbler=0, kicker=0,
call_back=0) -> None:
if id < 0 or id > 5:
raise ValueError("Robot id out of range")

if COMMUNICATION_MODE == 1:
isteamyellow = 1 - is_blue
self.grsim.communicate_grsim(id = id,isteamyellow= isteamyellow, velangular= velangular,
kickspeedx=kickspeedx, kickspeedz=kickspeedz, veltangent=veltangent,
velnormal= velnormal,spinner= spinner,
wheelsspeed=wheelsspeed)


self.data_packet[id] = [id, dribbler, kicker, call_back, veltangent, velnormal, velangular]
else:
self.radio.add_to_robot_queue(id, dribbler, kicker, call_back, veltangent, velnormal, velangular)




# Envía el paquete de datos a los robots en cancha o simulador segun corresponda
def send_robot_packet(self) -> None:
if COMMUNICATION_MODE == 1:
for id in range(6):
self.grsim.communicate_grsim(
id=id,
isteamyellow=ISTEAMYELLOW,
velangular=self.data_packet[id][6],
veltangent=self.data_packet[id][4],
velnormal=self.data_packet[id][5],
spinner=self.data_packet[id][1],
kickspeedx=self.kickspeedx,
kickspeedz=self.kickspeedz,
wheelsspeed=self.wheelsspeed
)

else:
self.radio.send_data()

self.data_packet = [[0] * 7 for _ in range(6)] # Reinicia el paquete de datos

# Envía un paquete de datos a un solo robot
def send_solo_robot_packet(self, id=0, velangular=0, veltangent=0,
velnormal=0, dribbler=0, kicker=0,
call_back=0) -> None:
if id < 0 or id > 5:
raise ValueError("Robot id out of range")

if COMMUNICATION_MODE == 1:
self.grsim.communicate_grsim(
id=id,
isteamyellow=ISTEAMYELLOW,
velangular=velangular,
veltangent=veltangent,
velnormal=velnormal,
spinner=dribbler,
kickspeedx=self.kickspeedx,
kickspeedz=self.kickspeedz,
wheelsspeed=self.wheelsspeed
)
else:
self.radio.add_to_robot_queue(id, dribbler, kicker, call_back, veltangent, velnormal, velangular)
self.radio.send_data()
4 changes: 3 additions & 1 deletion python-engine/src/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#Radio
SERIAL_PORT = '/dev/ttyUSB0'
BAUDRATE = 115200
TIMEOUT = 1

COMMUNICATION_MODE = 1 # 0: Cancha, 1: Grsim
COMMUNICATION_MODE = 0 # 0: Cancha, 1: Grsim
ISTEAMYELLOW = 1

# Grsim
GRSIM_VISION_PORT = 10020
Expand Down
3 changes: 2 additions & 1 deletion python-engine/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@

motion.move_linear([0,0], delta_time)

#radio.communicate_grsim(0, 0, veltangent = 5)
#radio.communicate_grsim(0, 0, veltangent = 5)

2 changes: 1 addition & 1 deletion python-engine/src/stp/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def motion_to_point(self, robot : Robot, to_point : tuple[float, float]):
# Nose porque con negativo funciona si alguien sabe explicar porfavor
dir_vec = rotate_2d_vector(dir_vec, -robot.orientation)
dir_vec = normalize_2d_vector(dir_vec)
self.comms.send_robot_data(id = self.robot[0], is_blue = self.robot[1],
self.comms.send_solo_robot_packet(id = self.robot[0], is_blue = self.robot[1],
veltangent= dir_vec[0]*self.MOVE_SPEED,
velnormal = dir_vec[1]*self.MOVE_SPEED
)
Expand Down