Streaming pipeline in Snowflake

In recent years the trends in the data space have been easy to observe. Existing technologies are constantly changing and new products keep coming to the market, driven largely by increased demand from the business. Firstly, we are now in a much better position to choose a performant warehouse (especially a cloud one), and as a result the classic ETL jobs have transformed into ELT jobs: we no longer need a separate staging environment, we simply load the data into the target environment and run the transformations on the target database, because it is performant enough. Secondly, our batch pipelines are slowly moving towards streaming technologies. Even if we do not yet have a business case for it, it is worth considering implementing pipelines directly as a streaming solution rather than a batch one (in this context I am going to use the term ‘stream’ for a micro-batch solution). In this blog post I am going to show you what Snowflake can offer in this area.

Use-case: We are going to use an S3 bucket as an “interface” for Snowflake. It is really convenient to use and easy to integrate with any existing AWS architecture, since S3 is the basic storage layer of AWS. We are going to place all the incoming messages in the bucket and use it as a “staging” area. This can be done easily with AWS Kinesis Data Firehose, which is the managed service for delivering a stream into an S3 bucket or into Redshift. For the ingest part we are going to use the following objects in Snowflake: Stage, Pipe, Stream and Task. At first glance these terms can be confusing: why do we need all of them, and what is the difference between them?

Stage: Let us start with the first component of this pipeline. A stage in Snowflake terminology is basically a hook into an S3 bucket.

By creating a stage, we create a secure connection to our existing S3 bucket, and we are going to use this hook like a “table”, so we can immediately run an SQL-like COPY command against the bucket. The connection uses an IAM role defined in our AWS account, which has the privileges to read from the specified bucket/folder. Once the stage is created, Snowflake generates a so-called AWS_EXTERNAL_ID, and we use this external ID in the IAM role’s trust policy. The trust policy allows this specific stage in our Snowflake account to assume the IAM role. The details can be found in the Snowflake documentation: https://docs.snowflake.net/manuals/user-guide/data-load-s3-config.html. An example of the stage creation and the copy command:

CREATE STAGE some_stage
url='s3://some_path'
credentials = (aws_role = 'arn:aws:iam::some_role');
COPY INTO target FROM @some_stage;
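
The trust policy needs the external ID and the Snowflake-side IAM user generated for the stage, and both can be read back from the stage itself. A minimal sketch of how to look these up (some_stage is just the placeholder stage created above):

-- Describe the stage: the output includes properties such as AWS_EXTERNAL_ID
-- and SNOWFLAKE_IAM_USER, which go into the IAM role's trust policy.
DESC STAGE some_stage;
-- Quick sanity check that the stage can actually list the bucket contents.
LIST @some_stage;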

Pipe: After we have defined the stage, we need an automated way to process all the incoming messages on the platform. This is done by the Pipe object (a.k.a. Snowpipe). A pipe executes our SQL COPY command in an automated manner. In our case the trigger event for this pipeline is an SQS notification, which we first have to enable on the bucket. The purpose of the SQS notification is to signal whenever a new object lands in the S3 bucket, and as a result that object gets ingested into our target. What is the target? Well, it can vary from use case to use case; here we are going to use a persistent table with a single column. No, that is not insane, it really is a one-column table. So far we have not mentioned anything about schema, and we have not even started to think about the structure of the target table.

The cool thing is that we do not have to

because we put all our data into a VARIANT (JSON) column, which handles everything. A JSON column type is not a new thing in database technologies, but there are still huge differences between databases in terms of performance and query flexibility, and I can confirm that Snowflake handles this format amazingly well. The PIPE DDL is also straightforward:

CREATE PIPE some_pipe AUTO_INGEST = TRUE AS COPY INTO target FROM @some_stage;
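
For completeness, here is a minimal sketch of what the one-column target table could look like and how the JSON inside the VARIANT column is queried. The column name v and the attribute names event and id are just assumptions for illustration:

-- A single VARIANT column holds the whole JSON message.
CREATE TABLE target (v VARIANT);
-- The COPY inside the pipe can parse each file as JSON explicitly:
-- COPY INTO target FROM @some_stage FILE_FORMAT = (TYPE = 'JSON');
-- Individual attributes are queried with the colon notation plus a cast.
SELECT v:event::string AS event, v:id::number AS id FROM target;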

Stream: This object is really new, it just came out in June 2019. We can think of a stream as a view on top of a table, acting as a time-window abstraction over the changes made to that table.

The syntax is really convenient: a stream is easy to create and easy to read from, like this:

CREATE STREAM some_stream ON TABLE some_table;
SELECT * FROM some_stream;

Once we take something out of the stream we have to insert it somewhere (into a persistent table) in order to keep the records. The stream’s offset advances whenever the stream is consumed in a committed DML transaction, so if we keep consuming it we will never get the same results twice: each read only returns the changes recorded since the last consumption. With every consumption we basically shift our “transparent time window” forward, so we have to take care of this step.
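
A minimal sketch of such a consumption, assuming some_table has the single VARIANT column v from the earlier sketch and persistent_copy is a hypothetical destination table with the same shape:

-- Consuming the stream inside a committed DML transaction advances its
-- offset, so the rows are read once and persisted in the same step.
BEGIN;
INSERT INTO persistent_copy
SELECT v FROM some_stream WHERE metadata$action = 'INSERT';
COMMIT;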

Task: This object is also really new in Snowflake (June 2019); we can describe it simply as a scheduled SQL statement (or stored procedure call). By defining a schedule, it runs the code snippet defined at creation time, like the following:

CREATE OR REPLACE TASK some_task
WAREHOUSE = mywh
SCHEDULE = '5 MINUTE'
WHEN
  system$stream_has_data('some_stream')
AS
INSERT INTO some_temp_table
SELECT id, name FROM some_stream
WHERE metadata$action = 'INSERT';
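
One thing worth noting (and easy to forget): a task is created in a suspended state, so it has to be resumed before the schedule kicks in, and the schedule can also be expressed as a cron expression if a fixed interval is not enough. A short sketch:

-- Tasks are created suspended; resume the task so it starts running.
ALTER TASK some_task RESUME;
-- Alternatively, the schedule can use a cron expression with a time zone,
-- e.g. SCHEDULE = 'USING CRON */5 * * * * UTC' for every five minutes.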

The schedule above effectively sets our stream’s (micro-batch) velocity, so it should be set to a time interval that makes sense from a business perspective.

Looking at the example above, we had already ingested the data stream into a persistent table, so why do we ingest it into another one? Imagine a case where we receive a lot of different events on this stream and we would like to route them into different target tables. The code above would then be only the first step; the next step would be to SELECT the records from this temp table WHERE event = 'MY_EVENT_1' and INSERT them INTO the corresponding final table.
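
A sketch of what such a routing step could look like; the table final_table_1 and the event value 'MY_EVENT_1' are purely illustrative:

-- Route one event type from the temp table into its own final table.
-- (Assumes some_temp_table also carries the event attribute.)
INSERT INTO final_table_1
SELECT id, name
FROM some_temp_table
WHERE event = 'MY_EVENT_1';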

Streaming Pipeline Concept in Snowflake

What is great about the example above? We could implement a streaming app written in nothing but SQL!

GREAT! Wait! Is it really GREAT?

Subjective opinion: I am not advocating here that everyone should implement everything in SQL. Actually, I am against it, and I have a similar feeling about the TASK object as well. This article just shows the brand-new features of the Snowflake warehouse, which can be really useful for certain use cases. I believe every piece of business-logic code should be visible and accessible, and moreover it should be version controlled and not hidden deep inside the warehouse. So the logic defined in the TASK object could be taken out and orchestrated outside of Snowflake (e.g. with Airflow), and depending on the complexity of the logic it could be implemented in a language other than SQL, like Python or Scala. That way it is more flexible in terms of requirements, and it comes with a better operational “cost” as well.

Sales spiel: Infinite Lambda has a team of data engineers ready to set you up with Snowflake immediately. Work with us to build out a cutting edge data warehouse, improve an existing one, or find ways to optimise operational costs.
