Today I want to stage a fight: Spark vs Flink
The other day I went to a Meetup where I could take a look at Apache Flink
I already knew about Apache Spark, but I hadn't heard about Flink before. I've been doing some work with Spark, so I wanted to get to know other actors in the Big Data ecosystem
First of all we need to know the main differences between them. This post on Stack Overflow is a great introduction, but this video is even better for seeing the whole picture
Once you know the main differences, it's time to get our hands dirty
We need a big dataset, so I'll go with climate data. At this URL you can grab some datasets; I chose the blended flavour of the Daily maximum temperature TX
Once it's downloaded you'll notice that there are a lot of files, each carrying a lot of information that's useless for our purposes:
EUROPEAN CLIMATE ASSESSMENT & DATASET (ECA&D), file created on: 12-01-2016
THESE DATA CAN BE USED FREELY FOR NON-COMMERCIAL RESEARCH PROVIDED THAT THE FOLLOWING SOURCE IS ACKNOWLEDGED:
Klein Tank, A.M.G. and Coauthors, 2002. Daily dataset of 20th-century surface air temperature and precipitation series for the European Climate Assessment. Int. J. of Climatol., 22, 1441-1453.
Data and metadata available at http://www.ecad.eu

FILE FORMAT (MISSING VALUE CODE = -9999):
01-06 STAID: Station identifier
08-13 SOUID: Source identifier
15-22 DATE : Date YYYYMMDD
24-28 TX   : Maximum temperature in 0.1 °C
30-34 Q_TX : quality code for TX (0='valid'; 1='suspect'; 9='missing')

This is the blended series of station ALTASTENBERG, GERMANY (STAID: 4978)
Blended and updated with sources: 11462 32830 910427
See files sources.txt and stations.txt for more info.

STAID, SOUID,    DATE,   TX, Q_TX
 4978, 32830,19470101,  -14,    0
 4978, 32830,19470102,  -14,    0
 4978, 32830,19470103,   -1,    0
 4978, 32830,19470104,  -44,    0
 4978, 32830,19470105,  -74,    0
 4978, 32830,19470106, -104,    0
.
.
.
We need to remove the first 21 lines of each file, and you can use the following command to do it if you're on a Mac:
find TX_STAID0* | xargs -I{} sed -i '' '1,21d' {}
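On Linux, GNU sed's -i flag doesn't take the empty backup suffix, so the equivalent command should be:

find TX_STAID0* | xargs -I{} sed -i '1,21d' {}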
We are going to find the highest and the lowest maximum temperature across all the files, and we'll time how long each job takes so we can declare a winner
Let’s start coding
Spark vs Flink
We’ll start with Spark
This is the Scala code
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf()
    .setAppName("Weather Test")
    .setMaster("local")
  val sc = new SparkContext(sparkConf)
  try {
    // Count every record of every file in the directory
    val count = sc.textFile("/Users/ruben/ECA_blend_tx")
      .map(_.split(","))
      .count()
    println(count)
  } finally {
    sc.stop()
  }
}
First I want to know how long it takes just to count all the data. After 3922 tasks and 45.873978 seconds I got 84479027 records. Not bad
Let's find the highest and the lowest temperature
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Records are ordered by temperature, so min()/max() return the extremes
  case class Data(stadid: Integer, sourceid: Integer, date: Integer, temp: Integer, quality: Integer) extends Ordered[Data] {
    import scala.math.Ordered.orderingToOrdered
    override def compare(that: Data): Int = (this.temp) compare (that.temp)
  }

  val sparkConf = new SparkConf()
    .setAppName("Weather Test")
    .setMaster("local")
  val sc = new SparkContext(sparkConf)
  try {
    val rdd = sc.textFile("/Users/ruben/ECA_blend_tx")
      .map(_.split(","))
      .map(l => Data(Integer.parseInt(l(0).trim),
                     Integer.parseInt(l(1).trim),
                     Integer.parseInt(l(2).trim),
                     Integer.parseInt(l(3).trim),
                     Integer.parseInt(l(4).trim)))
      .filter(data => data.quality == 0) // keep only 'valid' measurements
    val min = rdd.min()
    val max = rdd.max()
    println(min)
    println(max)
  } finally {
    sc.stop()
  }
}
The first attempt, without the quality filter, returned Data(1,2,19460801,-9999,9), but that's wrong: -9999 is the missing value code, so we need to keep only the records whose quality code is 0 (valid). With the filter in place everything works fine
The results:
min: Data(1187,927252,19971212,-866,0) (-86.6 ºC)
max: Data(382,14585,18900604,560,0) (56 ºC)
Total time: 67.337738 + 68.334026 = 135.671764 seconds
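By the way, calling min() and max() separately makes Spark scan the whole dataset twice. Both extremes can be computed in a single pass; here is a minimal sketch of that idea, assuming the same rdd as above:

// Carry a (coldest, hottest) pair through a single reduce
val (coldest, hottest) = rdd
  .map(d => (d, d))
  .reduce { case ((min1, max1), (min2, max2)) =>
    (if (min1.temp <= min2.temp) min1 else min2,
     if (max1.temp >= max2.temp) max1 else max2)
  }
println(coldest)
println(hottest)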
And now the same with Flink
First we’ll count the data
import org.apache.flink.api.scala._

def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  // count() triggers the execution and returns the number of records
  val count = env.readTextFile("/Users/ruben/Downloads/ECA_blend_tx")
    .map(_.split(","))
    .count()
  println(count)
}
After 42 seconds I got 84479027 records, exactly the same as Spark
Let’s find the minimum and maximum temperature
import org.apache.flink.api.scala._

def main(args: Array[String]): Unit = {
  case class Data(stadid: Integer, sourceid: Integer, date: Integer, temp: Integer, quality: Integer)

  val env = ExecutionEnvironment.getExecutionEnvironment
  val ds = env.readTextFile("/Users/ruben/Downloads/ECA_blend_tx")
    .map(_.split(","))
    .map(l => Data(Integer.parseInt(l(0).trim),
                   Integer.parseInt(l(1).trim),
                   Integer.parseInt(l(2).trim),
                   Integer.parseInt(l(3).trim),
                   Integer.parseInt(l(4).trim)))
    .filter(data => data.quality == 0) // keep only 'valid' measurements
  val min = ds.min("temp")
  val max = ds.max("temp")
  // print() already triggers execution and writes the result to stdout
  min.print()
  max.print()
}
The results:
min: Data(241,906700,20151231,-866,0) (-86.6 ºC)
max: Data(3015,928952,20151231,560,0) (56 ºC)
Total time: 74 + 73 = 147 seconds
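One thing worth noting: Flink's rows don't match Spark's even though the temperatures do. As far as I know, Flink's min()/max() field aggregations only guarantee the value of the aggregated field; the remaining fields may come from other records. If you want the complete record holding the extreme, a reduce does the trick; a minimal sketch, assuming the ds dataset from above:

// Keep the whole record with the lowest / highest temperature
val coldest = ds.reduce((a, b) => if (a.temp <= b.temp) a else b)
val hottest = ds.reduce((a, b) => if (a.temp >= b.temp) a else b)
coldest.print()
hottest.print()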
As we can see, the results are similar to Spark's and the code is similar too, so if you already know Spark, take a look at Flink
The environment
All the tests were executed on my MacBook Pro 2015, with an Intel Core i7 (4 cores at 2.5 GHz) and 16 GB of RAM. Spark ran with 1 executor with 2.4 GB of memory, and Flink ran with 1 task slot and a taskmanager heap size of 2.4 GB
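For reference, those sizes roughly correspond to settings like the ones below; the exact keys and flags depend on the Spark and Flink versions (and weather-test.jar is just a placeholder name), so take this as an assumption about my setup rather than a recipe:

# Spark (local mode): memory for the single JVM running the job
# weather-test.jar is a hypothetical artifact name
spark-submit --master local --driver-memory 2400m weather-test.jar

# Flink: flink-conf.yaml (config keys from the 0.x/1.x era)
taskmanager.heap.mb: 2400
taskmanager.numberOfTaskSlots: 1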