mirror of
https://github.com/langgenius/dify.git
synced 2024-11-16 19:59:50 +08:00
8c51d06222
Some checks are pending
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Waiting to run
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Waiting to run
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Waiting to run
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Blocked by required conditions
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Blocked by required conditions
473 lines
18 KiB
Python
473 lines
18 KiB
Python
import json
|
|
import logging
|
|
|
|
from flask import abort, request
|
|
from flask_restful import Resource, marshal_with, reqparse
|
|
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
|
|
|
|
import services
|
|
from controllers.console import api
|
|
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
|
|
from controllers.console.app.wraps import get_app_model
|
|
from controllers.console.setup import setup_required
|
|
from controllers.console.wraps import account_initialization_required
|
|
from core.app.apps.base_app_queue_manager import AppQueueManager
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
from core.app.segments import factory
|
|
from core.errors.error import AppInvokeQuotaExceededError
|
|
from fields.workflow_fields import workflow_fields
|
|
from fields.workflow_run_fields import workflow_run_node_execution_fields
|
|
from libs import helper
|
|
from libs.helper import TimestampField, uuid_value
|
|
from libs.login import current_user, login_required
|
|
from models.model import App, AppMode
|
|
from services.app_dsl_service import AppDslService
|
|
from services.app_generate_service import AppGenerateService
|
|
from services.errors.app import WorkflowHashNotEqualError
|
|
from services.workflow_service import WorkflowService
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DraftWorkflowApi(Resource):
    """Console resource for reading and syncing the draft workflow of an app."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    @marshal_with(workflow_fields)
    def get(self, app_model: App):
        """
        Get draft workflow

        Raises:
            Forbidden: the current user is not an editor.
            DraftWorkflowNotExist: no draft workflow was found for the app.
        """
        # Only users with editor rights may read the draft.
        if not current_user.is_editor:
            raise Forbidden()

        # fetch draft workflow by app_model
        workflow_service = WorkflowService()
        workflow = workflow_service.get_draft_workflow(app_model=app_model)

        # A missing draft is reported as an error; it is never returned as None.
        if not workflow:
            raise DraftWorkflowNotExist()

        return workflow

    @staticmethod
    def _extract_sync_args(data):
        """
        Validate a decoded ``text/plain`` payload and pick out the sync fields.

        Args:
            data: the value produced by ``json.loads`` on the request body.

        Returns:
            A dict with the keys ``graph``, ``features``, ``hash``,
            ``environment_variables`` and ``conversation_variables``.

        Raises:
            ValueError: the payload is not a JSON object, lacks ``graph`` or
                ``features``, or either of them is not a dict.
        """
        # json.loads may yield a list, str, number, bool or None. Reject those
        # up front so they fail like any other malformed payload instead of
        # raising TypeError/AttributeError further down.
        if not isinstance(data, dict):
            raise ValueError("graph or features not found in data")

        if "graph" not in data or "features" not in data:
            raise ValueError("graph or features not found in data")

        if not isinstance(data.get("graph"), dict) or not isinstance(data.get("features"), dict):
            raise ValueError("graph or features is not a dict")

        return {
            "graph": data.get("graph"),
            "features": data.get("features"),
            "hash": data.get("hash"),
            "environment_variables": data.get("environment_variables"),
            "conversation_variables": data.get("conversation_variables"),
        }

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    def post(self, app_model: App):
        """
        Sync draft workflow

        The body is accepted either as ``application/json`` or as a JSON
        document sent with a ``text/plain`` content type; any other content
        type is answered with 415.

        Returns:
            A dict with ``result``, the workflow ``hash`` and ``updated_at``,
            or ``({"message": "Invalid JSON data"}, 400)`` when a
            ``text/plain`` body is not valid JSON.

        Raises:
            Forbidden: the current user is not an editor.
            ValueError: a ``text/plain`` payload is structurally invalid.
            DraftWorkflowNotSync: the service reported a workflow hash mismatch.
        """
        # Only users with editor rights may modify the draft.
        if not current_user.is_editor:
            raise Forbidden()

        content_type = request.headers.get("Content-Type", "")

        if "application/json" in content_type:
            parser = reqparse.RequestParser()
            parser.add_argument("graph", type=dict, required=True, nullable=False, location="json")
            parser.add_argument("features", type=dict, required=True, nullable=False, location="json")
            parser.add_argument("hash", type=str, required=False, location="json")
            # TODO: set this to required=True after frontend is updated
            parser.add_argument("environment_variables", type=list, required=False, location="json")
            parser.add_argument("conversation_variables", type=list, required=False, location="json")
            args = parser.parse_args()
        elif "text/plain" in content_type:
            try:
                data = json.loads(request.data.decode("utf-8"))
            except json.JSONDecodeError:
                return {"message": "Invalid JSON data"}, 400

            args = self._extract_sync_args(data)
        else:
            abort(415)

        workflow_service = WorkflowService()

        try:
            # Both variable lists are optional; a missing or null value means "none".
            environment_variables_list = args.get("environment_variables") or []
            environment_variables = [factory.build_variable_from_mapping(obj) for obj in environment_variables_list]
            conversation_variables_list = args.get("conversation_variables") or []
            conversation_variables = [factory.build_variable_from_mapping(obj) for obj in conversation_variables_list]
            workflow = workflow_service.sync_draft_workflow(
                app_model=app_model,
                graph=args["graph"],
                features=args["features"],
                unique_hash=args.get("hash"),
                account=current_user,
                environment_variables=environment_variables,
                conversation_variables=conversation_variables,
            )
        except WorkflowHashNotEqualError:
            raise DraftWorkflowNotSync()

        return {
            "result": "success",
            "hash": workflow.unique_hash,
            # Fall back to the creation time when updated_at is not set.
            "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at),
        }
|
|
|
|
|
|
class DraftWorkflowImportApi(Resource):
    """Console resource that imports DSL data into an app's draft workflow."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    @marshal_with(workflow_fields)
    def post(self, app_model: App):
        """
        Import draft workflow

        Reads the required string field ``data`` from the JSON body and hands
        it to ``AppDslService.import_and_overwrite_workflow``.

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to overwrite the draft.
        if not current_user.is_editor:
            raise Forbidden()

        body_parser = reqparse.RequestParser()
        body_parser.add_argument("data", type=str, required=True, nullable=False, location="json")
        payload = body_parser.parse_args()

        return AppDslService.import_and_overwrite_workflow(
            app_model=app_model, data=payload["data"], account=current_user
        )
|
|
|
|
|
|
class AdvancedChatDraftWorkflowRunApi(Resource):
    """Console resource that runs the draft workflow of an advanced-chat app."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT])
    def post(self, app_model: App):
        """
        Run draft workflow

        Parses ``inputs``, ``query``, ``files``, ``conversation_id`` and
        ``parent_message_id`` from the JSON body and starts a streaming
        generation with ``InvokeFrom.DEBUGGER``.

        Raises:
            Forbidden: the current user is not an editor.
            NotFound: the referenced conversation does not exist.
            ConversationCompletedError: the conversation is already completed.
            ValueError: re-raised unchanged from the generate service.
            InternalServerError: any other failure (logged with traceback).
        """
        # Only users with editor rights may run the draft.
        if not current_user.is_editor:
            raise Forbidden()

        parser = reqparse.RequestParser()
        parser.add_argument("inputs", type=dict, location="json")
        parser.add_argument("query", type=str, required=True, location="json", default="")
        parser.add_argument("files", type=list, location="json")
        parser.add_argument("conversation_id", type=uuid_value, location="json")
        parser.add_argument("parent_message_id", type=uuid_value, required=False, location="json")

        args = parser.parse_args()

        try:
            response = AppGenerateService.generate(
                app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.DEBUGGER, streaming=True
            )

            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        except services.errors.conversation.ConversationCompletedError:
            raise ConversationCompletedError()
        except ValueError:
            # Propagate untouched so the broad handler below does not turn it into a 500.
            raise
        except Exception:
            # Log through the module logger so the record carries this module's name.
            logger.exception("internal server error.")
            raise InternalServerError()
