CrateDB Table Export
This DAG performs a daily export from a CrateDB time-series table to an S3 bucket as JSON files.
Data Management & Governance, Storage, Databases, Work Management
Run this DAG
1. Install the Astronomer CLI (skip this step if you already have the CLI).
2. Initiate the project in a local directory.
3. Copy and paste the code below into a file in the dags directory.
4. Run the DAG from the local directory where the project was initiated.
"""
Regularly exports a table's rows to an S3 bucket as JSON files

A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-automating-data-export-to-s3/901
"""
import datetime
import os

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup

from config.table_exports import TABLES

with DAG(
    dag_id="cratedb_table_export",
    start_date=datetime.datetime(2021, 11, 11),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    with TaskGroup(group_id='table_exports') as tg1:
        for export_table in TABLES:
            PostgresOperator(
                task_id="copy_{table}".format(table=export_table['table']),
                postgres_conn_id="cratedb_connection",
                sql="""
                    COPY {table} WHERE DATE_TRUNC('day', {timestamp_column}) = '{day}'
                    TO DIRECTORY 's3://{access}:{secret}@{target_bucket}-{day}';
                """.format(
                    table=export_table['table'],
                    timestamp_column=export_table['timestamp_column'],
                    target_bucket=export_table['target_bucket'],
                    day='{{ macros.ds_add(ds, -1) }}',
                    access=os.environ.get("ACCESS_KEY_ID"),
                    secret=os.environ.get("SECRET_ACCESS_KEY")
                )
            )
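The DAG imports TABLES from config.table_exports, which is not shown on this page. The sketch below assumes it is a list of dictionaries with the three keys the DAG reads (table, timestamp_column, target_bucket); the concrete table, column, and bucket names are hypothetical placeholders. It also reproduces, in plain Python and without Airflow, roughly what the templated COPY statement looks like once `{{ macros.ds_add(ds, -1) }}` has been rendered to the previous day. The helper names previous_day and render_copy_statement are illustrative only and are not part of the DAG.

```python
import datetime

# Hypothetical contents of config/table_exports.py. The keys match what the
# DAG reads; the values are placeholders, not taken from the original project.
TABLES = [
    {
        "table": "raw_metrics",
        "timestamp_column": "ts_day",
        "target_bucket": "crate-astro-tutorial/raw_metrics",
    },
]


def previous_day(ds: str) -> str:
    """Plain-Python stand-in for the template '{{ macros.ds_add(ds, -1) }}'."""
    day = datetime.datetime.strptime(ds, "%Y-%m-%d") - datetime.timedelta(days=1)
    return day.strftime("%Y-%m-%d")


def render_copy_statement(export_table: dict, ds: str, access: str, secret: str) -> str:
    """Build the COPY statement for one table, as it would look after rendering."""
    return (
        "COPY {table} WHERE DATE_TRUNC('day', {timestamp_column}) = '{day}' "
        "TO DIRECTORY 's3://{access}:{secret}@{target_bucket}-{day}';"
    ).format(
        table=export_table["table"],
        timestamp_column=export_table["timestamp_column"],
        target_bucket=export_table["target_bucket"],
        day=previous_day(ds),
        access=access,
        secret=secret,
    )


if __name__ == "__main__":
    # Use obvious placeholders instead of real credentials when inspecting output.
    for export_table in TABLES:
        print(render_copy_statement(export_table, "2021-11-12", "<access>", "<secret>"))
```

Note that in the DAG itself the credentials come from os.environ.get, which returns None for an unset variable; str.format would then write the literal text None into the S3 URI, so ACCESS_KEY_ID and SECRET_ACCESS_KEY need to be set in the environment where the DAG file is parsed.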