The Data Pipeline delivers data to data consumers. Different Data Pipelines tackle different tasks — for example data transport, data enrichment and transformation. The network of Data Pipelines can quickly become very complex and ultimately unmanageable. This is why every well-organized Information Factory invests heavily in Data Pipeline design and maintenance.
One design principle that pays into maintainability and efficient operation is «Consistency», the imperative to use one and the same pattern everywhere. Another design principle that pays into scalability is «Everything automated», the imperative to leverage a metadata-driven approach to allow a maximum degree of automation. In other words, all data transformations except business rules are generated from metadata.
But how to translate these principles into action?
This tutorial explains how meta-information about data pipelines is collected and how this metadata is used to quickly spot problems and bottlenecks. We use dbt (data build tool) to show how logging is added to the data pipeline. This tutorial relies on an existing dbt project running on BigQuery. If you are new to dbt and BigQuery, this blog post will help you set up your first project.
In order to log all jobs run by dbt on BigQuery, we take advantage of three features:
- The information schema provided by BigQuery
- The invocation_id generated by dbt for every run
- The on-run-end feature of dbt
First, let’s inspect the information schema. The information schema is automatically generated by BigQuery and provides access to metadata about datasets, tables and jobs. Here we use the INFORMATION_SCHEMA.JOBS_BY_USER view to analyze metadata about BigQuery jobs, such as total_bytes_processed or total_slot_ms. This information comes in handy when profiling your data pipelines and spotting problems. For example, total_bytes_processed can be used to find queries that process a lot of data and could possibly be optimized for efficiency. Profiling total_slot_ms over time for the same query, on the other hand, can help to find queries that become slower over time.
The INFORMATION_SCHEMA.JOBS_BY_USER view returns only jobs that were submitted by the current user. Make sure that you have the bigquery.jobs.list permission for your project to access the INFORMATION_SCHEMA.JOBS_BY_USER view. This permission is, for example, part of the predefined IAM role “BigQuery User”, which can be granted at the dataset level.
Here is an example query to retrieve information of interest about a data pipeline:
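A query along the following lines covers the fields discussed in this post. Note that the INFORMATION_SCHEMA.JOBS_BY_USER view must be qualified with a region; `region-eu` below is a placeholder for your project’s region:

```sql
SELECT
    user_email,
    job_id,
    creation_time,
    query,
    total_bytes_processed,
    total_slot_ms,
    -- dml_statistics and error_result are nested RECORDs
    dml_statistics.inserted_row_count AS inserted_rows,
    dml_statistics.updated_row_count  AS updated_rows,
    error_result.reason               AS error_reason,
    error_result.message              AS error_message
FROM
    `region-eu`.INFORMATION_SCHEMA.JOBS_BY_USER
WHERE
    -- the view is partitioned by creation_time
    creation_time >= TIMESTAMP(CURRENT_DATE())
ORDER BY
    total_slot_ms DESC
```

Sorting by total_slot_ms surfaces the most expensive jobs of the day first.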
Using this query, we can, for example, analyze who is using how much slot time for their jobs and how many bytes their queries process. We can also retrieve error messages for failed queries.
Note that dml_statistics and error_result are RECORD data types, i.e. nested structures. Since they don’t contain repeated data, we don’t need to unnest them using joins.
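To illustrate the difference: a non-repeated RECORD such as error_result can be read with plain dot notation, whereas a repeated RECORD such as labels (used further below) requires UNNEST. A minimal sketch, again using a placeholder region:

```sql
SELECT
    job_id,
    -- error_result is a plain RECORD: dot notation is enough
    error_result.reason,
    error_result.message
FROM
    `region-eu`.INFORMATION_SCHEMA.JOBS_BY_USER
WHERE
    error_result IS NOT NULL
    AND creation_time >= TIMESTAMP(CURRENT_DATE())
```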
The second required ingredient to logging is the invocation_id. It is generated for every dbt command that is run and is included as a label in all BigQuery jobs that dbt initiates. The invocation_id identifies the actual load in question. We will explain how to access this information later in the blog.
With these two preparations in place, the actual logging can be performed after each load. dbt provides the on-run-end hook to execute code at the end of each load. On-run-end hooks allow dbt users to define a SQL statement to be run at the end of every dbt command, such as dbt run. Instead of writing the SQL code manually, macros can be used in the on-run-end hook. Macros are pieces of code written in Jinja that let you extend standard SQL with, for example, for loops or if statements. In our case, we combine the two previous features (INFORMATION_SCHEMA and invocation_id) in a macro that extracts the desired logging information and writes it to a specified table. This macro is then handed to the on-run-end hook and makes sure that every dbt run gets documented properly.
First, let us tell dbt which macro to execute after every run. This is done in the dbt_project.yml file using the following code snippet:
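The hook configuration could look like this (the macro name get_query_run_details is the one used throughout this post):

```yaml
# dbt_project.yml
on-run-end:
  - "{{ get_query_run_details() }}"
```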
Of course, this macro has to exist; it is not part of the dbt release, so we define it ourselves. Here is the definition:
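A sketch of such a macro is shown below. It assumes that dbt attaches the invocation_id as a job label under the key dbt_invocation_id, that the logging table is dwh_log.dbt_query_log, and that your project runs in a region such as `region-eu` — adjust these to your setup:

```sql
{% macro get_query_run_details() %}

INSERT INTO `dwh_log.dbt_query_log`
SELECT
    job_id,
    user_email,
    creation_time,
    start_time,
    end_time,
    query,
    total_bytes_processed,
    total_slot_ms,
    state,
    error_result.reason  AS error_reason,
    error_result.message AS error_message
FROM
    -- the view must be qualified with a region
    `region-eu`.INFORMATION_SCHEMA.JOBS_BY_USER,
    -- labels is a repeated RECORD, so it has to be unnested
    UNNEST(labels) AS label
WHERE
    -- the view is partitioned by creation_time; restrict to today
    creation_time >= TIMESTAMP(CURRENT_DATE())
    -- keep only jobs from the current dbt invocation
    AND label.key = 'dbt_invocation_id'
    AND label.value = '{{ invocation_id }}'

{% endmacro %}
```

Here, {{ invocation_id }} is resolved by dbt’s Jinja context at run time, so the INSERT only picks up jobs belonging to the run that just finished.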
This file must be stored in the macros folder. You can choose the file name freely (e.g. macros_for_logging.sql), but make sure that the name of the macro (get_query_run_details) matches the one you have defined in your dbt_project.yml. The macro queries the INFORMATION_SCHEMA.JOBS_BY_USER view, which always has to be qualified with a region to gain access to the job metadata.
We then filter this selection on the invocation_id to find the metadata of the last job we ran using dbt. More specifically, the invocation_id is stored in the value field of the labels record. Since more than one label can be assigned to a BigQuery job, the labels record may contain repeated values, and we need to unnest it.
Last but not least, we take advantage of the fact that the INFORMATION_SCHEMA jobs views are partitioned by creation_time and only query jobs run on the current day, as we are only interested in the last job.
All jobs labeled with the current invocation_id are then inserted into the table that we specify in the get_query_run_details macro (dwh_log.dbt_query_log in the example above). Here is a preview of what your logging table schema could look like:
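One possible schema, matching the fields extracted by the get_query_run_details macro described above (column names and types are a suggestion, not a requirement):

```sql
CREATE TABLE IF NOT EXISTS dwh_log.dbt_query_log (
    job_id                STRING,
    user_email            STRING,
    creation_time         TIMESTAMP,
    start_time            TIMESTAMP,
    end_time              TIMESTAMP,
    query                 STRING,
    total_bytes_processed INT64,
    total_slot_ms         INT64,
    state                 STRING,
    error_reason          STRING,
    error_message         STRING
);
```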
The adjustment to your dbt_project.yml file and the addition of the new macro are all you need to ensure that all BigQuery jobs initiated by dbt are logged properly. This allows you to keep track of the changes dbt makes to your data warehouse.
Collecting performance data over time about all jobs run in your data warehouse allows you to identify different kinds of issues with your data pipelines. For example, you can take the execution time of a given query and analyze how its duration develops over time. This lets you identify queries that become slower over time and rethink them before they become a problem. Another use case for the logging information is finding long-running queries: queries that take a long time to complete and are therefore a good target for performance optimization.
Logging information can also be used for debugging. The detailed information BigQuery collects, such as error messages and failure reasons, helps you find the queries that cause your scheduled load to fail. With detailed information about the problem a certain query caused, you can address the issue more easily and make sure that your loading pipeline runs smoothly the next time.
Overall, BigQuery together with dbt is a powerful combination for smoothly operating your information factory. By logging all jobs run in your data pipelines, you can make sure to stay on top of any problems. It helps you build efficient and consistent data pipelines and allows you to find and address problems quickly. Smoothly running operations let you deliver data consistently to your data consumers and make data-driven business decisions.
If you want to know more about other opportunities that dbt and BigQuery offer to improve your information factory, make sure to check out our other blog posts.