BatchWaitersHook
Amazon
A utility to manage waiters for AWS Batch services.
Access Instructions
Install the Amazon provider package into your Airflow environment.
Import the module into your DAG file and instantiate it with your desired parameters.
Parameters
waiter_config: a custom waiter configuration for AWS Batch services.
aws_conn_id: connection id of AWS credentials / region name. If None, the default boto3 credential strategy is used (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html).
region_name: region name to use in the AWS client. Overrides the AWS region set in the connection (if provided).
Documentation
A utility to manage waiters for AWS Batch services.
```python
import random

from airflow.providers.amazon.aws.hooks.batch_waiters import BatchWaitersHook

# to inspect default waiters
waiters = BatchWaitersHook()
config = waiters.default_config  # type: Dict
waiter_names = waiters.list_waiters()  # -> ["JobComplete", "JobExists", "JobRunning"]

# The default_config is a useful stepping stone to creating custom waiters, e.g.
custom_config = waiters.default_config  # this is a deepcopy
# modify custom_config['waiters'] as necessary and get a new instance:
waiters = BatchWaitersHook(waiter_config=custom_config)
waiters.waiter_config  # check the custom configuration (this is a deepcopy)
waiters.list_waiters()  # names of custom waiters

# During the init for BatchWaitersHook, the waiter_config is used to build a waiter_model;
# and note that this only occurs during the class init, to avoid any accidental mutations
# of waiter_config leaking into the waiter_model.
waiters.waiter_model  # -> botocore.waiter.WaiterModel object

# The waiter_model is combined with the waiters.client to get a specific waiter
# and the details of the config on that waiter can be further modified without any
# accidental impact on the generation of new waiters from the defined waiter_model, e.g.
waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

# To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
# (jobId is the ID of an existing AWS Batch job and must be defined beforehand)
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])
```
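The step "modify custom_config['waiters'] as necessary" can be sketched without touching AWS at all. The example below is a minimal illustration that assumes the waiter configuration is a plain dictionary in botocore's version 2 waiter layout (`version`, `waiters`, and per-waiter `delay`, `operation`, `maxAttempts`, `acceptors`). `BASE_CONFIG` is an abbreviated stand-in written for this sketch, not the hook's actual default configuration, and `with_slower_polling` is a hypothetical helper name.

```python
import copy

# Abbreviated stand-in for a waiter configuration in botocore's version 2
# layout. This is an assumption for illustration, NOT the hook's real
# default_config.
BASE_CONFIG = {
    "version": 2,
    "waiters": {
        "JobComplete": {
            "delay": 2,
            "operation": "DescribeJobs",
            "maxAttempts": 100,
            "acceptors": [
                {
                    "argument": "jobs[].status",
                    "expected": "SUCCEEDED",
                    "matcher": "pathAll",
                    "state": "success",
                },
                {
                    "argument": "jobs[].status",
                    "expected": "FAILED",
                    "matcher": "pathAny",
                    "state": "failure",
                },
            ],
        }
    },
}


def with_slower_polling(config, waiter_name, delay, max_attempts):
    """Return a modified deep copy of config; the input dict is left untouched.

    Hypothetical helper: it only changes the polling interval (seconds)
    and the attempt limit of one named waiter.
    """
    custom = copy.deepcopy(config)
    waiter = custom["waiters"][waiter_name]
    waiter["delay"] = delay
    waiter["maxAttempts"] = max_attempts
    return custom


custom_config = with_slower_polling(
    BASE_CONFIG, "JobComplete", delay=10, max_attempts=30
)
```

A dictionary built this way would then be supplied through the `waiter_config` parameter described above. Copying before editing mirrors the deep-copy behaviour noted in the example, so changes to the custom configuration never leak back into the configuration it was derived from.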