Random notes on Apache Pulsar / Apache BookKeeper

This is a placeholder for my notes from researching these technologies. It might save you a bit of time if you plan to look into these.

  • Apache BookKeeper ➝ an alternative to Apache Kafka core (“topics”).
  • Apache Pulsar➝ built on top of BookKeeper, add multi-tenancy, multi-region, non-Java clients, tiered storage (offload to S3) etc, plus:
    • Pulsar Functions ➝ Kafka Streams
    • Pulsar IO ➝ Kafka Connect (including Debezium)
    • Pulsar SQL➝ KSQL (Using Presto)
    • Pulsar Schema Registry
  • streaml.io ➝ the company that commercializes it all. Recently added a cloud service offering (on Google Cloud). Great blog.

Apache BookKeeper

Apache BookKeeper provides replicated, durable storage of (append-only) log streams, with low-latency reads and writes (“<5ms”). In their words, “a scalable, fault-tolerant, and low-latency storage service optimized for real-time workloads”.

It was originally developed by Yahoo! as part of an HDFS NameNode HA alternative solution, open sourced as Apache ZooKeeper sub-project in 2011, and graduated as a top-level project in 2015.


  • Record / log entry ➝ basic unit of data.
  • Ledger ➝ a persisted sequence of (append-only) log entries. It has only a single writer and is bounded – eventually sealed when the writer dies or explicitly asks for sealing.
  • Log stream ➝ unbounded stream, uses multiple ledgers, rotated based on a time or size rolling policy.
  • Namespace ➝ a tenant, logical grouping of multiple streams, sharing some policies.
  • Bookie ➝ a single server storing and serving ledger fragments.
  • ensemble ➝ the collection of bookies that handle a specific ledger. A subset of all the bookies in the BookKeeper cluster.
  • ZooKeeper ➝ the metadata store, for coordination and metadata. (etcd support for Kubernetes seems to be commited in 4.9.0)
  • Ledger API ➝ low-level API
  • DistributedLog API ➝ “Log Stream API”. A higher-level, streaming oriented API. It is BookKeeper sub-project that was originally an independent open-source project by Twitter. Seems dead, zero activity on its mailing lists, likely since Twitter have moved to Kafka

Apache Pulsar

  • Pulsar functions – lightweight functions for stream processing, can run in on the Pulsar cluster or on Kubernetes.

NOTE – I’m pausing here, will return in the future if relevant


So what is serverless?

“Serverless? but there is a physical server underneath”

You might have encountered some cynical remarks on the term “Serverless”. I believe this actually comes from some confusion about which type server is being removed, and also from different people having a different cloud background.

Let’s say you have an app that exposes a REST API. It runs all the time, waiting for requests and processes them as they arrive. It may be that this daemon exposes a dozen REST endpoints. It likely has a main loop that waits for HTTP requests from clients, runs the corresponding business logic, and returns a JSON response or some HTTP error.

This app acts as a server, as in “client/server” architecture.
Serverless is about removing that server layer (the always-on “main loop”), which means wiring client requests directly to the business logic functions, without constantly paying for an up-and-running server. You “pay-as-you-go” for the actual resource consumption of your business logic code, not for reserving capacity by the hour.

How does it work?

Technically, if you convert the above app to serverless, you will provide:

  1. The “business logic” functions
    Instead of packaging your entire app, you package each of your inner business logic functions, and upload (only) them to your function service (sometimes called FaaS / Function as a Service), for example to the AWS Lambda service.
  2. The REST glue
    You declare a mapping between HTTP endpoints (and methods) and the functions, so calling the endpoint triggers the function. For example, a POST of myapp.com/api/v1/customers may invoke the create_customer() function. The details varies between cloud providers, but on AWS you set it using the Amazon API Gateway service.

Behind the scenes, your cloud provider will transparently create servers to run your functions – for example, creating JVMs with their own “main()” for Java functions. It will automatically add or remove such servers based on the current number of concurrent executions of your functions, will reuse those servers to save startup time, will stop them all when a function was not called at all for a while etc etc.

