111 lines
3.6 KiB
Python
111 lines
3.6 KiB
Python
|
|
import ctypes
import time
import wave
from ctypes import c_void_p, c_float, POINTER
from ctypes.util import find_library

import numpy as np
from scipy.signal import butter, lfilter
|
|||
|
|
|
|||
|
|
# --- File locations ---
# Path the capture loop opens for reading, and the two WAV files it writes.
FIFO_PATH = "/tmp/esp32_audio"
RAW_FILE = "raw_mono_48k.wav"
DENOISED_FILE = "denoised_mono_48k.wav"

# --- Stream geometry ---
IN_SR, TARGET_SR = 16000, 48000   # input rate and processing/output rate (Hz)
CHANNELS_IN = 2                   # interleaved channels in the incoming stream
BYTES_PER_SAMPLE = 2              # bytes per sample per channel (read as int16)
FRAME_SIZE = 480                  # RNNoise frame size at 48kHz
# Input samples consumed per RNNoise frame:
# 160 samples @16kHz → upsample ×3 → 480 @48kHz
IN_FRAME_16K = FRAME_SIZE * IN_SR // TARGET_SR
|
|||
|
|
|
|||
|
|
# --- High-pass filter design ---
|
|||
|
|
def highpass_filter(data, cutoff=100, fs=TARGET_SR, order=4):
    """Apply a digital Butterworth high-pass filter to ``data``.

    Args:
        data: Samples to filter (passed straight to ``scipy.signal.lfilter``).
        cutoff: Corner frequency in Hz.
        fs: Sampling rate of ``data`` in Hz.
        order: Butterworth filter order.

    Returns:
        The filtered samples as returned by ``lfilter``.

    Note:
        No initial filter state is passed to ``lfilter``, so every call
        filters ``data`` independently of any previous call.
    """
    nyquist = 0.5 * fs
    # butter() takes the corner frequency as a fraction of Nyquist.
    numerator, denominator = butter(
        order, cutoff / nyquist, btype='high', analog=False
    )
    return lfilter(numerator, denominator, data)
|
|||
|
|
|
|||
|
|
# --- Simple linear upsample 16k → 48k (factor 3) ---
|
|||
|
|
def upsample3(x):
    """Upsample a 1-D signal by a factor of 3 using linear interpolation.

    Each input sample ``x[i]`` is followed by two interpolated samples that
    lie 1/3 and 2/3 of the way towards ``x[i + 1]``. The final input sample
    has no successor, so it is held (repeated) for its two trailing outputs.

    Args:
        x: 1-D array-like of samples. It is converted to float32 first, so
            integer input cannot overflow in the weighted sums.

    Returns:
        A float32 ndarray of length ``3 * len(x)`` (empty for empty input).
    """
    x = np.asarray(x, dtype=np.float32)
    out = np.empty(x.size * 3, dtype=np.float32)
    if x.size == 0:
        # Nothing to interpolate; x[-1] below would raise IndexError.
        return out

    # Successor of every sample; the last sample is its own successor.
    nxt = np.append(x[1:], x[-1])

    out[0::3] = x
    out[1::3] = (2.0 * x + nxt) / 3.0   # 1/3 of the way to the next sample
    out[2::3] = (x + 2.0 * nxt) / 3.0   # 2/3 of the way to the next sample
    return out
|
|||
|
|
|
|||
|
|
# --- Load RNNoise ---
|
|||
|
|
# Locate and load the shared RNNoise library.
libname = find_library("rnnoise")
if not libname:
    raise RuntimeError("librnnoise not found. Run sudo ldconfig after install.")
rn = ctypes.CDLL(libname)

# Declare the C prototypes so ctypes converts arguments and results
# correctly (pointers default to C int otherwise).
rn.rnnoise_create.argtypes = [c_void_p]  # takes RNNModel* (NULL for default)
rn.rnnoise_create.restype = c_void_p
rn.rnnoise_destroy.argtypes = [c_void_p]
rn.rnnoise_destroy.restype = None  # void function: no return value to read
rn.rnnoise_process_frame.argtypes = [c_void_p,
                                     POINTER(c_float),   # out
                                     POINTER(c_float)]   # in
rn.rnnoise_process_frame.restype = c_float

st = rn.rnnoise_create(None)  # NULL = default model
if not st:
    # A c_void_p result of NULL comes back as None; fail here instead of
    # passing a NULL state into the native library later.
    raise RuntimeError("rnnoise_create() returned NULL; cannot create denoiser state.")
