serial_audio_catcher/rntest.py

111 lines
3.6 KiB
Python
Raw Permalink Normal View History

import wave
import numpy as np
import ctypes
from ctypes import c_void_p, c_float, POINTER
from ctypes.util import find_library
from scipy.signal import butter, lfilter
# --- File locations ---
# Input path; the capture loop opens it for binary reading.
FIFO_PATH = "/tmp/esp32_audio"
# Output WAV with the upsampled, high-pass filtered mono signal.
RAW_FILE = "raw_mono_48k.wav"
# Output WAV with the RNNoise-processed signal.
DENOISED_FILE = "denoised_mono_48k.wav"

# --- Input stream format ---
IN_SR = 16_000          # input sample rate in Hz
CHANNELS_IN = 2         # interleaved channels in the input stream
BYTES_PER_SAMPLE = 2    # bytes per sample per channel (read as int16)

# --- Processing format ---
TARGET_SR = 48_000      # sample rate of both output WAV files, in Hz
FRAME_SIZE = 480        # RNNoise frame size at 48kHz
# Input samples per RNNoise frame: 160 samples @16kHz → upsample ×3 → 480 @48kHz
IN_FRAME_16K = FRAME_SIZE * IN_SR // TARGET_SR
# --- High-pass filter design ---
def highpass_filter(data, cutoff=100, fs=TARGET_SR, order=4, zi=None):
    """Apply a Butterworth high-pass filter to *data*.

    Args:
        data: Samples to filter (array-like).
        cutoff: Cutoff frequency in Hz.
        fs: Sample rate of *data* in Hz; the cutoff is normalised by the
            Nyquist frequency ``0.5 * fs``.
        order: Butterworth filter order.
        zi: Optional filter state from the previous chunk.  For 1-D data it
            has length ``max(len(a), len(b)) - 1`` (equal to ``order`` for
            this design); pass zeros for the first chunk.

    Returns:
        The filtered samples when ``zi`` is None.  When ``zi`` is given,
        the tuple ``(filtered, zf)`` where ``zf`` is the state to pass in
        with the next chunk.
    """
    b, a = butter(order, cutoff / (0.5 * fs), btype='high', analog=False)
    if zi is None:
        # Stateless call: the filter starts from rest.  Using this on
        # consecutive chunks of one stream restarts the filter on every
        # chunk and leaves a transient at each boundary.
        return lfilter(b, a, data)
    # Stateful call: continue from where the previous chunk stopped.
    return lfilter(b, a, data, zi=zi)
# --- Simple linear upsample 16k → 48k (factor 3) ---
def upsample3(x):
    """Upsample a 1-D signal by 3 using linear interpolation.

    Each input sample ``x[i]`` is followed by two samples placed 1/3 and
    2/3 of the way towards ``x[i + 1]``.  The last sample has no successor
    inside the array, so it is held (repeated) for its two extra outputs.

    Args:
        x: 1-D array-like of samples.  It is converted to float32 first so
           that integer input cannot overflow in the weighted sums.

    Returns:
        float32 array of length ``3 * len(x)`` (empty for empty input).
    """
    x = np.asarray(x, dtype=np.float32)
    out = np.empty(len(x) * 3, dtype=np.float32)
    if len(x) == 0:
        # Nothing to interpolate; x[-1] below would raise IndexError.
        return out

    # Successor of every sample, with the final sample repeated.
    nxt = np.append(x[1:], x[-1])

    out[0::3] = x
    out[1::3] = (2.0 * x + nxt) / 3.0   # 1/3 of the way to the next sample
    out[2::3] = (x + 2.0 * nxt) / 3.0   # 2/3 of the way to the next sample
    return out
# --- Load RNNoise ---
# Locate and load the shared library, declare the C signatures used by this
# script, and create one denoiser state (`st`) for the whole run.
libname = find_library("rnnoise")
if not libname:
    raise RuntimeError("librnnoise not found. Run sudo ldconfig after install.")
rn = ctypes.CDLL(libname)

# Pointer-returning functions need an explicit restype: ctypes assumes a C
# int by default, which would truncate a 64-bit pointer.
rn.rnnoise_create.argtypes = [c_void_p]  # takes RNNModel* (NULL for default)
rn.rnnoise_create.restype = c_void_p

rn.rnnoise_destroy.argtypes = [c_void_p]
rn.rnnoise_destroy.restype = None  # void function; nothing to convert