|
|
|
|
|
|
class AdvancedChatDraftRunIterationNodeApi(Resource):
    """Console resource that runs a single iteration node of an advanced-chat draft workflow."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT])
    def post(self, app_model: App, node_id: str):
        """
        Run draft workflow iteration node

        Parses the optional ``inputs`` dict from the JSON body and starts a
        streaming single-iteration run for ``node_id``.

        Raises:
            Forbidden: the current user is not an editor.
            NotFound: the referenced conversation does not exist.
            ConversationCompletedError: the conversation is already completed.
            ValueError: re-raised unchanged from the generate service.
            InternalServerError: any other failure (logged with traceback).
        """
        # Only users with editor rights may run draft nodes.
        if not current_user.is_editor:
            raise Forbidden()

        parser = reqparse.RequestParser()
        parser.add_argument("inputs", type=dict, location="json")
        args = parser.parse_args()

        try:
            response = AppGenerateService.generate_single_iteration(
                app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
            )

            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        except services.errors.conversation.ConversationCompletedError:
            raise ConversationCompletedError()
        except ValueError:
            # Propagate untouched so the broad handler below does not turn it into a 500.
            raise
        except Exception:
            # Log through the module logger so the record carries this module's name.
            logger.exception("internal server error.")
            raise InternalServerError()
|
|
|
|
|
|
class WorkflowDraftRunIterationNodeApi(Resource):
    """Console resource that runs a single iteration node of a workflow-app draft."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.WORKFLOW])
    def post(self, app_model: App, node_id: str):
        """
        Run draft workflow iteration node

        Parses the optional ``inputs`` dict from the JSON body and starts a
        streaming single-iteration run for ``node_id``.

        Raises:
            Forbidden: the current user is not an editor.
            NotFound: the referenced conversation does not exist.
            ConversationCompletedError: the conversation is already completed.
            ValueError: re-raised unchanged from the generate service.
            InternalServerError: any other failure (logged with traceback).
        """
        # Only users with editor rights may run draft nodes.
        if not current_user.is_editor:
            raise Forbidden()

        parser = reqparse.RequestParser()
        parser.add_argument("inputs", type=dict, location="json")
        args = parser.parse_args()

        try:
            response = AppGenerateService.generate_single_iteration(
                app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
            )

            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        except services.errors.conversation.ConversationCompletedError:
            raise ConversationCompletedError()
        except ValueError:
            # Propagate untouched so the broad handler below does not turn it into a 500.
            raise
        except Exception:
            # Log through the module logger so the record carries this module's name.
            logger.exception("internal server error.")
            raise InternalServerError()
