serial_audio_catcher/tdoa.py

118 lines
3.8 KiB
Python
Raw Permalink Normal View History

import numpy as np
import time
from datetime import datetime
# Path that main() opens in binary read mode as its audio source.
# NOTE(review): the name suggests a FIFO fed by an ESP32 capture process - confirm.
FIFO_PATH = "/tmp/esp32_audio"
SAMPLE_RATE = 16000  # Hz; passed to the band-pass filter and to GCC-PHAT
CHANNELS = 2  # interleaved channels per frame (main() splits them as left/right)
BYTES_PER_SAMPLE = 2  # samples are decoded as int16
# Voice-specific params
BLOCK_FRAMES = 4096 # ~256 ms @16k, good for speech segments
BAND_LOW = 300 # Hz
BAND_HIGH = 3000 # Hz
ALPHA = 0.005 # slower baseline adaptation (weight of the newest RMS in the moving average)
MARGIN = 2.0 # multiplier above baseline RMS
COOLDOWN = 0.7 # seconds; suppress retriggers
# Geometry
MIC_DISTANCE = 0.13 # meters between microphones
SPEED_OF_SOUND = 343.0 # m/s
def read_block(f, block_bytes):
    """Read one fixed-size block from *f* and view it as int16 samples.

    Args:
        f: Binary file-like object exposing ``read(n)``.
        block_bytes: Exact number of bytes that make up one block.

    Returns:
        A 1-D ``np.int16`` array over the bytes read, or ``None`` when the
        read came back empty or shorter than ``block_bytes``.
    """
    chunk = f.read(block_bytes)
    # Only a complete block is decoded; an empty or short read is reported
    # to the caller as None and its bytes are dropped.
    if chunk and len(chunk) >= block_bytes:
        return np.frombuffer(chunk, dtype=np.int16)
    return None
def bandpass_fft(x, fs, low, high):
    """Band-limit *x* by zeroing every FFT bin outside ``[low, high]`` Hz.

    Args:
        x: Real-valued NumPy array of samples (expected to be 1-D).
        fs: Sampling rate in Hz.
        low: Lower edge of the pass band in Hz (inclusive).
        high: Upper edge of the pass band in Hz (inclusive).

    Returns:
        The filtered signal, same length as ``x`` and cast back to
        ``x.dtype``.
    """
    num_samples = len(x)
    spectrum = np.fft.rfft(x)
    bin_freqs = np.fft.rfftfreq(num_samples, d=1.0 / fs)
    # Brick-wall response: True for bins inside the band, False elsewhere.
    in_band = (bin_freqs >= low) & (bin_freqs <= high)
    filtered = np.fft.irfft(spectrum * in_band, n=num_samples)
    return filtered.astype(x.dtype)
def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """Estimate the delay of *sig* relative to *refsig* with GCC-PHAT.

    The cross-power spectrum is normalised to unit magnitude (PHAT
    weighting), transformed back to the lag domain, and the peak is searched
    within ``+/- max_tau`` around zero lag.

    Args:
        sig: 1-D array of samples from the channel under test.
        refsig: 1-D array of samples from the reference channel.
        fs: Sampling rate in Hz.
        max_tau: Largest delay magnitude, in seconds, to search. When
            ``None`` it defaults to ``MIC_DISTANCE / SPEED_OF_SOUND``.
        interp: Integer upsampling factor applied to the correlation; the
            delay resolution becomes ``1 / (interp * fs)`` seconds.

    Returns:
        A ``(tau, confidence)`` tuple. ``tau`` is the estimated delay in
        seconds, positive when ``sig`` is a delayed copy of ``refsig``.
        ``confidence`` is the peak correlation divided by the mean absolute
        correlation inside the search window.
    """
    # Zero-pad to the combined length so the circular correlation does not
    # wrap one signal onto the other.
    n_fft = sig.shape[0] + refsig.shape[0]
    sig_spec = np.fft.rfft(sig, n=n_fft)
    ref_spec = np.fft.rfft(refsig, n=n_fft)

    cross = sig_spec * np.conj(ref_spec)
    # PHAT weighting: keep only the phase. The small epsilon avoids 0/0 on
    # empty bins. NOTE: every non-empty bin ends up with unit magnitude, so
    # bins holding only residual noise still contribute their phase.
    cross /= np.abs(cross) + 1e-15
    cc = np.fft.irfft(cross, n=interp * n_fft)

    if max_tau is None:
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND

    # irfft returns lag 0 at index 0 with negative lags wrapped to the end.
    # fftshift moves lag 0 to index len(cc) // 2 so that a symmetric window
    # around that index really covers lags -max_shift .. +max_shift.
    zero_lag = cc.shape[0] // 2
    max_shift = max(0, min(int(interp * fs * max_tau), zero_lag))
    cc = np.fft.fftshift(cc)
    cc = cc[zero_lag - max_shift: zero_lag + max_shift + 1]

    shift = np.argmax(cc) - max_shift
    tau = shift / float(interp * fs)

    # confidence: peak vs average correlation magnitude
    peak_val = np.max(cc)
    avg_val = np.mean(np.abs(cc))
    confidence = peak_val / (avg_val + 1e-9)
    return tau, confidence
