Extracting Data
1. Overview
The building blocks of the data ingestion DAG are pre-defined in GCP Airflow Foundations, such that the user only needs to configure the parameters of the data ingestion. The data ingestion follows a set of default steps:
The first step in the ingestion pipeline is to extract the data from a source and load them to a landing table in BigQuery.
The schema of the landing table is parsed and compared with the schema of the destination tables.
If any schema changes are detected, these are migrated to the destination table.
Finally, the data are upserted to the destination tables.
This can be visualized in the tree diagram bellow:
2. Config Based DAG Generation
GCP Airflow Foundations support the dynamic generation of ETL/ELT DAGs from simple, user-provided configuation files written in YAML. At minimum, the user declares in the configuration file the derised ingestion mode and the type of the data source, along with the required source tables to be ingested. Optionally, additional parameters can be provided, such us metadata naming and column mapping between the source and destination tables, among others. GCP Airflow Foundations will parse the information declared in the YAML file to generate the building blocks necessary for generating the DAGs for the desired data ingestion.
For a detailed description and data type of each configuration field, please refer to gcp_airflow_foundations.base_class.source_config.SourceConfig
and gcp_airflow_foundations.base_class.source_table_config.SourceTableConfig
for the ingestion source and tables respectively.
An example of a simple configuration file to extract marketing data from Facebook:
source:
name: facebook_campaigns_ingestion
source_type: FACEBOOK
ingest_schedule: "@daily"
start_date: "2021-01-01"
dataset_data_name: facebook
landing_zone_options:
landing_zone_dataset: landing_zone
facebook_options:
account_lookup_scope: full
fields: [
"account_id",
"campaign_id",
"impressions",
"spend",
"reach",
"clicks"]
level: campaign
time_increment: "1"
tables:
- table_name: campaign_insights
surrogate_keys: ["account_id", "campaign_id", "date_start"]
ingestion_type: INCREMENTAL
facebook_table_config:
breakdowns: null
action_breakdowns: ["action_type"]
- table_name: campaign_insights_platform_placement
surrogate_keys: ["account_id", "campaign_id", "date_start", "publisher_platform", "platform_position"]
ingestion_type: INCREMENTAL
facebook_table_config:
breakdowns: ["publisher_platform", "platform_position"]
action_breakdowns: ["action_type"]
3. Replication Scheduling
In the ingest_schedule
field you can select the ingestion schedule for Airflow.
GCP Airflow Foundations currently support hourly, daily, weekly, and monthly intervals.
4. Source Selection
To declare the data source for an ingestion, you only need to provide an alias for your souce in the name
field, as well as define the source_type
.
The latter is an enumaration-type field.
For every data source you will be extracting data from, you need to configurate the corresponding Airflow Connection in Airflow’s GUI by providing the required credentials.
5. Ingestion Type
GCP Airflow Foundations support two methods for extracting data out of the source system:
Full Table Ingestion: All rows of a table (including new, updated, and existing) are extracteed during every ingestion
Incremental Ingesteion: Only rows that have been added or updated since the last ingestion job are extracted. As an example an updated_at column can be useed to identify records that have been updated since a specified time, and then only replicate those records
The ingestion type must be declared in the ingestion_type
field for each table.
Note that you can select a different ingestion type for each table, and some sources support only full table ingestion.
6. Table Selection
The next step after having selected a data source, is to select the individual tables you need to extract data from. The tables
field is a list-type field, whose entries
are single tables. Start by giving an alias in the table_name
field. Next, in the surrogate_keys
field you need to set the columns that will be used as a key to select unique records.
These are usually record identifier fields, as well as breakdown dimension fields (e.g. date, geography fields, etc.).
6.1 Configuring an Operational Data Store (ODS)
By default, the destination table will be an Operational Data Store (ODS). An Operational Data Store (ODS) is a table that provides a snapshot of the latest data for operational reporting. As newer records become available, the ODS continuously overwrites older data with either full or incremental data ingestions. With full ingestions, the entire ODS is replaced with the updated data, whereas with incremental ingestions the new data is upserted into the target table
The ODS table will include four metadata columns for each table row:
Key |
Default Name |
Description |
---|---|---|
hash_column_name |
metadata_row_hash |
The entire row hash |
primary_key_hash_column_name |
metadata_primary_key_hash |
The hash of the primary keys |
ingestion_time_column_name |
metadata_inserted_at |
The ingestion time |
update_time_column_name |
metadata_updated_at |
The update time |
Optionally, the user can override the default metadata column names for each table by providing the ods_config.ods_metadata
field. For example:
tables:
- table_name: campaign_insights
surrogate_keys: ["account_id", "campaign_id", "date_start"]
ingestion_type: INCREMENTAL
facebook_table_config:
breakdowns: null
action_breakdowns: ["action_type"]
column_mapping:
date_start: date
ods_config:
ods_metadata:
hash_column_name: metadata_row_hash
primary_key_hash_column_name: metadata_primary_key_hash
ingestion_time_column_name: metadata_inserted_at
update_time_column_name: metadata_updated_at
6.2 Configuring a Historical Data Store (HDS)
In addition to an ODS destination table, the data can also be ingested in a Historical Data Store (HDS) table. To implement an HDS table, the user can select between a Slowly Changing Diemension Type 2 (SCD2) and a snapshot.
6.2.1 Slowly Changing Diemension Type 2 (SCD2)
In SCD2, a new row is inserted for each change to an existing record in the corresponding target table, as well as for entirely new records. Each record row has metadata timestamp columns that indicate the time of insertion, update, and expiration.
6.2.2 Snapshot
With snapshots, a new partition is appended to the target table at each ETL schedule. Therefore, the target table comprises a collection of snapshots where each partition contains the full dimension at a point in time.
6.2.3 Comparison of SCD2 and Snapshotting
Even though the SCD2 approach is more computationally efficient, it is also more difficult to maintain and reproduce. Snapshot tables, on the other hand, do not require complex transformations.
Snapshot tables result in significantly larger tables (since all data is replicated every day) which can result in higher storage costs. However using properly partitioned BigQuery tables mitigates this - partitioned older than 90 days (if they have not been edited) are automatically moved to Big Query long term storage.
Querying data from a specific day or time ranges is cheaper when using properly partitioned snapshot tables since BigQuery will scan the data only in the appropriate partitions. While doing the same query on SCD2 tables will result in a full table scan.
Snapshot tables are more intuitive to work with - querying data from a specific date can use the exact same SQL queries used for ODS with the simple addition of filter cluse for that day. While SCD2 requires more complex logic using the created_at and expired_at columns.
Snapshot tables follow the “functional data engineering ” principle. Most importantly operations are idempotent such that re-running ingestion for the same day will not result in data duplicates or corrupt data.
Expiring old data is easier with snapshot tables.
6.2.4 Ingesting Data to an HDS Table
To configure an HDS ingestion, the user has to declare the HDS type in under each table. For example:
tables:
- table_name: campaign_insights
surrogate_keys: ["account_id", "campaign_id", "date_start"]
ingestion_type: INCREMENTAL
facebook_table_config:
breakdowns: null
action_breakdowns: ["action_type"]
column_mapping:
date_start: date
hds_config:
hds_table_type: SNAPSHOT
hds_table_time_partitioning: DAY
hds_metadata:
eff_start_time_column_name: metadata_created_at
eff_end_time_column_name: metadata_expired_at
hash_column_name: metadata_row_hash
Note that the hds_metadata
field is optional. If not provided the default column names will be used.
Also note that the hds_table_time_partitioning
is only needed for snapshot-type HDS tables,
in which case it must match the ingestion schedule.
The HDS table will include four metadata columns for each table row:
Key |
Default Name |
Description |
---|---|---|
hash_column_name |
metadata_row_hash |
The entire row hash |
eff_end_time_column_name |
metadata_expired_at |
The expiration time (if any) |
eff_start_time_column_name |
metadata_created_at |
The ingestion time |
7. Landing and Destination Datasets
The ingested data will first be stored in a temporary, landing table in BigQuery.
The dataset name of the landing tables must be provided in the landing_zone_options.landing_zone_dataset
field.
From the landing dataset, the data are upserted in the destination tables. The destination dataset can be selected in the
dataset_data_name
.
Note
The landing tables are deleted after ingestion.
8. Column Mapping
Both ODS and HDS ingestions support column mapping and schema migration.
When a data field in the data source is desired to have a different name in the destination table,
then, the column_mapping
field can be declared. This is a map-type field, whose keys are the names of columns as they
appear in the data source, and the keys are the corresponding names that these columns should have in the destination table.
For example:
tables:
- table_name: campaign_insights
surrogate_keys: ["account_id", "campaign_id", "date_start"]
ingestion_type: INCREMENTAL
facebook_table_config:
breakdowns: null
action_breakdowns: ["action_type"]
column_mapping:
date_start: date
In this example, the date_start
field extracted from Facebook’s API will be mapped to the date
field in the destination tables.