Unit testing spark 2.x applications in Java

When you specific spark questions you might get the impression that java is the black sheep of the languages supported. Nearly all answers refer to scala or python. So it is with unit testing hence I am writing this post. I will show how to create a local context (what is pretty well documented) and how to read parquet files (or other formats like csv or json – the process is the same) from a source directory within your project. This way you can unit test your classes containing spark functions without connection to another file system or resource negotiator.

In the following example it is important to register the folder src/test/resources as class path / source code folder.

The annotations beforeClass and afterClass define methods that are called once the class is loaded the first respectively the last time.

package de.jofre.spark.tests;
 
import static org.assertj.core.api.Assertions.assertThat;
 
import org.apache.spark.SparkConf;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import de.jofre.test.categories.UnitTest;
 
@Category(UnitTest.class)
public class SparkSessionAndFileReadTest {
	private static Dataset<Row> df;
	private static SparkSession spark;
	private static final Logger logger = LoggerFactory.getLogger(SparkSessionAndFileReadTest.class);
 
	@BeforeClass
	public static void beforeClass() {
		spark = SparkSession.builder().master("local[*]").config(new SparkConf().set("fs.defaultFS", "file:///"))
				.appName(SparkSessionAndFileReadTest.class.getName()).getOrCreate();
		df = spark.read().parquet("src/test/resources/tests/part-r-00001-myfile.snappy.parquet");
		logger.info("Created spark context and dataset with {} rows.", df.count());
	}
 
	@Test
	public void buildClippingTransformerTest() {
		logger.info("Testing the spark sorting function.");
		Dataset<Row> sorted = df.sort("id");
		assertThat(sorted.count()).isEqualTo(df.count());
	}
 
	@AfterClass
	public static void afterClass() {
		if (spark != null) {
			spark.stop();
		}
	}
}

Permanently add a proxy to MiKTex

MiKTeX is a Tex distribution that is required when translating latex documents to target formats like e.g. PDF. One task of such a distribution is the package management of plugins that are used in your document. MiKTeX downloads such packages when they are first used or updated and hence requires an internet connection as long as you do not have those packages on a portable medium.

If you sit behind a proxy – like me – you have to configure MiKTeX to use this proxy. What I did for a long (annoying) time was to enter the proxy URL in the MiKTeX Update tool and check Authentication required. This enforces the tool to ask you every single time to enter your proxy credentials. A better way is to uncheck Authentication required and specify user and password directly in the URL. If you do so, you should never be asked again to enter you user password combination.

Calculate prime numbers using spark

Hi, for a test I wrote a short java application that calculates prime numbers on a distributed spark cluster between 0 and 1000000. Since spark 2.x examples are rare on the internet I just leave this here. Prime number code is by Oscar Sanchez.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
 
public class Main {
 
  private static boolean isPrime(long n) {
    for (long i = 2; 2 * i < n; i++) {
      if (n % i == 0) {
        return false;
      }
    }
    return true;
  }
 
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("PrimeApp").getOrCreate();
    Dataset<Tuple2<Long, Boolean>> rnd = spark.range(0L, 1000000L).map(
      (MapFunction<Long, Tuple2<Long, Boolean>>) x -> new Tuple2<Long, Boolean>(x, isPrime(x)), Encoders.tuple(Encoders.LONG(), Encoders.BOOLEAN()));
    rnd.show(false);
    spark.stop();
  }
 
}

Reading Parquet Files from a Java Application

Recently I came accross the requirement to read a parquet file into a java application and I figured out it is neither well documented nor easy to do so. As a consequence I wrote a short tutorial. The first task is to add your maven dependencies.

<dependencies>
 <dependency>
 <groupId>org.apache.parquet</groupId>
 <artifactId>parquet-hadoop</artifactId>
 <version>1.9.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.7.0</version>
 </dependency>
</dependencies>

To write the java application is easy once you know how to do it. Instead of using the AvroParquetReader or the ParquetReader class that you find frequently when searching for a solution to read parquet files use the class ParquetFileReader instead. The basic setup is to read all row groups and then read all groups recursively.

Continue reading “Reading Parquet Files from a Java Application”

Installing VirtualBox Guest Additions and fix the KERN_DIR issue

When installing the VirtualBox Guest Additions on CentOS 7 you might get an error saying the sources of the linux kernel could not be found.

Error: unable to find the sources of your current Linux kernel. Specify KERN_DIR= and run Make again. Stop.

This error can be solved by installing the kernel developer package and setting the KERN_DIR environment variable.

$ sudo yum install kernel-devel
$ sudo uname -r   # store the variable
$ echo export KERN_DIR=/usr/src/kernels/[uname output] >> ~/.bashrc

So what we do here is to intall the kernel-developer package and, get the current kernel version and set the environment variable to KERN_DIR. Now restart the virtual machine and install the Guest Additions again. The error should have gone.

Furthermore, developer tools need to be installed via yum groupinstall ‘Developer Tools’. The group name may differ if you are using a different language but english.

TensorFlow For Poets – Retrain Inception behind a Proxy