Continue reading

Exploring the HDFS Default Value Behaviour

In this post I’ll explore how the HDFS default values really work, which I found to be quite surprising and non-intuitive, so there s a good lesson here.

In my case, I have my local virtual Hadoop cluster, and I have a different client outside the cluster. I’ll just put a file from the external client into the cluster in two configurations.

All the nodes of my Hadoop (1.2.1) cluster have the same hdfs-site.xml file with the same (non-default) value for dfs.block.size (renamed to dfs.blocksize in Hadoop 2.x) of 134217728, which is 128MB. In my external node, I also have the hadoop executables with a minimal hdfs-site.xml.

First, I have set dfs.block.size to 268435456 (256MB) in my client hdfs-site.xml and copied a 400MB file to HDFS:

 ./hadoop fs -copyFromLocal /sw/400MB.file /user/ofir

Checking its block size from the NameNode:

./hadoop fsck /user/ofir/400MB.file -files -blocks -racks
FSCK started by root from / for path /user/ofir/400MB.file at Thu Jan 30 22:21:29 UTC 2014
/user/ofir/400MB.file 419430400 bytes, 2 block(s):  OK
0. blk_-5656069114314652598_27957 len=268435456 repl=3 [/rack03/, /rack03/, /rack02/]
1. blk_3668240125470951962_27957 len=150994944 repl=3 [/rack03/, /rack03/, /rack01/]

 Total size:    419430400 B
 Total dirs:    0
 Total files:    1
 Total blocks (validated):    2 (avg. block size 209715200 B)
 Minimally replicated blocks:    2 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:        0 (0.0 %)
 Default replication factor:    1
 Average block replication:    3.0
 Corrupt blocks:        0
 Missing replicas:        0 (0.0 %)
 Number of data-nodes:        6
 Number of racks:        3
FSCK ended at Thu Jan 30 22:21:29 UTC 2014 in 1 milliseconds

So far, looks good – the first block is 256MB.

Continue reading

Exploring HDFS Block Placement Policy

In this post I’ll cover how to see where each data block is actually placed. I’m using my fully-distributed Hadoop cluster on my laptop for exploration and the network topology definitions from my previous post – you’ll need a multi-node cluster to reproduce it.


When a file is written to HDFS, it is split up into big chucks called data blocks, whose size is controlled by the parameter dfs.block.size in the config file hdfs-site.xml (in my case – left as the default which is 64MB). Each block is stored on one or more nodes, controlled by the parameter dfs.replication in the same file (in most of this post – set to 3, which is the default). Each copy of a block is called a replica.

Regarding my setup – I have nine nodes spread over three racks. Each rack has one admin node and two worker nodes (running DataNode and TaskTracker), using Apache Hadoop 1.2.1. Please note that in my configuration, there is a simple mapping between host name, IP address and rack number -host name is hadoop[rack number][node number] and IP address is[rack number][node number].

Where should the NameNode place each replica?  Here is the theory:

The default block placement policy is as follows:

  • Place the first replica somewhere – either a random node (if the HDFS client is outside the Hadoop/DataNode cluster) or on the local node (if the HDFS client is running on a node inside the cluster).
  • Place the second replica in a different rack.
  • Place the third replica in the same rack as the second replica
  • If there are more replicas – spread them across the rest of the racks.

That placement algorithm is a decent default compromise – it provides protection against a rack failure while somewhat minimizing inter-rack traffic.

So, let’s put some files on HDFS and see if it behaves like the theory in my setup – try it on yours to validate.

Continue reading

Exploring The Hadoop Network Topology

In this post I’ll cover how to define and debug a simple Hadoop network topology.


