lakefs-dag
### Example lakeFS DAG
Data Management & Governance
Providers:
Run this DAG
1. Install the Astronomer CLI (skip this step if you already have the CLI):
2. Initiate the project in a local directory:
3. Copy and paste the code below into a file in the
`dags` directory.
4. Add the following to your
`requirements.txt` file:
5. Run the DAG from the local directory where the project was initiated:
from collections import namedtuple
from itertools import zip_longest
import time
from typing import Sequence

from airflow.decorators import dag
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from lakefs_sdk.exceptions import NotFoundException

from lakefs_provider.hooks.lakefs_hook import LakeFSHook
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.create_symlink_operator import LakeFSCreateSymlinkOperator
from lakefs_provider.operators.delete_branch_operator import LakeFSDeleteBranchOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.operators.upload_operator import LakeFSUploadOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.get_commit_operator import LakeFSGetCommitOperator
from lakefs_provider.operators.get_object_operator import LakeFSGetObjectOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor
from lakefs_provider.sensors.commit_sensor import LakeFSCommitSensor


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "lakeFS",
    "branch": "example-branch",
    "repo": "example-repo",
    "path": "path/to/_SUCCESS",
    "default-branch": "main",
    "lakefs_conn_id": "conn_lakefs",
}

# Content written by the upload task is this prefix followed by a timestamp;
# the prefix is what check_expected_prefix later verifies.
CONTENT_PREFIX = 'It is not enough to succeed. Others must fail.'
COMMIT_MESSAGE_1 = 'committing to lakeFS using airflow!'
MERGE_MESSAGE_1 = 'merging to the default branch'

# Pair of (commit id, commit message) used to compare expected and actual log entries.
IdAndMessage = namedtuple('IdAndMessage', ['id', 'message'])


def check_expected_prefix(task_instance, actual: str, expected: str) -> None:
    """Fail the task unless *actual* starts with *expected*.

    Args:
        task_instance: Accepted for the PythonOperator call signature; unused here.
        actual: The text that was read back.
        expected: The prefix *actual* must start with.

    Raises:
        AirflowFailException: If *actual* does not start with *expected*.
    """
    if not actual.startswith(expected):
        raise AirflowFailException(f'Got:\n"{actual}"\nwhich does not start with\n{expected}')


def check_logs(task_instance, repo: str, ref: str, commits: Sequence[str], messages: Sequence[str],
               amount: int = 100) -> None:
    """Verify that the commit log of *ref* starts with the expected commits.

    The expected (id, message) pairs are built by zipping *commits* with
    *messages* and are compared, in order, against the entries yielded by
    ``LakeFSHook.log_commits``. Log entries beyond the expected ones are ignored.

    Args:
        task_instance: Accepted for the PythonOperator call signature; unused here.
        repo: Repository whose log is read.
        ref: Reference whose log is read.
        commits: Expected commit ids, in log order.
        messages: Expected commit messages, parallel to *commits*.
        amount: Forwarded as the third argument to ``hook.log_commits``
            (presumably the fetch size per request -- confirm against the hook).

    Raises:
        AirflowFailException: If a log entry differs from the expected one, or
            the log ends before all expected entries were matched.
    """
    hook = LakeFSHook(default_args['lakefs_conn_id'])

    expected_entries = [IdAndMessage(commit, message) for commit, message in zip(commits, messages)]
    actual_entries = (
        IdAndMessage(message=commit['message'], id=commit['id'])
        for commit in hook.log_commits(repo, ref, amount)
    )

    # zip_longest pads the shorter side with None: a None on the expected side
    # means every expected entry matched; a None on the actual side means the
    # log ran out early and falls through to the mismatch error below.
    for expected, actual in zip_longest(expected_entries, actual_entries):
        if expected is None:
            # Matched all msgs!
            return
        if expected != actual:
            raise AirflowFailException(f'Got {actual} instead of {expected}')


def check_branch_object(task_instance, repo: str, branch: str, path: str) -> None:
    """Fail the task if *path* can still be stat'ed on *branch*.

    Args:
        task_instance: Accepted for the PythonOperator call signature; unused here.
        repo: Repository to look in.
        branch: Branch (passed as the ref) to look in.
        path: Object path that is expected to be missing.

    Raises:
        AirflowFailException: If ``stat_object`` succeeds, i.e. the path exists.
    """
    hook = LakeFSHook(default_args['lakefs_conn_id'])
    print(f"Trying to check if the following path exists: lakefs://{repo}/{branch}/{path}")
    try:
        hook.stat_object(repo=repo, ref=branch, path=path)
    except NotFoundException as e:
        print(f"Path not found, as expected: {e}")
    else:
        # Only reached when stat_object did not raise, i.e. the object exists.
        raise AirflowFailException("Path found, this is not to be expected.")


