top of page

Enhancing your Click-Stream with CRM database

Writer's picture: ergempergemp

Click-Stream in Summary


In summary, Click-Stream is the every event of end users on the e-commerce websites. Since every click, search, login, logout, order and listing... etc are the actions of the end-users, the more active users an e-commerce has, more performance effect is expected on the web page processing. In order to not to effect the performance of the website, this architecture needs to be very fast and lightweight. To process pages more efficiently with the extra load of the click-stream processing, architectural team has the bargain of designing the simplest and asynchronous data structure with the minimal effect on bandwidth and page load times. Simple and lightweight data architecture brings the informational clearance within the click-stream. Instead of storing the user email and/or user name, the id of the users should be stored. Or, instead of storing and moving the name and all the features of the product, only the id of the product should be stored within the click-stream.


The Problem


The problem is, since click-stream is a streaming data flow, in order to take actions on the streaming data more detailed information is needed for the user identification. For instance, if the e-commerce company needs to send an email to newly signed-in users, or to execute a campaign according to the product visits or to recommend alternative products to the users, it is obvious that more data is needed than only the user id or the product id.


This problem brings the question, how can we achieve the detailed information of a user by using only the id of the user while the data is flowing?


Spark Streaming


The answer given to this question in this headline is Spark Streaming. Spark is a distributed in-memory data processing platform and framework which also has the streaming data processing capabilities with its powerful feature Spark Streaming.

Spark Streaming can attach to Apache Kafka topics and starts consuming the streaming data which is flowing within the Kafka topics. Spark Streaming uses micro-batching to collect data from Kafka. In this headline we are not going to use time windows but we are going to use 10 second intervals for data retrieval. Detailed code and information on consuming data from Apache Kafka topics with Spark Streaming can be found on Spark Streaming from Kafka. As we working on a JSON representation of a click-stream event, we need to filter the productView events and map the userId and the productId with the variantId of the product itself. Timestamp value is also very nice to have if we want to investigate the timing in a more granular manner.


//set microbatches to 10secs        
JavaStreamingContext streamingContext = new JavaStreamingContext
 (sc, new Duration(10000));
 
//create DStream
//get the streaming clickstream events        
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
 ); 

//since the events are in json format;
//create the json_parser object to parse the value of a json line
util.json_parser parser = new util.json_parser();
JavaDStream<String> filteredStream = stream
 .map(ConsumerRecord::value) 
 .filter(event -> parser.parse(event,"n").equalsIgnoreCase("productView"))
 .filter(event -> !parser.parse(event, "userId").equalsIgnoreCase(""))
 .map(event -> "{\"userid\":\"" + parser.parse(event, "userId") + "\"," + 
 "\"productid\":\"" + parser.parse(event, "userId") + "\"," + 
 "\"variantid\":\"" + parser.parse(event, "variantid") + "\"," +
 "\"ts\":\"" + parser.parse(event, "ts") + "\"}");

Above code block is a small part of the Spark Streaming project. Since this code block is working on a JSON file we need to work on JSON key and values. Here I used a custom developed json_parser class. To understand he json_parser class you can check out the Getting the value of a JSON key in a JSON File Post.

The complete code for Spark Streaming on Kafka can be found on Spark Streaming from Kafka Post.

Spark JdbcRDD


Since we have the Streaming Events of click-stream data with the IDs of the Users and the Products visited we need to JOIN the data with the CRM database which has the detailed information on users. Apache Spark can also connects to databases and reads database table data to its distributed memory structure which is called RDD. Streaming data can be joined with the static database data on the fly.

//create a JdbcRDD to get the CRM data to RDD
DBConnection dbConnection = new DBConnection(); 
JdbcRDD<Object[]> jdbcRDD = new JdbcRDD(sc.sc(), 
        dbConnection, 
 "select * from xenn_users where ?=?", 
 0L, 
 0L , 
 1, 
 new MapResult(), 
        ClassManifestFactory$.MODULE$.fromClass(Object[].class)); 

//convert JdbcRDD to JavaRDD
JavaRDD<Object[]> javaRDDObj = jdbcRDD.toJavaRDD(); 

//convert JavaRDD<Object> to JavaRDD<Srting>
//record[0] : userid
//record[1] : username
//record[2] : email
JavaRDD<String> javaRDDStr = javaRDDObj
 .map(record -> 
                                        record[0].toString() + "," + 
                                        record[1].toString() + "," + 
                                        record[2].toString());

//we need pairRDD in order to join two different RDDs
JavaPairRDD<String, String> javaPairRDDStr = javaRDDStr
 .mapToPair(rdd -> 
 new Tuple2(rdd.split(",")[0], 
 "\"email\":\"" + 
                                   rdd.split(",")[2] + "\""));

//also cache the RDD because DStream is going to use the same set of data 
//for every microbatch 
//(in our case the batch is 10secs)
javaPairRDDStr.persist(StorageLevel.MEMORY_ONLY());

The complete code on JdbcRDD can be found on Querying you Database Data with JDBCRDDs post.

Join DStream and JdbcRDD


As we have both the Click-Stream and the CRM data we need to JOIN the data and present it.


//both click-stream and CRM data is now available as DStream and JavaRDD 
//now for each RDD in the DStream we can join the data with the CRM data. 
pairedFilteredStream.foreachRDD(rdd -> { 
    JavaPairRDD<String, Tuple2<String,String>> joinedRDD = rdd.join(javaPairRDDStr);
    List<Tuple2<String, Tuple2<String, String>>> lCollect = joinedRDD.collect();
 
 for(Tuple2<String, Tuple2<String, String>> eCollect : lCollect) { 
 //System.out.println(eCollect._1);
        System.out.println(eCollect._2); 
 } 
});

It is better not to collect the RDD to the Spark Driver from the worker nodes. Usually you need to persist the resulting data or purge the data after taking action, or event persist the data after the action is taken. Collect on the other hand is quite an expensive operation which load all the data on the driver. It this PoC we us collect just to demonstrate the resulting data on the standard output.


Comments


github logo

Istanbul, Turkey

  • kisspng-computer-icons-logo-portable-network-graphics-clip-icons-for-free-iconza-circle-so
  • Blogger
  • LinkedIn
Contact

Thanks for submitting!

bottom of page