1. Background
We want to perform some data analysis on several (about 15) MySQL database instances that are spread across multiple datacenters.
Most of the analyses need to union and join tables from different database instances.
There are several ways to accomplish this.
For example, at first we chose the 'easy' way: prepare a MySQL instance dedicated to data analysis, install it on a machine with high performance and large storage capacity, then copy the related tables into it. After that we can union and join as needed.
We 'copied' the data manually at first, but soon found it too error prone, as there are about 20+ tables on each DB instance and 300+ tables in total that need to be copied. So we made it a little more automatic by writing scripts to perform the 'copy' tasks.
Another problem was that the total size of these tables is very big, more than 1.5 TB. What's worse, they are spread across multiple datacenters, which leads to slow transfer speeds. One mitigation was to preprocess some of these tables first to make them smaller.
In short, it seemed there was no option that was both efficient and easy. Fortunately, we have Spark.
It took me about 3 days to learn Spark and get a hello-world program working:
- read books and documents
- run on local environment
- setup cluster environment
Then I started programming. At first I wrote the programs in Python, but found the documentation not very friendly, so I switched to Java 8. Even though I have programmed in Scala, I did not want to use it, since the rest of the team has little experience with it and I did not want to be the only person who can maintain the code.
2. Spark Basics
Spark cluster:
- Each Spark cluster has a master and several workers.
- Each worker can spawn multiple executors.
- Each executor can have several cores, and each core runs one task at a time.
Spark program:
- Users write Spark programs, called driver programs, that create jobs.
- Each job consists of several stages.
- Each stage can be split into several tasks.
A driver program calls Spark APIs to manipulate data, and Spark uses RDDs to represent collections of distributed data. The driver program starts to run once it is submitted to a cluster.
There are two flavors of Spark API we can use to write the driver program: RDD and Spark SQL. Note that Spark SQL does not have to be related to a traditional database; in fact it can be used on any data that has a schema.
We chose Spark SQL, since it is more effective than the RDD API in terms of both performance and programming convenience.
Spark SQL uses Dataset<Row> (a DataFrame) to represent a distributed data collection.
With one or more DataFrames, we can transform them by:
- selecting subfields
- filtering out a subset
- grouping/aggregating
- union/intersect/except
- joining, etc.
And finally we can trigger a job by performing an action on the result DataFrame:
- store it
- retrieve the result
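The sketch below chains a few of these transformations and ends with an action; the DataFrames dsA and dsB and the columns uid and amount are made up for illustration:
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

// Transformations are lazy; nothing runs until the action at the end.
Dataset<Row> result = dsA
        .select("uid", "amount")                 // select subfields
        .filter(col("amount").gt(0))             // filter out a subset
        .join(dsB, "uid")                        // join on a common key
        .groupBy("uid")                          // group ...
        .agg(sum("amount").as("total_amount"));  // ... and aggregate
result.show();                                   // action: triggers the job and retrieves the result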
3. Common code samples
We use a class DbInstance to wrap common Spark SQL operations:
public class DbInstance implements Serializable {
    String user;      // MySQL user
    String password;  // MySQL password
    String url;       // JDBC url, e.g. jdbc:mysql://db_host:3306/db_name
    String db;        // database (schema) name
    String host;      // MySQL host
    ...
}
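A DbInstance is created per MySQL instance. The constructor is not shown here; the call below is a hypothetical sketch assuming a constructor that simply fills the fields, with placeholder values:
// Hypothetical constructor call; one DbInstance per MySQL instance.
DbInstance db = new DbInstance(
        "analysis_user",                       // user
        "secret",                              // password
        "jdbc:mysql://db_host:3306/db_name",   // url
        "db_name",                             // db
        "db_host");                            // host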
3.1. Prepare SparkSession
The SparkSession class is the entry point to Spark SQL, and we need only one instance for the whole application:
SparkSession sparkSession = SparkSession
.builder()
.appName("loginLog")
.config("spark.sql.warehouse.dir", "file:///")
.getOrCreate();
3.2. Load data (DataFrame) from a MySQL table
To load data from MySQL:
public Dataset<Row> loadTable(String tableName) {
Dataset<Row> rc = SparkConfig.sparkSession.read().format("jdbc")
.option("user", user)
.option("password", password)
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", tableName).load();
return rc;
}
The parameter passed to the url option looks like:
jdbc:mysql://db_host:3306/db_name
The parameter passed to dbtable can be anything that is valid in the FROM clause of a SQL statement, such as:
- a table name
- a subquery (see the example below)
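For example, passing a subquery lets MySQL do part of the preprocessing before the data is transferred. Note that the subquery must be wrapped in parentheses and given an alias, as MySQL requires for a derived table (getTableList in section 3.4 uses the same trick); the table and column names below are made up for illustration:
// Plain table name:
Dataset<Row> users = db.loadTable("user");
// Subquery pushed down to MySQL (login_log and its columns are illustrative):
Dataset<Row> recentLogins = db.loadTable(
        "(select uid, login_time from login_log where login_time > '2016-10-01') as t");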
3.3. Save data to a MySQL table
To save data to MySQL:
public void saveToTable(SaveMode saveMode, Dataset<Row> dataset, String tableName){
Properties p = new Properties();
p.setProperty("user", user);
p.setProperty("password", password);
p.setProperty("driver", "com.mysql.jdbc.Driver");
dataset.write().mode(saveMode).jdbc(url, tableName, p);
}
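A hypothetical call, with the DataFrame and target table name made up for illustration:
// Append the analysis result to a summary table on this instance.
db.saveToTable(SaveMode.Append, resultDs, "login_summary");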
3.4. Get table list from MySQL
By providing a tableNameMatch string such as 'sample_table%', getTableList returns a list of matching table names:
public List<String> getTableList(String tableNameMatch) {
String tableNamesSQL = "(select table_name from information_schema.tables" +
" where table_schema='" + this.db + "'" +
" and table_name like '"+ tableNameMatch + "') as tables";
Dataset<Row> dsTableList = this.loadTable(tableNamesSQL);
List<Row> listRow = dsTableList.collectAsList();
List<String> listTableName = listRow.stream().map(r -> r.getString(r.fieldIndex("table_name"))).collect(Collectors.toList());
return listTableName;
}
3.5. Union similar tables
public Optional<Dataset<Row>> getDs_union(String tableNameMatch) {
    List<String> tableNames = getTableList(tableNameMatch);
    Optional<Dataset<Row>> dsUnionOp = tableNames.stream()
            .map(name -> loadTable("`" + name + "`"))
            .reduce((x, y) -> x.union(y));
    return dsUnionOp;
}
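A hypothetical usage, with the table name pattern made up for illustration:
// Union all matching tables of this instance and count the rows.
db.getDs_union("login_log_%")
        .ifPresent(ds -> System.out.println("total rows: " + ds.count()));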
3.6. Creating an empty DataFrame
Each DataFrame needs a schema; Spark SQL uses StructType to represent a schema:
StructType schema = new StructType(new StructField[]{
new StructField("uid", DataTypes.LongType, false, Metadata.empty()),
new StructField("sum_intervalTime", DataTypes.LongType, false, Metadata.empty())
});
Dataset<Row> emptyDs = SparkConfig.sparkSession.createDataFrame(new ArrayList<Row>(), schema);
Please note that the order of the StructFields is important: if you union two DataFrames whose schemas contain the same set of StructFields but in a different order, Spark will not complain, but the result may be wrong.
Alternatively, you can use a Java bean class to represent the schema:
Dataset<Row> emptyDs = SparkConfig.sparkSession.createDataFrame(new ArrayList<MyBean>(), MyBean.class);
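MyBean itself is not shown here; a minimal sketch matching the StructType above (the bean is an assumption, only the field names come from the schema) could be:
// Hypothetical bean; Spark derives the schema from the getter/setter pairs.
public static class MyBean implements Serializable {
    private long uid;
    private long sum_intervalTime;

    public long getUid() { return uid; }
    public void setUid(long uid) { this.uid = uid; }

    public long getSum_intervalTime() { return sum_intervalTime; }
    public void setSum_intervalTime(long sum_intervalTime) { this.sum_intervalTime = sum_intervalTime; }
}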
4. Some issues I met
Can’t submit to a cluster
There are mainly two reasons why a submit fails:
- the driver and the workers cannot connect to each other
- insufficient resources such as memory and CPU cores
On my PC I have several virtual NICs, and only the physical one is reachable from the workers, but the NIC Spark picks by default is a virtual one. So I have to set the SPARK_LOCAL_IP environment variable before submitting.
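For example (the address below is a placeholder for the physical NIC's IP):
export SPARK_LOCAL_IP=192.168.1.10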
Insert to MySQL is slow
Adding the following parameters to the JDBC url will greatly boost insert performance:
useServerPrepStmts=false&rewriteBatchedStatements=true
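For example, the full url option becomes:
jdbc:mysql://db_host:3306/db_name?useServerPrepStmts=false&rewriteBatchedStatements=true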
Application throws java.net.URISyntaxException at startup
java.net.URISyntaxException: Relative path in absolute URI: file:F:/Workspace/spark/proj/audit_spark/spark-warehouse
We need to set "spark.sql.warehouse.dir" (as in the SparkSession example in section 3.1) to eliminate this exception.
Executor Timeout
When executing some tasks, I met the following error:
16/10/19 16:08:49 WARN TaskSetManager: Lost task 21.0 in stage 14.0 (TID 828, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 132088 ms
The solution is to add the following configuration when submitting:
--conf "spark.network.timeout=10000000"--conf "spark.executor.heartbeatInterval=10000000"
MySQL wait_timeout
Sometimes I run into:
The last packet successfully received from the server was 245,554 milliseconds ago. The last packet sent successfully to the server was 245,671 milliseconds ago.
I guess there is a bug in the JDBC module of Spark: it uses one connection for several steps rather than getting a new one from a connection pool, and if one statement takes too much time, the connection ends up in an error state, which causes the subsequent steps to fail.
There is no clean solution; I have to change /etc/my.cnf by adding the following lines to the [mysqld] section:
wait_timeout=31536000
interactive_timeout=31536000
Also, when loading data from a single DB, reducing concurrency can sometimes help the load succeed.
Out of memory
When running Spark locally, reducing concurrency reduces the memory requirement of certain jobs.
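For example, coalescing a DataFrame into fewer partitions before the heavy step is one way to do this (the variable names below are illustrative); starting the local master with fewer cores, e.g. local[2], also works:
// Fewer partitions means fewer concurrent tasks per stage,
// trading parallelism for a smaller peak memory footprint.
Dataset<Row> smaller = bigDs.coalesce(4);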