transform
Astro SDKGiven a python function that returns a SQL statement and (optional) tables, execute the SQL statement and output the result into a SQL table.
Access Instructions
Install the Astro SDK provider package into your Airflow environment.
Import the module into your DAG file and instantiate it with your desired params.
Parameters
python_callableOptional[Callable]This parameter is filled in automatically when you use the transform function as a decorator. This is where the python function gets passed to the wrapping function
conn_idstrConnection ID for the database you want to connect to. If you do not pass in a value for this object we can infer the connection ID from the first table passed into the python_callable function. (required if there are no table arguments)
parametersOptional[dict or iterable]The parameters to render the SQL query with.
databaseOptional[str]Database within the SQL instance you want to access. If left blank we will default to the table.metatadata.database in the first Table passed to the function (required if there are no table arguments)
schemaOptional[str]Schema within the SQL instance you want to access. If left blank we will default to the table.metatadata.schema in the first Table passed to the function (required if there are no table arguments)
kwargsDictAny keyword arguments supported by the BaseOperator is supported (e.g queue, owner)
Documentation
Use this function as a decorator like so:
@transformdef my_sql_statement(table1: Table, table2: Table) -> Table:return ""SELECT * FROM {{table1}} JOIN {{table2}}""
In this example, by identifying the parameters as Table objects, astro knows to automatically convert those objects into tables (if they are, for example, a dataframe). Any type besides table will lead astro to assume you do not want the parameter converted. You can also pass parameters into the query like so
@transformdef my_sql_statement(table1: Table, table2: Table, execution_date) -> Table:return ""SELECT * FROM {{table1}} JOIN {{table2}} WHERE date > {{exec_date}}"", {""exec_date"": execution_date}