Processing real-time data in motion has historically been challenging and expensive. The perceived difficulty put it out of the question for many, despite numerous use cases and benefits.
In this article, we will dive deep into a new form of SQL that makes processing data in motion much easier, as well as a SaaS solution that makes streaming data processing much cheaper.
We will walk through three tutorials that show you how to process data in motion with SQL and Estuary Flow, a low-code ETL platform that streamlines the creation and maintenance of data pipelines by ingesting and transforming data in real time from sources to destinations.
But first, let's talk about why the current status quo of SQL is limited.
A Brief History of SQL
SQL was developed over half a century ago by IBM. It has since then become the industry-standard language for database creation and manipulation.
What has changed since then?
SQL was developed at a time when data was scarce, and expensive to store, analyze, and use. Real-time data was barely available if at all. Historically, SQL has been primarily used to query static data - data at rest.
Although fast access to insights has always been a north star for analytics and SQL, in the past this was almost impossible.
But, this has changed.
Over the years, the Big Data era has changed how businesses interact with data. In 2013, the global market for Big Data reached $10 billion. In 2016, the world experienced unprecedented data growth: 90% of the world's data at that time had been created in the previous two years alone. With this amount of data becoming available, it is no longer sufficient to query static data only.
The need is clearly there. Now, we just need a solution.
LinkedIn developed Kafka, open-sourced in 2011 as Apache Kafka, which made large-scale stream processing far more practical. It was a big step toward bringing stream processing to the mainstream. However, managing the infrastructure and tooling still required significant development resources.
In the following years, SaaS platforms such as Estuary began to offer managed solutions, dramatically reducing implementation complexity and making real-time streaming data more accessible and approachable.
In the last few years, some of these SaaS platforms have added the capability to apply plain SQL to data in motion, not just data at rest. This is what we call Streaming SQL.
Streaming SQL has upped the game of what data-driven organizations are able to do with the vast amount of data available in order to gain a competitive edge.
For example, instead of analyzing month-to-date app usage as of yesterday, the rise of Streaming SQL means analyzing month-to-date usage as of seconds ago.
Allied Market Research has estimated that the data analytics market will grow to $420.98 billion by 2027. Those who make the most of this opportunity will be the ones leveraging powerful tools such as Streaming SQL.
What is Streaming SQL?
As already mentioned, in recent years the enormous growth in data volumes and the increasing need for real-time data analysis have made SQL a crucial component of data management and business analytics.
However, traditional SQL solutions that operate on stored data in databases cannot effectively support real-time stream processing requirements.
As a result, there is a need for a new type of SQL that can process continuous data streams. This is where Streaming SQL comes in.
Before we dive deeper, let us examine what exactly we mean by “streaming” in this context.
"Streaming" refers to the handling of data as a continuous flow of events or messages through message brokers such as Kafka, Gazette, Amazon Kinesis, or Pulsar.
These event streams can include various types of data, from user actions on websites or mobile apps, IoT sensor data, and server metrics to traditional database activities captured using change data capture (CDC).
Traditional SQL runs on databases, while Streaming SQL runs on streams of live data. Running SQL on a database returns a static set of results from a single point in time.
With Streaming SQL, on the other hand, you can run the exact same SQL query on a stream of real-time data and get an answer that is continuously kept up to date as new data arrives.
In short, Streaming SQL is designed to process data incrementally as it arrives and deliver results in real time.
Streaming SQL can transform, filter, aggregate, and enrich data in flight, making it a powerful tool for organizations to extract maximum value from constantly streaming data.
Also, Streaming SQL can work with a wide range of data sources and environments, from public cloud platforms to Kafka, Hadoop, NoSQL, and relational databases.
Streaming SQL has become an essential part of effective real-time stream processing solutions. It enables data-driven organizations to analyze and act on streaming data and to quickly identify the value in new data. With Streaming SQL, organizations can quickly adopt a modern data architecture and create streaming data pipelines to support their business needs.
Streaming SQL vs. Traditional SQL
Streaming SQL can be understood as a variant of traditional SQL that is specifically designed to process data streams in real time.
There are two major differences between traditional SQL and Streaming SQL.
- Static Data vs. Continuous Streams: As mentioned, the primary difference is that traditional SQL operates on static data stored in databases, whereas Streaming SQL works with continuously flowing data streams as they are generated, potentially by multiple sources. This continuous nature is what makes Streaming SQL solutions valuable compared to traditional SQL.
Also, Streaming SQL solutions use windows and event tables to trigger actions when data changes, so you can write SQL queries against streaming data without writing custom stream-processing code.
In a nutshell, Streaming SQL is best for processing data streams that are constantly changing, while traditional SQL is best for querying and analyzing data that is already stored in a database.
- Sliding Window Approach: Streaming SQL uses a sliding window approach, which involves breaking the data stream into small, discrete segments or windows and processing each window separately. This approach enables Streaming SQL to handle data streams that are too large to be processed as a single entity (see the sketch below).
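To make the windowing idea concrete, here is a rough sketch of a sliding-window aggregation. The syntax follows one common streaming SQL dialect (Flink-style windowing table functions); the pageviews stream and event_time column are hypothetical, and other engines, including Flow, express the same idea differently.

```sql
-- Illustrative only: window syntax varies across streaming SQL engines.
-- Count page views over 10-minute windows that slide forward every minute,
-- using a hypothetical `pageviews` stream with an `event_time` column.
SELECT
  window_start,
  window_end,
  COUNT(*) AS views
FROM TABLE(
  HOP(TABLE pageviews, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY
  window_start,
  window_end;
```

Each incoming event falls into several overlapping windows, and the counts keep updating as new events arrive.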
The Need for Streaming SQL
While several CDC tools allow you to ingest data from a source to a target, few offer SQL transformation capabilities. Often, replicating data as-is from one place to another may not be sufficient for your needs.
For example, you may want to do some filtering, apply certain calculations to your source data, or aggregate data from multiple documents before the data arrive at the destination.
Other common use cases include merging across several collections using a common key and applying business logic to the source data.
Using derivations in Flow, you can perform a variety of transformations, from simple remapping use cases to highly complex stateful transaction processing.
A derivation is a data collection that is derived from applying transformations to one or more source collections. Derivations work continuously, ensuring they stay in sync with any updates made to the source collections in real time.
Flow enables you to write derivations using either SQLite or TypeScript in three simple steps.
The Flow Derivations doc here walks through a tutorial that illustrates a complex stateful transaction processing use case. However, often you may just need to apply a simple transformation to your data, which is what this tutorial aims to show.
Four Common Use Cases for Streaming SQL
SQL transformation can be extremely useful during data replication as it allows data to be transformed and modified as it is being replicated from one source to another.
The following are a few ways in which SQL transformation can be used during data replication.
- Data cleansing: Data replication often involves moving data from one system to another, which can result in data quality issues. You can use SQL transformation to clean and standardize data during replication, ensuring that the data is consistent across systems.
- Data mapping: When replicating data between two systems, it is often necessary to map fields from one system to another. You can use SQL transformation to map fields and transform data types during replication, ensuring that the data is properly mapped and formatted.
- Data filtering: During replication, it may be necessary to filter out certain records or data elements based on specific criteria. You can use SQL transformation to filter out data during replication, ensuring that only the relevant data is replicated to the target system.
- Data aggregation: During replication, it may be necessary to aggregate data from multiple sources or to create summary data for reporting purposes. You can use SQL transformation to aggregate and summarize data during replication, making it easier to analyze and report on.
Overall, SQL transformation is useful during data replication because it allows you to transform, clean, and format data as it is being moved from one system to another. This helps ensure that the data is consistent and accurate across systems, and that it is properly mapped and formatted for its intended use.
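As a rough illustration of these patterns, the plain SQL sketch below cleans a country code, normalizes a numeric type, filters out cancelled records, and aggregates revenue in a single pass. The orders table and its columns are hypothetical, and in a streaming pipeline the same logic would run continuously over records in flight rather than over a stored table.

```sql
-- Hypothetical example: `orders`, `country_code`, `order_total`, and `status`
-- are illustrative names, not fields from any particular source system.
SELECT
  UPPER(TRIM(country_code))      AS country,      -- cleansing: standardize the code
  COUNT(*)                       AS order_count,  -- aggregation: summary counts
  SUM(CAST(order_total AS REAL)) AS revenue       -- mapping: normalize the type
FROM orders
WHERE status <> 'cancelled'                       -- filtering: drop irrelevant rows
GROUP BY UPPER(TRIM(country_code));
```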
Tutorial Example 1: Stateless Transform
In this tutorial, we will walk through a few SQL transformation use cases, showing you how and where to put your SQL queries to transform your data collection using Estuary Flow.
Note that Flow integrates with GitPod to let you leverage the full capabilities of SQLite. GitPod is free to use. It is an online Integrated Development Environment (IDE) that provides a complete development environment that can be accessed through a web browser, with all the necessary tools and dependencies pre-installed.
Scenario
Suppose you have a table of employees with their names, addresses, and the regions they are in:
And you want to filter by region, and derive a target collection where only employees in the US are included.
Suppose this is the end result you want in your destination:
Prerequisites
Before following the steps below, you should already have a collection you want to apply the transformation to.
- Create an employees table in any database of your choice (a sample table definition is sketched after this list).
- Create a Capture in Estuary Flow. If you’re new to Flow, you can get started with a free account.
- Name the Capture: employees
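If you don't already have such a table, here is a minimal sketch you could adapt. The column types and sample rows are illustrative; the column names match the fields referenced in the lambda later in this tutorial.

```sql
-- Illustrative source table and sample rows; adjust types for your database.
CREATE TABLE employees (
  EmployeeID INT PRIMARY KEY,
  LastName   VARCHAR(50),
  FirstName  VARCHAR(50),
  Address    VARCHAR(100),
  Region     VARCHAR(10)
);

INSERT INTO employees (EmployeeID, LastName, FirstName, Address, Region) VALUES
  (1, 'Smith', 'Ana',   '1 Main St, Seattle, WA', 'US'),
  (2, 'Khan',  'Omar',  '22 High St, London',     'UK'),
  (3, 'Lopez', 'Maria', '9 Pine Ave, Austin, TX', 'US');
```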
Tutorial Steps
Navigate to the Collections page in Flow and click “+ NEW TRANSFORMATION”.
This brings up a pop-up window to derive a new collection.
Step 1: Select source collections
In the “Available Collections” dropdown, find the source collection you want to apply streaming SQL to. In our case, that is JennyPoc/sqlserversource/employees. Select it.
Step 2: Transformation Language
For Language, there are two options: SQL or TypeScript. For this tutorial, we will select SQL.
Step 3: Write transformations
Finally, give your derived collection a name. We will name ours sqldemo.
Click the blue button “PROCEED TO GITPOD”.
You will then receive this message in Flow, informing you that a GitPod window has opened in another tab.
Proceed to GitPod and sign in using your GitLab, GitHub, or Bitbucket account.
After logging into GitPod, an environment like the following is already set up for you:
In your flow.yaml file in your workspace folder, make the following updates:
- Update the schema specs to include the fields you want to see in your derived collection.
Note: All Flow collections have a JSON schema, which defines their structure, representation, and constraints. You can learn more about collection schemas here.
You can either manually update the JSON schema or use --infer-schema to get your schema specs automatically populated. See below for further details on how to run the command.
- Designate a key. (For our example, we will key our collection on EmployeeID.)
- Remove the migrations .sql filename and replace it with {}. (Note that a migration is not needed for a stateless transformation. Review this article to understand more about stateful vs. stateless processing.)
- Replace the lambda .sql filename with a SQL select statement that filters on Region:
```sql
select $EmployeeID, $LastName, $FirstName, $Address, $Region where $Region = 'US';
```
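Putting those edits together, your flow.yaml will end up looking roughly like the sketch below. Treat this as an assumption-laden outline rather than the exact generated spec: the collection prefix and lambda file name are placeholders, and you should keep whatever structure GitPod scaffolds for you.

```yaml
collections:
  # Illustrative name; your collection will live under your own prefix.
  JennyPoc/sqldemo:
    schema:
      type: object
      properties:
        EmployeeID: { type: integer }
        LastName: { type: string }
        FirstName: { type: string }
        Address: { type: string }
        Region: { type: string }
      required: [EmployeeID]
    key: [/EmployeeID]
    derive:
      using:
        sqlite: {}                  # no migrations for a stateless transform
      transforms:
        - name: employees
          source: JennyPoc/sqlserversource/employees
          shuffle: any
          lambda: sqldemo.lambda.employees.sql
```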
To see a preview, run the following flowctl command on the Terminal tab in GitPod:
```shell
$ flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
```
To use the schema inference mentioned above, run the following flowctl command with the --infer-schema flag:
```shell
$ flowctl preview --source flow.yaml --infer-schema --interval 200ms | jq -c 'del(._meta)'
```
To stop the preview, press Ctrl+Z or Ctrl+C:
If we publish a materialization on this data, the resulting table looks like this:
This tutorial video walks through the above steps live.
In our next example, we will show how to use migrations and lambda, as well as how to publish the materialization.
Tutorial Example 2: Stateful Transform
Scenario:
For our next two examples, we will create a transformation based on our Wikipedia live demo, which captures change events for Wikipedia edits from the Wikipedia API and then delivers the collection to a Google Sheet in real time.
The original materialized collection includes the following fields:
- The date of the Wikipedia edit
- Whether the edit was made by a bot or a human
- Total edits
- Total new lines
For Example 2, we will apply a simple SQL transformation to retain the first and last time we saw a user, and their lifetime edits.
For Example 3, we will apply another SQL transformation on the same dataset to show aggregations.
Prerequisites:
Again, as a prerequisite before you add a new transformation, you should already have a collection you want to apply the transformation to.
For the next two examples in this tutorial, we will apply the transformation to this collection in the demo prefix: demo/wikipedia/recentchange-sampled
This collection is a 3% sample of the enormous demo/wikipedia/recentchange collection, which contains millions of documents. Since the purpose of this tutorial is to demonstrate a proof of concept, we avoid publishing a derivation that processes hundreds of gigabytes of data.
Tutorial Steps
To begin, navigate to the Collections page in Flow and click “+ NEW TRANSFORMATION”.
The “Derive A New Collection” screen opens.
Step 1: Select source collections
In the “Available Collections” dropdown, find the collection demo/wikipedia/recentchange-sampled and select it.
Step 2: Transformation Language
For Language, there are two options: SQL or TypeScript. For this tutorial, we will select SQL.
Step 3: Write transformations
Finally, give your derived collection a name. We will name ours sqltransformdemo.
Click the blue button “PROCEED TO GITPOD”.
After logging into GitPod, an environment like the following is already set up for you:
In the flow.yaml file, a specification is already created to get you started in implementing your derivation.
There are a few components in the flow.yaml. Since a derivation is a collection, it has a schema. The top part of flow.yaml is a JSON schema of the collection, including its properties and key.
You need to update your schema specs and key accordingly. This user guide is also a good reference.
Because this is a SQLite derivation, it can have one or more migrations that it can use, as well as one or more transforms. The migrations and transforms are listed under the derive section of the flow.yaml file.
What a transform does is this: every time a document is available from the source collection, it reads that document and evaluates the lambda function, which in the case of a SQLite transform is a block of SQL statements.
Depending on the complexity of your SQL statements, you can either put them directly in flow.yaml under the derive section, or use the migrations and lambda .sql files to hold your SQL, in which case the flow.yaml holds the filenames of the migrations and lambda .sql files.
For this tutorial, we will fill out the flow.yaml as follows:
This time, we are keying our collection on user.
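As a rough guide, the filled-out spec might look something like the sketch below. The prefix, transform name, and file names are assumptions for illustration; the spec GitPod scaffolds for you is authoritative, and the field names come from the migration and lambda shown next.

```yaml
collections:
  # Illustrative name; yours will carry your own prefix.
  examples/sqltransformdemo:
    schema:
      type: object
      properties:
        user: { type: string }
        first_seen: { type: string }
        last_seen: { type: string }
        total_edits: { type: integer }
        total_lines: { type: integer }
      required: [user]
    key: [/user]
    derive:
      using:
        sqlite:
          migrations:
            - sqltransformdemo.migration.0.sql   # creates the users table below
      transforms:
        - name: recentchange
          source: demo/wikipedia/recentchange-sampled
          shuffle: any
          lambda: sqltransformdemo.lambda.recentchange.sql
```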
Next, the migrations .sql file contains the new table that your derivation will use.
You can use migrations to create or alter tables. Each migration is run only once, and new migrations will be applied as needed.
Note that not all derivations require creating a new table; Examples 1 and 3, for instance, do not.
By default, the <derivation name>.migration.0.sql file is populated with the following example:
For our tutorial, we will create a new users table as follows:
```sql
create table users (
  user text primary key not null,
  first_seen text not null,
  last_seen text not null,
  total_edits integer not null,
  total_lines integer not null
);
```
Next, the lambda file is where you put your SQL transformation statements. By default, it is populated with the following example, showing you examples of fields you can leverage:
For this tutorial, we will use the following SQL statements to track usernames, first seen and last seen timestamps, total edits, and total new lines.
Put the following in your SQL lambda file:
```sql
insert into users (user, first_seen, last_seen, total_edits, total_lines)
select $user, $meta$dt, $meta$dt, 1, coalesce($length$new - $length$old, 0)
where $type = 'edit' and $meta$dt is not null
on conflict do update set
  last_seen = $meta$dt,
  total_edits = total_edits + 1,
  total_lines = total_lines + coalesce($length$new - $length$old, 0);

select
  user,
  first_seen,
  last_seen,
  total_edits,
  total_lines
from users
where $type = 'edit' and user = $user;
```
Let’s dissect these SQL statements a bit.
You may be wondering why we are setting both first_seen and last_seen to $meta$dt. Where are we minimizing or maximizing those values?
We are maximizing the last_seen value through this upsert statement in the lambda:
```sql
on conflict do update set
  last_seen = $meta$dt,
  total_edits = total_edits + 1,
  total_lines = total_lines + coalesce($length$new - $length$old, 0)
```
The upsert sets last_seen to the latest $meta$dt whenever the user makes an edit.
The upsert also aggregates total_edits and total_lines. This doesn't require a schema annotation, because the accumulation happens in the database.
Lastly, the reason we insert data into a table and then select it back out is that we want to track state for this use case. This is only necessary for a stateful transform: in order to keep track of when we first and last saw a user across events, we need to store this information somewhere.
To run a preview, run the following flowctl command on the Terminal tab in Gitpod:
```shell
flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
```
You should already be authenticated, but if you leave the window open for too long, you may be asked to authenticate again.
To do that, run flowctl auth login in the Terminal:
You can get your access token for flowctl from the UI. Navigate to the Admin page, CLI-API tab:
When the flowctl preview command runs, you should see the live updates from the derivation as follows:
To stop the preview, press Ctrl+Z or Ctrl+C:
To publish the derived collection, run the flowctl catalog publish command as follows. See this doc for a complete flowctl usage guide.
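Assuming your spec still lives in flow.yaml in the same GitPod workspace, the command looks like this:

```shell
flowctl catalog publish --source flow.yaml
```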
You should now see your published collection in the Flow UI:
Note that SQL statements are applied on a go-forward basis only, so you will see updates reflected in your destination as new documents arrive in your source collection.
Tutorial Example 3: Aggregations
In this example, we will use the same dataset from our Wikipedia demo collection to show aggregations.
We will look at the total number of lines per edit and include the date of the edit and whether the edit was made by a bot or a human.
To do that, first update the flow.yaml file with the following schema specs and remove <derivation name>.migrations.0.sql from flow.yaml, as we will not be creating a new table for this derivation.
The properties in our schema include:
- date
- bot (boolean field indicating whether the edit was made by a bot or a human)
- total_lines
- total_edits
We are keying our collection on the date field this time.
Notice we are adding a merge annotation to our schema specs:
When multiple documents that share a common key are added to a collection, Flow opportunistically merges them into a single representative document for that key through a strategy called reduction.
The above reduce annotation provides Flow with the specific reduction strategy to use at your document locations. You can learn more about reductions here.
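Here is a sketch of how those reduce annotations might appear in the collection schema. The field names follow this example, and the exact layout of your flow.yaml may differ:

```yaml
schema:
  type: object
  reduce: { strategy: merge }      # merge documents that share the same key
  properties:
    date: { type: string }
    bot: { type: boolean }
    total_lines:
      type: integer
      reduce: { strategy: sum }    # running total of added lines
    total_edits:
      type: integer
      reduce: { strategy: sum }    # running count of edits
  required: [date, bot]
key: [/date]
```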
Next, populate the lambda file with the following SQL statement:
```sql
select
  date($meta$dt) as date,
  $bot,
  coalesce($length$new - $length$old, 0) as total_lines,
  1 as total_edits
where $type = 'edit';
```
The coalesce expression extracts the delta change (new lines minus old lines) for the particular source document.
Now run the preview to take a look at the output:
```shell
flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
```
As before, to publish the derived collection, run the flowctl catalog publish command.
See this doc for a complete flowctl usage guide.
See this tutorial video for a deeper dive and another example on how to apply SQL transformations to Wikipedia live data.
Conclusion
In this tutorial, we have walked through how to use simple SQL statements to create derivations in Flow.
Using Streaming SQL, you can easily apply a wide range of transformations to your data before it arrives at the destination in your data pipeline.
We walked through three examples. In the first example, we applied a simple SQL query to filter streaming data on a specific column. In the second example, using our Wikipedia live demo, we applied a simple SQL transformation to retain the first and last time we saw a user and their lifetime edits. In the third example, we applied another SQL transformation to show aggregations of the raw data.
SQL transformation is a powerful capability during data replication because it allows data to be transformed, cleaned, and formatted as it is being moved from one system to another. This can help ensure the data is consistent and accurate across systems, and that it is properly mapped and formatted for its intended use.
Try the SQL transformation capability in Flow today! If you’re new to Flow, you can register here for free.
About the author
With over 15 years in data engineering, the author is a seasoned expert in driving growth for early-stage data companies, focusing on strategies that attract customers and users. Their extensive writing provides insights that help companies scale efficiently and effectively in an evolving data landscape.