Skip to content

Commit

Permalink
..
Browse files Browse the repository at this point in the history
  • Loading branch information
AriyarathnaDBS committed Mar 3, 2025
1 parent c20043c commit 1ef4280
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 68 deletions.
261 changes: 193 additions & 68 deletions code/realtime_voice.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
import argparse
import socket
import os
import numpy as np
from collections import deque

# Audio settings - optimized for speech
CHUNK = 2048 # Larger chunk size for better performance
# Enhanced audio settings for clarity
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1 # Mono for voice clarity and smaller data size
RATE = 22050 # Higher sample rate for better voice quality
RECORD_SECONDS = 0.5 # Shorter chunks for more real-time feel
CHANNELS = 1
RATE = 44100 # CD quality sample rate for better clarity
RECORD_SECONDS = 1.0 # Full 1-second chunks
BUFFER_SECONDS = 0.2 # Buffer to smooth playback

# MQTT settings
MQTT_BROKER = "broker.emqx.io" # Public MQTT broker
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
QOS_LEVEL = 1 # QoS 1 ensures message delivery at least once
QOS_LEVEL = 1
MAX_PAYLOAD = 65536 # Maximum MQTT payload size (64KB)

class LaptopVoiceCall:
def __init__(self, user_id):
Expand All @@ -28,6 +32,9 @@ def __init__(self, user_id):
self.audio = pyaudio.PyAudio()
self.stream_thread = None
self.temp_dir = "temp_audio"
self.audio_buffer = deque(maxlen=20) # Playback buffer to smooth audio
self.playing = False
self.silence_threshold = 300 # Threshold for silence detection

# Create temp directory if it doesn't exist
if not os.path.exists(self.temp_dir):
Expand All @@ -37,10 +44,13 @@ def __init__(self, user_id):
self.hostname = socket.gethostname()

# Initialize MQTT client with clean session
self.client = mqtt.Client(client_id=f"laptop_{user_id}_{int(time.time())}", clean_session=True)
client_id = f"laptop_{user_id}_{int(time.time())}"
self.client = mqtt.Client(client_id=client_id, clean_session=True)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message

print(f"Initializing voice call system as {client_id}...")

# Connect to MQTT broker
try:
self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
Expand Down Expand Up @@ -84,6 +94,9 @@ def handle_control(self, payload):
self.stream_thread.daemon = True
self.stream_thread.start()

# Start audio playback thread
threading.Thread(target=self.continuous_audio_playback, daemon=True).start()

elif action == "call_end":
if self.call_active or self.other_user == caller:
print(f"Call ended by {caller}")
Expand All @@ -98,30 +111,80 @@ def handle_voice(self, payload):
return

try:
# Save audio data to temp file
temp_file = os.path.join(self.temp_dir, f"temp_audio_{int(time.time()*1000)}.wav")

with open(temp_file, "wb") as f:
f.write(payload)

# Play the audio
threading.Thread(target=self.play_audio, args=(temp_file,)).start()

# Cleanup old files (keep only last 10 seconds of audio files)
self.cleanup_temp_files()

# Add the audio data to the buffer for playback
self.audio_buffer.append(payload)
except Exception as e:
print(f"Error handling voice data: {e}")

def cleanup_temp_files(self):
def continuous_audio_playback(self):
    """Continuously play audio from the buffer for smoother output.

    Opens one output stream, then loops while ``self.call_active`` is true:
    each payload popped from ``self.audio_buffer`` is parsed as a WAV file
    and its frames are written to the stream. ``self.playing`` is True for
    the lifetime of the loop and reset to False on exit.
    """
    # Local import: only this method needs an in-memory file object.
    import io

    print("Audio playback thread started")
    self.playing = True

    # Create output stream once
    output_stream = self.audio.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        output=True,
        frames_per_buffer=CHUNK
    )

    try:
        while self.call_active:
            if not self.audio_buffer:
                # No audio data, sleep a bit
                time.sleep(0.01)
                continue

            # Get the next audio data from the buffer
            audio_data = self.audio_buffer.popleft()

            try:
                # Decode the WAV payload in memory: no temp file to leak or
                # collide on, and the reader is closed even if parsing fails.
                with wave.open(io.BytesIO(audio_data), 'rb') as wf:
                    frames = wf.readframes(wf.getnframes())
                output_stream.write(frames)
            except Exception as e:
                # One bad payload must not stop playback of later ones.
                print(f"Playback error: {e}")

    except Exception as e:
        print(f"Audio playback thread error: {e}")
    finally:
        try:
            output_stream.stop_stream()
            output_stream.close()
        except Exception:
            # Best-effort shutdown; the stream may already be closed.
            pass

        self.playing = False
        print("Audio playback thread stopped")

