Flow Starter
1. Flow Concept
Flow is a generalization of the business process (BP) concept.
Relationship: Flow > BP = Workflow > BPMN
An expression engine mainly solves data computation problems; QLExpress is an example of such a calculation engine. BPMN scenarios are better suited to workflow or rule engines such as Flowable, which integrates JUEL to support simple expression parsing. DMN rule engines are better for true/false decision scenarios, for example the sequenceFlow routing conditions in a BP. Drools focuses more on rule configuration and management.
Example scenarios:
- Import: better suited to BPMN. Each action is a data update node; actions have an order but can be processed asynchronously.
- Approval: BPMN + DMN. BPMN handles the approval process and path. DMN handles complex approval routing. DQL solves complex approver queries.
2. Flow Design
Flow Starter provides event-driven process definition and execution.
Supported flow types:
- automated flow, cron flow, form flow, validation flow, onchange event flow, AI Agent flow
Supported node types:
- create data, update data, delete data, query data, compute data, decision gateway, generate report, query AI, send message
- validate data, WebHook, async task, subflow
Supported event types:
- create, update, delete, changed (create/update/delete), onchange, API, cron, subflow, message
- button events can be implemented via API events
Supports synchronous and asynchronous flows, and flow versioning.
3. Supported Scenarios
- Batch import validation flows and post-import business processing flows
- Data validation is synchronous; other business processing is asynchronous
- Messages and emails are produced by message nodes in the flow (for example, onboarding email)
- Business process configuration and cron tasks
Overview
Flow Starter provides a configurable flow engine for Softa. Flows are defined by FlowConfig, FlowTrigger, FlowNode, and FlowEdge records. A flow can be triggered by ChangeLog events (create/update/delete), API events, cron events, or subflow calls. Flow execution supports synchronous validation (before transaction commit) and asynchronous processing (via MQ).
Dependency
<dependency>
    <groupId>io.softa</groupId>
    <artifactId>flow-starter</artifactId>
    <version>${softa.version}</version>
</dependency>
Requirements
- Database contains flow metadata tables: FlowConfig, FlowTrigger, FlowNode, FlowEdge, FlowStage, FlowEvent, FlowInstance, FlowDebugHistory.
- Pulsar is required for async flow events and async task execution.
- ChangeLog and Cron integration are optional. Enable them only if you use those trigger sources.
Configuration
Enable flow
enable:
  flow: true
MQ topics
mq:
  topics:
    change-log:
      topic: dev_demo_change_log
      flow-sub: dev_demo_change_log_flow_sub
    cron-task:
      topic: dev_demo_cron_task
      flow-sub: dev_demo_cron_task_flow_sub
    flow-async-task:
      topic: dev_demo_flow_async_task
      sub: dev_demo_flow_async_task_sub
    flow-event:
      topic: dev_demo_flow_event
      sub: dev_demo_flow_event_sub
Key Concepts
- FlowConfig: the flow definition. Key fields include name, flowType, sync, rollbackOnFail, debugMode, active.
- FlowTrigger: defines the event that triggers a flow, including eventType, sourceModel, sourceFields, triggerCondition, and cronId.
- FlowNode: the execution unit. Each node has nodeType, nodeParams, nodeCondition, and optional exceptionPolicy.
- FlowEdge: edge between nodes (mainly for layout/visualization).
- FlowStage: optional grouping for nodes.
Execution Model
Trigger sources:
- ChangeLog events: generated by ORM change log. Synchronous flows run BEFORE_COMMIT for validation; async flows are sent to MQ.
- API events: POST /automation/apiEvent triggers a flow from a TriggerEventDTO request body.
- Cron events: consumed from cron-task MQ and routed by cronId.
- Subflow events: TriggerSubflow node calls another flow by triggerId.
Trigger conditions:
- FlowTrigger.triggerCondition is evaluated against the triggerParams map. For ChangeLog events, triggerParams is dataAfterChange for CREATE/UPDATE and dataBeforeChange for DELETE. For API events, triggerParams is TriggerEventDTO.eventParams.
- For UPDATE events, sourceFields is used to filter triggers. If sourceFields is empty, any update matches.
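The matching rules above can be sketched as follows. This is a minimal illustration that uses plain Java maps and sets in place of the starter's own types. The names TriggerMatchSketch, selectTriggerParams, and matchesSourceFields are hypothetical and are not part of Flow Starter. The rule that a non-empty sourceFields must overlap the changed fields is also an assumption, since the text only states that sourceFields filters UPDATE triggers.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; these names are not part of Flow Starter.
class TriggerMatchSketch {

    enum ChangeType { CREATE, UPDATE, DELETE }

    // CREATE/UPDATE expose the row after the change; DELETE exposes the row before it.
    static Map<String, Object> selectTriggerParams(ChangeType type,
                                                   Map<String, Object> dataBeforeChange,
                                                   Map<String, Object> dataAfterChange) {
        return type == ChangeType.DELETE ? dataBeforeChange : dataAfterChange;
    }

    // Assumption: a non-empty sourceFields matches when it shares at least one
    // field with the changed fields. An empty sourceFields matches any update.
    static boolean matchesSourceFields(ChangeType type,
                                       Set<String> sourceFields,
                                       Set<String> changedFields) {
        if (type != ChangeType.UPDATE) {
            return true; // sourceFields only filters UPDATE events
        }
        if (sourceFields == null || sourceFields.isEmpty()) {
            return true;
        }
        return !Collections.disjoint(sourceFields, changedFields);
    }
}
```

Under these assumptions, a trigger with sourceFields = [status] would fire when status changes and would be skipped when only price changes.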
Sync vs async:
- FlowConfig.sync = true: executes in-process. If rollbackOnFail = true, the flow is wrapped in a transaction.
- FlowConfig.sync = false: FlowEventMessage is sent to MQ and executed by FlowEventConsumer.
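The difference between the two modes can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the starter's FlowManager or FlowEventConsumer: the queue stands in for the flow-event MQ topic, the map stands in for database rows, and the transaction is emulated by restoring a snapshot. The class FlowDispatchSketch and all of its members are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; not the starter's FlowManager or FlowEventConsumer.
class FlowDispatchSketch {

    // In-memory stand-in for the flow-event MQ topic.
    final Queue<String> eventQueue = new ArrayDeque<>();

    // In-memory stand-in for database rows touched by a flow.
    final Map<String, Object> store = new HashMap<>();

    // Returns true when the flow ran in-process, false when it was queued.
    boolean dispatch(String flowName, boolean sync, boolean rollbackOnFail,
                     Consumer<Map<String, Object>> flowBody) {
        if (!sync) {
            eventQueue.add(flowName); // a consumer would execute the flow later
            return false;
        }
        Map<String, Object> snapshot = new HashMap<>(store);
        try {
            flowBody.accept(store);
        } catch (RuntimeException e) {
            if (rollbackOnFail) {
                // Emulates a transaction rollback by restoring the snapshot.
                store.clear();
                store.putAll(snapshot);
            }
            throw e;
        }
        return true;
    }
}
```

In this model a failing synchronous flow with rollbackOnFail = true leaves the store unchanged and still propagates the exception to the caller, while an asynchronous flow only enqueues a message and returns immediately.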
Important behavior notes:
- FlowManager loads all FlowTrigger and FlowConfig records once at startup and caches them in memory. Updating flow metadata requires a restart or a manual reload.
- Validation flows run BEFORE_COMMIT. Avoid mutating data inside these flows.
- FlowConstant.EXCLUDE_TRIGGER_MODELS excludes FlowInstance and FlowEvent from triggering flows to avoid loops.
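The load-once caching described in the first note explains why metadata edits are not picked up automatically. The sketch below models that behavior with a hypothetical FlowCacheSketch class; the Supplier stands in for the database query, and none of these names come from the starter itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration only; FlowManager's real cache structure is not shown here.
class FlowCacheSketch {

    private final Supplier<Map<Long, String>> loader; // stand-in for a database query
    private volatile Map<Long, String> flowNamesById;

    FlowCacheSketch(Supplier<Map<Long, String>> loader) {
        this.loader = loader;
        this.flowNamesById = new HashMap<>(loader.get()); // loaded once, at startup
    }

    String flowName(long flowId) {
        return flowNamesById.get(flowId); // served from memory, never re-queried
    }

    void reload() {
        this.flowNamesById = new HashMap<>(loader.get()); // swap in a fresh snapshot
    }
}
```

A flow added to the table after construction stays invisible to flowName until reload is called, which mirrors the restart-or-reload requirement above.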
Context and Expressions
The flow engine passes a NodeContext across nodes. It includes:
- FlowEnv variables: NOW, TODAY, YESTERDAY.
- TriggerParams: the row data that triggered the flow.
- SourceRowId: the id of the triggering record.
Template expression syntax (all templates use {{ expr }}):
- Variables and expressions: {{ TriggerParams.id }}, {{ price * qty }}, {{ NOW }}.
- Reserved field references in filters: {{ @fieldName }}, {{ @parent.fieldName }}.
Node results:
- Many nodes store their output in NodeContext using the node id as the key. Use {{ <nodeId> }} or {{ <nodeId>.field }} in templates and filters.
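To make the placeholder lookup concrete, here is a minimal sketch that resolves dotted paths such as TriggerParams.id or a node id followed by a field name against a nested map. It is an illustration only: the class TemplateSketch is hypothetical, the context is modeled as a plain Map with node ids stored as string keys (an assumption), and computed expressions such as {{ price * qty }} and reserved references such as {{ @fieldName }} are deliberately not handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; it resolves paths but does not evaluate expressions.
class TemplateSketch {

    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{\\{\\s*([^{}]+?)\\s*\\}\\}");

    // Resolves a dotted path such as "TriggerParams.id" or "101.status" against nested maps.
    static Object resolve(String path, Map<String, Object> context) {
        Object current = context;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // the path goes deeper than the data does
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Replaces every {{ path }} placeholder with the string form of the resolved value.
    static String render(String template, Map<String, Object> context) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            Object value = resolve(matcher.group(1), context);
            matcher.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(value)));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

With a context that holds TriggerParams = {id: 1001}, rendering "order {{ TriggerParams.id }}" would yield "order 1001" in this sketch. Unresolved paths render as the text "null" here, which is a simplification rather than documented engine behavior.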
Node exception policy:
- FlowNode.exceptionPolicy supports NodeExceptionPolicy to handle node result exceptions. It can detect empty/false results and emit signals such as EndFlow or ThrowException.
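The policy described above can be pictured as a check on the node result followed by a signal. The sketch below is a hypothetical model, not the NodeExceptionPolicy class itself: the Signal enum borrows the EndFlow and ThrowException names from the text, while the class name, the method names, and the exact definition of "empty" (null, empty string, empty collection, empty map) are assumptions.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical illustration only; not the starter's NodeExceptionPolicy.
class ExceptionPolicySketch {

    enum Signal { EndFlow, ThrowException }

    // Assumption: null, false, and empty strings/collections/maps all count as empty.
    static boolean isEmptyOrFalse(Object result) {
        if (result == null) return true;
        if (result instanceof Boolean) return !((Boolean) result);
        if (result instanceof CharSequence) return ((CharSequence) result).length() == 0;
        if (result instanceof Collection) return ((Collection<?>) result).isEmpty();
        if (result instanceof Map) return ((Map<?, ?>) result).isEmpty();
        return false;
    }

    // Returns true when the flow should continue to the next node.
    static boolean apply(Object result, Signal signalOnEmpty, String message) {
        if (!isEmptyOrFalse(result)) {
            return true;
        }
        switch (signalOnEmpty) {
            case EndFlow:
                return false; // stop quietly
            case ThrowException:
                throw new IllegalStateException(message); // surface the failure
            default:
                return true;
        }
    }
}
```

EndFlow suits optional steps where an empty query result simply means there is nothing to do, whereas ThrowException suits steps whose empty result indicates a real error.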
Node Types (Implemented)
- ValidateData: validate with expression, throws BusinessException on failure.
- GetData: query data; supports single/multi row, field value(s), exists, count.
- ComputeData: compute expression result.
- CreateData, UpdateData, DeleteData: CRUD operations based on templates and filters.
- ExtractTransform: extract a field from a collection into a Set.
- Condition: evaluate passCondition and, when it does not pass, emit the configured exception signal.
- TriggerSubflow: trigger another flow by triggerId.
- AsyncTask: send an async task message to MQ.
- QueryAi: query AI and store the reply content.
- ReturnData: set the flow return payload.
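As one concrete case from the list above, the ExtractTransform behavior (pulling one field out of a collection of rows into a Set) could look roughly like this. The class and method names are hypothetical, rows are modeled as plain maps, and skipping rows that lack the field is an assumption rather than documented behavior.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the starter's ExtractTransform processor.
class ExtractTransformSketch {

    // Collects each row's itemKey value into an insertion-ordered Set; duplicates collapse.
    static Set<Object> extract(Collection<Map<String, Object>> rows, String itemKey) {
        Set<Object> values = new LinkedHashSet<>();
        for (Map<String, Object> row : rows) {
            Object value = row.get(itemKey);
            if (value != null) { // assumption: rows without the field are skipped
                values.add(value);
            }
        }
        return values;
    }
}
```

A typical use is chaining it after a GetData node: extract the id field from the queried rows, then pass the resulting set to an UpdateData or DeleteData node through pkVariable.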
Node Types (Stubs / No-Op)
These nodes have empty processors and must be implemented before they do anything useful:
- BranchGateway
- LoopByDataset
- LoopByPage
- TransferStage
- GenerateReport
- SendMessage
- WebHook
- ApprovalNode
Note: FlowNodeService contains loop iteration logic for LoopByDataset and LoopByPage, but the starter does not provide NodeProcessor implementations for them. Add custom processors to make them executable.
4. Flow Key Elements
The following parameters are aligned with the current code. Field values support:
- constants
- expressions
- expressions {{ expr }} (variables, map access, computed values), e.g. {{ TriggerParams.status }}, {{ price * qty }}, {{ NOW }}
- reserved field references in Filters: {{ @fieldName }}, {{ @parent.fieldName }}
NodeGetDataType options:
- MultiRows, SingleRow, OneFieldValue, OneFieldValues, Exist, Count
ValueType options:
- String, Integer, Long, Double, BigDecimal, Boolean, Date, DateTime, Time
| Node Type | Params Object | Param | Required | Description |
|---|---|---|---|---|
| Compute Data | ComputeDataParams | expression | Required | Computation expression, multi-line text |
| | | valueType | Required | Result value type, ValueType |
| Create Data | CreateDataParams | modelName | Required | Model name to create |
| | | rowTemplate | Required | Data template; values support constants and {{ expr }} |
| Delete Data | DeleteDataParams | modelName | Required | Model name to delete |
| | | pkVariable | Either | PK variable name, single/multi {{ var }} |
| | | filters | Either | Composite filters; values support constants, {{ expr }}, {{ @fieldName }} |
| Get Data | GetDataParams | modelName | Required | Model name to query |
| | | getDataType | Required | Return type, NodeGetDataType |
| | | fields | | Field list; empty defaults to id only |
| | | filters | | Composite filters; values support constants, {{ expr }}, {{ @fieldName }} |
| | | orders | | Order conditions, Orders array |
| | | acrossTimeline | | Whether to query across timeline |
| | | limitSize | | Max rows, up to 10000 |
| Update Data | UpdateDataParams | modelName | Required | Model name to update |
| | | pkVariable | Either | PK variable name, single/multi {{ var }} |
| | | filters | Either | Composite filters; values support constants, {{ expr }}, {{ @fieldName }} |
| | | rowTemplate | Required | Data template; values support constants and {{ expr }} |
| Validate Data | ValidateDataParams | expression | Required | Validation expression, multi-line text |
| | | exceptionMsg | Required | Exception message on failure, supports {{ expr }} |
| Async Task | AsyncTaskParams | asyncTaskHandlerCode | Required | Async task handler |
| | | dataTemplate | | Task params template; values support constants and {{ expr }} |
| Extract Transform | ExtractTransformParams | collectionVariable | Required | Collection variable, {{ var }} |
| | | itemKey | Required | Field name to extract |
| Return Data | ReturnDataParams | dataTemplate | Required | Return data template; values support constants and {{ expr }} |
| Condition | ConditionParams | passCondition | Required | Condition expression |
| | | exceptionSignal | Required | NodeExceptionSignal |
| | | exceptionMessage | | Exception message, supports {{ expr }} |
| Branch Gateway | BranchGatewayParams | serialGateway | | Serial gateway flag, default parallel |
| Query AI | QueryAiParams | robotId | Required | |
| | | conversationId | Required | |
| | | queryContent | Required | Supports {{ expr }} interpolation |
| Trigger Subflow | TriggerSubflowParams | subflowTriggerId | Required | Subflow trigger ID |
| | | dataTemplate | | Subflow params template |
| WebHook | WebHookParams | | | No params yet |
| Generate Report | GenerateReportParams | | | No params yet |
| Send Message | SendMessageParams | message | Required | Message content |
| | | recipient | Required | Recipient |
| | | sender | Required | Sender |
| Transfer Stage | TransferStageParams | | | No params yet |
REST APIs
- POST /automation/apiEvent
- POST /automation/onchange (currently returns empty Map)
- POST /automation/simulateEvent (non-prod only)
- GET /FlowConfig/getByModel
- GET /FlowConfig/getFlowById
Entity CRUD endpoints are also available for FlowConfig, FlowTrigger, FlowNode, FlowEdge, FlowStage, FlowInstance, FlowEvent, and FlowDebugHistory via EntityController.
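For Java callers, a request to the apiEvent endpoint listed above could be built with the JDK's java.net.http client (Java 11 or later). This is a minimal sketch: the class ApiEventRequestSketch and its build method are hypothetical, the base URL is supplied by the caller, and no authentication headers are added because the text does not describe any.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; not shipped with Flow Starter.
class ApiEventRequestSketch {

    // Builds a JSON POST to {baseUrl}/automation/apiEvent without sending it.
    static HttpRequest build(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/automation/apiEvent"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

To send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The response shape is not specified in this document, so the sketch stops at building the request.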
Examples
API event request body:
{
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerId": 2001,
  "eventParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99,
    "updatedBy": "system"
  }
}
API event:
curl -X POST http://localhost:8080/automation/apiEvent \
  -H 'Content-Type: application/json' \
  -d @- <<'JSON'
{
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerId": 2001,
  "eventParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99,
    "updatedBy": "system"
  }
}
JSON
Simulate flow event request body (non-prod only):
{
  "flowId": 3001,
  "flowNodeId": null,
  "rollbackOnFail": true,
  "triggerId": 2001,
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99
  }
}
Simulate flow event (non-prod only):
curl -X POST http://localhost:8080/automation/simulateEvent \
  -H 'Content-Type: application/json' \
  -d @- <<'JSON'
{
  "flowId": 3001,
  "flowNodeId": null,
  "rollbackOnFail": true,
  "triggerId": 2001,
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99
  }
}
JSON
Node parameters reference (common nodeParams templates):
{
  "validateData": {
    "expression": "TriggerParams.totalAmount > 0",
    "exceptionMsg": "totalAmount must be greater than 0 for order {{ TriggerParams.id }}"
  },
  "getData": {
    "modelName": "Order",
    "getDataType": "MultiRows",
    "fields": ["id", "status", "totalAmount"],
    "filters": ["status", "=", "PENDING"],
    "orders": ["createdTime", "DESC"],
    "limitSize": 100
  },
  "extractTransform": {
    "collectionVariable": "{{ 101 }}",
    "itemKey": "id"
  },
  "computeData": {
    "expression": "1 + 2",
    "valueType": "Integer"
  },
  "updateData": {
    "modelName": "Order",
    "pkVariable": "{{ 102 }}",
    "rowTemplate": {
      "status": "PROCESSING",
      "updatedAt": "{{ NOW }}"
    }
  },
  "deleteData": {
    "modelName": "Order",
    "filters": ["status", "=", "CANCELLED"]
  },
  "condition": {
    "passCondition": "TriggerParams.status == 'PAID'",
    "exceptionSignal": "EndFlow",
    "exceptionMessage": "status is not PAID, flow ended"
  },
  "returnData": {
    "dataTemplate": {
      "orderId": "{{ TriggerParams.id }}",
      "status": "{{ TriggerParams.status }}"
    }
  },
  "asyncTask": {
    "asyncTaskHandlerCode": "OrderNotify",
    "dataTemplate": {
      "orderId": "{{ TriggerParams.id }}",
      "status": "{{ TriggerParams.status }}"
    }
  },
  "triggerSubflow": {
    "subflowTriggerId": 4001,
    "dataTemplate": {
      "orderId": "{{ TriggerParams.id }}",
      "totalAmount": "{{ TriggerParams.totalAmount }}"
    }
  },
  "loopByDataset": {
    "dataSetParam": "{{ 101 }}",
    "loopItemNaming": "orderItem"
  },
  "loopByPage": {
    "model": "Order",
    "fields": ["id", "status"],
    "filters": ["status", "=", "PENDING"],
    "pageSize": 50,
    "pageParamNaming": "pageRows"
  },
  "queryAi": {
    "robotId": 1,
    "conversationId": 1,
    "queryContent": "Summarize order {{ TriggerParams.id }}"
  }
}