I tried to retrain Google’s Inception as described here. I failed since I use a proxy what has not been considered when implementing the retrain.py script.

So what I did to solve it is to find the following line in the script:

filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)

… and add the following code above the line:

proxy = urllib.request.ProxyHandler({'http': r'http://user:password@proxy.domain.de:8080'})
auth = urllib.request.HTTPBasicAuthHandler()
opener = urllib.request.build_opener(proxy, auth, urllib.request.HTTPHandler)
urllib.request.install_opener(opener)

Now tensorflow can download the required files.

Install Python, Scikit and Tensorflow on Windows

Lately, I worked through Google‘s fantastic machine learning tutorial by Josh Gordon that you can find here. On a windows machine installing the required components is not easy and documented somewhat cryptically. As a consequence, I decided to write the necessary steps down so that you can step through the easily.

  1. Download and install Python 3.5.3 from the official website. Tensorflow requires us to use version 3.5.x on windows so make sure that you don’t use 2.x or 3.6.x.
  2. Install scikit-learn, tensorflow and pydotplus from the command line: pip install scikit-learn pydotplus tensorflow If you are using a proxy call pip with the proxy parameter (e.g. pip –proxy http://user:password@proxy.mydomain.com:8080 install scikit-learn pydotplus tensorflow)
  3. Install GraphViz from the official website and add its binary folder to your PATH variable (e.g. C:\Program Files (x86)\Graphviz2.38\bin).
  4. Download and install the compiled binaries for numpy 1.13 and scipy 0.19.0 from Christoph Gohlke‘s website. Higher versions might work as well. By downloading the pre-compiled binaries we avoid to install multiple compilers on our system. When downloading make sure to grab the right packages for Python 3.5 (…cp35…) and win64.
    Install both packages using pip:

    1. pip install C:\Users\admin\Downloads\numpy-1.13.0rc2+mkl-cp35-cp35m-win_amd64.whl
    2. pip install C:\Users\admin\Downloads\scipy-0.19.0-cp35-cp35m-win_amd64.whl

Thats it, not you can use Scikit and Tensorflow on windows!

Are points of a triangle in clockwise order?

Doing some research on triangulation algorithms I found an interesting code snipped that I’d like to share with you. Its intention is to determine if the three points of a triangle are in (counter-)clockwise order.

double Math::crossProductZ(const Vector2 &amp;a, const Vector2 &amp;b) {
	return a.x * b.y - a.y * b.x;
}
 
// If orientation is positive, if a-&gt;b-&gt;c is counterclockwise.
double Math::orientation(const Vector2 &amp;a, const Vector2 &amp;b, const Vector2 &amp;c) {
        return Math::crossProductZ(a, b) +
               Math::crossProductZ(b, c) +
               Math::crossProductZ(c, a);
}

Vector2 is a simple class with x and y coordinates. The method crossProductZ calculates the z component of the cross product of both Vector2s while orientation sums them up. If orientation returns a positive result the points are sorted counterclockwise, otherwise clockwise.

This method is helpful if you write code for different game engines which require an opposite order of indices to set the orientation (the normal) of each triangle in a mesh.

Java client for the hbase REST API

Modern open source projects frequently lack documentation and so does hbase regarding its REST API. Even though REST is covered in the official hbase reference book the guidelines are not complete as I’ll show in the listings below. A second source that you might have come across is a set of three blog posts by Cloudera which is quite good and much more detailled than the official hbase guidelines – but it still does not fully cover then traps you might run into when implementing a java client.

I’d like to show three examples, two for a put and one for a get request. All further operations like delete or post should then be easy to implement once you know how to send requests to the REST server.

Continue reading “Java client for the hbase REST API”

Java / Spark – Printing RDDs from Spark Streaming

A common challenge when writing functional code in Spark is to simply output logs as we usually do it in ordinary applications where line for line is processed sequentially and a debug message could be printed at an arbitrary place using System.out.println(). Working with Spark’s RDDs is new to most of us and might be confusing at first since it is difficult to track what object type we are currently working with (is it a RDDStream, an RDD or a simple type like String or int) – but infact this tracking is important. If you make use of the print function that is offered by RDDs or RDDStreams you’ll just see more or less useless array like information in the console instead of the objects’ content.

What I commonly do is to resolve the RDD object down to a simple JavaRDD or JavaDStream<object> using e.g. flatmap(x -> x) to flatten streams or lists in a dataset or map() to break down complex objects to a simple data type (e.g. via map(person -> person.getAddress().getStreet())). On RDDs you can then do a foreach(System.out::println). On JavaDStream you can do the following:

public static void printRDD(final JavaRDD<String> s) {
  s.foreach(System.out::println);
}
 
public static void main(final String[] args) {
  SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("testApp");
  try (JavaStreamingContext sc = new JavaStreamingContext(conf, new Duration(5000))) {
    JavaDStream<String> stream = sc.textFileStream("C:/testnumbers/");
    stream.foreachRDD(Main::printRDD);
    sc.start();
    sc.awaitTermination();
  }
}

Here, I simply break down a JavaDStream to an RDD using foreachRDD() and use System.out::println as method reference to print each RDDs content.