Platform Specific Tools and Advanced Techniques
The modern data ecosystem keeps evolving, and new data tools emerge all the time. In this article, I want to talk about the crucial trends that affect data engineers and how to use this knowledge to power advanced analytics pipelines and operational excellence.
I’d like to discuss some popular Data engineering questions:
- Modern data engineering (DE): what is it?
- Does your DE work well enough to fuel advanced data pipelines and Business Intelligence (BI)?
- Are your data pipelines efficient?
- What is required, from the technological point of view, to enable operational excellence?
Back in October, I wrote about the rise of the Data Engineer, the role, its challenges, responsibilities, daily routine and how to become successful in this field. The data engineering landscape is constantly changing but major trends seem to remain the same.
As a data engineer, I am tasked to design efficient data processes almost every day. So here are a few things to consider that can help us answer these questions.
Modern data engineering trends
- ETL vs ELT
- Simplified data connectors and API integrations
- ETL frameworks explosion
- Data infrastructure as code
- Data Mesh and decentralized data management
- Democratization of Business Intelligence pipelines using AI
- Focus on data literacy
ELT vs ETL
Popular SQL data transformation tools like Dataform and DBT have made a significant contribution to the popularisation of the ELT approach [1]. It simply makes sense to perform the required data transformations, such as cleansing, enrichment and extraction, in the place where the data is stored. Often this is a data warehouse solution (DWH) at the centre of our infrastructure. Cloud platform leaders have made DWH infrastructure management (Snowflake, BigQuery, Redshift, Firebolt) really simple, and in many scenarios they will outperform any dedicated in-house infrastructure management team in terms of cost-effectiveness and speed.
Data warehouse example. Image by author
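To make the idea concrete, here is a minimal ELT sketch in Python: the raw data already sits in the warehouse, so the transformation is expressed as SQL and executed there rather than in the pipeline itself. The dataset and table names are hypothetical.
from google.cloud import bigquery

# Assumes default application credentials are configured
client = bigquery.Client()

# The "T" in ELT: cleansing and enrichment run inside the warehouse as SQL
transform_sql = """
CREATE OR REPLACE TABLE analytics.orders_clean AS
SELECT
  order_id,
  LOWER(TRIM(customer_email)) AS customer_email,
  CAST(order_total AS NUMERIC) AS order_total
FROM raw.orders
WHERE order_total IS NOT NULL
"""

client.query(transform_sql).result()  # the heavy lifting happens in BigQuery, not locally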
It might also be a datalake at the centre, depending on the type of data platform and the tools we use. In that case, SQL often stops being an option, which makes it difficult to query the data for users who are not familiar with programming. Tools like Databricks, Tabular and Galaxy try to solve this problem, and it really feels like the future. Indeed, datalakes can store all types of data, including unstructured data, and we still need to be able to analyse these datasets.
Datalake example. Image by author.
Just imagine transactionally consistent datalake tables with point-in-time snapshot isolation.
I previously wrote about it in one of my stories on Apache Iceberg table format [2].
Introduction to Apache Iceberg Tables
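As a quick illustration of what point-in-time reads look like in practice, here is a hedged PySpark sketch. It assumes a Spark session that already has the Iceberg runtime and a catalog named demo configured; the table name and snapshot id are purely illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Iceberg exposes table history as metadata tables, e.g. the list of snapshots
spark.sql("SELECT snapshot_id, committed_at FROM demo.db.events.snapshots").show()

# Read the table as of a specific snapshot (time travel / snapshot isolation)
df = (
    spark.read
    .option("snapshot-id", 1234567890123456789)  # illustrative snapshot id
    .format("iceberg")
    .load("demo.db.events")
)
df.show()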
Simplified data integrations
Managed solutions like Fivetran and Stitch were built to manage third-party API integrations with ease. These days many companies choose this approach to simplify data interactions with their external data sources. This would be the right way to go for data analyst teams that are not familiar with coding.
Indeed, why would we build a data connector from scratch if it already exists and is managed in the cloud? The downside of this approach is its pricing model, though.
Very often it is row-based and can become quite expensive at enterprise levels of data ingestion, i.e. big data pipelines. This is where open-source alternatives come into play. Frameworks like Airbyte and Meltano might be an easy and quick way to deploy a data source integration microservice.
If you don’t have time to learn a new ETL framework, you can create a simple data connector yourself. If you know a bit of Python, it is a trivial task. In one of my previous articles I wrote about how easy it is to create a microservice that pulls data from the NASA API [3]:
Consider this code snippet for app.py
import requests

session = requests.Session()
url = "https://api.nasa.gov/neo/rest/v1/feed"
apiKey = "your_api_key"
requestParams = {
    'api_key': apiKey,
    'start_date': '2023-04-20',
    'end_date': '2023-04-21'
}
response = session.get(url, params=requestParams, stream=True)
print(response.status_code)
It can be deployed in any cloud vendor platform and scheduled to run with the required frequency. It’s always a good practice to use something like Terraform to deploy our data pipeline applications.
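As a sketch of what that deployment could look like, the same connector can be wrapped as a serverless HTTP function and triggered on a schedule (for example by Cloud Scheduler). The entry point name, the environment variable and the date range below are illustrative.
import os
import requests
import functions_framework


@functions_framework.http
def ingest_nasa_feed(request):
    """HTTP-triggered entry point; a scheduler can call it with the required frequency."""
    response = requests.get(
        "https://api.nasa.gov/neo/rest/v1/feed",
        params={
            "api_key": os.environ["NASA_API_KEY"],  # keep secrets out of the code
            "start_date": "2023-04-20",
            "end_date": "2023-04-21",
        },
        timeout=30,
    )
    response.raise_for_status()
    # In a real pipeline the payload would be written to a bucket or a warehouse table here
    return {"status": response.status_code}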
ETL frameworks explosion
We can witness a “Cambrian explosion” of various ETL frameworks for data extraction and transformation. It’s not a surprise that many of them are open-source and are Python-based.
Luigi [8] is one of them and it helps to create ETL pipelines. It was created by Spotify to manage massive data processing workloads. It has a command line interface and great visualization features. However, even basic ETL pipelines would require a certain level of Python programming skills. From my experience, I can tell that it’s great for strict and straightforward pipelines. I find it particularly difficult to implement complex branching logic using Luigi but it works great in many scenarios.
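For a flavour of that strict, dependency-driven style, here is a minimal Luigi sketch with two tasks chained via requires(); the file names are illustrative.
import luigi


class Extract(luigi.Task):
    """Writes a tiny raw CSV file to simulate an extraction step."""

    def output(self):
        return luigi.LocalTarget("raw.csv")

    def run(self):
        with self.output().open("w") as f:
            f.write("id,value\n1,10\n2,20\n")


class Transform(luigi.Task):
    """Depends on Extract and writes a cleaned copy of its output."""

    def requires(self):
        return Extract()

    def output(self):
        return luigi.LocalTarget("clean.csv")

    def run(self):
        with self.input().open("r") as src, self.output().open("w") as dst:
            for line in src:
                dst.write(line.lower())


if __name__ == "__main__":
    luigi.build([Transform()], local_scheduler=True)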
Python ETL (PETL) [9] is one of the most widely used open-source ETL frameworks for straightforward data transformations. It is invaluable when working with tables, extracting data from external data sources and performing basic ETL on data. In many ways, it is similar to Pandas, but the latter has more analytics capabilities under the hood. PETL is great for aggregation and row-level ETL.
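A short PETL sketch of that row-level style, using illustrative file and column names:
import petl as etl

table = etl.fromcsv("orders.csv")                 # extract
table = etl.convert(table, "order_total", float)  # basic type cleanup
# aggregate the order_total field per customer
totals = etl.aggregate(table, "customer_id", sum, "order_total")
etl.tocsv(totals, "totals_by_customer.csv")       # load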
Bonobo [10] is another open-source lightweight data processing tool which is great for rapid development, automation and parallel execution of batch-processing data pipelines. What I like about it is that it makes it really easy to work with various data sources and file formats, e.g. SQL, XML, XLS, CSV and JSON. It will be a great tool for those with minimal Python knowledge. Among other benefits, I like that it works well with semi-complex data schemas. It is ideal for simple ETL and can run in Docker containers (it has a Docker extension).
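A minimal Bonobo sketch: extract, transform and load are plain Python callables chained into a graph that the framework executes in parallel. The rows here are illustrative.
import bonobo


def extract():
    # Yield rows one by one; in practice this could read a file or an API
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}


def transform(row):
    yield {**row, "name": row["name"].upper()}


def load(row):
    print(row)  # in practice: write to a file, database or warehouse


graph = bonobo.Graph(extract, transform, load)

if __name__ == "__main__":
    bonobo.run(graph)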
Pandas is an absolute beast in the world of data, and there is no need to cover its capabilities in this story. It’s worth mentioning that its data frame transformations have been included among the basic data loading methods of many modern data warehouses. Consider this data loading sample for the BigQuery data warehouse solution:
import json

import pandas
from google.cloud import bigquery
from google.oauth2 import service_account
...
# Authenticate BigQuery client:
service_account_str = config.get('BigQuery')  # Use config (defined elsewhere)
credentials = service_account.Credentials.from_service_account_info(service_account_str)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
...


def load_table_from_dataframe(table_schema, table_name, dataset_id):
    #! source data file format must be outer array JSON:
    """
    [
        {"id":"1"},
        {"id":"2"}
    ]
    """
    blob = """
    [
        {"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]},
        {"id":"2","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}
    ]
    """
    body = json.loads(blob)
    print(pandas.__version__)

    table_id = client.dataset(dataset_id).table(table_name)
    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(table_schema)  # helper defined elsewhere
    job_config.schema = schema

    df = pandas.DataFrame(
        body,
        # In the loaded table, the column order reflects the order of the
        # columns in the DataFrame.
        columns=["id", "first_name", "last_name", "dob", "addresses"],
    )
    df['addresses'] = df.addresses.astype(str)
    df = df[['id', 'first_name', 'last_name', 'dob', 'addresses']]
    print(df)

    load_job = client.load_table_from_dataframe(
        df,
        table_id,
        job_config=job_config,
    )
    load_job.result()
    print("Job finished.")
Apache Airflow, for example, is not an ETL tool per se, but it helps to organize our ETL pipelines into a nice visualization of directed acyclic graphs (DAGs) that describe the relationships between tasks. A typical Airflow architecture includes a metadata-based scheduler, executors, workers and tasks.
For example, we can run ml_engine_training_op after we export data into the cloud storage (bq_export_op) and make this workflow run daily or weekly.
ML model training using Airflow. Image by author.
Consider this example below.
It creates a simple data pipeline graph to export data into a cloud storage bucket and then trains the ML model using MLEngineTrainingOperator.
"""DAG definition for recommendation_bespoke model training."""

import datetime

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.hooks.base_hook import BaseHook
from airflow.operators.app_engine_admin_plugin import AppEngineVersionOperator
from airflow.operators.ml_engine_plugin import MLEngineTrainingOperator


def _get_project_id():
    """Get project ID from default GCP connection."""
    extras = BaseHook.get_connection('google_cloud_default').extra_dejson
    key = 'extra__google_cloud_platform__project'
    if key in extras:
        project_id = extras[key]
    else:
        raise ValueError('Must configure project_id in google_cloud_default '
                         'connection from Airflow Console')
    return project_id


PROJECT_ID = _get_project_id()

# Data set constants, used in BigQuery tasks. You can change these
# to conform to your data.
DATASET = 'staging'  # 'analytics'
TABLE_NAME = 'recommendation_bespoke'

# GCS bucket names and region, can also be changed.
BUCKET = 'gs://rec_wals_eu'
REGION = 'us-central1'  # 'europe-west2', 'us-east1'
JOB_DIR = BUCKET + '/jobs'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['mi**************@gm***.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=5)
}

# Default schedule interval using cron syntax - can be customized here
# or in the Airflow console.
schedule_interval = '00 21 * * *'

dag = DAG('recommendations_training_v6', default_args=default_args,
          schedule_interval=schedule_interval)

dag.doc_md = __doc__

#
# Task Definition
#

# BigQuery training data export to GCS
training_file = BUCKET + '/data/recommendations_small.csv'  # just a few records for staging

t1 = BigQueryToCloudStorageOperator(
    task_id='bq_export_op',
    source_project_dataset_table='%s.recommendation_bespoke' % DATASET,
    destination_cloud_storage_uris=[training_file],
    export_format='CSV',
    dag=dag
)

# ML Engine training job
job_id = 'recserve_{0}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
output_dir = BUCKET
delimiter = ','
data_type = 'user_groups'
master_image_uri = 'gcr.io/my-project/recommendation_bespoke_container:tf_rec_latest'

training_args = ['--job-dir', job_dir,
                 '--train-file', training_file,
                 '--output-dir', output_dir,
                 '--data-type', data_type]

master_config = {"imageUri": master_image_uri}

t3 = MLEngineTrainingOperator(
    task_id='ml_engine_training_op',
    project_id=PROJECT_ID,
    job_id=job_id,
    training_args=training_args,
    region=REGION,
    scale_tier='CUSTOM',
    master_type='complex_model_m_gpu',
    master_config=master_config,
    dag=dag
)

t3.set_upstream(t1)
Bubbles [11] is another open-source tool for ETL in the Python world. It’s great for rapid development and I like how it works with metadata to describe data pipelines. The creators of Bubbles call it an “abstract framework” and say that it can be used from many other programming languages, not exclusively from Python.
There are many other tools with more specific applications, e.g. extracting data from web pages (PyQuery, BeautifulSoup, etc.) and parallel data processing. That can be a topic for another story, but I wrote about some of them before, e.g. the joblib library [12].
Data infrastructure as code
Infrastructure as code (IaC) is a popular and highly effective approach for managing data platform resources. Even for data, it is pretty much a standard right now, and it definitely looks great on your CV, telling potential employers that you are familiar with DevOps standards. Using tools like Terraform (platform agnostic) and CloudFormation, we can integrate our development work and deployments (operations) with ease.
In general, we would want to have staging and production data environments for our data pipelines. It helps to test our pipelines and facilitate collaboration between teams.
Consider this diagram below. It explains how data environments work.
Data environments. Image by author.
Often we might need an extra sandbox for testing purposes or to run data transformation unit tests when our ETL services trigger CI/CD workflows.
I previously wrote about it here:
Infrastructure as Code for Beginners
Using AWS CloudFormation template files we can describe required resources and their dependencies so we can launch and configure them together as a single stack.
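As a hedged sketch of that idea, the snippet below launches a CloudFormation template as a single stack from Python using boto3; the template path, stack name and parameter are illustrative.
import boto3

cloudformation = boto3.client("cloudformation")

# The template file describes the resources and their dependencies
with open("data_pipeline_stack.yaml") as f:
    template_body = f.read()

cloudformation.create_stack(
    StackName="staging-data-pipeline",
    TemplateBody=template_body,
    Parameters=[{"ParameterKey": "Environment", "ParameterValue": "staging"}],
    Capabilities=["CAPABILITY_IAM"],  # required when the template creates IAM resources
)

# Block until the whole stack has been created
waiter = cloudformation.get_waiter("stack_create_complete")
waiter.wait(StackName="staging-data-pipeline")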
If you are a data professional, this approach will definitely help you work with different data environments and replicate data platform resources faster, more consistently and with fewer errors.
The problem is that many data practitioners are not familiar with IaC, which leads to a lot of errors during the development process.
Data Mesh and decentralized data management
Data space has significantly evolved during the last decade and now we have lots of data tools and frameworks. Data Mesh defines the state when we have different data domains (company departments) with their own teams and shared data resources. Each team has their own goals, KPIs, data roles and responsibilities.
For a long period of time, data bureaucracy has been a real pain for many companies.
This data platform type [4] might seem a bit chaotic, but it was meant to be a successful and efficient choice for companies where decentralization enables different teams to access cross-domain datasets and run analytics or ETL tasks on their own.
Indeed, Snowflake might be your favourite data warehouse solution if you are a data analyst who is not familiar with Spark. However, you will often face a seemingly trivial problem: you want to read datalake data without data engineering help. In this scenario, a set of metadata records describing the datasets can be extremely useful, and that’s why Data Mesh is so successful.
It equips users with knowledge about the data, its origins and how other teams can make the best use of datasets they weren’t previously aware of.
Sometimes datasets and data source connections become very intricate and it is always a good practice to have a single-source-of-truth data silo or repository with metadata and dataset descriptions.
In one of my previous stories [5] I wrote about the role of SQL as a unified querying language for teams and data. Indeed, it is analytical, self-descriptive and can even be dynamic, which makes it a perfect tool for all data users.
Often it all turns into a big mes(s/h)
This fact makes SQL-based templating engines like DBT, Jinja and Dataform very popular. Just imagine you have an SQL-like platform where all datasets and their transformations are described and defined thoroughly [6].
Dataform’s dependency graph and metadata. Image by author.
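As a tiny illustration of the templated-SQL idea, here is plain Jinja rendering one parameterised transformation that different teams could reuse for their own environment; the dataset name and date parameter are illustrative.
from jinja2 import Template

sql_template = Template("""
CREATE OR REPLACE TABLE {{ dataset }}.user_sessions AS
SELECT user_id, COUNT(*) AS sessions
FROM {{ dataset }}.raw_events
WHERE event_date = '{{ ds }}'
GROUP BY user_id
""")

# Render the same transformation for a specific environment and run date
print(sql_template.render(dataset="staging", ds="2023-04-20"))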
It might be a big challenge to understand how data teams relate to data sources and schemas. Very often it is all tangled in spaghetti of dataset dependencies and ETL transformations.
Data engineering plays a critical role in mentoring, improving data literacy and empowering the rest of the company with state-of-the-art data processing techniques and best practices.
Democratization of Business Intelligence pipelines using AI
Improving data accessibility has always been a popular topic in the data space but it is interesting to see how the whole data pipeline design process is becoming increasingly accessible to teams that weren’t familiar with data before. Now almost every department can utilize built-in AI capabilities to create complex BI transformations on data.
All they need is to describe what they want BI-wise in their own words
For example, BI tools like ThoughtSpot use AI with an intuitive “Google-like search interface” [7] to gain insights from data stored in any modern DWH solution such as Google BigQuery, Redshift, Snowflake or Databricks.
Modern Data Stack includes BI tools that help with data modelling and visualization. Many of them already have these built-in AI capabilities to gain data insights faster based on user behaviour.
I believe it’s a fairly easy task to integrate GPT and BI. In the next couple of years, we will see many new products using this tech.
GPT can pre-process text data to generate a SQL query that understands your intent and answers your question.
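A hedged sketch of that idea: send a natural-language question together with the table schema to a GPT model and get a candidate SQL query back. The model name and schema are illustrative, and the generated SQL should still be reviewed before it runs against the warehouse.
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

schema = "orders(order_id INT, customer_id INT, order_total NUMERIC, order_date DATE)"
question = "What was the total revenue per customer last month?"

completion = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model name
    messages=[
        {"role": "system", "content": f"Translate questions into SQL for this schema: {schema}"},
        {"role": "user", "content": question},
    ],
)

print(completion.choices[0].message.content)  # candidate SQL query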
Conclusion
In this article, I tried to give a very high-level overview of the major data trends that affect the data engineering role these days. Data Mesh and templated SQL with dependency graphs facilitate data literacy and have democratized the whole analytics process. Advanced data pipelines with intricate ETL techniques and transformations can now be transparent to everyone in the organisation. Data pipelines are becoming increasingly accessible to other teams, and they don’t need to know programming to learn and understand the complexity of ETL. Data Mesh and metadata help to solve this problem. From my experience, I keep seeing more and more people learning SQL to contribute to the transformation layer. Companies born during the “advanced data analytics” age have the luxury of easy access to cloud vendor products and their managed services. It definitely helps to acquire the required data skills and improve them to gain a competitive advantage.
Recommended read
[1] https://medium.com/towards-data-science/data-pipeline-design-patterns-100afa4b93e3
[2] https://towardsdatascience.com/introduction-to-apache-iceberg-tables-a791f1758009
[3] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd
[4] https://medium.com/towards-data-science/data-platform-architecture-types-f255ac6e0b7
[5] https://medium.com/towards-data-science/advanced-sql-techniques-for-beginners-211851a28488
[7] https://docs.thoughtspot.com/cloud/latest/search-sage
[8] https://github.com/spotify/luigi
[9] https://petl.readthedocs.io/en/stable/
[10] https://www.bonobo-project.org
[11] http://bubbles.databrewery.org/
[12] https://medium.com/towards-data-science/how-to-become-a-data-engineer-c0319cb226c2