@dag(default_args=default_args,
     render_template_as_native_obj=True,
     max_active_runs=1,
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['testing'])
def lakeFS_workflow():
    """
    ### Example lakeFS DAG

    Showcases the lakeFS provider package's operators and sensors.

    To run this example, create a connector with:
    - id: conn_lakefs
    - type: http
    - host: http://localhost:8000
    - extra: {"access_key_id":"AKIAIOSFODNN7EXAMPLE","secret_access_key":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"}
    """

    # Create the branch to run on
    task_create_branch = LakeFSCreateBranchOperator(
        task_id='create_branch',
        source_branch=default_args.get('default-branch')
    )

    # Create a path.
    # Note: time.asctime() is evaluated when this function runs to build the DAG.
    task_upload_file = LakeFSUploadOperator(
        task_id='upload_file',
        content=f"{CONTENT_PREFIX} @{time.asctime()}".encode('utf-8'))

    task_get_branch_commit = LakeFSGetCommitOperator(
        do_xcom_push=True,
        task_id='get_branch_commit',
        ref=default_args['branch'])

    # Checks periodically for the path.
    # DAG continues only when the file exists.
    task_sense_file = LakeFSFileSensor(
        task_id='sense_file',
        mode='reschedule',
        poke_interval=1,
        timeout=10,
    )

    # Commit the changes to the branch.
    # (Also a good place to validate the new changes before committing them)
    task_commit = LakeFSCommitOperator(
        task_id='commit',
        msg=COMMIT_MESSAGE_1,
        metadata={"committed_from": "airflow-operator"}
    )

    # Create symlink file for example-branch
    task_create_symlink = LakeFSCreateSymlinkOperator(task_id="create_symlink")

    # Wait until the commit is completed.
    # Not really necessary in this DAG, since the LakeFSCommitOperator won't return before that.
    # Nonetheless we added it to show the full capabilities.
    task_sense_commit = LakeFSCommitSensor(
        task_id='sense_commit',
        prev_commit_id='''{{ task_instance.xcom_pull(task_ids='get_branch_commit', key='return_value').id }}''',
        mode='reschedule',
        poke_interval=1,
        timeout=10,
    )

    # Get the file.
    task_get_object = LakeFSGetObjectOperator(
        task_id='get_object',
        do_xcom_push=True,
        ref=default_args['branch'])

    # Check its contents
    task_check_expected_prefix = PythonOperator(
        task_id='check_expected_prefix',
        python_callable=check_expected_prefix,
        op_kwargs={
            'actual': '''{{ task_instance.xcom_pull(task_ids='get_object', key='return_value') }}''',
            'expected': CONTENT_PREFIX,
        })

    # Merge the changes back to the main branch.
    task_merge_branches = LakeFSMergeOperator(
        task_id='merge_branches',
        do_xcom_push=True,
        source_ref=default_args.get('branch'),
        destination_branch=default_args.get('default-branch'),
        msg=MERGE_MESSAGE_1,
        metadata={"committer": "airflow-operator"}
    )

    # Expected log entries, in log order: the merge first, then the branch commit.
    expected_commits = ['''{{ ti.xcom_pull('merge_branches') }}''',
                        '''{{ ti.xcom_pull('commit') }}''']
    expected_messages = [MERGE_MESSAGE_1, COMMIT_MESSAGE_1]

    # Fetch and verify log messages in bulk.
    task_check_logs_bulk = PythonOperator(
        task_id='check_logs_bulk',
        python_callable=check_logs,
        op_kwargs={
            'repo': default_args.get('repo'),
            'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
            'commits': expected_commits,
            'messages': expected_messages,
        })

    # Fetch and verify log messages one at a time.
    task_check_logs_individually = PythonOperator(
        task_id='check_logs_individually',
        python_callable=check_logs,
        op_kwargs={
            'repo': default_args.get('repo'),
            'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
            'amount': 1,
            'commits': expected_commits,
            'messages': expected_messages,
        })

    task_delete_branch = LakeFSDeleteBranchOperator(
        task_id='delete_branch',
        repo=default_args.get('repo'),
        branch=default_args.get('branch'),
    )

    # After the branch is deleted, the uploaded path must no longer be found on it.
    task_check_branch_object = PythonOperator(
        task_id='check_branch_object',
        python_callable=check_branch_object,
        op_kwargs={
            'repo': default_args.get('repo'),
            'branch': default_args.get('branch'),
            'path': default_args.get('path'),
        })

    # NOTE(review): create_symlink is not upstream of delete_branch, so nothing
    # here orders the two relative to each other -- confirm this is intended.
    task_create_branch >> task_get_branch_commit >> [task_upload_file, task_sense_commit, task_sense_file]
    task_upload_file >> task_commit >> task_create_symlink
    task_sense_file >> task_get_object >> task_check_expected_prefix
    task_sense_commit >> task_merge_branches >> [task_check_logs_bulk, task_check_logs_individually]
    [task_check_expected_prefix, task_check_logs_bulk, task_check_logs_individually] >> task_delete_branch
    task_delete_branch >> task_check_branch_object


sample_workflow_dag = lakeFS_workflow()