
Streaming Analytics in Snowflake

ergemp

Updated: Feb 15, 2024



What is Click-Stream Data?


Click-stream is the data produced by the customer journey on a website. It captures each customer's behaviour and lets us derive individual intent. This data feeds recommendation engines, personalized content, and much more.


Managing click-stream data is one of the challenging problems of data engineering. The high rate of production and the large volume arriving in a short time force us to find a solution that stores and processes the data at the very same time.


Usually a streaming data platform with real-time analytics capabilities is preferred. This allows analyzing real-time data before it is even stored in a data fabric. Archival of click-stream data is usually done with some kind of scalable, distributed data platform for further batch analysis, especially for historical and seasonal data analysis.



Storing Click-Stream Data in Snowflake


Under normal circumstances click-stream data is stored (queued) by a streaming platform for low latency. But in this post we assume the click-stream data is going to be inserted directly into a Snowflake table.


Snowflake is a solution built specifically for data warehousing, so it is not recommended to expect OLTP performance from Snowflake.

For OLTP-style workloads, please check the Snowflake documentation on hybrid tables.
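
Purely as an illustration (not part of the original post), a hybrid-table variant of the event table might look like the sketch below. CLICK_STREAM_HT and its EVENT_ID column are hypothetical; hybrid tables require a primary key, so an assumed surrogate key is added:

-- Hypothetical hybrid-table variant of the event table.
-- Hybrid tables require a PRIMARY KEY; EVENT_ID is an assumed surrogate key.
create or replace hybrid table click_stream_ht (
    event_id number autoincrement primary key,
    sid varchar,
    pid varchar,
    userid number(38,0),
    ts timestamp,
    event varchar,
    productid number(38,0),
    ip_address varchar,
    productlist varchar
);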



Test Case


Sample Payload

{
    "sid": "70495ad7-60f5-460a-9a77-61bb4a221223",
    "pid": "fab11176-087f-4e6c-92c6-405710f72f64",
    "userId": null,
    "ts": "1592374901",
    "event": "impresssionView",
    "productId": "000004",
    "ip_address": "243.131.145.237",
    "productList": [
        {
            "productIds": "000014"
        },
        {
            "productIds": "000017"
        },
        {
            "productIds": "000042"
        }
    ]
}

For this test case I am going to produce some mock click-stream data and simulate continuous inserts into a table.


Mock Data

The mock data produced earlier is inserted into the sample table. The fields of this mock data are:


SID: Session ID.


PID: Persistent ID.


USERID: If the user is logged in to the website, the user's ID within the application is logged in the click data.


TS: Timestamp of the click data.


EVENT: The event type of this click. In this test case the event can be one of impressionView, orderSummary, productView, newSession.


PRODUCTID: If the click event carries only one product, the relevant product ID is logged in this column.


IP_ADDRESS: The origin of this click event.


PRODUCTLIST: If the click event carries multiple products, each product is logged to this column in a JSON array.


create or replace TABLE CLICK_STREAM_SAMPLE (
	SID VARCHAR(16777216),
	PID VARCHAR(16777216),
	USERID NUMBER(38,0),
	TS TIMESTAMP,
	EVENT VARCHAR(16777216),
	PRODUCTID NUMBER(38,0),
	IP_ADDRESS VARCHAR(16777216),
	PRODUCTLIST VARCHAR(16777216)
);
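
The drip procedure below reads from CLICK_STREAM_SAMPLE and writes into a separate event table named CLICK_STREAM, whose definition the post does not show. Assuming it shares the sample table's definition, it can be created as follows:

-- Assumed definition of the target event table; the post only shows
-- CLICK_STREAM_SAMPLE, so CLICK_STREAM is created with the same columns.
create or replace TABLE CLICK_STREAM (
	SID VARCHAR(16777216),
	PID VARCHAR(16777216),
	USERID NUMBER(38,0),
	TS TIMESTAMP,
	EVENT VARCHAR(16777216),
	PRODUCTID NUMBER(38,0),
	IP_ADDRESS VARCHAR(16777216),
	PRODUCTLIST VARCHAR(16777216)
);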

Dripping Click-Stream Data into the Table


To simulate click-stream data being loaded into our event table, we create a procedure that runs in a timely fashion and inserts five records every second.


create or replace procedure insert_clickstream_slowly()
returns varchar
language sql
as
declare
  delay integer := 0;
  -- Replay the mock rows in reverse chronological order.
  c1 cursor for select * from click_stream_sample order by ts desc;

  p_sid varchar;
  p_pid varchar;
  p_userid number;
  p_event varchar;
  p_productid number;
  p_ip_address varchar;
  p_productlist text;
begin
  -- A FOR loop opens and closes the cursor implicitly.
  for r1 in c1 do
    p_sid := r1.sid;
    p_pid := r1.pid;
    p_userid := r1.userid;
    p_event := r1.event;
    p_productid := r1.productid;
    p_ip_address := r1.ip_address;
    p_productlist := r1.productlist;

    insert into click_stream
    (
    SID,
    PID,
    USERID,
    TS,
    EVENT,
    PRODUCTID,
    IP_ADDRESS,
    PRODUCTLIST
    )
    values
    (
    :p_sid,
    :p_pid,
    :p_userid,
    current_timestamp,
    :p_event,
    :p_productid,
    :p_ip_address,
    :p_productlist
    );

    delay := delay + 1;

    -- Pause one second after every fifth insert,
    -- so roughly five records arrive per second.
    if (mod(delay, 5) = 0) then
      call system$wait(1);
      delay := 0;
    end if;
  end for;
  return 'done';
end;
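
To start the drip feed, call the procedure; it keeps inserting until every mock row has been replayed:

call insert_clickstream_slowly();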

Now we have a table that slowly fills by itself, and we can work with streams on this event table.


Using Snowflake to Manage Streaming Data


It is possible to stream data within Snowflake without using any other streaming platform. This feature is called Snowflake streams.


Snowflake streams are a built-in change-data-capture functionality that captures the change data of a table (and in some cases, even a view). Creating a stream on a Snowflake table is as easy as running a DDL statement as follows:


create stream click_stream_stream01 on table click_stream;

The stream object simply captures CDC data on the event table. Every select on the stream returns the pending changes. To consume the changes and clear out the stream, the select statement should run inside a DML transaction. Detailed documentation on streams can be found in the Snowflake documentation.
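
For example, the pending changes and the CDC metadata columns can be inspected with a plain select; on its own, this does not consume the stream:

-- Peek at pending changes; a standalone select does not advance the offset.
select sid, event, productlist, metadata$action, metadata$isupdate
from click_stream_stream01;

-- Check whether the stream currently holds data to consume.
select system$stream_has_data('CLICK_STREAM_STREAM01');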


Streaming Analytics on Stream Data


Since we have the streaming data, we can run the analytics at certain intervals. Instead of doing analytics on the raw event table, we can calculate the product view counts on the fly and save the results into a summary Snowflake table.


For this demonstration, I am going to calculate the productView counts and save them in the following format.


create table click_stream_analytics
(
product_id varchar, 
total_view integer,
first_event_ts timestamp,
last_event_ts timestamp,
begin_window timestamp,
end_window timestamp
);

product_id: ID of the product.


total_view: Count of the views of the specific product within the window.


first_event_ts: The timestamp at which the first productView event was seen.


last_event_ts: The timestamp at which the last productView event was seen.


begin_window: The start time of the analytics job's window.


end_window: The end time of the analytics job's window.


There are two approaches to the window timing: it can be derived from the analytics task's execution time or from the event timestamps. Both have their own advantages. In this case I will include both methods, as sketched below.
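
The procedure below derives the window from the task execution time. For the event-derived alternative, a sketch along the following lines (not in the original post) could bucket rows into fixed 15-minute windows with Snowflake's TIME_SLICE function:

-- Event-time windows: each row's 15-minute bucket is derived from TS itself.
select
  time_slice(ts, 15, 'MINUTE', 'START') as begin_window,
  time_slice(ts, 15, 'MINUTE', 'END')   as end_window,
  count(*) as total_view
from click_stream
where event = 'productView'
group by 1, 2;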

The Procedure


create or replace procedure clickstream_analytics()
returns varchar
language sql
as
declare
    begin_window timestamp;
    end_window timestamp;
begin
    -- Task-derived window: the end is the execution time,
    -- the begin is 15 minutes earlier.
    end_window := current_timestamp();
    begin_window := dateadd(minute, -15, end_window);

    -- Reading the stream inside this DML statement consumes
    -- the pending changes once the transaction commits.
    insert into click_stream_analytics
    select
      j1.value:productIds::string as product_id,
      count(*) as total_view,
      min(ts) as first_event_ts,
      max(ts) as last_event_ts,
      :begin_window,
      :end_window
    from
      click_stream_stream01,
      lateral flatten(input => parse_json(productlist), outer => true) j1
    where
      metadata$action = 'INSERT' and
      metadata$isupdate = FALSE and
      event = 'productView'
    group by j1.value:productIds::string
    ;
    return 'done';
end;

Explanation of the Code


This procedure calculates the product view counts in a timely fashion; the result table will include the time interval and the view counts of the individual products within that interval.


First the window interval should be calculated. In this case, we assume the recurring job is triggered every 15 minutes. Since the data we are working with is at most 15 minutes old, the begin_window parameter should indicate 15 minutes before the execution time, and the end_window is the time of the execution.
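
The post does not show the scheduler itself; one way to trigger the procedure every 15 minutes is a Snowflake task, sketched below with an assumed task name and a placeholder COMPUTE_WH warehouse:

-- Hypothetical task; COMPUTE_WH is a placeholder warehouse name.
create or replace task clickstream_analytics_task
  warehouse = compute_wh
  schedule = '15 minute'
  -- Skip runs when the stream has no pending changes.
  when system$stream_has_data('CLICK_STREAM_STREAM01')
as
  call clickstream_analytics();

-- Tasks are created suspended; resume the task to start the schedule.
alter task clickstream_analytics_task resume;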


first_event_ts and last_event_ts are the first and last timestamps at which the product's view events were seen within the window.


According to our JSON payload, the productView event carries its product list as a JSON array, so we need to flatten the array into rows before we can count the products correctly. That is the reason for the lateral flatten usage; the isolated example below shows the effect.
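
Seen in isolation, lateral flatten turns one row holding a JSON array into one row per array element. The following self-contained example uses an inline literal instead of the event table:

-- One input row with a two-element array yields two rows: 000014 and 000017.
select j1.value:productIds::string as product_id
from (select '[{"productIds":"000014"},{"productIds":"000017"}]' as productlist) t,
     lateral flatten(input => parse_json(t.productlist)) j1;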


In the end, the result of this procedure is written into the summary table.


Final


Instead of querying the raw table, we now have a summary table which loads data by itself in a timely fashion. It is much easier and more reasonable to query the summary table, both for performance and for logical understandability.


select * from click_stream_analytics where product_id='000026';
/*
PRODUCT_ID  TOTAL_VIEW  FIRST_EVENT_TS           LAST_EVENT_TS            BEGIN_WINDOW             END_WINDOW
000026      13          2024-02-09 07:52:07.608  2024-02-09 08:07:48.443  2024-02-09 08:00:54.209  2024-02-09 08:15:54.209
000026      6           2024-02-09 07:44:27.515  2024-02-09 07:50:18.311  2024-02-09 07:36:32.699  2024-02-09 07:51:32.699
000026      19          2024-02-09 00:25:28.705  2024-02-09 00:47:24.101  2024-02-09 07:24:04.813  2024-02-09 07:39:04.813
*/
