Thursday, October 20, 2016

Covariant and Invariant in Java

1. About covariant and invariant

Arrays are reified, while generics are implemented by erasure. This means that arrays know and enforce their element types at runtime.
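A small self-contained sketch (not from the cited answer) that makes the difference visible:
import java.util.ArrayList;
import java.util.List;

public class ReificationDemo {
    public static void main(String[] args) {
        // Arrays keep their element type at runtime...
        Object[] objects = new String[1];
        System.out.println(objects.getClass().getComponentType()); // class java.lang.String

        // ...and enforce it on every store:
        try {
            objects[0] = Integer.valueOf(42);   // compiles, fails at runtime
        } catch (ArrayStoreException e) {
            System.out.println("rejected at runtime: " + e);
        }

        // Generic type arguments are erased: both lists share one runtime class.
        List<String> strings = new ArrayList<>();
        List<Integer> integers = new ArrayList<>();
        System.out.println(strings.getClass() == integers.getClass()); // true
    }
}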
Below is the clearest explanation I have ever found, cited from Stack Overflow.

1.1. Covariance, Invariance and Contravariance explained in plain English?

At heart, these terms describe how the subtype relation is affected by type transformations. That is, if A and B are types, f is a type transformation, and ≤ the subtype relation (i.e. A ≤ B means that A is a subtype of B), we have
  • f is covariant if A ≤ B implies that f(A) ≤ f(B)
  • f is contravariant if A ≤ B implies that f(B) ≤ f(A)
  • f is invariant if neither of the above holds
Let's consider an example. Let f(A) = List<A> where List is declared by
class List<T> { ... }
Is f covariant, contravariant, or invariant? Covariant would mean that a List<String> is a subtype of List<Object>, contravariant that a List<Object> is a subtype of List<String>, and invariant that neither is a subtype of the other, i.e. List<String> and List<Object> are inconvertible types. In Java, the latter is true: we say (somewhat informally) that generics are invariant.
Another example. Let f(A) = A[]. Is f covariant, contravariant, or invariant? That is, is String[] a subtype of Object[], Object[] a subtype of String[], or is neither a subtype of the other? (Answer: In Java, arrays are covariant)
This was still rather abstract. To make it more concrete, let’s look at which operations in Java are defined in terms of the subtype relation. The simplest example is assignment. The statement
x = y;
will compile only if typeof(y) ≤ typeof(x). That is, we have just learned that the statements
ArrayList<String> strings = new ArrayList<Object>();
ArrayList<Object> objects = new ArrayList<String>();
will not compile in Java, but
Object[] objects = new String[1];
will.
Another example where the subtype relation matters is a method invocation expression:
result = method(a);
Informally speaking, this statement is evaluated by assigning the value of a to the method's first parameter, then executing the body of the method, and then assigning the method's return value to result. Like the plain assignment in the last example, the "right hand side" must be a subtype of the "left hand side", i.e. this statement can only be valid if typeof(a) ≤ typeof(parameter(method)) and returntype(method) ≤ typeof(result). That is, if method is declared by:
Number[] method(ArrayList<Number> list) { ... }
none of the following expressions will compile:
Integer[] result = method(new ArrayList<Integer>());
Number[] result = method(new ArrayList<Integer>());
Object[] result = method(new ArrayList<Object>());
but
Number[] result = method(new ArrayList<Number>());
Object[] result = method(new ArrayList<Number>());
will.
Another example where subtyping matters is overriding. Consider:
Super sup = new Sub();
Number n = sup.method(1);
where
class Super {
    Number method(Number n) { ... }
}

class Sub extends Super {
    @Override
    Number method(Number n) { ... }
}
Informally, the runtime will rewrite this to:
class Super {
    Number method(Number n) {
        if (this instanceof Sub) {
            return ((Sub) this).method(n);  // *
        } else {
            ...
        }
    }
}
For the marked line to compile, the method parameter of the overriding method must be a supertype of the method parameter of the overridden method, and the return type a subtype of the overridden method's one. Formally speaking, f(A) = parametertype(method asdeclaredin(A)) must at least be contravariant, and f(A) = returntype(method asdeclaredin(A)) must at least be covariant.
Note the "at least" above. Those are minimum requirements any reasonable statically type safe object oriented programming language will enforce, but a programming language may elect to be more strict. In the case of Java 1.4, parameter types and method return types must be identical (except for type erasure) when overriding methods, i.e. parametertype(method asdeclaredin(A)) = parametertype(method asdeclaredin(B)) when overriding. Since Java 1.5, covariant return types are permitted when overriding, i.e. the following will compile in Java 1.5, but not in Java 1.4:
class Collection {
    Iterator iterator() { ... }
}

class List extends Collection {
    @Override
    ListIterator iterator() { ... }
}
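At a call site, the Java 5 rule is what lets the caller use the more specific return type without a cast. A hedged illustration, reusing the hypothetical Collection/List classes above:
List list = new List();
ListIterator it = list.iterator();   // no cast needed thanks to the covariant return type
// Under the Java 1.4 rule, iterator() would have had to return Iterator,
// and the caller would need: ListIterator it = (ListIterator) list.iterator();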
I hope I covered everything - or rather, scratched the surface. Still I hope it will help to understand the abstract, but important concept of type variance.

Friday, October 14, 2016

Spark and MySQL

1. Background

We want to perform some data analysis on several (about 15) MySQL database instances that are spread across multiple datacenters.
Most of the analysis needs to perform unions and joins on tables from different database instances.
There are several ways to conduct the task.
For example, at first we decided to choose the 'easy' way: prepare a MySQL instance dedicated to data analysis, install it on a high-performance, high-capacity machine, then copy the related tables into it. After that we can perform unions and joins as needed.
We 'copied' data manually at first, but soon found it too error prone, as there are about 20+ tables on each db instance and 300+ tables in total that need to be copied. So we made it a little more automatic by writing scripts to perform the 'copy' tasks.
Another problem was that the total size of these tables is too big, more than 1.5T. What's worse, they are spread across multiple datacenters, which leads to slow transfer speeds. One workaround was to preprocess some of these tables first to make them smaller.
Anyway, it seemed there was no option that was both efficient and easy. Fortunately, we have Spark.
It took me about 3 days to learn the basics and get a hello-world program working:
  • read books and documents
  • run on local environment
  • setup cluster environment
Then I started programming. At first I programmed in Python, but found the documentation not very friendly, so I switched to Java 8. Even though I have programmed in Scala before, I didn't want to use it, since the rest of the team has little experience with it and I didn't want to be the only person able to maintain the code.

2. Spark Basics

Spark cluster:
  • Each Spark cluster has a master and several workers.
  • Each worker can spawn multiple executors.
  • Each executor can have several cores, with each core running one task at a time.
Spark program:
  • Users write Spark programs, called driver programs, that call Spark APIs to manipulate data and create jobs.
  • Each job consists of several stages.
  • Each stage can be split into several tasks.
Spark uses RDD to represent a collection of distributed data.
The driver program starts to run by being submitted to a cluster.
[Figure: Spark cluster overview]
There are two flavors of Spark API we can use to write the driver program: RDD and Spark SQL. Note that Spark SQL doesn't have to be related to a traditional database; in fact it can be used on any data that has a schema.
We have chosen Spark SQL since it beats the RDD API in both performance and programming convenience.
Spark SQL uses Dataset<Row> (also called a DataFrame) to represent a distributed data collection.
With one or more DataFrames, we can transform them by:
  • selecting subfields
  • filtering out a subset
  • grouping/aggregating
  • union/intersect/except
  • joining, etc.
And finally we can trigger a job by performing an action on the result DataFrame, such as:
  • storing it
  • retrieving the result
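For illustration, here is a hedged sketch of such a pipeline; the DataFrame users and the column names uid and intervalTime are made up for this example:
// assuming: import static org.apache.spark.sql.functions.col;
//           import static org.apache.spark.sql.functions.sum;
Dataset<Row> result = users
    .select("uid", "intervalTime")                      // select subfields
    .filter(col("intervalTime").gt(0))                  // filter out a subset
    .groupBy("uid")                                     // group
    .agg(sum("intervalTime").as("sum_intervalTime"));   // aggregate

result.show();                    // action: print part of the result, triggers a job
long rowCount = result.count();   // another action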

3. Common code samples

We use a class DbInstance to wrap common Spark SQL operations:
public class DbInstance implements Serializable  {
 String user;
 String password;
 String url;
 String db;
 String host;
 ...
}

3.1. Prepare SparkSession

The SparkSession class is the entry point to Spark SQL, and we need only one instance for the whole application:
SparkSession sparkSession = SparkSession
      .builder()
      .appName("loginLog")
      .config("spark.sql.warehouse.dir", "file:///")
      .getOrCreate();
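Note that this builder does not set a master URL; as a hedged reminder, for quick local testing you can add .master("local[*]") to the builder, while on a cluster the master is usually supplied via spark-submit's --master option.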

3.2. Load data (DataFrame) from a MySQL table

To load data from MySQL:
public Dataset<Row> loadTable(String tableName) {
 Dataset<Row> rc = SparkConfig.sparkSession.read().format("jdbc")
 .option("user", user)
 .option("password", password)
 .option("url", url)
 .option("driver", "com.mysql.jdbc.Driver")
 .option("dbtable", tableName).load();
 return rc;
}
The parameter passed to the url option is something like:
jdbc:mysql://db_host:3306/db_name
The parameter passed to dbtable can be anything that is valid in the FROM clause of a SQL statement (see the example after this list), such as:
  • table name
  • subquery
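For example (the table and column names below are made up, and dbInstance is an instance of the DbInstance class above; note that a subquery must be aliased):
// Load a whole table:
Dataset<Row> users = dbInstance.loadTable("user");

// Push a filter down to MySQL by passing an aliased subquery instead of a table name:
Dataset<Row> recentLogins = dbInstance.loadTable(
    "(select uid, login_time from login_log where login_time > '2016-10-01') as recent_logins");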

3.3. Save data to a MySQL table

To save data to MySQL:
public void saveToTable(SaveMode saveMode, Dataset<Row> dataset, String tableName){
 Properties p = new Properties();
 p.setProperty("user", user);
 p.setProperty("password", password);
 p.setProperty("driver", "com.mysql.jdbc.Driver");
 dataset.write().mode(saveMode).jdbc(url, tableName, p);
}
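A usage sketch (the DataFrame resultDs and the table name are placeholders):
// Append the result of an analysis job to a MySQL table:
dbInstance.saveToTable(SaveMode.Append, resultDs, "analysis_result");
// Or replace the table contents entirely:
dbInstance.saveToTable(SaveMode.Overwrite, resultDs, "analysis_result");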

3.4. Get table list from MySQL

By providing a tableNameMatch string such as 'sample_table%', getTableList can return a list of table names:
public List<String> getTableList(String tableNameMatch) {
  String tableNamesSQL = "(select table_name from information_schema.tables" +
    " where table_schema='" + this.db + "'" +
    " and table_name like '"+ tableNameMatch + "') as tables";

  Dataset<Row> dsTableList = this.loadTable(tableNamesSQL);

  List<Row> listRow = dsTableList.collectAsList();

  List<String> listTableName = listRow.stream().map(r -> r.getString(r.fieldIndex("table_name"))).collect(Collectors.toList());
  return listTableName;
 }

3.5. Union similar tables

public Optional<Dataset<Row>> getDs_union(String tableNameMatch) {
 List<String> list = getTableList(tableNameMatch);

 Stream<String> stream =  list.stream();

 Optional< Dataset<Row> > loginLogDsOp = stream.map(name -> {
  Dataset<Row> rc = loadTable("`" + name + "`");
  return rc;
 }).reduce((x, y) -> x.union(y));
 return loginLogDsOp;
}
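For example, to union all shards matching a pattern and write the result to a dedicated analysis instance (the names analysisDb, sample_table_% and sample_table_all are only placeholders):
Optional<Dataset<Row>> unionOp = dbInstance.getDs_union("sample_table_%");
unionOp.ifPresent(ds -> analysisDb.saveToTable(SaveMode.Overwrite, ds, "sample_table_all"));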

3.6. Creating empty DataFrame

Each DataFrame needs a schema; Spark SQL uses StructType to represent a schema:
StructType schema = new StructType(new StructField[]{
     new StructField("uid", DataTypes.LongType, false, Metadata.empty()),
     new StructField("sum_intervalTime", DataTypes.LongType, false, Metadata.empty())
     });
Dataset<Row> emptyDs = SparkConfig.sparkSession.createDataFrame(new ArrayList<Row>(), schema);
Please note that the order of the StructFields is important: if you union two DataFrames whose schemas contain the same set of StructFields but in a different order, Spark won't complain, but the result may be wrong.
Alternatively, you can use a Java bean class to represent the schema:
Dataset<Row> emptyDs = SparkConfig.sparkSession.createDataFrame(new ArrayList<MyBean>(), MyBean.class);
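The bean class is not shown in this post; a minimal sketch of what it might look like (field names chosen to mirror the StructType example above):
public class MyBean implements Serializable {
    private long uid;
    private long sumIntervalTime;

    public long getUid() { return uid; }
    public void setUid(long uid) { this.uid = uid; }
    public long getSumIntervalTime() { return sumIntervalTime; }
    public void setSumIntervalTime(long sumIntervalTime) { this.sumIntervalTime = sumIntervalTime; }
}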

4. Some issues I met

Can’t submit to a cluster
There are mainly two reasons why submitting fails:
  • the driver and the workers can't connect to each other
  • insufficient resources such as memory and CPU cores
On my PC, I have several virtual NICs, and only the physical one is reachable from the workers, but the NIC chosen by Spark by default is a virtual one. So I have to set the SPARK_LOCAL_IP environment variable before submitting.
Insert to MySQL is slow
Adding the following parameters greatly boosts insert performance:
useServerPrepStmts=false&rewriteBatchedStatements=true
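These are MySQL Connector/J options, so they are appended to the JDBC URL passed to the url option, for example:
jdbc:mysql://db_host:3306/db_name?useServerPrepStmts=false&rewriteBatchedStatements=true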
Application throws java.net.URISyntaxException on start
java.net.URISyntaxException: Relative path in absolute URI: file:F:/Workspace/spark/proj/audit_spark/spark-warehouse
We need to set "spark.sql.warehouse.dir" to eliminate this exception.
Executor Timeout
When executing some tasks, I met the following error:
16/10/19 16:08:49 WARN TaskSetManager: Lost task 21.0 in stage 14.0 (TID 828, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 132088 ms
The solution is to add the following configuration:
--conf "spark.network.timeout=10000000"--conf "spark.executor.heartbeatInterval=10000000"
MySQL wait_timeout
Sometimes I run into:
The last packet successfully received from the server was 245,554 milliseconds ago.  The last packet sent successfully to the server was 245,671 milliseconds ago.
I guess there is some bug in the JDBC module of Spark: it reuses one connection across several steps rather than getting a new one from a connection pool. If one statement takes too much time, the connection goes into an error state, which causes the following steps to fail.
There is no clean solution; I had to change /etc/my.cnf. Add the following lines to the [mysqld] section of my.cnf:
wait_timeout=31536000
interactive_timeout =31536000
Also, when loading data from a single DB, sometimes reducing concurrency helps the load succeed.
Out of memory
When running Spark locally, reducing concurrency will reduce the memory requirements of certain jobs.
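One hedged way to reduce concurrency from code is to shrink the number of partitions of a large DataFrame before the heavy stage (bigDs and the partition count below are only illustrative):
Dataset<Row> smaller = bigDs.coalesce(4);   // fewer partitions means fewer tasks running at the same time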

Thursday, September 22, 2016

Buggy Crossbridge

1. The Problem

Our game client is made up of two swf files (e.g. preloader.swf and game.swf, where preloader.swf loads game.swf), each of which has to do some md5 calculations.
We implemented the md5 algorithm in C and compiled it to md5.swc via Crossbridge, because we found that an md5 implementation in AS3 is far slower than one implemented in C (and compiled by Crossbridge).
These two swf files are maintained by 2 different teams and the build environment is not very smart, so we distributed the binary md5.swc file to them manually.
It worked pretty well at first, until one day one of the teams compiled a new md5.swc from the exact same code. Theoretically there was no problem there, but strangely preloader.swf was now unable to load game.swf. In fact, we found we always had to use the same md5.swc build: even with the same code and the same build environment, two md5.swc files generated at two different times will always cause an error.

2. The Root cause

The md5.swc compiled via Crossbridge contains two main namespaces:
  • a public namespace with a name provided by the developer
  • a private namespace with a random name
These two namespaces reference each other. In fact, when loading, code in the private namespace may perform some initialization by calling code in the public namespace.
Two builds of the same code will have:
  • the same public namespace
  • two different private namespaces
So when preloader.swf loads a game.swf built with a different md5.swc, the public namespace may be initialized twice.

3. The solution

There are two solutions:
  • Make sure these two swf files always use the same build of md5.swc
  • Change Crossbridge so it does not generate a random name for the private namespace
The patch for the second solution is:
 llvm-2.9/lib/Target/AVM2/AVM2AsmPrinter.cpp | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)

diff --git a/llvm-2.9/lib/Target/AVM2/AVM2AsmPrinter.cpp b/llvm-2.9/lib/Target/AVM2/AVM2AsmPrinter.cpp
index af61a9d..8b433fb 100644
--- a/llvm-2.9/lib/Target/AVM2/AVM2AsmPrinter.cpp
+++ b/llvm-2.9/lib/Target/AVM2/AVM2AsmPrinter.cpp
@@ -668,21 +668,7 @@ public:
     }

     std::string getModulePackageName(const Module &M) const {
- // need to add a UUID to the module ident so it's really unique
- static std::map<const Module *, UUID> modUUIDs;
-
-        std::string uuid = modUUIDs[&M].str();
-        std::string EID = M.getModuleIdentifier();
-        std::replace(EID.begin(), EID.end(), '@', '_'); // Mangler allows '@' but we don't.. should be only char Mangler thinks is ok that we don't like
-
-        EID += ":";
-        EID += uuid;
-
-        Twine TMN = EID;
-        SmallString<256> MN;
-        Mang->getNameWithPrefix(MN, TMN);
-        std::string R = MN.str();
-        return getPackageName(M) + R;
+  return "C_Run_internal";
     }

     // Following the behavior of llvm-2.9/tools/lto/LTOModule.cpp
The result is as follows (the top one is changed, the bottom one is the original):
[Figure: Crossbridge namespaces before and after the change]