Today I want to stage a fight: Spark vs Flink.

The other day I went to a meetup where I got a first look at Apache Flink.

I already knew about Apache Spark, but I had never heard of Flink. I’ve been doing some work with Spark, so I wanted to get to know the other actors in the Big Data landscape.

First of all, we need to know the main differences. This post on Stack Overflow is a great introduction, and this video is great for seeing the whole picture.

Once you know the main differences, it’s time to get our hands dirty.

We need a big dataset, so I’ll go with climate data. At this URL you can grab some datasets; I chose the blended series of the daily maximum temperature (TX).

Once it’s downloaded you’ll notice there are a lot of files, each with a header that is useless for our purposes:

EUROPEAN CLIMATE ASSESSMENT & DATASET (ECA&D), file created on: 12-01-2016
THESE DATA CAN BE USED FREELY FOR NON-COMMERCIAL RESEARCH PROVIDED THAT THE FOLLOWING SOURCE IS ACKNOWLEDGED: 

Klein Tank, A.M.G. and Coauthors, 2002. Daily dataset of 20th-century surface
air temperature and precipitation series for the European Climate Assessment.
Int. J. of Climatol., 22, 1441-1453.
Data and metadata available at http://www.ecad.eu

FILE FORMAT (MISSING VALUE CODE = -9999):

01-06 STAID: Station identifier
08-13 SOUID: Source identifier
15-22 DATE : Date YYYYMMDD
24-28 TX   : Maximum temperature in 0.1 °C
30-34 Q_TX : quality code for TX (0='valid'; 1='suspect'; 9='missing')

This is the blended series of station ALTASTENBERG, GERMANY (STAID: 4978)
Blended and updated with sources:11462 32830 910427 
See files sources.txt and stations.txt for more info.

STAID, SOUID,    DATE,   TX, Q_TX
  4978, 32830,19470101,  -14,    0
  4978, 32830,19470102,  -14,    0
  4978, 32830,19470103,   -1,    0
  4978, 32830,19470104,  -44,    0
  4978, 32830,19470105,  -74,    0
  4978, 32830,19470106, -104,    0
.
.
.

We need to remove the first 21 lines of each file. On a Mac you can use the following command (on Linux, GNU sed doesn’t take the empty quotes after -i, so use sed -i '1,21d' instead):

find TX_STAID0* | xargs -I{} sed -i '' '1,21d' {}

We are going to find the highest and the lowest maximum temperature across all the files, and we will measure how long each job takes so we can pick our winner.

Let’s start coding.

Spark vs Flink

We’ll start with Spark. This is the Scala code:

import org.apache.spark.{SparkConf, SparkContext}

object WeatherCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("Weather Test")
      .setMaster("local")

    val sc = new SparkContext(sparkConf)

    try {
      // Split every line into fields and count the records
      val count = sc.textFile("/Users/ruben/ECA_blend_tx")
        .map(_.split(","))
        .count()
      println(count)
    }
    finally {
      sc.stop()
    }
  }
}

First I wanted to know how long it takes just to count all the records. After 3,922 tasks and 45.873978 seconds I got 84,479,027 records; not bad.

Now let’s find the highest and the lowest temperature:

import org.apache.spark.{SparkConf, SparkContext}

object WeatherMinMax {

  case class Data(stadid: Integer, sourceid: Integer, date: Integer, temp: Integer, quality: Integer) extends Ordered[Data] {
    import scala.math.Ordered.orderingToOrdered

    // Order records by temperature so that RDD.min()/max() pick the extremes
    override def compare(that: Data): Int = (this.temp) compare (that.temp)
  }

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("Weather Test")
      .setMaster("local")

    val sc = new SparkContext(sparkConf)

    try {
      val rdd = sc.textFile("/Users/ruben/ECA_blend_tx")
        .map(_.split(","))
        .map(l => Data(Integer.parseInt(l(0).trim), Integer.parseInt(l(1).trim), Integer.parseInt(l(2).trim), Integer.parseInt(l(3).trim), Integer.parseInt(l(4).trim)))
        .filter(data => data.quality == 0) // keep only valid measurements
      val min = rdd.min()
      val max = rdd.max()
      println(min)
      println(max)
    }
    finally {
      sc.stop()
    }
  }
}

The first attempt (without the filter) returned Data(1,2,19460801,-9999,9), which is wrong: -9999 is the missing-value code. We need to keep only the records whose quality code is 0 (valid), and with that filter in place everything works fine.

The results:

min: Data(1187,927252,19971212,-866,0) (-86.6 °C)
max: Data(382,14585,18900604,560,0) (56 °C)
Total time: 67.337738 + 68.334026 = 135.671764 seconds
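
As a side note, running min and max as two separate actions means two full passes over the data, which is why the total is roughly double the time of a single job. Both extremes could be computed in a single pass with one reduce; here is a minimal sketch (untimed), assuming the rdd and the Ordered instance on Data from the code above:

// Sketch: compute both extremes in a single pass.
// Each record starts as its own (min, max) candidate pair and the
// reduce keeps the colder of the mins and the hotter of the maxes.
val (coldest, hottest) = rdd
  .map(d => (d, d))
  .reduce { case ((min1, max1), (min2, max2)) =>
    (if (min1 < min2) min1 else min2, if (max1 > max2) max1 else max2)
  }
println(coldest)
println(hottest)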

And now the same with Flink

First we’ll count the records:

import org.apache.flink.api.scala._

object WeatherCount {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Split every line into fields and count the records
    val count = env.readTextFile("/Users/ruben/Downloads/ECA_blend_tx")
      .map(_.split(","))
      .count()

    println(count)
  }
}

After 42 seconds I got 84,479,027 records, exactly the same as with Spark.

Now let’s find the minimum and maximum temperature:

import org.apache.flink.api.scala._

object WeatherMinMax {

  case class Data(stadid: Integer, sourceid: Integer, date: Integer, temp: Integer, quality: Integer)

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ds = env.readTextFile("/Users/ruben/Downloads/ECA_blend_tx")
      .map(_.split(","))
      .map(l => Data(Integer.parseInt(l(0).trim), Integer.parseInt(l(1).trim), Integer.parseInt(l(2).trim), Integer.parseInt(l(3).trim), Integer.parseInt(l(4).trim)))
      .filter(data => data.quality == 0) // keep only valid measurements
    val min = ds.min("temp")
    val max = ds.max("temp")
    // print() triggers execution and prints the result itself,
    // so no extra println is needed
    min.print()
    max.print()
  }
}

The results:

min: Data(241,906700,20151231,-866,0) (-86.6 °C)
max: Data(3015,928952,20151231,560,0) (56 °C)
Total time: 74 + 73 = 143 seconds
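
One thing worth noting: Spark and Flink report different stations and dates for the same extreme temperatures. Flink’s min and max field aggregations only guarantee the value of the aggregated field; the other fields may come from an arbitrary record (notice that both results carry the date 20151231). If you want the complete winning record, a reduce keeps everything consistent; here is a minimal sketch (untimed), assuming the ds from the code above:

// Sketch: keep the whole record of each extreme, so that station
// and date stay consistent with the reported temperature.
val coldest = ds.reduce((a, b) => if (a.temp < b.temp) a else b)
val hottest = ds.reduce((a, b) => if (a.temp > b.temp) a else b)
coldest.print()
hottest.print()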

As we can see, the results are similar to Spark’s and so is the code, so if you already know Spark, take a look at Flink.

The environment

All the tests were executed on my MacBook Pro 2015, with an Intel Core i7 with 4 cores at 2.5 GHz and 16 GB of RAM. Spark ran with 1 executor with 2.4 GB of memory, and Flink ran with 1 task slot and a taskmanager heap size of 2.4 GB.