|
|||
|
|
|
|||
|
|
# --- Configure WAV writers ---
|
|||
|
|
def _open_mono_writer(path):
    """Open ``path`` as a WAV writer: 1 channel, 2-byte samples, TARGET_SR Hz."""
    writer = wave.open(path, "wb")
    writer.setnchannels(1)
    writer.setsampwidth(2)
    writer.setframerate(TARGET_SR)
    return writer


# One writer for the pre-denoise signal, one for the denoised signal.
raw_wav = _open_mono_writer(RAW_FILE)
den_wav = _open_mono_writer(DENOISED_FILE)

# Mono int16 samples received but not yet processed as a full frame.
buf = np.zeros(0, dtype=np.int16)

print(f"Recording {FIFO_PATH} → {RAW_FILE}, {DENOISED_FILE}")
|
|||
|
|
# --- Capture / processing loop ---
# Reads interleaved int16 PCM from FIFO_PATH, downmixes to mono, upsamples
# 16 kHz → 48 kHz, high-pass filters, and writes both the filtered signal and
# its RNNoise-denoised version to the two WAV files. Runs until Ctrl-C.

chunk_bytes = IN_FRAME_16K * BYTES_PER_SAMPLE * CHANNELS_IN  # one input frame
sample_stride = BYTES_PER_SAMPLE * CHANNELS_IN               # one multi-channel sample
pending = b""  # bytes received that do not yet form a whole multi-channel sample

# Design the 4th-order, 100 Hz high-pass once and carry its state across
# frames. Running the stateless highpass_filter() helper on each 10 ms frame
# would restart the filter from rest every frame and put a transient at every
# frame boundary.
hp_b, hp_a = butter(4, 100 / (0.5 * TARGET_SR), btype='high', analog=False)
hp_state = np.zeros(max(len(hp_a), len(hp_b)) - 1)

try:
    with open(FIFO_PATH, "rb") as f:
        while True:
            data = f.read(chunk_bytes)
            if not data:
                # EOF (no writer attached). Keep waiting for more data as
                # before, but sleep briefly instead of spinning at 100% CPU.
                time.sleep(0.01)
                continue

            # A read can come back short at EOF, leaving a byte count that is
            # not a whole number of samples; keep the remainder for next time
            # rather than letting frombuffer/reshape raise.
            pending += data
            usable = len(pending) - len(pending) % sample_stride
            if usable == 0:
                continue

            # Downmix interleaved channels → mono @16k
            stereo = np.frombuffer(pending[:usable], dtype=np.int16).reshape(-1, CHANNELS_IN)
            pending = pending[usable:]
            mono16 = stereo.mean(axis=1).astype(np.int16)

            buf = np.concatenate([buf, mono16])

            # Process every complete 160-sample frame currently buffered
            while len(buf) >= IN_FRAME_16K:
                # Stay on the 16-bit PCM scale (no division by 32768):
                # rnnoise_process_frame expects float samples in that range,
                # and the linear steps below are scale-independent.
                frame16 = buf[:IN_FRAME_16K].astype(np.float32)
                buf = buf[IN_FRAME_16K:]

                # Upsample to 48kHz (480 samples)
                frame48 = upsample3(frame16)

                # --- Apply high-pass filter (state carried between frames) ---
                frame48, hp_state = lfilter(hp_b, hp_a, frame48, zi=hp_state)

                # lfilter returns float64; RNNoise needs contiguous float32
                frame48 = np.ascontiguousarray(frame48, dtype=np.float32)

                # --- Write upsampled mono (with HPF) ---
                raw_wav.writeframes(
                    np.clip(frame48, -32768, 32767).astype(np.int16).tobytes()
                )

                # --- Denoise ---
                out48 = np.zeros(FRAME_SIZE, dtype=np.float32)
                rn.rnnoise_process_frame(
                    st,
                    out48.ctypes.data_as(POINTER(c_float)),
                    frame48.ctypes.data_as(POINTER(c_float))
                )

                # Output is already on the PCM scale; just clip and convert.
                den_wav.writeframes(
                    np.clip(out48, -32768, 32767).astype(np.int16).tobytes()
                )
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop recording.
    pass
finally:
    # Closing the writers finalizes the WAV headers; then free the denoiser.
    raw_wav.close()
    den_wav.close()
    rn.rnnoise_destroy(st)
    print(f"Saved {RAW_FILE} and {DENOISED_FILE}")
|