mirror of
https://github.com/langgenius/dify.git
synced 2024-11-16 11:42:29 +08:00
153 lines
5.9 KiB
Python
153 lines
5.9 KiB
Python
import io
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
from core.model_manager import ModelManager
|
|
from core.model_runtime.entities.model_entities import ModelType
|
|
from models.model import App, AppMode, AppModelConfig, Message
|
|
from services.errors.audio import (
|
|
AudioTooLargeServiceError,
|
|
NoAudioUploadedServiceError,
|
|
ProviderNotSupportSpeechToTextServiceError,
|
|
ProviderNotSupportTextToSpeechServiceError,
|
|
UnsupportedAudioTypeServiceError,
|
|
)
|
|
|
|
# Maximum accepted audio upload size, in megabytes (also used in the
# "Audio size larger than ..." error message).
FILE_SIZE = 30
# The same limit expressed in bytes, compared against the uploaded payload length.
FILE_SIZE_LIMIT = FILE_SIZE * 1024 * 1024
# Audio subtypes accepted for speech-to-text; each is matched against the
# upload's mimetype as 'audio/<ext>'.
ALLOWED_EXTENSIONS = ['mp3', 'mp4', 'mpeg', 'mpga', 'm4a', 'wav', 'webm', 'amr']

logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AudioService:
    """Speech-to-text and text-to-speech entry points for an app.

    All methods are classmethods; the class holds no state of its own.
    """

    @classmethod
    def transcript_asr(cls, app_model: App, file: FileStorage, end_user: Optional[str] = None):
        """Transcribe an uploaded audio file with the tenant's default speech-to-text model.

        :param app_model: app whose workflow features (advanced-chat / workflow
            modes) or app model config (other modes) decide whether
            speech-to-text is enabled
        :param file: the uploaded audio file
        :param end_user: optional user identifier forwarded to the model
        :return: ``{"text": <result of invoke_speech2text>}``
        :raises ValueError: speech-to-text is not enabled for the app
        :raises NoAudioUploadedServiceError: ``file`` is None
        :raises UnsupportedAudioTypeServiceError: the mimetype is not
            ``audio/<ext>`` for an ``ext`` in ``ALLOWED_EXTENSIONS``
        :raises AudioTooLargeServiceError: the payload exceeds ``FILE_SIZE_LIMIT`` bytes
        :raises ProviderNotSupportSpeechToTextServiceError: no default
            speech-to-text model instance is available for the tenant
        """
        if app_model.mode in [AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value]:
            workflow = app_model.workflow
            if workflow is None:
                raise ValueError("Speech to text is not enabled")

            features_dict = workflow.features_dict
            if 'speech_to_text' not in features_dict or not features_dict['speech_to_text'].get('enabled'):
                raise ValueError("Speech to text is not enabled")
        else:
            app_model_config: AppModelConfig = app_model.app_model_config

            # A missing config, or one without an 'enabled' key, is treated as
            # "disabled" (same ValueError as the workflow branch) instead of
            # leaking an AttributeError / KeyError to the caller.
            if app_model_config is None or not app_model_config.speech_to_text_dict.get('enabled'):
                raise ValueError("Speech to text is not enabled")

        if file is None:
            raise NoAudioUploadedServiceError()

        # Validation is done on the declared mimetype, not on the filename.
        allowed_mimetypes = {f'audio/{ext}' for ext in ALLOWED_EXTENSIONS}
        if file.mimetype not in allowed_mimetypes:
            raise UnsupportedAudioTypeServiceError()

        file_content = file.read()
        if len(file_content) > FILE_SIZE_LIMIT:
            message = f"Audio size larger than {FILE_SIZE} mb"
            raise AudioTooLargeServiceError(message)

        model_manager = ModelManager()
        model_instance = model_manager.get_default_model_instance(
            tenant_id=app_model.tenant_id,
            model_type=ModelType.SPEECH2TEXT
        )
        if model_instance is None:
            raise ProviderNotSupportSpeechToTextServiceError()

        buffer = io.BytesIO(file_content)
        # NOTE(review): the buffer is always named 'temp.mp3' regardless of the
        # uploaded type; presumably the provider only needs a name attribute - confirm.
        buffer.name = 'temp.mp3'

        return {"text": model_instance.invoke_speech2text(file=buffer, user=end_user)}

    @classmethod
    def transcript_tts(cls, app_model: App, text: Optional[str] = None,
                       voice: Optional[str] = None, end_user: Optional[str] = None,
                       message_id: Optional[str] = None):
        """Synthesize speech for a stored message answer or for the given text.

        When ``message_id`` is given, the answer of that message is spoken and
        ``text`` is ignored; otherwise ``text`` is spoken.

        :param app_model: app whose workflow features or app model config
            decide whether text-to-speech is enabled and supply the default voice
        :param text: text to speak when no ``message_id`` is given
        :param voice: voice to use; falls back to the configured voice, then to
            the first voice reported by the model
        :param end_user: optional user identifier forwarded to the model
        :param message_id: id of a stored ``Message`` whose answer is spoken
        :return: a streaming ``Response`` (``audio/mpeg``) when the model
            returns a generator, otherwise the model's return value unchanged;
            ``None`` when the message does not exist or has an empty answer
            with status ``'normal'``
        :raises ValueError: text-to-speech is not enabled, no voice is
            available, or neither ``message_id`` nor ``text`` was supplied
        :raises ProviderNotSupportTextToSpeechServiceError: no default TTS
            model instance is available for the tenant
        """
        from collections.abc import Generator

        from flask import Response, stream_with_context

        from app import app
        from extensions.ext_database import db

        def invoke_tts(text_content: str, app_model, voice: Optional[str] = None):
            """Resolve the voice and call the tenant's default TTS model."""
            with app.app_context():
                if app_model.mode in [AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value]:
                    workflow = app_model.workflow
                    if workflow is None:
                        raise ValueError("TTS is not enabled")

                    features_dict = workflow.features_dict
                    if 'text_to_speech' not in features_dict or not features_dict['text_to_speech'].get('enabled'):
                        raise ValueError("TTS is not enabled")

                    # An explicitly requested voice wins over the configured one.
                    voice = features_dict['text_to_speech'].get('voice') if voice is None else voice
                else:
                    text_to_speech_dict = app_model.app_model_config.text_to_speech_dict

                    if not text_to_speech_dict.get('enabled'):
                        raise ValueError("TTS is not enabled")

                    voice = text_to_speech_dict.get('voice') if voice is None else voice

                model_manager = ModelManager()
                model_instance = model_manager.get_default_model_instance(
                    tenant_id=app_model.tenant_id,
                    model_type=ModelType.TTS
                )
                # Same guard as transcript_tts_voices: report a missing provider
                # explicitly instead of failing with an AttributeError below.
                if model_instance is None:
                    raise ProviderNotSupportTextToSpeechServiceError()

                if not voice:
                    # Nothing requested or configured: use the model's first voice.
                    voices = model_instance.get_tts_voices()
                    if not voices:
                        raise ValueError("Sorry, no voice available.")
                    voice = voices[0].get('value')

                return model_instance.invoke_tts(
                    content_text=text_content.strip(),
                    user=end_user,
                    tenant_id=app_model.tenant_id,
                    voice=voice
                )

        if message_id:
            message = db.session.query(Message).filter(
                Message.id == message_id
            ).first()
            # .first() yields None when no row matches; there is nothing to speak.
            if message is None:
                return None
            if message.answer == '' and message.status == 'normal':
                return None
            text = message.answer
        elif text is None:
            # Without this check the call would fail later on None.strip().
            raise ValueError("Text is required")

        response = invoke_tts(text, app_model=app_model, voice=voice)
        if isinstance(response, Generator):
            # Stream generator output to the client as MPEG audio.
            return Response(stream_with_context(response), content_type='audio/mpeg')
        return response

    @classmethod
    def transcript_tts_voices(cls, tenant_id: str, language: str):
        """Return the voices the tenant's default TTS model reports for ``language``.

        :param tenant_id: tenant whose default TTS model is queried
        :param language: language passed through to ``get_tts_voices``
        :return: whatever ``model_instance.get_tts_voices(language)`` returns
        :raises ProviderNotSupportTextToSpeechServiceError: no default TTS
            model instance is available for the tenant
        """
        model_manager = ModelManager()
        model_instance = model_manager.get_default_model_instance(
            tenant_id=tenant_id,
            model_type=ModelType.TTS
        )
        if model_instance is None:
            raise ProviderNotSupportTextToSpeechServiceError()

        # Errors from the model propagate unchanged to the caller.
        return model_instance.get_tts_voices(language)
|