def tau_to_angle(tau, mic_distance, speed_of_sound):
    """Convert a time difference of arrival into an angle in degrees.

    Args:
        tau: Time difference of arrival in seconds.
        mic_distance: Spacing between the two microphones in meters.
        speed_of_sound: Propagation speed in meters per second.

    Returns:
        ``arcsin(tau * speed_of_sound / mic_distance)`` in degrees, with the
        arcsin argument clamped to ``[-1, 1]`` so the result lies in
        ``[-90, 90]``.
    """
    sine = tau * speed_of_sound / mic_distance
    # Delays longer than the microphone spacing allows would push the arcsin
    # argument out of its domain; pin them to the end-fire directions.
    capped = sine if sine < 1.0 else 1.0
    clamped = capped if capped > -1.0 else -1.0
    return np.degrees(np.arcsin(clamped))
def main():
    """Watch the stereo stream at ``FIFO_PATH`` and print voice-like events.

    Blocks of ``BLOCK_FRAMES`` interleaved int16 frames are read in a loop.
    An exponential moving average of the block RMS serves as the noise
    baseline. A block whose RMS exceeds ``MARGIN`` times that baseline, and
    which falls outside the ``COOLDOWN`` window, is band-passed to
    ``BAND_LOW``..``BAND_HIGH`` and run through GCC-PHAT. Detections with
    confidence above 2.0 are printed together with the estimated angle.

    The loop never returns on its own; stop it by interrupting the process.
    """
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    baseline = None
    # Monotonic clock: the cooldown must not be affected by wall-clock jumps.
    last_trigger = float("-inf")

    with open(FIFO_PATH, "rb") as f:
        print("Listening for voice events (RMS + GCC-PHAT + confidence)...")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # No complete block (e.g. the writer closed the pipe). Back
                # off briefly instead of spinning at full CPU on empty reads.
                time.sleep(0.01)
                continue

            # De-interleave and convert to float once. Working on floats
            # also avoids int16 overflow in abs() for the value -32768.
            left = audio[0::CHANNELS].astype(np.float32)
            right = audio[1::CHANNELS].astype(np.float32)

            # RMS energy across both channels
            rms = np.sqrt(np.mean((left ** 2 + right ** 2) / 2))

            if baseline is None:
                # The first block only seeds the baseline.
                baseline = rms
                continue

            baseline = (1 - ALPHA) * baseline + ALPHA * rms
            threshold = baseline * MARGIN

            now = time.monotonic()
            if now - last_trigger < COOLDOWN:
                continue
            if rms <= threshold:
                continue

            # Band-pass filter to voice band
            l_bp = bandpass_fft(left, SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(right, SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA + confidence
            tau, confidence = gcc_phat(
                l_bp, r_bp, SAMPLE_RATE,
                max_tau=MIC_DISTANCE / SPEED_OF_SOUND, interp=4,
            )
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Only report strong detections
            if confidence > 2.0:
                ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                louder = "LEFT" if np.max(np.abs(left)) > np.max(np.abs(right)) else "RIGHT"
                print(f"[{ts}] Voice event: {louder} louder | RMS={rms:.1f}, baseline={baseline:.1f}, "
                      f"TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° | conf={confidence:.2f}")
                # The cooldown starts only after an event was reported.
                last_trigger = now


if __name__ == "__main__":
    main()