feat: Iteration node support parallel mode

This commit is contained in:
Nov1c444 2024-10-18 16:57:24 +08:00
parent f447ee7b9d
commit 7d761013d1
29 changed files with 1023 additions and 143 deletions

View File

@ -19,6 +19,7 @@ from core.app.entities.queue_entities import (
QueueIterationStartEvent,
QueueMessageReplaceEvent,
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueParallelBranchRunFailedEvent,
@ -306,7 +307,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
if response:
yield response
elif isinstance(event, QueueNodeFailedEvent):
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent):
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
response = self._workflow_node_finish_to_stream_response(

View File

@ -17,6 +17,7 @@ from core.app.entities.queue_entities import (
QueueIterationNextEvent,
QueueIterationStartEvent,
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueParallelBranchRunFailedEvent,
@ -276,7 +277,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
if response:
yield response
elif isinstance(event, QueueNodeFailedEvent):
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent):
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
response = self._workflow_node_finish_to_stream_response(

View File

@ -9,6 +9,7 @@ from core.app.entities.queue_entities import (
QueueIterationNextEvent,
QueueIterationStartEvent,
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueParallelBranchRunFailedEvent,
@ -31,6 +32,7 @@ from core.workflow.graph_engine.entities.event import (
IterationRunNextEvent,
IterationRunStartedEvent,
IterationRunSucceededEvent,
NodeInIterationFailedEvent,
NodeRunFailedEvent,
NodeRunRetrieverResourceEvent,
NodeRunStartedEvent,
@ -248,9 +250,40 @@ class WorkflowBasedAppRunner(AppRunner):
error=event.route_node_state.node_run_result.error
if event.route_node_state.node_run_result and event.route_node_state.node_run_result.error
else "Unknown error",
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
in_iteration_id=event.in_iteration_id,
)
)
elif isinstance(event, NodeInIterationFailedEvent):
self._publish_event(
QueueNodeInIterationFailedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
parent_parallel_start_node_id=event.parent_parallel_start_node_id,
start_at=event.route_node_state.start_at,
inputs=event.route_node_state.node_run_result.inputs
if event.route_node_state.node_run_result
else {},
process_data=event.route_node_state.node_run_result.process_data
if event.route_node_state.node_run_result
else {},
outputs=event.route_node_state.node_run_result.outputs
if event.route_node_state.node_run_result
else {},
execution_metadata=event.route_node_state.node_run_result.metadata
if event.route_node_state.node_run_result
else {},
in_iteration_id=event.in_iteration_id,
error=event.error,
)
)
elif isinstance(event, NodeRunStreamChunkEvent):
self._publish_event(
QueueTextChunkEvent(

View File

@ -305,6 +305,37 @@ class QueueNodeSucceededEvent(AppQueueEvent):
error: Optional[str] = None
class QueueNodeInIterationFailedEvent(AppQueueEvent):
"""
QueueNodeInIterationFailedEvent entity
"""
event: QueueEvent = QueueEvent.NODE_FAILED
node_execution_id: str
node_id: str
node_type: NodeType
node_data: BaseNodeData
parallel_id: Optional[str] = None
"""parallel id if node is in parallel"""
parallel_start_node_id: Optional[str] = None
"""parallel start node id if node is in parallel"""
parent_parallel_id: Optional[str] = None
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
in_iteration_id: Optional[str] = None
"""iteration id if node is in iteration"""
start_at: datetime
inputs: Optional[dict[str, Any]] = None
process_data: Optional[dict[str, Any]] = None
outputs: Optional[dict[str, Any]] = None
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
error: str
class QueueNodeFailedEvent(AppQueueEvent):
"""
QueueNodeFailedEvent entity
@ -331,6 +362,7 @@ class QueueNodeFailedEvent(AppQueueEvent):
inputs: Optional[dict[str, Any]] = None
process_data: Optional[dict[str, Any]] = None
outputs: Optional[dict[str, Any]] = None
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
error: str

View File

@ -9,6 +9,7 @@ from core.app.entities.queue_entities import (
QueueIterationNextEvent,
QueueIterationStartEvent,
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueParallelBranchRunFailedEvent,
@ -299,7 +300,9 @@ class WorkflowCycleManage:
return workflow_node_execution
def _handle_workflow_node_execution_failed(self, event: QueueNodeFailedEvent) -> WorkflowNodeExecution:
def _handle_workflow_node_execution_failed(
self, event: QueueNodeFailedEvent | QueueNodeInIterationFailedEvent
) -> WorkflowNodeExecution:
"""
Workflow node execution failed
:param event: queue node failed event
@ -311,17 +314,21 @@ class WorkflowCycleManage:
outputs = WorkflowEntry.handle_special_values(event.outputs)
finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
elapsed_time = (finished_at - event.start_at).total_seconds()
execution_metadata = (
json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None
)
update_data = {
WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value,
WorkflowNodeExecution.error: event.error,
WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None,
WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None,
WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None,
WorkflowNodeExecution.finished_at: finished_at,
WorkflowNodeExecution.elapsed_time: elapsed_time,
}
update_data[WorkflowNodeExecution.execution_metadata] = execution_metadata
db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update(
{
WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value,
WorkflowNodeExecution.error: event.error,
WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None,
WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None,
WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None,
WorkflowNodeExecution.finished_at: finished_at,
WorkflowNodeExecution.elapsed_time: elapsed_time,
}
update_data
)
db.session.commit()

View File

@ -61,6 +61,7 @@ class NodeRunMetadataKey(Enum):
PARALLEL_START_NODE_ID = "parallel_start_node_id"
PARENT_PARALLEL_ID = "parent_parallel_id"
PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id"
PARALLEL_MODE_RUN_ID = "parallel_mode_run_id"
class NodeRunResult(BaseModel):

View File

@ -81,6 +81,10 @@ class NodeRunFailedEvent(BaseNodeEvent):
error: str = Field(..., description="error")
class NodeInIterationFailedEvent(BaseNodeEvent):
error: str = Field(..., description="error")
###########################################
# Parallel Branch Events
###########################################
@ -129,6 +133,7 @@ class BaseIterationEvent(GraphEngineEvent):
"""parent parallel id if node is in parallel"""
parent_parallel_start_node_id: Optional[str] = None
"""parent parallel start node id if node is in parallel"""
parallel_mode_run_id: Optional[str] = None
class IterationRunStartedEvent(BaseIterationEvent):

View File

@ -4,6 +4,7 @@ import time
import uuid
from collections.abc import Generator, Mapping
from concurrent.futures import ThreadPoolExecutor, wait
from copy import copy, deepcopy
from typing import Any, Optional
from flask import Flask, current_app
@ -729,6 +730,16 @@ class GraphEngine:
"""
return time.perf_counter() - start_at > max_execution_time
def create_copy(self):
"""
create a graph engine copy
:return: with a new variable pool instance of graph engine
"""
new_instance = copy(self)
new_instance.graph_runtime_state = copy(self.graph_runtime_state)
new_instance.graph_runtime_state.variable_pool = deepcopy(self.graph_runtime_state.variable_pool)
return new_instance
class GraphRunFailedError(Exception):
def __init__(self, error: str):

View File

@ -1,8 +1,22 @@
from enum import Enum
from typing import Any, Optional
from core.workflow.entities.base_node_data_entities import BaseIterationNodeData, BaseIterationState, BaseNodeData
class ErrorHandleMode(Enum):
TERMINATED = "Terminated"
CONTINUE_ON_ERROR = "Continue on error"
REMOVE_ABNORMAL_OUTPUT = "Remove abnormal output"
def to_json(self):
return self.value
@classmethod
def from_json(cls, value):
return cls(value)
class IterationNodeData(BaseIterationNodeData):
"""
Iteration Node Data.
@ -11,6 +25,9 @@ class IterationNodeData(BaseIterationNodeData):
parent_loop_id: Optional[str] = None # redundant field, not used currently
iterator_selector: list[str] # variable selector
output_selector: list[str] # output selector
is_parallel: bool = False # open the parallel mode or not
parallel_nums: int = 10 # the numbers of parallel
error_handle_mode: ErrorHandleMode = ErrorHandleMode.TERMINATED # how to handle the error
class IterationStartNodeData(BaseNodeData):

View File

@ -1,11 +1,21 @@
import logging
import uuid
from collections.abc import Generator, Mapping, Sequence
from concurrent.futures import wait
from datetime import datetime, timezone
from typing import Any, cast
from queue import Empty, Queue
from typing import Any, Optional, cast
from flask import Flask, current_app
from configs import dify_config
from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
from core.workflow.entities.node_entities import (
NodeRunMetadataKey,
NodeRunResult,
NodeType,
)
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.graph_engine.entities.event import (
BaseGraphEvent,
BaseNodeEvent,
@ -16,13 +26,15 @@ from core.workflow.graph_engine.entities.event import (
IterationRunNextEvent,
IterationRunStartedEvent,
IterationRunSucceededEvent,
NodeInIterationFailedEvent,
NodeRunFailedEvent,
NodeRunStreamChunkEvent,
NodeRunSucceededEvent,
)
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.nodes.base_node import BaseNode
from core.workflow.nodes.event import RunCompletedEvent, RunEvent
from core.workflow.nodes.iteration.entities import IterationNodeData
from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData
from models.workflow import WorkflowNodeExecutionStatus
logger = logging.getLogger(__name__)
@ -36,6 +48,17 @@ class IterationNode(BaseNode):
_node_data_cls = IterationNodeData
_node_type = NodeType.ITERATION
@classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
return {
"type": "iteration",
"config": {
"is_parallel": False,
"parallel_nums": 10,
"error_handle_mode": ErrorHandleMode.TERMINATED.value,
},
}
def _run(self) -> Generator[RunEvent | InNodeEvent, None, None]:
"""
Run the node.
@ -73,7 +96,7 @@ class IterationNode(BaseNode):
variable_pool.add([self.node_id, "item"], iterator_list_value[0])
# init graph engine
from core.workflow.graph_engine.graph_engine import GraphEngine
from core.workflow.graph_engine.graph_engine import GraphEngine, GraphEngineThreadPool
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
@ -113,93 +136,56 @@ class IterationNode(BaseNode):
index=0,
pre_iteration_output=None,
)
outputs: list[Any] = []
try:
for _ in range(len(iterator_list_value)):
# run workflow
rst = graph_engine.run()
for event in rst:
if isinstance(event, (BaseNodeEvent | BaseParallelBranchEvent)) and not event.in_iteration_id:
event.in_iteration_id = self.node_id
if (
isinstance(event, BaseNodeEvent)
and event.node_type == NodeType.ITERATION_START
and not isinstance(event, NodeRunStreamChunkEvent)
):
if self.node_data.is_parallel:
futures = []
q = Queue()
thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, max_submit_count=100)
for index, item in enumerate(iterator_list_value):
future = thread_pool.submit(
self._run_single_iter_parallel,
current_app._get_current_object(),
q,
iterator_list_value,
inputs,
outputs,
start_at,
graph_engine,
iteration_graph,
index,
item,
)
future.add_done_callback(thread_pool.task_done_callback)
futures.append(future)
succeeded_count = 0
while True:
try:
event = q.get(timeout=1)
if event is None:
break
if isinstance(event, IterationRunNextEvent):
succeeded_count += 1
if succeeded_count == len(futures):
q.put(None)
yield event
except Empty:
logger.warning(msg="Parallel Iteration event queue empty")
continue
if isinstance(event, NodeRunSucceededEvent):
if event.route_node_state.node_run_result:
metadata = event.route_node_state.node_run_result.metadata
if not metadata:
metadata = {}
if NodeRunMetadataKey.ITERATION_ID not in metadata:
metadata[NodeRunMetadataKey.ITERATION_ID] = self.node_id
metadata[NodeRunMetadataKey.ITERATION_INDEX] = variable_pool.get_any(
[self.node_id, "index"]
)
event.route_node_state.node_run_result.metadata = metadata
yield event
elif isinstance(event, BaseGraphEvent):
if isinstance(event, GraphRunFailedEvent):
# iteration run failed
yield IterationRunFailedEvent(
iteration_id=self.id,
iteration_node_id=self.node_id,
iteration_node_type=self.node_type,
iteration_node_data=self.node_data,
start_at=start_at,
inputs=inputs,
outputs={"output": jsonable_encoder(outputs)},
steps=len(iterator_list_value),
metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens},
error=event.error,
)
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=event.error,
)
)
return
else:
event = cast(InNodeEvent, event)
yield event
# append to iteration output variable list
current_iteration_output = variable_pool.get_any(self.node_data.output_selector)
outputs.append(current_iteration_output)
# remove all nodes outputs from variable pool
for node_id in iteration_graph.node_ids:
variable_pool.remove_node(node_id)
# move to next iteration
current_index = variable_pool.get([self.node_id, "index"])
if current_index is None:
raise ValueError(f"iteration {self.node_id} current index not found")
next_index = int(current_index.to_object()) + 1
variable_pool.add([self.node_id, "index"], next_index)
if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
yield IterationRunNextEvent(
iteration_id=self.id,
iteration_node_id=self.node_id,
iteration_node_type=self.node_type,
iteration_node_data=self.node_data,
index=next_index,
pre_iteration_output=jsonable_encoder(current_iteration_output)
if current_iteration_output
else None,
)
# wait all threads
wait(futures)
else:
for _ in range(len(iterator_list_value)):
yield from self._run_single_iter(
iterator_list_value,
variable_pool,
inputs,
outputs,
start_at,
graph_engine,
iteration_graph,
)
yield IterationRunSucceededEvent(
iteration_id=self.id,
@ -304,3 +290,180 @@ class IterationNode(BaseNode):
}
return variable_mapping
def _run_single_iter(
self,
iterator_list_value: list[str],
variable_pool: VariablePool,
inputs: dict[str, list],
outputs: list,
start_at: datetime,
graph_engine,
iteration_graph,
parallel_mode_run_id: Optional[str] = None,
):
rst = graph_engine.run()
for event in rst:
if isinstance(event, (BaseNodeEvent | BaseParallelBranchEvent)) and not event.in_iteration_id:
event.in_iteration_id = self.node_id
if (
isinstance(event, BaseNodeEvent)
and event.node_type == NodeType.ITERATION_START
and not isinstance(event, NodeRunStreamChunkEvent)
):
continue
if isinstance(event, NodeRunSucceededEvent):
self._handle_event_metadata(event, variable_pool, parallel_mode_run_id)
yield event
elif isinstance(event, BaseGraphEvent):
if isinstance(event, GraphRunFailedEvent):
# iteration run failed
if self.node_data.is_parallel:
yield IterationRunFailedEvent(
iteration_id=self.id,
iteration_node_id=self.node_id,
iteration_node_type=self.node_type,
iteration_node_data=self.node_data,
parallel_mode_run_id=parallel_mode_run_id,
start_at=start_at,
inputs=inputs,
outputs={"output": jsonable_encoder(outputs)},
steps=len(iterator_list_value),
metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens},
error=event.error,
)
else:
yield IterationRunFailedEvent(
iteration_id=self.id,
iteration_node_id=self.node_id,
iteration_node_type=self.node_type,
iteration_node_data=self.node_data,
start_at=start_at,
inputs=inputs,
outputs={"output": jsonable_encoder(outputs)},
steps=len(iterator_list_value),
metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens},
error=event.error,
)
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=event.error,
)
)
return
else:
event = cast(InNodeEvent, event)
if isinstance(event, NodeRunFailedEvent):
if self.node_data.error_handle_mode == ErrorHandleMode.CONTINUE_ON_ERROR:
metadata_event = self._handle_event_metadata(event, variable_pool, parallel_mode_run_id)
yield NodeInIterationFailedEvent(
**metadata_event.model_dump(),
)
break
elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT:
current_index = variable_pool.get([self.node_id, "index"])
if current_index is None:
raise ValueError(f"iteration {self.node_id} current index not found")
next_index = int(current_index.to_object()) + 1
metadata_event = self._handle_event_metadata(event, variable_pool, parallel_mode_run_id)
yield NodeInIterationFailedEvent(
**metadata_event.model_dump(),
)
variable_pool.add([self.node_id, "index"], next_index)
if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
yield IterationRunNextEvent(
iteration_id=self.id,
iteration_node_id=self.node_id,
iteration_node_type=self.node_type,
iteration_node_data=self.node_data,
index=next_index,
parallel_mode_run_id=parallel_mode_run_id,
pre_iteration_output=None,
)
return
yield self._handle_event_metadata(event, variable_pool, parallel_mode_run_id)
current_index = variable_pool.get([self.node_id, "index"])
# append to iteration output variable list
current_iteration_output = variable_pool.get_any(self.node_data.output_selector)
outputs.insert(current_index.value, current_iteration_output)
# remove all nodes outputs from variable pool
for node_id in iteration_graph.node_ids:
variable_pool.remove_node(node_id)
# move to next iteration
if current_index is None:
raise ValueError(f"iteration {self.node_id} current index not found")
next_index = int(current_index.to_object()) + 1
variable_pool.add([self.node_id, "index"], next_index)
if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
yield IterationRunNextEvent(
iteration_id=self.id,
iteration_node_id=self.node_id,
iteration_node_type=self.node_type,
iteration_node_data=self.node_data,
index=next_index,
parallel_mode_run_id=parallel_mode_run_id,
pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None,
)
def _run_single_iter_parallel(
self,
flask_app: Flask,
q: Queue,
iterator_list_value: list[str],
inputs: dict[str, list],
outputs: list,
start_at: datetime,
graph_engine,
iteration_graph,
index,
item,
):
with flask_app.app_context():
parallel_mode_run_id = uuid.uuid4().hex
graph_engine_copy = graph_engine.create_copy()
variable_pool_copy = graph_engine_copy.graph_runtime_state.variable_pool
variable_pool_copy.add([self.node_id, "index"], index)
variable_pool_copy.add([self.node_id, "item"], item)
for event in self._run_single_iter(
iterator_list_value=iterator_list_value,
variable_pool=variable_pool_copy,
inputs=inputs,
outputs=outputs,
start_at=start_at,
graph_engine=graph_engine_copy,
iteration_graph=iteration_graph,
parallel_mode_run_id=parallel_mode_run_id,
):
q.put(event)
def _handle_event_metadata(self, event: BaseNodeEvent, variable_pool: VariablePool, parallel_mode_run_id: str):
"""
Handle success event.
"""
if not isinstance(event, BaseNodeEvent):
return event
if event.route_node_state.node_run_result:
metadata = event.route_node_state.node_run_result.metadata
if not metadata:
metadata = {}
if NodeRunMetadataKey.ITERATION_ID not in metadata:
metadata[NodeRunMetadataKey.ITERATION_ID] = self.node_id
if self.node_data.is_parallel:
metadata[NodeRunMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id
else:
metadata[NodeRunMetadataKey.ITERATION_INDEX] = variable_pool.get_any([self.node_id, "index"])
event.route_node_state.node_run_result.metadata = metadata
return event

View File

@ -50,10 +50,10 @@ def test_dify_config(example_env_file):
assert config.SENTRY_TRACES_SAMPLE_RATE == 1.0
# annotated field with default value
assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 60
assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 600
# annotated field with configured value
assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30
assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 600
# NOTE: If there is a `.env` file in your Workspace, this test might not succeed as expected.

View File

@ -10,6 +10,7 @@ from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.iteration.entities import ErrorHandleMode
from core.workflow.nodes.iteration.iteration_node import IterationNode
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
@ -418,3 +419,395 @@ def test_run_parallel():
assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]}
assert count == 32
def test_iteration_run_in_parallel_mode():
graph_config = {
"edges": [
{
"id": "start-source-pe-target",
"source": "start",
"target": "pe",
},
{
"id": "iteration-1-source-answer-3-target",
"source": "iteration-1",
"target": "answer-3",
},
{
"id": "iteration-start-source-tt-target",
"source": "iteration-start",
"target": "tt",
},
{
"id": "iteration-start-source-tt-2-target",
"source": "iteration-start",
"target": "tt-2",
},
{
"id": "tt-source-if-else-target",
"source": "tt",
"target": "if-else",
},
{
"id": "tt-2-source-if-else-target",
"source": "tt-2",
"target": "if-else",
},
{
"id": "if-else-true-answer-2-target",
"source": "if-else",
"sourceHandle": "true",
"target": "answer-2",
},
{
"id": "if-else-false-answer-4-target",
"source": "if-else",
"sourceHandle": "false",
"target": "answer-4",
},
{
"id": "pe-source-iteration-1-target",
"source": "pe",
"target": "iteration-1",
},
],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "iteration",
"type": "iteration",
},
"id": "iteration-1",
},
{
"data": {
"answer": "{{#tt.output#}}",
"iteration_id": "iteration-1",
"title": "answer 2",
"type": "answer",
},
"id": "answer-2",
},
{
"data": {
"iteration_id": "iteration-1",
"title": "iteration-start",
"type": "iteration-start",
},
"id": "iteration-start",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 123",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 321",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt-2",
},
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3",
},
{
"data": {
"conditions": [
{
"comparison_operator": "is",
"id": "1721916275284",
"value": "hi",
"variable_selector": ["sys", "query"],
}
],
"iteration_id": "iteration-1",
"logical_operator": "and",
"title": "if",
"type": "if-else",
},
"id": "if-else",
},
{
"data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"},
"id": "answer-4",
},
{
"data": {
"instruction": "test1",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "gpt-4o",
"provider": "openai",
},
"parameters": [
{"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
],
"query": ["sys", "query"],
"reasoning_mode": "prompt",
"title": "pe",
"type": "parameter-extractor",
},
"id": "pe",
},
],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.CHAT,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "dify",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "1",
},
user_inputs={},
environment_variables=[],
)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])
parallel_iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
config={
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "迭代",
"type": "iteration",
"is_parallel": True,
},
"id": "iteration-1",
},
)
sequential_iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
config={
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "迭代",
"type": "iteration",
"is_parallel": True,
},
"id": "iteration-1",
},
)
def tt_generator(self):
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={"iterator_selector": "dify"},
outputs={"output": "dify 123"},
)
with patch.object(TemplateTransformNode, "_run", new=tt_generator):
# execute node
parallel_result = parallel_iteration_node._run()
sequential_result = sequential_iteration_node._run()
assert parallel_iteration_node.node_data.parallel_nums == 10
assert parallel_iteration_node.node_data.error_handle_mode == ErrorHandleMode.TERMINATED
count = 0
for item in parallel_result:
count += 1
if isinstance(item, RunCompletedEvent):
assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]}
assert count == 32
for item in sequential_result:
count += 1
if isinstance(item, RunCompletedEvent):
assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]}
assert count == 64
def test_iteration_run_error_handle():
graph_config = {
"nodes": [
{
"data": {"desc": "", "selected": False, "title": "开始", "type": "start", "variables": []},
"id": "1727165736057",
},
{
"data": {
"answer": "{{#1728357129226.output#}}\n",
"desc": "",
"selected": False,
"title": "直接回复",
"type": "answer",
"variables": [],
},
"id": "1727166508076",
},
{
"data": {
"code": '\ndef main(text:str) -> dict:\n return {\n "result": text.split(","),\n }\n',
"code_language": "python3",
"desc": "",
"outputs": {"result": {"children": None, "type": "array[string]"}},
"selected": False,
"title": "Code",
"type": "code",
"variables": [{"value_selector": ["sys", "query"], "variable": "text"}],
},
"id": "1727168677820",
},
{
"data": {
"desc": "",
"error_handle_mode": "Continue on error",
"height": 226,
"is_parallel": True,
"iterator_selector": ["1727168677820", "result"],
"output_selector": ["1728702070951", "result"],
"output_type": "array[string]",
"parallel_nums": 6,
"selected": True,
"start_node_id": "1728357129226start",
"title": "迭代 2",
"type": "iteration",
"width": 589,
},
"id": "1728357129226",
},
{
"data": {"desc": "", "isInIteration": True, "selected": False, "title": "", "type": "iteration-start"},
"id": "1728357129226start",
},
{
"data": {
"type": "code",
"title": "Code 3",
"desc": "",
"variables": [{"variable": "arg1", "value_selector": ["1728357129226", "item"]}],
"code_language": "python3",
"code": '\ndef main(arg1: str) -> dict:\n return {\n "result": arg1.split(":")[1]\n }\n', # noqa: E501
"outputs": {"result": {"type": "string", "children": None}},
"selected": False,
"isInIteration": True,
"iteration_id": "1728357129226",
},
"id": "1728702070951",
},
],
"edges": [
{"id": "1727165736057-source-1727168677820-target", "target": "1727168677820", "source": "1727165736057"},
{"id": "1727168677820-source-1728357129226-target", "target": "1728357129226", "source": "1727168677820"},
{"id": "1728357129226-source-1727166508076-target", "target": "1727166508076", "source": "1728357129226"},
{
"id": "1728357129226start-source-1728702070951-target",
"target": "1728702070951",
"source": "1728357129226start",
},
],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.CHAT,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool
pool = VariablePool(
system_variables={},
user_inputs={},
environment_variables=[],
)
pool.add(["1727168677820", "result"], ["test", "test:hello"])
iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
config={
"data": {
"error_handle_mode": ErrorHandleMode.CONTINUE_ON_ERROR,
"is_parallel": True,
"iterator_selector": ["1727168677820", "result"],
"output_selector": ["1728702070951", "result"],
"output_type": "array[string]",
"parallel_nums": 6,
"start_node_id": "1728357129226start",
"title": "迭代 2",
"type": "iteration",
},
"id": "1728357129226",
},
)
# print("")
# execute continue on error node
result = iteration_node._run()
count = 0
for item in result:
count += 1
if isinstance(item, RunCompletedEvent):
assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.run_result.outputs == {"output": [None, "hello"]}
assert count == 10
# execute remove abnormal output
iteration_node.node_data.error_handle_mode = ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT
result = iteration_node._run()
count = 0
for item in result:
count += 1
if isinstance(item, RunCompletedEvent):
assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.run_result.outputs == {"output": ["hello"]}
assert count == 10

View File

@ -644,6 +644,11 @@ export const useNodesInteractions = () => {
newNode.data.isInIteration = true
newNode.data.iteration_id = prevNode.parentId
newNode.zIndex = ITERATION_CHILDREN_Z_INDEX
if (newNode.data.type === BlockEnum.Answer) {
const parentIterNodeIndex = nodes.findIndex(node => node.id === prevNode.parentId)
if (nodes[parentIterNodeIndex].data._isFirstTime)
nodes[parentIterNodeIndex].data._isShowTips = true
}
}
const newEdge: Edge = {

View File

@ -427,6 +427,8 @@ export const useWorkflowRun = () => {
const {
workflowRunningData,
setWorkflowRunningData,
runTimes,
setRunTimes,
} = workflowStore.getState()
const { data } = params
@ -446,8 +448,8 @@ export const useWorkflowRun = () => {
const nodes = getNodes()
const newNodes = produce(nodes, (draft) => {
const currentNode = draft.find(node => node.id === data.node_id)!
currentNode.data._iterationIndex = data.index > 0 ? data.index : 1
currentNode.data._iterationIndex = runTimes
setRunTimes(runTimes + 1)
})
setNodes(newNodes)
@ -460,6 +462,7 @@ export const useWorkflowRun = () => {
const {
workflowRunningData,
setWorkflowRunningData,
setRunTimes,
} = workflowStore.getState()
const {
getNodes,
@ -476,7 +479,7 @@ export const useWorkflowRun = () => {
})
}
}))
setRunTimes(1)
const newNodes = produce(nodes, (draft) => {
const currentNode = draft.find(node => node.id === data.node_id)!

View File

@ -12,14 +12,14 @@ import Tooltip from '@/app/components/base/tooltip'
type Props = {
className?: string
title: JSX.Element | string | DefaultTFuncReturn
tooltip?: string
tooltip?: React.ReactNode
supportFold?: boolean
children?: JSX.Element | string | null
operations?: JSX.Element
inline?: boolean
}
const Filed: FC<Props> = ({
const Field: FC<Props> = ({
className,
title,
tooltip,
@ -58,4 +58,4 @@ const Filed: FC<Props> = ({
</div>
)
}
export default React.memo(Filed)
export default React.memo(Field)

View File

@ -25,6 +25,7 @@ import {
useToolIcon,
} from '../../hooks'
import { useNodeIterationInteractions } from '../iteration/use-interactions'
import type { IterationNodeType } from '../iteration/types'
import {
NodeSourceHandle,
NodeTargetHandle,
@ -34,6 +35,7 @@ import NodeControl from './components/node-control'
import AddVariablePopupWithPosition from './components/add-variable-popup-with-position'
import cn from '@/utils/classnames'
import BlockIcon from '@/app/components/workflow/block-icon'
import Tooltip from '@/app/components/base/tooltip'
type BaseNodeProps = {
children: ReactElement
@ -169,6 +171,22 @@ const BaseNode: FC<BaseNodeProps> = ({
className='grow mr-1 system-sm-semibold-uppercase text-text-primary truncate'
>
{data.title}
{
data.type === BlockEnum.Iteration && (data as IterationNodeType).is_parallel && (
<Tooltip popupContent={
<div className='w-[180px]'>
<div className='font-extrabold'>
{t('workflow.nodes.iteration.parallelModeEnableTitle')}
</div>
{t('workflow.nodes.iteration.parallelModeEnableDesc')}
</div>}
>
<div className='text-[#da7b2f] border-2 border-[#da7b2f] rounded-lg pl-1 pr-1 inline text-sm ml-1'>
{t('workflow.nodes.iteration.parallelModeUpper')}
</div>
</Tooltip>
)
}
</div>
{
data._iterationLength && data._iterationIndex && data._runningStatus === NodeRunningStatus.Running && (

View File

@ -1,4 +1,4 @@
import { BlockEnum } from '../../types'
import { BlockEnum, ErrorHandleMode } from '../../types'
import type { NodeDefault } from '../../types'
import type { IterationNodeType } from './types'
import { ALL_CHAT_AVAILABLE_BLOCKS, ALL_COMPLETION_AVAILABLE_BLOCKS } from '@/app/components/workflow/constants'
@ -10,6 +10,11 @@ const nodeDefault: NodeDefault<IterationNodeType> = {
iterator_selector: [],
output_selector: [],
_children: [],
_isFirstTime: true,
_isShowTips: false,
is_parallel: false,
parallel_nums: 10,
error_handle_mode: ErrorHandleMode.Terminated,
},
getAvailablePrevNodes(isChatMode: boolean) {
const nodes = isChatMode

View File

@ -8,12 +8,16 @@ import {
useNodesInitialized,
useViewport,
} from 'reactflow'
import { useTranslation } from 'react-i18next'
import { IterationStartNodeDumb } from '../iteration-start'
import { useNodeIterationInteractions } from './use-interactions'
import type { IterationNodeType } from './types'
import AddBlock from './add-block'
import cn from '@/utils/classnames'
import type { NodeProps } from '@/app/components/workflow/types'
import Toast from '@/app/components/base/toast'
const i18nPrefix = 'workflow.nodes.iteration'
const Node: FC<NodeProps<IterationNodeType>> = ({
id,
@ -22,11 +26,20 @@ const Node: FC<NodeProps<IterationNodeType>> = ({
const { zoom } = useViewport()
const nodesInitialized = useNodesInitialized()
const { handleNodeIterationRerender } = useNodeIterationInteractions()
const { t } = useTranslation()
useEffect(() => {
if (nodesInitialized)
handleNodeIterationRerender(id)
}, [nodesInitialized, id, handleNodeIterationRerender])
if (data.is_parallel && data._isShowTips && data._isFirstTime) {
Toast.notify({
type: 'warning',
message: t(`${i18nPrefix}.answerNodeWarningDesc`),
duration: 5000,
})
data._isFirstTime = false
}
}, [nodesInitialized, id, handleNodeIterationRerender, data, t])
return (
<div className={cn(

View File

@ -10,9 +10,14 @@ import ResultPanel from '../../run/result-panel'
import IterationResultPanel from '../../run/iteration-result-panel'
import type { IterationNodeType } from './types'
import useConfig from './use-config'
import { InputVarType, type NodePanelProps } from '@/app/components/workflow/types'
import { ErrorHandleMode, InputVarType, type NodePanelProps } from '@/app/components/workflow/types'
import Field from '@/app/components/workflow/nodes/_base/components/field'
import BeforeRunForm from '@/app/components/workflow/nodes/_base/components/before-run-form'
import Switch from '@/app/components/base/switch'
import Select from '@/app/components/base/select'
import Slider from '@/app/components/base/slider'
import Input from '@/app/components/base/input'
import Divider from '@/app/components/base/divider'
const i18nPrefix = 'workflow.nodes.iteration'
@ -21,7 +26,20 @@ const Panel: FC<NodePanelProps<IterationNodeType>> = ({
data,
}) => {
const { t } = useTranslation()
const responseMethod = [
{
value: ErrorHandleMode.Terminated,
name: t(`${i18nPrefix}.ErrorMethod.operationTerminated`),
},
{
value: ErrorHandleMode.ContinueOnError,
name: t(`${i18nPrefix}.ErrorMethod.continueOnError`),
},
{
value: ErrorHandleMode.RemoveAbnormalOutput,
name: t(`${i18nPrefix}.ErrorMethod.removeAbnormalOutput`),
},
]
const {
readOnly,
inputs,
@ -47,6 +65,9 @@ const Panel: FC<NodePanelProps<IterationNodeType>> = ({
setIterator,
iteratorInputKey,
iterationRunResult,
changeParallel,
changeErrorResponseMode,
changeParallelNums,
} = useConfig(id, data)
return (
@ -87,6 +108,34 @@ const Panel: FC<NodePanelProps<IterationNodeType>> = ({
/>
</Field>
</div>
<div className='px-4 pb-4 space-y-4'>
<Field title={t(`${i18nPrefix}.parallelMode`)} tooltip={<div className='w-[230px]'>{t(`${i18nPrefix}.parallelPanelDesc`)}</div>} inline>
<Switch defaultValue={inputs.is_parallel} onChange={changeParallel} />
</Field>
</div>
<div className='px-4 pb-4 space-y-4'>
<Field title={t(`${i18nPrefix}.MaxParallelismTitle`)} tooltip={<div className='w-[230px]'>{t(`${i18nPrefix}.MaxParallelismDesc`)}</div>}>
<div className='flex row'>
<Input type='number' wrapperClassName='w-18 mr-4 ' max={10} min={1} value={inputs.parallel_nums} onChange={(e) => { changeParallelNums(Number(e.target.value)) }} />
<Slider
value={inputs.parallel_nums}
onChange={changeParallelNums}
max={10}
min={1}
className=' flex-shrink-0 flex-1 mt-4'
/>
</div>
</Field>
</div>
<Divider className='ml-4 mr-4' />
<div className='px-4 pb-4 space-y-4'>
<Field title={t(`${i18nPrefix}.errorResponseMethod`)} >
<Select items={responseMethod} defaultValue={inputs.error_handle_mode} onSelect={changeErrorResponseMode}>
</Select>
</Field>
</div>
{isShowSingleRun && (
<BeforeRunForm
nodeName={inputs.title}

View File

@ -1,6 +1,7 @@
import type {
BlockEnum,
CommonNodeType,
ErrorHandleMode,
ValueSelector,
VarType,
} from '@/app/components/workflow/types'
@ -12,4 +13,9 @@ export type IterationNodeType = CommonNodeType & {
iterator_selector: ValueSelector
output_selector: ValueSelector
output_type: VarType // output type.
is_parallel: boolean // open the parallel mode or not
parallel_nums: number // the numbers of parallel
error_handle_mode: ErrorHandleMode // how to handle error in the iteration
_isShowTips?: boolean // when answer node in parallel mode iteration show tips
_isFirstTime?: boolean // is the first time to add parallel iteration node
}

View File

@ -8,12 +8,13 @@ import {
useWorkflow,
} from '../../hooks'
import { VarType } from '../../types'
import type { ValueSelector, Var } from '../../types'
import type { ErrorHandleMode, ValueSelector, Var } from '../../types'
import useNodeCrud from '../_base/hooks/use-node-crud'
import { getNodeInfoById, getNodeUsedVarPassToServerKey, getNodeUsedVars, isSystemVar, toNodeOutputVars } from '../_base/components/variable/utils'
import useOneStepRun from '../_base/hooks/use-one-step-run'
import type { IterationNodeType } from './types'
import type { VarType as VarKindType } from '@/app/components/workflow/nodes/tool/types'
import type { Item } from '@/app/components/base/select'
const DELIMITER = '@@@@@'
const useConfig = (id: string, payload: IterationNodeType) => {
@ -183,6 +184,25 @@ const useConfig = (id: string, payload: IterationNodeType) => {
})
}, [iteratorInputKey, runInputData, setRunInputData])
const changeParallel = useCallback((value: boolean) => {
const newInputs = produce(inputs, (draft) => {
draft.is_parallel = value
})
setInputs(newInputs)
}, [inputs, setInputs])
const changeErrorResponseMode = useCallback((item: Item) => {
const newInputs = produce(inputs, (draft) => {
draft.error_handle_mode = item.value as ErrorHandleMode
})
setInputs(newInputs)
}, [inputs, setInputs])
const changeParallelNums = useCallback((num: number) => {
const newInputs = produce(inputs, (draft) => {
draft.parallel_nums = num
})
setInputs(newInputs)
}, [inputs, setInputs])
return {
readOnly,
inputs,
@ -209,6 +229,9 @@ const useConfig = (id: string, payload: IterationNodeType) => {
setIterator,
iteratorInputKey,
iterationRunResult,
changeParallel,
changeErrorResponseMode,
changeParallelNums,
}
}

View File

@ -61,36 +61,67 @@ const RunPanel: FC<RunProps> = ({ hideResult, activeTab = 'RESULT', runID, getRe
}, [notify, getResultCallback])
const formatNodeList = useCallback((list: NodeTracing[]) => {
const allItems = list.reverse()
const allItems = [...list].reverse()
const result: NodeTracing[] = []
allItems.forEach((item) => {
const { node_type, execution_metadata } = item
if (node_type !== BlockEnum.Iteration) {
const isInIteration = !!execution_metadata?.iteration_id
const groupMap = new Map<string, NodeTracing[]>()
if (isInIteration) {
const iterationNode = result.find(node => node.node_id === execution_metadata?.iteration_id)
const iterationDetails = iterationNode?.details
const currentIterationIndex = execution_metadata?.iteration_index ?? 0
if (Array.isArray(iterationDetails)) {
if (iterationDetails.length === 0 || !iterationDetails[currentIterationIndex])
iterationDetails[currentIterationIndex] = [item]
else
iterationDetails[currentIterationIndex].push(item)
}
return
}
// not in iteration
result.push(item)
return
}
const processIterationNode = (item: NodeTracing) => {
result.push({
...item,
details: [],
})
}
const updateParallelModeGroup = (runId: string, item: NodeTracing, iterationNode: NodeTracing) => {
if (!groupMap.has(runId))
groupMap.set(runId, [item])
else
groupMap.get(runId)!.push(item)
if (item.status === 'failed') {
iterationNode.status = 'failed'
iterationNode.error = item.error
}
iterationNode.details = Array.from(groupMap.values())
}
const updateSequentialModeGroup = (index: number, item: NodeTracing, iterationNode: NodeTracing) => {
const { details } = iterationNode
if (details) {
if (!details[index])
details[index] = [item]
else
details[index].push(item)
}
if (item.status === 'failed') {
iterationNode.status = 'failed'
iterationNode.error = item.error
}
}
const processNonIterationNode = (item: NodeTracing) => {
const { execution_metadata } = item
if (!execution_metadata?.iteration_id) {
result.push(item)
return
}
const iterationNode = result.find(node => node.node_id === execution_metadata.iteration_id)
if (!iterationNode || !Array.isArray(iterationNode.details))
return
const { parallel_mode_run_id, iteration_index = 0 } = execution_metadata
if (parallel_mode_run_id)
updateParallelModeGroup(parallel_mode_run_id, item, iterationNode)
else
updateSequentialModeGroup(iteration_index, item, iterationNode)
}
allItems.forEach((item) => {
item.node_type === BlockEnum.Iteration
? processIterationNode(item)
: processNonIterationNode(item)
})
return result
}, [])

View File

@ -5,6 +5,7 @@ import { useTranslation } from 'react-i18next'
import {
RiArrowRightSLine,
RiCloseLine,
RiErrorWarningLine,
} from '@remixicon/react'
import { ArrowNarrowLeft } from '../../base/icons/src/vender/line/arrows'
import TracingPanel from './tracing-panel'
@ -27,7 +28,7 @@ const IterationResultPanel: FC<Props> = ({
noWrap,
}) => {
const { t } = useTranslation()
const [expandedIterations, setExpandedIterations] = useState<Record<number, boolean>>([])
const [expandedIterations, setExpandedIterations] = useState<Record<number, boolean>>({})
const toggleIteration = useCallback((index: number) => {
setExpandedIterations(prev => ({
@ -71,10 +72,19 @@ const IterationResultPanel: FC<Props> = ({
<span className='system-sm-semibold-uppercase text-text-primary flex-grow'>
{t(`${i18nPrefix}.iteration`)} {index + 1}
</span>
<RiArrowRightSLine className={cn(
'w-4 h-4 text-text-tertiary transition-transform duration-200 flex-shrink-0',
expandedIterations[index] && 'transform rotate-90',
)} />
{
iteration.some(item => item.status === 'failed')
? (
<RiErrorWarningLine className='w-4 h-4 text-[#F04438]' />
)
: (< RiArrowRightSLine className={
cn(
'w-4 h-4 text-text-tertiary transition-transform duration-200 flex-shrink-0',
expandedIterations[index] && 'transform rotate-90',
)} />
)
}
</div>
</div>
{expandedIterations[index] && <div

View File

@ -69,7 +69,12 @@ const NodePanel: FC<Props> = ({
return iteration_length
}
const getErrorCount = (details: NodeTracing[][] | undefined) => {
if (!details || details.length === 0)
return 0
return details.flat().filter(item => item.status === 'failed').length
}
useEffect(() => {
setCollapseState(!nodeInfo.expand)
}, [nodeInfo.expand, setCollapseState])
@ -134,7 +139,12 @@ const NodePanel: FC<Props> = ({
onClick={handleOnShowIterationDetail}
>
<Iteration className='w-4 h-4 text-components-button-tertiary-text flex-shrink-0' />
<div className='flex-1 text-left system-sm-medium text-components-button-tertiary-text'>{t('workflow.nodes.iteration.iteration', { count: getCount(nodeInfo.details?.length, nodeInfo.metadata?.iterator_length) })}</div>
<div className='flex-1 text-left system-sm-medium text-components-button-tertiary-text'>{t('workflow.nodes.iteration.iteration', { count: getCount(nodeInfo.details?.length, nodeInfo.metadata?.iterator_length) })}{getErrorCount(nodeInfo.details) > 0 && (
<>
{t('workflow.nodes.iteration.comma')}
{t('workflow.nodes.iteration.error', { count: getErrorCount(nodeInfo.details) })}
</>
)}</div>
{justShowIterationNavArrow
? (
<RiArrowRightSLine className='w-4 h-4 text-components-button-tertiary-text flex-shrink-0' />

View File

@ -164,6 +164,8 @@ type Shape = {
setShowImportDSLModal: (showImportDSLModal: boolean) => void
showTips: string
setShowTips: (showTips: string) => void
runTimes: number
setRunTimes: (runTimes: number) => void
}
export const createWorkflowStore = () => {
@ -266,6 +268,8 @@ export const createWorkflowStore = () => {
setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })),
showTips: '',
setShowTips: showTips => set(() => ({ showTips })),
runTimes: 1,
setRunTimes: runTimes => set(() => ({ runTimes })),
}))
}

View File

@ -34,7 +34,11 @@ export enum ControlMode {
Pointer = 'pointer',
Hand = 'hand',
}
export enum ErrorHandleMode {
Terminated = 'Terminated',
ContinueOnError = 'Continue on error',
RemoveAbnormalOutput = 'Remove abnormal output',
}
export type Branch = {
id: string
name: string

View File

@ -530,6 +530,23 @@ const translation = {
iteration_one: '{{count}} Iteration',
iteration_other: '{{count}} Iterations',
currentIteration: 'Current Iteration',
comma: ', ',
error_one: '{{count}} Error',
error_other: '{{count}} Errors',
parallelMode: 'Parallel Mode',
parallelModeUpper: 'PARALLEL MODE',
parallelModeEnableTitle: 'Parallel Mode Enabled',
parallelModeEnableDesc: 'In parallel mode, tasks within iterations support parallel execution. You can configure this in the properties panel on the right.',
parallelPanelDesc: 'In parallel mode, tasks in the iteration support parallel execution.',
MaxParallelismTitle: 'Maximum parallelism',
MaxParallelismDesc: 'The maximum parallelism is used to control the number of tasks executed simultaneously in a single iteration.',
errorResponseMethod: 'Error response method',
ErrorMethod: {
operationTerminated: 'Terminated',
continueOnError: 'Continue on error',
removeAbnormalOutput: 'Remove abnormal output',
},
answerNodeWarningDesc: 'Parallel mode warning: Answer nodes, conversation variable assignments, and persistent read/write operations within iterations may cause exceptions.',
},
note: {
addNote: 'Add Note',

View File

@ -530,6 +530,23 @@ const translation = {
iteration_one: '{{count}}个迭代',
iteration_other: '{{count}}个迭代',
currentIteration: '当前迭代',
comma: '',
error_one: '{{count}}个失败',
error_other: '{{count}}个失败',
parallelMode: '并行模式',
parallelModeUpper: '并行模式',
parallelModeEnableTitle: '并行模式启用',
parallelModeEnableDesc: '启用并行模式时迭代内的任务支持并行执行。你可以在右侧的属性面板中进行配置。',
parallelPanelDesc: '在并行模式下,迭代中的任务支持并行执行。',
MaxParallelismTitle: '最大并行度',
MaxParallelismDesc: '最大并行度用于控制单次迭代中同时执行的任务数量。',
errorResponseMethod: '错误响应方法',
ErrorMethod: {
operationTerminated: '错误时终止',
continueOnError: '忽略错误并继续',
removeAbnormalOutput: '移除错误输出',
},
answerNodeWarningDesc: '并行模式警告:在迭代中,回答节点、会话变量赋值和工具持久读/写操作可能会导致异常。',
},
note: {
addNote: '添加注释',

View File

@ -30,6 +30,7 @@ export type NodeTracing = {
parallel_start_node_id?: string
parent_parallel_id?: string
parent_parallel_start_node_id?: string
parallel_mode_run_id?: string
}
metadata: {
iterator_length: number