import io import logging from typing import Optional from werkzeug.datastructures import FileStorage from core.model_manager import ModelManager from core.model_runtime.entities.model_entities import ModelType from models.model import App, AppMode, AppModelConfig, Message from services.errors.audio import ( AudioTooLargeServiceError, NoAudioUploadedServiceError, ProviderNotSupportSpeechToTextServiceError, ProviderNotSupportTextToSpeechServiceError, UnsupportedAudioTypeServiceError, ) FILE_SIZE = 30 FILE_SIZE_LIMIT = FILE_SIZE * 1024 * 1024 ALLOWED_EXTENSIONS = ['mp3', 'mp4', 'mpeg', 'mpga', 'm4a', 'wav', 'webm', 'amr'] logger = logging.getLogger(__name__) class AudioService: @classmethod def transcript_asr(cls, app_model: App, file: FileStorage, end_user: Optional[str] = None): if app_model.mode in [AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value]: workflow = app_model.workflow if workflow is None: raise ValueError("Speech to text is not enabled") features_dict = workflow.features_dict if 'speech_to_text' not in features_dict or not features_dict['speech_to_text'].get('enabled'): raise ValueError("Speech to text is not enabled") else: app_model_config: AppModelConfig = app_model.app_model_config if not app_model_config.speech_to_text_dict['enabled']: raise ValueError("Speech to text is not enabled") if file is None: raise NoAudioUploadedServiceError() extension = file.mimetype if extension not in [f'audio/{ext}' for ext in ALLOWED_EXTENSIONS]: raise UnsupportedAudioTypeServiceError() file_content = file.read() file_size = len(file_content) if file_size > FILE_SIZE_LIMIT: message = f"Audio size larger than {FILE_SIZE} mb" raise AudioTooLargeServiceError(message) model_manager = ModelManager() model_instance = model_manager.get_default_model_instance( tenant_id=app_model.tenant_id, model_type=ModelType.SPEECH2TEXT ) if model_instance is None: raise ProviderNotSupportSpeechToTextServiceError() buffer = io.BytesIO(file_content) buffer.name = 'temp.mp3' return {"text": model_instance.invoke_speech2text(file=buffer, user=end_user)} @classmethod def transcript_tts(cls, app_model: App, text: Optional[str] = None, voice: Optional[str] = None, end_user: Optional[str] = None, message_id: Optional[str] = None): from collections.abc import Generator from flask import Response, stream_with_context from app import app from extensions.ext_database import db def invoke_tts(text_content: str, app_model, voice: Optional[str] = None): with app.app_context(): if app_model.mode in [AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value]: workflow = app_model.workflow if workflow is None: raise ValueError("TTS is not enabled") features_dict = workflow.features_dict if 'text_to_speech' not in features_dict or not features_dict['text_to_speech'].get('enabled'): raise ValueError("TTS is not enabled") voice = features_dict['text_to_speech'].get('voice') if voice is None else voice else: text_to_speech_dict = app_model.app_model_config.text_to_speech_dict if not text_to_speech_dict.get('enabled'): raise ValueError("TTS is not enabled") voice = text_to_speech_dict.get('voice') if voice is None else voice model_manager = ModelManager() model_instance = model_manager.get_default_model_instance( tenant_id=app_model.tenant_id, model_type=ModelType.TTS ) try: if not voice: voices = model_instance.get_tts_voices() if voices: voice = voices[0].get('value') else: raise ValueError("Sorry, no voice available.") return model_instance.invoke_tts( content_text=text_content.strip(), user=end_user, tenant_id=app_model.tenant_id, voice=voice ) except Exception as e: raise e if message_id: message = db.session.query(Message).filter( Message.id == message_id ).first() if message.answer == '' and message.status == 'normal': return None else: response = invoke_tts(message.answer, app_model=app_model, voice=voice) if isinstance(response, Generator): return Response(stream_with_context(response), content_type='audio/mpeg') return response else: response = invoke_tts(text, app_model, voice) if isinstance(response, Generator): return Response(stream_with_context(response), content_type='audio/mpeg') return response @classmethod def transcript_tts_voices(cls, tenant_id: str, language: str): model_manager = ModelManager() model_instance = model_manager.get_default_model_instance( tenant_id=tenant_id, model_type=ModelType.TTS ) if model_instance is None: raise ProviderNotSupportTextToSpeechServiceError() try: return model_instance.get_tts_voices(language) except Exception as e: raise e