Hadoop is designed to run on large clusters of commodity servers – in many cases spanning many physical racks of servers. A physical rack is in many cases a single point of failure (for example, having typically a single switch for lower cost), so HDFS tries to place block replicas on more than one rack. Also, there is typically more bandwidth within a rack than between the racks, so the software on the cluser (HDFS and MapReduce / YARN) can take it into account. This leads to a question:

How does the NameNode know the network topology?

By default, the NameNode has no idea which node is in which rack. It therefore by default assumes that all nodes are in the same rack, which is likely true for small clusters. It calls this rack “/default-rack“.

So, we have to teach Hadoop our cluster network topology – the way the nodes are grouped into racks. Hadoop supports a pluggable rack topology implementation – controlled by the parameter topology.node.switch.mapping.impl in core-site.xml, which specifies a java class implementation. The default implementation is using a user-provided script, specified in topology.script.file.name in the same config file, a script that gets a list of IP addresses or host names and returns a list of rack names (in Hadoop2: net.topology.script.file.name). The script will get up to topology.script.number.args parameters per invocation, by default up to 100 requests per invocation (in Hadoop2: net.topology.script.number.args).

Continue reading

Creating a virtualized fully-distributed Hadoop cluster using Linux Containers

TL;DR  Why and how I created a working 9-node Hadoop Cluster on my  laptop

In this post I’ll cover why I wanted to have a decent multi-node Hadoop cluster on my laptop, why I chose not to use virtualbox/VMware player, what is LXC (Linux Containers) and how did I set it up. The last part is a bit specific to my desktop O/S (Ubuntu 13.10).

Why install a fully-distributed Hadoop cluster on my laptop?

Hadoop has a “laptop mode” called pseudo-distributed mode. In that mode, you run a single copy of each service (for example, a single HDFS namenode and a single HDFS datanode), all listening under localhost.

This feature is rather useful for basic development and testing – if all you want is write some code that uses the services and check if it runs correctly (or at all). However, if your focus is the platform itself – an admin perspective, than that mode is quite limited. Trying out many of Hadoop’s feature requires a real cluster – for example replication, failure testing, rack awareness, HA (of every service types) etc.

Why VirtualBox or VMware Player are not a good fit?

When you want to run several / many virtual machines, the overhead of these desktop virtualization sotware is big. Specifically, as they run a full-blown guest O/S inside them, they:

  1. Consume plenty of RAM per guest – always a premium on laptops.
  2. Consume plenty of disk space – which is limited, especially on SSD.
  3. Take quite a while to start / stop

To run many guests on a laptop, a more lightweight infrastructure is needed.

What is LXC (Linux Containers)

Continue reading

The end of the monolithic database engine dream

It seems I like calling my posts “The End of…” 🙂

Anyway, Rob Klopp wrote an interesting post titled “Specialized Databases vs. Swiss Army Knives“. In it, he argues with Stonebraker’s claim that the database market will split to three-to-six categories of databases, each with its own players. Rob counter claims by saying that data is typically used in several ways, so it is cumbersome to have several specialized databases instead of one decent one (like Hana of course…).

I have a somewhat different perspective. In the past, let’s say ten years ago, I was sure that Oracle  database is the right thing to throw at any database challenge, and I think many in the industry shared that feeling (each with his/her favorite database, of course). There was a belief that a single database engine could be smart enough, flexible enough, powerful enough to handle almost everything.

That belief is now history. As I will show, it is now well understood and acknowledged by all the major vendors that a general-purpose database engine just can’t compete in all high-end niches. HOWEVER, the existing vendors are, as always, adapting. All of them are extending their databases to offer multiple database engines inside their product, each for a different use case.

The leader here seems to be Microsoft SQL Server. SQL Server 2014 (currently at CTP2) comes with three separate database engines. In addition to the existing engine, they introduced Hekaton – an in-memory OLTP engine that looks very promising. They also delivered a brand new implementation of their columnar format – now called clustered columnstore index – which is now fully updatable and is actually not an index – it is a primary table storage format with all the usual plumbing (delta trees with a tuple mover process when enough rows have accumulated).

Continue reading