The Exchange
Quick Tips For Using The Hive Shell Inside Scripts

by Jimmy Bates

There are many great examples out there for using the Hive shell, as well as examples of ways to automate many of the animals in our Hadoop zoo. However, if you’re just getting started, or need something fast that won’t stay around long, then all you need to do is throw a few lines of code together with some existing programs in order to avoid re-inventing the workflow. In this blog post, I’ll share a few quick tips on using the Hive shell inside scripts. We’ll take a look at a simple script that needs to pull an item or count, and then look at two ways to use the Hive shell to get an answer.

All of the steps in this example were executed on Hive 0.13 on a MapR cluster using CentOS.


In this setup, we have a data set that consists of travel times for specific traffic routes. We’ve pulled together travel times for days of the year, so when someone wants to know what to expect, we can quickly pull the info. In the past this was done on a traditional RDBMS, but we moved it off that in order to save on processing time and money, and are now using the MapR Distribution including Hadoop. The table is an external table and is tab-delimited. There are more efficient ways to store the data, but we’ll cover those comparisons in a later post. The table is also partitioned by collection day. If you’re not familiar with a partition, it just means that when we enter data into our table, it is grouped into folders of records on the file system. The data in each folder in this case represents a day.
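As a toy illustration of that layout (using made-up directory names under /tmp, not a real Hive warehouse path), each partition value becomes its own directory of records:

```shell
# Toy illustration only: a real Hive table lives under the table's LOCATION,
# not /tmp. Each partition value gets its own directory of that day's records.
base=/tmp/traffic_speeds_demo
mkdir -p "$base/collectionday=20130621" "$base/collectionday=20130622"

# List the partition directories, one per collection day.
ls -d "$base"/collectionday=*
```

When Hive plans a query that filters on collectionday, it only has to read the matching directories.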

CREATE EXTERNAL TABLE traffic_speeds (
  `id` STRING,
  `collectiontime` STRING,
  `speed` STRING,
  `traveltime` STRING,
  `linkpoints` STRING)
PARTITIONED BY (
  `collectionday` STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';

In this example, we’re going to use a simple Bash script to extract some values out of a Hive table using the Hive shell. The same methods can be used for just about any scripting language. In this case, we are going to take a few parameters as arguments and then execute them on Hive using the Hive shell. The results are captured as a variable and then echoed to standard out. There are two basic ways to pass things off to the Hive shell to execute. This script will walk through both of those options. The first option will consist of the Hive command running an external HQL script and returning a value. The second option will consist of passing all the needed commands to Hive at one time and returning a value.

Option 1: Using Hive in Conjunction with an HQL Script

One way to use Hive inside a script is to have the Hive shell execute an HQL file. This is accomplished with the -f (file) option:

hive -f my_script.hql

An HQL script is just a series of Hive query language commands. They are the same ones you would use in the Hive shell.


show databases;
show tables;
set dfs.block.size=1073741824;
select * from my_database.my_table limit 10;


To make this process more generic, Hive allows you to pass in variables using the -hiveconf option:

hive -hiveconf MY_VAR1=value1 -hiveconf MY_VAR2=value2 -f my_script.hql

Inside the HQL file, it is used like this:

show databases;
show tables;
set dfs.block.size=${hiveconf:MY_VAR1};
select * from ${hiveconf:MY_VAR2} limit 10;

Operational messages are not typically useful to the parent script and can be suppressed with the -S (silent) option. This will make things far less chatty:

hive -S -hiveconf MY_VAR1=value1 -hiveconf MY_VAR2=value2 -f my_script.hql

Pulling a value from the command will vary depending on your scripting language of choice, but one thing that holds true for all of them is ensuring that only one Hive command returns rows or values. In the example above, we have three. If the data from the select statement is what I want to capture cleanly, then I would remove the show commands from the HQL file, leaving:

set dfs.block.size=${hiveconf:MY_VAR1};
select * from ${hiveconf:MY_VAR2} limit 10;

An example of pulling a value using Bash looks something like this:

my_value=$(hive -S -hiveconf MY_VAR1=value1 -hiveconf MY_VAR2=value2 -f my_script.hql)
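Since the silenced query writes its value to standard out, plain command substitution does the heavy lifting. Here is a self-contained sketch of the pattern, with a fake_hive function standing in for the real hive command so it runs anywhere:

```shell
# fake_hive stands in for: hive -S -hiveconf ... -f my_script.hql
# It prints a single value on stdout, the way a silenced hive query would.
fake_hive() {
  printf '42.7\n'
}

# Capture stdout into a variable; $? reflects the command's exit status.
my_value=$(fake_hive)
if [ $? -ne 0 ]; then
  echo "query failed" >&2
  exit 1
fi

echo "returned value = $my_value"
```

Checking the exit status before trusting the captured value is cheap insurance when the real hive command is substituted back in.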

Option 2: Passing Commands Directly to Hive

Another way to use Hive inside a script is to pass the HQL commands as a query string for the Hive shell to execute. This is accomplished with the -e option:

hive -e "select * from my_database.my_table limit 10;"

You can actually chain several HQL commands in that string, which comes in handy when you have to specify a database, because the next command only takes a table name. An example is when you load or add a partition with the ALTER command:

hive -e "USE my_database; alter table my_table add if not exists partition(my_partition=my_value);"

As with the previous example, the only values returned are values pulled from the dataset. Hive operational messages may show up on the screen, but are not included in the response. You can remove that chatter by specifying silent mode again with the -S option.

hive -S -e "USE my_database; alter table my_table add if not exists partition(my_partition=my_value); select * from my_table limit 10;"

As I mentioned earlier, pulling a value from the command will vary depending on your scripting language of choice. The one thing that holds true for all of them is ensuring that only one Hive command returns rows or values. Another example of pulling a value using Bash looks something like this:

my_value=$(hive -S -e "USE my_database; alter table my_table add if not exists partition(my_partition=my_value); select * from my_table limit 10;")



Let’s take a look at a Bash script that runs either of the options mentioned above in a more generic fashion in order to pull average traffic transit times for a given station.

Script Overview

The script used for the rest of this post allows for two operations: "pull_with_hql_script" or "pull_with_one_liner". The basic flow is: ./ command [options]
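The dispatch pattern that makes this flow work can be sketched in a few lines of Bash (the function bodies are stubbed out here for illustration):

```shell
# Minimal sketch of the command-dispatch pattern: the first argument selects
# the operation, and the remaining arguments are passed through to it.
pull_with_hql_script() { echo "pull_with_hql_script got: $*"; }
pull_with_one_liner()  { echo "pull_with_one_liner got: $*"; }

args() {
  case $1 in
    pull_with_hql_script|pull_with_one_liner)
      cmd=$1
      shift
      "$cmd" "$@"
      ;;
    *)
      echo >&2 "Invalid command: $1"
      return 1
      ;;
  esac
}

# Demo invocation, standing in for: ./ command [options]
args pull_with_hql_script -c id=A2F36 -r speed
```

Each stub would be replaced by a function that parses its own options and calls the Hive shell.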

Example Using External Hive Files with an HQL Script

Let’s take a look at using Hive HQL files in conjunction with a Bash script in order to ascertain a given station’s average traffic speed.

Command arguments

The arguments for “./ pull_with_hql_script”

Example execution

In this example, the command line arguments are parsed, and passed to Hive to execute in our HQL script.

Command: ./ pull_with_hql_script -c id=A2F36 -d default -t traffic_speed -r speed

The select statement is built to compute the average of the travel times over the range specified. In this case, a single station id was selected.

select="AVG($requested_column)"

The Hive command is executed and the value is echoed to standard out. The variable hive_script is hard-coded here, but could be changed to a passed argument like all the other variables.

hive_script="hive_script.hql"
my_value=`hive -S -hiveconf MY_SELECT=$select -hiveconf MY_COMPARISON=$comparison -hiveconf MY_DATABASE=$database -hiveconf MY_TABLE=$table -f $hive_script`
echo "returned value = $my_value"

Example passing all commands to Hive directly

Let’s take a look at passing Hive a set of HQL commands as a string with a Bash script to find a given station’s average traffic speed:

Command arguments

The arguments for “./ pull_with_one_liner”

Example execution

In this example, the command line arguments are parsed, and passed to Hive to execute as a string. Additionally, there is an example of checking to see if partitions exist and are loaded.

Command: ./ pull_with_one_liner -c id=A2F36 -d default -t traffic_speed -r speed -b 20130621 -e 20130629

The select statement is built to execute an average of the traffic times throughout the range specified. In this case, a single station id was selected. A date range was also passed and could be used to enhance the data selection and filtering process, but in this case it was used to load partitions for the given range if they were not found.

As in the example above, the select statement is built for use in the command.

select="AVG($requested_column)"

The values are passed and used to build the query. The query is passed to Hive and executed. The value is stored as my_value and is echoed to standard out.

# lets build the query we will execute in the hive shell
my_query="set mapred.reduce.tasks=30;"
my_query="$my_query SELECT $select"
my_query="$my_query FROM ("
my_query="$my_query SELECT DISTINCT collectiontime, id, collectionday, speed"
my_query="$my_query FROM $database.$table"
my_query="$my_query WHERE $comparison"
my_query="$my_query ) t"
my_query="$my_query GROUP by id, collectionday;"

# echo the query passed to the hive shell just because
echo "hive -S -e \"$my_query\""
my_value=$(hive -S -e "$my_query")
echo "returned value = $my_value"

#!/bin/bash

# this will print the usage statements and exit
usage() {

  case $1 in
    "")
      echo ""
      echo "Usage: command [options]"
      echo " for info on each command: --> command -h|--help"
      echo "Commands:"
      echo " pull_with_hql_script [options]"
      echo " pull_with_one_liner [options]"
      echo ""
      ;;
    pull_with_hql_script)
      echo ""
      echo "Usage: pull_with_hql_script [-h|--help]"
      echo ""
      echo " This is a quick example of using a hql script with a bash script to pull a value:"
      echo ""
      echo "Params:"
      echo " -c|--comparison comparison_statement: column=value"
      echo " -d|--database database_name"
      echo " -h|--help: print this help info and exit"
      echo " -r|--requested_column: column to get averaged value from"
      echo " -t|--table table_name"
      echo "Examples:"
      echo ""
      echo " ./ pull_with_hql_script -c a_column_name=a_value -d a_database_name -t a_table_name -r a_column_name"
      echo ""
      ;;
    pull_with_one_liner)
      echo ""
      echo "Usage: pull_with_one_liner [-h|--help]"
      echo ""
      echo " This is a quick example of passing query strings directly to a hive shell with a bash script to pull a value:"
      echo ""
      echo "Params:"
      echo " -b|--begin_date yyyymmdd"
      echo " -c|--comparison comparison_statement: column=value"
      echo " -d|--database database_name"
      echo " -e|--end_date yyyymmdd"
      echo " -h|--help: print this help info and exit"
      echo " -r|--requested_column: column to get averaged value from"
      echo " -t|--table table_name"
      echo "Examples:"
      echo ""
      echo " ./ pull_with_one_liner -c a_column_name=a_value -d a_database_name -t a_table_name -r a_column_name -b 20120122 -e 20130122"
      echo ""
      ;;
  esac
  exit 1
}

# this will process command line arguments enough to get you to a specific function
args() {

  echo "processing command request"
  case $1 in
    pull_with_hql_script)
      shift
      pull_with_hql_script "$@"
      ;;
    pull_with_one_liner)
      shift
      pull_with_one_liner "$@"
      ;;
    *)
      echo >&2 "Invalid command: $1"
      usage ""
      ;;
  esac
}

pull_with_hql_script() {

  # init params
  requested_column=""
  database=""
  table=""
  comparison=""
  hive_script="hive_script.hql"
  select=""

  # process args for this block
  while test $# -gt 0
  do
    case $1 in
      -c|--comparison)
        shift
        comparison=$1
        ;;
      -d|--database)
        shift
        database=$1
        ;;
      -h|--help)
        usage pull_with_hql_script
        ;;
      -r|--requested_column)
        shift
        requested_column=$1
        ;;
      -t|--table)
        shift
        table=$1
        ;;
      *)
        echo >&2 "Invalid argument: $1"
        usage "pull_with_hql_script"
        ;;
    esac
    shift
  done

  # determine if any option is missing
  if [ x"$requested_column" == "x" ]; then
    echo "missing requested column: -r|--requested_column column_name_to_count"
    usage "pull_with_hql_script"
  fi

  if [ x"$database" == "x" ]; then
    echo "missing database name: -d|--database database_name"
    usage "pull_with_hql_script"
  fi

  if [ x"$table" == "x" ]; then
    echo "missing table name: -t|--table table_name"
    usage "pull_with_hql_script"
  fi

  if [ x"$comparison" == "x" ]; then
    echo "missing comparison clause WHERE <comparison>: -c|--comparison comparison"
    usage "pull_with_hql_script"
  fi

  # set select statement
  select="AVG($requested_column)"

  # echo the command used to run the hive hql script just because
  echo "hive -S -hiveconf MY_SELECT=$select -hiveconf MY_COMPARISON=$comparison -hiveconf MY_DATABASE=$database -hiveconf MY_TABLE=$table -f $hive_script"
  my_value=`hive -S -hiveconf MY_SELECT=$select -hiveconf MY_COMPARISON=$comparison -hiveconf MY_DATABASE=$database -hiveconf MY_TABLE=$table -f $hive_script`
  echo "returned value = $my_value"
  exit
}

pull_with_one_liner() {

  # init params
  requested_column=""
  database=""
  table=""
  comparison=""
  select=""
  begin_date=""
  end_date=""

  # process args for this block
  while test $# -gt 0
  do
    case $1 in
      -b|--begin_date)
        shift
        begin_date=$1
        ;;
      -c|--comparison)
        shift
        comparison=$1
        ;;
      -d|--database)
        shift
        database=$1
        ;;
      -e|--end_date)
        shift
        end_date=$1
        ;;
      -h|--help)
        usage pull_with_one_liner
        ;;
      -r|--requested_column)
        shift
        requested_column=$1
        ;;
      -t|--table)
        shift
        table=$1
        ;;
      *)
        echo >&2 "Invalid argument: $1"
        usage "pull_with_one_liner"
        ;;
    esac
    shift
  done

  # determine if any option is missing
  if [ x"$requested_column" == "x" ]; then
    echo "missing requested column: -r|--requested_column column_name_to_count"
    usage "pull_with_one_liner"
  fi

  if [ x"$database" == "x" ]; then
    echo "missing database name: -d|--database database_name"
    usage "pull_with_one_liner"
  fi

  if [ x"$table" == "x" ]; then
    echo "missing table name: -t|--table table_name"
    usage "pull_with_one_liner"
  fi

  if [ x"$comparison" == "x" ]; then
    echo "missing comparison clause WHERE <comparison>: -c|--comparison comparison"
    usage "pull_with_one_liner"
  fi

  if [ x"$begin_date" == "x" ]; then
    echo "missing start date <2012-01-28>: -b|--begin_date yyyy-mm-dd"
    usage "pull_with_one_liner"
  fi

  if [ x"$end_date" == "x" ]; then
    echo "missing ending date <2014-02-17>: -e|--end_date yyyy-mm-dd"
    usage "pull_with_one_liner"
  fi

  # get start date and end date in correct format
  begin_date=`date --date="$begin_date" '+%Y%m%d'`
  end_date=`date --date="$end_date" '+%Y%m%d'`

  # set select statement
  select="avg($requested_column)"

  # before we query for information we may want to make sure we have all the partitions loaded
  # in this case the table was external so there may be partitions that were not loaded
  # when the table was created
  # Here is an example of executing two commands with hive in a bash shell
  partitions=$(hive -e "USE $database; SHOW PARTITIONS $table")

  # loop through all the days in the range and load any partition that is
  # not present (in most cases this is not needed)
  my_date=$begin_date
  while test $my_date -le $end_date
  do

    # check if the partition for this day is already present
    add_partition=true
    my_timestamp=`date -d "$my_date" "+%s"`
    for partition in $partitions
    do
      if [ "$partition" == "collectionday=$my_timestamp" ]; then
        add_partition=false
      fi
    done

    if [ "$add_partition" = true ]; then
      # Here is an example of executing two statements in the hive shell
      echo "hive -e \"USE $database; alter table $table add if not exists partition(collectionday=$my_timestamp);\""
      hive -e "USE $database; alter table $table add if not exists partition(collectionday=$my_timestamp);"
    fi

    my_date=`date --date="$my_date + 1 day" '+%Y%m%d'`

  # end of while loop
  done

  # lets build the query we will execute in the hive shell
  my_query="set mapred.reduce.tasks=30;"
  my_query="$my_query SELECT $select"
  my_query="$my_query FROM ("
  my_query="$my_query SELECT DISTINCT collectiontime, id, collectionday, speed"
  my_query="$my_query FROM $database.$table"
  my_query="$my_query WHERE $comparison"
  my_query="$my_query ) t"
  my_query="$my_query GROUP by id, collectionday;"

  # echo the query passed to the hive shell just because
  echo "hive -S -e \"$my_query\""
  my_value=$(hive -S -e "$my_query")
  echo "returned value = $my_value"
  exit
}

# -------------------------------------------------------------------------------------
# Beginning of script execution

args "$@"


For reference, here is the HQL file (hive_script.hql) that the pull_with_hql_script option executes:

set mapred.reduce.tasks=30;
use ${hiveconf:MY_DATABASE};
SELECT ${hiveconf:MY_SELECT}
FROM ( SELECT DISTINCT collectiontime, id, collectionday, speed FROM ${hiveconf:MY_TABLE} WHERE ${hiveconf:MY_COMPARISON}) t
GROUP by id, collectionday;

Using the Hive shell inside scripts is a great way to avoid re-inventing the workflow. In this blog post, I've shown you how to use Hive in conjunction with an HQL script, as well as how to pass commands directly to Hive.

If you have any questions, please ask them in the comments section below.

Content originally posted in the MapR Converge Blog on December 11, 2014.

Getting Started With Sample Programs For Apache Kafka 0.9

by Tug Grall


Editor's Note: If you're interested in learning more about Apache Kafka, be sure to read the free O'Reilly book, "New Designs Using Apache Kafka and MapR Streams".

Streaming data is of growing interest to many organizations, and most applications need to use a producer-consumer model to ingest and process data in real time. Many messaging solutions exist on the market today, but few of them have been built to handle the challenges of modern deployments related to IoT, large web-based applications, and related big data projects.

Apache Kafka was built by LinkedIn to solve these challenges, and has since been deployed on many projects. Apache Kafka is a fast, scalable, durable, and distributed messaging system.

The goal of this article is to use an end-to-end example and sample code to show you how to:

  1. Install, configure and start Kafka
  2. Create new topics
  3. Write and run a Java producer to post messages to topics
  4. Write and run a Java consumer to read and process messages from the topics


This content is based in part on the documentation provided by the Apache Kafka project.

We have added short, realistic sample programs that illustrate how real programs are written using Kafka.


You will need basic Java programming skills plus access to:

  • a Java development environment with Maven (used to build the samples)
  • Git (used to clone the sample project)
  • the Apache Kafka 0.9.0 release (downloaded in Step 1)
Step 1: Download Kafka
Download the Apache Kafka 0.9.0 release and un-tar it.

$ tar -xzf kafka_2.11-
$ cd kafka_2.11-

Step 2: Start the server
Start a ZooKeeper server; Kafka has a single-node ZooKeeper configuration built-in.

$ bin/ config/ &
[2016-02-08 14:59:28,275] INFO Reading configuration from: config (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-02-08 14:59:28,276] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)

Note that this will start Zookeeper in the background. To stop Zookeeper, you will need to bring it back to the foreground and use control-C or you will need to find the process and kill it. You can now start the Kafka server itself:

$ bin/ config/
[2016-02-08 15:10:29,945] INFO KafkaConfig values: ..


As with Zookeeper, this runs the Kafka broker in the background. To stop Kafka, you will need to bring it back to the foreground or find the process and kill it explicitly using kill.
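The "find the process and kill it" step works the same way for either server. Here is a self-contained sketch of the pattern, with a sleep process standing in for the actual ZooKeeper or Kafka process:

```shell
# A sleep process stands in here for a background server such as ZooKeeper
# or the Kafka broker; $! holds the PID of the most recent background job.
sleep 300 &
bg_pid=$!

# Stop it explicitly by PID, as you would for a server left in the background.
kill "$bg_pid"
wait "$bg_pid" 2>/dev/null

echo "stopped process $bg_pid"
```

With a real broker you would first locate the PID, for example with ps or jps, before sending the signal.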

Step 3: Create the topics for the example programs
The messages are organized by topics on which Producers post messages and from which Consumers read messages. Our sample application uses two topics: fast-messages and summary-markers. The following commands create the topics:

$ bin/ --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic fast-messages
$ bin/ --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic summary-markers


These can be listed:

$ bin/ --list --zookeeper localhost:2181

You will see log messages from the Kafka process when you run Kafka commands. You can switch to a different window if these are distracting.

Note: The broker can be configured to auto-create new topics as they are mentioned by the client application, but that is often considered a bit dangerous because mis-spelling a topic name doesn't cause a failure.


At this point, you should have a working Kafka broker running on your machine. The next steps are to compile the example programs and play around with the way that they work.

1- Compile and package up the example programs
Clone and compile the repository using the following commands:

$ git clone
$ cd kafka-sample-programs/
$ mvn clean package

For convenience, the example programs project is set up so that the maven package target produces a single executable, target/kafka-example, that includes all of the example programs and dependencies.

2- Start the example consumer
Start the consumer using the following command:

$ target/kafka-example consumer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.

The consumer is now started and listens to all the messages on the fast-messages and summary-markers topics.

Nothing should happen at this point because there aren't any messages yet.

3- Run the example producer
In a new terminal window, run the example producer using the following command:

$ target/kafka-example producer
Sent msg number 0
Sent msg number 1000
Sent msg number 998000
Sent msg number 999000

The producer sends a large number of messages to fast-messages along with occasional messages to summary-markers.

The consumer running in the other window receives and processes all the messages from these topics.

$ target/kafka-example consumer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
Got 31003 records after 0 timeouts
1 messages received in period, latency(min, max, avg, 99%) = 20352, 20479, 20416.0, 20479 (ms)
1 messages received overall, latency(min, max, avg, 99%) = 20352, 20479, 20416.0, 20479 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 19840, 20095, 19968.3, 20095 (ms)
1001 messages received overall, latency(min, max, avg, 99%) = 19840, 20479, 19968.7, 20095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 12032, 12159, 12119.4, 12159 (ms)
998001 messages received overall, latency(min, max, avg, 99%) = 12032, 20479, 15073.9, 19583 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 12032, 12095, 12064.0, 12095 (ms)
999001 messages received overall, latency(min, max, avg, 99%) = 12032, 20479, 15070.9, 19583 (ms)

If you run the producer again, you will see new messages in the consumer terminal window.


At this point, you have Kafka running and a simple Kafka application that sends and consumes messages. It is time to look at the code and understand how the application was created.

To create a Kafka Producer or Consumer (that is, a Kafka client application), you must add the kafka-clients dependency to your Maven project:

<dependency>
  <version></version>

Use the kafka-clients version that matches your Kafka release; for this tutorial, that is the 0.9.0 line.
The sample Producer is a classical Java application with a main() method. This application must:

  • Initialize and configure a producer
  • Use the producer to send messages

1- Producer Initialization
Creating a producer is quite simple: you just create an instance of the org.apache.kafka.clients.producer.KafkaProducer class with a set of properties. That looks like:

producer = new KafkaProducer<String, String>(properties);


In this example, the configuration is externalized in a property file.



For this introduction, the most important property to look at is:

  • the bootstrap.servers lists the host and port of the Kafka server/cluster you started earlier in this tutorial.

The other properties are used to control the way the messages are sent, and serialized. You can find information about all the properties in the Producer Configs chapter of the Kafka documentation.

2- Message posting
Once you have a producer instance, you can post messages to a topic using the ProducerRecord class. In its simplest form, a ProducerRecord is a pair where:

  • the first element is the destination topic
  • the second element is the message value

As you can guess, sending a message to the topic is straightforward:

producer.send(new ProducerRecord<String, String>("fast-messages", "This is a dummy message"));

Note that there are other ProducerRecord constructors that allow you to pass more parameters, such as a message key or a partition number, but these parameters are not used in this simple tutorial.

The sample application's producer posts messages in a loop.

3- Producer End
Once you are done with the producer, use the producer.close() method, which blocks until all pending messages have been sent to the server. This call is placed in a finally block to guarantee that it runs; a Kafka producer can also be used in a try-with-resources construct.

    } finally {
        producer.close();
    }

4- Producer execution
As mentioned earlier, the producer is a simple Java class; in this example application, the Producer is started from the run application.


Now that you know how to send messages to the Kafka server let’s look at the consumer.


The Consumer class, like the producer, is a simple Java class with a main method.

This sample consumer uses the HdrHistogram library to record and analyze the messages received from the fast-messages topic, and Jackson to parse JSON messages.

This is why you see the HdrHistogram and Jackson dependencies, in addition to kafka-clients, in the pom.xml file.


Let’s now focus on the consumer code.

1- Consumer Initialization
The first thing to do is to create a consumer instance of the org.apache.kafka.clients.consumer.KafkaConsumer class with a set of properties. That looks like:

consumer = new KafkaConsumer<String, String>(properties);


In this example, the properties are externalized in a configuration file.


For this introduction, the most important properties to look at are:

  • bootstrap.servers: the host and port of the Kafka server/cluster you started earlier in this tutorial
  • the group of consumer processes to which this consumer belongs

The other properties are used to control the way the messages are consumed. You can find information about all the properties in the Consumer Configs chapter of the Kafka documentation.

2- Topics Subscription
A consumer can subscribe to one or more topics. In this example, the consumer listens to messages from two topics using the following code:

consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));

3- Message Consumption
Now that your consumer has subscribed to the topics, it can poll messages from them in a loop. The loop looks like this:

   while (true) {
         ConsumerRecords<String, String> records = consumer.poll(200);


The poll method is called repeatedly in the loop. On each call, the consumer reads records from the topic and tracks the offsets so that the next call reads from the proper position. The poll method takes a timeout in milliseconds and will wait up to that long if no data is available.

The object returned by the poll method is an Iterable containing the received records, so you just need to loop over it to process each record. The code to process the messages in the consumer looks like this:

for (ConsumerRecord<String, String> record : records) {
      switch (record.topic()) {
            case "fast-messages":
                  // deal with messages from fast-messages topic
                  break;
            case "summary-markers":
                  // deal with messages from summary-markers topic
                  break;
      }
}


In the sample application, the consumer only processes messages from the fast-messages topic, relying on the fact that messages are consumed in the order in which they were sent by the producer:
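Because messages arrive in the order the producer sent them, the consumer can detect gaps with a simple counter. A pure-Java sketch of that idea (the class name, field name, and message format are assumptions, and JSON parsing is reduced to string handling for brevity; the real sample uses Jackson):

```java
public class SequenceCheckSketch {

    private long expected = 0;  // sequence number we expect next
    private long missed = 0;    // running count of skipped messages

    // Extract the integer value of a field like "k": 42 from a JSON-ish string.
    // The real sample application parses full JSON with Jackson instead.
    static long extractField(String message, String field) {
        java.util.regex.Matcher m = java.util.regex.Pattern
                .compile("\"" + field + "\"\\s*:\\s*(\\d+)")
                .matcher(message);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + field);
        }
        return Long.parseLong(m.group(1));
    }

    // Check one message; returns how many messages were skipped before it.
    long check(String message) {
        long k = extractField(message, "k");
        long gap = k - expected;
        if (gap > 0) {
            missed += gap;
        }
        expected = k + 1;
        return gap;
    }
}
```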

4- Consumer end
Once you are done with the consumer, call the consumer.close() method to free resources. This is especially important in a multithreaded application. The sample consumer does not call this method, as it is stopped with Ctrl+C, which stops the whole JVM.

5- Consumer execution
As mentioned earlier, the consumer is a simple Java class; in this example application, the consumer is started from the Run application, as follows:




In this article you have learned how to create a simple Kafka 0.9.x application using:

  • a producer that publishes JSON messages into multiple topics
  • a consumer that receives JSON messages and calculates statistics from message content.

This application is very simple, and you can extend it to test some other interesting features of Kafka:

  • add new consumers, using different groups, to process the data differently, for example to save data into a NoSQL database like HBase or MapR-DB
  • add new partitions and consumers for the topics to provide high availability to your application.

Finally, while this example is based on Apache Kafka, the same code will work directly on a MapR cluster using MapR Streams, an integrated messaging system that is compatible with the Kafka 0.9.0 API. With MapR Streams, you simplify the production deployment of your application, since it is integrated into the MapR data platform and you have a single cluster to deploy and manage. Watch ESG Lab's video review after testing MapR Streams. Using MapR Streams for messaging also provides the additional scalability, security, and big data services associated with the MapR Converged Data Platform.

Content originally posted in the MapR Converge Blog.

February 09, 2016 | BY Tugdual Grall






If you want to try out the MapR Converged Data Platform to see its unique big data capabilities but don’t have a cluster of hardware immediately available, you still have a few other options. For example, you can spin up a MapR cluster in the cloud using multiple node instances on one of our IaaS partners (Amazon, Azure, etc.). The only downside with multiple node instances is that the costs can add up to more than you want to spend for an experimental cluster. You also have the option of experimenting with using the MapR Sandbox. The limitation, though, is that it doesn’t give you a true multi-node cluster, so you can’t fully explore features such as multi-tenancy, topologies, and services layouts.

As another option, you can leverage Docker technology with the AWS CloudFormation template to spin up a multi-node MapR cluster in a single virtual instance. There are options to set up a non-secure, secure, or Kerberos-enabled (“Kerberized”) cluster, so you can explore the full feature set that the MapR Platform offers. An LDAP service container is set up to provide centralized directory lookup across the cluster, while a KDC service container provides tokens for cluster authentication in a Kerberized cluster. Additionally, a MapR client container is used to better simulate a production environment. Customers can also install their software in separate containers alongside the cluster in the same instance. This is a very cost-effective way of spinning up a true multi-node MapR cluster on the cloud.

Since containers are highly disposable, it is really easy to reinstall the cluster if you want to try out different things, like a PoC, a demo, or a training environment. The image below shows what the deployment might look like:

To get you started, the rest of this blog post will walk you through how you can get a mini MapR cluster up and running on AWS in less than 30 minutes.

There are 4 main steps involved:

  1. Spin up an AWS instance using the CloudFormation template.
  2. Log in to the instance, and execute the MapR deployment script.
  3. Apply the trial license (optional).
  4. Start exploring the cluster.

Spin up an AWS Instance Using the CloudFormation Template

  1. Log in to the AWS portal. If you don’t have an AWS account, you will need to create one. If you already have one, then you can log in to the console.
  2. Switch to one of these regions: US West (Oregon), US East (Virginia), Asia Pacific (Tokyo), Asia Pacific (Sydney), EU (Ireland), or EU (Frankfurt).
  3. Download this CFT from here:
  4. Select “CloudFormation Template” on your AWS portal. Then select “Create Stack” -> “Upload a template to AWS S3” -> “Browse,” select the template you just downloaded from above step, then upload.

  5. Follow the instructions to launch a MapR cluster.

Fill in the stack name, and select your key at the minimum.

The items on the page below are all optional; leave them blank.

Check the agreement box, and hit “Create.”

If all goes well, you should have a cluster deployed successfully.

Log in to the Instance and Run the MapR Deployment Script

  1. Find the external IP address of the newly spun-up instance by going to the EC2 portal. Locate the instance named “mapr520_docker”; its IP address will be shown.

  2. Wait until the status of the instance changes from “Initializing” to “2/2 checks,” then ssh into the instance by issuing the command “ssh ec2-user@” from your computer.
  3. Once you are in the instance, issue this command to deploy the MapR cluster: “sudo /usr/bin/deploy-mapr.” You will be prompted to answer several questions, which should be self-explanatory; if you are not sure, leave the default answers. Write down these selections because you will need them later. The script will then walk you through the setup process interactively. In 20-30 minutes, you should have a cluster up and running.
  4. If you made a mistake, don’t worry. Just repeat the previous step to reinstall. Also, if you want to try a different secure mode, simply rerun the previous step; the old cluster will be deleted, and a new cluster will be deployed.

Apply the Trial License (Optional)

At this point the cluster is functional, and you can start to explore the MapR cluster, even without a license. However, in order to take advantage of capabilities such as HA, the NFS gateway, and others, you will have to register for a 30-day unlimited trial license.

  1. Point your browser to the MapR Control System (MCS) page: https://<your VM instance IP>:8443, log in as the admin user, and input the password that you assigned in the previous step. Click on the “Manage Licenses” tab in the upper right corner. Copy the cluster ID.

  2. Now go to the MapR website and register an account (the login link is near the upper-right corner of the home page). After you are logged in, select the “My Clusters” tab, and click “Register a Cluster.”

  3. Fill in the cluster ID and cluster name, and hit “Register.”

  4. Now go back to the MCS page, and click “Add license via Web” to apply the license.

  5. Once the license is applied, you can start the NFS gateway services on the MCS portal.

Start Exploring the Cluster

The mini cluster comes with some sample scripts/data included to get you started. To start exploring, you first have to log in to the client container.

  1. Go back to your instance shell prompt. Type “sudo ent” to get into the client container.


# ent
CONTAINER      NAMES
5af123c10715   mapr-client
d078b54a942f   mapr520-node0
2b414d3d0d2a   mapr520-node1
47c73d048f35   mapr520-node2
bd393b70ae8e   kdc
546c8efc27e1   ldap

Which containter you want to enter? 5af123c10715
[root@mapr-client /]#
  1. Now let’s become the ldap user; note that the ldap username doesn’t exist in the local /etc/passwd file.
root@mapr-client /]# su - ldapdude Last login: Tue Dec 20 22:57:06 UTC 2016 [ldapdude@mapr-client ~]$
  3. If you have a secure cluster (MapR ticket), you should use the maprlogin command to obtain a maprticket, or you won’t be able to access the filesystem.

[ldapdude@mapr-client ~]$ maprlogin password
[Password for user 'ldapdude' at cluster 'mapr520': ] xxxxxxx
MapR credentials of user 'ldapdude' for cluster 'mapr520' are written to '/tmp/maprticket_5000'
  4. If you have a Kerberized cluster, you should use the kinit command to obtain a Kerberos token.

[ldapdude@mapr-client ~]$ kinit
Password for ldapdude@EXAMPLE.COM: xxxxxxx
[ldapdude@mapr-client ~]$ hadoop fs -ls /
16/12/21 04:33:34 INFO client.MapRLoginHttpsClient: MapR credentials of user 'ldapdude' for cluster 'mapr520' are written to '/tmp/maprticket_5000'
MapR credentials of user 'ldapdude' for cluster 'mapr520' are written to '/tmp/maprticket_5000'
Found 7 items
drwxr-xr-x   - maprdude maprdude          1 2016-12-20 22:34 /apps
drwxr-xr-x   - mapr     mapr              0 2016-12-20 22:32 /hbase
drwxr-xr-x   - mapr     mapr              0 2016-12-20 22:34 /opt
drwxr-xr-x   - root     root              0 2016-12-20 22:34 /tables
drwxrwxrwx   - mapr     mapr              2 2016-12-20 23:01 /tmp
drwxr-xr-x   - root     root              4 2016-12-20 22:34 /user
drwxr-xr-x   - mapr     mapr              1 2016-12-20 22:32 /var
  5. To get started with Apache Drill, go to /opt/data/drill. Example:

[ldapdude@mapr-client ~]$ cd /opt/data/drill
[ldapdude@mapr-client ~]$ /opt/mapr/drill/drill-1.8.0/bin/sqlline -u jdbc:drill:zk=mapr520-node0:5181,mapr520-node1:5181,mapr520-node2:5181/drill/mapr520-drillbits -f review.sql
  6. To get started with Spark, check out Carol McDonald’s blog post on analyzing Uber data.

Note that when the cluster is first started, processing may be a bit slow depending on the AWS region you are in, but thanks to Docker’s caching capability, the speed will increase over time.


The mini MapR cluster is an excellent way to experience a production-like environment for the MapR Converged Data Platform. It can be secured (with or without Kerberos). It has a separate client container and can be integrated with your choice of 3rd party software. And it gives you the full feature set that the MapR Platform has to offer without having to spin up multiple node instances in the cloud.


Big data developers and QA professionals need a robust big data platform, where they can concentrate their efforts on software development and code testing before rolling out to production. However, getting access to test and staging environments can be challenging, as these are often not self-enabled and require IT assistance. Because of this gap, time-to-market could be adversely affected and product life cycles become too long to adapt to today’s speed of business.


Fortunately, containerized services offer a way to narrow this gap. In my previous post, I offered a way to spin up a mini containerized MapR cluster in a single virtual instance. That works fine for single-user environments but is not scalable. What if you have a team of developers who want to collaborate on the very same containerized MapR cluster? A single virtual instance will not be able to satisfy the need.


Introducing Azure Container Service (ACS). It is an Azure service that makes it simpler to create, configure, and manage a cluster of virtual machines that are preconfigured to run containerized applications. It uses an optimized configuration of popular open-source scheduling and orchestration tools, like Kubernetes, DC/OS, and Docker Swarm. As a result, there's no need to change your existing management practices and tools to move container workloads to Azure; you can keep using the tools you are used to. It is possible to deploy a full-blown MapR cluster in less than an hour. There's no need to rely on the ever-busy IT professionals to assist you and no need to consume a large hardware environment.


MapR has been working closely with ACS to make the deployment much easier. In this blog post, I will walk you through the necessary steps to deploy the MapR Converged Data Platform on ACS and demonstrate the capabilities of the MapR Persistent Application Client Container (PACC) for deploying your containerized applications that leverage the MapR Platform as a persistence tier. Note that the described configuration is not supported by MapR and thus should not be used for production deployments; it should only be used for test, demo, training, or development environments.



Before you start, please set up an account on Azure. You can sign up for one here. Additionally, install Docker on your computer or a cloud instance by following these instructions here. That’s it: now you are ready to start deploying!


Step 1 – Download a pre-built container, start it, and log in to Azure

On the computer or cloud instance where you installed Docker, run the following command:


docker run --name azm --hostname azm -it maprazure/azm:latest /bin/bash


Now you are in the azm container; at the prompt, you need to log in to your Azure account. This container already has Azure CLI 2.0 installed; you can find more information on the following documentation page: Get started with Azure CLI 2.0. Below is a quick summary of how to log in to your Azure account:


[root@azm /]# az login


Follow the instructions to complete the login process.


To sign in, use a web browser to open the page and enter the code BT376Q5W8 to authenticate.


After you log in successfully, you should get the prompt back in a short moment.


Step 2 – Deploy a Kubernetes cluster


To deploy a Kubernetes cluster, you need to execute the “deploy-k8” command, which accepts various options. To view them, specify the “-h” option for the help menu.

[root@azmaster payload]# deploy-k8 -h


Usage: deploy-k8 [options]

  --version             show program's version number and exit
  -h, --help            show this help message and exit
  -g GNAME, --resource-group=GNAME
                        Azure Resource Group Name. default: myacs
  -d DNS_PREFIX, --dns-prefix=DNS_PREFIX
                        DNS Prefix for Kubernetes Hosts.
  -l LOC, --location=LOC
                        Azure Region, e.g. westus, eastus, etc. default:
  -a APPNAME, --app-name=APPNAME
                        Azure Application Name. default: mykubecluster
                        Azure Application Password
  -s VMSIZE, --vm-size=VMSIZE
                        VM size of the Kubernetes agents. default:
                        Number of the Kubernetes agents. default: 3
  -q, --quiet           don't print status messages to stdout


There are default values for each option. At a minimum, you should specify the password and DNS prefix when executing the command to deploy a Kubernetes cluster; this password is a key required by the Kubernetes application to authenticate with the Azure infrastructure.



For example:

[root@azm ~]# deploy-k8 -p M@prtest1 -d myk8

In about 10 minutes, you will get the shell prompt back. This means that the Kubernetes cluster is deployed. Now go to the Azure portal, select Resource Groups, and, assuming you didn’t specify a resource group name, the default resource group myacs will be listed as below.


Select myacs and you will see that it includes quite a few resources, including virtual machines, load balancers, storage, etc. By default, one Kubernetes master and 3 agents are created, and their VM size is Standard_D2_v2.


Step 3 – Log in to the Kubernetes master node and deploy the MapR Converged Data Platform


On the azm container, issue the “ssh-master” command:

[root@azm ~]# ssh-master


This logs you in to the Kubernetes master node. To check whether the Kubernetes cluster is ready, issue this command at the prompt:

root@k8s-master-C5E9779-0:~# kubectl get nodes

NAME                   STATUS                    AGE

k8s-agent-c5e9779-0   Ready                     5m

k8s-agent-c5e9779-1   Ready                     5m

k8s-agent-c5e9779-2   Ready                     5m

k8s-master-c5e9779-0   Ready,SchedulingDisabled   5m


The output indicates that one master and 3 agents are ready. Now you can move forward and deploy a MapR cluster by issuing the “deploy-mapr” command; again, the -h option gives you the help menu:


root@k8s-master-C5E9779-0:~# deploy-mapr -h

Usage: deploy-mapr [options]

  --version             show program's version number and exit
  -h, --help            show this help message and exit
  --maprv=MAPRV, --mapr-version=MAPRV
                        MapR version. default: 520
  --mep=MEP             MEP version. default: 2.0
  -c CLNAME, --cluster-name=CLNAME
                        MapR cluster name. default: mapr520
  -n NNODES, --mapr-nodes=NNODES
                        MapR cluster size. default: 3
  -a ADMIN, --admin-user=ADMIN
                        MapR admin username. default: mapruser
  -p PASSWD, --admin-password=PASSWD
                        MapR admin user password
  -s MODE, --security-mode=MODE
                        MapR security mode: base, sec or kdc. default: base
  -d LDAPUSER, --ldap-user=LDAPUSER
                        MapR ldap username. default: ldapuser
  -q, --quiet           don't print status messages to stdout


At a minimum, you should provide an admin password to manage MapR; for example:

root@k8s-master-C5E9779-0:~# deploy-mapr -p M@prtest1


This kicks off the MapR installation. By default, 3 MapR containers are deployed along with an LDAP container for user directory lookup, a Metastore container for Apache Hive, a MapR client container, a squid proxy container, and a cockpit container that is used to visualize and manage the Kubernetes cluster.


About halfway through, you will see messages like the following, which indicate that Kubernetes is configuring the Azure load balancer so you can access the cockpit portal from the internet; for example:


Waiting for load balancer to open up cockpit port - 5 seconds, est. 5 min

Waiting for load balancer to open up cockpit port - 10 seconds, est. 5 min


Waiting for load balancer to open up cockpit port - 185 seconds, est. 5 min

Waiting for load balancer to open up cockpit port - 190 seconds, est. 5 min

Please point your browser at the URL for cockpit access.


Now point your browser at the URL. You should see the cockpit portal; log in as root with the password you provided for the MapR admin above.



Once you are logged in, you will see the console as below. Click on the “Cluster” tab at the top, then click “Topology” in the left pane, and select “mapr520” in the Project drop-down menu. See the graphs below:



Now you should see an animation of your MapR cluster being deployed on Kubernetes. The black circles are the VMs in the Kubernetes cluster; the blue circles are the MapR containers that are being spun up.


Wait until the MapR deployment is finished. You should see something similar to the messages below on your Kubernetes master console:


Waiting for load balancer to open up proxy port - 40 seconds, est. 5 min

Waiting for load balancer to open up proxy port - 45 seconds, est. 5 min

Waiting for load balancer to open up proxy port - 50 seconds, est. 5 min

Waiting for load balancer to open up proxy port - 55 seconds, est. 5 min

All Done!!


Please point your browser at the URL for cockpit access.


Please configure your browser's proxy setting to IP: and Port: 30128

and then point your browser at https://mapr520-node0:8443 to access MCS

You can also point your browser at http://mapr520-node0:8047 to access Apache Drill



To get inside these containers, click on “Containers” in the left pane and highlight your desired container (e.g. mapr-client).



Once you are in the mapr-client container, you can issue certain commands such as:

  1. df: you will see a /posix mount point, which allows you to interact with MapR-FS using your POSIX-compliant Linux commands.
  2. hadoop fs -ls: the all-too-familiar command for showing the MapR-FS contents.
  3. id ldapuser: note that we have spun up an LDAP container for centralized username lookup; you should see the uid and gid of user ldapuser, which is not in the local /etc/passwd.



Step 4 - Configure your browser to access MapR Control System (MCS)

We have also deployed a squid proxy container that allows you to access the MapR cluster. To do this, open up your browser’s proxy setting (Firefox browser is used as an example here, shown below), fill in the HTTP proxy field with the proxy setting IP in step 3 above, then type in port 30128. Click OK. Examples are shown in the following two graphs:


Now point your browser at the MCS at https://mapr520-node0:8443, and log in as user mapr with the admin password you provided in step 3. You should be able to start managing the MapR cluster from the MCS portal. The cluster comes with a basic license that is sufficient to get you started; however, if you wish to explore the full features of the MapR Converged Data Platform, such as MapR-DB and MapR Streams, you will have to install a free 30-day unlimited trial license. You can do so by following the section “Apply the Trial License” in my previous blog post.



Apache Drill is also available by pointing your browser at http://mapr520-node0:8047.



Step 5 – Deploy PACC services

Tugdual Grall wrote a great blog post on getting started with the MapR PACC service. Basically, it uses a sensor PACC that collects its host’s performance stats (IO, memory, CPU load, etc.) and publishes them to a MapR Streams topic; a webserver PACC then consumes these stream messages and serves them over HTTP, so you can view them with a browser.


I have prepared a script in /opt/pacc on the Kubernetes master node. Execute it as follows:


root@k8s-master-238E7C1E-0# cd /opt/pacc

root@k8s-master-238E7C1E-0:/opt/pacc# bash deploy_pacc


deployment "sensor-deploy" created

deployment "websr-deploy" created

service "mapr-pacc-svc" created

Waiting for load balancer to open up cockpit port - 5 seconds, est. 5 min

Waiting for load balancer to open up cockpit port - 10 seconds, est. 5 min

Waiting for load balancer to open up cockpit port - 15 seconds, est. 5 min


Waiting for load balancer to open up cockpit port - 90 seconds, est. 5 min

Waiting for load balancer to open up cockpit port - 95 seconds, est. 5 min

PACC deployment Done...

point your browser at


In the browser, you can see these stats are refreshed every 3 seconds as they are published and consumed in real time.



Step 6 – Scale your PACC deployment according to demand

One very nice feature of Kubernetes is the capability to scale your services up and down dynamically, according to demand, with no downtime required. In your cockpit window, one sensor container and two web server containers were spun up by the script in the previous step. You can see the web servers are spread across two VMs (black circles) for high availability. Both are attached to a service (orange circle) to achieve load balancing. The service is associated with a public internet IP address on the Azure load balancer to allow external access.



Now suppose the demand suddenly peaks, and you need more web server containers to serve the load. Issue the following command on the Kubernetes master node to scale the number of web servers from 2 to 8:


root@k8s-master-238E7C1E-0:/opt/pacc# kubectl --namespace=pacc scale deployment websr-deploy --replicas=8


In a few seconds, you should see 6 more web servers pop up in the cockpit window to handle the higher load. Kubernetes also makes sure that these new containers are distributed across the hosts (black circles) as evenly as possible. Note that the other window, where the stats are displayed, continues without any disruption.



Lastly, should you want to redeploy the cluster, you can remove the existing cluster first with kubectl delete namespace mapr520, and then run “deploy-mapr -p <password>” to redeploy.



We have demonstrated how to spin up a MapR cluster on the Azure Container Service platform and how to manage PACC with Kubernetes orchestration. The benefit of this solution is a faster time-to-market software development cycle. Software developers can confidently test their code in this environment before releasing to production.


If you feel this article is of interest to you, please find out more about MapR at or ask technical questions at our community forum, Answers.





Apache Drill: How To Create A New Function? by Tug Grall

Apache Drill allows users to explore any type of data using ANSI SQL. This is great, but Drill goes even further than that and allows you to create custom functions to extend the query engine. These custom functions have all the performance of any of the Drill primitive operations, but allowing that performance makes writing these functions a little trickier than you might expect.

In this article, I'll explain step by step how to create and deploy a new function using a very basic example. Note that you can find a lot of information about Drill custom functions in the documentation.

Let's create a new function that allows you to mask some characters in a string, and let's make it very simple. The new function allows users to hide a number of characters from the start, replacing them with any character of their choice. It will look like this:
MASK( 'PASSWORD' , '#' , 4 ) => ####WORD

You can find the full project in the following Github Repository.

As mentioned before, we could imagine many advanced features to this, but my goal is to focus on the steps to write a custom function, not so much on what the function does.


For this you will need:

  • Java Developer Kit 7 or later
  • Apache Drill 1.1 or later
  • Maven 3.0 or later


The following Drill dependency should be added to your Maven project:
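In a pom.xml, that dependency block looks like the following (the version shown is an example; match it to your Drill release):

```xml
<dependency>
    <groupId>org.apache.drill.exec</groupId>
    <artifactId>drill-java-exec</artifactId>
    <version>1.1.0</version>
</dependency>
```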



The Mask function is an implementation of the DrillSimpleFunc.

Developers can create 2 types of custom functions:

  • Simple Functions: these take a single row as input and produce a single value as output
  • Aggregation Functions: these accept multiple rows as input and produce one value as output

Simple functions are often referred to as UDFs, which stands for user-defined functions. Aggregation functions are referred to as UDAFs, which stands for user-defined aggregation functions.

In this example, we just need to transform the value of a column on each row, so a simple function is enough.


The first step is to implement the DrillSimpleFunc interface.

package org.apache.drill.contrib.function;


import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;



@FunctionTemplate(
        name = "mask",
        scope = FunctionTemplate.FunctionScope.SIMPLE,
        nulls = FunctionTemplate.NullHandling.NULL_IF_NULL
)
public class SimpleMaskFunc implements DrillSimpleFunc {

    public void setup() {
    }

    public void eval() {
    }
}

The behavior of the function is driven by annotations (lines 6-10):

  • Name of the function
  • Scope of the function, in our case Simple
  • What to do when the value is NULL; in this case the function will just return NULL


Now we need to implement the logic of the function using the setup() and eval() methods.

  • setup is self-explanatory; in our case we do not need to set anything up.
  • eval is the core of the function. As you can see, this method takes no parameters and returns void. So how does it work?

In fact, the function will be generated dynamically (see DrillSimpleFuncHolder), and the input parameters and output are defined using holder classes bound by annotations. Let's look into this.

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;


import javax.inject.Inject;



@FunctionTemplate(
        name = "mask",
        scope = FunctionTemplate.FunctionScope.SIMPLE,
        nulls = FunctionTemplate.NullHandling.NULL_IF_NULL
)
public class SimpleMaskFunc implements DrillSimpleFunc {


    @Param
    NullableVarCharHolder input;

    @Param(constant = true)
    VarCharHolder mask;

    @Param(constant = true)
    IntHolder toReplace;

    @Output
    VarCharHolder out;

    @Inject
    DrillBuf buffer;

    public void setup() {
    }

    public void eval() {
    }
}


We need to define the parameters of the function. In this case we have 3 parameters, each defined using the @Param annotation. In addition, we also have to define the returned value using the @Output annotation.

The parameters of our mask function are:

  • A nullable string
  • The mask char or string
  • The number of characters to replace starting from the first

The function returns :

  • A string

For each of these parameters you have to use a holder class. For the String, this is managed by a VarCharHolder or NullableVarCharHolder (lines 21, 24, 30), which provides a buffer to manage larger objects in an efficient way. Since we are manipulating a VarChar, you also have to inject another buffer that will be used for the output (line 33). Note that Drill doesn't actually use the Java heap for data being processed in a query but instead keeps this data off-heap and manages its life cycle for us without using the Java garbage collector.

We are almost done: we have the proper class and the input/output objects, so we just need to implement the eval() method itself using these objects.

public void eval() {

    // get the mask and the value to mask
    String maskValue = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(mask);
    String stringValue = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer);

    int numberOfCharToReplace = Math.min(toReplace.value, stringValue.length());

    // build the mask substring
    String maskSubString = com.google.common.base.Strings.repeat(maskValue, numberOfCharToReplace);
    String outputValue = (new StringBuilder(maskSubString)).append(stringValue.substring(numberOfCharToReplace)).toString();

    // put the output value in the out buffer
    out.buffer = buffer;
    out.start = 0;
    out.end = outputValue.getBytes().length;
    buffer.setBytes(0, outputValue.getBytes());
}


The code is quite simple:

  • Get the mask itself - line 4
  • Get the value - line 5
  • Get the number of character to replace - line 7
  • Generate a new string with masked values - lines 10/11
  • Create and populate the output buffer - lines 14 to 17

This code does, however, look a bit strange to somebody used to reading Java code. This strangeness arises because the final code that is executed in a query will actually be generated on the fly. This allows Drill to leverage Java's just-in-time (JIT) compiler for maximum speed. To make this work, you have to respect some basic rules:

  • Do not use imports; instead, use fully qualified class names. This is what is done on line 10 with the Strings class (coming from the Google Guava API packaged in Apache Drill).
  • The ValueHolder classes, in our case VarCharHolder and IntHolder, should be manipulated like structs, so you must call helper methods such as getStringFromVarCharHolder and toStringFromUTF8. Calling methods like toString directly on them will cause serious problems.

Starting in Apache Drill 1.3.x, it is mandatory to specify the package name of your function in the ./resources/drill-module.conf file, as follows:

drill {
  classpath.scanning {
    packages : ${?drill.classpath.scanning.packages} [
      # your function's package name, for example:
      org.apache.drill.contrib.function
    ]
  }
}

We are now ready to deploy and test this new function.


Once again, since Drill generates source code at run time, you must prepare your package so that both the classes and the sources of the function are present in the classpath. This is different from the way Java code is normally packaged, but it is necessary for Drill to do the code generation: Drill uses the compiled code to access the annotations, and uses the source code to do the code generation.

An easy way to do that is to use maven to build your project, and, in particular, use the maven-source-plugin like this in your pom.xml file:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-source-plugin</artifactId>
  <executions>
    <execution>
      <id>attach-sources</id>
      <phase>package</phase>
      <goals>
        <goal>jar-no-fork</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Now, when you build using mvn package, Maven will generate 2 jars:

  • The default jar with the classes and resources (drill-simple-mask-1.0.jar)
  • A second jar with the sources (drill-simple-mask-1.0-sources.jar)

Finally you must add a drill-module.conf file in the resources folder of your project, to tell Drill that your jar contains a custom function. If you have no specific configuration to set for your function you can keep this file empty.

We are all set. You can now package and deploy the new function: just build and copy the jars into the Drill 3rd-party folder, $DRILL_HOME/jars/3rdparty, where $DRILL_HOME is your Drill installation folder.

mvn clean package
cp target/*.jar  $DRILL_HOME/jars/3rdparty

Restart Drill.


You should now be able to use your function in your queries:

SELECT MASK(first_name, '*' , 3) FIRST , MASK(last_name, '#', 7) LAST  FROM cp.`employee.json` LIMIT 5;


In this simple project you have learned how to write, deploy, and use a custom Apache Drill function. You can now extend this to create your own functions.

One important thing to remember when extending Apache Drill (with a custom function, storage plugin, or format) is that the Drill runtime dynamically generates a lot of code. This means you may have to use a very specific pattern when writing and deploying your extensions. With our basic function this meant we had to:

  • deploy classes AND sources
  • use fully qualified class names
  • use value holder classes and helper methods to manipulate parameters

Content Originally posted in MapR Converge Blog post on July 28, 2015, visit here

Subscribe to Converge Blog






Lenovo and MapR have published a reference architecture for MapR Converged Data Platform, based on Lenovo's x3650 M5 servers.

It provides a predefined and optimized hardware infrastructure for the MapR Converged Enterprise Edition, the commercial edition of the MapR Converged Data Platform, which supports enterprise-grade features for business-critical production deployments. This reference architecture provides the planning, design considerations, and best practices for implementing the MapR Converged Data Platform with Lenovo products.

You can find the reference architecture here: Lenovo Big Data Reference Architecture for MapR Converged Data Platform.


On LinkedIn, I wrote the following:

Kappa Architecture is the next 'great thing' to be misunderstood and misapplied. No wonder 'Big Data' projects fail.

After talking with a couple of friends I thought I should explain why I said this...


Jay Kreps wrote an article questioning Lambda Architecture and proposed an alternative. The article can be found here.  At LinkedIn, Jay built some infrastructure using Kafka and Samza such that you didn't need to create a separate persistence layer storing the data in some form of a database.


The issue isn't with what Jay was saying. For certain applications, this architecture makes sense. In the MapR world, you can replace Kafka / Samza with MapR Streams. The issue is that the Kappa Architecture isn't a good solution for all, or even many, of the problems where one wants fast data ingestion.


As a solutions architect, you have to ask yourself a couple of questions prior to choosing a tool. In this case: "Where is the data coming from? (What is the data?)" and "How are we going to use the data?" While attending a couple of local meetups in Chicago, I noticed from some of the comments that there was a general acceptance that Kappa was the way to go, no questions asked. This is a very dangerous "group think", and it is one of the main factors in why Big Data projects fail.

Why Not Kappa

Suppose you're building a Data Lake and rather than perform batch updates from your legacy systems, you intend to use a tool like IBM's CDC or Oracle's Golden Gate software to perform log shipping.  For each transaction performed on a table, you capture the log information and you place it on your Kafka queue and ship it off to your lake.

Since this information contains the latest state of the data row, you can easily just persist the message via Samza and your ingestion is complete.


But here's the problem: when it comes time to use the data, you have to walk through the results. It's a sequential scan.

At the same time, you're still treating Hadoop as a relational model.


In this use case, you will still want to persist the CDC record for the table as well as apply the changes to the table. This allows you to retain a snapshot that matches the underlying legacy table, as well as the historical changes to each row.

However, you will still take a performance hit when you try to use the data. Hadoop isn't a relational platform; it's hierarchical. So to get the most from the data, you will need to transform the data from a relational model to a hierarchical model, then persist it. (Think MapR-DB JSON tables.) In this use case, you're storing your data in multiple formats during the ingestion stage. Your initial storage of the transaction could follow Kappa, but it's less than optimal: you would want to store the data as a record, collocating the changes in time order per record ID, not in order of inbound messages.
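A rough sketch of this dual persistence (the types below are hypothetical, not MapR or CDC vendor APIs): each CDC record is appended to the change history and also merged into the current snapshot keyed by record ID.

```java
import java.util.*;

public class CdcIngest {
    // Hypothetical CDC record: row id, change timestamp, changed columns
    record CdcRecord(String rowId, long ts, Map<String, String> cols) {}

    final List<CdcRecord> history = new ArrayList<>();                 // append-only change log
    final Map<String, Map<String, String>> snapshot = new HashMap<>(); // latest state per row

    void ingest(CdcRecord r) {
        history.add(r);                                     // keep the historical change
        snapshot.computeIfAbsent(r.rowId(), k -> new HashMap<>())
                .putAll(r.cols());                          // apply the change to current state
    }

    public static void main(String[] args) {
        CdcIngest lake = new CdcIngest();
        lake.ingest(new CdcRecord("42", 1L, Map.of("city", "Chicago")));
        lake.ingest(new CdcRecord("42", 2L, Map.of("city", "Austin", "zip", "78701")));
        System.out.println(lake.history.size()); // 2, the full change history is retained
        // lake.snapshot.get("42") now holds the merged latest state for row 42
    }
}
```

The snapshot map plays the role of the table matching the legacy source, while the history list plays the role of the retained CDC records.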


What does this mean?

Note that this is just one example of a use case where, if you attempt to implement Kappa, you will end up with a big fail. It's not one application or use case, but a class of use cases. While Kappa makes sense if you're processing logs, which is also a class of use cases, it's not fit as a general solution.


The key here is to understand what each tool does and how to use it effectively, rather than blindly choosing a tool because it's the next great thing. As always, caveat emptor. ;-)



Editors note: The views, opinions and positions expressed by the authors and those providing comments on these blogs are theirs alone, and do not necessarily reflect the views, opinions or positions of MapR.

Cisco and MapR have been longtime partners on the big data journey. The recently published MapR CVD further strengthens the integration of our products with UCS’s superb management capabilities, complemented by the award-winning, enterprise-grade big data platform, the MapR Distribution including Apache Hadoop.


With the advent of container technology like Docker and application resource management platforms such as Apache Mesos, enterprise customers are looking at these technologies very seriously, as they promise much shorter development cycles and highly scalable product deployment.


A common use case for Mesos deployments is scaling Apache web server services dynamically. Normally, without a utility-grade persistent storage such as one backed by MapR-FS, the storage allocated to Docker containers is ephemeral and lost if a container crashes or is killed. With UCS and MapR, the web content is consistent among the Docker containers. Logs are persisted to MapR-FS and later analyzed with Hadoop.


Dockerizing the Web Server

With the Cisco UCS servers at the foundation, we effortlessly spun up a 10-node MapR cluster with Mesos installed. Using Docker, we created a web server container and then launched the container with Marathon – a Mesos framework for initiating/scaling long-running applications. We simply typed in an arbitrary ID and the following string in the command section of the New Application form:

docker run -d -v /mapr:/www my/webserver (my/webserver is the Docker container name), and off it went. The web server spun up almost instantaneously. We used the "Scale" button to quickly spin up 5 more web containers in a few seconds. See the figure below:




Sharing Persistent Storage among Containers with MapR-FS

A container has its own storage that is limited in space and cannot be shared with other containers. The MapR POSIX-compliant NFS gateway is a perfect solution that allows the containers to tap into the robust, HA/DR-ready MapR-FS for big data analytics. Note that we already have NFS-mounted MapR-FS on the cluster nodes under /mapr. When we spun up the container, the -v option allowed the /mapr mount point on the host node to be mapped to a /www mount point in the container. Furthermore, we modified the DocumentRoot directive in httpd.conf to point to /www. This makes managing the web content much easier, with real-time synchronization across all the web containers. Additionally, we modified the CustomLog and ErrorLog directives to point to a log directory under /www, where each container has its own set of log files associated with a unique hostid. With the MapR NFS gateway, we can simply verify these log files by typing the Unix ls command against the NFS mount point:


# ls /mapr/<MapR cluster name>/www/logs

2c4b95924357_access.log 64cc248c438b_error.log   869fff95a17a_access.log 871e141fc8e7_error.log

2c4b95924357_error.log   6b33974a3848_access.log 869fff95a17a_error.log   9631b85d9dd2_access.log

64cc248c438b_access.log 6b33974a3848_error.log   871e141fc8e7_access.log 9631b85d9dd2_error.log


This setup ensures that we have a central log repository protected by MapR-FS with scheduled snapshots and mirrored volumes that can be processed later with SQL-on-Hadoop tools like Apache Drill to perform web click stream analysis. Of course, this only serves to demonstrate the combined power and capabilities of MapR-FS, Docker and Mesos. The sky is the limit when it comes to other big data applications.


Project Myriad and Beyond

As you know, YARN manages Hadoop cluster resources, and Mesos manages cluster resources for applications. Unfortunately, they do not communicate with each other, although each does a fairly good job in its own realm. Project Myriad was created to break down the wall between Mesos and YARN. We believe that with Cisco UCS as the hardware foundation delivering rock-solid performance with top-quality compute/network/storage resources, the MapR Distribution with Myriad enables the aggregation of the pools of resources for YARN and Mesos. This combination holds great promise in solving most operations/development challenges by achieving much better resource allocation while retaining agility at scale.

Getting Started with the Spark Web UI

by Carol McDonald



This post will help you get started using the Apache Spark Web UI to understand how your Spark application is executing on a Hadoop cluster. The Spark Web UI displays useful information about your application, including:

  • A list of scheduler stages and tasks
  • A summary of RDD sizes and memory usage
  • Environmental information
  • Information about the running executors

This post will go over:

  • Components and lifecycle of a Spark program
  • How your Spark application runs on a Hadoop cluster
  • Using the Spark web UI to view the behavior and performance of your Spark application

This post assumes a basic understanding of Spark concepts. If you have not already read the tutorial on Getting Started with Spark on MapR Sandbox, it would be good to read that first.


This tutorial will run on the MapR Sandbox. On version 4.1 you need to install Spark; version 5 includes Spark.

Spark Components of Execution

Before looking at the web UI, you need to understand the components of execution for a Spark application.  Let’s go over this using the word count example from the  Getting Started with Spark on MapR Sandbox tutorial, shown in the image below.  

Apache Spark web UI

In order to run this example, first at the command line you can get the text file with this command:

Then you can enter the code shown below in the Spark shell. The example code results in the wordcount RDD, which defines a directed acyclic graph (DAG) of RDDs that will be used later when an action is called. Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph. You can print out this RDD lineage with toDebugString, as shown below.

val file= sc.textFile("/user/user01/alice.txt").cache()

val wordcount = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)


// show RDD and its recursive dependencies for debugging
wordcount.toDebugString

// res0: String =

// (2) ShuffledRDD[4] at reduceByKey at <console>:23 []

// +-(2) MapPartitionsRDD[3] at map at <console>:23 []

//    |  MapPartitionsRDD[2] at flatMap at <console>:23 []

//    |  /user/user01/alice.txt MapPartitionsRDD[1] at textFile at <console>:21 []

//    |  /user/user01/alice.txt HadoopRDD[0] at textFile at <console>:21 []



The first RDD, HadoopRDD, was created by calling sc.textFile(); the last RDD in the lineage is the ShuffledRDD created by reduceByKey. Below on the left is a diagram of the DAG graph for the wordcount RDD; the green inner rectangles in the RDDs represent partitions. When the action collect() is called, Spark's scheduler creates a Physical Execution Plan for the job, shown on the right, to execute the action.

HadoopRDD graph

The scheduler splits the RDD graph into stages, based on the transformations. The narrow transformations (transformations without data movement) will be grouped (pipe-lined) together into a single stage. This physical plan has two stages, with everything before ShuffledRDD in the first stage.

shuffledRDD graph

Each stage is comprised of tasks, based on partitions of the RDD, which will perform the same computation in parallel.  The scheduler submits the stage task set to the task scheduler, which launches tasks via a cluster manager.

Here is a summary of the components of execution:

  • Task: a unit of execution that runs on a single machine
  • Stage: a group of tasks, based on partitions of the input data, which will perform the same computation in parallel
  • Job: has one or more stages
  • Pipelining: collapsing of RDDs into a single stage, when RDD transformations can be computed without data movement
  • DAG: Logical graph of RDD operations
  • RDD: Parallel dataset with partitions
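To make these semantics concrete, here is the logical flatMap / map / reduceByKey word count computation as plain local Java (a sketch only: no Spark and no partitions, just the result that the distributed tasks compute in parallel):

```java
import java.util.*;
import java.util.stream.*;

public class WordCountSketch {
    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split(" "))) // flatMap: lines -> words
            .collect(Collectors.toMap(w -> w, w -> 1,        // map: word -> (word, 1)
                                      Integer::sum));        // reduceByKey: sum the 1s
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = wordCount(List.of("alice was beginning", "alice said"));
        System.out.println(counts.get("alice")); // 2
    }
}
```

In Spark, the same three steps are distributed: each task runs this pipeline over one partition, and the shuffle brings equal keys together before the final sums.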

How your Spark Application runs on a Hadoop cluster

The diagram below shows Spark on an example Hadoop cluster:

Spark application on Hadoop

Here is how a Spark application runs:

  • A Spark application runs as independent processes, coordinated by the SparkContext object in the driver program.
  • The task scheduler launches tasks via the cluster manager;  in this case it’s YARN.
  • The cluster manager assigns tasks to workers, one task per partition.  
  • A task applies its unit of work to the elements in its partition, and outputs a new partition.
    • Partitions can be read from an HDFS block, HBase or other source and cached on a worker node (data does not have to be written to disk between tasks like with MapReduce).
  • Results are sent back to the driver application.

Using the Spark web UI to view the behavior and performance of your Spark application

You can view the behavior of your Spark application in the Spark web UI at http://<host ip>:4040.  Here is a screen shot of the web UI after running the word count job. Under the "Jobs" tab, you see a list of jobs that have been scheduled or run, which in this example is the word count collect job.  The Jobs table displays job, stage, and task progress.

Spark Jobs

Under the Stages tab, you can see the details for stages. Below is the stages page for the word count job, Stage 0 is named after the last RDD in the stage pipeline, and Stage 1 is named after the action.

Details for Spark Job

You can view RDDs in the Storage tab.

Spark RDD storage

Under the Executors tab, you can see processing and storage for each executor. You can look at the thread call stack by clicking on the thread dump link.

Spark Executors


This concludes the Getting Started with the Spark Web UI tutorial. If you have any further questions about using the Spark Web UI, please ask them in the comments section below.


Content Originally posted in MapR Converge Blog post, visit here


Event Driven Microservices Patterns

by Carol McDonald


In this blog we will discuss some patterns that are often used in microservices applications that need to scale:

  • Event Stream
  • Event Sourcing
  • Polyglot Persistence
  • Memory Image
  • Command Query Responsibility Segregation

The Motivation

Uber, Gilt, and others have moved from a monolithic to a microservices architecture because they needed to scale. A monolithic application puts all of its functionality into a single process; scaling requires replicating the whole application, which has limitations.

Sharing normalized tables in a clustered RDBMS does not scale well because distributed transactions and joins can cause concurrency bottlenecks.

The microservice architectural style is an approach to developing an application as a suite of small  independently deployable services built around specific business capabilities.  A microservices approach is well aligned to a typical big data deployment.  You can gain modularity, extensive parallelism and cost-effective scaling by deploying services across many commodity hardware servers.  Microservices modularity facilitates independent updates/deployments, and helps to avoid single points of failure, which can help prevent large-scale outages. 

Event Stream

When moving from a monolithic to a microservices architecture, a common pattern is event sourcing using an append-only event stream such as Kafka or MapR Streams (which provides a Kafka 0.9 API). With MapR Streams (or Kafka), events are grouped into logical collections of events called Topics. Topics are partitioned for parallel processing. You can think of a partitioned Topic like a queue, with events delivered in the order they are received.

Unlike a queue, events are persisted, even after they are delivered they remain on the partition, available to other consumers.

Older messages are automatically deleted based on the Stream's time-to-live setting; if the setting is 0, they will never be deleted.

Messages are not deleted from Topics when read, and Topics can have multiple different consumers; this allows processing of the same messages by different consumers for different purposes. Pipelining is also possible, where a consumer enriches an event and publishes it to another topic.
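The queue-with-persistence semantics above can be sketched in a few lines of plain Java (a toy model, not the Kafka or MapR Streams API): a partition is an append-only list, and each consumer merely tracks its own read offset, so reading never removes an event.

```java
import java.util.*;

public class TopicSketch {
    final List<String> partition = new ArrayList<>(); // events persist in arrival order

    void publish(String event) { partition.add(event); }

    // Each consumer only advances its own offset; reading never deletes events.
    class Consumer {
        int offset = 0;
        String poll() { return offset < partition.size() ? partition.get(offset++) : null; }
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.publish("deposit:100");
        topic.publish("withdraw:30");
        TopicSketch.Consumer a = Consumer();
        TopicSketch.Consumer b = Consumer();
        System.out.println(a.poll()); // deposit:100
        System.out.println(a.poll()); // withdraw:30
        System.out.println(b.poll()); // deposit:100, the same event read independently by b
    }
}
```

Because consumer b keeps its own offset, it sees exactly the same event sequence as consumer a, which is what makes multiple independent read models possible.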

Event Sourcing

Event Sourcing is an architectural pattern in which the state of the application is determined by a sequence of events each of which is recorded in an append-only Event store or Stream.  As an example, imagine that each “event” is an incremental update to an entry in a database.   In this case, the state of a particular entry is simply the accumulation of events pertaining to that entry.  In the example below the Stream persists the queue of all deposit and withdrawal events, and the database table persists the current account balances.

Which one of these, the Stream or the Database, makes a better system of record? The events in the Stream can be used to reconstruct the current account balances in the Database, but not the other way around.  Database replication actually works by suppliers writing changes to a change log, and consumers applying the changes locally.  Another well known example of this is a source code version control system.

With a Stream, events can be re-played to create a new view, index, cache, memory image, or materialized view of the data.

The Consumer simply reads the messages from the oldest to the latest to create a new View of the data.
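That replay can be sketched in plain Java (the event shape here is an assumption for illustration): fold the deposit and withdrawal events from oldest to latest to rebuild the account balances view.

```java
import java.util.*;

public class ReplaySketch {
    // Hypothetical event: positive delta = deposit, negative delta = withdrawal
    record AccountEvent(String accountId, int delta) {}

    static Map<String, Integer> rebuildBalances(List<AccountEvent> stream) {
        Map<String, Integer> balances = new HashMap<>();
        for (AccountEvent e : stream)                   // read from oldest to latest
            balances.merge(e.accountId(), e.delta(), Integer::sum);
        return balances;
    }

    public static void main(String[] args) {
        List<AccountEvent> stream = List.of(
            new AccountEvent("BradA", 100),
            new AccountEvent("BradA", -80),
            new AccountEvent("CarolM", 50));
        System.out.println(rebuildBalances(stream).get("BradA")); // 20
    }
}
```

Note the asymmetry the post describes: the balances map can always be rebuilt from the event list, but the event list cannot be recovered from the balances map.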

There are several advantages for modeling application state with streams:

  • Lineage: to ask how did BradA’s balance get so low?
  • Auditing:  it gives an audit trail, who deposited/withdrew from account id BradA? This is how accounting transactions work.
  • Rewind: to see what the status of the accounts were last year.
  • Integrity: can I trust the data hasn’t been tampered with?
    •  yes because Streams are immutable.

The Replication of  MapR Streams gives a powerful testing and debugging technique.  A replica of a Stream can be used to replay a version of events for testing or debugging purposes.

Different databases and schemas for different needs

There are lots of databases out there, each using different technologies depending on how the data is used, optimized for a type of write or read pattern: graph query, search, document, and so on. What if you need the same set of data to serve different databases, for different types of queries coming in? The Stream can act as the distribution point for multiple databases, each one providing a different read pattern. All changes to application state are persisted to an event store, which is the system of record. The event store supports rebuilding state by re-running the events in the stream.

Events funnel out to databases which are consumers of the stream.  Polyglot persistence provides different specialized materialized views.


Command and Query Responsibility Segregation (CQRS) is a pattern that separates the read model and Queries from the write model and Commands often using event sourcing.  Let’s look at how an online shopping  application’s item rating functionality could be separated using the CQRS pattern.  The functionality, shown below in a monolithic application, consists of users rating items they have bought, and browsing item ratings while shopping.

In the CQRS design shown below we isolate and separate the Rate Item write “command” from the Get Item Ratings read “query” using event sourcing.  Rate Item events are published to a Stream. A handler process reads from the stream and persists a materialized view of the ratings for an item in a NoSQL document-style database.
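A minimal sketch of the read side (all names here are hypothetical): a handler consumes Rate Item events and maintains a materialized view of running (sum, count) per item, so the Get Item Ratings query becomes a simple lookup.

```java
import java.util.*;

public class RatingsView {
    // Hypothetical write-side event published by the Rate Item command
    record RateItemEvent(String itemId, int stars) {}

    final Map<String, int[]> view = new HashMap<>(); // itemId -> {sum, count} materialized view

    // Handler: consume events from the stream and update the read model
    void handle(RateItemEvent e) {
        int[] agg = view.computeIfAbsent(e.itemId(), k -> new int[2]);
        agg[0] += e.stars();
        agg[1]++;
    }

    // Query side: Get Item Ratings reads only the materialized view
    double averageRating(String itemId) {
        int[] agg = view.get(itemId);
        return agg == null ? 0.0 : (double) agg[0] / agg[1];
    }

    public static void main(String[] args) {
        RatingsView v = new RatingsView();
        v.handle(new RateItemEvent("item1", 5));
        v.handle(new RateItemEvent("item1", 3));
        System.out.println(v.averageRating("item1")); // 4.0
    }
}
```

The write path (handle) and read path (averageRating) never touch the same code, which is the separation CQRS is after; in the real design the view would live in a NoSQL document database rather than a map.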


NoSQL and De-normalization

With MapR-DB a table is automatically partitioned across a cluster by key range, and each server is the source for a subset of a table. Grouping the data by key range provides for really fast read and writes by row key. With MapR-DB you design your schema so that the data that is read together is stored together.

Often with MapR-DB, you de-normalize or store in one table what would be multiple tables in a normalized relational database.  If your entities exist in a one-to-many relationship, it’s possible to model it in MapR-DB HBase as a single row or MapR-DB JSON as a single document.  In the example below, the item and related ratings are stored together and can be read together with a single get on the indexed row key.  This makes the reads a lot faster than joining tables together.

Event Sourcing: New Uses of Data

An advantage of using an Event Stream for the rate item and other shopping related events is shown here. This design lets us use this data more broadly.  Raw or enriched events can be stored in inexpensive storage such as MapR-FS. Historical ratings data can be used to build a machine learning model for recommendations. Having a long retention time for data in the queue is also very useful. For example, that data could be processed to build a collection of shopping transaction histories stored in a data format such as Parquet that allows very efficient querying. Other processes might use historical data and streaming shopping related events with machine learning to predict shopping trends, to detect fraud, or to build a real-time display of where transactions are happening.


Fashion Retailer’s Event Driven Architecture

A major fashion retailer wanted to increase in-season agility and inventory discipline in order to react to demand changes and reduce markdowns.  The Event driven solution architecture is shown below:

  • Weather, world events, and logistical data is collected in real time via MapR Streams, allowing for real time analysis of potential logistical impacts, and rerouting of inventory.
  • Apache Spark is used for batch and streaming analytics processing, and machine learning for predicting supply chain disruptions, and product recommendations.
  • Data is stored in MapR-DB providing scalable, fast reads and writes. Apache Drill is used for interactive exploration and preprocessing of the data with a schema-free SQL query engine.
  • ODBC with Drill provides support for existing BI tools.
  • MapR’s Enterprise capabilities provide for global data center replication.


In this blog post, we discussed event-driven microservice architecture using the following design patterns: Event Sourcing, Command Query Responsibility Segregation, and Polyglot Persistence. All of the components of the architectures we discussed can run on the same cluster with the MapR Converged Data Platform.

References and More Information

Content Originally posted in MapR Converge Blog post on February 08, 2017 |by Carol McDonald, visit here 






This blog is a first in a series that discusses some design patterns from the book MapReduce design patterns and shows how these patterns can be implemented in Apache Spark(R).

When writing MapReduce or Spark programs, it is useful to think about the data flows needed to perform a job. Even if Pig, Hive, Apache Drill, and Spark DataFrames make it easier to analyze your data, there is value in understanding the flow at a lower level, just like there is value in using Explain to understand a query plan. One way to think about this is in groupings of types of patterns, which are templates for solving common and general data manipulation problems. Below is the list of the types of MapReduce patterns in the MapReduce book:

  • Summarization Patterns
  • Filtering Patterns
  • Data Organization Patterns
  • Join Patterns
  • Metapatterns
  • Input and Output Patterns

In this post we will go over one of the summarization patterns, namely numerical summarizations.


Numerical summarizations are a pattern for calculating aggregate statistical values over data. The intent is to group records by a key field and calculate aggregates per group such as min, max, median. The figure below from the MapReduce design patterns book shows the general execution of this pattern in MapReduce.

This Aggregation pattern corresponds to using GROUP BY in SQL for example:

SELECT MIN(numericalcol1), MAX(numericalcol1),
   COUNT(*) FROM table GROUP BY groupcol2;

In Pig this corresponds to:

b = GROUP a BY groupcol2;
c = FOREACH b GENERATE group, MIN(a.numericalcol1),
MAX(a.numericalcol1), COUNT_STAR(a);


In Spark, key-value pair RDDs are commonly used to group by a key in order to perform aggregations, as shown in the MapReduce diagram; however, with Spark pair RDDs you have many more functions than just Map and Reduce.

We will go through some aggregation examples using the dataset from a previous blog on Spark Dataframes. The dataset is a .csv file that consists of online auction data. Each auction has an auction id associated with it and can have multiple bids. Each row represents a bid. For each bid, we have the following information:

(In the code boxes, comments and sample output are shown on lines prefixed with //)

Below we load the data from the ebay.csv file, then we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then map() transformations are applied to each element to create the auctionRDD of Auction objects.

// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
// define the schema using a case class
case class Auction(auctionid: String, bid: Double, bidtime: Double, bidder: String, bidderrate: Integer, openbid: Double, price: Double, item: String, daystolive: Integer)
// create an RDD of Auction objects
val auctionRDD = sc.textFile("ebay.csv").map(_.split(",")).map(p => Auction(p(0), p(1).toDouble, p(2).toDouble, p(3), p(4).toInt, p(5).toDouble, p(6).toDouble, p(7), p(8).toInt))

The figure below shows the general execution in Spark to calculate the average bid per auction for an item.

The corresponding code is shown below. First a key-value pair is created with the auction id and item as the key and the bid amount and a 1 as the value, e.g. ((id, item), (bid amount, 1)). Next a reduceByKey performs a sum of the bid amounts and a sum of the ones to get the total bid amount and the count. A mapValues calculates the average, which is the total bid amount / count of bids.

// create key value pairs of ((auctionid, item), (bid, 1))
val apair = => ((auction.auctionid, auction.item), (, 1)))
// reduceByKey to get the sum of the bids and the count
val atotalcount = apair.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
// get a couple of results
// Array(((1641062012,cartier),(4723.99,3)), ((2920322392,palm),(3677.96,32)))
// avg = total / count
val avgs = atotalcount.mapValues { case (total, count) => total.toDouble / count }
// get a couple of results
// Array(((1641062012,cartier),1574.6633333333332), ((2920322392,palm),114.93625))

// This could also be written in a single expression
val avgs = => ((auction.auctionid, auction.item), (, 1))).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues { case (total, count) => total.toDouble / count }

It is also possible to use the Java Math class or the Spark StatCounter class to calculate statistics, as shown below:

import java.lang.Math

// Calculate the minimum bid per auction
val amin = apair.mapValues(_._1).reduceByKey(Math.min)
// get a couple of results
// Array(((1641062012,cartier),1524.99), ((2920322392,palm),1.0))

import org.apache.spark.util.StatCounter
// Calculate statistics on the bid amount per auction
val astats = apair.mapValues(_._1).groupByKey().mapValues(list => StatCounter(list))
// get a result
astats.take(1)
// Array(((1641062012,cartier),(count: 3, mean: 1574.663333, stdev: 35.126723, max: 1600.000000, min: 1524.990000)))

Spark DataFrames provide a domain-specific language for distributed data manipulation, making it easier to perform aggregations. DataFrame queries can also perform better than coding with pair RDDs because their execution is automatically optimized by a query optimizer. Here is an example of using DataFrames to get the min, max, and avg bid by auctionid and item:

val auctionDF = auctionRDD.toDF()
<font color="green">// get the max, min, average bid by auctionid and item</font>
auctionDF.groupBy("auctionid", "item").agg($"auctionid",$"item", max("bid"), min("bid"), avg("bid")).show

<font color="blue">auctionid item MAX(bid) MIN(bid) AVG(bid)
3016429446 palm 193.0 120.0 167.54900054931642
8211851222 xbox 161.0 51.0 95.98892879486084</font>

You can also use SQL while working with DataFrames, using Spark SQL. This example gets the max, min, average bid by auctionid and item.

<font color="green">// register as a temp table in order to use SQL</font>
<font color="green">// get the max, min, average bid by auctionid and item</font>
val aStatDF = sqlContext.sql("SELECT auctionid, item, MAX(bid) as maxbid, min(bid) as minbid, avg(bid) as avgbid FROM auction GROUP BY auctionid, item")


<font color="green">// show some results</font>
<font color="blue">auctionid item maxbid minbid avgbid
3016429446 palm 193.0 120.0 167.549
8211851222 xbox 161.0 51.0 95.98892857142857</font>


This concludes the first post in a series discussing MapReduce design patterns implemented with Spark. This discussion was very condensed; for more information on the patterns, refer to the MapReduce Design Patterns book, and for more information on Spark pair RDDs, refer to the Key/Value Pairs chapter of Learning Spark.

Related Content

Apache Spark



Content Originally posted in MapR Converge Blog post, visit here

November 02, 2015 | BY Carol McDonald


MapR has worked closely with Azure to develop marketplace offerings that enable users to conduct a proof-of-concept or production deployment with the MapR Converged Data Platform on Azure. These marketplace offerings, which come preloaded and preconfigured with the MapR software and the required supporting operating system, can be launched on the Azure Marketplace portal. By default, these offerings are based on the MapR Converged Community edition, which allows free and unlimited production use without certain features enabled. If you want to use MapR Streams, MapR-DB, HA, etc., you need to purchase these licenses from MapR and enable them on the cluster. For more detail regarding the various MapR editions, please visit this link:

This blog post will cover the details of how you can get a MapR cluster on Azure up and running in less than 30 minutes. Alternatively, for developers who want a low-cost MapR environment to play with, check out this blog post to spin up a MapR sandbox on the Azure Marketplace. It is based on a MapR Community license that gives you unlimited access to MapR-DB and MapR Streams as well as other DR and HA features, such as mirroring and snapshots.


Working with Azure MapR marketplace offerings

Before you start, it is highly recommended that you check the usage and quota under your current Azure subscription. These quotas include CPU, Network, and Storage. Make absolutely certain that you have planned for enough resources before you start launching the cluster. To check your quota, you can go to the Azure portal, select the desired subscription where you want to launch the MapR cluster, then select “Usage + quotas." See the screenshot below:



Step 1

Point your browser to, and log in to your Azure account. Click the “+New” option on the left pane, type MapR into the search area, and you will find a few MapR Azure Marketplace offerings, as illustrated below.

Select the desired offering (e.g. MapR Converged Data Platform v5.2).




Step 2

After you have selected a desired offering, you will see an introduction screen, as shown below. Go ahead and click on “Create.”



Step 3

In this screen, you need to provide information such as the cluster name, disk type for the cluster, username and password for the admin who will manage the cluster, subscription, and resource group name (currently only the new resource group option is supported). After you have filled in the information, select “OK” to go to the next step.




Step 4

In this step you will need to determine the cluster size and VM size as well as the password for the unique “mapr” user. The “mapr” user is a reserved power user for the MapR cluster. When selecting the VM size, you can expand the “View all” on the top right corner of the screen to see more options. In this case, we selected “D3 Standard.” Click “Select” to move forward. You can visit this link to find out the pricing information for these various VM sizes:






Step 5

In this step, you are going to configure network settings for the cluster. You can select either a new or an existing network. We chose to create a new network for our cluster in this demo: fill in the name of the network as well as the address space and subnet address range, and then hit “OK.” Note that if there is any security group associated with an existing subnet, the cluster will inherit those security group settings. By default, no security group will be created if you choose to create a new network for this deployment. Please visit this link to find out how to create a security group to protect the cluster after it is deployed successfully:






Step 6

In this step, you have an opportunity to review your cluster settings. Click “OK” if everything looks good.





Step 7

In this step, you need to review the purchase agreement. Click “Purchase” to proceed.




If everything goes well, you should see a successful deployment in about 30 minutes, depending on the cluster size and VM types. Click the template in the resource group on the portal to find out the various web addresses to connect to the newly deployed cluster.




Once you have copied the IP addresses of the web links, paste them into your browser’s navigation window. For example, you can go to the cluster web console for the MapR Control System (MCS) portal and log in with the credentials that you provided when creating the cluster. Note that if you selected ssh public key as your authentication method earlier, you should log in as the power user ‘mapr’ with the password you assigned. If you selected the “Password” option for authentication, you can log in as either the sysadmin user or the power user ‘mapr’ with the passwords you assigned, respectively.



MapR 5.2 also comes with MapR Monitoring installed; it is based on Kibana, Grafana, Elasticsearch, and OpenTSDB. It gives you real-time cluster performance visualization and the ability to analyze the various cluster logs. Visit this link to learn more about MapR Monitoring:


You are also encouraged to try out Apache Drill, the SQL-on-Hadoop tool that is gaining a lot of momentum in the community. Simply point your browser at the URL link described above. Don’t forget to check out the Drill tutorials, which walk you through the steps required to query data stored in a MapR cluster using Drill, and to extract insights and visualize the findings using Tableau, a leading visualization tool that is part of the broader ecosystem.


For those who are interested in real-time processing, such as Apache Spark, MapR Streams, or NoSQL database like MapR-DB, the MapR Converged Data Platform comes with everything included; you just need to purchase respective licenses to enable these modules. There is no need to install extra layers of applications, like Kafka or HBase.


Final Notes:


  1. MapR Azure Marketplace offerings by default use dynamic external IPs for the instances, because most users may not have access to static IPs. Therefore, the external IPs will change across instance reboots. That said, the cluster will continue to function across reboots because the internal IPs remain the same. However, if you plan to have external applications connecting to the cluster, you will either have to modify the client to point to the new IP addresses of the cluster nodes or switch to static IPs after you spin up the cluster the first time. To switch to static IPs, pick the instance, usually the first node, select “Network interfaces,” then click on the interface name as described in the screenshot below:




Then click “Overview” -> “Public IP address” -> “Configuration” -> “Static,” and click “Save” for the change to take effect. See the screenshot below:



2. You may want to shut down the cluster from time to time to save costs. To do that, simply ssh into the first node as root, then issue the following commands to shut down the MapR services:

clush -a service mapr-warden stop

clush -a service mapr-zookeeper stop


Once the MapR services are stopped, go to the Azure portal to stop your cluster nodes. Note: don’t simply issue an “init 0” or “halt” command on the instances, because these commands will not deallocate the resources used by the instances, and you will still be billed for VM usage.



We have walked you through how to spin up a MapR cluster on the Azure Marketplace. In order to take full advantage of the MapR cluster for your big data analytics, I encourage you to check out the MapR community. You will find many resources to help you understand more about MapR and its differentiators against other Hadoop distributions, as well as demo blog posts on using MapR for data analytics. You can also ask questions, which will be answered very quickly by our community members.

TensorFlow on MapR Tutorial

by Nick Amato

Even if you haven't had a chance to check out TensorFlow in detail, it's clear that your choice of platform has a big impact just as it does for other machine learning frameworks. The adventure from trial to production involves many intermediate destinations, from feature engineering to model-building to execution and real-time evaluation. Even a model with the most spectacular F1-score is only as good as how effectively you can put it to use helping customers. Questions arise such as: do you need to evaluate against data for offline or online analysis (or both)? Where does the preprocessed (or feature-engineered) data live on its way to TensorFlow? Is there a way to preserve data lineage as it moves through the various stages to support both security concerns as well as easy debugging?

In this post we'll look at how to get a TensorFlow environment running on the MapR sandbox, which, as you'll see in the tutorial, just might be the "ultimate" starting point.


This combination is useful if you are early in the process and want to try out a few examples. You can run the sandbox on a well-equipped laptop and it will expose all of the MapR features so it's easy to envision how your application can evolve from concept to production use.


Follow these steps to build the single-node environment:

  1. Download and run the MapR sandbox per the instructions.

  2. After starting the sandbox, you should see a banner screen on the console that says something like 'MapR-Sandbox ...'. Press Alt-F2 to select a new virtual terminal. You can run this tutorial from the console directly or via 'ssh' from another machine. This blog post has some good pointers for configuring networking on the sandbox to support ssh from another machine (usually the hosting machine).

  3. Log in as 'root', with password 'mapr', to add the 'mapr' user to the 'sudoers' group as follows:

       # echo "mapr ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers

       # exit

  4. After logging out, log in again as the user 'mapr' with password 'mapr'.

  5. Download the Tensorflow-on-MapR installation script.

       wget -O

  6. You should now have a file called in the current directory. Run the script as follows.


This script will download prerequisite packages, including the 'bazel' build tool from Google, which is required to build TensorFlow from source. In the process of building the packages, MapR services may be temporarily disabled and re-enabled to ensure there are enough resources on the virtual machine. If you have more than 8GB RAM this is not strictly necessary, but it is done as a precaution to fit all environments. This procedure may take a few minutes to complete on slower systems or if you have a lot of other things running on the machine.

Congratulations! You should now have a fully functional TensorFlow setup running on MapR. Let's dive into an example.


The tflearn interface to TensorFlow is a convenient API that is structured similarly to the well-known scikit-learn high-level API. It is both expressive on its own and helpful in porting existing ML applications to TensorFlow. Additionally, many of the lower-level TensorFlow aspects are usable from this interface, making tflearn a good way to transition from scikit-learn to developing models of other shapes and sizes.

We installed tflearn in the above preparation steps, so the sandbox is ready to run our example code.

First, let's enable the TensorFlow environment and download the rest of the files for the tutorial.

scl enable python27 bash
git clone
cd tensorflow

In this Python example we will use a recently released dataset from the US Department of Homeland Security relating to lost baggage claims in United States airports. We will develop a small application that can make predictions based on past yearly data. We will use it to answer the question, "Will this claim be accepted?" You can envision scenarios where this will be valuable: implementing automated claims, predicting the impact of future claims, or even as the basis of a mobile application where customers could get their claims instantly processed. Have you ever had to wait in line to make a damage claim? Wouldn't it be cool to have the whole thing done on your phone in the taxi ride back to the hotel?

DHS has several data sets for prior years. The raw data file for our example, 'claims_2002_2006.csv' is pulled directly from the public DHS site, converted from XLS to CSV in the repo for easy handling.

First we need to preprocess the rows, by handling categorical variables and doing the usual cleaning. Before we do that, however, let's copy the files to MapR-FS so they are replicated, and take a snapshot so we can always reference the original data set. This is a unique capability of MapR in that we can make copy-on-write, application consistent snapshots of any volume.

Let's create a new volume for landing the raw data, pull it from the repo, and take a snapshot of it.

$ maprcli volume create -name claimdata -path /cd
$ cp claims_2002_2006.csv /mapr/
$ maprcli volume snapshot create -snapshotname origdata -volume claimdata

We now have an application-consistent snapshot of the original data so we can refer back to it if needed -- notice we also used a local NFS mount here and didn't use any 'hadoop' commands to ingest the data.

Next, preprocess the data and create the test and training sets by running the included script:


You should see something like the following output:

reading input file: /mapr/
raw data len: 97231
dropped 11286 rows with invalid values
dropped 1302 very large claims
dropped 0 remaining rows with invalid values
classes: accepted/total: train: 38150/67706 test: 9559/16937
writing training file: /mapr/
writing test file: /mapr/

Note that the two classes (after our mapping in are fairly balanced out of the box, with roughly half of each data set representing one of the two classes. The two classes are 0 and 1, meaning "accepted" or "other" respectively. We did this to simplify the example -- if a claim was in any other case than 'accepted' (for example, still in progress, referred to a contractor claim, etc.) it was considered "not accepted". There are probably some nuances not completely captured here and it's an area for further exploration.

Now let's look at the first section of

#!/usr/bin/env python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import numpy as np

# Similarly to the example in:
# we create a model and test on our own TSA Baggage Claims data.

# separated train and test files from MapR-FS
TRAIN = "/mapr/"
TEST = "/mapr/"
MODEL_DIR = "/mapr/"

# load the data sets
training_set = tf.contrib.learn.datasets.base.load_csv_with_header(
    filename=TRAIN,
    target_dtype=np.int,
    features_dtype=np.float32)
test_set = tf.contrib.learn.datasets.base.load_csv_with_header(
    filename=TEST,
    target_dtype=np.int,
    features_dtype=np.float32)

TensorFlow provides two of its own functions for reading CSV files directly (as if there weren't enough already in Python). These have their own resulting data structures and even their own header format. Refer to the output in for how the header is constructed. In the above code we use these CSV functions to load our train and test sets.

Let's move on to the fun part... the model-building:

dim = len([0])
feature_columns = [tf.contrib.layers.real_valued_column("", dimension=dim)]

# make a 3-layer DNN
classifier = tf.contrib.learn.DNNClassifier(feature_columns=feature_columns,
                                            hidden_units=[512, 256, 128],
                                            n_classes=2,
                                            model_dir=MODEL_DIR)

# fit the model on the training data,,
               steps=100)

In the above code we create a 3-layer Deep Neural Net (DNN) classifier with a very similar API to what we would do with a scikit-learn classifier. Note that we set model_dir to a path in MapR-FS. One of the benefits of TensorFlow is that the model can be easily saved to a file which you can load later and perform more iterations. This is another spot where MapR-FS snapshots can come in handy, and the ability to use the filesystem as random read-write capable, while fully replicating the data across the cluster, saves a lot of time.

Now that we have our DNN and the training data consumed, the last step is to predict 0/1 (accepted/not accepted) values for our held-out test data. We've used an 80/20 split here, with 80% going to training and 20% going to test, which is a typical choice.

# print an accuracy report
acc = classifier.evaluate(,
           ['accuracy']
print('accuracy: %2.2f' % acc)

This final code prints the overall accuracy. There are other metrics available in the evaluate results as well.

Ready to try it? Run the script:


You should see output similar to the following (you may also see intermediate output from the TensorFlow code as it trains the model).

accuracy: 0.64

This can be interpreted as, "for approximately 64% of the data set, the claim status was accurately predicted." Not bad for a few minutes' work -- the model is correct nearly two thirds of the time. Can we do better? Some possibilities come to mind: more feature engineering? Building separate models for each airport? Throwing more hardware at the problem? TensorFlow lets you do them all, but we'll leave those for another blog post.


A handful of the core API functions take parameters that specify where to save certain information in files, such as logs for individual processes and checkpointed models. Many of the provided examples point to /tmp directories for this on the local filesystem, but this can lead to a sprawling mess of files when using a cluster. With MapR-FS mounted locally over NFS at each node, you can simply give these files a unique name and write them to the shared filesystem with almost zero performance impact. If a directory is needed, you can just point to a directory starting from /mapr to use the local NFS mount for the cluster. For example, when creating a Supervisor:

# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir="/mapr/",
                         init_op=init_op,
                         summary_op=summary_op,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=600)

The same applies when writing logs that would ordinarily be written on every machine:

logwriter = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())


Even though we only scratched the surface of TensorFlow and MapR, you can see in this example that the MapR Converged Data Platform provides an easy place to get started with machine learning. The platform will be there with you as your application grows into production. All of the code in this tutorial can be found on GitHub here. Have questions? Leave a comment below or talk to us in the community forum.

Content Originally posted in MapR Converge Blog post, visit here

March 22, 2017 | BY Nick Amato




machine learning

Ted Dunning, Chief Applications Architect for MapR, presented a session titled: “Some Important Streaming Algorithms You Should Know About” at the Spark Summit 2015 conference held in San Francisco. During the session, he highlighted some newer streaming algorithms such as t-digest and streaming k-means. This article is adapted from his talk.

Streaming algorithms have to be approximate

The key thing about streaming algorithms is they have to be approximate algorithms. There are a few things that you can compute exactly in a streaming fashion, but there are lots of important things that you can't compute that way, so we have to approximate.

Most important aggregates can be approximated online, and all of the algorithms mentioned here work that way.


There are two tricks that I'm going to highlight here. One is hashing, which turns each value into something that behaves like a uniformly distributed random value. The second trick is sketching. Often, you can take a very large amount of data and build a very small sketch of it. Carefully done, we can use the sketch to get the values of interest. The trick is to find a good sketch. All of the algorithms here use sketching of some kind, and most use hashing as well.


Let's talk very quickly about HyperLogLog. The basic idea is that if I have n samples that are hashed and inserted into a [0, 1) interval, those n samples are going to make n+1 intervals. Therefore, the average size of the n+1 intervals has to be 1/(n+1). By symmetry, that means the average distance to the minimum of those hashed values is also going to be 1/(n+1). In addition, duplicate values will land exactly on top of previous values, so they won't change the picture at all, which means the n we're talking about is the number of unique values we have inserted. So if we have ten samples, the minimum is going to be right around 1/11. I said .1 on the slide here for brevity, but you get the idea.

Here it is as a much larger aggregate. Here we have the same 100 samples, and I've hashed them 10,000 times. This gives lots of different values for the minimum hashed result (and different samples have the minimum hashed value in different trials), but the key observation is that the mean value there is 0.0099, or 1/101. That tells us how many unique values there were. What we need to do is store many, many hashed minimum values and average them. HyperLogLog does exactly this, and it accomplishes it by cleverly storing the minimums in a very small space. The result is a sketch that helps you compute a very useful aggregate. Remember that HyperLogLog is good for computing the number of distinct values.
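The 1/(n+1) intuition is easy to check with a quick simulation. The sketch below is purely illustrative (seeded pseudo-random values stand in for hash functions, and the trial count is arbitrary); real HyperLogLog stores the minimums in a few bytes per register rather than keeping them all:

```python
import random

# Toy illustration of the HyperLogLog idea: hash each unique value to a
# point in [0, 1), keep the minimum, and average that minimum over many
# independent "hash functions". With n unique values the average minimum
# is about 1/(n+1), so we can solve for n. (Real HyperLogLog stores the
# minimums far more compactly; this only shows the intuition.)
def estimate_distinct(values, trials=1000):
    unique = set(values)          # duplicates land on the same point
    total_min = 0.0
    for trial in range(trials):
        # seeded Random objects stand in for the trial-th hash function
        total_min += min(random.Random("%d:%r" % (trial, v)).random()
                         for v in unique)
    avg_min = total_min / trials  # approximately 1 / (n + 1)
    return round(1.0 / avg_min - 1)

data = [i % 100 for i in range(1000)]  # 1000 items, only 100 unique
print(estimate_distinct(data))         # typically close to 100
```

Averaging the minimum over many independent hash functions is what drives the estimate; HyperLogLog's contribution is doing this in a tiny, fixed amount of memory.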

Count min sketch

Count Min Sketch is another, different algorithm, where the goal is to know the frequency of popular items, but we don't have enough room for a full table of counters for all possible items. The basic idea is that we hash each incoming item several different ways and increment a count for that item in a lot of different places, one place for each different hash. Clearly, because each array that we use is much smaller than the number of unique items that we see, it will be common for more than one item to hash to a particular location. The trick is that for any of the most common items, it is very likely that at least one of the hashed locations for that item will only have collisions with less common items. That means that the count in that location will be mostly driven by that item. The problem is how to find the cell that only has collisions with less popular items.


The answer is that when we want the count for a popular item, we look in all the places it hashed to and take the minimum count that we find in any of them, hoping that no more popular item collided with it in all of those locations. If at least one of those has only less popular items, we'll get a pretty good count. Because we take the minimum of all of the counts that we find, we know that we have found the least impacted count. That's the Count Min Sketch, a kind of sketch that is really useful for counting the number of occurrences of the most popular items we have seen.
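Here is a minimal Python version of that idea. The row count, the width, and the use of MD5 as a per-row hash are illustrative choices for this sketch, not part of the algorithm's definition:

```python
import hashlib

class CountMinSketch:
    """Approximate frequency counts in fixed space. Each item is hashed
    into one cell per row; a query takes the minimum across rows, which
    is the cell least inflated by collisions with other items."""

    def __init__(self, rows=4, width=50):
        self.rows = rows
        self.width = width
        self.table = [[0] * width for _ in range(rows)]

    def _cells(self, item):
        for row in range(self.rows):
            digest = hashlib.md5(("%d:%s" % (row, item)).encode()).hexdigest()
            yield row, int(digest, 16) % self.width

    def add(self, item):
        for row, col in self._cells(item):
            self.table[row][col] += 1

    def count(self, item):
        # never underestimates; the minimum is the least-collided cell
        return min(self.table[row][col] for row, col in self._cells(item))

cms = CountMinSketch()
stream = ["spark"] * 500 + ["hive"] * 50 + list("abcdefghij") * 3
for item in stream:
    cms.add(item)
print(cms.count("spark"))   # at least 500; collisions can only add
```

Note the one-sided error: the sketch can overcount (collisions only add), never undercount, which is exactly why taking the minimum across rows is safe.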

We’ve discussed two sketches so far. Leaky counters are another interesting thing. This graph, which is actually very hard to see because it drops so rapidly, is a count of word frequencies in English. This is what Zipf's law looks like if you don't do a log-log scale, but the key point about this is because it drops so fast, the frequency of anything that's rare is very, very low compared to the things that are more common. This is the same observation that makes the count-min sketch work, but it is also what makes the leaky counter idea work as well. This kind of distribution of counts means we can keep a table of counters for just the things that have the highest count, then periodically cull the ones that have the smallest counts. The advantage of the leaky counter over the count-min sketch is that the leaky counter remembers which items were the most popular as well as approximating their count. The count-min sketch tells us the approximate count for any item we ask about and gives good answers for popular items, but it doesn’t remember which items we should ask about.

If we use this trick of culling counters that have small counts, eventually some of them are going to reappear in our input and thus get put into the table. At that point, we won’t remember what their count was when we discarded them, so what we can do is we can reinsert them with the average count based on how long we have been running the algorithm and thus what the counts probably are for things we have forgotten. The cool thing is that we will tend to forget things that are rare so that the counter table can stay small. Things will drop out of it, but when they come back, we know about how much count they should have on average and we know how much error there is based on what the minimum count is in the table; it’s a clever, clever trick. So this abbreviated counter table is effectively a sketch that helps us remember which items are popular and how many times we have seen them.
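The culling-and-reinsertion idea can be sketched in a few lines of Python. The capacity and the choice to reinsert returning items just above the largest culled count are illustrative; real heavy-hitter counters (such as the space-saving algorithm) refine these details:

```python
# Toy "leaky counter" for heavy hitters: keep counters only for the
# items with the highest counts, and when the table overflows, cull the
# smallest counter. A returning item re-enters just above the largest
# count culled so far, which is roughly what it could have accumulated
# while we weren't tracking it.
class LeakyCounter:
    def __init__(self, capacity=10):
        self.capacity = capacity
        self.counts = {}
        self.floor = 0   # largest count ever culled (the error bound)

    def add(self, item):
        if item in self.counts:
            self.counts[item] += 1
        else:
            self.counts[item] = self.floor + 1
            if len(self.counts) > self.capacity:
                # cull the smallest counter to stay within capacity
                smallest = min(self.counts, key=self.counts.get)
                self.floor = max(self.floor, self.counts.pop(smallest))

    def top(self, k):
        return sorted(self.counts, key=self.counts.get, reverse=True)[:k]

lc = LeakyCounter(capacity=5)
stream = ["the"] * 80 + ["of"] * 40 + ["and"] * 20 + list("abcdefghijklmnop")
for word in stream:
    lc.add(word)
print(lc.top(2))   # the heavy hitters survive the culling
```

The `floor` value doubles as the error bound mentioned in the text: any count in the table is accurate to within the largest count we have ever discarded.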

Streaming k-means

Now, here we move to algorithms that are not very well known. The first of these little known algorithms is streaming k-means. The problem to solve is that k-means clustering requires multiple tries to get a good clustering and each try involves going through the input data several times. The graph above shows what can happen if you don’t do this. What happened there is that the initial conditions were chosen with two points in the upper left cluster which resulted in a poor clustering that can’t be fixed with k-means iterations. Now, to avoid confusion, what I am talking about here is really streaming k-means, not the kind that's in MLlib. MLlib has a streaming k-means assigner; it doesn't do the clustering in a single pass, and it doesn't do the clustering in an online fashion. This algorithm will—it will do what is normally a multi-pass algorithm in exactly one pass. Now the problem in k-means, typically, is that you wind up with clusterings like in the graph above. Because of bad initial conditions, you will split some cluster and some other two clusters will be joined together as one. You can see red and yellow here are splitting that upper-left cluster, and blue is sharing the two clusters in the middle—this is very typical. This is why you restart k-means. K-means is not only multi-pass, but you often have to do restarts and run it again. Even worse, with complex data in many dimensions, and with many clusters, this kind of failure becomes almost inevitable so even with many restarts, you get bad results.

But if we could come up with a sketch (a small representation of the data) that would avoid this sort of problem, then we could do the clustering on the sketch, not on the data. Even better, if we can create the sketch in a single fast pass through the data, we have effectively converted k-means into a single-pass algorithm. The graphic above shows a picture of some data (that's the red, green, and blueish dots). Superimposed on that is a k-means clustering of that set of data, except that I've put way more clusters in that clustering than I would like to have in the final result. This clustering with too many clusters is the idea behind the streaming k-means sketch. The point is that all of the actual clusters in the original data have several sketch centroids in them, and that means that I will almost always have something in every interesting feature of the data, so I can cluster the sketch instead of the data.

The other interesting thing is that you can do a very sloppy job of clustering to get the sketch centroids. You can use an approximate distance, and you can do an approximate search and slam those babies in there. You can also adapt how many you need on the fly. It's really kind of a cool, one-pass algorithm to get the sketch.

The sketch can represent all kinds of amazing distributions if you have enough clusters. This spiral, shown above, can be approximated with 20 clusters as shown above. So any sort of clustering you'd like to do on the original data can be done on the sketch.

Clustering Summary

  • Sketch is just lots of clusters
  • Sketching can be done very approximately in one pass
  • High quality clustering of the sketch is a high quality clustering of the data

That's what's behind the proof, and that's what allows us to take one pass through the data with a very, very fast algorithm. That gives me a high quality sketch at the end, with many clusters compared to how many I want in the end, and that, then, is what I cluster in memory on one machine—piece of cake! Sketching, again, is the key here. Very often, with high dimensional clustering, you also want to hash down to a lower dimension, especially when building the sketch.
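A toy version of the sketch-building pass might look like the following. The fixed distance cutoff and the threshold-growing rule are illustrative stand-ins for the adaptive, approximate search the talk describes:

```python
import math
import random

# One-pass streaming k-means sketch: keep many more weighted centroids
# than the k you finally want. Each point either joins its nearest
# centroid (if close enough) or starts a new one; if the sketch grows
# too large, the threshold is raised and the centroids are re-streamed
# through the same procedure. A careful k-means run on the small sketch
# then stands in for clustering the raw data.
def sketch(points, max_centroids=50, threshold=0.1):
    centroids = []  # each entry: [ [x, y], weight ]
    for p in points:
        best, best_d = None, float("inf")
        for c in centroids:
            d = math.dist(p, c[0])
            if d < best_d:
                best, best_d = c, d
        if best is not None and best_d < threshold:
            w = best[1]
            best[0][0] = (best[0][0] * w + p[0]) / (w + 1)  # move centroid
            best[0][1] = (best[0][1] * w + p[1]) / (w + 1)
            best[1] = w + 1
        else:
            centroids.append([list(p), 1])
        if len(centroids) > max_centroids:
            # too many clusters: coarsen and re-stream the centroids
            threshold *= 1.5
            centroids = sketch([tuple(c[0]) for c in centroids],
                               max_centroids, threshold)
    return centroids

rng = random.Random(42)
data = [(rng.gauss(cx, 0.05), rng.gauss(cy, 0.05))
        for cx, cy in [(0, 0), (1, 1), (0, 1)] for _ in range(200)]
s = sketch(data)
print(len(s))   # many fewer weighted points than the 600 raw points
```

Each of the three true clusters ends up covered by at least one (usually several) sketch centroids, which is the property that lets the final in-memory clustering recover the real structure.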


The final algorithm I want to talk about is t-digest. The idea here is that I want to compute the quantiles of a vast number of samples. In fact, I may want to keep millions of these quantiles for different kinds of subsets of everything I'm looking at. It’s an OLAP cube sort of arrangement, but for all the distributions. I'm going to get an approximation of the entire one dimensional distribution of my data.


The basic idea here is that we can do an adaptive k-means algorithm in one dimension. The idea is we find the nearest cluster. If that cluster is too full, we'll start a new cluster. If it's not too full, we add the new point to the existing cluster. Now, the cool thing about that is we can choose the meaning of “too full” to give us very, very cool accuracy bounds on our estimate of the quantile. If we choose to have small centroids (small clusters, near Q=0 and Q=1), what we can do is bound the accuracy, so the accuracy is relative to the distance to the nearer end.

If you think about it, the 50th percentile plus or minus .1 percent is still pretty much the median, but the 99.99th percentile plus or minus .1 percent isn't the 99.99th percentile at all. We want much finer accuracy at the ends, with lots of nines and lots of zeros. With 99.99 percent, we probably want .001 or less error. So we want relative accuracy, not absolute accuracy. Small clusters at the ends and big clusters in the middle let us trade that off while keeping the smallest possible representation of the distribution in memory.


The way we do that is that we can interpolate in quantile space rather than back in the sample space. The X-axis here is the sample space, and you can see that's a piecewise linear approximation of the cumulative distribution function. You can see how linear interpolation there follows the actual distribution quite nicely.


Now we might want to turn this a little bit. The way that cluster sizes are limited in the t-digest is that we translate from q (which is now on the horizontal axis) to a centroid number. We do that by this non-linear curve. Because the curve is steep near the ends, the centroids are close together in q space near the ends, so they have to be small. They're far apart in the middle, so they can be large. That means our accuracy is very, very fine at the ends and coarse in the middle.


As we're inserting new samples, we determine for every cluster whether the k-value before the cluster and the k-value after it (k1 and k2 in the graph) have a difference of greater than one or not. If the difference is less than one, then the cluster is small enough to have a sample added.
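That size test can be sketched with the arcsin-based scale function mentioned below. This is a toy rendition of the rule, not a full t-digest implementation; the function names and the choice of delta (the compression parameter) are illustrative:

```python
import math

def k(q, delta):
    """k1 scale function: steep near q=0 and q=1, so clusters near
    the ends must stay small; delta controls the number of clusters."""
    return delta / (2 * math.pi) * math.asin(2 * q - 1)

def fits(q1, q2, delta):
    """A cluster spanning quantiles [q1, q2] may accept another sample
    only while its k-size, k(q2) - k(q1), stays below one."""
    return k(q2, delta) - k(q1, delta) <= 1.0
```

With delta = 100, a cluster covering two percent of the data near the median is still acceptable, while the same q-width out in the tail is already too full, which is exactly the relative-accuracy behavior described above.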


The size bound on this data structure follows directly from the form here—the arcsin function that we use there—and so does the accuracy on real data. These are two kinds of data: uniform and Gamma(0.1, 0.1). The Gamma distribution is so skewed that the mean is 1, but the median is 10 to the -6th. The .1 percentile is 10 to the -30th. So it's very, very skewed, but you can see the accuracy is virtually the same as in the uniform case. And near the ends, at .001 or 99.9 percent, the accuracy is within a few parts per million. This is with a fixed-size data structure at about 200 nanoseconds per sample.


  • Hashing and Sketching
  • HyperLogLog = count distinct
  • Count min = count(s)
  • Streaming k-means
  • Quantiles via t-digest

In summary, hashing and sketching are two important tricks for streaming algorithms. HyperLogLog is one well-known algorithm for count distinct. The count-min sketch is a good one for doing counts, and there’s also the leaky counter for the heavy hitters. Truly streaming k-means is done with sketches and the quantiles via the t-digest algorithm for quantile summaries.


Content Originally posted in MapR Converge Blog post, visit here

August 17, 2015 | BY Ted Dunning

This post will help you get started using Apache Spark Streaming with HBase on the MapR Sandbox. Spark Streaming is an extension of the core Spark API that enables continuous data stream processing.

Editor’s Note: Download our free E-Book Getting Started with Apache Spark: From Inception to Production here.

This post is the fifth in a series; if you are new to Spark, read these first:

What is Spark Streaming?

First of all, what is streaming? A data stream is an unbounded sequence of data arriving continuously. Streaming divides continuously flowing input data into discrete units for processing. Stream processing is low latency processing and analyzing of streaming data. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Spark Streaming is for use cases which require a significant amount of data to be quickly processed as soon as it arrives. Example real-time use cases are:

  • Website monitoring, network monitoring
  • Fraud detection
  • Web clicks
  • Advertising
  • Internet of Things: sensors

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data streams can be processed with Spark's core APIs, DataFrames, SQL, or machine learning APIs, and can be persisted to a file system, HDFS, databases, or any data source offering a Hadoop OutputFormat.

How Spark Streaming Works

Streaming data is continuous and needs to be batched to be processed. Spark Streaming divides the data stream into batches of X seconds called DStreams, which are internally sequences of RDDs. Your Spark application processes the RDDs using Spark APIs, and the processed results of the RDD operations are returned in batches.
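As a toy illustration of that batching model (the function name and timestamps are invented; real DStreams are driven by wall-clock arrival time rather than explicit record timestamps), records arriving within the same interval land in the same batch:

```python
def micro_batches(records, batch_seconds):
    """Group (timestamp, value) records into fixed-width batches,
    mimicking how a DStream becomes one RDD per batch interval."""
    batches = {}
    for ts, value in records:
        start = (ts // batch_seconds) * batch_seconds  # batch start time
        batches.setdefault(start, []).append(value)
    return batches
```

Each value in the returned dict plays the role of one RDD in the DStream's sequence.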

Architecture of the example Streaming Application

The Spark Streaming example code does the following:

  • Reads streaming data.
  • Processes the streaming data.
  • Writes the processed data to an HBase Table.

Other Spark example code does the following:

  • Reads HBase Table data written by the streaming code
  • Calculates daily summary statistics
  • Writes summary statistics to the HBase table Column Family stats

Example data set

The Oil Pump Sensor data comes in as comma separated value (csv) files dropped in a directory. Spark Streaming will monitor the directory and process any files created in that directory. (As stated before, Spark Streaming supports different streaming data sources; for simplicity, this example will use files.) Below is an example of the csv file with some sample data:

We use a Scala case class to define the Sensor schema corresponding to the sensor data csv files, and a parseSensor function to parse the comma separated values into the sensor case class.

<font color="green">// schema for sensor data</font> case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double, flo: Double,            sedPPM: Double, psi: Double, chlPPM: Double)  object Sensor {    <font color="green">// function to parse line of csv data into Sensor class</font>    def parseSensor(str: String): Sensor = {        val p = str.split(",")         Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble,             p(7).toDouble, p(8).toDouble)   } … }

HBase Table schema

The HBase Table Schema for the streaming data is as follows:

  • Composite row key of the pump name, date, and time stamp

The Schema for the daily statistics summary rollups is as follows:

  • Composite row key of the pump name and date
  • Column Family stats
  • Columns for min, max, avg.
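The two composite row key layouts can be sketched as plain string helpers (the pump name and timestamps below are invented sample values; the article's actual key construction is the Scala convertToPut function shown next):

```python
def data_row_key(pump, date, time):
    """Streaming table row key: pumpName_date time."""
    return pump + "_" + date + " " + time

def stats_row_key(data_key):
    """Daily rollup row key drops the time component: pumpName_date."""
    return data_key.split(" ")[0]
```

Splitting on the space to drop the time component is the same trick the summary job uses later when it groups readings by pump and day.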

The function below converts a Sensor object into an HBase Put object, which is used to insert a row into HBase.

val cfDataBytes = Bytes.toBytes("data")

object Sensor {
    . . .
    // Convert a row of sensor object data to an HBase put object
    def convertToPut(sensor: Sensor): (ImmutableBytesWritable, Put) = {
        // create a composite row key: sensorid_date time
        val dateTime = + " " + sensor.time
        val rowkey = sensor.resid + "_" + dateTime
        val put = new Put(Bytes.toBytes(rowkey))
        // add the data values to the column family "data" on the put object
        put.add(cfDataBytes, Bytes.toBytes("hz"), Bytes.toBytes(sensor.hz))
        put.add(cfDataBytes, Bytes.toBytes("disp"), Bytes.toBytes(sensor.disp))
        put.add(cfDataBytes, Bytes.toBytes("flo"), Bytes.toBytes(sensor.flo))
        put.add(cfDataBytes, Bytes.toBytes("sedPPM"), Bytes.toBytes(sensor.sedPPM))
        put.add(cfDataBytes, Bytes.toBytes("psi"), Bytes.toBytes(sensor.psi))
        put.add(cfDataBytes, Bytes.toBytes("chlPPM"), Bytes.toBytes(sensor.chlPPM))
        (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
    }
}

Configuration for Writing to an HBase Table

You can use the TableOutputFormat class with Spark to write to an HBase table, similar to how you would write to an HBase table from MapReduce. Below we set up the configuration for writing to HBase using the TableOutputFormat class.

val tableName = "sensor"

// set up Hadoop HBase configuration using TableOutputFormat
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val jobConfig: JobConf = new JobConf(conf, this.getClass)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

The Spark Streaming Example Code

These are the basic steps for Spark Streaming code:

  1. Initialize a Spark StreamingContext object.
  2. Apply transformations and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped using streamingContext.awaitTermination().

We will go through each of these steps with the example application code.

Initializing the StreamingContext

First we create a StreamingContext, the main entry point for streaming functionality, with a 2 second batch interval.

val sparkConf = new SparkConf().setAppName("HBaseStream")

// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

Next, we use the StreamingContext textFileStream(directory) method to create an input stream that monitors a Hadoop-compatible file system for new files and processes any files created in that directory.

<font color="green">// create a DStream that represents streaming data from a directory source</font> val linesDStream = ssc.textFileStream("/user/user01/stream")

The linesDStream represents the stream of data; each record is a line of text. Internally, a DStream is a sequence of RDDs, one RDD per batch interval.

Apply transformations and output operations to DStreams

Next we parse the lines of data into Sensor objects, with the map operation on the linesDStream.

<font color="green">// parse each line of data in linesDStream  into sensor objects</font>  val sensorDStream =

The map operation applies the Sensor.parseSensor function on the RDDs in the linesDStream, resulting in RDDs of Sensor objects.

Next we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We filter the sensor objects for low psi to create alerts, then we write the sensor and alert data to HBase by converting them to Put objects, and using the PairRDDFunctions saveAsHadoopDataset method, which outputs the RDD to any Hadoop-supported storage system using a Hadoop Configuration object for that storage system (see Hadoop Configuration for HBase above).

<font color="green">// for each RDD. performs function on each RDD in DStream</font> sensorRDD.foreachRDD { rdd =>         <font color="green">// filter sensor data for low psi</font>      val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)        <font oclor="green">// convert sensor data to put object and write to HBase  Table CF data</font>       <font color="green">// convert alert to put object write to HBase  Table CF alerts</font> }

The sensor objects are converted to Put objects and then written to HBase.

Start receiving data

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

    <font color="green">// Start the computation</font>     ssc.start()     <font color="green">// Wait for the computation to terminate</font>     ssc.awaitTermination()

Spark Reading from and Writing to HBase

Now we want to read the HBase sensor table data, calculate daily summary statistics, and write these statistics to the stats column family.

The code below reads the psi column data from the HBase sensor table, calculates statistics on this data using StatCounter, then writes the statistics to the sensor table's stats column family.

     <font color="green">// configure HBase for reading</font>      val conf = HBaseConfiguration.create()     conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)     <font color="green">// scan data column family psi column</font>     conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi")   <font color="green">// Load an RDD of (row key, row Result) tuples from the table</font>     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],       classOf[],       classOf[org.apache.hadoop.hbase.client.Result])      <font color="green">// transform (row key, row Result) tuples into an RDD of Results</font>     val resultRDD = => tuple._2)      <font color="green">// transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key</font>     val keyValueRDD = resultRDD.               map(result => (Bytes.toString(result.getRow()).               split(" ")(0), Bytes.toDouble(result.value)))      <font color="green">// group by rowkey , get statistics for column value</font>     val keyStatsRDD = keyValueRDD.              groupByKey().              mapValues(list => StatCounter(list))      <font color="green">// convert rowkey, stats to put and write to hbase table stats column family</font> { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

The diagram below shows that the output from newAPIHadoopRDD is an RDD of row key, result pairs. The PairRDDFunctions saveAsHadoopDataset saves the Put objects to HBase.
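The groupByKey/StatCounter rollup can be mimicked in a few lines of plain Python (the function name and sample key are illustrative; Spark's StatCounter also tracks count, variance, and stdev, which are omitted here):

```python
def rollup(pairs):
    """Per row key, compute (min, max, mean) of the readings,
    analogous to groupByKey().mapValues(StatCounter(_))."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {k: (min(v), max(v), sum(v) / len(v)) for k, v in grouped.items()}
```

Each resulting (key, stats) pair corresponds to one Put into the stats column family.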


Running the Application

You can run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.

Here are the steps summarized:

  1. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr.
  2. Build the application using maven.
  3. Copy the jar file and data file to your sandbox home directory /user/user01 using scp.
  4. Run the streaming app:

     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath`     --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
  5. Copy the streaming data file to the stream directory:
    cp sensordata.csv /user/user01/stream/

  6. Read data and calculate stats for one column

       /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath`      --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
  7. Calculate stats for whole row

      /opt/mapr/spark/spark-1.3.1/bin/spark-submit --driver-class-path `hbase classpath`     --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar


This concludes the tutorial on Spark Streaming with HBase. You can find more information here:

References and More Information:

Content Originally posted in MapR Converge Blog post, visit here

September 04, 2015 | BY Carol McDonald

