ergemp

Spark Streaming in Action for Click-Stream

In this post, I am going to describe an end-to-end architecture to process and analyze the click-stream events of a website with Apache Kafka, Spark Streaming, Hadoop HDFS and Hive.

As a use case, the question we are trying to answer is: what is the sales rate of the end users who signed up to our e-commerce site last week?


What is Click-Stream


Click-stream events are records of end-user behavior on a website; in this case, an e-commerce site. Every action an end user takes is stored as it occurs during the lifetime of the user's session. Examples are signUp events for newcomers, orderSummary events for buyers, search events for product searches and productView events for viewing a product's details. There may be many more event types than the ones mentioned here, depending on the e-commerce business and its needs.


Apache Kafka is a good place to start storing click-stream events. With its high performance and scalable infrastructure, Kafka is a very good candidate for storing streaming data and acting as a data hub for streaming events. In this post we are interested in the signUp and orderSummary events (remember the problem we are trying to solve).



Apache Kafka


Kafka is a very fast and durable distributed streaming platform that acts as a streaming data hub between different applications and/or microservices within a company. Kafka has a lot of capabilities, such as mirroring, partitioning and complex event processing with Kafka Streams (or, if you are using the Confluent distribution, with KSQL). The downside of Apache Kafka for this use case is its retention period, which is 7 days by default. Of course, you can increase the retention period according to your business needs, but increasing it too much amounts to misusing a distributed streaming platform, because Kafka is not meant for data archiving.
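
To make the retention numbers concrete, here is a minimal sketch in plain Java. It relies on two Kafka settings: the broker default log.retention.hours=168 (7 days) and the topic-level override retention.ms. The class and method names are hypothetical and only illustrate how the value for a longer retention period would be computed; applying it to a topic is left out.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper, not part of the pipeline described in this post.
class RetentionConfigSketch {
    // Kafka's broker default is log.retention.hours=168, which is 7 days.
    static final Duration DEFAULT_RETENTION = Duration.ofHours(168);

    /** Builds a topic-level override: retention.ms expressed for the given number of days. */
    static Map<String, String> retentionOverride(int days) {
        long millis = Duration.ofDays(days).toMillis();
        return Map.of("retention.ms", String.valueOf(millis));
    }

    public static void main(String[] args) {
        System.out.println("default retention in days: " + DEFAULT_RETENTION.toDays());
        System.out.println("override for 30 days: " + retentionOverride(30));
    }
}
```

Even with such an override, the data would still be dropped once the period is over, which is why an archive outside Kafka is needed for historical analysis.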


Apache Hadoop


Since you definitely need an archiving solution to answer the sales rate question for newly signed-up customers, a strong choice is Apache Hadoop. Hadoop HDFS is a scalable, distributed and fault-tolerant filesystem that can handle big-data streams such as click-stream or IoT events.

Since Hadoop HDFS is highly scalable, increasing the storage capacity is not a burden. Simply add a new node to the Hadoop cluster, or add storage to the existing DataNodes of the cluster. The capacity becomes available as soon as the new storage is added to the system.


Data Pipeline


The only missing piece is an integration between Apache Kafka and Hadoop HDFS. There are a number of options, such as Apache Flume, Apache NiFi or Kafka Streams. In this case I preferred Apache Spark with its Spark Streaming feature. Spark Streaming is a streaming data processing framework with a good set of capabilities, not only for extract and load but also for transformation.



Spark Streaming


Apache Spark is one of my favorite data processing frameworks; it works in memory and is distributed as well. Thanks to the RDD architecture, Spark handles the distribution of the data across nodes, and the Spark documentation explains the concepts and the usage of the framework quite well.

To solve the case defined at the beginning of this post, we need the new signUp and orderSummary events. Using Spark Streaming just to filter some of the events stored in Kafka topics may seem over-engineered, but it gives us the flexibility to add new features and transformations on top of filtering, such as data aggregation, data enhancement and data lineage.


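/* apply filter: drop excluded event names, then keep only the included ones */
json_parser parser = new json_parser();
JavaDStream<String> filtered = values
    // exclude: drop the event when its name appears in the exclude list
    .filter(value -> !filter.excludeValues
        .replace(" ", "")
        .contains(parser.parse(value, filter.jsonFilterKey)))
    // include: keep the event when its name appears in the include list
    // (assuming nvl falls back to its second argument when no include list is set)
    .filter(value -> nvl.nvl(filter.includeValues.replace(" ", ""),
            parser.parse(value, filter.jsonFilterKey))
        .contains(parser.parse(value, filter.jsonFilterKey)));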
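
The include/exclude logic can also be tried outside Spark. The following is a minimal sketch in plain Java, not the code used in the pipeline: EventFilterSketch and its methods are hypothetical, a naive regex stands in for the json_parser helper, and exact set membership replaces the substring check of the snippet above. It assumes the event name lives in the "n" key, as in the sample event shown later in this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-alone version of the include/exclude filter.
class EventFilterSketch {
    // Naive stand-in for a JSON parser: finds the first "n":"<name>" pair in the raw event.
    private static final Pattern EVENT_NAME = Pattern.compile("\"n\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the event name, or an empty string when the key is missing. */
    static String eventName(String json) {
        Matcher m = EVENT_NAME.matcher(json);
        return m.find() ? m.group(1) : "";
    }

    /** Splits a comma-separated list such as "signup, orderSummary" into a set, ignoring spaces. */
    static Set<String> toSet(String csv) {
        if (csv == null || csv.isBlank()) {
            return Set.of();
        }
        return Arrays.stream(csv.replace(" ", "").split(","))
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    /** An event passes when it is not excluded and, if an include list is given, is included. */
    static Predicate<String> build(String includeValues, String excludeValues) {
        Set<String> include = toSet(includeValues);
        Set<String> exclude = toSet(excludeValues);
        return json -> {
            String name = eventName(json);
            return !exclude.contains(name) && (include.isEmpty() || include.contains(name));
        };
    }

    public static void main(String[] args) {
        Predicate<String> keep = build("signup, orderSummary", "search");
        List<String> events = List.of(
                "{\"h\":{\"n\":\"signup\"},\"b\":{\"userId\":\"8278362\"}}",
                "{\"h\":{\"n\":\"productView\"},\"b\":{}}");
        // Only the signup event passes the filter.
        events.stream().filter(keep).forEach(System.out::println);
    }
}
```

In a Spark Streaming job, a predicate like this could be passed to the filter call on the stream, provided it is serializable.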

Data is written to HDFS partitioned by YEAR, MONTH, DAY and HOUR to improve the performance of the Apache Hive tables defined on top of this HDFS folder, which are discussed in the Apache Hive section below.


/* convert string rdd to javaRecord rdd */
filtered.foreachRDD(rdd -> {
    SparkSession spark = io.xenn.spark.util
        .JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    // date parts used as partition values for every record of this micro-batch
    io.xenn.spark.util.CustomDate customDate = new io.xenn.spark.util.CustomDate();

    JavaRDD<JavaRecord> rowRDD = rdd.map(line -> {
        JavaRecord record = new JavaRecord();
        record.setWord(line);
        record.setYear(customDate.year);
        record.setMonth(customDate.month);
        record.setDay(customDate.day);
        record.setHour(customDate.hour);
        return record;
    });

    /* convert rdd to dataframe and write to hdfs */
    Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
    wordsDataFrame
        .coalesce(1)                                   // one output file per micro-batch
        .write()
        .format("text")                                // only the raw event line remains as data
        .partitionBy("year", "month", "day", "hour")   // year=.../month=.../day=.../hour=... folders
        .mode(SaveMode.Append)
        .save(tmpl.hdfsPathTmpl + job.jobName);
});
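
To illustrate the folder layout that partitionBy produces, here is a small sketch in plain Java that builds the column=value path for a given timestamp. It is only an illustration under assumptions: PartitionPathSketch is a hypothetical name, the date parts are taken from the event's own epoch-seconds timestamp in UTC (the job above takes them from CustomDate instead), and the values are not zero-padded.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper that mimics the directory names written by partitionBy.
class PartitionPathSketch {
    /** Builds the relative partition path for an epoch-seconds timestamp, interpreted in UTC. */
    static String partitionPath(long epochSeconds) {
        ZonedDateTime t = Instant.ofEpochSecond(epochSeconds).atZone(ZoneOffset.UTC);
        return "year=" + t.getYear()
                + "/month=" + t.getMonthValue()
                + "/day=" + t.getDayOfMonth()
                + "/hour=" + t.getHour();
    }

    public static void main(String[] args) {
        // 1544874900 is 2018-12-15T11:55:00Z
        System.out.println(partitionPath(1544874900L)); // year=2018/month=12/day=15/hour=11
    }
}
```

A query that filters on year, month, day and hour only has to read the matching folders, which is where the performance gain of this layout comes from.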

After submitting the Spark job to the Spark cluster, you can monitor the streaming job and investigate its metrics. The monitoring endpoint of Spark jobs is quite handy and makes Spark Streaming jobs easier to operate and manage than some other solutions.



Apache Hive


Now that the data pipeline is complete and the historical click-stream events are flowing into HDFS, we can start working on our data. There are different options for working with data on HDFS, such as Apache Hadoop MapReduce, Apache Hive, Spark RDD, Spark SQL and Apache Drill. Since I want to let customers connect with a SQL client of their own choice and query the data with standard SQL commands, I preferred Hive to define a table on top of the HDFS files. To define a table on this JSON-based data I used a JSON SerDe (org.openx.data.jsonserde.JsonSerDe), which can be downloaded separately. A sample row in HDFS for a signUp event is as follows.


{
  "h": {
    "ts": 1544874900,
    "a": 302,
    "c": 852,
    "p": "39324-62681-54240",
    "s": "66492-89739-41140",
    "n": "signup"
  },
  "b": {
    "success": true,
    "userId": "8278362"
  }
}

After studying the JSON format of the event data, a Hive table can be created for the signUp event with the following create table command. The same applies to the orderSummary event as well.


create external table signup (
    h struct<ts:string, n:string>,
    b struct<success:string, userid:string>
)
PARTITIONED BY (
    year string,
    month string,
    day string,
    hour string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    "case.insensitive" = "true",
    "ignore.malformed.json" = "true"
)
LOCATION '/xennio/events/signup';

Now that the signUp and orderSummary events are available as Hive tables, I can simply run a SQL query to find out how many orders were placed by the customers who signed up last week. The same could be achieved with streaming data processing.


-- dates: one row per day in the signup window ending today
-- (the window is 14 days here; use -7 for a strict one-week window)
with dates as (
    select date_add(date_add(current_date(), -14), a.pos) as d
    from (
        select posexplode(split(repeat("o", datediff(current_date(), date_add(current_date(), -14))), "o"))
    ) a
)
select b.userid, count(productid) as total_order
from ordersummary
lateral view explode(split(b.productids, ',')) productids as productid
where b.userid in (
    -- inner join, so that only users whose signup partition falls in the window are kept
    -- (a left join would return every user in the signup table)
    select t.b.userid
    from signup t
    join (
        select
            day(d) as day2,
            month(d) as month2,
            year(d) as year2
        from dates
    ) dd
    on (t.year = dd.year2 and t.month = dd.month2 and t.day = dd.day2)
)
group by b.userid
order by total_order desc;

USERID  TOTAL_ORDERS
8269875	40
8268701	32
8270329	16
8266972	10
8268083	9
8267692	8
8267873	7
8270185	6
8267939	6
8266974	6
8269059	5
8267511	5
8269038	5
8267456	4
8267437	4
8270468	4
8268454	4
8267374	4
8270537	4
8267035	4
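
The logic of the query can be sketched on in-memory data as well. The following plain Java example is only an illustration with made-up records: NewUserOrdersSketch and its nested classes are hypothetical, and reading "sales rate" as the share of new users with at least one order is my assumption, not something the query above computes.

```java
import java.time.LocalDate;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory version of the signup/orderSummary query.
class NewUserOrdersSketch {
    static final class SignUp {
        final String userId;
        final LocalDate date;
        SignUp(String userId, LocalDate date) { this.userId = userId; this.date = date; }
    }

    static final class OrderSummary {
        final String userId;
        final String productIds; // comma-separated, like b.productids in the Hive table
        OrderSummary(String userId, String productIds) { this.userId = userId; this.productIds = productIds; }
    }

    /** Users who signed up within the last `days` days, counted back from `today` (inclusive). */
    static Set<String> newUsers(List<SignUp> signUps, LocalDate today, int days) {
        LocalDate from = today.minusDays(days);
        Set<String> users = new HashSet<>();
        for (SignUp s : signUps) {
            if (!s.date.isBefore(from) && !s.date.isAfter(today)) {
                users.add(s.userId);
            }
        }
        return users;
    }

    /** Number of ordered products per user, restricted to the given users. */
    static Map<String, Integer> productsPerUser(List<OrderSummary> orders, Set<String> users) {
        Map<String, Integer> totals = new TreeMap<>();
        for (OrderSummary o : orders) {
            if (users.contains(o.userId)) {
                totals.merge(o.userId, o.productIds.split(",").length, Integer::sum);
            }
        }
        return totals;
    }

    /** Assumed definition of sales rate: share of the given users with at least one order. */
    static double salesRate(Set<String> users, Map<String, Integer> totals) {
        return users.isEmpty() ? 0.0 : (double) totals.size() / users.size();
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2018, 12, 17);
        List<SignUp> signUps = List.of(
                new SignUp("u1", LocalDate.of(2018, 12, 14)),
                new SignUp("u2", LocalDate.of(2018, 11, 1)),
                new SignUp("u3", LocalDate.of(2018, 12, 16)));
        List<OrderSummary> orders = List.of(
                new OrderSummary("u1", "p1,p2,p3"),
                new OrderSummary("u1", "p4"),
                new OrderSummary("u2", "p5,p6"));
        Set<String> users = newUsers(signUps, today, 7);
        Map<String, Integer> totals = productsPerUser(orders, users);
        System.out.println(totals);                   // {u1=4}
        System.out.println(salesRate(users, totals)); // 0.5
    }
}
```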

The same result can be achieved with the stream processing features of Kafka, but then the historical data would be lost because of Kafka's retention. With this infrastructure, the hit rates of newly signed-up users can be calculated over time, and so can the performance of the e-commerce site.