|
|
|
|
|
|
class DraftWorkflowRunApi(Resource):
    """Console resource that runs the draft workflow of a workflow app."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.WORKFLOW])
    def post(self, app_model: App):
        """
        Run draft workflow

        Parses the required ``inputs`` dict and the optional ``files`` list
        from the JSON body and starts a streaming generation with
        ``InvokeFrom.DEBUGGER``.

        Raises:
            Forbidden: the current user is not an editor.
            ValueError: re-raised unchanged from the generate service.
            AppInvokeQuotaExceededError: re-raised unchanged from the generate service.
            InternalServerError: any other failure (logged with traceback).
        """
        # Only users with editor rights may run the draft.
        if not current_user.is_editor:
            raise Forbidden()

        parser = reqparse.RequestParser()
        parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
        parser.add_argument("files", type=list, required=False, location="json")
        args = parser.parse_args()

        try:
            response = AppGenerateService.generate(
                app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.DEBUGGER, streaming=True
            )

            return helper.compact_generate_response(response)
        except (ValueError, AppInvokeQuotaExceededError):
            # Propagate untouched so the broad handler below does not turn them into a 500.
            raise
        except Exception:
            # Log through the module logger so the record carries this module's name.
            logger.exception("internal server error.")
            raise InternalServerError()
|
|
|
|
|
|
class WorkflowTaskStopApi(Resource):
    """Console resource that flags a running workflow task to stop."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    def post(self, app_model: App, task_id: str):
        """
        Stop workflow task

        Sets the stop flag for ``task_id`` on behalf of the current user,
        using ``InvokeFrom.DEBUGGER`` as the invocation source.

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to stop a task.
        if not current_user.is_editor:
            raise Forbidden()

        AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)
        return {"result": "success"}
|
|
|
|
|
|
class DraftWorkflowNodeRunApi(Resource):
    """Console resource that executes one node of the draft workflow."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    @marshal_with(workflow_run_node_execution_fields)
    def post(self, app_model: App, node_id: str):
        """
        Run draft workflow node

        Takes the required ``inputs`` dict from the JSON body and passes it
        to ``WorkflowService.run_draft_workflow_node`` as ``user_inputs``.

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to run a draft node.
        if not current_user.is_editor:
            raise Forbidden()

        body_parser = reqparse.RequestParser()
        body_parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
        user_inputs = body_parser.parse_args().get("inputs")

        return WorkflowService().run_draft_workflow_node(
            app_model=app_model, node_id=node_id, user_inputs=user_inputs, account=current_user
        )
|
|
|
|
|
|
class PublishedWorkflowApi(Resource):
    """Console resource for fetching and publishing an app's workflow."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    @marshal_with(workflow_fields)
    def get(self, app_model: App):
        """
        Get published workflow

        Returns whatever ``WorkflowService.get_published_workflow`` yields
        for the app, without any further check on the result.

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to read the published workflow.
        if not current_user.is_editor:
            raise Forbidden()

        return WorkflowService().get_published_workflow(app_model=app_model)

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    def post(self, app_model: App):
        """
        Publish workflow

        Returns:
            A dict with ``result`` and the formatted ``created_at`` of the
            workflow returned by the publish call.

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to publish.
        if not current_user.is_editor:
            raise Forbidden()

        published = WorkflowService().publish_workflow(app_model=app_model, account=current_user)
        created_at = TimestampField().format(published.created_at)

        return {"result": "success", "created_at": created_at}
