Spark's JdbcRDD is a special RDD type for reading database tables, and the code around it can be confusing at first. In this post we demonstrate, step by step, how to collect the data from a database table and distribute it over the Spark workers. We use a PostgreSQL database for this example, so we need a demo table and a demo record. We are not going to cover how to install and create a PostgreSQL database, but the following script creates the table and inserts a record.
create table users (
  user_id numeric,
  user_name varchar(100),
  user_email varchar(100),
  constraint pk_user primary key (user_id)
);

insert into users
values (1024, 'john_doe', 'john_doe@mail.com');
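Besides a running PostgreSQL instance, the examples below also need Spark and the PostgreSQL JDBC driver (org.postgresql.Driver) on the application classpath.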
After creating the database table, we switch to the Spark code. In order to read the table data into a Spark RDD we first need two classes: a DBConnection class extending AbstractFunction0<Connection> that opens the database connection, and a result mapper MapResult extending AbstractFunction1<ResultSet, Object[]> that maps each row to the RDD. Both classes also implement Serializable so they can be distributed over the workers.
package spark_jdbc;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

import scala.runtime.AbstractFunction0;

// Connection factory for JdbcRDD. Extending Scala's AbstractFunction0 lets
// Spark invoke it as a zero-argument function, and Serializable lets Spark
// ship it to the workers.
public class DBConnection extends AbstractFunction0<Connection>
        implements Serializable {

    private String driverClassName;
    private String connectionUrl;
    private String userName;
    private String password;

    public DBConnection(String driverClassName,
                        String connectionUrl,
                        String userName,
                        String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    // Defaults for the local demo database.
    public DBConnection() {
        this.driverClassName = "org.postgresql.Driver";
        this.connectionUrl = "jdbc:postgresql://localhost/xennio";
        this.userName = "postgres";
        this.password = "postgres";
    }

    // Called on the workers each time JdbcRDD needs a new connection.
    @Override
    public Connection apply() {
        try {
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            //LOGGER.error("Failed to load driver class", e);
            e.printStackTrace();
        }
        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(connectionUrl, properties);
        } catch (SQLException e) {
            //LOGGER.error("Connection failed", e);
            e.printStackTrace();
        }
        return connection;
    }
}
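Before wiring the factory into Spark, it can be exercised directly to verify the settings. A quick sanity check outside of Spark (ConnectionCheck is just a throwaway class name for this post):

package spark_jdbc;

import java.sql.Connection;

// Throwaway main class that exercises DBConnection outside of Spark.
public class ConnectionCheck {
    public static void main(String[] args) throws Exception {
        Connection connection = new DBConnection().apply();
        // apply() returns null if the connection attempt failed.
        if (connection != null && connection.isValid(2)) {
            System.out.println("connection OK");
            connection.close();
        } else {
            System.out.println("connection failed");
        }
    }
}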
package spark_jdbc;

import java.io.Serializable;
import java.sql.ResultSet;

import org.apache.spark.rdd.JdbcRDD;

import scala.runtime.AbstractFunction1;

// Row mapper for JdbcRDD. Turns each ResultSet row into an Object array;
// Serializable so Spark can ship it to the workers.
public class MapResult extends AbstractFunction1<ResultSet, Object[]>
        implements Serializable {

    @Override
    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
}
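JdbcRDD.resultSetToObjectArray is a helper shipped with Spark that copies every column of the current row into an Object array. The mapper does not have to be generic, though; as a sketch, a hypothetical UserNameMapper could extract just the user_name column:

package spark_jdbc;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;

import scala.runtime.AbstractFunction1;

// Alternative mapper returning a single column instead of the whole row.
public class UserNameMapper extends AbstractFunction1<ResultSet, String>
        implements Serializable {

    @Override
    public String apply(ResultSet row) {
        try {
            return row.getString("user_name");
        } catch (SQLException e) {
            throw new RuntimeException("Failed to read user_name column", e);
        }
    }
}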
package spark_jdbc;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import scala.reflect.ClassManifestFactory$;

public class spark_jdbc {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("spark JdbcRDD example")
                .setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        DBConnection dbConnection = new DBConnection();

        // JdbcRDD binds the lower and upper bounds into the query's two "?"
        // placeholders. The "where ? = ?" trick with bounds 0 and 0 makes the
        // predicate always true, so a single partition fetches the whole
        // table; see the partitioned variant after this listing.
        JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(
                sc.sc(),
                dbConnection,
                "select * from users where ? = ?",
                0L,   // lower bound
                0L,   // upper bound
                1,    // number of partitions
                new MapResult(),
                ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        JavaRDD<Object[]> javaRDD = jdbcRDD.toJavaRDD();
        List<String> lines = javaRDD.map(record -> record[0].toString()).collect();
        /*
        // Pre-Java-8 equivalent of the lambda above:
        List<String> lines = javaRDD.map(new Function<Object[], String>() {
            @Override
            public String call(final Object[] record) throws Exception {
                return record[0].toString();
            }
        }).collect();
        */
        lines.forEach(line -> System.out.println(line));

        sc.close();
    }
}
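With the demo row in place, running the job should print 1024, the first column (user_id) of the only row. Fetching everything through one partition, however, defeats the purpose of distributing the data over the workers. When the query carries real bound placeholders, JdbcRDD splits the key range across partitions; a minimal sketch, assuming for illustration that user_id values fall between 0 and 2047:

JdbcRDD<Object[]> partitionedRDD = new JdbcRDD<Object[]>(
        sc.sc(),
        new DBConnection(),
        // The two "?" placeholders now carry a real key range.
        "select * from users where user_id >= ? and user_id <= ?",
        0L,      // smallest user_id to read (assumed for this sketch)
        2047L,   // largest user_id to read (assumed for this sketch)
        2,       // two partitions, so two queries run in parallel
        new MapResult(),
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));

Spark splits the [0, 2047] range evenly, so one partition queries user_id 0 to 1023 and the other 1024 to 2047, each on its own worker.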