# (state, out, in): both buffers are float arrays of one frame each.
rn.rnnoise_process_frame.argtypes = [
    c_void_p,
    POINTER(c_float),
    POINTER(c_float),
]
rn.rnnoise_process_frame.restype = c_float

st = rn.rnnoise_create(None)  # NULL = default model
if not st:
    # A c_void_p restype turns a NULL pointer into None.  Fail here rather
    # than hand a NULL state to rnnoise_process_frame later.
    raise RuntimeError("rnnoise_create() returned NULL")
# --- Configure WAV writers ---
def _open_mono_wav(path):
    """Open *path* for writing as mono, 16-bit PCM at TARGET_SR."""
    writer = wave.open(path, "wb")
    writer.setnchannels(1)
    writer.setsampwidth(2)
    writer.setframerate(TARGET_SR)
    return writer


# Both outputs share the same format; only the content differs.
raw_wav = _open_mono_wav(RAW_FILE)
den_wav = _open_mono_wav(DENOISED_FILE)

# Mono int16 samples waiting to be grouped into full frames by the capture loop.
buf = np.empty((0,), dtype=np.int16)

# The source and the two outputs are separated so the message is readable
# (the path and first file name were previously printed run together).
print(f"Recording {FIFO_PATH} -> {RAW_FILE}, {DENOISED_FILE}")
# --- Capture loop: FIFO → mono 16k → 48k → high-pass → RNNoise → WAV ---
# Samples are kept as floats in 16-bit PCM range (about ±32768) throughout:
# RNNoise expects that scale on input and produces it on output, so nothing
# is normalised to [-1, 1].

in_frame_bytes = BYTES_PER_SAMPLE * CHANNELS_IN   # one interleaved sample frame
read_size = IN_FRAME_16K * in_frame_bytes         # bytes for one 160-sample frame
pending = b""                                     # bytes not yet forming a whole sample frame

# The high-pass design never changes, so compute the coefficients once.
# hp_state carries the filter memory from one frame to the next; restarting
# the filter on every 10 ms frame would leave a transient at each boundary.
hp_b, hp_a = butter(4, 100 / (0.5 * TARGET_SR), btype='high', analog=False)
hp_state = np.zeros(max(len(hp_a), len(hp_b)) - 1)


def _pcm16_bytes(samples):
    """Clip PCM-range float samples to int16 and return their raw bytes."""
    return np.clip(samples, -32768, 32767).astype(np.int16).tobytes()


try:
    with open(FIFO_PATH, "rb") as f:
        while True:
            data = f.read(read_size)
            if not data:
                # EOF: the writer closed the FIFO.  Every further read would
                # return b"" immediately, so retrying would spin the CPU.
                break

            # Only decode whole stereo sample frames; keep any trailing
            # bytes for the next read instead of crashing in reshape().
            pending += data
            usable = len(pending) - len(pending) % in_frame_bytes
            if not usable:
                continue
            stereo = np.frombuffer(
                pending[:usable], dtype=np.int16
            ).reshape(-1, CHANNELS_IN)
            pending = pending[usable:]

            # Downmix stereo → mono @16k
            mono16 = stereo.mean(axis=1).astype(np.int16)
            buf = np.concatenate([buf, mono16])

            # Process every complete 160-sample frame that is buffered.
            while len(buf) >= IN_FRAME_16K:
                frame16 = buf[:IN_FRAME_16K].astype(np.float32)
                buf = buf[IN_FRAME_16K:]

                # Upsample to 48kHz (480 samples)
                frame48 = upsample3(frame16)

                # High-pass, continuing from the previous frame's state.
                frame48, hp_state = lfilter(hp_b, hp_a, frame48, zi=hp_state)
                # ctypes needs a contiguous float32 buffer.
                frame48 = np.ascontiguousarray(frame48, dtype=np.float32)

                # --- Write raw upsampled mono (with HPF) ---
                raw_wav.writeframes(_pcm16_bytes(frame48))

                # --- Denoise ---
                out48 = np.zeros(FRAME_SIZE, dtype=np.float32)
                rn.rnnoise_process_frame(
                    st,
                    out48.ctypes.data_as(POINTER(c_float)),
                    frame48.ctypes.data_as(POINTER(c_float)),
                )
                den_wav.writeframes(_pcm16_bytes(out48))
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop a recording.
    pass
finally:
    # Closing the writers finalises the WAV headers.
    raw_wav.close()
    den_wav.close()
    rn.rnnoise_destroy(st)
    print(f"Saved {RAW_FILE} and {DENOISED_FILE}")