Scheduling SQL Menggunakan Apache Airflow

Tri Juhari
3 min readJul 21, 2021

--

Salah satu penggunaan yang sering dilakukan untuk melakukan penjadwalan skrip SQL yaitu dengan menggunakan apache Airflow. Para developer atau seorang data engineer kadang sering bertanya :

“Bagaimana cara menggunakan teknologi Airflow dalam melakukan penjadwalan SQL ?”

“Bagaimana cara melakukan pemfilteran tanggal (waktu) berdasarkan interval jadwal di Airflow ?”

Tulisan ini bertujuan untuk menjawab kedua pertanyaan di atas. Disini saya mengasumsikan para pembaca sudah memiliki pemahaman dasar tentang Apache Airflow dan SQL. Misalnya kita asumsikan kita ingin menjalankan skrip SQL setiap hari pada tengah malam yang mana skrip SQL ini akan melakukan agregasi data berdasarkan data hari sebelumnya dari tabel event dan hasil dari agregasi tersebut kita simpan kedalam tabel event_stats.

Kita dapat menggunakan airflow untuk menjalankan skrip SQL setiap hari. Contohnya dalam kasus ini kita menggunakan MySQL, namun airflow juga didukung oleh beberapa teknologi database lainnya.

DAG : Directed Acyclic Graph , penggunaan DAG pada airflow bertujuan untuk melakukan monitoring data pipeline yang berjalan berdasarkan interval schedule. DAG juga tidak hanya dapat menjalankan satu tugas saja namun dapat menjalankan beberapa tugas.

from airflow import DAG 
from airflow.operators.mysql_operator import mysqlOperator
default_arg = { 'owner' : 'airflow', 'start_date': '2020-02-28'}
dag = DAG('simple-mysql-dag',
default_arg = default_arg,
schedule_interval = '0 0 * * *')
mysql_task = mysqlOperator( dag = dag,
mysql_conn_id = 'mysql_default'
task_id = '<path>/sample_sql.sql'
params = {'test_user_id': -99 })
mysql_task

Berikut penjelasan terkait Code diatas :

  1. pada parameter schedule_interval memiliki nilai “0 0 * * *” kode ini merupakan format schedule pada cron artinya menunjukkan bahwa DAG harus dijalankan setiap hari pada tengah malam, yang dilambangkan dengan jam ke-0 setiap hari (sebagai catatan bahwa airflow secara default berjalan pada waktu UTC ).mysql_conn_id merupakan id koneksi untuk database SQL yang akan kita gunakan untuk proses penjadwalan, connection id dapat kita set melalui halaman airflow versi GUI nya dengan cara pilih admin kemudian pilih connection.
  2. Skrip SQL yang digunakan untuk melakukan operasi ini disimpan dalam file sample_sql.sql secara terpisah. File ini dibaca saat DAG dijalankan.
USE your_database;

DROP TABLE IF EXISTS event_stats_staging;
CREATE TABLE event_stats_staging
AS SELECT date
, user_id
, sum(spend_amt) total_spend_amt
FROM event
WHERE date = {{ macros.ds }}
AND user_id <> {{ params.test_user_id }}
GROUP BY date, user_id;

INSERT INTO event_stats (
date
, user_id
, total_spend_amt
)
SELECT date
, user_id
, total_spend_amt
FROM event_stats_staging;

DROP TABLE event_stats_staging;

nilai di dalam simbol {{ }} disebut template parameter. Airflow menggantikannya dengan variabel yang diteruskan melalui skrip DAG saat run-time atau tersedia melalui makro metadata di airflow. Penggunaan template parameter menjadi sangat membantu ketika kita memiliki logika yang lebih kompleks dan ingin menghasilkan bagian skrip secara dinamis, seperti klausa where, pada saat run time.

Terdapat 2 konsep kunci dalam skrip SQL templat yang ditunjukkan oleh code di atas.

  1. Airflow macros: fitur ini mememberikan kemudahan ketika kita ingin mengakses meta data yang tersedia untuk setiap proses DAG. Pada studi kasus ini kita menggunakan tanggal eksekusi (execution date) karena dapat memberikan tanggal waktu sebelumnya dimana kita ingin melakukan agregasi data (Referensi : https://airflow.apache.org/docs/stable/macros.html)
  2. Templated parameters:Jika skrip SQL ingin menggunakan beberapa parameter yang dapat diisikan saat dijalankan dari DAG, kita dapat menambahkan parameter sesuai yang dibutuhkan. Contohnya dalam code diatas kita ingin melakukan filtering secara spesifik berdasarkan user_id (-99) sebelum dilakukan agregasi
  3. If we want our SQL script to have some parameters that can be filled at run time from the DAG, we can pass them as parameters to the task. In our example we filter out a specific user_id (-99) before aggregation.

Berdasarkan postingan diatas bisa dilakukan summary terkait :

  1. Bagaimana melakukan penjadwalan skrip SQL menggunakan apache airflow
  2. Bagaimana mengkoneksikan database menggunakan id
  3. Bagaimana menambahkan parameter saat melakukan run-time menggunakan parameter input dan makro

--

--

No responses yet