API Reference

Base Class

Configuration Data Classes

gcp_airflow_foundations.base_class.source_config.SourceConfig(...)

Source configuration data class.

gcp_airflow_foundations.base_class.source_table_config.SourceTableConfig(...)

Table configuration data class.

gcp_airflow_foundations.base_class.salesforce_ingestion_config.SalesforceIngestionConfig(...)

Salesforce configuration class.

gcp_airflow_foundations.base_class.dataflow_job_config.DataflowJobConfig(...)

Dataflow configuration class.

class gcp_airflow_foundations.base_class.source_config.SourceConfig(name: str, source_type: str, ingest_schedule: str, external_dag_id: Optional[str], gcp_project: str, dataset_data_name: str, dataset_hds_override: Optional[str], extra_options: Optional[dict], landing_zone_options: LandingZoneConfig, acceptable_delay_minutes: int, notification_emails: List[str], owner: str, partition_expiration: Optional[int], dag_args: Optional[dict], location: str, start_date: str, schema_options: SchemaOptionsConfig = SchemaOptionsConfig(schema_source_type=SchemaSourceType.AUTO, schema_object_template=None), facebook_options: Optional[FacebookConfig] = None, full_ingestion_options: FullIngestionConfig = FullIngestionConfig(ingest_all_tables=False, ingestion_name='', dag_creation_mode='TABLE', regex_table_pattern='ANY'), catchup: bool = True, start_date_tz: str = 'EST', ods_suffix: str = '', hds_suffix: str = '', dagrun_timeout_mins: int = 1440, version: int = 1, sla_mins: int = 900, dlp_config: Optional[DlpSourceConfig] = None, num_retries: int = 3, email_on_retry: bool = False, email_on_failure: bool = True, connection: str = 'google_cloud_default')

Source configuration data class.

name

Name of the source

Type

str

source_type

Source type selection. See the SourceType class.

Type

str

ingest_schedule

Ingestion schedule. Currently only @hourly, @daily, @weekly, and @monthly are supported.

Type

str

gcp_project

Google Cloud Platform project ID

Type

str

dataset_data_name

Target dataset name

Type

str

extra_options

Google Cloud Storage bucket and objects for source data if loading from GCS

Type

Optional[dict]

landing_zone_options

Landing zone configuration holding the staging dataset name

Type

gcp_airflow_foundations.base_class.landing_zone_config.LandingZoneConfig

acceptable_delay_minutes

Maximum acceptable ingestion delay, in minutes

Type

int

notification_emails

Email addresses for notification emails

Type

List[str]

owner

Airflow user owning the DAG

Type

str

partition_expiration

Expiration time for HDS Snapshot partitions in days.

Type

Optional[int]

facebook_options

Extra options for ingesting data from Facebook Marketing API.

Type

Optional[gcp_airflow_foundations.base_class.facebook_config.FacebookConfig]

catchup

Whether to run all DAG runs since start_date. https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#catchup

Type

bool

dag_args

Optional dictionary of parameters to be passed as keyword arguments to the ingestion DAG. Refer to airflow.models.dag.DAG for the available parameters.

Type

Optional[dict]

location

BigQuery job location.

Type

str

start_date

Start date for DAG

Type

str

start_date_tz

Timezone of the DAG start date

Type

str

ods_suffix

Suffix for ODS tables. Defaults to empty string.

Type

str

hds_suffix

Suffix for HDS tables. Defaults to empty string.

Type

str

version

The DAG version. Can be incremented if the logic changes.

Type

int

sla_mins

Service Level Agreement (SLA) timeout in minutes. This is an expectation for the maximum time a task should take.

Type

int

num_retries

Number of retries for the DAG before failing - https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

Type

int

email_on_retry

Whether the DAG should email on retries

Type

bool

email_on_failure

Whether the DAG should email on failure

Type

bool

connection

Airflow Google Cloud Platform connection ID

Type

str
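
For illustration, a minimal SourceConfig instantiation for a GCS-based source might look like the sketch below. All values are placeholders; the "GCS" source type string, the extra_options keys, and the LandingZoneConfig field name (landing_zone_dataset) are assumptions for the example rather than part of the signature above.

    from gcp_airflow_foundations.base_class.source_config import SourceConfig
    from gcp_airflow_foundations.base_class.landing_zone_config import LandingZoneConfig

    # Minimal sketch: all values below are placeholders for illustration.
    source = SourceConfig(
        name="my_source",
        source_type="GCS",                          # assumed SourceType value
        ingest_schedule="@daily",                   # one of the supported schedules
        external_dag_id=None,
        gcp_project="my-gcp-project",
        dataset_data_name="analytics",              # target dataset
        dataset_hds_override=None,
        extra_options={"gcs_bucket": "my-bucket"},  # assumed key; GCS source options
        landing_zone_options=LandingZoneConfig(
            landing_zone_dataset="staging_zone"     # assumed field name
        ),
        acceptable_delay_minutes=60,
        notification_emails=["data-team@example.com"],
        owner="airflow",
        partition_expiration=None,
        dag_args=None,
        location="US",
        start_date="2021-01-01",
    )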

class gcp_airflow_foundations.base_class.source_table_config.SourceTableConfig(table_name: str, ingestion_type: IngestionType, landing_zone_table_name_override: Optional[str], dest_table_override: Optional[str], surrogate_keys: List[str], column_mapping: Optional[dict], cluster_fields: Optional[List[str]], column_casting: Optional[dict], new_column_udfs: Optional[dict], hds_config: Optional[HdsTableConfig], start_date: Optional[str], extra_options: dict = <factory>, facebook_table_config: Optional[FacebookTableConfig] = FacebookTableConfig(api_object=ApiObject.INSIGHTS, breakdowns=[], action_breakdowns=[]), start_date_tz: Optional[str] = 'EST', ods_config: Optional[OdsTableConfig] = OdsTableConfig(ods_table_time_partitioning=None, partition_column_name=None, ods_metadata=OdsTableMetadataConfig(hash_column_name='af_metadata_row_hash', primary_key_hash_column_name='af_metadata_primary_key_hash', ingestion_time_column_name='af_metadata_inserted_at', update_time_column_name='af_metadata_updated_at'), merge_type='SG_KEY_WITH_HASH'), version: int = 1, catchup: bool = True)

Table configuration data class.

table_name

Table name. Used for the DAG ID.

Type

str

ingestion_type

FULL or INCREMENTAL.

Type

gcp_airflow_foundations.enums.ingestion_type.IngestionType

landing_zone_table_name_override

Optional staging zone table name.

Type

Optional[str]

dest_table_override

Optional target table name. If None, use table_name instead.

Type

Optional[str]

surrogate_keys

Keys used to identify unique records when merging into ODS.

Type

List[str]

column_mapping

Mapping used to rename columns.

Type

Optional[dict]

cluster_fields

The fields used for clustering. BigQuery supports clustering for both partitioned and non-partitioned tables.

Type

Optional[List[str]]

column_casting

Mapping used to cast columns into specific data types. Note that the column names refer to those of the landing zone table.

Type

Optional[dict]

ods_config

ODS table configuration. See gcp_airflow_foundations.base_class.ods_table_config.OdsTableConfig.

Type

Optional[gcp_airflow_foundations.base_class.ods_table_config.OdsTableConfig]

hds_config

HDS table configuration. See gcp_airflow_foundations.base_class.hds_table_config.HdsTableConfig.

Type

Optional[gcp_airflow_foundations.base_class.hds_table_config.HdsTableConfig]

facebook_table_config

Extra options for ingesting data from the Facebook API.

Type

Optional[gcp_airflow_foundations.base_class.facebook_table_config.FacebookTableConfig]

extra_options

Field for storing additional configuration options.

Type

dict

start_date

Start date override for DAG

Type

Optional[str]

start_date_tz

Timezone of the start date override

Type

Optional[str]

version

The DAG version for the table. Can be incremented if the logic changes.

Type

int

catchup

Passed to the DAG (see https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#catchup). Defaults to True. Set to False if the DAG version is changed and past runs should not be rerun.

Type

bool
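
A hypothetical SourceTableConfig for an incrementally ingested table could look like the following; the table and column names are placeholders, and IngestionType.INCREMENTAL is assumed to be a member of the IngestionType enum described above.

    from gcp_airflow_foundations.base_class.source_table_config import SourceTableConfig
    from gcp_airflow_foundations.enums.ingestion_type import IngestionType

    # Minimal sketch: placeholder table and column names for illustration.
    table = SourceTableConfig(
        table_name="customers",
        ingestion_type=IngestionType.INCREMENTAL,
        landing_zone_table_name_override=None,
        dest_table_override=None,
        surrogate_keys=["customer_id"],               # identify unique records in the ODS merge
        column_mapping={"cust_nm": "customer_name"},  # rename landing zone columns
        cluster_fields=["customer_id"],
        column_casting=None,
        new_column_udfs=None,
        hds_config=None,                              # no HDS table for this source
        start_date=None,                              # no override of the source-level start date
    )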

class gcp_airflow_foundations.base_class.salesforce_ingestion_config.SalesforceIngestionConfig(api_table_name: str, ingest_all_columns: bool, fields_to_omit: Optional[List[str]], field_names: Optional[List[str]])

Salesforce configuration class.

ingest_all_columns

If true, ingest all columns of the Salesforce object (SELECT *)

Type

bool

fields_to_omit

A list of object fields to omit from ingestion

Type

Optional[List[str]]

field_names

An explicit list of fields to ingest

Type

Optional[List[str]]
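
For example, a SalesforceIngestionConfig that ingests an explicit list of fields from the Opportunity object might be declared as follows; the object and field names are placeholders.

    from gcp_airflow_foundations.base_class.salesforce_ingestion_config import SalesforceIngestionConfig

    # Minimal sketch: placeholder Salesforce object and field names.
    salesforce_config = SalesforceIngestionConfig(
        api_table_name="Opportunity",
        ingest_all_columns=False,   # only ingest the fields listed below
        fields_to_omit=None,
        field_names=["Id", "Name", "StageName", "CloseDate"],
    )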

class gcp_airflow_foundations.base_class.dataflow_job_config.DataflowJobConfig(system_name: str, project: str, region: str, subnetwork: str, bq_load_temp_directory: str, template_path: str, jdbc_driver_class: str, jdbc_jar_path: str, jdbc_url: str, jdbc_user: str, jdbc_pass_secret_name: str, kms_key_path: str, sql_casts: Optional[dict], bq_schema_table: str, database_owner: str, connection_pool: str, max_retry_delay: int = 60)

Dataflow configuration class.

project

The GCP project in which the Dataflow job runs

Type

str

region

The region in which the Dataflow job should run

Type

str

subnetwork

The specific subnetwork in which the Dataflow job should run

Type

str

bq_load_temp_directory

GCS directory for loading temporary Dataflow files

Type

str

template_path

GCS path to Dataflow template

Type

str

jdbc_driver_class

The name of the JDBC driver class to use (e.g., oracle.jdbc.driver.OracleDriver)

Type

str

jdbc_jar_path

The GCS path to the driver .jar file

Type

str

jdbc_url

A valid JDBC URL for connecting to the database

Type

str

jdbc_user

The database username

Type

str

jdbc_pass_secret_name

The secret name of the database password

Type

str

kms_key_path

The KMS key path for encrypting/decrypting JDBC credentials

Type

str

sql_casts

A dictionary of SQL casts to use when querying the source DB

Type

Optional[dict]

database_owner

The owner of the tables to query (query scope)

Type

str
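
As an illustration, a DataflowJobConfig for an Oracle source might look like the sketch below. Every value is a placeholder (the project, bucket, template, KMS key, and secret names are assumptions), but the keyword arguments match the signature above.

    from gcp_airflow_foundations.base_class.dataflow_job_config import DataflowJobConfig

    # Minimal sketch: all paths, names, and credentials are placeholders.
    dataflow_config = DataflowJobConfig(
        system_name="oracle_erp",
        project="my-gcp-project",
        region="us-central1",
        subnetwork="regions/us-central1/subnetworks/dataflow-subnet",
        bq_load_temp_directory="gs://my-bucket/tmp",
        template_path="gs://my-bucket/templates/jdbc-to-bigquery",
        jdbc_driver_class="oracle.jdbc.driver.OracleDriver",
        jdbc_jar_path="gs://my-bucket/drivers/ojdbc8.jar",
        jdbc_url="jdbc:oracle:thin:@//db-host:1521/ORCL",
        jdbc_user="etl_user",
        jdbc_pass_secret_name="oracle-etl-password",
        kms_key_path="projects/my-gcp-project/locations/global/keyRings/etl/cryptoKeys/jdbc",
        sql_casts=None,
        bq_schema_table="my-gcp-project.admin.bq_schemas",  # assumed table reference format
        database_owner="ERP_SCHEMA",
        connection_pool="dataflow-pool",
    )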