firebolt
Example use of Firebolt related operators.
Providers:
Run this DAG
1. Install the Astronomer CLI:Skip if you already have the CLI
2. Initate the project in a local directory:
3. Copy and paste the code below into a file in thedags
directory.
4. Add the following to your requirements.txt
file:
5. Run the DAG from the local directory where the project was initiated:
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Example use of Firebolt related operators."""from datetime import datetimefrom airflow import DAGfrom firebolt_provider.operators.firebolt import (FireboltOperator,FireboltStartEngineOperator,FireboltStopEngineOperator,)FIREBOLT_CONN_ID = "firebolt_conn_id"FIREBOLT_SAMPLE_TABLE = "sample_table"FIREBOLT_DATABASE = "sample_database"# SQL commandsSQL_CREATE_TABLE_STATEMENT = (f"CREATE FACT TABLE IF NOT EXISTS {FIREBOLT_SAMPLE_TABLE} "f"(id INT, name String)"f" PRIMARY INDEX id;")SQL_INSERT_STATEMENT = f"INSERT INTO {FIREBOLT_SAMPLE_TABLE} values (%(id)s, 'name');"SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(0, 10)]SELECT_STATEMENT_SQL_STRING = f"SELECT * FROM {FIREBOLT_SAMPLE_TABLE} LIMIT 10;"SQL_DROP_TABLE_STATEMENT = f"DROP TABLE IF EXISTS {FIREBOLT_SAMPLE_TABLE};"SQL_CREATE_DATABASE_STATEMENT = f"CREATE DATABASE IF NOT EXISTS {FIREBOLT_DATABASE};"SQL_DROP_DATABASE_STATEMENT = f"DROP DATABASE IF EXISTS {FIREBOLT_DATABASE};"with DAG("example_firebolt",start_date=datetime(2021, 1, 1),default_args={"firebolt_conn_id": FIREBOLT_CONN_ID},tags=["example"],catchup=False,) as dag:firebolt_start_engine = FireboltStartEngineOperator(task_id="firebolt_start_engine")firebolt_stop_engine = FireboltStopEngineOperator(task_id="firebolt_stop_engine")firebolt_op_sql_create_table = FireboltOperator(task_id="firebolt_op_sql_create_table",sql=SQL_CREATE_TABLE_STATEMENT,)firebolt_op_sql_list = FireboltOperator(task_id="firebolt_op_sql_list",sql=SQL_LIST,)firebolt_op_with_params = FireboltOperator(task_id="firebolt_op_with_params",sql=SQL_INSERT_STATEMENT,parameters=[56],)firebolt_op_sql_str = FireboltOperator(task_id="firebolt_op_sql_str",sql=SELECT_STATEMENT_SQL_STRING,)firebolt_op_sql_drop_table = FireboltOperator(task_id="firebolt_op_sql_drop_table",sql=SQL_DROP_TABLE_STATEMENT,)firebolt_op_sql_create_db = FireboltOperator(task_id="firebolt_op_sql_create_db",sql=SQL_CREATE_DATABASE_STATEMENT,)firebolt_op_sql_drop_db = FireboltOperator(task_id="firebolt_op_sql_drop_db",sql=SQL_DROP_DATABASE_STATEMENT,)(firebolt_start_engine>> firebolt_op_sql_create_table>> firebolt_op_sql_list>> firebolt_op_with_params>> firebolt_op_sql_str>> firebolt_op_sql_drop_table>> firebolt_op_sql_create_db>> firebolt_op_sql_drop_db>> firebolt_stop_engine)