
Migrating Data From Couchbase to Postgres using Apache Spark

Author: ergempergemp


Introduction


Apache Spark is a widely used distributed data processing framework. Because Spark can read from and write to many kinds of data stores, most data platforms ship their own Spark connectors. Couchbase, for instance, has its own Spark API for RDDs and Spark SQL DataFrames, which makes it easy to read bulk data from Couchbase into either. This multi-source capability also lets you read from one kind of source and write to a different kind of target. Even when nothing is processed, enriched or otherwise altered between the read and the write, Spark can serve as a distributed, fast and efficient data migration tool.

The most important part of such a migration is usually the difference in data structure between the two systems. NoSQL and relational databases are a good example: NoSQL databases store data in a semi-structured form, while relational databases store it in strictly defined structures. With the flexibility of Spark and the right data model, both the metadata and the data can be transformed. In this article, I am going to give a simple example of copying JSON data from a Couchbase NoSQL database to a PostgreSQL relational database.


Installing Travel-Sample sample bucket on Couchbase


First we need to create a DataFrame from a Couchbase bucket. As a sample dataset, I used Couchbase's travel-sample bucket. This article is not a tutorial on installing Couchbase or its sample buckets, so you need to install the travel-sample bucket yourself after installing Couchbase. To do that, go to the Settings > Sample Buckets tab.



Including Maven Dependencies


We are going to migrate data from Couchbase to PostgreSQL, so we need both the Couchbase Spark Connector libraries and the PostgreSQL JDBC driver. I am using a Maven project, so I add the following dependencies to my pom.xml file.


        <!-- couchbase libraries -->
        <dependency>
            <groupId>com.couchbase.client</groupId>
            <artifactId>spark-connector_2.11</artifactId>
            <version>2.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.couchbase.client</groupId>
                    <artifactId>java-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.couchbase.client</groupId>
            <artifactId>java-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <!-- couchbase libraries -->

        <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.8</version>
        </dependency>

This article uses Spark 2.3.3, so the following Spark dependencies should also be added to the pom.xml file.


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

Creating Spark DataFrame from Couchbase Bucket


Suppress Logging


To make Apache Spark's log output more readable, I usually set the logging level to ERROR only.


        Logger log = Logger.getRootLogger();
        log.setLevel(Level.ERROR);

        Logger.getLogger("org").setLevel(Level.ERROR);
        Logger.getLogger("akka").setLevel(Level.OFF);

Creating Spark Session


The Spark session is the entry point of a Spark program. To connect to Couchbase from the Spark session, some connector-specific settings must be added to the session configuration. Detailed explanations of these settings can be found in the official documentation.


        SparkSession spark = SparkSession
                .builder()
                .appName("MigrateCouchbase2Postgres")
                .master("local[*]")
                .config("spark.couchbase.nodes", "localhost")
                .config("spark.couchbase.username", "Administrator")
                .config("spark.couchbase.password", "Administrator")
                .config("spark.couchbase.bucket." + "travel-sample", "")
                .config("com.couchbase.socketConnect", 300000)
                .config("com.couchbase.connectTimeout", 300000)
                .config("com.couchbase.queryTimeout", 300000)
                .config("com.couchbase.maxRequestLifetime", 300000)
                .config("spark.driver.allowMultipleContexts", "true")
                .getOrCreate();

Creating Spark SQL DataFrame


This is the part where we load the data in the Couchbase bucket into a DataFrame. Running only the code up to this point prints how many documents in the travel-sample bucket have the type airline.


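// Needs: import static com.couchbase.spark.japi.CouchbaseDataFrameReader.couchbaseReader;
//        import org.apache.spark.sql.sources.EqualTo;
Dataset<Row> travelDS = couchbaseReader(spark.read()).couchbase(new EqualTo("type", "airline"));
System.out.println(travelDS.count());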

The important thing to remember is to filter the bucket so that only documents with the same structure are loaded. Couchbase is a NoSQL database in which differently structured documents can live in the same bucket, so load only the relevant documents, which must all share one JSON structure.
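To make the point about filtering concrete, here is a minimal plain-Java sketch that involves neither Spark nor Couchbase. The class name, the `doc` helper and all sample values are made up for illustration; it only mimics the effect of the type filter on a bucket that mixes document shapes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: documents of different shapes share one
// "bucket", and only those with the wanted "type" value are kept.
final class SameTypeFilterSketch {

    // Keeps only the documents whose "type" field equals the wanted value.
    static List<Map<String, Object>> keepType(List<Map<String, Object>> docs, String wanted) {
        List<Map<String, Object>> kept = new ArrayList<>();
        for (Map<String, Object> doc : docs) {
            if (wanted.equals(doc.get("type"))) {
                kept.add(doc);
            }
        }
        return kept;
    }

    // Hypothetical helper that builds a small sample document from key/value pairs.
    static Map<String, Object> doc(Object... keyValues) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            m.put((String) keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        // Made-up sample documents; the real bucket content is not reproduced here.
        List<Map<String, Object>> bucket = new ArrayList<>();
        bucket.add(doc("type", "airline", "id", 10L, "name", "Sample Air"));
        bucket.add(doc("type", "airport", "id", 20L, "airportname", "Sample Field"));
        bucket.add(doc("type", "airline", "id", 30L, "name", "Example Wings"));

        List<Map<String, Object>> airlines = keepType(bucket, "airline");
        System.out.println(airlines.size() + " airline documents");
        // Every kept document exposes the same keys, so one table layout fits them all.
        for (Map<String, Object> a : airlines) {
            System.out.println(a.keySet());
        }
    }
}
```

Without the filter, the airport document would bring a different set of keys, and a single fixed table layout would no longer match every row.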


Creating table in Postgres Database


Now that we know the JSON structure of the airline documents in the Couchbase bucket, a corresponding table must exist in the PostgreSQL database to receive the data loaded from Couchbase.


create table travel_sample
(
    meta_id   varchar(50),
    call_sign varchar(100),
    country   varchar(50),
    iata      varchar(50),
    icao      varchar(50),
    id        integer,
    name      varchar(50),
    type      varchar(50)
);

alter table travel_sample
    owner to postgres;
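An insert without a column list depends on the order in which the table's columns were declared. As an optional variation, the sketch below builds an insert statement that names the eight columns of the table above explicitly. The class and method names are hypothetical; only the column names come from the create table statement.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch: build a parameterized insert with an explicit column list.
final class InsertSqlSketch {

    // Column names copied from the create table statement.
    static final List<String> COLUMNS = Arrays.asList(
            "meta_id", "call_sign", "country", "iata", "icao", "id", "name", "type");

    // Builds "insert into <table> (c1, c2, ...) values (?, ?, ...)".
    static String buildInsert(String table, List<String> columns) {
        String names = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + names + ") values (" + marks + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("travel_sample", COLUMNS));
    }
}
```

The resulting string can be handed to `Connection.prepareStatement`, and it keeps working if a column is later added to the table or the columns are reordered.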


Repartition and Load Data to Postgres Database


travelDS.repartition(4).foreachPartition(each -> {
    try {
        Class.forName("org.postgresql.Driver");

        //jdbc:postgresql://host:port/database
        String url = "jdbc:postgresql://localhost:5432/postgres";
        Properties props = new Properties();
        props.setProperty("user","postgres");
        props.setProperty("password","postgres");

        // One connection and one prepared statement per partition;
        // try-with-resources closes both when the partition is done.
        try (Connection conn = DriverManager.getConnection(url, props);
             PreparedStatement pStmt = conn.prepareStatement("insert into travel_sample values (?,?,?,?,?,?,?,?)")) {

            while (each.hasNext()){
                Row row = each.next();

                // Bind the row's columns to the insert parameters by position.
                pStmt.setString(1, row.getString(0));
                pStmt.setString(2, row.getString(1));
                pStmt.setString(3, row.getString(2));
                pStmt.setString(4, row.getString(3));
                pStmt.setString(5, row.getString(4));
                pStmt.setLong(6, row.getLong(5));
                pStmt.setString(7, row.getString(6));
                pStmt.setString(8, row.getString(7));

                pStmt.executeUpdate();
            }
        }
    }
    catch (ClassNotFoundException cnfe){
        cnfe.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    }
});

Explaining the code: after reading the data and distributing it within Spark, it is time to repartition it and load it into the PostgreSQL table created earlier. First, I repartition the data to control how many parallel threads write to the Postgres database. I find that four parallel threads are more than enough for this example, so the DataFrame is split into four partitions. Only one JDBC connection is created per partition, so at most four connections are open on the Postgres database at the same time, and all the data is ingested through them.

The insert statement is also prepared once per partition, so PostgreSQL does not have to parse it again for every inserted row. The prepared statement is reused and executed once per row. foreachPartition hands each partition an iterator over Spark SQL Row objects, so each partition iterates over its rows and inserts them using the already opened JDBC connection and prepared statement. After the job finishes, the data should be in PostgreSQL, and a select on the travel_sample table should return the airline rows.
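The loop above sends one insert per row. A possible variation, not part of the original code, is to queue rows with the standard JDBC `addBatch()` and send them in groups with `executeBatch()`. The sketch below shows only that loop. The class name, the method name, the `Object[]` row representation and the batch size parameter are assumptions made for illustration, and whether batching helps in practice depends on the driver and the network.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

// Sketch: queue rows on a prepared statement and send them in batches.
final class BatchedInsertSketch {

    // Binds each row's values to the statement, queues it with addBatch(),
    // and sends the queue with executeBatch() every batchSize rows.
    // Returns the number of rows queued.
    static int insertBatched(PreparedStatement pStmt, Iterator<Object[]> rows, int batchSize)
            throws SQLException {
        int total = 0;
        int pending = 0;
        while (rows.hasNext()) {
            Object[] values = rows.next();
            for (int i = 0; i < values.length; i++) {
                pStmt.setObject(i + 1, values[i]); // JDBC parameters are 1-based
            }
            pStmt.addBatch();
            total++;
            pending++;
            if (pending == batchSize) {
                pStmt.executeBatch();
                pending = 0;
            }
        }
        if (pending > 0) {
            pStmt.executeBatch(); // flush the last, partially filled batch
        }
        return total;
    }
}
```

Inside the partition loop, this would replace the per-row `executeUpdate()` call while keeping one connection and one prepared statement per partition.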





