ClassNotFoundException in Spark application using KryoSerializer

We frequently encoutered a ClassNotFoundException in our Java based Spark applications for classes that we verifiably included in our application’s JAR. Furthermore, we used the kryoSerializer (org.apache.spark.serializer.KryoSerializer) for performance reasons.

After some very annoying debugging sessions we found out that we can get rid of the exception by registering the apparently missing classes by adding them to the spark configration item org.apache.spark.serializer.KryoSerializer. This property is a simple comma separated list of full qualified class names. After adding each class the ClassNotFoundException disappeared.

Calculate prime numbers using spark

Hi, for a test I wrote a short java application that calculates prime numbers on a distributed spark cluster between 0 and 1000000. Since spark 2.x examples are rare on the internet I just leave this here. Prime number code is by Oscar Sanchez.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
 
public class Main {
 
  private static boolean isPrime(long n) {
    for (long i = 2; 2 * i < n; i++) {
      if (n % i == 0) {
        return false;
      }
    }
    return true;
  }
 
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("PrimeApp").getOrCreate();
    Dataset<Tuple2<Long, Boolean>> rnd = spark.range(0L, 1000000L).map(
      (MapFunction<Long, Tuple2<Long, Boolean>>) x -> new Tuple2<Long, Boolean>(x, isPrime(x)), Encoders.tuple(Encoders.LONG(), Encoders.BOOLEAN()));
    rnd.show(false);
    spark.stop();
  }
 
}

Java / Spark – Printing RDDs from Spark Streaming

A common challenge when writing functional code in Spark is to simply output logs as we usually do it in ordinary applications where line for line is processed sequentially and a debug message could be printed at an arbitrary place using System.out.println(). Working with Spark’s RDDs is new to most of us and might be confusing at first since it is difficult to track what object type we are currently working with (is it a RDDStream, an RDD or a simple type like String or int) – but infact this tracking is important. If you make use of the print function that is offered by RDDs or RDDStreams you’ll just see more or less useless array like information in the console instead of the objects’ content.

What I commonly do is to resolve the RDD object down to a simple JavaRDD or JavaDStream<object> using e.g. flatmap(x -> x) to flatten streams or lists in a dataset or map() to break down complex objects to a simple data type (e.g. via map(person -> person.getAddress().getStreet())). On RDDs you can then do a foreach(System.out::println). On JavaDStream you can do the following:

public static void printRDD(final JavaRDD<String> s) {
  s.foreach(System.out::println);
}
 
public static void main(final String[] args) {
  SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("testApp");
  try (JavaStreamingContext sc = new JavaStreamingContext(conf, new Duration(5000))) {
    JavaDStream<String> stream = sc.textFileStream("C:/testnumbers/");
    stream.foreachRDD(Main::printRDD);
    sc.start();
    sc.awaitTermination();
  }
}

Here, I simply break down a JavaDStream to an RDD using foreachRDD() and use System.out::println as method reference to print each RDDs content.