micropython_trasmit_recieve/transmitter.py

324 lines
9.4 KiB
Python
Raw Permalink Normal View History

2026-05-27 04:39:50 +00:00
from machine import Pin, ADC
import time, network, espnow
# ---------------- ESP-NOW helpers ----------------
_espnow_sta = None
_espnow = None
_espnow_peers = []
_espnow_broadcast = b'\xff\xff\xff\xff\xff\xff'
led = Pin(15, Pin.OUT);
led.value(0)
btn2 = Pin(2, Pin.IN, Pin.PULL_DOWN)
btn5 = Pin(5, Pin.IN, Pin.PULL_DOWN)
btn6 = Pin(6, Pin.IN, Pin.PULL_DOWN)
btn7 = Pin(7, Pin.IN, Pin.PULL_DOWN)
btn11 = Pin(11, Pin.IN, Pin.PULL_DOWN)
btn12 = Pin(12, Pin.IN, Pin.PULL_DOWN)
time.sleep_ms(20)
PAIRMODE_ON_START = btn12.value()
CALIBRATE_ON_START = btn5.value()
print(PAIRMODE_ON_START)
adc_3 = ADC(Pin(3))
adc_4 = ADC(Pin(4))
adc_8 = ADC(Pin(8))
adc_9 = ADC(Pin(9))
adc_1 = ADC(Pin(1))
adc_10 = ADC(Pin(10))
def _espnow_parse_mac(mac):
if isinstance(mac, bytes):
return mac
if isinstance(mac, bytearray):
return bytes(mac)
if isinstance(mac, str):
mac = mac.replace(':', '').replace('-', '').replace(' ', '')
if len(mac) != 12:
raise ValueError('MAC must be 12 hex chars')
return bytes(int(mac[i:i+2], 16) for i in range(0, 12, 2))
raise ValueError('MAC must be bytes or string')
def _espnow_init(channel=6, rate='RATE_LORA_250K'):
global _espnow_sta, _espnow
_espnow_sta = network.WLAN(network.STA_IF)
_espnow_sta.active(True)
_espnow_sta.disconnect()
_espnow_sta.config(channel=int(channel), protocol=network.WLAN.PROTOCOL_LR)
_espnow = espnow.ESPNow()
_espnow.active(True)
rate_val = getattr(espnow, str(rate), espnow.RATE_LORA_250K)
_espnow.config(rate=rate_val)
return _espnow
def _espnow_my_mac():
if _espnow_sta is None:
_espnow_init()
return _espnow_sta.config('mac')
def _espnow_add_peer(mac):
global _espnow_peers
if _espnow is None:
_espnow_init()
peer = _espnow_parse_mac(mac)
if peer not in _espnow_peers:
_espnow.add_peer(peer)
_espnow_peers.append(peer)
return peer
def _espnow_pack(data):
if isinstance(data, (list, tuple)):
return ','.join(str(v) for v in data)
return str(data)
def _espnow_send_peer(mac, data):
if _espnow is None:
_espnow_init()
peer = _espnow_add_peer(mac)
payload = _espnow_pack(data)
_espnow.send(peer, payload)
def _espnow_send_all(data):
if _espnow is None:
_espnow_init()
payload = _espnow_pack(data)
try:
_espnow.add_peer(_espnow_broadcast)
except:
pass
_espnow.send(_espnow_broadcast, payload)
def _espnow_try_num(s):
try:
return int(s)
except:
pass
try:
return float(s)
except:
return s
def _espnow_unpack(msg):
if msg is None:
return []
if isinstance(msg, bytes):
msg = msg.decode('utf-8', 'ignore')
parts = str(msg).split(',')
out = []
for p in parts:
p = p.strip()
if p == '':
continue
out.append(_espnow_try_num(p))
return out
def _espnow_recv(timeout_ms=10):
if _espnow is None:
_espnow_init()
host, msg = _espnow.recv(timeout_ms)
return host, _espnow_unpack(msg)
# ---------------- Pairing logic ----------------
PAIR_PREFIX = "PAIR_REQ:"
PAIR_ACK_PREFIX = "PAIR_ACK:"
PEER_FILE = "peer_mac.txt"
peer_mac = None
pairing_mode = False
def load_peer():
try:
with open(PEER_FILE, "rb") as f:
return f.read()
except:
return None
def save_peer(mac):
with open(PEER_FILE, "wb") as f:
f.write(mac)
def enter_pairing():
global pairing_mode, peer_mac
pairing_mode = True
my_mac = _espnow_my_mac()
print("Pairing mode started, broadcasting...")
while pairing_mode:
# Broadcast as two fields
_espnow_send_all([PAIR_PREFIX, my_mac.hex()])
print("Sent:", [PAIR_PREFIX, my_mac.hex()])
led.value(not led.value())
# Non-blocking receive
host, msg = _espnow_recv(timeout_ms=50)
if msg and isinstance(msg, list) and len(msg) > 1:
if msg[0] == PAIR_ACK_PREFIX:
partner_hex = str(msg[1]).strip()
if len(partner_hex) == 12: # sanity check
partner = _espnow_parse_mac(partner_hex)
save_peer(partner)
peer_mac = partner
print("Paired with", partner_hex)
pairing_mode = False
time.sleep_ms(500)
def calibrate():
print("CALIBRATING")
# ADCs
axes = [adc_3, adc_4, adc_8, adc_9]
# --- Centering phase ---
centers = [0]*len(axes)
samples = 0
start = time.ticks_ms()
next_flash = start
while time.ticks_diff(time.ticks_ms(), start) < 3000:
vals = [a.read() for a in axes]
centers = [c+v for c,v in zip(centers, vals)]
samples += 1
# LED flash 6 Hz
if time.ticks_diff(time.ticks_ms(), next_flash) >= 0:
led.value(not led.value())
next_flash = time.ticks_add(next_flash, 83) # ~83ms
time.sleep_ms(5)
centers = [int(c/samples) for c in centers]
print("Centers:", centers)
# --- Min/Max phase ---
mins = [65535]*len(axes)
maxs = [0]*len(axes)
start = time.ticks_ms()
next_flash = start
while time.ticks_diff(time.ticks_ms(), start) < 5000:
vals = [a.read() for a in axes]
mins = [min(m,v) for m,v in zip(mins, vals)]
maxs = [max(m,v) for m,v in zip(maxs, vals)]
# LED flash 2 Hz
if time.ticks_diff(time.ticks_ms(), next_flash) >= 0:
led.value(not led.value())
next_flash = time.ticks_add(next_flash, 250) # 250ms
time.sleep_ms(5)
print("Min/Max:", list(zip(mins, maxs)))
# Return calibration data
return centers, mins, maxs
def apply_deadzone(norm_val, deadzone=0.05):
if abs(norm_val) < deadzone:
return 0.0
if norm_val > 0:
return (norm_val - deadzone) / (1.0 - deadzone)
else:
return (norm_val + deadzone) / (1.0 - deadzone)
def normalize_axis(raw, center, min_val, max_val):
if raw >= center:
span = max_val - center
if span <= 0:
return 0
norm = (raw - center) / span
else:
span = center - min_val
if span <= 0:
return 0
norm = (raw - center) / span # negative
# Clamp to [-1, 1]
if norm > 1:
norm = 1
if norm < -1:
norm = -1
# Apply deadzone (still in 1…+1)
norm = apply_deadzone(norm)
# Scale to 255…+255 and return integer
return int(norm * 255)
CAL_FILE = "calibration.txt"
def save_calibration(centers, mins, maxs):
with open(CAL_FILE, "w") as f:
# Write as comma-separated values
f.write(",".join(str(v) for v in centers) + "\n")
f.write(",".join(str(v) for v in mins) + "\n")
f.write(",".join(str(v) for v in maxs) + "\n")
def load_calibration():
try:
with open(CAL_FILE, "r") as f:
lines = f.readlines()
centers = [int(v) for v in lines[0].strip().split(",")]
mins = [int(v) for v in lines[1].strip().split(",")]
maxs = [int(v) for v in lines[2].strip().split(",")]
return centers, mins, maxs
except:
return None, None, None
# Globals to hold calibration
cal_centers, cal_mins, cal_maxs = load_calibration()
if cal_centers:
print("Loaded calibration:", cal_centers, cal_mins, cal_maxs)
else:
print("No calibration stored, will use raw values until calibrated")
# ---------------- Setup ----------------
_espnow_init(1, 'RATE_LORA_500K')
peer_mac = load_peer()
if not peer_mac:
enter_pairing()
else:
_espnow_add_peer(peer_mac)
# ---------------- Main loop ----------------
while True:
if btn12.value() == 1 and PAIRMODE_ON_START == 1:
print("PAIRING")
start = time.ticks_ms()
while btn12.value() == 1:
if time.ticks_diff(time.ticks_ms(), start) > 3000:
enter_pairing()
if peer_mac:
_espnow_add_peer(peer_mac)
PAIRMODE_ON_START = 0
break
else:
PAIRMODE_ON_START = 0
if btn5.value() == 1 and CALIBRATE_ON_START == 1:
print("CALIBRATING")
cal_centers, cal_mins, cal_maxs = calibrate()
print(cal_centers, cal_mins, cal_maxs)
save_calibration(cal_centers, cal_mins, cal_maxs)
CALIBRATE_ON_START = 0
else:
CALIBRATE_ON_START = 0
if peer_mac:
if cal_centers and cal_mins and cal_maxs:
# Normalize the four joystick axes
axes_raw = [adc_3.read(), adc_4.read(), adc_8.read(), adc_9.read()]
axes_norm = [
normalize_axis(axes_raw[0], cal_centers[0], cal_mins[0], cal_maxs[0]),
normalize_axis(axes_raw[1], cal_centers[1], cal_mins[1], cal_maxs[1]),
normalize_axis(axes_raw[2], cal_centers[2], cal_mins[2], cal_maxs[2]),
normalize_axis(axes_raw[3], cal_centers[3], cal_mins[3], cal_maxs[3]),
]
else:
# Fallback to raw values if not calibrated
axes_norm = [adc_3.read(), adc_4.read(), adc_8.read(), adc_9.read()]
# Build the packet explicitly
payload = [
axes_norm[0], axes_norm[1], axes_norm[2], axes_norm[3],
adc_1.read(), adc_10.read(),
btn2.value(), btn5.value(), btn6.value(),
btn7.value(), btn11.value(), btn12.value()
]
_espnow_send_peer(peer_mac, payload)
led.value(not led.value())
time.sleep_ms(50)