diff --git a/aws-step-functions/POWER.md b/aws-step-functions/POWER.md new file mode 100644 index 0000000..1e8699c --- /dev/null +++ b/aws-step-functions/POWER.md @@ -0,0 +1,103 @@ +--- +name: "aws-step-functions" +displayName: "AWS Step Functions" +description: "Build workflows with AWS Step Functions state machines using the JSONata query language. Covers Amazon States Language (ASL) structure, state types, variables, data transformation, error handling, AWS service integration, and migrating from the JSONPath to the JSONata query language." +keywords: ["step functions", "state machine", "serverless", "jsonata", "asl", "amazon states language", "workflow", "orchestration"] +author: "AWS" +--- + +# AWS Step Functions + +## Overview + +AWS Step Functions uses Amazon States Language (ASL) to define state machines as JSON. With AWS Step Functions, you can create workflows, also called State machines, to build distributed applications, automate processes, orchestrate microservices, and create data and machine learning pipelines. + +This power provides comprehensive guidance for writing state machines in ASL, covering: +- ASL structure and JSONata expression syntax +- Details on the eight available workflow states +- The `$states` reserved variable +- Workflow variables with `Assign` +- Error handling +- AWS Service integration patterns +- Example code for data transformation and architecture +- Validation and testing of state machines +- How to migrate from JSONPath to JSONata + +## When to Load Steering Files + +Load the appropriate steering file based on what the user is working on: + +- **ASL structure**, **state types**, **Task**, **Pass**, **Choice**, **Wait**, **Succeed**, **Fail**, **Parallel**, **Map** → see `asl-state-types.md` +- **Variables**, **Assign**, **data passing**, **scope**, **$states**, **input**, **output**, **Arguments**, **Output**, **data transformation**, **QueryEvaluationError** → see `variables-and-data.md` +- **Error handling**, **troubleshooting**, **Retry**, **Catch**, **fallback**, **error codes**, **States.Timeout**, **States.ALL** → see `error-handling.md` +- **Service integrations**, **Lambda invoke**, **DynamoDB**, **SNS**, **SQS**, **SDK integrations**, **Resource ARN**, **sync**, **async** → see `service-integrations.md` +- **Migrating from JSONPath to JSONata**, **migration**, **JSONPath to JSONata**, **InputPath**, **Parameters**, **ResultSelector**, **ResultPath**, **OutputPath**, **intrinsic functions**, **Iterator**, **payload template** → see `migrating-from-jsonpath-to-jsonata.md` +- **Validation**, **linting**, **testing**, **TestState**, **test state**, **mock**, **mocking**, **unit test**, **inspection level**, **DEBUG**, **TRACE**, **validate state**, **test in isolation** → see `validation-and-testing.md` +- **Architecture patterns**, **examples**, **polling**, **saga**, **compensation**, **scatter-gather**, **semaphore**, **lock**, **human-in-the-loop**, **escalation**, **Express to Standard** → see `architecture-patterns.md` +- **Data transformation**, **JSONata expressions**, **filtering**, **aggregation**, **string operations**, **$reduce**, **$lookup**, **$toMillis**, **$partition**, **$parse**, **$hash**, **$uuid** → see `transforming-data.md` +- **State input/output**, **$states**, **Assign**, **Output**, **Arguments**, **variable scope**, **variable limits**, **evaluation order**, **passing data between states** → see `processing-state-inputs-and-outputs.md` + +## Quick Reference + +### Standard vs Express Workflows + +| | Standard | Express | +| --------------------------------- | ------------------------------------ | ------------------------------------------- | +| **Max duration** | 1 year | 5 minutes | +| **Execution semantics** | Exactly-once | At-least-once (async) / At-most-once (sync) | +| **Execution history** | Retained 90 days, queryable via API | CloudWatch Logs only | +| **Max throughput** | 2,000 exec/sec | 100,000 exec/sec | +| **Pricing model** | Per state transition | Per execution count + duration | +| **`.sync` / `.waitForTaskToken`** | Supported | Not supported | +| **Best for** | Auditable, non-idempotent operations | High-volume, idempotent event processing | + +**Choose Standard** for: payment processing, order fulfillment, compliance workflows, anything that must never execute twice. + +**Choose Express** for: IoT data ingestion, streaming transformations, mobile backends, high-throughput short-lived processing. + +### Setting the State Machine Query Language + +JSONata is the modern, preferred way to reference and transform data in ASL. It replaces the five JSONPath I/O fields (`InputPath`, `Parameters`, `ResultSelector`, `ResultPath`, `OutputPath`) with just two: `Arguments` (inputs) and `Output`. + +**Enable at the top level** to apply to all states: + +```json +{ "QueryLanguage": "JSONata", "StartAt": "...", "States": {...} } +``` + +**Or per-state** to migrate from JSONPath incrementally: + +```json +{ "Type": "Task", "QueryLanguage": "JSONata", ... } +``` + +**JSONPath is still supported** and is the default if `QueryLanguage` is omitted — existing state machines do not need to be migrated. + +## Best Practices + +- Set `"QueryLanguage": "JSONata"` at the top level for new state machines unless the user wants to use JSONPath +- Keep `Output` minimal — only include what the state immediately after the current state needs +- Use `Assign` to store variables needed in later states instead of threading it through Output +- Use `$states.input` to reference original state input +- Remember: `Assign` and `Output` are evaluated in parallel — variable assignments in `Assign` are NOT available in `Output` of the same state +- All JSONata expressions must produce a defined value — `$data.nonExistentField` throws `States.QueryEvaluationError` +- Use `$states.context.Execution.Input` to access the original workflow input from any state +- Save state machine definitions with `.asl.json` extension when working outside the console +- Prefer the optimized Lambda integration (`arn:aws:states:::lambda:invoke`) over the SDK integration + +## Troubleshooting + +### Common Errors + +- `States.QueryEvaluationError` — JSONata expression failed. Check for type errors, undefined fields, or out-of-range values. +- Mixing JSONPath fields with JSONata fields in the same state. +- Using `$` or `$$` at the top level of a JSONata expression — use `$states.input` instead. +- Forgetting `{% %}` delimiters around JSONata expressions — the string will be treated as a literal. +- Assigning variables in `Assign` and expecting them in `Output` of the same state — new values only take effect in the next state. +- Reference validation-and-testing.md and error-handling.md for detailed troubleshooting information. + +## Resources + +- [ASL Specification](https://states-language.net/spec.html) +- [JSONata documentation](https://docs.jsonata.org/overview.html) +- [Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html) diff --git a/aws-step-functions/examples/compensation-saga-pattern.asl.json b/aws-step-functions/examples/compensation-saga-pattern.asl.json new file mode 100644 index 0000000..1f16893 --- /dev/null +++ b/aws-step-functions/examples/compensation-saga-pattern.asl.json @@ -0,0 +1,130 @@ +{ + "QueryLanguage": "JSONata", + "StartAt": "ReserveInventory", + "States": { + "ReserveInventory": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Arguments": { + "TableName": "InventoryTable", + "Key": { + "productId": { + "S": "{% $states.input.productId %}" + } + }, + "UpdateExpression": "SET reserved = reserved + :qty", + "ExpressionAttributeValues": { + ":qty": { + "N": "{% $string($states.input.quantity) %}" + } + } + }, + "Assign": { + "reservedQty": "{% $states.input.quantity %}" + }, + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Assign": { + "failedStep": "ReserveInventory", + "errorInfo": "{% $states.errorOutput %}" + }, + "Next": "OrderFailed" + } + ], + "Next": "ChargePayment" + }, + "ChargePayment": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ChargeCard:$LATEST", + "Payload": { + "orderId": "{% $orderId %}", + "amount": "{% $states.input.total %}" + } + }, + "Assign": { + "chargeId": "{% $states.result.Payload.chargeId %}" + }, + "Output": "{% $states.result.Payload %}", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Assign": { + "failedStep": "ChargePayment", + "errorInfo": "{% $states.errorOutput %}" + }, + "Next": "ReleaseInventory" + } + ], + "Next": "ShipOrder" + }, + "ShipOrder": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ShipOrder:$LATEST", + "Payload": { + "orderId": "{% $orderId %}" + } + }, + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Assign": { + "failedStep": "ShipOrder", + "errorInfo": "{% $states.errorOutput %}" + }, + "Next": "RefundPayment" + } + ], + "Next": "OrderComplete" + }, + "RefundPayment": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:RefundCharge:$LATEST", + "Payload": { + "chargeId": "{% $chargeId %}", + "reason": "{% $errorInfo.Cause %}" + } + }, + "Next": "ReleaseInventory" + }, + "ReleaseInventory": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Arguments": { + "TableName": "InventoryTable", + "Key": { + "productId": { + "S": "{% $states.input.productId %}" + } + }, + "UpdateExpression": "SET reserved = reserved - :qty", + "ExpressionAttributeValues": { + ":qty": { + "N": "{% $string($reservedQty) %}" + } + } + }, + "Next": "OrderFailed" + }, + "OrderFailed": { + "Type": "Fail", + "Error": "{% $failedStep & 'Error' %}", + "Cause": "{% 'Order ' & $orderId & ' failed at ' & $failedStep & ': ' & ($exists($errorInfo.Cause) ? $errorInfo.Cause : 'Unknown') %}" + }, + "OrderComplete": { + "Type": "Succeed" + } + } +} diff --git a/aws-step-functions/examples/express-standard-handoff.asl.json b/aws-step-functions/examples/express-standard-handoff.asl.json new file mode 100644 index 0000000..ac0786a --- /dev/null +++ b/aws-step-functions/examples/express-standard-handoff.asl.json @@ -0,0 +1,75 @@ +{ + "Comment": "Express workflow - fast ingest and validation", + "QueryLanguage": "JSONata", + "StartAt": "ValidateInput", + "States": { + "ValidateInput": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder:$LATEST", + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "Next": "EnrichData" + }, + "EnrichData": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "LookupCustomer", + "States": { + "LookupCustomer": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Arguments": { + "TableName": "CustomersTable", + "Key": { + "customerId": { + "S": "{% $states.input.customerId %}" + } + } + }, + "Output": "{% $states.result.Item %}", + "End": true + } + } + }, + { + "StartAt": "LookupPricing", + "States": { + "LookupPricing": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:GetPricing:$LATEST", + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "End": true + } + } + } + ], + "Output": { + "order": "{% $states.input %}", + "customer": "{% $states.result[0] %}", + "pricing": "{% $states.result[1] %}" + }, + "Next": "HandOffToStandard" + }, + "HandOffToStandard": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution", + "Arguments": { + "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:OrderFulfillment-Standard", + "Input": "{% $string($states.input) %}" + }, + "Output": { + "status": "handed_off", + "childExecutionArn": "{% $states.result.ExecutionArn %}" + }, + "End": true + } + } +} diff --git a/aws-step-functions/examples/human-in-the-loop-with-timeout-escalation.asl.json b/aws-step-functions/examples/human-in-the-loop-with-timeout-escalation.asl.json new file mode 100644 index 0000000..73884e8 --- /dev/null +++ b/aws-step-functions/examples/human-in-the-loop-with-timeout-escalation.asl.json @@ -0,0 +1,108 @@ +{ + "QueryLanguage": "JSONata", + "StartAt": "RequestApproval", + "States": { + "RequestApproval": { + "Type": "Task", + "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", + "Arguments": { + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/ApprovalQueue", + "MessageBody": "{% $string({'taskToken': $states.context.Task.Token, 'orderId': $orderId, 'approver': $states.input.primaryApprover, 'amount': $states.input.amount}) %}" + }, + "TimeoutSeconds": 86400, + "Assign": { + "approvalResult": "{% $states.result %}" + }, + "Catch": [ + { + "ErrorEquals": [ + "States.Timeout" + ], + "Assign": { + "escalationReason": "Primary approver did not respond within 24 hours" + }, + "Next": "EscalateToManager" + }, + { + "ErrorEquals": [ + "States.ALL" + ], + "Assign": { + "approvalError": "{% $states.errorOutput %}" + }, + "Next": "ApprovalFailed" + } + ], + "Next": "EvaluateApproval" + }, + "EscalateToManager": { + "Type": "Task", + "Resource": "arn:aws:states:::sns:publish", + "Arguments": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:EscalationNotifications", + "Subject": "Approval Escalation", + "Message": "{% 'Order ' & $orderId & ' requires manager approval. ' & $escalationReason %}" + }, + "Next": "WaitForManagerApproval" + }, + "WaitForManagerApproval": { + "Type": "Task", + "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", + "Arguments": { + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/ApprovalQueue", + "MessageBody": "{% $string({'taskToken': $states.context.Task.Token, 'orderId': $orderId, 'approver': $states.input.managerApprover, 'amount': $states.input.amount, 'escalated': true}) %}" + }, + "TimeoutSeconds": 43200, + "Assign": { + "approvalResult": "{% $states.result %}" + }, + "Catch": [ + { + "ErrorEquals": [ + "States.Timeout" + ], + "Assign": { + "approvalResult": { + "decision": "rejected", + "reason": "No response from manager within 12 hours; auto-rejected" + } + }, + "Next": "EvaluateApproval" + }, + { + "ErrorEquals": [ + "States.ALL" + ], + "Assign": { + "approvalError": "{% $states.errorOutput %}" + }, + "Next": "ApprovalFailed" + } + ], + "Next": "EvaluateApproval" + }, + "EvaluateApproval": { + "Type": "Choice", + "Choices": [ + { + "Condition": "{% $approvalResult.decision = 'approved' %}", + "Next": "ProcessApprovedOrder" + } + ], + "Default": "OrderRejected" + }, + "ProcessApprovedOrder": { + "Type": "Succeed" + }, + "OrderRejected": { + "Type": "Fail", + "Error": "OrderRejected", + "Cause": "{% $exists($approvalResult.reason) ? $approvalResult.reason : 'Approval denied' %}" + }, + "ApprovalFailed": { + "Type": "Fail", + "Error": "ApprovalFailed", + "Cause": "{% $exists($approvalError.Cause) ? $approvalError.Cause : 'Approval process encountered an error' %}" + } + } +} diff --git a/aws-step-functions/examples/nested-map-parallel-structures.asl.json b/aws-step-functions/examples/nested-map-parallel-structures.asl.json new file mode 100644 index 0000000..7139f69 --- /dev/null +++ b/aws-step-functions/examples/nested-map-parallel-structures.asl.json @@ -0,0 +1,70 @@ +{ + "QueryLanguage": "JSONata", + "StartAt": "ProcessAllOrders", + "States": { + "ProcessAllOrders": { + "Type": "Map", + "Items": "{% $states.input.orders %}", + "MaxConcurrency": 5, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "ProcessSingleOrder", + "States": { + "ProcessSingleOrder": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "ValidatePayment", + "States": { + "ValidatePayment": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ValidatePayment:$LATEST", + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "End": true + } + } + }, + { + "StartAt": "CheckInventory", + "States": { + "CheckInventory": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Arguments": { + "TableName": "InventoryTable", + "Key": { + "productId": { + "S": "{% $states.input.productId %}" + } + } + }, + "Output": "{% $states.result.Item %}", + "End": true + } + } + } + ], + "Output": { + "payment": "{% $states.result[0] %}", + "inventory": "{% $states.result[1] %}" + }, + "End": true + } + } + }, + "Assign": { + "orderResults": "{% $states.result %}" + }, + "Next": "Summarize" + }, + "Summarize": { + "Type": "Succeed" + } + } +} diff --git a/aws-step-functions/examples/polling-loop-wait-check-choice.asl.json b/aws-step-functions/examples/polling-loop-wait-check-choice.asl.json new file mode 100644 index 0000000..e512a34 --- /dev/null +++ b/aws-step-functions/examples/polling-loop-wait-check-choice.asl.json @@ -0,0 +1,77 @@ +{ + "QueryLanguage": "JSONata", + "StartAt": "SubmitOrder", + "States": { + "SubmitOrder": { + "Type": "Task", + "Resource": "arn:aws:states:::sqs:sendMessage", + "Arguments": { + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/FulfillmentQueue", + "MessageBody": "{% $string({'orderId': $states.input.orderId, 'items': $states.input.items}) %}" + }, + "Assign": { + "fulfillmentOrderId": "{% $states.input.orderId %}" + }, + "Next": "InitialWaitForFulfillment" + }, + "InitialWaitForFulfillment": { + "Type": "Wait", + "Seconds": 300, + "Next": "CheckFulfillmentStatus" + }, + "CheckFulfillmentStatus": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Arguments": { + "TableName": "OrdersTable", + "Key": { + "orderId": { + "S": "{% $fulfillmentOrderId %}" + } + } + }, + "Assign": { + "orderStatus": "{% $states.result.Item.status.S %}" + }, + "Next": "EvaluateFulfillment", + "Retry": [ + { + "ErrorEquals": [ + "States.TaskFailed", + "ThrottlingException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ] + }, + "EvaluateFulfillment": { + "Type": "Choice", + "Choices": [ + { + "Condition": "{% $orderStatus = 'fulfilled' %}", + "Next": "FulfillmentComplete" + }, + { + "Condition": "{% $orderStatus in ['failed', 'cancelled'] %}", + "Next": "FulfillmentFailed" + } + ], + "Default": "WaitBeforeNextPoll" + }, + "WaitBeforeNextPoll": { + "Type": "Wait", + "Seconds": 60, + "Next": "CheckFulfillmentStatus" + }, + "FulfillmentComplete": { + "Type": "Succeed" + }, + "FulfillmentFailed": { + "Type": "Fail", + "Error": "FulfillmentFailed", + "Cause": "{% 'Order fulfillment status: ' & $orderStatus %}" + } + } +} diff --git a/aws-step-functions/examples/scatter-gather-with-partial-results.asl.json b/aws-step-functions/examples/scatter-gather-with-partial-results.asl.json new file mode 100644 index 0000000..91064e2 --- /dev/null +++ b/aws-step-functions/examples/scatter-gather-with-partial-results.asl.json @@ -0,0 +1,73 @@ +{ + "QueryLanguage": "JSONata", + "StartAt": "CallExternalAPIs", + "States": { + "CallExternalAPIs": { + "Type": "Map", + "Items": "{% $states.input.records %}", + "MaxConcurrency": 10, + "ToleratedFailurePercentage": 100, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "CallAPI", + "States": { + "CallAPI": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:CallExternalAPI:$LATEST", + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "Retry": [ + { + "ErrorEquals": [ + "States.TaskFailed" + ], + "IntervalSeconds": 2, + "MaxAttempts": 2, + "BackoffRate": 2.0, + "JitterStrategy": "FULL" + } + ], + "End": true + } + } + }, + "Next": "SplitResults" + }, + "SplitResults": { + "Type": "Pass", + "Assign": { + "successes": "{% ( $s := $states.input[$not($exists(Error))]; $type($s) = 'array' ? $s : $exists($s) ? [$s] : [] ) %}", + "failures": "{% ( $f := $states.input[$exists(Error)]; $type($f) = 'array' ? $f : $exists($f) ? [$f] : [] ) %}" + }, + "Output": { + "successes": "{% ( $s := $states.input[$not($exists(Error))]; $type($s) = 'array' ? $s : $exists($s) ? [$s] : [] ) %}", + "failures": "{% ( $f := $states.input[$exists(Error)]; $type($f) = 'array' ? $f : $exists($f) ? [$f] : [] ) %}", + "totalProcessed": "{% $count($states.input) %}" + }, + "Next": "EvaluateResults" + }, + "EvaluateResults": { + "Type": "Choice", + "Choices": [ + { + "Condition": "{% $count($successes) = 0 %}", + "Next": "AllFailed" + } + ], + "Default": "ProcessSuccesses" + }, + "AllFailed": { + "Type": "Fail", + "Error": "AllItemsFailed", + "Cause": "Every item in the batch failed processing" + }, + "ProcessSuccesses": { + "Type": "Succeed" + } + } +} diff --git a/aws-step-functions/examples/semaphore-concurrency-lock.asl.json b/aws-step-functions/examples/semaphore-concurrency-lock.asl.json new file mode 100644 index 0000000..cab5b5f --- /dev/null +++ b/aws-step-functions/examples/semaphore-concurrency-lock.asl.json @@ -0,0 +1,126 @@ +{ + "QueryLanguage": "JSONata", + "StartAt": "AcquireLock", + "States": { + "AcquireLock": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Arguments": { + "TableName": "LocksTable", + "Item": { + "lockId": { + "S": "{% $states.input.customerId %}" + }, + "executionId": { + "S": "{% $states.context.Execution.Id %}" + }, + "expiresAt": { + "N": "{% $string($toMillis($now()) + 900000) %}" + } + }, + "ConditionExpression": "attribute_not_exists(lockId) OR expiresAt < :now", + "ExpressionAttributeValues": { + ":now": { + "N": "{% $string($toMillis($now())) %}" + } + } + }, + "Retry": [ + { + "ErrorEquals": [ + "DynamoDB.ConditionalCheckFailedException" + ], + "IntervalSeconds": 5, + "MaxAttempts": 12, + "BackoffRate": 1.5, + "JitterStrategy": "FULL" + } + ], + "Catch": [ + { + "ErrorEquals": [ + "DynamoDB.ConditionalCheckFailedException" + ], + "Assign": { + "lockError": "{% $states.errorOutput %}" + }, + "Next": "LockUnavailable" + } + ], + "Next": "DoProtectedWork" + }, + "DoProtectedWork": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ProcessCustomer:$LATEST", + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Assign": { + "workError": "{% $states.errorOutput %}" + }, + "Next": "ReleaseLock" + } + ], + "Next": "ReleaseLock" + }, + "ReleaseLock": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:deleteItem", + "Arguments": { + "TableName": "LocksTable", + "Key": { + "lockId": { + "S": "{% $states.input.customerId %}" + } + }, + "ConditionExpression": "executionId = :execId", + "ExpressionAttributeValues": { + ":execId": { + "S": "{% $states.context.Execution.Id %}" + } + } + }, + "Retry": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2.0 + } + ], + "Next": "CheckWorkResult" + }, + "CheckWorkResult": { + "Type": "Choice", + "Choices": [ + { + "Condition": "{% $exists($workError) %}", + "Next": "WorkFailed" + } + ], + "Default": "Done" + }, + "WorkFailed": { + "Type": "Fail", + "Error": "{% $exists($workError.Error) ? $workError.Error : 'WorkFailed' %}", + "Cause": "{% $exists($workError.Cause) ? $workError.Cause : 'Protected work failed' %}" + }, + "Done": { + "Type": "Succeed" + }, + "LockUnavailable": { + "Type": "Fail", + "Error": "LockContention", + "Cause": "{% 'Could not acquire lock for ' & $states.input.customerId & ' after retries' %}" + } + } +} diff --git a/aws-step-functions/steering/architecture-patterns.md b/aws-step-functions/steering/architecture-patterns.md new file mode 100644 index 0000000..425da87 --- /dev/null +++ b/aws-step-functions/steering/architecture-patterns.md @@ -0,0 +1,70 @@ +# Architecture Patterns (JSONata Mode) + +## Polling Loop (Wait → Check → Choice) + +Some AWS operations and user-defined tasks are asynchronous. The states pattern is: Start Task → initial wait (what is the expected time it takes to complete the task?) → call describe/status API → check result → short wait → loop back. + +See `examples/polling-loop-wait-check-choice.asl.json` + +--- + +## Compensation / Saga Pattern + +Step Functions has no built-in rollback. The saga pattern chains compensating actions in reverse order. Each forward step has a Catch that records which step failed, then routes to the appropriate compensation entry point. + +See `examples/compensation-saga-pattern.asl.json` + +Compensation chain: `ReserveInventory` fails → `OrderFailed`. `ChargePayment` fails → `ReleaseInventory` → `OrderFailed`. `ShipOrder` fails → `RefundPayment` → `ReleaseInventory` → `OrderFailed`. Each Catch records `$failedStep` and `$errorInfo`. Compensation states use variables from forward steps (`$chargeId`, `$reservedQty`) to know what to undo. + +--- + +## Nested Map / Parallel Structures + +Map and Parallel states can be nested in any order to create multiple layers. The key constraint is understanding variable scope and data flow at each nesting boundary. + +See `examples/nested-map-parallel-structures.asl.json` +See `variables-and-data.md` for details about variable scopes + +--- + +## Scatter-Gather with Partial Results + +When calling unreliable external APIs per-item, use `ToleratedFailurePercentage` on a Map to continue with whatever succeeded, then post-process the results to separate successes from failures. Failed iterations return objects with `Error` and `Cause` fields. + +See `examples/scatter-gather-with-partial-results.asl.json` + +Key elements: +- `ToleratedFailurePercentage: 100` lets the Map complete even if every item fails. Lower the threshold to bail out early. +- Filter on `$exists(Error)` to separate failed from successful iterations. +- Guard filtered results with the `$type`/`$exists`/`[]` pattern — JSONata returns a single object (not a 1-element array) when exactly one item matches, and undefined when nothing matches. + +--- + +## Semaphore / Concurrency Lock + +Step Functions has no native mutual exclusion. Use DynamoDB conditional writes as a distributed lock when only one execution should process a given resource at a time. Pattern: acquire lock → do work → release lock, with Catch ensuring release on failure. + +See `examples/semaphore-concurrency-lock.asl.json` + +Key elements: +- `ConditionExpression` with `attribute_not_exists` ensures only one writer wins. The `expiresAt` check provides stale-lock recovery if an execution crashes without releasing. +- `executionId` on the lock item lets `ReleaseLock` conditionally delete only its own lock. +- Retry on `ConditionalCheckFailedException` acts as a spin-wait. Tune `MaxAttempts` and `IntervalSeconds` based on expected hold time. +- Catch on `DoProtectedWork` routes to `ReleaseLock` so the lock is always released. After releasing, `CheckWorkResult` re-raises the error path. +- Set `expiresAt` to a reasonable TTL (here 15 min). Use a DynamoDB TTL attribute to auto-clean expired locks. + +--- + +## Human-in-the-Loop with Timeout Escalation + +Chain multiple `.waitForTaskToken` states with `States.Timeout` catches to build escalation: primary approver → manager → auto-reject. + +See `examples/human-in-the-loop-with-timeout-escalation.asl.json` + +--- + +## Express → Standard Handoff + +Express workflows are more cost-effective for high volume State Machine Invocations, but don't support callbacks or long waits. Standard workflows handle those but cost per state transition. Use Express for fast, high-volume ingest and kick off a Standard execution for the long-running tail. + +See `examples/express-standard-handoff.asl.json` diff --git a/aws-step-functions/steering/asl-state-types.md b/aws-step-functions/steering/asl-state-types.md new file mode 100644 index 0000000..649ac9e --- /dev/null +++ b/aws-step-functions/steering/asl-state-types.md @@ -0,0 +1,269 @@ +# ASL Structure and State Types (JSONata Mode) + +Quick reference for the eight state types in AWS Step Functions. Reference variables-and-data.md for details about the fields available inside each state. +--- + +## Pass State + +Passes input to output, optionally transforming it with JSONata. Useful for injecting or transforming data. Without `Output`, the Pass state copies input to output unchanged. + +```json +"SetupAndGreet": { + "Type": "Pass", + "Assign": { + "retryCount": 0, + "maxRetries": 3, + "config": "{% $states.input.configuration %}" + }, + "Output": { + "greeting": "{% 'Hello, ' & $states.input.name %}", + "timestamp": "{% $now() %}" + }, + "Next": "ProcessItem" +} +``` + +--- + +## Task State + +Executes work via AWS service integrations, activities, or HTTP APIs. Reference service-integrations.md for full details. + +### Required Fields +- `Resource`: ARN identifying the task to execute + +### Optional Fields +- `Arguments`: Input to the task (replaces JSONPath `Parameters`) +- `Output`: Transform the result +- `Assign`: Store variables from input or result +- `TimeoutSeconds`: Max task duration (default 99999999, accepts JSONata expression) +- `HeartbeatSeconds`: Heartbeat interval (must be < TimeoutSeconds) +- `Retry`: Retry policy array +- `Catch`: Error handler array +- `Credentials`: Cross-account role assumption + +--- + +## Choice State + +Uses `Choices` and `Condition` fields with JSONata boolean expressions to implement branching logic. + +Key points: +- `Condition` must evaluate to a boolean. +- Each Choice Rule can have its own `Assign` and `Output`. +- If a rule matches, its `Assign`/`Output` are used (not the state-level ones). +- If no rule matches, the state-level `Assign` is evaluated and `Default` is followed. +- `Default` is optional but recommended — without it, `States.NoChoiceMatched` is thrown. +- Choice states cannot be terminal (no `End` field). + +### Structure + +```json +"RouteOrder": { + "Type": "Choice", + "Choices": [ + { + "Condition": "{% $states.input.orderType = 'express' %}", + "Next": "ExpressShipping" + }, + { + "Condition": "{% $states.input.total > 100 %}", + "Assign": { + "discount": "{% $states.input.total * 0.1 %}" + }, + "Output": { + "total": "{% $states.input.total * 0.9 %}" + }, + "Next": "ApplyDiscount" + }, + { + "Condition": "{% $states.input.priority >= 5 and $states.input.category = 'urgent' %}", + "Next": "PriorityQueue" + } + ], + "Default": "StandardProcessing", + "Assign": { + "routedDefault": true + } +} +``` + +JSONata supports rich boolean logic: + +```json +"Condition": "{% $states.input.age >= 18 and $states.input.age <= 65 %}" +"Condition": "{% $states.input.status = 'active' or $states.input.override = true %}" +"Condition": "{% $not($exists($states.input.error)) %}" +"Condition": "{% $contains($states.input.email, '@') %}" +"Condition": "{% $count($states.input.items) > 0 %}" +"Condition": "{% $states.input.score >= $threshold %}" +``` + +--- + +## Wait State + +Delays execution for a specified duration or until a timestamp. + +### Wait with Dynamic Seconds + +```json +"DynamicWait": { + "Type": "Wait", + "Seconds": "{% $states.input.delaySeconds %}", + "Next": "Continue" +} +``` + +### Wait Until Timestamp + +```json +"WaitUntilDate": { + "Type": "Wait", + "Timestamp": "{% $states.input.scheduledTime %}", + "Next": "Execute" +} +``` + +Timestamps must conform to RFC3339 (e.g., `"2026-03-14T01:59:00Z"`). + +A Wait state must contain exactly one of `Seconds` or `Timestamp`. + +--- + +## Succeed State + +Terminates the state machine (or a Parallel branch / Map iteration) successfully. + +```json +"Done": { + "Type": "Succeed", + "Output": { + "status": "completed", + "processedAt": "{% $now() %}" + } +} +``` + +--- + +## Fail State + +Terminates the state machine with an error. + +```json +"DynamicFail": { + "Type": "Fail", + "Error": "{% $states.input.errorCode %}", + "Cause": "{% $states.input.errorMessage %}" +} +``` +Reference error-handling.md for more information. + +--- + +## Parallel State + +Executes multiple branches concurrently. All branches receive the same input. + +Key points: +- `Arguments` provides input to each branch's StartAt state (optional, defaults to state input). +- Result is an array with one element per branch, in the same order as `Branches`. +- If any branch fails, the entire Parallel state fails (unless caught). +- States inside branches can only transition to other states within the same branch. +- Branch variables are scoped — branches cannot access each other's variables. +- Use `Output` on terminal states within branches to pass data back to the outer scope. + +--- + +## Map State + +Iterates over a JSON array or a JSON object, processing each element (potentially in parallel). + +### Key Map Fields + +| Field | Description | +|-------|-------------| +| `Items` | JSON array, JSON object, or JSONata expression evaluating to an array or object | +| `ItemProcessor` | State machine to run for each item (has `StartAt` and `States`) | +| `ItemSelector` | Reshape each item before processing | +| `MaxConcurrency` | Max parallel iterations (0 = unlimited, 1 = sequential) | +| `ToleratedFailurePercentage` | 0-100, percentage of items allowed to fail | +| `ToleratedFailureCount` | Number of items allowed to fail | +| `ItemReader` | Read items from an external resource | +| `ItemBatcher` | Batch items into sub-arrays | +| `ResultWriter` | Write results to an external resource | + +### Map ProcessorConfig + +The `ItemProcessor` can include a `ProcessorConfig` to control execution mode. +- `INLINE` (default) — iterations run within the parent execution. Use for most cases. +- `DISTRIBUTED` — iterations run as child executions. Use for large-scale processing (thousands+ items), items read from S3, or when you need per-iteration execution history. + +```json +"ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "ProcessOrder", + "States": { ... } +} +``` + +### Map Failure Tolerance + +Use `ToleratedFailurePercentage` (0–100) and/or `ToleratedFailureCount` to allow partial failures. The Map state fails if either threshold is breached. + +### ItemReader (Distributed Map only, optional) + +Specifies a dataset and its location for a Distributed Map state. Omit when iterating over JSON data from a previous state. + +Sub-fields: +- `Resource` — The S3 API action. Use `arn:aws:states:::s3:getObject` for single files or `arn:aws:states:::s3:listObjectsV2` for listing objects. +- `Arguments` — JSON object specifying `Bucket`, `Key` (for getObject), or `Prefix` (for listObjectsV2). Values accept JSONata expressions. +- `ReaderConfig` — Configuration object with the following sub-fields: + - `InputType` — Required for most sources. Valid values: `CSV`, `JSON`, `JSONL`, `PARQUET`, `MANIFEST`. + - `Transformation` — Optional. `NONE` (default) iterates over metadata from ListObjectsV2. `LOAD_AND_FLATTEN` reads and processes the actual data objects, eliminating the need for nested Maps. + - `ManifestType` — Optional. `ATHENA_DATA` for Athena UNLOAD manifests, `S3_INVENTORY` for S3 inventory reports. When `S3_INVENTORY`, do not specify `InputType`. + - `CSVDelimiter` — Optional, for CSV/MANIFEST. Valid values: `COMMA` (default), `PIPE`, `SEMICOLON`, `SPACE`, `TAB`. + - `CSVHeaderLocation` — Optional, for CSV/MANIFEST. `FIRST_ROW` uses the file's first line. `GIVEN` requires a `CSVHeaders` array in the config. + - `CSVHeaders` — Array of column name strings. Required when `CSVHeaderLocation` is `GIVEN`. + - `ItemsPointer` — Optional, for JSON files. Uses JSONPointer syntax (e.g., `/data/items`) to select a nested array or object within the file. + - `MaxItems` — Optional. Limits the number of items processed. Accepts an integer or a JSONata expression evaluating to a positive integer. Maximum: 100,000,000. + +S3 buckets must be in the same AWS account and Region as the state machine. + +### ItemSelector (optional) + +Overrides the values of input items before they are passed to each iteration. Accepts a JSON object with key-value pairs. Values can be static or JSONata expressions. Can access the map state's context object. Available in both Inline and Distributed Map states. + +### ItemBatcher (Distributed Map only, optional) + +Groups items into batches for processing. Each child workflow execution receives an `Items` array and an optional `BatchInput` object. You must specify at least one of: +- `MaxItemsPerBatch` — Maximum number of items per batch. Accepts an integer or a JSONata expression evaluating to a positive integer. +- `MaxInputBytesPerBatch` — Maximum batch size in bytes (up to 256 KiB). Accepts an integer or a JSONata expression evaluating to a positive integer. +- `BatchInput` — Optional. Fixed JSON merged into each batch. Values accept JSONata expressions. + +If both `MaxItemsPerBatch` and `MaxInputBytesPerBatch` are specified, Step Functions reduces the item count to stay within the byte limit. + +### ResultWriter (Distributed Map only, optional) + +Controls output formatting and optional export of child workflow execution results to S3. The `ResultWriter` field cannot be empty — specify at least one of the following combinations: +- `WriterConfig` only — formats output without exporting to S3. +- `Resource` + `Arguments` only — exports to S3 without additional formatting. +- All three — formats and exports. + +Sub-fields: +- `Resource` — `arn:aws:states:::s3:putObject` +- `Arguments` — JSON object with `Bucket` and `Prefix`. Values accept JSONata expressions. +- `WriterConfig` — Configuration object: + - `Transformation` — `NONE` (includes execution metadata), `COMPACT` (output only, preserves array structure), or `FLATTEN` (output only, flattens nested arrays into one). + - `OutputType` — `JSON` (array) or `JSONL` (JSON Lines). + +When exporting, Step Functions writes `SUCCEEDED_n.json`, `FAILED_n.json`, and `PENDING_n.json` files plus a `manifest.json` to the specified S3 location. Individual result files are capped at 5 GB. The S3 bucket must be in the same account and Region as the state machine. + +Without `ResultWriter`, the Map state returns an array of child execution results directly. If the output exceeds 256 KiB, the execution fails with `States.DataLimitExceeded` — use `ResultWriter` to export to S3 instead. + +--- + +See `examples/nested-map-parallel-structures.asl.json` for a combined Map + Parallel example. \ No newline at end of file diff --git a/aws-step-functions/steering/error-handling.md b/aws-step-functions/steering/error-handling.md new file mode 100644 index 0000000..cf7686d --- /dev/null +++ b/aws-step-functions/steering/error-handling.md @@ -0,0 +1,150 @@ +# Error Handling in JSONata Mode + +## Overview + +When a state encounters an error, Step Functions defaults to failing the entire execution. You can override this with `Retry` (retry the failed state) and `Catch` (transition to a fallback state). `Retry` and `Catch` are available on: Task, Parallel, and Map states. + +## Error Names + +Errors are identified by case-sensitive strings. Step Functions defines these built-in error codes: + +| Error Code | Description | +|-----------|-------------| +| `States.ALL` | Wildcard — matches any error | +| `States.Timeout` | Task exceeded `TimeoutSeconds` or missed heartbeat | +| `States.HeartbeatTimeout` | Task missed heartbeat interval | +| `States.TaskFailed` | Task failed during execution | +| `States.Permissions` | Insufficient privileges | +| `States.QueryEvaluationError` | JSONata expression evaluation failed | +| `States.BranchFailed` | A Parallel state branch failed | +| `States.NoChoiceMatched` | No Choice rule matched and no Default | +| `States.ExceedToleratedFailureThreshold` | Map state exceeded failure tolerance | +| `States.ItemReaderFailed` | Map state ItemReader failed | +| `States.ResultWriterFailed` | Map state ResultWriter failed | + +Custom error names are allowed but must NOT start with `States.`. + +--- + +## Retry + +The `Retry` field is an array of Retrier objects. The interpreter scans retriers in order and uses the first one whose `ErrorEquals` matches. + +### Retrier Fields + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `ErrorEquals` | string[] | Required | Error names to match | +| `IntervalSeconds` | integer | 1 | Seconds before first retry | +| `MaxAttempts` | integer | 3 | Maximum retry attempts (0 = never retry) | +| `BackoffRate` | number | 2.0 | Multiplier for retry interval (must be ≥ 1.0) | +| `MaxDelaySeconds` | integer | — | Cap on retry interval | +| `JitterStrategy` | string | — | Jitter strategy (e.g., `"FULL"`) | + +Rules: +- `States.ALL` must appear alone in its `ErrorEquals` array. +- `States.ALL` must be in the last retrier. +- `MaxAttempts: 0` means "never retry this error." +- Retrier attempt counts reset when the interpreter transitions to another state. +- Retriers are evaluated in order. Each retrier tracks its own attempt count independently. + +--- + +## Catch + +The `Catch` field is an array of Catcher objects. After retries are exhausted (or if no retrier matches), the interpreter scans catchers in order. + +### Catcher Fields (JSONata) + +| Field | Type | Description | +|-------|------|-------------| +| `ErrorEquals` | string[] | Required. Error names to match | +| `Next` | string | Required. State to transition to | +| `Output` | any | Optional. Transform the error output | +| `Assign` | object | Optional. Assign variables from error context | + +### Error Output Structure + +When a state fails and matches a Catcher, `$states.errorOutput` is a JSON object with: +- `Error` (string) — the error name +- `Cause` (string) — human-readable error description + +In a Catch block, `Assign` and `Output` can reference: +- `$states.input` — the original state input +- `$states.errorOutput` — the error details +- `$states.context` — execution context + +If a Catcher matches, the state's top-level `Assign` is NOT evaluated — only the Catcher's `Assign` runs. If no `Output` is provided in the Catcher, the state output is the raw Error Output object. + +When both Retry and Catch are present, retries are attempted first. Only if retries are exhausted does the Catch apply. + +--- + +## Handling States.QueryEvaluationError + +JSONata expressions can fail at runtime. Common causes: + +1. Type error — `{% $x + $y %}` where `$x` or `$y` is not a number +2. Type incompatibility — `"TimeoutSeconds": "{% $name %}"` where `$name` is a string +3. Value out of range — negative number for `TimeoutSeconds` +4. Undefined result — `{% $data.nonExistentField %}` — JSON cannot represent undefined + +Prevent these errors with defensive expressions: use `$exists()` before accessing fields evaluated at runtime, `$type()` before arithmetic, and guard filtered results that may return a single object instead of an array. Always guard with `$exists()` — if a variable was never assigned (e.g., the Catch didn't fire for that path), referencing it directly throws `States.QueryEvaluationError`. See `transforming-data.md` for defensive JSONata examples. + +--- + +## Error Handling in Parallel States + +If any branch fails, the entire Parallel state fails. Use `States.BranchFailed` in Retry/Catch at the Parallel state level. + +--- + +## Error Handling in Map States + +Individual iteration failures can be tolerated with `ToleratedFailurePercentage` or `ToleratedFailureCount`. If the threshold is exceeded, the Map state throws `States.ExceedToleratedFailureThreshold`. + +--- + +## Retry and Catch with User-Friendly Error + +Retries transient errors with backoff, then catches all errors into a variable and transitions to a Fail state with a descriptive Cause. Guard variable references with `$exists()` in case the Catch path wasn't taken. + +```json +"ChargePayment": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ChargeCard:$LATEST", + "Payload": "{% $states.input %}" + }, + "Retry": [ + { + "ErrorEquals": ["ThrottlingException", "ServiceUnavailable"], + "IntervalSeconds": 2, + "MaxAttempts": 3, + "BackoffRate": 2.0, + "JitterStrategy": "FULL" + }, + { + "ErrorEquals": ["States.QueryEvaluationError"], + "MaxAttempts": 0 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Assign": { + "error": "{% $states.errorOutput %}" + }, + "Next": "PaymentFailed" + } + ], + "Output": "{% $states.result.Payload %}", + "Next": "ConfirmOrder" +}, +"PaymentFailed": { + "Type": "Fail", + "Error": "PaymentError", + "Cause": "{% 'Payment failed for order ' & ($exists($orderId) ? $orderId : 'unknown') & ': ' & ($exists($error.Error) ? $error.Error : 'Unknown') & ' - ' & ($exists($error.Cause) ? $error.Cause : 'No details') & '. Timestamp: ' & $now() %}" +} +``` diff --git a/aws-step-functions/steering/migrating-from-jsonpath-to-jsonata.md b/aws-step-functions/steering/migrating-from-jsonpath-to-jsonata.md new file mode 100644 index 0000000..9550cde --- /dev/null +++ b/aws-step-functions/steering/migrating-from-jsonpath-to-jsonata.md @@ -0,0 +1,301 @@ +# Migrating from JSONPath to JSONata + +Complete conversion guide for migrating existing JSONPath state machines to JSONata. Covers fields, states, intrinsic functions, common pitfalls, and the end-to-end conversion workflow. + +## JSONPath → JSONata Quick Reference + +| JSONPath | JSONata | +|---|---| +| `InputPath` | Not needed — use `$states.input` directly in `Arguments` | +| `Parameters` | `Arguments` | +| `ResultSelector` | `Output` (reference `$states.result`) | +| `ResultPath` | `Assign` (preferred) or `Output` | +| `OutputPath` | `Output` (return only what you need) | +| `TimeoutSecondsPath` | `TimeoutSeconds` with `{% %}` | +| `HeartbeatSecondsPath` | `HeartbeatSeconds` with `{% %}` | +| `ItemsPath` | `Items` with `{% %}` | +| `"key.$": "$.field"` | `"key": "{% $states.input.field %}"` | +| `$` or `$.field` (state input) | `$states.input` or `$states.input.field` | +| `$$` (context object) | `$states.context` | +| `$$.Execution.Input` | `$states.context.Execution.Input` | +| `$$.Task.Token` | `$states.context.Task.Token` | +| `$$.Map.Item.Value` | `$states.context.Map.Item.Value` | +| `$variable` (workflow var) | `$variable` (unchanged) | + + +--- + +## Converting Each State Type + +### Task State + +**Before (JSONPath):** +```json +"ProcessOrder": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "InputPath": "$.order", + "Parameters": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:Process:$LATEST", + "Payload": { "id.$": "$.orderId", "customer.$": "$.customerName" } + }, + "ResultSelector": { "processedId.$": "$.Payload.id", "status.$": "$.Payload.status" }, + "ResultPath": "$.processingResult", + "OutputPath": "$.processingResult", + "Next": "Ship" +} +``` + +**After (JSONata):** +```json +"ProcessOrder": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:Process:$LATEST", + "Payload": { "id": "{% $states.input.order.orderId %}", "customer": "{% $states.input.order.customerName %}" } + }, + "Output": { "processedId": "{% $states.result.Payload.id %}", "status": "{% $states.result.Payload.status %}" }, + "Next": "Ship" +} +``` + +### Pass State + +**Before (JSONPath):** +```json +"InjectDefaults": { + "Type": "Pass", + "Result": { "region": "us-east-1" }, + "ResultPath": "$.config", + "Next": "Go" +} +``` + +**After (JSONata):** +```json +"InjectDefaults": { + "Type": "Pass", + "Assign": { "region": "us-east-1" }, + "Next": "Go" +} +``` + +### Choice State + +JSONPath uses `Variable` + typed operators. JSONata uses a single `Condition` expression. + +**Before (JSONPath):** +```json +"Choices": [ + { "Variable": "$.status", "StringEquals": "approved", "Next": "Approved" }, + { "And": [ + { "Variable": "$.priority", "StringEquals": "high" }, + { "Variable": "$.age", "NumericLessThanEquals": 30 } + ], "Next": "FastTrack" }, + { "Not": { "Variable": "$.email", "IsPresent": true }, "Next": "RequestEmail" } +] +``` + +**After (JSONata):** +```json +"Choices": [ + { "Condition": "{% $states.input.status = 'approved' %}", "Next": "Approved" }, + { "Condition": "{% $states.input.priority = 'high' and $states.input.age <= 30 %}", "Next": "FastTrack" }, + { "Condition": "{% $not($exists($states.input.email)) %}", "Next": "RequestEmail" } +] +``` + +#### Choice Operator Mapping + +| JSONPath Operator | JSONata | +|---|---| +| `StringEquals` / `StringEqualsPath` | `= 'value'` / `= $states.input.other` | +| `NumericGreaterThan` / `NumericLessThanEquals` | `> value` / `<= value` | +| `BooleanEquals` | `= true` / `= false` | +| `TimestampGreaterThan` | `$toMillis(field) > $toMillis('ISO-timestamp')` | +| `IsPresent: true` / `false` | `$exists(field)` / `$not($exists(field))` | +| `IsNull: true` | `field = null` | +| `IsNumeric` / `IsString` / `IsBoolean` | `$type(field) = 'number'` / `'string'` / `'boolean'` | +| `StringMatches` (wildcards) | `$contains(field, /regex/)` | +| `And` / `Or` / `Not` | `and` / `or` / `$not()` | + +### Wait State + +**Before (JSONPath):** +```json +{ "Type": "Wait", "TimestampPath": "$.deliveryDate", "Next": "Check" } +``` + +**After (JSONata):** +```json +{ "Type": "Wait", "Timestamp": "{% $states.input.deliveryDate %}", "Next": "Check" } +``` + +### Map State + +| JSONPath | JSONata | +|---|---| +| `ItemsPath` | `Items` (fold `InputPath` into expression) | +| `Parameters` (with `$$.Map.*`) | `ItemSelector` (with `$states.context.Map.*`) | +| `Iterator` | `ItemProcessor` (add `ProcessorConfig`) | +| `ResultSelector` inside iterator | `Output` inside processor states | +| `ResultPath` on Map | `Assign` | + +**After (JSONata):** +```json +"ProcessItems": { + "Type": "Map", + "Items": "{% $states.input.orderData.items %}", + "ItemSelector": { + "item": "{% $states.context.Map.Item.Value %}", + "index": "{% $states.context.Map.Item.Index %}" + }, + "MaxConcurrency": 5, + "ItemProcessor": { + "ProcessorConfig": { "Mode": "INLINE" }, + "StartAt": "Process", + "States": { + "Process": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:Process:$LATEST", "Payload": "{% $states.input %}" }, + "Output": "{% $states.result.Payload %}", + "End": true + } + } + }, + "Assign": { "processedItems": "{% $states.result %}" }, + "Next": "Done" +} +``` + +--- + +## Converting Intrinsic Functions + +| JSONPath Intrinsic | JSONata Equivalent | +|---|---| +| `States.Format('Order {}', $.id)` | `'Order ' & $states.input.id` | +| `States.StringToJson($.str)` | `$parse($states.input.str)` | +| `States.JsonToString($.obj)` | `$string($states.input.obj)` | +| `States.StringSplit($.str, ',')` | `$split($states.input.str, ',')` | +| `States.Array($.a, $.b)` | `[$states.input.a, $states.input.b]` | +| `States.ArrayPartition($.arr, 2)` | `$partition($states.input.arr, 2)` | +| `States.ArrayContains($.arr, $.v)` | `$states.input.v in $states.input.arr` | +| `States.ArrayRange(0, 10, 2)` | `$range(0, 10, 2)` | +| `States.ArrayGetItem($.arr, 0)` | `$states.input.arr[0]` | +| `States.ArrayLength($.arr)` | `$count($states.input.arr)` | +| `States.ArrayUnique($.arr)` | `$distinct($states.input.arr)` | +| `States.Base64Encode($.str)` | `$base64encode($states.input.str)` | +| `States.Base64Decode($.str)` | `$base64decode($states.input.str)` | +| `States.Hash($.data, 'SHA-256')` | `$hash($states.input.data, 'SHA-256')` | +| `States.JsonMerge($.a, $.b)` | `$merge([$states.input.a, $states.input.b])` | +| `States.MathRandom()` | `$random()` | +| `States.MathAdd($.a, $.b)` | `$states.input.a + $states.input.b` | +| `States.UUID()` | `$uuid()` | + +--- + +## Converting Catch Blocks + +JSONPath Catch uses `ResultPath`. JSONata Catch uses `Assign` and `Output` with `$states.errorOutput`. + +**Before (JSONPath):** +```json +"Catch": [{ "ErrorEquals": ["States.ALL"], "ResultPath": "$.error", "Next": "HandleError" }] +``` + +**After (JSONata):** +```json +"Catch": [{ + "ErrorEquals": ["States.ALL"], + "Assign": { "errorInfo": "{% $states.errorOutput %}" }, + "Output": "{% $states.input %}", + "Next": "HandleError" +}] +``` + +Retry syntax is identical between JSONPath and JSONata — no conversion needed. + +--- + +## Conversion Pitfalls and How to Avoid Them + +### 1. Do not mix JSONPath and JSONata fields in the same state +Invalid combinations: `Arguments` + `InputPath`, `Output` + `ResultSelector`, `Condition` + `Variable`. Remove all JSONPath fields from converted states. + +### 2. You must remove `.$` suffixes +```json +❌ "orderId.$": "{% $states.input.orderId %}" +✓ "orderId": "{% $states.input.orderId %}" +``` + +### 3. Use `$states` instead of `$` or `$$`. +```json +❌ "{% $.orderId %}" ❌ "{% $$.Task.Token %}" +✓ "{% $states.input.orderId %}" ✓ "{% $states.context.Task.Token %}" +``` +Note: `$` is valid inside nested filter expressions (e.g., `$states.input.items[$.price > 10]`). + +### 4. Do not use double quotes inside JSONata expressions +```json +❌ "{% $states.input.status = "active" %}" +✓ "{% $states.input.status = 'active' %}" +``` + +### 5. Do not attempt to access the output of `Assign` or `Output` in the same state where they are assigned. +`Assign` and `Output` evaluate in parallel — new variable values are not available until the next state. +```json +❌ "Assign": { "total": "{% $states.result.Payload.total %}" }, + "Output": { "total": "{% $total %}" } +✓ "Assign": { "total": "{% $states.result.Payload.total %}" }, + "Output": { "total": "{% $states.result.Payload.total %}" } +``` + +### 6. Use defensive coding to prevent undefined errors in JSONata +JSONPath silently returns null. JSONata throws `States.QueryEvaluationError`: +```json +❌ "{% $states.input.customer.middleName %}" +✓ "{% $exists($states.input.customer.middleName) ? $states.input.customer.middleName : '' %}" +``` + +### 7. Use defensive coding to prevent invalid filter results +JSONata returns a single object (not a 1-element array) when exactly one item matches a filter, and undefined when nothing matches. Both break Map state `Items` and functions like `$count`: +```json +❌ "Items": "{% $states.input.orders[status = 'pending'] %}" +✓ "Items": "{% ( $f := $states.input.orders[status = 'pending']; $type($f) = 'array' ? $f : $exists($f) ? [$f] : [] ) %}" +``` + +### 8. Iterator → ItemProcessor rename +`Iterator` was renamed to `ItemProcessor` and requires `ProcessorConfig`: +```json +❌ "Iterator": { "StartAt": "...", "States": {...} } +✓ "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "...", "States": {...} } +``` + +--- + +## Conversion Workflow + +For each state being converted, apply these steps in order: + +1. Add `"QueryLanguage": "JSONata"` to the state +2. `Parameters` → `Arguments`: remove `.$` suffixes from all keys, wrap values in `{% %}`, replace `$` with `$states.input` and `$$` with `$states.context` +3. Convert `ResultPath` based on its value: + - Absent or `"$"` → no action needed (default behavior is replaced by `Output`) + - `null` → add `"Output": "{% $states.input %}"` + - `"$.field"` → add `"Assign": { "field": "{% $states.result %}" }` and `"Output": "{% $states.input %}"` +4. `ResultSelector` → fold selection logic into `Output` (reference `$states.result`) +5. `OutputPath` → fold into `Output` (return only what you need) +6. Reminder: If the state has `ResultSelector` + `ResultPath` + `OutputPath`, collapse all three into a single `Output` field +7. Remove all five JSONPath I/O fields: `InputPath`, `Parameters`, `ResultSelector`, `ResultPath`, `OutputPath` +8. Convert `*Path` fields to base field + `{% %}` expression (`TimeoutSecondsPath` → `TimeoutSeconds`, `HeartbeatSecondsPath` → `HeartbeatSeconds`, `ItemsPath` → `Items`) +9. Replace `States.*` intrinsic functions with JSONata equivalents (see Converting Intrinsic Functions table) +10. Choice states: replace `Variable` + comparison operators with a single `Condition` expression +11. Map states: `Iterator` → `ItemProcessor` with `ProcessorConfig`, `ItemsPath` → `Items`, `Parameters` with `$$.Map.*` → `ItemSelector` with `$states.context.Map.*` +12. Catch blocks: replace `ResultPath` with `Assign` + `Output` using `$states.errorOutput` +13. Pass states: replace `Result` with `Output` or `Assign` +14. Where multiple consecutive states used `ResultPath` to thread data through the payload, refactor to use `Assign` variables instead — downstream states reference `$variableName` directly +15. Validate the converted state using the TestState API +16. Repeat for all states, then promote `"QueryLanguage": "JSONata"` to the top level and remove per-state declarations \ No newline at end of file diff --git a/aws-step-functions/steering/processing-state-inputs-and-outputs.md b/aws-step-functions/steering/processing-state-inputs-and-outputs.md new file mode 100644 index 0000000..510f134 --- /dev/null +++ b/aws-step-functions/steering/processing-state-inputs-and-outputs.md @@ -0,0 +1,260 @@ +# Processing State Inputs and Outputs + +## State Fields Quick Reference + +| Field | Purpose | Available In | +|-------|---------|-------------| +| `Type` | State type identifier | Task, Parallel, Map, Pass, Wait, Choice, Succeed, Fail | +| `Comment` | Human-readable description | Task, Parallel, Map, Pass, Wait, Choice, Succeed, Fail | +| `Output` | Transform state output | Task, Parallel, Map, Pass, Wait, Choice, Succeed | +| `Assign` | Store workflow variables | Task, Parallel, Map, Pass, Wait, Choice | +| `Next` / `End` | Transition control | Task, Parallel, Map, Pass, Wait | +| `Arguments` | Input to task/branches | Task, Parallel | +| `Retry` & `Catch` | Error handling | Task, Parallel, Map | +| `Items` | a JSON array, a JSON object, or a JSONata expression | Map | +| `ItemSelector` | Reshape each item before processing | Map | +| `Condition` | Boolean branching | Choice (inside rules) | +| `Error` & `Cause` | Error name and description (accept JSONata) | Fail | + +## The `$states` Reserved Variable + +Step Functions provides a reserved `$states` variable in every JSONata state: + +``` +$states = { + "input": // Original input to the state + "result": // Task/Parallel/Map result (if successful) + "errorOutput": // Error Output (only available in Catch) + "context": // Context object (execution metadata) +} +``` + +### Context Object Fields + +| Path | Type | Description | +|------|------|-------------| +| `$states.context.Execution.Id` | String | Execution ARN | +| `$states.context.Execution.Input` | Object | Original workflow input | +| `$states.context.Execution.Name` | String | Execution name | +| `$states.context.Execution.RoleArn` | String | IAM execution role | +| `$states.context.Execution.StartTime` | String (ISO 8601) | When execution started | +| `$states.context.Execution.RedriveCount` | Number | Number of times execution was redriven | +| `$states.context.Execution.RedriveTime` | String (ISO 8601) | When execution was last redriven | +| `$states.context.State.EnteredTime` | String (ISO 8601) | When current state was entered | +| `$states.context.State.Name` | String | Current state name | +| `$states.context.State.RetryCount` | Number | Number of retries attempted | +| `$states.context.StateMachine.Id` | String | State machine ARN | +| `$states.context.StateMachine.Name` | String | State machine name | +| `$states.context.Task.Token` | String | Task token (only in `.waitForTaskToken` states) | +| `$states.context.Map.Item.Index` | Number | Index number for the array item that is being currently processed | +| `$states.context.Map.Item.Value` | any | Current item being processed | +| `$states.context.Map.Item.Key` | String | Property name when iterating over a JSON object (not valid for arrays) | +| `$states.context.Map.Item.Source` | String | Item source: `STATE_DATA` for state input, `S3://bucket-name` for S3 LIST_OBJECTS_V2 with NONE transformation, or `S3://bucket-name/object-key` for all other S3 input types | + +--- + +## Workflow Variables with `Assign` + +Variables let you store data in one state and reference it in any subsequent state without threading data through Output/Input chains. + +### Declaring Variables + +```json +"StoreData": { + "Type": "Pass", + "Assign": { + "productName": "product1", + "count": 42, + "available": true, + "config": "{% $states.input.configuration %}" + }, + "Next": "UseData" +} +``` + +### Referencing Variables + +Prepend the variable name with `$`: + +```json +"Arguments": { + "product": "{% $productName %}", + "quantity": "{% $count %}" +} +``` + +### Assigning from Task Results + +```json +"FetchPrice": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:GetPrice:$LATEST", + "Payload": { + "product": "{% $states.input.product %}" + } + }, + "Assign": { + "currentPrice": "{% $states.result.Payload.price %}" + }, + "Output": "{% $states.result.Payload %}", + "Next": "CheckPrice" +} +``` + +### Assign in Choice Rules and Catch + +Choice Rules and Catch blocks can each have their own `Assign`: + +```json +"CheckValue": { + "Type": "Choice", + "Choices": [ + { + "Condition": "{% $states.input.value > 100 %}", + "Assign": { + "tier": "premium" + }, + "Next": "PremiumPath" + } + ], + "Default": "StandardPath", + "Assign": { + "tier": "standard" + } +} +``` + +If a Choice Rule matches, its `Assign` is used. If no rule matches, the state-level `Assign` is used. + +--- + +## Variable Evaluation Order + +All expressions in `Assign` and `Output` are evaluated in parallel using variable values as they were on state entry. New values only take effect in the next state. + +```json +"SwapExample": { + "Type": "Pass", + "Assign": { + "x": "{% $y %}", + "y": "{% $x %}" + }, + "Next": "AfterSwap" +} +``` + +If `$x = 3` and `$y = 6` on entry, after this state: `$x = 6`, `$y = 3`. This works because all expressions are evaluated first, then assignments are made. + +You cannot assign to a sub-path of a variable: +- Valid: `"Assign": {"x": 42}` +- Invalid: `"Assign": {"x.y": 42}` or `"Assign": {"x[2]": 42}` + +--- + +## Variable Scope + +Variables exist in a state-machine-local scope: + +- **Outer scope**: All states in the top-level `States` field. +- **Inner scope**: States inside a Parallel branch or Map iteration. + +### Scope Rules + +1. Inner scopes can READ variables from outer scopes. +2. Inner scopes CANNOT ASSIGN to variables that exist in an outer scope. +3. Variable names must be unique across outer and inner scopes (no shadowing). +4. Variables in different Parallel branches or Map iterations are isolated from each other. +5. When a Parallel branch or Map iteration completes, its variables go out of scope. +6. Exception: Distributed Map states cannot reference variables in outer scopes. + +### Passing Data Out of Inner Scopes + +Use `Output` on terminal states within branches/iterations to return data to the outer scope: + +```json +"ParallelWork": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "BranchA", + "States": { + "BranchA": { + "Type": "Task", + "Resource": "...", + "Output": "{% $states.result.Payload %}", + "End": true + } + } + } + ], + "Assign": { + "branchAResult": "{% $states.result[0] %}" + }, + "Next": "Continue" +} +``` + +### Catch Assign and Outer Scope + +A Catch block in a Parallel or Map state can assign values to variables in the outer scope (the scope where the Parallel/Map state exists): + +```json +"Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Assign": { + "errorOccurred": true, + "errorDetails": "{% $states.errorOutput %}" + }, + "Next": "HandleError" + } +] +``` + +--- + +## Arguments and Output Fields + +### Arguments + +Provides input to Task and Parallel states: + +```json +"Arguments": { + "staticField": "hello", + "dynamicField": "{% $states.input.name %}", + "computed": "{% $count($states.input.items) %}" +} +``` + +### Output + +Transforms the state output: + +```json +"Output": { + "customerId": "{% $states.input.id %}", + "result": "{% $states.result.Payload %}", + "processedAt": "{% $now() %}" +} +``` + +If `Output` is not provided: +- Task, Parallel, Map: state output = the result +- All other states: state output = the state input + +--- + +## Variable Limits + +| Limit | Value | +|-------|-------| +| Max size of a single variable | 256 KiB | +| Max combined size in a single Assign | 256 KiB | +| Max total stored variables per execution | 10 MiB | +| Max variable name length | 80 Unicode characters | + +--- + diff --git a/aws-step-functions/steering/service-integrations.md b/aws-step-functions/steering/service-integrations.md new file mode 100644 index 0000000..3b7b4a8 --- /dev/null +++ b/aws-step-functions/steering/service-integrations.md @@ -0,0 +1,207 @@ +# Service Integrations in JSONata Mode + +## Integration Types + +Step Functions can integrate with AWS services in three patterns: + +1. **Optimized integrations** — Purpose-built, recommended where available +2. **AWS SDK integrations** — Call any AWS SDK API action directly +3. **HTTP Task** — Call HTTPS APIs (e.g., Stripe, Salesforce) + +## Integration Patterns + +| Pattern | Resource ARN | Behavior | When to Use | +|---------|-------------|----------|-------------| +| Optimized | `arn:aws:states:::servicename:apiAction` | Call API and continue immediately | Fire-and-forget operations (start a process, send a message) | +| Optimized (sync) | `arn:aws:states:::servicename:apiAction.sync` | Wait for the job to complete | When you need the result before continuing (run ECS task, execute child workflow, run Glue job) | +| Optimized (callback) | `arn:aws:states:::servicename:apiAction.waitForTaskToken` | Pause until a task token is returned | Human approval, external system processing, long-running async operations | +| AWS SDK | `arn:aws:states:::aws-sdk:serviceName:apiAction` | Call any AWS SDK API action directly | When no optimized integration exists for the service | +| HTTP Task | `arn:aws:states:::http:invoke` | Call an HTTPS API endpoint | External APIs (e.g., Stripe, Salesforce) | + +--- + +## Examples + +### Lambda Function + +#### Optimized Integration (Recommended) +Always review the AWS Documentation to check availability and proper usage of an optimized integration before using it: https://docs.aws.amazon.com/step-functions/latest/dg/integrate-optimized.html + +```json +"InvokeFunction": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:MyFunction:$LATEST", + "Payload": { + "orderId": "{% $states.input.orderId %}", + "customer": "{% $states.input.customer %}" + } + }, + "Output": "{% $states.result.Payload %}", + "Next": "NextState" +} +``` + +Always include a version qualifier (`:$LATEST`, `:1`, or an alias like `:prod`) on the function ARN. + +The result is wrapped in a `Payload` field, so use `$states.result.Payload` to access the Lambda return value. + +#### SDK Integration + +```json +"InvokeViaSDK": { + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:lambda:invoke", + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:MyFunction", + "Payload": "{% $string($states.input) %}" + }, + "Next": "NextState" +} +``` + +--- + +### DynamoDB + +#### GetItem + +```json +"GetUser": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Arguments": { + "TableName": "UsersTable", + "Key": { + "userId": { + "S": "{% $states.input.userId %}" + } + } + }, + "Assign": { + "user": "{% $states.result.Item %}" + }, + "Output": "{% $states.result.Item %}", + "Next": "ProcessUser" +} +``` + +--- + +### SQS (Simple Queue Service) + +#### Send Message + +```json +"QueueMessage": { + "Type": "Task", + "Resource": "arn:aws:states:::sqs:sendMessage", + "Arguments": { + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/ProcessingQueue", + "MessageBody": "{% $string($states.input) %}" + }, + "Next": "Done" +} +``` + +#### Send Message with Wait for Task Token + +```json +"WaitForApproval": { + "Type": "Task", + "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", + "Arguments": { + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/ApprovalQueue", + "MessageBody": "{% $string({'taskToken': $states.context.Task.Token, 'orderId': $orderId, 'amount': $states.input.amount}) %}" + }, + "TimeoutSeconds": 86400, + "Next": "ProcessApproval" +} +``` +The execution pauses until an external system calls `SendTaskSuccess` or `SendTaskFailure` with the task token. + +--- + +### Step Functions (Nested Execution) + +#### Start Execution (Synchronous) + +```json +"RunSubWorkflow": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Arguments": { + "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:ChildWorkflow", + "Input": "{% $states.input %}" + }, + "Output": "{% $parse($states.result.Output) %}", + "Next": "ProcessSubResult" +} +``` + +Note: The `.sync:2` suffix waits for completion. The child output is a JSON string in `$states.result.Output`, so use `$parse()` to deserialize it. + +#### Start Execution (Async — Fire and Forget) + +```json +"StartAsync": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution", + "Arguments": { + "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:AsyncWorkflow", + "Input": "{% $string($states.input) %}" + }, + "Next": "Continue" +} +``` + +--- + +### Cross-Account Access + +Use the `Credentials` field to assume a role in another account: + +```json +"CrossAccountCall": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Credentials": { + "RoleArn": "arn:aws:iam::111122223333:role/CrossAccountRole" + }, + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:111122223333:function:RemoteFunction:$LATEST", + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "Next": "Done" +} +``` + +--- + +### Callback Pattern + +```json +"WaitForHumanApproval": { + "Type": "Task", + "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", + "Arguments": { + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/ApprovalQueue", + "MessageBody": "{% $string({'taskToken': $states.context.Task.Token, 'request': $states.input}) %}" + }, + "TimeoutSeconds": 604800, + "Catch": [ + { + "ErrorEquals": ["States.Timeout"], + "Output": { + "status": "approval_timeout" + }, + "Next": "HandleTimeout" + } + ], + "Next": "ApprovalReceived" +} +``` + +The external system must call `SendTaskSuccess` or `SendTaskFailure` with the task token to resume execution. diff --git a/aws-step-functions/steering/transforming-data.md b/aws-step-functions/steering/transforming-data.md new file mode 100644 index 0000000..743cc27 --- /dev/null +++ b/aws-step-functions/steering/transforming-data.md @@ -0,0 +1,208 @@ +# Data Transformation Patterns + +## JSONata Expression Syntax + +JSONata expressions are written inside `{% %}` delimiters in string values: + +```json +"Output": "{% $states.input.customer.name %}" +"TimeoutSeconds": "{% $timeout %}" +"Condition": "{% $states.input.age >= 18 %}" +``` + +Rules: +- The string must start with `{%` (no leading spaces) and end with `%}` (no trailing spaces). +- Not all fields accept JSONata — `Type` and `Resource` must be constant strings. +- JSONata expressions can appear in string values within objects and arrays at any nesting depth. +- A string without `{% %}` is treated as a literal value. +- All string literals inside JSONata expressions must use single quotes (`'text'`), not double quotes. The expression is already inside a JSON double-quoted string, so double quotes would break the JSON. +- Complex logic is wrapped in `( expr1; expr2; ...; finalExpr )` where semicolons separate sequential expressions and the last expression is the return value. +- You cannot use `$` or `$$` at the top level. +- You cannot use unqualified field names at top level. +- Use `$parse()` instead `$eval` for deserializing JSON strings. +- Expressions must produce a defined value because JSON cannot represent undefined. + + +### String Quoting + +```json +"Output": "{% 'Hello ' & $states.input.name %}" +"Condition": "{% $states.input.status = 'active' %}" +``` + +Never use double quotes inside the expression: +``` +❌ "Output": "{% "Hello" %}" +✓ "Output": "{% 'Hello' %}" +``` + +### Local Variable Binding with `:=` + +Use `:=` inside `( ... )` blocks to bind intermediate values within a single JSONata expression. Semicolons separate each binding, and the last expression is the return value: + +```json +"Output": "{% ( $subtotal := $sum($states.input.items.price); $tax := $subtotal * 0.1; $discount := $exists($couponValue) ? $couponValue : 0; {'subtotal': $subtotal, 'tax': $tax, 'discount': $discount, 'total': $subtotal + $tax - $discount} ) %}" +``` + +You can also define local helper functions: + +```json +"Assign": { + "summary": "{% ( $formatPrice := function($amt) { '$' & $formatNumber($amt, '#,##0.00') }; $subtotal := $sum($states.input.items.price); {'itemCount': $count($states.input.items), 'subtotal': $formatPrice($subtotal), 'total': $formatPrice($subtotal * 1.1)} ) %}" +} +``` + +Local variables bound with `:=` exist only within the `( ... )` block. They do not affect state machine variables. To persist values across states, use the `Assign` field. + +### Filtering Arrays + +```json +"Output": { + "expensiveItems": "{% $states.input.items[price > 100] %}" +} +``` + +### Aggregation + +```json +"Output": { + "total": "{% $sum($states.input.items.price) %}", + "average": "{% $average($states.input.items.price) %}", + "count": "{% $count($states.input.items) %}" +} +``` + +### String Operations + +```json +"Output": { + "fullName": "{% $states.input.firstName & ' ' & $states.input.lastName %}", + "upper": "{% $uppercase($states.input.name) %}", + "trimmed": "{% $trim($states.input.rawInput) %}" +} +``` + +### Object Merging + +```json +"Output": "{% $merge([$states.input, {'processedAt': $now(), 'status': 'complete'}]) %}" +``` + +### Building Lookup Maps with `$reduce` + +Use `$reduce` to transform an array into a key-value object: + +```json +"Assign": { + "priceByProduct": "{% $reduce($states.input.items, function($acc, $item) { $merge([$acc, {$item.productId: $item.price}]) }, {}) %}" +} +``` + +Given `[{"productId": "A1", "price": 10}, {"productId": "B2", "price": 25}]`, this produces `{"A1": 10, "B2": 25}`. + +### Dynamic Key Access with `$lookup` + +Use `$lookup` to access an object property by a variable key: + +```json +"Output": { + "price": "{% $lookup($priceByProduct, $states.input.productId) %}" +} +``` + +This is essential when you've built a mapping object with `$reduce` and need to retrieve values dynamically. Standard dot notation (`$priceByProduct.someKey`) only works with literal key names. + +### Conditional Values + +```json +"Output": { + "tier": "{% $states.input.total > 1000 ? 'gold' : 'standard' %}", + "discount": "{% $exists($states.input.coupon) ? 0.1 : 0 %}" +} +``` + +### Array Membership with `in` and Concatenation with `$append` + +Test if a value exists in an array with `in`: + +```json +"Condition": "{% $states.input.status in ['pending', 'processing', 'shipped'] %}" +``` + +Concatenate arrays with `$append`: + +```json +"Assign": { + "allIds": "{% $append($states.input.orderIds, $states.input.returnIds) %}" +} +``` + +### Array Mapping + +```json +"Output": { + "names": "{% $states.input.users.(firstName & ' ' & lastName) %}" +} +``` + +### Generating UUIDs and Random Values + +```json +"Assign": { + "requestId": "{% $uuid() %}", + "randomValue": "{% $random() %}" +} +``` + +### Partitioning Arrays + +```json +"Assign": { + "batches": "{% $partition($states.input.items, 10) %}" +} +``` + +### Parsing JSON Strings + +```json +"Assign": { + "parsed": "{% $parse($states.input.jsonString) %}" +} +``` + +### Hashing + +```json +"Assign": { + "hash": "{% $hash($states.input.content, 'SHA-256') %}" +} +``` + +### Timestamp Comparison with `$toMillis` + +JSONata timestamps are strings, so you can't compare them directly with `<` or `>`. Use `$toMillis` to convert to numeric milliseconds: + +```json +"Condition": "{% $toMillis($states.input.orderDate) > $toMillis($states.input.cutoffDate) %}" +``` + +Useful for sorting timestamps, calculating durations, or finding the most recent entry: + +```json +"Assign": { + "ageMinutes": "{% $round(($toMillis($now()) - $toMillis($states.input.createdAt)) / 60000, 2) %}", + "mostRecent": "{% $sort($states.input.timestamps, function($a, $b) { $toMillis($a) < $toMillis($b) })[0] %}" +} +``` + +## Step Functions Built-in functions + +| Function | Purpose | +|----------|---------| +| `$partition(array, size)` | Partition array into chunks | +| `$range(start, end, step)` | Generate array of values | +| `$hash(data, algorithm)` | Calculate hash (MD5, SHA-1, SHA-256, SHA-384, SHA-512) | +| `$random([seed])` | Random number 0 ≤ n < 1, optional seed | +| `$uuid()` | Generate v4 UUID | +| `$parse(jsonString)` | Deserialize JSON string | + diff --git a/aws-step-functions/steering/validation-and-testing.md b/aws-step-functions/steering/validation-and-testing.md new file mode 100644 index 0000000..be4a054 --- /dev/null +++ b/aws-step-functions/steering/validation-and-testing.md @@ -0,0 +1,286 @@ +# Validation and Testing + +## Local ASL Validation + +Files saved with the `.asl.json` extension get automatic validation from the AWS Toolkit Extension. If the extension is not installed, suggest the user install it (https://open-vsx.org/extension/amazonwebservices/aws-toolkit-vscode). Use your diagnostics tool on any `.asl.json` file to catch structural errors instantly. The State Machine definition must be saved as `.asl.json` to work with local validation. + +--- + +## Testing with TestState API + +The TestState API enables unit and integration testing of Step Functions without deployment. Key capabilities: + +- **Mock service integrations** — Test without invoking real services +- **Advanced states** — Map, Parallel, Activity, `.sync`, `.waitForTaskToken` (require mocks) +- **Control execution** — Simulate retries, Map iterations, error scenarios +- **Chain tests** — Use output→input to test execution paths +- **Optional IAM** — When mocking, `roleArn` optional + +### Before Accessing AWS + +Before calling the TestState API, follow this sequence: + +1. Ask the user to grant you permission to use the TestState API in their AWS account. +2. Check for AWS credentials: run `aws sts get-caller-identity` and verify the response. +3. If credentials are available, confirm the IAM role ARN to use for execution (or omit if using mocks). +4. If credentials are unavailable, help the user construct the CLI/SDK call to run manually. +5. Never assume AWS access — always ask before making any AWS API call. + +### Required IAM Permissions + +The calling identity needs `states:TestState`. If not using mocks, it also needs `iam:PassRole` for the execution role. For HTTP Task with `revealSecrets`, add `states:RevealSecrets`. + +```bash +aws stepfunctions test-state \ + --definition '{"Type":"Task","Resource":"arn:aws:states:::lambda:invoke","Arguments":{...},"End":true}' \ + --input '{"data":"value"}' \ + --mock '{"result":"{\"StatusCode\":200,\"Payload\":{\"body\":\"success\"}}"}' \ + --inspection-level DEBUG +``` + +## Inspection Levels + +| Level | Returns | Use Case | +| --------- | ------------------------------------------------------------------------------- | ------------------- | +| **INFO** | `output`, `status`, `nextState` | Quick validation | +| **DEBUG** | + `afterArguments`, `result`, `variables` | Data flow debugging | +| **TRACE** | + HTTP `request`/`response` (use `--reveal-secrets` for auth) | HTTP Task debugging | + +## Critical: Service-Specific Mock Structure + +**⚠️ Mocks MUST match AWS service API response schema exactly** — field names (case-sensitive), types, required fields. + +### Finding Mock Structure + +1. Identify service from `Resource` ARN: `arn:aws:states:::lambda:invoke` → Lambda `Invoke` API +2. Consult AWS SDK docs for that API's Response Syntax +3. Structure mock to match + +### Common Service Mocks + +| Service | API | Mock Structure | Example | +| --------------- | ---------------- | --------------------------------------- | ----------------------------------------------------------------------------- | +| Lambda | `Invoke` | `{StatusCode, Payload, FunctionError?}` | `'{"result":"{\"StatusCode\":200,\"Payload\":{\"body\":\"ok\"}}\"}'` | +| DynamoDB | `PutItem` | `{Attributes?}` | `'{"result":"{\"Attributes\":{\"id\":{\"S\":\"123\"}}}"}'` | +| DynamoDB | `GetItem` | `{Item?}` | `'{"result":"{\"Item\":{\"id\":{\"S\":\"123\"}}}"}'` | +| SNS | `Publish` | `{MessageId}` | `'{"result":"{\"MessageId\":\"abc-123\"}"}'` | +| SQS | `SendMessage` | `{MessageId, MD5OfMessageBody}` | `'{"result":"{\"MessageId\":\"xyz\",\"MD5OfMessageBody\":\"...\"}"}'` | +| EventBridge | `PutEvents` | `{FailedEntryCount, Entries[]}` | `'{"result":"{\"FailedEntryCount\":0,\"Entries\":[{\"EventId\":\"123\"}]}"}'` | +| S3 | `PutObject` | `{ETag, VersionId?}` | `'{"result":"{\"ETag\":\"\\\"abc123\\\"\"}"}'` | +| Step Functions | `StartExecution` | `{ExecutionArn, StartDate}` | `'{"result":"{\"ExecutionArn\":\"arn:...\",\"StartDate\":\"...\"}"}'` | +| Secrets Manager | `GetSecretValue` | `{ARN, Name, SecretString?}` | `'{"result":"{\"Name\":\"MySecret\",\"SecretString\":\"...\"}"}'` | + +**For `.sync` patterns:** Mock the **polling API** (e.g., `startExecution.sync:2` → mock `DescribeExecution`, NOT `StartExecution`) + +### Mock Syntax + +**Success:** `--mock '{"result":""}'`\ +**Error:** `--mock '{"errorOutput":{"error":"ErrorCode","cause":"description"}}'`\ +**Validation:** `--mock '{"fieldValidationMode":"STRICT|PRESENT|NONE","result":"..."}'` + +**Validation modes:** + +- `STRICT` (default): All required fields, correct types — use in CI/CD +- `PRESENT`: Only validate fields present — flexible testing +- `NONE`: No validation — quick prototyping only + +## Testing Map States + +Tests Map's **input/output processing**, not iterations inside. Mock = entire Map output. + +```bash +aws stepfunctions test-state \ + --definition '{ + "Type":"Map", + "Items":"{% $states.input.items %}", + "ItemSelector":{"value":"{% $states.context.Map.Item.Value %}"}, + "ItemProcessor":{"ProcessorConfig":{"Mode":"INLINE"},...}, + "End":true + }' \ + --input '{"items":[1,2,3]}' \ + --mock '{"result":"[10,20,30]"}' \ + --inspection-level DEBUG +``` + +**DEBUG returns:** `afterItemSelector`, `afterItemBatcher`, `toleratedFailureCount`, `maxConcurrency` + +**Distributed Map:** Provide data in input (as if read from S3)\ +**Failure threshold testing:** Use `--state-configuration '{"mapIterationFailureCount":N}'`\ +**Testing state within Map:** `--state-name` auto-populates `$states.context.Map.Item.Index`, `$states.context.Map.Item.Value` + +## Testing Parallel States + +Mock = JSON array, one element per branch (in definition order): + +```bash +--mock '{"result":"[{\"branch1\":\"result1\"},{\"branch2\":\"result2\"}]"}' +``` + +## Testing Error Handling + +### Retry Logic + +```bash +--state-configuration '{"retrierRetryCount":1}' \ +--mock '{"errorOutput":{"error":"Lambda.ServiceException","cause":"..."}}' \ +--inspection-level DEBUG +``` + +Response includes: `status:"RETRIABLE"`, `retryBackoffIntervalSeconds`, `retryIndex` + +### Catch Handlers + +```bash +--mock '{"errorOutput":{"error":"Lambda.TooManyRequestsException","cause":"..."}}' \ +--inspection-level DEBUG +``` + +Response includes: `status:"CAUGHT_ERROR"`, `nextState`, `catchIndex`, error in `output` + +### Error Propagation in Map/Parallel + +```bash +--state-name "ChildState" \ +--state-configuration '{"errorCausedByState":"ChildState"}' \ +--mock '{"errorOutput":{"error":"States.TaskFailed","cause":"..."}}' +``` + +## Testing .sync and .waitForTaskToken + +**Required:** Must provide mock (validation exception otherwise) + +### .sync Patterns + +Mock the **polling API**, not initial call: + +```bash +# startExecution.sync:2 → mock DescribeExecution +--mock '{"result":"{\"Status\":\"SUCCEEDED\",\"Output\":\"{...}\"}"}' +``` + +Common patterns: `startExecution.sync:2`→`DescribeExecution`, `batch:submitJob.sync`→`DescribeJobs`, `glue:startJobRun.sync`→`GetJobRun` + +### .waitForTaskToken + +```bash +--context '{"Task":{"Token":"test-token-123"}}' \ +--mock '{"result":"{\"StatusCode\":200,\"Payload\":{\"status\":\"approved\"}}"}' +``` + +## Activity States + +Require mock: + +```bash +--definition '{"Type":"Task","Resource":"arn:aws:states:...:activity:MyActivity",...}' \ +--mock '{"result":"{\"result\":\"completed\"}"}' +``` + +## Chaining Tests (Integration Testing) + +```bash +RESULT_1=$(aws stepfunctions test-state --state-name "State1" ... | jq -r '.output') +NEXT_1=$(... | jq -r '.nextState') +RESULT_2=$(aws stepfunctions test-state --state-name "$NEXT_1" --input "$RESULT_1" ...) +``` + +Validates: data transformations, state transitions, end-to-end paths + +## Context Fields + +Test states referencing execution context: + +```bash +--context '{ + "Execution":{"Id":"arn:...","Name":"test-123","StartTime":"2024-01-01T10:00:00.000Z"}, + "State":{"Name":"ProcessData","EnteredTime":"2024-01-01T10:00:05.000Z"}, + "Task":{"Token":"test-token-abc123"} +}' +``` + +## HTTP Tasks (TRACE) + +```bash +--resource "arn:aws:states:::http:invoke" \ +--inspection-level TRACE \ +--reveal-secrets # Requires states:RevealSecrets permission +``` + +Returns: `inspectionData.request` (method, URL, headers, body), `inspectionData.response` (status, headers, body) + +## Troubleshooting + +| Error | Fix | +| ----------------------- | ---------------------------------------------- | +| Invalid field type | Check AWS SDK docs for correct types | +| Required field missing | Add field OR use `fieldValidationMode:PRESENT` | +| .sync validation failed | Mock polling API, not initial call | + +**Debug workflow:** + +1. Start `fieldValidationMode:NONE` for logic testing +2. Switch to `PRESENT` for partial validation +3. Use `STRICT` in CI/CD + +## Test Automation Pattern + +```bash +#!/bin/bash +test_state() { + local state_name=$1 + local input=$2 + local mock=$3 + + aws stepfunctions test-state \ + --definition "$(cat statemachine.asl.json)" \ + --state-name "$state_name" \ + --input "$input" \ + --mock "$mock" \ + --inspection-level DEBUG +} + +# Test chain +RESULT=$(test_state "State1" '{"id":"123"}' '{"result":"..."}' | jq -r '.output') +test_state "State2" "$RESULT" '{"result":"..."}' +``` + +## Best Practices + +1. **Always verify mock structure** against AWS SDK docs for the specific service +2. **For .sync, mock polling API** (DescribeX/GetX), not initial call +3. **Use STRICT validation in CI/CD** to catch mismatches early +4. **Test all error paths** with appropriate error codes +5. **Chain tests** to validate multi-state execution paths +6. **Start with NONE→PRESENT→STRICT** when developing mocks +7. **Use DEBUG for data flow**, TRACE for HTTP debugging +8. **Mock external dependencies** to isolate state machine logic +9. **Test Map failure thresholds** with `mapIterationFailureCount` +10. **Never commit `--reveal-secrets` output** to version control + +## Quick Reference + +```bash +# Basic test +aws stepfunctions test-state --definition '{...}' --input '{...}' --mock '{...}' + +# Test specific state in state machine +aws stepfunctions test-state --definition "$(cat sm.json)" --state-name "MyState" --input '{...}' --mock '{...}' + +# Test retry (2nd attempt) +--state-configuration '{"retrierRetryCount":1}' --mock '{"errorOutput":{...}}' + +# Test Map failure threshold +--state-configuration '{"mapIterationFailureCount":5}' --mock '{"errorOutput":{...}}' + +# Test with context +--context '{"Execution":{"Id":"..."}, "Task":{"Token":"..."}}' + +# HTTP Task with secrets +--inspection-level TRACE --reveal-secrets + +# Mock validation modes +--mock '{"fieldValidationMode":"STRICT|PRESENT|NONE","result":"..."}' +``` + +--- +