|
|
|
|
|
|
class DefaultBlockConfigsApi(Resource):
    """Console resource that returns the default block configs from the workflow service."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    def get(self, app_model: App):
        """
        Get default block config

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to read the defaults.
        if not current_user.is_editor:
            raise Forbidden()

        return WorkflowService().get_default_block_configs()
|
|
|
|
|
|
class DefaultBlockConfigApi(Resource):
    """Console resource that returns the default config for one block type."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
    def get(self, app_model: App, block_type: str):
        """
        Get default block config

        The optional query-string parameter ``q`` carries JSON-encoded
        filters; when absent or empty, ``filters`` is passed as ``None``.

        Raises:
            Forbidden: the current user is not an editor.
            ValueError: ``q`` is present but is not valid JSON.
        """
        # Editor rights are required to read the defaults.
        if not current_user.is_editor:
            raise Forbidden()

        parser = reqparse.RequestParser()
        parser.add_argument("q", type=str, location="args")
        args = parser.parse_args()

        raw_filters = args.get("q")
        filters = None
        if raw_filters:
            try:
                filters = json.loads(raw_filters)
            except json.JSONDecodeError as exc:
                # Chain the decode error so the parse position stays visible in tracebacks.
                raise ValueError("Invalid filters") from exc

        # Get default block configs
        workflow_service = WorkflowService()
        return workflow_service.get_default_block_config(node_type=block_type, filters=filters)
|
|
|
|
|
|
class ConvertToWorkflowApi(Resource):
    """Console resource that converts a chat or completion app into a workflow-based app."""

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
    def post(self, app_model: App):
        """
        Convert basic mode of chatbot app to workflow mode
        Convert expert mode of chatbot app to workflow mode
        Convert Completion App to Workflow App

        When the request has a body, the optional string fields ``name``,
        ``icon_type``, ``icon`` and ``icon_background`` are read from it;
        with no body an empty dict is passed to the service instead.

        Returns:
            A dict holding the id of the newly created app as ``new_app_id``.

        Raises:
            Forbidden: the current user is not an editor.
        """
        # Editor rights are required to convert an app.
        if not current_user.is_editor:
            raise Forbidden()

        if not request.data:
            args = {}
        else:
            parser = reqparse.RequestParser()
            # All four overrides share the same optional, nullable string spec.
            for field_name in ("name", "icon_type", "icon", "icon_background"):
                parser.add_argument(field_name, type=str, required=False, nullable=True, location="json")
            args = parser.parse_args()

        # convert to workflow mode
        converted_app = WorkflowService().convert_to_workflow(app_model=app_model, account=current_user, args=args)

        # return app id
        return {"new_app_id": converted_app.id}
|
|
|
|
|
|
# Route registration: every resource is mounted below /apps/<uuid:app_id>.

# Draft workflow: read/sync and import.
api.add_resource(
    DraftWorkflowApi,
    "/apps/<uuid:app_id>/workflows/draft",
)
api.add_resource(
    DraftWorkflowImportApi,
    "/apps/<uuid:app_id>/workflows/draft/import",
)

# Draft runs, task stop and single-node runs.
api.add_resource(
    AdvancedChatDraftWorkflowRunApi,
    "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run",
)
api.add_resource(
    DraftWorkflowRunApi,
    "/apps/<uuid:app_id>/workflows/draft/run",
)
api.add_resource(
    WorkflowTaskStopApi,
    "/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop",
)
api.add_resource(
    DraftWorkflowNodeRunApi,
    "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run",
)

# Single iteration-node runs.
api.add_resource(
    AdvancedChatDraftRunIterationNodeApi,
    "/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run",
)
api.add_resource(
    WorkflowDraftRunIterationNodeApi,
    "/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run",
)

# Publishing, default block configs and app conversion.
api.add_resource(
    PublishedWorkflowApi,
    "/apps/<uuid:app_id>/workflows/publish",
)
api.add_resource(
    DefaultBlockConfigsApi,
    "/apps/<uuid:app_id>/workflows/default-workflow-block-configs",
)
api.add_resource(
    DefaultBlockConfigApi,
    "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>",
)
api.add_resource(
    ConvertToWorkflowApi,
    "/apps/<uuid:app_id>/convert-to-workflow",
)
|