Apache Airflow is a popular workflow management platform, and a wide range of AWS services are integrated with it through the Amazon AWS Operators. AWS Lambda is one of the integrated services, and it can be used to develop workflows efficiently. The current Lambda operator, however, simply invokes a Lambda function: it can fail to report the invocation result of a function correctly and to record the exact error message from a failure. In this post, we'll discuss a custom Lambda operator that handles those limitations.
Architecture
We'll discuss a custom Lambda operator that extends the Lambda operator provided by AWS. When a DAG creates a task that invokes a Lambda function, the operator updates the Lambda payload with a correlation ID that uniquely identifies the task. The correlation ID is added to every log message that the Lambda function generates. Finally, the custom operator filters the associated CloudWatch log events, prints the log messages and raises a runtime error when an error message is found. With this setup, we can correctly identify the result of a function invocation and point to the exact error message when it fails. The source of this post can be found in a GitHub repository.
Lambda Setup
Lambda Function
The Logger utility of the Lambda Powertools Python package is used to record log messages. The correlation ID is added to the event payload and is injected into log messages by the logger.inject_lambda_context decorator. Note the Lambda context would be a better place for a correlation ID, as we can add a custom client context object. However, the client context is not recognised when an invocation is made asynchronously, so we have to add the correlation ID to the event payload instead. We use another decorator (middleware_before_after) that logs messages before and after the function invocation. The latter message, which indicates the end of the function, is important because we rely on it to identify whether a function completed without an error. If a function finishes with an error, that last log message won't be recorded. We can also check whether a function failed by looking for a log message whose level is ERROR, which is created by the logger.exception method. The Lambda event payload has two extra attributes – n for setting the number of iterations and to_fail for determining whether to raise an error.
# lambda/src/lambda_function.py |
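The behaviour described above can be sketched as follows. This is a simplified stand-in that uses the standard library logger instead of the Powertools Logger, so the decorator and message wording are assumptions based on the description, not the actual source:

```python
import functools
import logging

# Stand-in for the Powertools Logger; a real implementation would use
# aws_lambda_powertools.Logger with logger.inject_lambda_context.
logging.basicConfig(format="%(levelname)s %(message)s")
logger = logging.getLogger("lambda_function")
logger.setLevel(logging.INFO)


def middleware_before_after(handler):
    """Log messages before and after the wrapped handler runs."""
    @functools.wraps(handler)
    def wrapper(event, context):
        cid = event.get("correlation_id", "N/A")
        logger.info("Function started - %s", cid)
        resp = handler(event, context)
        # Only reached when the handler returns without an error, so this
        # message marks a successful invocation.
        logger.info("Function ended - %s", cid)
        return resp
    return wrapper


@middleware_before_after
def lambda_handler(event, context):
    cid = event.get("correlation_id", "N/A")
    # n controls the number of iterations, to_fail whether to raise an error.
    for i in range(event.get("n", 1)):
        logger.info("iteration %s - %s", i + 1, cid)
    if event.get("to_fail", False):
        try:
            raise RuntimeError("simulated error")
        except RuntimeError:
            # logger.exception records an ERROR-level message.
            logger.exception("error occurred - %s", cid)
            raise
    return {"ok": True}
```

When to_fail is set, the error is re-raised after logging, so the "Function ended" message is never written.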
SAM Template
The Serverless Application Model (SAM) framework is used to deploy the Lambda function. The Lambda Powertools Python package is added as a Lambda layer. The Lambda log group is configured so that messages are kept for only 1 day, which helps reduce the time taken to filter log events. By default, an asynchronously invoked Lambda function is retried up to two more times on error – the default maximum retry attempts is 2. It is set to 0 here, as retry behaviour can be controlled by Airflow if necessary, and doing so makes it easier to track function invocation status.
# lambda/template.yml Resources: |
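The two settings above map onto a SAM template roughly as follows. This is an illustrative fragment only; the resource names are assumptions, not taken from the repository:

```yaml
# Illustrative fragment only - resource names are assumptions.
ExampleFunction:
  Type: AWS::Serverless::Function
  Properties:
    EventInvokeConfig:
      MaximumRetryAttempts: 0     # let Airflow control retries instead
ExampleFunctionLogGroup:
  Type: AWS::Logs::LogGroup
  Properties:
    LogGroupName: !Sub "/aws/lambda/${ExampleFunction}"
    RetentionInDays: 1            # short history keeps log filtering fast
```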
Lambda Operator
Lambda Invoke Function Operator
Below is the source of the Lambda invoke function operator. After invoking a Lambda function, the execute method checks whether the response status code indicates success and whether FunctionError is found in the response payload. When an invocation is made synchronously (the RequestResponse invocation type), the operator can identify whether the invocation succeeded, because the response is returned after the function finishes. However, it reports only a generic error message on failure, and we have to visit CloudWatch Logs to check the exact error. It gets worse when the function is invoked asynchronously (the Event invocation type), because the response is returned before the invocation finishes. In this case it is not even possible to check whether the invocation succeeded.
… |
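The checks described above amount to something like the following. This is a simplified sketch of the logic, not the provider's actual code:

```python
import json


def check_invocation_response(status_code: int, function_error: str, payload: bytes) -> bytes:
    """Simplified version of the success checks described above."""
    # A 2xx status code only confirms the request was accepted. For the
    # Event invocation type it says nothing about the function outcome,
    # because the response is returned before the function finishes.
    if status_code >= 300:
        raise ValueError(f"Lambda invocation returned status code {status_code}")
    # FunctionError is only populated for RequestResponse invocations,
    # and even then the error message below is generic.
    if function_error:
        raise ValueError(
            "Lambda function execution resulted in error: "
            f"{json.loads(payload or b'{}')}"
        )
    return payload
```

For an asynchronous invocation, the service returns status code 202 with no FunctionError, so the check passes even if the function later fails.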
Custom Lambda Operator
The custom Lambda operator extends the Lambda invoke function operator. It updates the Lambda payload by adding a correlation ID, and its execute method is extended by the log_processor decorator function. As the name suggests, the decorator filters all log messages that include the correlation ID and prints them, looping over the lifetime of the invocation. While processing log events, it raises an error if an error message is found, and log event processing stops when a message that indicates the end of the invocation is encountered. Finally, to handle the case where an invocation doesn't finish within the timeout seconds, it raises an error at the end of the loop.
The main benefits of this approach are:
we don't have to rewrite the Lambda invocation logic, as we extend the Lambda invoke function operator,
we can track a Lambda invocation status regardless of its invocation type, and
we are able to record all relevant log messages of an invocation.
# airflow/dags/lambda_operator.py … |
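The log-processing loop can be sketched as below. The function and message wording are assumptions based on the description above; in the real operator each pass would fetch freshly filtered events from CloudWatch Logs rather than consume a single iterator:

```python
import time


def process_log_events(log_events, timeout_seconds=60):
    """Consume filtered CloudWatch log events for one invocation.

    ``log_events`` yields dicts with a ``message`` key, already filtered by
    correlation ID. Raises RuntimeError when an ERROR-level message is seen,
    or when the end-of-function message never arrives within the timeout.
    """
    events = iter(log_events)
    start = time.time()
    while time.time() - start < timeout_seconds:
        for event in events:
            message = event["message"]
            print(message)
            if message.startswith("ERROR"):
                raise RuntimeError("ERROR found in log")
            if "Function ended" in message:
                return  # invocation completed successfully
        # No end-of-function message yet; wait before polling again.
        time.sleep(1)
    raise RuntimeError("invocation did not finish within the timeout")
```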
Unit Testing
Unit testing is performed for the main log processing function (process_log_events). A log events fixture is created by a closure function: depending on the case argument, it returns a log events list that covers success, error or timeout. The fixture is used as the mock response of the get_response_iterator method, and the three test cases cover each of the possible scenarios.
# airflow/tests/test_lambda_operator.py |
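The fixture can be sketched as a closure like the one below. This is a simplified stand-in; the message wording and case names are assumptions based on the description, not copied from the repository:

```python
def create_log_events(case: str):
    """Return a closure that produces log events for the given scenario.

    In the tests, the returned events would be used as the mock response
    of the get_response_iterator method.
    """
    def _events():
        base = [
            {"message": "INFO Function started - abc"},
            {"message": "INFO iteration 1 - abc"},
        ]
        if case == "success":
            return base + [{"message": "INFO Function ended - abc"}]
        if case == "error":
            return base + [{"message": "ERROR error occurred - abc"}]
        if case == "timeout":
            return base  # the end-of-function message never appears
        raise ValueError(f"unknown case: {case}")
    return _events
```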
The test results are shown below.
$ pytest airflow/tests/test_lambda_operator.py -v |
Compare Operators
Docker Compose
In order to compare the two operators, the Airflow Docker quick start guide is simplified to use the LocalExecutor. In this setup, both scheduling and task execution are handled by the Airflow scheduler service. Instead of creating an AWS connection for invoking Lambda functions, the host AWS configuration is shared by volume-mapping ${HOME}/.aws to /home/airflow/.aws. Also, as I don't use the default AWS profile but a profile named cevo, it is added to the scheduler service as an environment variable (AWS_PROFILE: "cevo").
# airflow/docker-compose.yaml |
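The relevant scheduler settings look roughly like the fragment below. This is an illustrative snippet only; the image tag and volume paths are assumptions, and the full file carries many more options from the quick start guide:

```yaml
# Illustrative fragment only - trimmed from the quick start compose file.
services:
  scheduler:
    image: apache/airflow:2.6.1      # assumed version
    command: scheduler
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AWS_PROFILE: "cevo"            # non-default host AWS profile
    volumes:
      - ./dags:/opt/airflow/dags
      - ${HOME}/.aws:/home/airflow/.aws   # share host AWS credentials
```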
The quick start guide requires a number of steps to initialise an environment before starting the services, and they are collected into a single shell script shown below.
# airflow/init.sh #!/usr/bin/env bash |
After finishing the initialisation steps, the Docker Compose services can be started with docker-compose up -d.
Lambda Invoke Function Operator
Two tasks are created with the Lambda invoke function operator. The first is invoked synchronously (RequestResponse) while the second is invoked asynchronously (Event). Both are configured to raise an error after 10 seconds.
# airflow/dags/example_without_logging.py |
As shown below, the asynchronously invoked task is incorrectly marked as success. This is because, in practice, only the response status code is checked, as the operator doesn't wait until the invocation finishes. On the other hand, the synchronously invoked task is marked as failed. However, it doesn't show the exact error that failed the invocation – see below for further details.
The error message is Lambda function execution resulted in error, which is the generic message constructed by the Lambda invoke function operator.
Custom Lambda Operator
Five tasks are created with the custom Lambda operator. The first four tasks cover success and failure for both synchronous and asynchronous invocations. The last task checks failure due to timeout.
# airflow/dags/example_with_logging.py |
As expected, we see two successful tasks and three failed tasks. The custom Lambda operator tracks Lambda function invocation status correctly.
Below are the log messages of the successful task by asynchronous invocation. Each message includes the same correlation ID, and the last message from the Lambda function is Function ended.
The failed task by asynchronous invocation also shows all log messages, and it is possible to check what caused the invocation to fail.
The case of failure due to timeout doesn't show an error message from the Lambda invocation. However, we can treat it as a failure because we don't see the message indicating the function invocation ended within the function timeout.
The failure by synchronous invocation still doesn't show the exact error message, because an error is raised before the process log events function is executed. Because of this, I advise invoking a Lambda function asynchronously.
Summary
In this post, we discussed the limitations of the Lambda invoke function operator and created a custom Lambda operator. The custom operator reports the invocation result of a function correctly and records the exact error message on failure. A number of tasks were created to compare the results of the two operators, and it was shown that the custom operator handles those limitations successfully.