BatchWaitersHook

Amazon

A utility to manage waiters for AWS Batch services.

Last Updated: Mar. 21, 2023

Access Instructions

Install the Amazon provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.

Parameters

waiter_config: a custom waiter configuration for AWS Batch services.
aws_conn_id: connection ID of the AWS credentials / region name. If None, the boto3 credential strategy will be used (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html).
region_name: region name to use in the AWS client. Overrides the AWS region in the connection (if provided).

Documentation

A utility to manage waiters for AWS Batch services.

import random
from airflow.providers.amazon.aws.hooks.batch_waiters import BatchWaitersHook
# to inspect default waiters
waiters = BatchWaitersHook()
config = waiters.default_config # type: Dict
waiter_names = waiters.list_waiters() # -> ["JobComplete", "JobExists", "JobRunning"]
# The default_config is a useful stepping stone to creating custom waiters, e.g.
custom_config = waiters.default_config # this is a deepcopy
# modify custom_config['waiters'] as necessary and get a new instance:
waiters = BatchWaitersHook(waiter_config=custom_config)
waiters.waiter_config # check the custom configuration (this is a deepcopy)
waiters.list_waiters() # names of custom waiters
# During the init for BatchWaitersHook, the waiter_config is used to build a waiter_model;
# and note that this only occurs during the class init, to avoid any accidental mutations
# of waiter_config leaking into the waiter_model.
waiters.waiter_model # -> botocore.waiter.WaiterModel object
# The waiter_model is combined with the waiters.client to get a specific waiter
# and the details of the config on that waiter can be further modified without any
# accidental impact on the generation of new waiters from the defined waiter_model, e.g.
waiters.get_waiter("JobExists").config.delay # -> 5
waiter = waiters.get_waiter("JobExists") # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay # -> 5 as defined by waiter_model
# To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
waiter = waiters.get_waiter("JobExists") # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = random.uniform(1, 10) # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId]) # jobId: the ID of a previously submitted AWS Batch job
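
The step "modify custom_config['waiters'] as necessary" can be sketched in plain Python. The following is a minimal illustration, not the provider's own code: base_config is a hypothetical stand-in for the dictionary returned by default_config (the real defaults may differ), the add_waiter helper and the "JobSucceeded" waiter name are made up for this example, and the dictionary layout assumes the botocore version 2 waiter format (delay, operation, maxAttempts, acceptors).

```python
import copy

# Hypothetical stand-in for the dict returned by `default_config`;
# the defaults shipped with the Amazon provider may differ.
base_config = {
    "version": 2,
    "waiters": {
        "JobExists": {
            "delay": 2,
            "operation": "DescribeJobs",
            "maxAttempts": 100,
            "acceptors": [
                {
                    "argument": "jobs[].status",
                    "expected": "SUCCEEDED",
                    "matcher": "pathAll",
                    "state": "success",
                },
            ],
        }
    },
}


def add_waiter(config, name, delay, max_attempts, acceptors):
    """Return a copy of `config` with one extra DescribeJobs waiter (hypothetical helper)."""
    # Work on a deep copy so the caller's configuration is never mutated.
    new_config = copy.deepcopy(config)
    new_config["waiters"][name] = {
        "delay": delay,  # seconds between polls
        "operation": "DescribeJobs",
        "maxAttempts": max_attempts,
        "acceptors": copy.deepcopy(acceptors),
    }
    return new_config


custom_config = add_waiter(
    base_config,
    name="JobSucceeded",  # hypothetical waiter name
    delay=10,
    max_attempts=60,
    acceptors=[
        # Succeed once every described job reports SUCCEEDED.
        {"argument": "jobs[].status", "expected": "SUCCEEDED",
         "matcher": "pathAll", "state": "success"},
        # Fail as soon as any described job reports FAILED.
        {"argument": "jobs[].status", "expected": "FAILED",
         "matcher": "pathAny", "state": "failure"},
    ],
)
```

A dictionary built this way could then be passed as the waiter_config parameter described above. Copying before modifying mirrors the deepcopy behaviour noted in the example, so the base configuration stays untouched.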