For this letter, I’ll be retelling a collection of firsthand and secondhand stories from the perspective of a fictional junior-mid Data Engineer’s situation.
Disclaimer
This letter focuses on AWS and Spark tools for a single method. Methods change over time. The underpinnings of Business Analytics problems change less frequently.
I have joined a company with some Data Engineering processes in place. Jobs are scheduled. There are a few optimizations in place; a couple of Spark jobs are submitted to generate a daily dashboard. The Data Warehouse is a bit of a data lake, where the main purpose of the Warehouse is to prevent analysts from having to write cross-database joins. Warehouse app usage data is basically a daily copy of the OLTP database that our app runs on.
Analytics are generally good and improving at this small org!
But beneath this general peace and well-being, there lies a panic. A panic that management will ask a question like, “How many customers did we have 6, 12, and 18 months ago?”
Panic
Analyst: “What do you mean we can’t just filter on date and figure out the customer count?”
Fictional DE: “The data warehouse is basically a copy of the current state of our application. We don’t have a history of when a row might have been updated, deleted, or inserted, other than the latest update. And even then, we don’t know what type of update it was! It’s a bunch of state complexity that we do not capture.”
Analyst: “Ok I understand we can’t get exact numbers. What’s the next best option?”
Fictional DE: “Backfills are the standard method for computing metrics in the past.”
Handling State History with Backfills is Never Pretty
Maxime Beauchemin, CEO & Founder at Preset, writes in Functional Data Engineering:
A common pattern that leads to increased “state complexity” is when a partition depends on a previous partition on the same table. This leads to growing complexity linearly over time. If for instance, computing the latest partition of your user dimension uses the previous partition of that same table, and the table has 3 years of history with daily snapshot, the depth of the resulting graph grows beyond a thousand and the complexity of the graph grows linearly over time. Strictly speaking, if we had to reprocess data from a few months back, we may need to reprocess hundreds of partition using a DAG that cannot be parallelized.
Since so much of data work is cumulative, it can be really tricky to subtract away changes that were not tracked over time. On top of that, backfills are inefficient because a Backfill job depends on sequential Dynamic Data Partitioning. Robert Chang does a great job of explaining Backfilling here.
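To make the sequential dependency concrete, here is a minimal sketch in plain Python. It assumes each daily partition is derived from the previous day's partition; the function name and the dates are hypothetical and only illustrate the shape of the problem.

```python
from datetime import date, timedelta
from typing import List


def partitions_to_rebuild(first_bad: date, latest: date) -> List[date]:
    """Return the daily partitions that must be recomputed, in order.

    Assumes each partition is built from the previous day's partition,
    so fixing one day invalidates every later day.
    """
    days = (latest - first_bad).days
    return [first_bad + timedelta(days=n) for n in range(days + 1)]


# Hypothetical example: a mistake found in the January 1 partition
chain = partitions_to_rebuild(date(2021, 1, 1), date(2021, 3, 31))
print(len(chain))  # number of steps that have to run one after another
```

Each entry in the list has to wait for the one before it, which is why a backfill of this kind cannot simply be spread across more workers.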
In order to get a long-term win at this organization, I’m going to propose what I’ve been reading about modern Data Lakes. I don’t know how invested my manager is going to be about this proposal, so I’ll lay it out in two tiers of engineering time investment, with the pros for each one.
Tier 0: Solving Historical Analytics with CSVs
The relational database that our app runs on is meant to reflect the current reality of our app. It’s not meant to run analytics or look into the past. So, we need to start storing CSV copies of our most important tables in S3 cloud storage. This is called Daily Snapshotting, and it can be done with scheduled, parameterized SQL like so:
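# sqlcursor is assumed to be an open DB-API cursor on the app database
tblnames = ['table1', 'table2']
for tblname in tblnames:
    # Table names come from our own list, so string formatting is acceptable here
    sqlcursor.execute('''
        SELECT *
        FROM {table}
        INTO S3 # pseudo-syntax
    '''.format(table=tblname)
    )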
It’s not the most elegant code, but it works and it solves a big business problem: a lack of Historical Analytics. All this requires is a single parameterized SQL command in a for-loop over our table names, scheduled every day. There won’t be much maintenance on that front.
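One detail the loop leaves open is where each day's file lands. The sketch below shows one possible naming scheme, assuming a `key=value` folder per day so that each snapshot gets its own path. The prefix, the `snapshot_date` folder name, and the function itself are hypothetical choices, not something prescribed by S3.

```python
from datetime import date


def snapshot_key(table: str, snapshot_date: date, prefix: str = "snapshots") -> str:
    """Build a date-partitioned object key for one table's daily CSV."""
    day = snapshot_date.isoformat()
    return f"{prefix}/{table}/snapshot_date={day}/{table}.csv"


# Hypothetical run for the two tables in the loop above
for name in ["table1", "table2"]:
    print(snapshot_key(name, date(2021, 6, 1)))
```

Because the date is part of the key, a new day never collides with an old one, and the folder name can later double as a partition column.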
And AWS provides another service, Athena, which will let us query these CSV files as if they were a database! So we can still stay in SQL world, which is preferable for future maintenance as well.
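With daily snapshots queryable in SQL, the question from management becomes a matter of picking the right snapshot dates. The following is a rough sketch that only builds the SQL strings; it assumes a hypothetical `customers` table with a `snapshot_date` column, and it does not show how the queries are submitted.

```python
import calendar
from datetime import date


def months_ago(today: date, months: int) -> date:
    """Return the date `months` months before `today`, clamping the day to month end."""
    index = today.year * 12 + (today.month - 1) - months
    year, month0 = divmod(index, 12)
    month = month0 + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(today.day, last_day))


def customer_count_query(table: str, snapshot_date: date) -> str:
    """Build a SQL string that counts the rows in one day's snapshot."""
    return (
        f"SELECT COUNT(*) AS customers FROM {table} "
        f"WHERE snapshot_date = '{snapshot_date.isoformat()}'"
    )


today = date(2021, 8, 31)  # hypothetical "today"
for m in (6, 12, 18):
    print(customer_count_query("customers", months_ago(today, m)))
```

The answers are only as old as the first snapshot, which is the practical reason to start capturing them early.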
The sooner we start, the better our data history is going to be! Storage is so cheap nowadays that the cost of basic historical analytics should be worth it for us if we want to be a data-driven organization.
Immutability is important
An important note is that these CSVs should never be modified. In other words, we want to take advantage of immutable data storage, because the past is written. And it is written in rows and columns.
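As an illustration of the write-once rule, here is a small in-memory stand-in for a bucket that refuses to overwrite an existing key. The class and key names are hypothetical. On S3 itself the rule would have to be enforced through permissions or similar bucket settings, which this sketch does not cover.

```python
class ImmutableStore:
    """In-memory stand-in for a bucket that only accepts new keys."""

    def __init__(self):
        self._objects = {}

    def put(self, key: str, body: bytes) -> None:
        # Refuse to replace a snapshot that has already been written
        if key in self._objects:
            raise FileExistsError(f"snapshot already written: {key}")
        self._objects[key] = body

    def get(self, key: str) -> bytes:
        return self._objects[key]


store = ImmutableStore()
store.put("snapshots/table1/2021-06-01.csv", b"id,name\n1,Ada\n")
try:
    store.put("snapshots/table1/2021-06-01.csv", b"id,name\n1,Changed\n")
except FileExistsError as err:
    print(err)  # the second write is rejected; the first copy stays as it was
```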
Tier 1: Solving Data Lineage
As our business grows, we will need to be able to adapt to changes in business needs. Let’s say our business adds a new service. That adds a new table and a couple of foreign key columns. How do we adapt to performing analytics on a changing schema? How are we supposed to remember which columns exist where and what datatype they are or were?
This confusion can be solved by implementing Data Lineage (keeping track of all metadata changes over time). It is something we could pay another company tens of thousands of dollars to do, or I could write a few more lines of code with a focus on maintainability!
In my opinion, Parquet files offer the easiest solution for solving Data Lineage problems. The solution is a single Python script, which could be orchestrated in Airflow if we wanted to be fancy. This Python script will import PySpark modules and change our CSV captures into Parquet files.
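from pyspark.sql import SparkSession

if __name__ == "__main__":
    # SparkSession is the entry point for the DataFrame reader and writer
    spark = SparkSession.builder.appName("CSV2Parquet").getOrCreate()
    # Read the CSV snapshot; header=True assumes the file has a header row
    df = spark.read.csv('inputfile.csv', header=True, inferSchema=True)
    # Write the same rows as Parquet; the schema travels with the file
    df.write.parquet('outputfile.parquet')
    spark.stop()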
As we know, database objects like columns and tables change over time.
Parquet files capture these changes by storing schema metadata in the footer of each file. This means our organization’s analytics can adapt to a living and breathing business by keeping visibility into how these Parquet files are structured.
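Once each day's schema is available, tracking changes is a matter of comparing two snapshots. Below is a minimal sketch that assumes the schemas have already been read into plain `{column: type}` dictionaries. The two example schemas are hand-written and hypothetical; in practice they would come from the Parquet metadata.

```python
def schema_diff(old: dict, new: dict) -> dict:
    """Compare two {column: type} mappings and report what changed."""
    shared = set(old) & set(new)
    return {
        "added": sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "retyped": sorted(c for c in shared if old[c] != new[c]),
    }


# Hypothetical schemas from two consecutive daily snapshots
monday = {"id": "int64", "name": "string", "plan": "string"}
tuesday = {"id": "int64", "name": "string", "plan": "int32", "service_id": "int64"}
print(schema_diff(monday, tuesday))
```

Storing the output of a comparison like this next to each snapshot gives a simple, queryable record of when a column appeared, disappeared, or changed type.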
What about Slowly Changing Dimensions (SCD)
There is another solution available to us, which is slowly changing dimensions. This is more of an old school approach, as evidenced by the Oracle website link.
This approach is optimized for keeping storage low. That is, we don’t duplicate data by brute SELECT-star force! With SCDs, we would add nuanced modifications and historical capture at the row level of our Data Warehouse.
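For comparison, here is a rough sketch of the row-level bookkeeping in the variant usually called a Type 2 slowly changing dimension, where a changed row is closed and a new row is opened. It uses plain Python lists and dictionaries; the field names (`valid_from`, `valid_to`) and the customer data are hypothetical.

```python
from datetime import date


def scd2_apply(history: list, key: str, new_attrs: dict, as_of: date) -> None:
    """Close the open row for `key` if its attributes changed, then append a new row."""
    current = next(
        (r for r in history if r["key"] == key and r["valid_to"] is None), None
    )
    if current is not None:
        if current["attrs"] == new_attrs:
            return  # nothing changed, keep the open row as it is
        current["valid_to"] = as_of  # close the old version
    history.append(
        {"key": key, "attrs": new_attrs, "valid_from": as_of, "valid_to": None}
    )


history = []
scd2_apply(history, "cust-1", {"plan": "free"}, date(2021, 1, 1))
scd2_apply(history, "cust-1", {"plan": "free"}, date(2021, 2, 1))  # no change
scd2_apply(history, "cust-1", {"plan": "pro"}, date(2021, 3, 1))
for row in history:
    print(row)
```

Only real changes create rows, which is where the storage savings come from, at the cost of update logic that daily snapshots do not need.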
What about Temporal Tables
Temporal Tables are a solution provided by Microsoft SQL Server. A temporal table is basically a version-controlled table: when a row changes, the old version is archived in a linked history table, and added period columns record when each version was valid. Temporal tables are a little hard to wrap your head around unless you have worked with them.
Since my focus is on AWS and open source tools, I’ll just leave this link if you are interested in pursuing this method.
Parquet Highlights
Parquet files are columnar storage
Columnar storage means that our data is physically organized so that values from the same column hang out together in storage. This is great for analytics because we typically want aggregations over a whole column. Most of the time, we don’t want to single out individual observations.
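The difference between the two layouts can be sketched with plain Python structures. This is only an analogy for how the bytes are arranged on disk, and the three example rows are made up.

```python
# Row layout: each record keeps all of its fields together
rows = [
    {"id": 1, "plan": "free", "spend": 0.0},
    {"id": 2, "plan": "pro", "spend": 20.0},
    {"id": 3, "plan": "pro", "spend": 35.0},
]

# Aggregating one field still means visiting every full record
row_total = sum(r["spend"] for r in rows)

# Column layout: the values of one column sit together in one list
columns = {name: [r[name] for r in rows] for name in rows[0]}

# The same aggregate now touches a single list and ignores the other columns
col_total = sum(columns["spend"])

print(row_total, col_total)
```

Both totals are the same; what changes is how much unrelated data has to be read to get there.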
The jargon phrase for these benefits is “read-optimized”.
Columnar storage is a more secure storage option than raw CSVs. Especially for GDPR requests, it would be amazingly difficult and counter to our principles to have to go back and delete rows in our Data Lake. Remember immutability!
Keeping this observation-level data obscured with proper write permissions and columnar storage is a great solution for juggling GDPR requests and historical analytics.
Columnar storage is great for writing once and reading many times.
Since the datasets are immutable, we don’t have to care about the slow write time because it is a single up-front cost.
Parquet vs ORC files
ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is optimized for large streaming reads, but with integrated support for finding required rows quickly.
The reason ORC files have this row-level optimization is that the format is optimized for Hive, a Data Warehouse solution available in the larger Hadoop ecosystem.
I am a big proponent of going with the industry standard first, and optimizing after. Spark is much more of a standard than Hive, so I recommend Parquet as it is further optimized for our presumed PySpark scripts that form our architecture.
This brings me to the next point,
Parquet files are extensible
These files can be changed back to CSV files if we need them uncompressed for any reason.
Keep in mind that, for security reasons, this functionality should almost never be used, and permissions to unpack Parquet files should be very limited.
Parquet files will work in any cloud environment, and both Python and Spark are industry standards that work with any tool, app, or service you would want in Analytics!
Conclusion
Building a Data Lake in cloud storage with daily Parquet files is a simple solution for two common business analytics problems: Accurate Historical Data and Data Lineage. Not only that, the focus on SQL and simple Python/PySpark scripts makes this Data Lake easy to use and maintain.
Finally, having a robust Data Lake with complete history gives us a more solid foundation for the next step of Data Processing, our Data Warehouse.
Curated Content
Optimizing Spark SQL JOINs
Cloud Storage Types explained
About the Author and Newsletter
I automate data processes. I work mostly in Python, SQL, and bash. To schedule a consultancy appointment, contact me through ocelotdata.com.
At Modern Data Infrastructure, I democratize the knowledge it takes to understand an open-source, transparent, and reproducible Data Infrastructure. In my spare time, I collect Lenca pottery, walk my dog, and listen to music.
More at: What is Modern Data Infrastructure.