def is_silent(self, audio_data, format=FORMAT, threshold=None):
    """Check if audio data is silent.

    Args:
        audio_data: Raw bytes, interpreted as 16-bit integer samples.
        format: Unused; kept so existing callers that pass it still work.
        threshold: Mean-amplitude cut-off. Defaults to
            ``self.silence_threshold`` when None.

    Returns:
        bool: True when the mean absolute amplitude is below the threshold
        or the buffer decodes to zero samples; False otherwise, including
        when the buffer cannot be decoded.
    """
    if threshold is None:
        threshold = self.silence_threshold

    try:
        # Convert audio data to numpy array
        samples = np.frombuffer(audio_data, dtype=np.int16)
    except Exception:
        return False  # In case of error, assume not silent

    if samples.size == 0:
        # Nothing recorded: there is no sound to transmit.
        return True

    # Widen before abs(): in int16, abs(-32768) overflows back to -32768,
    # which would make a full-scale signal look quieter than silence.
    amplitude = np.abs(samples.astype(np.int32)).mean()
    return bool(amplitude < threshold)

def initiate_call(self, recipient):
if self.call_active:
Expand Down Expand Up @@ -163,6 +226,9 @@ def accept_call(self):
self.stream_thread.daemon = True
self.stream_thread.start()

# Start audio playback thread
threading.Thread(target=self.continuous_audio_playback, daemon=True).start()

print(f"Call with {self.other_user} started!")
return True

Expand All @@ -188,103 +254,138 @@ def end_call(self):
self.other_user = None
return True

