Exploring the HDFS Default Value Behaviour

In this post I’ll explore how the HDFS default values really work, which I found to be quite surprising and non-intuitive, so there’s a good lesson here.

In my case, I have my local virtual Hadoop cluster, and I have a different client outside the cluster. I’ll just put a file from the external client into the cluster in two configurations.

All the nodes of my Hadoop (1.2.1) cluster have the same hdfs-site.xml file with the same (non-default) value for dfs.block.size (renamed to dfs.blocksize in Hadoop 2.x) of 134217728, which is 128MB. In my external node, I also have the hadoop executables with a minimal hdfs-site.xml.

First, I have set dfs.block.size to 268435456 (256MB) in my client hdfs-site.xml and copied a 400MB file to HDFS:

 ./hadoop fs -copyFromLocal /sw/400MB.file /user/ofir

Checking its block size from the NameNode:

./hadoop fsck /user/ofir/400MB.file -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /user/ofir/400MB.file at Thu Jan 30 22:21:29 UTC 2014
/user/ofir/400MB.file 419430400 bytes, 2 block(s):  OK
0. blk_-5656069114314652598_27957 len=268435456 repl=3 [/rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack02/10.0.1.122:50010]
1. blk_3668240125470951962_27957 len=150994944 repl=3 [/rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack01/10.0.1.113:50010]

Status: HEALTHY
 Total size:    419430400 B
 Total dirs:    0
 Total files:    1
 Total blocks (validated):    2 (avg. block size 209715200 B)
 Minimally replicated blocks:    2 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:        0 (0.0 %)
 Default replication factor:    1
 Average block replication:    3.0
 Corrupt blocks:        0
 Missing replicas:        0 (0.0 %)
 Number of data-nodes:        6
 Number of racks:        3
FSCK ended at Thu Jan 30 22:21:29 UTC 2014 in 1 milliseconds

So far, looks good – the first block is 256MB.

Now, let’s remove the reference to dfs.block.size from my hdfs-site.xml file on the client. I expected that, since we don’t specify a value in our client, it should not ask for any specific block size, so the actual block size will be the value set in the NameNode and DataNodes – 128MB. However, this is what I got:

./hadoop fs -copyFromLocal /sw/400MB.file /user/ofir/400MB.file2

Checking from the NameNode:

# ./hadoop fsck /user/ofir/400MB.file2 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /user/ofir/400MB.file2 at Thu Jan 30 22:27:52 UTC 2014
/user/ofir/400MB.file2 419430400 bytes, 7 block(s):  OK
0. blk_1787803769905799654_27959 len=67108864 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack01/10.0.1.113:50010]
1. blk_3875160333629322317_27959 len=67108864 repl=3 [/rack01/10.0.1.112:50010, /rack01/10.0.1.113:50010, /rack03/10.0.1.132:50010]
2. blk_-3214658865520475261_27959 len=67108864 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.122:50010]
3. blk_-3454099016147726286_27959 len=67108864 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack01/10.0.1.113:50010]
4. blk_-4422811025086071415_27959 len=67108864 repl=3 [/rack01/10.0.1.112:50010, /rack01/10.0.1.113:50010, /rack02/10.0.1.122:50010]
5. blk_8134056524051282216_27959 len=67108864 repl=3 [/rack02/10.0.1.123:50010, /rack02/10.0.1.122:50010, /rack03/10.0.1.132:50010]
6. blk_2811491227269477239_27959 len=16777216 repl=3 [/rack02/10.0.1.122:50010, /rack02/10.0.1.123:50010, /rack03/10.0.1.133:50010]

Status: HEALTHY
 Total size:    419430400 B
 Total dirs:    0
 Total files:    1
 Total blocks (validated):    7 (avg. block size 59918628 B)
 Minimally replicated blocks:    7 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:        0 (0.0 %)
 Default replication factor:    1
 Average block replication:    3.0
 Corrupt blocks:        0
 Missing replicas:        0 (0.0 %)
 Number of data-nodes:        6
 Number of racks:        3
FSCK ended at Thu Jan 30 22:27:52 UTC 2014 in 2 milliseconds

The filesystem under path '/user/ofir/400MB.file2' is HEALTHY

Well, the file definitely has a block size of 64MB, not 128MB! This is actually the default value for dfs.block.size in Hadoop 1.x.
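
The block counts in both fsck reports follow from plain integer arithmetic, which makes it easy to tell which block size was really used. Here is a minimal sketch (blocks_for is a made-up helper name, not a Hadoop command):

```shell
#!/bin/sh
# Sketch: how many blocks a file of SIZE bytes occupies for a given block size,
# and how long the last block is. blocks_for is a hypothetical helper name.
blocks_for() {
  size=$1
  block_size=$2
  full=$(( size / block_size ))
  rest=$(( size % block_size ))
  if [ "$rest" -gt 0 ]; then
    echo "$(( full + 1 )) blocks, last block $rest bytes"
  else
    echo "$full blocks, last block $block_size bytes"
  fi
}

blocks_for 419430400 268435456   # client set to 256MB
blocks_for 419430400 67108864    # client fell back to the 64MB default
blocks_for 419430400 134217728   # what the cluster-side 128MB would have given
```

The first two calls match the fsck reports (2 and 7 blocks). A 128MB block size would have produced 4 blocks, which is not what we got.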

It seems that the value of dfs.block.size is dictated directly by the client, regardless of the cluster setting. If a value is not specified, the client just picks the default value. This finding is not specific to this parameter – for example, the same thing happens with dfs.replication and others.

So, if you ever wondered why you must have a full copy of the Hadoop config files around instead of just pointing to the NameNode and JobTracker, now you know…
Now double check your client setup!
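
One quick way to do that check is to look for the property in the client's own hdfs-site.xml. This is only a rough sketch: has_property is a name I made up, it does a plain text search rather than real XML parsing, and the path in the commented example is an assumption about where your client keeps its config.

```shell
#!/bin/sh
# Sketch: report whether a Hadoop config file sets a given property explicitly.
# has_property is a hypothetical helper; it only looks for a <name> element and
# does not parse XML properly (comments, includes etc. are ignored).
has_property() {
  conf_file=$1
  prop=$2
  if grep -qF "<name>$prop</name>" "$conf_file" 2>/dev/null; then
    echo "$prop: set in $conf_file"
  else
    echo "$prop: NOT set in $conf_file, the client may fall back to its default"
  fi
}

# Example (the path is an assumption, adjust it to your client's conf directory):
# for p in dfs.block.size dfs.replication; do
#   has_property /sw/hadoop-1.2.1/conf/hdfs-site.xml "$p"
# done
```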

Corrections and comments are welcomed!

P.S. Just as I was publishing it, I found a shorter way to check the block size of a file, which also works from any client:

# ./hadoop fs -stat %o /user/ofir/400MB.file
268435456
# ./hadoop fs -stat %o  /user/ofir/400MB.file2
67108864

Exploring HDFS Block Placement Policy

In this post I’ll cover how to see where each data block is actually placed. I’m using my fully-distributed Hadoop cluster on my laptop for exploration and the network topology definitions from my previous post – you’ll need a multi-node cluster to reproduce it.

Background

When a file is written to HDFS, it is split up into big chunks called data blocks, whose size is controlled by the parameter dfs.block.size in the config file hdfs-site.xml (in my case – left as the default which is 64MB). Each block is stored on one or more nodes, controlled by the parameter dfs.replication in the same file (in most of this post – set to 3, which is the default). Each copy of a block is called a replica.

Regarding my setup – I have nine nodes spread over three racks. Each rack has one admin node and two worker nodes (running DataNode and TaskTracker), using Apache Hadoop 1.2.1. Please note that in my configuration, there is a simple mapping between host name, IP address and rack number – the host name is hadoop[rack number][node number] and the IP address is 10.0.1.1[rack number][node number].
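
Since the fsck reports below only show IP addresses, it helps to be able to translate them back. A small sketch of the convention (describe_ip is a made-up helper name and is only valid for this particular naming scheme):

```shell
#!/bin/sh
# Sketch of the naming convention: 10.0.1.1[rack][node] <-> hadoop[rack][node].
# describe_ip is a hypothetical helper, valid only for this particular setup.
describe_ip() {
  ip=${1%%:*}                 # drop an optional :port suffix, as printed by fsck
  suffix=${ip##*.}            # last octet, e.g. 133
  rack_node=${suffix#1}       # strip the leading 1 -> 33
  rack=${rack_node%?}         # drop the node digit -> 3
  echo "hadoop$rack_node (rack $rack)"
}

describe_ip 10.0.1.133:50010   # prints: hadoop33 (rack 3)
describe_ip 10.0.1.112         # prints: hadoop12 (rack 1)
```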

Where should the NameNode place each replica?  Here is the theory:

The default block placement policy is as follows:

  • Place the first replica somewhere – either a random node (if the HDFS client is outside the Hadoop/DataNode cluster) or on the local node (if the HDFS client is running on a node inside the cluster).
  • Place the second replica in a different rack.
  • Place the third replica in the same rack as the second replica.
  • If there are more replicas – place them on random nodes, while limiting how many replicas end up on the same rack.

That placement algorithm is a decent default compromise – it provides protection against a rack failure while somewhat minimizing inter-rack traffic.
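
For three replicas, the policy above implies that every block should end up on exactly two racks. Here is a rough sketch for checking that against the location list that fsck prints with -racks (rack_spread is a made-up helper name; the sample list is block 0 of the first test file further down in this post):

```shell
#!/bin/sh
# Sketch: count on how many distinct racks the replicas of one block live,
# given the location list printed by fsck ... -racks.
# rack_spread is a hypothetical helper name.
rack_spread() {
  # One location per line, keep the rack part, count the distinct values.
  printf '%s\n' "$1" | tr ',' '\n' | cut -d/ -f2 | sort -u | wc -l | tr -d ' '
}

rack_spread "[/rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack02/10.0.1.123:50010]"
```

For this sample the answer is 2: two replicas on rack03 and one on rack02.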

So, let’s put some files on HDFS and see if it behaves like the theory in my setup – try it on yours to validate.

Putting a file into HDFS from outside the Hadoop cluster

As my block size is 64MB, I created a file (called /sw/big) on my laptop for testing. The file is about 125MB – so it consumes two HDFS blocks. It is not inside one of the data node containers but outside the Hadoop cluster. I then copied it to HDFS twice:

# /sw/hadoop-1.2.1/bin/hadoop fs -copyFromLocal /sw/big /big1
# /sw/hadoop-1.2.1/bin/hadoop fs -copyFromLocal /sw/big /big2

In order to see the actual block placement of a specific file, I ran hadoop fsck filename -files -blocks -racks from the NameNode server and looked at the top of the report:

# bin/hadoop fsck /big1 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big1 at Mon Jan 06 11:20:01 UTC 2014
/big1 130633102 bytes, 2 block(s):  OK
0. blk_3712902403633386081_1008 len=67108864 repl=3 [/rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack02/10.0.1.123:50010]
1. blk_381038406874109076_1008 len=63524238 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack01/10.0.1.113:50010]

Status: HEALTHY
 Total size:    130633102 B
 Total dirs:    0
 Total files:    1
 Total blocks (validated):    2 (avg. block size 65316551 B)
 Minimally replicated blocks:    2 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:        0 (0.0 %)
 Default replication factor:    3
 Average block replication:    3.0
 Corrupt blocks:        0
 Missing replicas:        0 (0.0 %)
 Number of data-nodes:        6
 Number of racks:        3
FSCK ended at Mon Jan 06 11:20:01 UTC 2014 in 3 milliseconds

The filesystem under path '/big1' is HEALTHY

You can see near the end that the NameNode correctly reports that it knows of six data nodes and three racks. If we look specifically at the block report:

0. blk_37129... len=67108864 repl=3 [/rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack02/10.0.1.123:50010]
1. blk_38103... len=63524238 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack01/10.0.1.113:50010]

We can see:

  • Block 0 of file big1 is on hadoop23 (rack 2), hadoop32 (rack 3), hadoop33 (rack 3)
  • Block 1 of file big1 is on hadoop13 (rack1), hadoop32 (rack 3), hadoop33 (rack 3)

Running the same report on big2:

# bin/hadoop fsck /big2 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big2 at Mon Jan 06 11:49:36 UTC 2014
/big2 130633102 bytes, 2 block(s):  OK
0. blk_-2514176538290345389_1009 len=67108864 repl=3 [/rack02/10.0.1.122:50010, /rack02/10.0.1.123:50010, /rack03/10.0.1.133:50010]
1. blk_316929026766197453_1009 len=63524238 repl=3 [/rack01/10.0.1.113:50010, /rack01/10.0.1.112:50010, /rack03/10.0.1.133:50010]
...

We can see that:

  • Block 0 of file big2 is on hadoop33 (rack 3), hadoop22 (rack 2), hadoop23 (rack 2)
  • Block 1 of file big2 is on hadoop33 (rack 3), hadoop12 (rack 1), hadoop13 (rack 1)

Conclusion – we can see that when the HDFS client is outside the Hadoop cluster, for each block, there is a single replica on a random rack and two other replicas on a different, random rack, as expected.

Putting a file into HDFS from within the Hadoop cluster

OK, let’s repeat the exercise, now copying the big file from a local path on one of the data nodes (I used hadoop22) to HDFS:

# /usr/local/hadoop-1.2.1/bin/hadoop fs -copyFromLocal big /big3
# /usr/local/hadoop-1.2.1/bin/hadoop fs -copyFromLocal big /big4

We expect to see the first replica of all blocks to be local – on node hadoop22.
Running the report on big3:

# bin/hadoop fsck /big3 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big3 at Mon Jan 06 12:10:03 UTC 2014
/big3 130633102 bytes, 2 block(s):  OK
0. blk_-509446789413926742_1013 len=67108864 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.122:50010]
1. blk_1461305132201468085_1013 len=63524238 repl=3 [/rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.122:50010]
...

We can see that:

  • Block 0 of file big3 is on hadoop22 (rack 2), hadoop33 (rack 3), hadoop32 (rack 3)
  • Block 1 of file big3 is on hadoop22 (rack 2), hadoop33 (rack 3), hadoop32 (rack 3)

And for big4:

# bin/hadoop fsck /big4 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big4 at Mon Jan 06 12:12:52 UTC 2014
/big4 130633102 bytes, 2 block(s):  OK
0. blk_-8460635609179877275_1014 len=67108864 repl=3 [/rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack02/10.0.1.122:50010]
1. blk_8256749577534695387_1014 len=63524238 repl=3 [/rack01/10.0.1.112:50010, /rack02/10.0.1.122:50010, /rack01/10.0.1.113:50010]

We can see that:

  • Block 0 of file big4 is on hadoop22 (rack 2), hadoop32 (rack 3), hadoop33 (rack 3)
  • Block 1 of file big4 is on hadoop22 (rack 2), hadoop12 (rack 1), hadoop13 (rack 1)

Conclusion – we can see that when the HDFS client is inside the Hadoop cluster, for each block, there is a single replica on the local node (hadoop22 in this case) and two other replicas on a different, random rack, as expected.

Just for fun – create a file with more than three replicas

We can override Hadoop parameters when invoking the shell using the -D option. Let’s use it to create a file with four replicas from node hadoop22:

# /usr/local/hadoop-1.2.1/bin/hadoop fs -D dfs.replication=4 -copyFromLocal ~/big /big5

Running the report on big5:

# bin/hadoop fsck /big5 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big5 at Mon Jan 06 12:36:28 UTC 2014
/big5 130633102 bytes, 2 block(s):  OK
0. blk_-4060957873241541489_1015 len=67108864 repl=4 [/rack01/10.0.1.112:50010, /rack03/10.0.1.133:50010, /rack03/10.0.1.132:50010, /rack02/10.0.1.122:50010]
1. blk_8361194277567439305_1015 len=63524238 repl=4 [/rack03/10.0.1.133:50010, /rack01/10.0.1.113:50010, /rack01/10.0.1.112:50010, /rack02/10.0.1.122:50010]
...

We can see that:

  • Block 0 of file big5 is on hadoop22 (rack 2), hadoop33 (rack 3), hadoop32 (rack 3), hadoop12 (rack 1)
  • Block 1 of file big5 is on hadoop22 (rack 2), hadoop13 (rack 1), hadoop12 (rack 1), hadoop33 (rack 3)

So far, works as expected! That’s a nice surprise…

Finally, let’s try to break something 🙂

What will happen when we try to specify more replicas than data nodes?

While it is “stupid”, it will likely happen in pseudo-distributed config every time you try dfs.replication > 1, as you have only one DataNode.

# /usr/local/hadoop-1.2.1/bin/hadoop fs -D dfs.replication=7 -copyFromLocal ~/big /big7

We do have only six data nodes. Running the report on big7:

# bin/hadoop fsck /big7 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big7 at Mon Jan 06 12:40:05 UTC 2014
/big7 130633102 bytes, 2 block(s):  Under replicated blk_1565930173693194428_1016. Target Replicas is 7 but found 6 replica(s).
 Under replicated blk_-2825553771191528109_1016. Target Replicas is 7 but found 6 replica(s).
0. blk_1565930173693194428_1016 len=67108864 repl=6 [/rack01/10.0.1.112:50010, /rack01/10.0.1.113:50010, /rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.123:50010, /rack02/10.0.1.122:50010]
1. blk_-2825553771191528109_1016 len=63524238 repl=6 [/rack01/10.0.1.112:50010, /rack01/10.0.1.113:50010, /rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.123:50010, /rack02/10.0.1.122:50010]

Status: HEALTHY
 Total size:    130633102 B
 Total dirs:    0
 Total files:    1
 Total blocks (validated):    2 (avg. block size 65316551 B)
 Minimally replicated blocks:    2 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    2 (100.0 %)
 Mis-replicated blocks:        0 (0.0 %)
 Default replication factor:    3
 Average block replication:    6.0
 Corrupt blocks:        0
 Missing replicas:        2 (16.666666 %)
 Number of data-nodes:        6
 Number of racks:        3
FSCK ended at Mon Jan 06 12:40:05 UTC 2014 in 1 milliseconds

We can see that each data node has a single copy of each block, and fsck notifies us that the two blocks are under-replicated. Nice!

Changing the replication factor of an existing file

Since this warning in fsck might be annoying (and, depending on your monitoring scripts, might set off various alarms…), we should change the replication factor of this file:

# bin/hadoop fs -setrep 6 /big7
Replication 6 set: hdfs://hadoop11:54310/big7

In this case, the NameNode won’t immediately delete the extra replicas:

# bin/hadoop fsck /big7 -files -blocks -racks
FSCK started by root from /10.0.1.111 for path /big7 at Mon Jan 06 12:51:00 UTC 2014
/big7 130633102 bytes, 2 block(s):  OK
0. blk_1565930173693194428_1016 len=67108864 repl=6 [/rack01/10.0.1.112:50010, /rack01/10.0.1.113:50010, /rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.123:50010, /rack02/10.0.1.122:50010]
1. blk_-2825553771191528109_1016 len=63524238 repl=6 [/rack01/10.0.1.112:50010, /rack01/10.0.1.113:50010, /rack03/10.0.1.132:50010, /rack03/10.0.1.133:50010, /rack02/10.0.1.123:50010, /rack02/10.0.1.122:50010]

Status: HEALTHY
 Total size:    130633102 B
 Total dirs:    0
 Total files:    1
 Total blocks (validated):    2 (avg. block size 65316551 B)
 Minimally replicated blocks:    2 (100.0 %)
 Over-replicated blocks:    2 (100.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:        0 (0.0 %)
 Default replication factor:    3
 Average block replication:    6.0
 Corrupt blocks:        0
 Missing replicas:        0 (0.0 %)
 Number of data-nodes:        6
 Number of racks:        3
FSCK ended at Mon Jan 06 12:51:00 UTC 2014 in 1 milliseconds

The filesystem under path '/big7' is HEALTHY

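If you want to know when the change has actually been applied, add -w to the command. It makes the command wait until the target replication level is reached (it does not speed up the change itself):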

# bin/hadoop fs -setrep 5 -w /big7
Replication 5 set: hdfs://hadoop11:54310/big7
Waiting for hdfs://hadoop11:54310/big7 .... done

Exploring The Hadoop Network Topology

In this post I’ll cover how to define and debug a simple Hadoop network topology.

Background

Hadoop is designed to run on large clusters of commodity servers – in many cases spanning many physical racks of servers. A physical rack is in many cases a single point of failure (for example, having typically a single switch for lower cost), so HDFS tries to place block replicas on more than one rack. Also, there is typically more bandwidth within a rack than between the racks, so the software on the cluster (HDFS and MapReduce / YARN) can take it into account. This leads to a question:

How does the NameNode know the network topology?

By default, the NameNode has no idea which node is in which rack. It therefore by default assumes that all nodes are in the same rack, which is likely true for small clusters. It calls this rack “/default-rack“.

So, we have to teach Hadoop our cluster network topology – the way the nodes are grouped into racks. Hadoop supports a pluggable rack topology implementation, controlled by the parameter topology.node.switch.mapping.impl in core-site.xml, which specifies a Java class. The default implementation uses a user-provided script, specified in topology.script.file.name in the same config file (in Hadoop 2: net.topology.script.file.name). The script gets a list of IP addresses or host names and returns a list of rack names. It will get up to topology.script.number.args arguments per invocation, 100 by default (in Hadoop 2: net.topology.script.number.args).

In my case, I set it to /usr/local/hadoop-1.2.1/conf/topology.script.sh which I copied from the Hadoop wiki. I just made a few changes – I changed the path to my conf directory in the second line, added some logging of the call in the third line and changed the default rack name near the end to /rack01:

#!/bin/bash          
HADOOP_CONF=/usr/local/hadoop-1.2.1/conf
echo `date` input: $@ >> $HADOOP_CONF/topology.log
while [ $# -gt 0 ] ; do
  nodeArg=$1
  exec< ${HADOOP_CONF}/topology.data
  result=""
  while read line ; do
    ar=( $line )
    if [ "${ar[0]}" = "$nodeArg" ] ; then
      result="${ar[1]}"
    fi
  done
  shift
  if [ -z "$result" ] ; then
#    echo -n "/default/rack "
     # the trailing space keeps rack names separated when several nodes are passed
     echo -n "/rack01 "
  else
    echo -n "$result "
  fi
done

The script basically just parses a text file (topology.data) that holds a mapping from IP address or host name to rack name. Here is the content of my file:

hadoop11        /rack01
hadoop12        /rack01
hadoop13        /rack01
hadoop14        /rack01
hadoop15        /rack01
hadoop21        /rack02
hadoop22        /rack02
hadoop23        /rack02
hadoop24        /rack02
hadoop25        /rack02
hadoop31        /rack03
hadoop32        /rack03
hadoop33        /rack03
hadoop34        /rack03
hadoop35        /rack03
10.0.1.111      /rack01
10.0.1.112      /rack01
10.0.1.113      /rack01
10.0.1.114      /rack01
10.0.1.115      /rack01
10.0.1.121      /rack02
10.0.1.122      /rack02
10.0.1.123      /rack02
10.0.1.124      /rack02
10.0.1.125      /rack02
10.0.1.131      /rack03
10.0.1.132      /rack03
10.0.1.133      /rack03
10.0.1.134      /rack03
10.0.1.135      /rack03

A bit long, but pretty straightforward. Please note that in my configuration, there is a simple mapping between host name, IP address and rack number – the host name is hadoop[rack number][node number] and the IP address is 10.0.1.1[rack number][node number].
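
Because the mapping is so regular, the file does not have to be typed by hand. A minimal sketch that prints the same 30 lines, assuming racks 1-3 with nodes 1-5 each as in my file (gen_topology is a made-up name):

```shell
#!/bin/sh
# Sketch: generate the host/IP to rack mapping with loops instead of typing it.
# Assumes racks 1-3 with nodes 1-5 each; gen_topology is a hypothetical name.
gen_topology() {
  for prefix in hadoop 10.0.1.1; do
    for rack in 1 2 3; do
      for node in 1 2 3 4 5; do
        # Left-align the host name or IP in a 16-character column.
        printf '%-16s/rack0%s\n' "$prefix$rack$node" "$rack"
      done
    done
  done
}

gen_topology    # redirect to topology.data once the output looks right
```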

You could of course write any logic into the script. For example, using my naming convention, I could have written a simple script that just takes the second-to-last character and translates it to a rack number – that would work on both IP addresses and host names. As another example – I know some companies allocate IP addresses for their Hadoop cluster as x.y.[rack_number].[node_number] – so they can again just parse it directly.
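
To make the first idea concrete, here is a rough sketch of such a table-free script. It is not what I actually run: rack_of and map_nodes are made-up names, there is no validation of the input, and it only supports single-digit rack numbers.

```shell
#!/bin/sh
# Sketch of a table-free topology script for this naming convention:
# the second-to-last character of hadoop23 or 10.0.1.123 is the rack number.
# rack_of and map_nodes are hypothetical helpers; input is not validated.
rack_of() {
  name=$1
  head=${name%?}                  # drop the node number: hadoop2 / 10.0.1.12
  rack=${head#"${head%?}"}        # keep only the last remaining character: 2
  printf '/rack0%s' "$rack"
}

# Hadoop passes several nodes per call and expects one rack name per node.
map_nodes() {
  for arg in "$@"; do
    printf '%s ' "$(rack_of "$arg")"
  done
  echo
}

map_nodes "$@"
```

Saved as a script and called with 10.0.1.123 hadoop13 10.0.1.132, it should answer with /rack02 /rack01 /rack03.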

Before we test the script, if you have read my previous post on LXC setup, please note that I made a minor change – I switched to a three-rack cluster, to make the block placement more interesting. So, my LXC nodes are:

hadoop11 namenode zookeeper
hadoop12 datanode tasktracker
hadoop13 datanode tasktracker
hadoop21 jobtracker zookeeper
hadoop22 datanode tasktracker
hadoop23 datanode tasktracker
hadoop31 secondarynamenode hiveserver2 zookeeper
hadoop32 datanode tasktracker
hadoop33 datanode tasktracker

OK, now that we covered the script, let’s start the NameNode and see what gets logged:

# bin/hadoop-daemon.sh --config conf/ start namenode
starting namenode, logging to /usr/local/hadoop-1.2.1/libexec/../logs/hadoop-root-namenode-hadoop11.out
# cat conf/topology.log 
Mon Jan 6 19:04:03 UTC 2014 input: 10.0.1.123 10.0.1.122 10.0.1.113 10.0.1.112 10.0.1.133 10.0.1.132

As the NameNode started, it asked in a single call for the rack names of all our nodes. This is what the script returns to the NameNode:

# conf/topology.script.sh 10.0.1.123 10.0.1.122 10.0.1.113 10.0.1.112 10.0.1.133 10.0.1.132
/rack02 /rack02 /rack01 /rack01 /rack03 /rack03

Why did the NameNode send all the IP addresses of the DataNodes in a single call? In this case, I have pre-configured another HDFS parameter called dfs.hosts in hdfs-site.xml. This parameter points to a file with a list of all nodes that are allowed to run a data node. So, when the NameNode started, it just asked for the mapping of all known data node servers.

What happens if you don’t use dfs.hosts? To check, I removed this parameter from my hdfs-site.xml file, restarted the NameNode and started all the data nodes. In this case, the NameNode called the topology script once per DataNode (when they first reported their status to the NameNode):

# cat topology.log 
Mon Jan 6 19:04:03 UTC 2014 input: 10.0.1.123 10.0.1.122 10.0.1.113 10.0.1.112 10.0.1.133 10.0.1.132
Mon Jan 6 19:07:53 UTC 2014 input: 10.0.1.112
Mon Jan 6 19:07:53 UTC 2014 input: 10.0.1.123
Mon Jan 6 19:07:53 UTC 2014 input: 10.0.1.113
Mon Jan 6 19:07:53 UTC 2014 input: 10.0.1.133
Mon Jan 6 19:07:54 UTC 2014 input: 10.0.1.132
Mon Jan 6 19:07:54 UTC 2014 input: 10.0.1.122

I hope this post has enough data for you to hack and QA your own basic network topologies. In the next post I hope to investigate block placement – how to see where HDFS is actually putting each copy of each block under various conditions.

Creating a virtualized fully-distributed Hadoop cluster using Linux Containers

TL;DR Why and how I created a working 9-node Hadoop cluster on my laptop

In this post I’ll cover why I wanted to have a decent multi-node Hadoop cluster on my laptop, why I chose not to use VirtualBox/VMware Player, what LXC (Linux Containers) is and how I set it up. The last part is a bit specific to my desktop O/S (Ubuntu 13.10).

Why install a fully-distributed Hadoop cluster on my laptop?

Hadoop has a “laptop mode” called pseudo-distributed mode. In that mode, you run a single copy of each service (for example, a single HDFS namenode and a single HDFS datanode), all listening under localhost.

This feature is rather useful for basic development and testing – if all you want is to write some code that uses the services and check if it runs correctly (or at all). However, if your focus is the platform itself – an admin perspective – then that mode is quite limited. Trying out many of Hadoop’s features requires a real cluster – for example replication, failure testing, rack awareness, HA (of every service type) etc.

Why are VirtualBox or VMware Player not a good fit?

When you want to run several / many virtual machines, the overhead of this desktop virtualization software is big. Specifically, as they run a full-blown guest O/S inside them, they:

  1. Consume plenty of RAM per guest – always a premium on laptops.
  2. Consume plenty of disk space – which is limited, especially on SSD.
  3. Take quite a while to start / stop.

To run many guests on a laptop, a more lightweight infrastructure is needed.

What is LXC (Linux Containers)?

LXC is a lightweight virtualization solution for Linux, sometimes called “chroot on steroids”. It leverages a set of Linux kernel features (control groups, kernel namespaces etc) to provide isolation between groups of processes running on the host. In other words, in most cases the containers (virtual machines) are using the host kernel and operating system, but are isolated as if they are running in their own environment (including their own IP addresses etc).

In practical terms, I found that the LXC containers start in under a second. They have no visible memory overhead – for example, if a container runs the namenode, only the namenode consumes RAM, based on its java settings. They also consume very little disk space, and their file system is just stored directly in the host file system (under /var/lib/lxc/container_name/rootfs), which is very useful. You don’t need to statically decide how much RAM or how many virtual cores to allocate to each (though you could) – they consume just what they need. Actually, while the guests are isolated from each other, you can see all their processes together in the host using the ps command as root.

Some other notes – LXC is already used by Apache Mesos and Docker, amongst others. A “1.0” release is targeted next month (February). If you are using RHEL/CentOS, LXC is provided as a “technical preview” on RHEL 6 (likely best to use a current update).

If you want a detailed introduction, Stéphane Graber, who is one of the LXC maintainers, started an excellent ten-post series on LXC a couple of weeks ago.

So, how to set it up?

The following are some of my notes. I only tried it on my laptop, with Ubuntu 13.10 64-bit as a guest, so if you have a different O/S flavor, expect some changes.

All commands below are run as root.
Start by installing LXC:

apt-get install lxc

The installation also creates a default virtual network bridge device for all containers (lxcbr0). Next, create your first Ubuntu container called test1 – it will take a few minutes as some O/S packages will be downloaded (but they will be cached, so the next containers will be created in a few seconds):

lxc-create -t ubuntu -n test1

Start the container with:

lxc-start -d -n test1

List all your containers (and their IP addresses) with:

lxc-ls --fancy

and open a shell on it with:

lxc-attach -n test1

and stop the container running on your host with:

lxc-stop -n test1

You can also clone a container (lxc-clone) or freeze/unfreeze a running one (lxc-freeze / lxc-unfreeze). LXC also has a web GUI with some nice visualizations (though some of it was broken for me), install it with:

wget http://lxc-webpanel.github.io/tools/install.sh -O - | bash

and access it at http://localhost:5000 (default user/password is admin/admin).

Now, some specific details for a Hadoop container

1. Install a JVM inside the container. The default LXC Ubuntu container has a minimal set of packages. At first, I installed openjdk in it:

apt-get install openjdk-7-jdk

However, I decided to switch to Oracle JDK as that is what’s typically being used. That required some extra steps (the first step adds the “add-apt-repository” command, the others set a java7 repository to ease installation and updates):

apt-get install software-properties-common
add-apt-repository ppa:webupd8team/java
apt-get update
apt-get install oracle-java7-installer

2. Setting the network

By default, LXC uses DHCP to provide dynamic IP addresses to containers. As Hadoop is very sensitive to IP addresses (it occasionally uses them instead of host names), I decided that I want static IPs for my Hadoop containers.

In Ubuntu, the lxc bridge config file is /etc/default/lxc-net – I uncommented the following line:

LXC_DHCP_CONFILE=/etc/lxc/dnsmasq.conf

and created /etc/lxc/dnsmasq.conf with the following contents:

dhcp-host=hadoop11,10.0.1.111
dhcp-host=hadoop12,10.0.1.112
dhcp-host=hadoop13,10.0.1.113
dhcp-host=hadoop14,10.0.1.114
dhcp-host=hadoop15,10.0.1.115
dhcp-host=hadoop16,10.0.1.116
dhcp-host=hadoop17,10.0.1.117
dhcp-host=hadoop18,10.0.1.118
dhcp-host=hadoop19,10.0.1.119
dhcp-host=hadoop21,10.0.1.121
dhcp-host=hadoop22,10.0.1.122
dhcp-host=hadoop23,10.0.1.123
dhcp-host=hadoop24,10.0.1.124
dhcp-host=hadoop25,10.0.1.125
dhcp-host=hadoop26,10.0.1.126
dhcp-host=hadoop27,10.0.1.127
dhcp-host=hadoop28,10.0.1.128
dhcp-host=hadoop29,10.0.1.129

That should cover more containers than I’ll ever need… My naming convention is that the host name is hadoop[rack number][node number] and the IP is 10.0.1.1[rack number][node number]. For example, host hadoop13 is located at rack 1 node 3 with IP address of 10.0.1.113. This way it is easy to figure out what’s going on even if the logs only have IP.
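
With a convention like this, the dnsmasq entries can also be generated instead of typed. A minimal sketch, assuming racks 1-2 and nodes 1-9 as in the list above (host_to_ip and gen_dhcp_hosts are made-up helper names):

```shell
#!/bin/sh
# Sketch: derive the static IP from a host name that follows the convention
# hadoop[rack][node] -> 10.0.1.1[rack][node]. host_to_ip is a hypothetical name.
host_to_ip() {
  echo "10.0.1.1${1#hadoop}"
}

# Print dnsmasq entries for racks 1-2, nodes 1-9 (an assumption matching the list).
gen_dhcp_hosts() {
  for rack in 1 2; do
    for node in 1 2 3 4 5 6 7 8 9; do
      host="hadoop$rack$node"
      echo "dhcp-host=$host,$(host_to_ip "$host")"
    done
  done
}

gen_dhcp_hosts
```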

I’ve also added all of the potential containers to my /etc/hosts and copied them to hadoop11, which I use as a template to clone the rest of the nodes:

10.0.1.111      hadoop11
10.0.1.112      hadoop12
10.0.1.113      hadoop13
10.0.1.114      hadoop14
10.0.1.115      hadoop15
10.0.1.116      hadoop16
10.0.1.117      hadoop17
10.0.1.118      hadoop18
10.0.1.119      hadoop19
10.0.1.121      hadoop21
10.0.1.122      hadoop22
10.0.1.123      hadoop23
10.0.1.124      hadoop24
10.0.1.125      hadoop25
10.0.1.126      hadoop26
10.0.1.127      hadoop27
10.0.1.128      hadoop28
10.0.1.129      hadoop29

I ran into one piece of ugliness regarding DHCP. I use the hadoop11 container as a template, so as I developed my environment I destroyed the rest of the containers and cloned them again a few times. At that point, they got a new dynamic IP – it seems the DHCP server remembered that the static IP that I wanted was already allocated. Eventually, I added this to the end of my “destroy all nodes” script to solve that:

service lxc-net stop
rm /var/lib/misc/dnsmasq.lxcbr0.leases 
service lxc-net start

If you run into networking issues – I recommend checking the Ubuntu-specific docs, as lots of what you find on the internet is scary network hacks that are likely not needed in Ubuntu 13.10 (and didn’t work for me).

Another setup thing – if you run on BTRFS (a relatively new copy-on-write filesystem), lxc-clone should detect it and leverage it. It doesn’t seem to work for me, but copying files directly into /var/lib/lxc/container_name/rootfs using cp --reflink works as expected (you can verify disk space after copying by waiting a few seconds and then issuing btrfs file df /var/lib/lxc/).

3. Setting Hadoop

This is pretty standard… You should just set up ssh keys, maybe create some O/S users for Hadoop, download Hadoop bits from Apache or a vendor and get going… Personally, I used Apache Hadoop (1.2.1 and 2.2.0) – I set things up on a single node and then cloned it to create a cluster.

In addition, to ease my tests, I’ve been scripting the startup, shutdown and full reset of the environment based on a config file that I created. For example, here is my current config file for Apache Hadoop 1.2.1 (the scripting works nicely but is still evolving – ping me if you are curious):

hadoop11 namenode zookeeper
hadoop12 datanode tasktracker
hadoop13 datanode tasktracker
hadoop14 datanode tasktracker
hadoop21 jobtracker zookeeper
hadoop22 datanode tasktracker
hadoop23 datanode tasktracker
hadoop24 datanode tasktracker
hadoop29 secondarynamenode hiveserver2 zookeeper
Obviously, you can set up a smaller cluster, based on your cores / RAM / disk / workload etc.
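To give a feel for how scripts can be driven by such a config file, here is a minimal Python sketch that turns it into a role-to-hosts mapping. It is an illustration only, not my actual scripts: the function name parse_cluster_config and the CLUSTER_CONFIG string are hypothetical, and it assumes each line holds a host name followed by its roles, as in the listing above.

```python
from collections import defaultdict

# Hypothetical in-memory copy of the config file shown above:
# one container per line, host name first, then the roles it runs.
CLUSTER_CONFIG = """\
hadoop11 namenode zookeeper
hadoop12 datanode tasktracker
hadoop13 datanode tasktracker
hadoop14 datanode tasktracker
hadoop21 jobtracker zookeeper
hadoop22 datanode tasktracker
hadoop23 datanode tasktracker
hadoop24 datanode tasktracker
hadoop29 secondarynamenode hiveserver2 zookeeper
"""


def parse_cluster_config(text):
    """Return a dict mapping each role to the list of hosts that run it."""
    hosts_by_role = defaultdict(list)
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            continue  # skip blank lines
        host, roles = fields[0], fields[1:]
        for role in roles:
            hosts_by_role[role].append(host)
    return dict(hosts_by_role)


hosts_by_role = parse_cluster_config(CLUSTER_CONFIG)
for role, hosts in sorted(hosts_by_role.items()):
    print(role, "->", ", ".join(hosts))
```

With a mapping like this, a startup script can loop over the hosts of one role at a time (for example, the namenode before the datanodes).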
Summary

If you’re working with Hadoop, having a full-blown cluster on your laptop is just awesome… Stay tuned for some examples in the coming weeks 🙂

Highlights from writing a data sheet for JethroData

A couple of weeks ago, my friends at JethroData asked me to help them write a technical data sheet.

Now, the JethroData product is a SQL engine for Hadoop, currently in private beta, that has a very different design and architecture from the rest of the crowd. Specifically, it always indexes all the columns of your tables, and it uses HDFS as shared, remote storage instead of running locally on the data nodes. It has some very good reasons to work this way, but being radically different has made it harder to explain.

So, my task was to write a technical data sheet that would help technical people switch from “this is insane nonsense” to “that actually makes sense in these cases, but how do you handle this or that…”. In other words, tell a consistent technical story that makes sense and highlights their strengths, encouraging readers to engage further.

I think a data sheet in general is a piece of paper you get from a vendor at their booth and quickly browse through later, while you are stuck in a boring presentation (or is it just me?). So, it has to be interesting and appealing. Also, due to its short format, it should be very focused on the main values and neither too high-level nor too low-level. Quite a challenge!

To overcome the challenge, I first needed deeper knowledge of the product. For that, I spent some time with the JethroData team, especially with Boaz Raufman, their co-founder and CTO. I grilled him with questions on architecture and functionality, which led to great discussions. While of course some functionality is still lacking at this early phase, I was impressed to see that the core architecture is designed to handle quite a lot.

Then came the harder part – focus. There are many ways to use JethroData, its implementation has various advantages in different scenarios, and of course the implementation has many details, each with some logic behind it. It is impossible to cram it all into a two-page data sheet, even if most of it is very important. So, I had several lively discussions with Eli Singer, JethroData’s CEO, about everything from positioning to many specific strengths and supporting arguments and their relative importance.

At the end of the day, I think we got something pretty sweet. You can read it on their web site technology page. If you are attending Strata/Hadoop World, the JethroData team will be at the Startup Showcase on Monday and in booth #75 Tuesday-Wednesday – bring questions and arguments 🙂

How will the rise of stream processing affect the Big Data paradigm?

Even if you’ve never heard of stream processing so far, it is an easy guess that you’ll run into the term in the coming weeks around Hadoop World, given the way it illustrates so nicely what YARN is all about and also, of course, given the Hortonworks announcement of endorsing Apache Storm (disclaimer – I love Hortonworks).

Now, before I start fantasizing about the potential popularity of various architectures in the years to come, some quick background…

“Traditional” big data streaming / messaging is just about moving events from place to place. For example, collecting log records from dozens or hundreds of servers in near real-time and delivering each to the relevant target(s), like HDFS files, HBase, a database or another application, in a reliable fashion (with specific semantics). It is a reincarnation of the message queuing concepts, in a high-throughput, distributed fashion, extended both to actively collect events directly from various sources and to persist the events on the other side in various targets.

It seems to me the most popular system is Apache Kafka (originally from LinkedIn), but there is also Apache Flume (originally from Cloudera) which is bundled in most Hadoop distributions.

Stream processing, on the other hand, focuses on doing complex parsing, filtering, transformations and computations on the data stream as the data flows, for example maintaining up-to-date counters and aggregated statistics per user or a rolling window “top n” (continuous computation). This is opposed to batch processing frameworks, like MapReduce and Tez, that are based on one-time bulk processing. In this space, Apache Storm (originally from Backtype / Twitter) is the main player, but as LinkedIn released Apache Samza last month, it might lead to some healthy competition. There are other initiatives – for example, Cloudera contributed a library for building real-time analytics on Flume which might be good enough for simpler cases.
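To make “continuous computation” a bit more concrete, here is a minimal single-process Python sketch of a rolling window “top n”. It is not Storm or Samza code, just the idea in plain Python: the class name RollingTopN and the sample events are hypothetical, and it assumes events arrive in timestamp order.

```python
from collections import Counter, deque


class RollingTopN:
    """Keep per-key counts over the last `window_seconds` of events."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()    # (timestamp, key) pairs, oldest first
        self.counts = Counter()  # key -> number of events inside the window

    def add(self, timestamp, key):
        """Record one event, then drop everything that left the window."""
        self.events.append((timestamp, key))
        self.counts[key] += 1
        self._expire(timestamp)

    def _expire(self, now):
        # Assumes timestamps never decrease, so the oldest event is at the left.
        cutoff = now - self.window_seconds
        while self.events and self.events[0][0] <= cutoff:
            _, old_key = self.events.popleft()
            self.counts[old_key] -= 1
            if self.counts[old_key] == 0:
                del self.counts[old_key]

    def top(self, n):
        """Return the n most frequent keys currently inside the window."""
        return self.counts.most_common(n)


# Hypothetical stream of (timestamp in seconds, user) events.
window = RollingTopN(window_seconds=60)
for ts, user in [(0, "alice"), (10, "bob"), (20, "alice"), (30, "alice"),
                 (70, "bob"), (75, "bob"), (80, "bob")]:
    window.add(ts, user)

print(window.top(1))  # [('bob', 3)]
```

The answer is updated on every event instead of being recomputed by a periodic batch job; a real stream processor adds the hard parts, such as partitioning the keys across machines and recovering from failures.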

By the way, these two types of systems are complementary. Specifically, the combination of Kafka and Storm seems to be gaining traction – here is one success story from LivePerson.

Now, stay with me for a minute while I hype… What if you could write most of the common batches as stream processing? What if you could provide your business users updated data that is only seconds or tens of seconds behind the present? What if you could just have the answers to many common questions always updated and ready? Could stream processing significantly de-emphasize / replace batch processing on Hadoop in the future?

Now, that is of course mostly crazy talk today. In reality, maybe stream processing would not be a good fit for many computation types, and it is rather early to fully understand it. Also, the functionality, optimizations, integrations, methodologies and best practices around stream processing would likely take a few years to really mature.

Having said that, it seems to me that at least in the database / ETL space, stream processing has great potential to evolve into a powerful alternative paradigm for how we design a system that adds analytics to operational systems.

But, instead of hyping anymore, I’ll ask – how crazy is that? How far are we from such capabilities? What I already see looks promising in theory. Here are a couple of examples:

  • Storm already has its own official abstraction layer to simplify writing complex computations. It is called Trident, was contributed by Twitter and is part of Storm 0.8. It provides higher-level micro-batch primitives that allow you, for example, to build parallel aggregations and joins. Interestingly, when Twitter released it a year ago, they specifically described it as similar to Pig or Cascading, but for stream processing.
    It seems to me that declarative, SQL-like continuous computation is only one step away from that.
  • Even more fascinating is Twitter’s Summingbird (open-sourced last month). It is a programmer-friendly abstraction of stream processing that can be executed either in real-time (on Storm) or as batch (using Scalding, translated to MapReduce, in the future maybe to Tez). More interestingly, it was designed to allow a hybrid mode, transparently mixing historical data (batch) from Hadoop with last-minute updates from Storm.
    So, batch and streaming could co-exist and even transparently mix, even today.
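
The hybrid idea can be illustrated with a small Python sketch. This is not Summingbird’s API, only the general principle as I understand it: the same aggregation logic runs on both paths, and because merging the partial results is associative, a batch view and a real-time view can be combined at query time. The function names, the BATCH_CUTOFF value and the sample events are all hypothetical.

```python
from collections import Counter


def count_page_views(events):
    """Aggregate (timestamp, page) events into per-page counts.

    The same logic serves the batch path and the streaming path.
    """
    return Counter(page for _, page in events)


def hybrid_counts(batch_view, realtime_view):
    """Merge two partial aggregates into one answer.

    Counter addition is associative and commutative, so it does not
    matter which path produced which part.
    """
    return batch_view + realtime_view


# Hypothetical data: events before BATCH_CUTOFF were already covered by
# the last batch run, later ones are only known to the streaming layer.
BATCH_CUTOFF = 100
events = [(10, "/home"), (50, "/pricing"), (90, "/home"),
          (120, "/home"), (130, "/docs")]

batch_view = count_page_views(e for e in events if e[0] < BATCH_CUTOFF)
realtime_view = count_page_views(e for e in events if e[0] >= BATCH_CUTOFF)

combined = hybrid_counts(batch_view, realtime_view)
print(combined["/home"])  # 3
```

The merged result equals what a single pass over all the events would give, which is what lets the two layers mix transparently.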

Now, it remains to be seen of course how those technologies will evolve, but it seems that the current state of affairs is already not too bad. With the adoption of stream processing by mainstream Hadoop distributions (surely Hortonworks won’t be the last), usability, functionality and innovation will likely all accelerate. So, if you are starting to look into “next-gen” Big Data infrastructure and require low latency from event to results, it might be a good time to start looking into stream processing.

Update 30/10/13: I can’t believe I didn’t spend time to read about Lambda Architecture before. Nathan Marz (who led Twitter’s Storm) came up with a coherent, end-to-end architecture for a data system to make relational databases obsolete, with both a batch processing component and a stream processing component. Find some time to watch his fascinating presentation from Strange Loop 2012, you won’t regret it.

Big Data products/projects types – from proprietary to industry standard

I recently read Merv’s excellent post on proprietary vs open-source Hadoop – suggesting that the term distribution-specific is likely more appropriate. It reminded me that I wanted to write about the subject – I had a somewhat more detailed classification in mind.
I’m focusing on Hadoop, but this is also very relevant to the broader Big Data ecosystem, including the NoSQL/NewSQL solutions etc.

But first, why bother? Well, the classification typically hints at some things for decision makers regarding that piece of software. For enterprises, which typically rely on outside help, some open questions might be:

  • Skills – How can we get confidence in using this software (people, training etc)? Can we get help from our partners / consultants / someone new if needed? In our region / country?
  • Support – Can we get commercial support? Global or local? Is there more than one vendor so we can choose / switch if the relationship goes bad?
  • Finally, how much of a “safe bet” is it? In a few years, will it be niche, abandoned code or mainstream? Will it integrate well with the ecosystem over time (ex: YARN-friendly) or not? And what are the odds that there will be someone around supporting it in several years, when we still rely on it in production?

Of course, the state of a piece of software may change over time, but still I think each of the following categories has some immediate implications regarding these questions. For example, proprietary (closed-source) projects should come with decent paid support, optional professional services and training, above-average documentation, and a decent chance of long-term support (but small startups are still a potential risk).

Having said that, here is the way I classify projects and products:

  • A proprietary offering of a specific Hadoop distribution – a closed-source offering bundled with a vendor’s Hadoop distribution. Some examples are MapR (with a proprietary implementation of HDFS, MapReduce etc), Pivotal (with HAWQ as the central attraction of their distribution) and also Cloudera (with Cloudera Manager, although proprietary installation and management is almost universal).
    The proprietary bits are usually considered by the vendor as a significant part of their secret sauce.
  • Proprietary offering on Hadoop – a closed-source offering that runs on Hadoop. Some are distribution-agnostic (A.K.A Bring-Your-Own-Hadoop), for example Platfora, which runs on Cloudera, Hortonworks, MapR and Amazon. Some are certified on only a specific distribution (for now?) like Teradata-Hortonworks.
    Also, while some are Hadoop-only, many are an extension of an existing vendor offering to Hadoop, for example – all ETL vendors, some analytics vendors, some security vendors etc (the BI vendors typically just interact with Hadoop using SQL).
  • Open source projects, not bundled with a distribution – many great tools have been released as open source and have started building a community around them. While they are not yet bundled with an existing distribution, this doesn’t stop many from using them.
    Some of the projects complement existing functionality – like salesforce.com Phoenix for a smart, thin SQL layer over HBase or Netflix Lipstick for great Pig workflow visualization. Other projects are alternatives to existing projects, like LinkedIn (now Apache) Kafka for distributed messaging as a popular alternative to Apache Flume, or Apache Accumulo as a security-enhanced alternative to Apache HBase. Yet others are just new types of stuff on Hadoop, like Twitter’s Storm for distributed, real-time, stream computations or any of the distributed graph processing alternatives (Apache Giraph and others).
  • Open source projects, bundled with a single distribution – a few of them are existing open source projects that got picked by a distribution, like LinkedIn DataFu that adds useful UDFs to Apache Pig and is now bundled with Cloudera. However, most of them are projects led by the distribution, with varying community around them, for example Hortonworks Ambari (Apache Incubator), Cloudera Impala and Cloudera Hue, plus various security initiatives all over the place (Cloudera Sentry, Hortonworks Knox, Intel Rhino etc).
    As an interesting anecdote of the market dynamics – it seems Cloudera Hue is now also bundled in Hortonworks 2.0 beta under “third-party components”, which might hint it could evolve into the last category:
  • Industry-standard open source projects – these are projects that you can expect to find on most leading distributions, meaning, several vendors provide support.
    While some are obvious (Pig, Hive etc), some are new or just arriving. For example, HCatalog was created by Hortonworks but is now included in Cloudera and MapR. I would guess that merging it into Hive helped smooth things out (likely it is easier to deliver it than rip it out…). It would be interesting to see if Tez will follow the same adoption path, as Hortonworks planned to eventually build support directly in Hive, Pig and Cascading. A similar fate would likely come to most of the security projects I mentioned, as well as ORC and Parquet support (ORC is already in Hive).

So there you have it. I believe knowing the type of project can help an organization gauge the risk involved (vs. the benefit) to make an informed decision about its use.

Oh, and on a side note – “bundled in a distribution” and “distribution does provide decent support, bug fixes and enhancements” are of course two different things… That’s why the “brag about the number of your committers” approach resonates well with me, but that’s a different story.