Categories
Mastering Development

Airflow: How to select BigQuery table data into a DataFrame

I am new to Airflow. I created my first DAG below, which selects data from a Google BigQuery table and saves it to a pandas DataFrame. I need suggestions on the following: Where should I provide the connection ID for BigQuery? Since pd.read_gbq requires authentication, how should that be handled in Airflow DAGs? import os import pandas as […]

AirflowTaskTimeout after setting execution_timeout

My Airflow DAG keeps failing on the only task that I have. I set execution_timeout to 300 seconds, but the task keeps crashing after around 37 seconds. The task consists of scraping a website, without Chromedriver. I’m on Linux, on a Raspberry Pi. Here is the code: from datetime import timedelta import importlib import sys from airflow.operators.bash_operator […]

Apache airflow unable to locate AWS credentials when using boto3 inside a DAG

I am running an instance of Airflow on ECS Fargate. The problem is that I cannot run the code that calls an existing Glue job from within the DAG. Below is the DAG script. import boto3 import os import logging import time import sys import botocore from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python_operator import PythonOperator […]

Airflow – Bash command failed

I am trying to execute the following Airflow DAG file, but I get the following error. import json from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta, time args = { 'owner': 'test', 'start_date': days_ago(2), 'depends_on_past': False, } dag = DAG( dag_id='test', default_args=args, schedule_interval='0 5 * […]

Airflow: Custom dates work fine with datetime but not with pendulum

Requirement: create a custom date function to be used in operators, DAGs, etc. Below is the DAG file: from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta from alerts.custom_date import strt_of_wk_strt_mon_dt, NEXT_DS_NODASH default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2020, 7, 8), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': […]
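
The body of strt_of_wk_strt_mon_dt is not shown in the excerpt. Judging only by its name, it probably returns the start of the week with Monday as the first day. A minimal sketch of such a helper with the standard library, where the function name `start_of_week_monday` and its signature are assumptions rather than the asker's code:

```python
from datetime import date, timedelta


def start_of_week_monday(day: date) -> date:
    """Return the Monday of the week that contains `day`."""
    # date.weekday() is 0 for Monday through 6 for Sunday, so subtracting
    # that many days always lands on the Monday of the same week.
    return day - timedelta(days=day.weekday())


# 2020-07-08 (the start_date in the excerpt) is a Wednesday.
print(start_of_week_monday(date(2020, 7, 8)))  # prints 2020-07-06
```

The same arithmetic works on a pendulum date as well, since pendulum objects subclass the standard datetime types, although whether that resolves the asker's problem cannot be told from the truncated excerpt.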

Airflow BranchPythonOperator – Continue After Branch

I have the following operators as shown below. I can see that the graph representation looks correct. However, I cannot get the functionality to work properly: the paths do not continue past either branch. Regardless of the date, neither path continues to task_05. The […]
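
The operators themselves are cut off in the excerpt, so the following is only a sketch of what a date-based branch callable might look like. The task ids `weekday_path` and `weekend_path` are hypothetical. In Airflow, a callable like this is passed to BranchPythonOperator as python_callable and returns the task id to follow. A task that joins both branches (task_05 in the question) is skipped under the default all_success trigger rule, because one upstream branch is always skipped; a trigger rule such as one_success or none_failed on the join task is the usual remedy.

```python
from datetime import datetime


def choose_branch(execution_date: datetime, **_context) -> str:
    """Return the id of the task to run next (hypothetical task ids)."""
    # weekday() is 0..4 for Monday..Friday and 5..6 for Saturday/Sunday.
    if execution_date.weekday() < 5:
        return "weekday_path"
    return "weekend_path"


print(choose_branch(datetime(2020, 7, 8)))   # a Wednesday
print(choose_branch(datetime(2020, 7, 11)))  # a Saturday
```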

How to trigger an Airflow task only when a new partition/data is available in an AWS Athena table using a DAG in Python?

I have a scenario like the one below: Trigger Task 1 and Task 2 only when new data is available for them in the source table (Athena). The trigger for Task 1 and Task 2 should happen when there is a new data partition in a day. Trigger Task 3 only on the completion of Task 1 and Task […]

Airflow subdag codeview shows code of main-dag

I am new to Airflow. I followed the tutorial on the official page (https://airflow.readthedocs.io/en/stable/tutorial.html) and added a subdag to the tutorial DAG. When I zoom into the subdag in the web UI and click on Code, the code of the main DAG is shown. Also, when I click on the details of the subdag, the filename of […]

Categories
Development

ERROR – SSH operator error: timed out in Airflow while connecting to an EC2 instance

I am getting a timeout error while trying to connect to an EC2 instance. I can connect to the machine with the ssh command: ssh -i keypair.pem myuser@ec2IPaddress My connection has the following settings: Conn Id: ssh_custom, Conn Type: SSH, Host: ec2IPaddress, Username: myuser, Port: 8888, Extra: {"key_file": "/home/ubuntu/keypair.pem", "no_host_key_check": "false", "allow_host_key_change": "true", "timeout": "45"} My DAG […]
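
The ssh command quoted above has no -p flag, so it uses the default SSH port 22, while the Airflow connection lists port 8888. One way to narrow down a timeout like this is to check whether the configured port accepts TCP connections at all. A minimal sketch using only the standard library; `port_is_open` is a hypothetical helper name, not part of Airflow:

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        # create_connection resolves the host and performs the TCP handshake;
        # timeouts, refusals, and DNS failures all raise OSError subclasses.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run from the machine that hosts the Airflow worker, comparing port_is_open("ec2IPaddress", 22) with port_is_open("ec2IPaddress", 8888) would show which port is actually reachable, with ec2IPaddress replaced by the real address.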

Access XCom in EmailOperator of Airflow

I’m very new to Airflow and I’m facing some problems with XCom and Jinja. I have to do some processing in Python and then pass the result to an EmailOperator so that it is sent as the email body. There seems to be no documentation about this; the only hint that I’ve found is this link […]