"""Kokoro TTS narration: synthesize sentences from narrate.txt and stream
them to the default audio output via a background playback thread."""

import os
import queue
import re
import signal
import sys
import threading
import time

import numpy as np

# SETTINGS — must be set BEFORE onnxruntime is imported so the MIGraphX
# execution provider picks them up at library load time.
os.environ["HSA_OVERRIDE_GFX_VERSION"] = "10.3.0"
os.environ["ORT_LOGGING_LEVEL"] = "3"
os.environ["MIGRAPHX_ENABLE_CACHE"] = "1"
os.environ["MIGRAPHX_CACHE_PATH"] = os.path.expanduser("./migraphx_cache")


def signal_handler(sig, frame):
    """Hard-exit on Ctrl+C.

    Uses os._exit so the blocking audio stream / queue joins cannot keep
    the process alive during shutdown.
    """
    print("\n[Ctrl+C] Stopping...")
    os._exit(0)


signal.signal(signal.SIGINT, signal_handler)

# Imported after the env vars above are in place (see SETTINGS note).
import onnxruntime as ort
import sounddevice as sd
from kokoro_onnx import Kokoro

# Paths
base_dir = os.path.expanduser("./")
model_path = os.path.join(base_dir, "kokoro-v1.0.onnx")
voices_path = os.path.join(base_dir, "voices-v1.0.bin")
input_text_path = os.path.join(base_dir, "narrate.txt")

# Initialize Session: MIGraphX (GPU, fp16) with CPU fallback.
try:
    session = ort.InferenceSession(model_path, providers=[
        ('MIGraphXExecutionProvider', {'device_id': 0, 'migraphx_fp16_enable': True}),
        'CPUExecutionProvider'
    ])
    kokoro = Kokoro.from_session(session, voices_path)
except Exception as e:
    print(f"Init Error: {e}")
    sys.exit(1)

audio_queue = queue.Queue()


def playback_worker():
    """Consume (samples, text_preview) tuples from audio_queue and play them.

    A `None` sentinel terminates the worker.  After each utterance, 0.5 s of
    physical silence is written so consecutive sentences don't run together.
    Calls task_done() for every item (including the sentinel) so
    audio_queue.join() can complete.
    """
    samplerate = 24000  # Kokoro v1.0 output sample rate — TODO confirm against model docs
    try:
        with sd.OutputStream(samplerate=samplerate, channels=1, dtype='float32') as stream:
            while True:
                item = audio_queue.get()
                if item is None:
                    audio_queue.task_done()
                    break
                samples, text_preview = item
                # OutputStream expects (frames, channels) float32; model output
                # is a flat mono array, so reshape to a column vector.
                samples_reshaped = samples.reshape(-1, 1).astype('float32')
                print(f"Playing: {text_preview}...")
                stream.write(samples_reshaped)
                # Physical silence (0.5s)
                silence = np.zeros((int(samplerate * 0.5), 1), dtype='float32')
                stream.write(silence)
                audio_queue.task_done()
    except Exception as e:
        print(f"Playback Error: {e}")


# Daemon thread: does not block process exit if the main thread hard-exits.
play_thread = threading.Thread(target=playback_worker, daemon=True)
play_thread.start()

# Read and Split Text
if os.path.exists(input_text_path):
    # Explicit encoding so the read does not depend on the locale default.
    with open(input_text_path, "r", encoding="utf-8") as f:
        full_text = f.read()
else:
    full_text = "No input file found."
# Split on punctuation (and any following whitespace/newlines) OR standalone
# newlines.  This avoids word-level splitting and ensures no double pauses.
# NOTE(review): the zero-width lookbehind split also breaks after a period
# inside numbers/abbreviations (e.g. "3.14", "e.g.") — likely acceptable for
# narration, but confirm against the input corpus.
sentences = [s.strip() for s in re.split(r'(?<=[.!?])(?![.!?])\s*|\n+', full_text) if s.strip()]
print(f"Detected {len(sentences)} units (sentences/lines).")

try:
    # Producer loop: synthesize each unit and hand it to the playback thread.
    for i, sentence in enumerate(sentences):
        samples, _ = kokoro.create(sentence, voice="af_sky", speed=1.3, lang="en-us")
        audio_queue.put((samples, sentence[:50]))
        print(f"[{i+1}/{len(sentences)}] Buffered.")
    print("Generation complete. Waiting for playback to finish...")
except Exception as e:
    print(f"Process Error: {e}")
finally:
    # FIX: always send the sentinel and drain the queue — previously these ran
    # only on success, so a mid-run synthesis error left the worker blocked on
    # get() and cut off audio that was already buffered.
    audio_queue.put(None)
    audio_queue.join()
    # Final flush: give the audio device buffer a moment to empty.
    time.sleep(0.1)
    print("Finished.")