def record_audio(self, duration=RECORD_SECONDS):
    """Record from the input device and save the result as a WAV file.

    Args:
        duration: Length of the recording in seconds. At least one chunk
            of ``CHUNK`` frames is always read.

    Returns:
        Path of the WAV file written under ``self.temp_dir``, or None when
        the recording is judged silent by ``self.is_silent`` or an error
        occurs (the error is printed).
    """
    try:
        # Create a new input stream for each recording
        stream = self.audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            input=True,
            frames_per_buffer=CHUNK
        )

        try:
            print("Recording...", end="", flush=True)

            # Calculate number of chunks to read based on duration,
            # ensuring we always read at least 1 chunk
            num_chunks = max(1, int(RATE / CHUNK * duration))

            frames = [
                stream.read(CHUNK, exception_on_overflow=False)
                for _ in range(num_chunks)
            ]

            print(" Done")
        finally:
            # Release the input device even when a read fails.
            stream.stop_stream()
            stream.close()

        # Check if audio is silent
        all_data = b''.join(frames)
        if self.is_silent(all_data):
            print("Silent frame detected, skipping")
            return None

        # Save to temp wave file
        temp_file = os.path.join(self.temp_dir, f"temp_recording_{int(time.time()*1000)}.wav")
        with wave.open(temp_file, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(self.audio.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(all_data)

        return temp_file
    except Exception as e:
        print(f"Error recording audio: {e}")
        return None

def play_audio(self, filename):
    """Play the WAV file at *filename*, then try to delete it.

    The output stream is opened with the sample width, channel count and
    frame rate read from the file. Playback stops early once
    ``self.call_active`` becomes false. Errors are printed, not raised; on
    error the file is left in place.

    Args:
        filename: Path of the WAV file to play.
    """
    try:
        # The context manager closes the reader; an open handle would make
        # the unlink below fail on platforms that refuse to delete open files.
        with wave.open(filename, 'rb') as wf:
            stream = self.audio.open(format=self.audio.get_format_from_width(wf.getsampwidth()),
                                     channels=wf.getnchannels(),
                                     rate=wf.getframerate(),
                                     output=True)
            try:
                data = wf.readframes(CHUNK)
                while data and self.call_active:
                    stream.write(data)
                    data = wf.readframes(CHUNK)
            finally:
                # Release the output device even when a write fails.
                stream.stop_stream()
                stream.close()
    except Exception as e:
        print(f"Error playing audio: {e}")
        return

    # Try to remove the file after playing
    try:
        os.unlink(filename)
    except OSError:
        pass

def stream_voice(self):
    """Record and publish audio chunks until the call ends.

    While ``self.call_active`` is true, records ``RECORD_SECONDS`` of audio
    with ``self.record_audio``, reads the resulting WAV file and publishes
    its bytes to ``laptop/voice/<other_user>``. The temp recording is
    removed on every path once it has been created.
    """
    print("\nVoice streaming started. Speak now! (Call in progress)")
    print("------------------------------------------------------")

    # Wait for buffer time before starting to ensure smooth start
    time.sleep(BUFFER_SECONDS)

    while self.call_active:
        try:
            # Record a chunk of audio (full 1 second)
            audio_file = self.record_audio(RECORD_SECONDS)

            if not audio_file:
                time.sleep(0.1)  # Small delay if no audio
                continue

            try:
                if not self.call_active:
                    time.sleep(0.1)  # Small delay if call ended
                    continue

                # Read the recorded data
                with open(audio_file, "rb") as f:
                    audio_data = f.read()

                # Check file size before sending
                # NOTE(review): with the RATE (44100), CHUNK (1024) and
                # RECORD_SECONDS (1.0) values assigned near the top of this
                # file and 16-bit samples, one recording is about 88 KB,
                # which exceeds MAX_PAYLOAD (65536) -- so every chunk would
                # be skipped here. Confirm the intended limits.
                if len(audio_data) > MAX_PAYLOAD:
                    print(f"Warning: Audio payload too large ({len(audio_data)} bytes), skipping")
                    continue

                # Send to the other device with QoS 1 for reliability
                result = self.client.publish(f"laptop/voice/{self.other_user}", audio_data, qos=QOS_LEVEL)

                # Check if publish was successful
                if not result.is_published():
                    result.wait_for_publish(timeout=2.0)
            finally:
                # Clean up the temp recording file on every path (skip,
                # call ended, publish error) so files do not accumulate.
                try:
                    os.unlink(audio_file)
                except OSError:
                    pass

            # Small delay to prevent overwhelming the CPU
            time.sleep(0.05)

        except Exception as e:
            print(f"Error in voice streaming: {e}")
            time.sleep(0.5)  # Wait before retrying

    print("Voice streaming ended.")

def test_microphone(self):
    """Test microphone and speakers.

    Records three seconds with ``self.record_audio`` and plays the result
    back on an output stream. The temporary recording is deleted after
    playback. If ``record_audio`` returns nothing, a failure message is
    printed instead.
    """
    print("\nTesting microphone... speak for 3 seconds")
    audio_file = self.record_audio(3.0)

    if not audio_file:
        print("Microphone test failed! Check your microphone settings.")
        return

    print("Playing back recorded audio...")
    try:
        with wave.open(audio_file, 'rb') as wf:
            # Create temporary output stream
            stream = self.audio.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                output=True,
                frames_per_buffer=CHUNK
            )
            try:
                data = wf.readframes(CHUNK)
                while data:
                    stream.write(data)
                    data = wf.readframes(CHUNK)
            finally:
                # Release the output device even when a write fails.
                stream.stop_stream()
                stream.close()
    finally:
        # The test recording is no longer needed; do not leave it behind.
        try:
            os.unlink(audio_file)
        except OSError:
            pass

    print("Audio test complete. Did you hear your voice?")

def interactive_console(self):
print(f"=== Laptop Voice Call System ({self.user_id} on {self.hostname}) ===")
print("Commands:")
print(" call <user_id> - Start a call with another laptop")
print(" accept - Accept incoming call")
print(" end - End current call")
print(" status - Show current status")
print(" test - Test microphone and speakers")
print(" exit - Exit application")

while True:
Expand Down Expand Up @@ -312,6 +413,9 @@ def interactive_console(self):
print(f"Call pending with {self.other_user}")
else:
print("Not in a call")

elif command == "test":
self.test_microphone()

elif command == "exit":
if self.call_active:
Expand Down Expand Up @@ -344,8 +448,29 @@ def interactive_console(self):
if __name__ == "__main__":
    # Script entry point: parse arguments, make sure NumPy is importable,
    # optionally run the microphone test, then start the console.
    parser = argparse.ArgumentParser(description='Laptop Voice Call System')
    parser.add_argument('user_id', type=str, help='Unique user ID (e.g., alice, bob)')
    parser.add_argument('--broker', type=str, default=MQTT_BROKER, help='MQTT broker address')
    parser.add_argument('--port', type=int, default=MQTT_PORT, help='MQTT broker port')

    args = parser.parse_args()

    # Update global settings if provided
    MQTT_BROKER = args.broker
    MQTT_PORT = args.port

    try:
        # Check if numpy is installed
        import numpy  # noqa: F401
    except ImportError:
        import subprocess
        import sys

        print("NumPy is required. Installing...")
        # Run pip through the current interpreter so the package lands in
        # the environment this script runs in, and check that it worked.
        install = subprocess.run([sys.executable, "-m", "pip", "install", "numpy"])
        if install.returncode != 0:
            sys.exit("Failed to install NumPy. Install it manually and try again.")
        print("NumPy installed. Starting application...")

    # Start the voice call system
    laptop_call = LaptopVoiceCall(args.user_id)

    # Offer to test the microphone first
    test_mic = input("Would you like to test your microphone before starting? (y/n): ").strip().lower()
    if test_mic == 'y':
        laptop_call.test_microphone()

    laptop_call.interactive_console()
Binary file added code/temp_audio/temp_audio_1740980704875.wav
Binary file not shown.

0 comments on commit 1ef4280

Please sign in to comment.