This drove me crazy, but I finally found a solution.
If you want to split a pair RDD of type (A, Iterable<B>) by key, so that the result is several RDDs of type B (one per key), then here's how you go:
```java
// A random pair RDD (A and B stand for your own key and value types)
JavaPairRDD<A, Iterable<B>> rdd = ...;

// Get the list of distinct keys; collect() brings them to the driver
List<A> keys = rdd.keys().distinct().collect();

// Iterate through the keys
for (A key : keys) {
    // Get an RDD by filtering the original RDD by key
    JavaRDD<B> rddByKey = getRddByKey(rdd, key);
    // ...
}

// Keep only the pairs whose key equals `key`, then flatten
// their Iterable<B> values into a single JavaRDD<B>
private JavaRDD<B> getRddByKey(JavaPairRDD<A, Iterable<B>> pairRDD, A key) {
    return pairRDD.filter(v -> v._1().equals(key))
            .values()
            .flatMap(values -> values.iterator()); // Spark 2.x+: flatMap expects an Iterator
}
```
The trick is twofold: (1) get the list of all the distinct keys, and (2) iterate through that list and, for each key, create a new RDD by filtering the original pair RDD so that only the values attached to that key remain. Note the use of Java 8 lambda expressions in getRddByKey.
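If you want to see the two steps in action without setting up a SparkContext, here is a plain-Java analogue. It is only a sketch, assuming a List of Map.Entry stands in for the pair RDD; the class SplitByKeySketch and the methods distinctKeys and valuesByKey are hypothetical names for illustration, not Spark API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical in-memory analogue of splitting a pair RDD by key (no Spark involved)
class SplitByKeySketch {

    // Step (1): distinct keys, the counterpart of rdd.keys().distinct().collect()
    static <A, B> List<A> distinctKeys(List<Map.Entry<A, Iterable<B>>> pairs) {
        return pairs.stream()
                .map(Map.Entry::getKey)
                .distinct()
                .collect(Collectors.toList());
    }

    // Step (2): keep the pairs with the given key and flatten their values,
    // the counterpart of filter(...).values().flatMap(...)
    static <A, B> List<B> valuesByKey(List<Map.Entry<A, Iterable<B>>> pairs, A key) {
        return pairs.stream()
                .filter(p -> p.getKey().equals(key))
                .flatMap(p -> StreamSupport.stream(p.getValue().spliterator(), false))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Iterable<Integer>>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<String, Iterable<Integer>>("a", Arrays.asList(1, 2)));
        pairs.add(new SimpleEntry<String, Iterable<Integer>>("b", Arrays.asList(3)));
        pairs.add(new SimpleEntry<String, Iterable<Integer>>("a", Arrays.asList(4)));

        // One "split" per distinct key, in the order the keys first appear
        for (String key : distinctKeys(pairs)) {
            System.out.println(key + " -> " + valuesByKey(pairs, key));
        }
    }
}
```

Running main prints `a -> [1, 2, 4]` and then `b -> [3]`. As with the Spark version, every key triggers its own filter over the whole input, so the number of passes grows with the number of distinct keys.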
Remark: bear in mind that the action collect() brings the distinct keys of the entire distributed RDD into the driver. Generally, keys are integers or short strings, so collecting them on one machine shouldn't be a problem. If there are a very large number of distinct keys, though, this wouldn't be the best way to go. If you have a suggestion, you are more than welcome to leave a comment below… spread the word, help the world!