The Exchange



During execution, Gateway and Apex generate event log records that provide an audit trail. These records can be used to understand the activity of the system and to diagnose problems. Usually, the event log records are stored in a local file system and can later be used for analysis and diagnostics.

Gateway also provides a universal way to pass Gateway and Apex event log records to third-party destinations, so you can use external tools to store, query, and report on log events. To do this, you must configure a logger appender in the Gateway configuration files.

Configuring Logger Appenders

Gateway and Apex client processes run on the node where the Gateway instance is installed, so you can configure the logger appenders using the regular log4j properties (datatorrent/releases/3.9.0/conf/

Following is an example of log4j properties configuration for Socket Appender:
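The example itself did not survive in this copy of the post. A minimal log4j.properties sketch for a SocketAppender, assuming the same logstashnode1/5400 endpoint used in the attribute example later in this post, might look like:

```properties
# Sketch only: route root-logger events to a log4j SocketAppender
log4j.rootLogger=INFO, tcp
log4j.appender.tcp=org.apache.log4j.net.SocketAppender
log4j.appender.tcp.RemoteHost=logstashnode1
log4j.appender.tcp.Port=5400
log4j.appender.tcp.ReconnectionDelay=10000
log4j.appender.tcp.LocationInfo=true
```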


You can use the attribute property “apex.attr.LOGGER_APPENDER” to configure the logger appenders for the Apex Application Master and containers. It can be defined in the dt-site.xml configuration file (global, local, and user) or in the static and runtime application properties.

Use the following syntax to enter the logger appender attribute value:


Following is an example of logger appender attribute configuration for Socket Appender:

  <property>
    <name>apex.attr.LOGGER_APPENDER</name>
    <value>tcp;,
           log4j.appender.tcp.RemoteHost=logstashnode1,
           log4j.appender.tcp.Port=5400,
           log4j.appender.tcp.ReconnectionDelay=10000,
           log4j.appender.tcp.LocationInfo=true
    </value>
  </property>

Integrating with ElasticSearch and Splunk

You can use different methods to store event log records in an external data source; however, we recommend the following approach:

Gateway and Apex can be configured to use the Socket Appender to send logger events to Logstash, and Logstash can then deliver the event log records to any output data source. For instance, the following picture shows the integration workflow with ElasticSearch and Splunk.

Following is an example of Logstash configuration:

input {
  # receive logger events from the Socket Appender
  log4j {
    mode => "server"
    port => 5400
    type => "log4j"
  }
}

filter {
  # transform logger events into event log records
  mutate {
    remove_field => [ "@version","path","tags","host","type","logger_name" ]
    rename => { "apex.user" => "user" }
    rename => { "apex.application" => "application" }
    rename => { "apex.containerId" => "containerId" }
    rename => { "apex.applicationId" => "applicationId" }
    rename => { "apex.node" => "node" }
    rename => { "apex.service" => "service" }
    rename => { "dt.node" => "node" }
    rename => { "dt.service" => "service" }
    rename => { "priority" => "level" }
    rename => { "timestamp" => "recordTime" }
  }
  date {
    match => [ "recordTime", "UNIX" ]
    target => "recordTime"
  }
}

output {
  # write event log records to the ElasticSearch cluster
  elasticsearch {
    hosts => ["esnode1:9200","esnode2:9200","esnode3:9200"]
    index => "apexlogs-%{+YYYY-MM-dd}"
    manage_template => false
  }
  # forward event log records to Splunk
  tcp {
    host => "splunknode"
    mode => "client"
    port => 15000
    codec => "json_lines"
  }
}

ElasticSearch users can use the Kibana reporting tool for analysis and diagnostics. Splunk users can use Splunk Web.




MapR Persistent Application Client Containers (PACCs) support containerization of existing and new applications by providing containers with persistent data access from anywhere. PACCs are purpose-built for connecting to MapR services. They offer secure authentication and connection at the container level and extensible support for the application layer, and they can be customized and published to Docker Hub.


Microsoft SQL Server 2017 for Linux offers the flexibility of running MSSQL in a Linux environment. Like all RDBMSs, it needs a robust storage platform to persist its databases, where data is managed and protected securely.


By containerizing MSSQL with MapR PACCs, customers get the benefits of MSSQL, MapR, and Docker combined. Here, MSSQL offers robust RDBMS services and persists its data into MapR for disaster recovery and data protection, while leveraging Docker technologies for scalability and agility.


The diagram below shows the architecture for our demonstration:


A MapR Cluster

Before you can deploy the container, you need a MapR cluster to persist data to. There are multiple ways to deploy a MapR cluster: you can use a sandbox, or you can use MapR Installer for on-premises or cloud deployment. The easiest way to deploy MapR on Azure is through the MapR Azure Marketplace. Once you sign up for Azure, purchase a subscription with enough quota (CPU cores and storage), and fill out a short form answering some basic questions about the infrastructure and MapR, you are off at the click of a button. A fully deployed MapR cluster should be at your fingertips within 20 minutes.


A VM with Docker CE/EE Running

Second, you need to spin up a VM in the same VNet or subnet where your MapR cluster is located. Docker CE/EE is required; for installation instructions, see the Docker documentation. Docker supports a wide variety of OS platforms. We used CentOS for our demo.

Deploying the MSSQL Container

Once you have the MapR cluster and VM running, you can kick off your container deployment.


Step 1 - Build a Docker Image


Login to your VM as root and run the following command:


curl -L | bash


In a few minutes, you should see a similar message to the one below, indicating a successful build:


Execute the following command to verify the image (mapr-azure/pacc-mssql:latest) is indeed stored in the local Docker repository:

# docker images mapr-azure/pacc-mssql

Step 2 – Create a Volume for MSSQL

Before starting up the container, you need to create a volume on the MapR cluster to persist the database into. Log in to the MapR cluster as user ‘mapr’ and run the following command to create a volume (e.g., vol1) mounted at path /vol1 in the filesystem:


maprcli volume create -path /vol1 -name vol1


You can get the cluster name by executing this command:


maprcli dashboard info -json | grep name


Step 3 – Start Up the Container

Run the following command to spin up the container with the image we just built in Step 1 above:


# docker run --rm --name pacc-mssql -it \
--cap-add SYS_ADMIN \
--cap-add SYS_RESOURCE \
--device /dev/fuse \
--security-opt apparmor:unconfined \
--memory 0 \
--network=bridge \
-e SA_PASSWORD=m@prr0cks \
-e MAPR_CLUSTER=mapr522 \
-e MSSQL_BASE_DIR=/mapr/mapr522/vol1 \
-e MAPR_CLDB_HOSTS=<CLDB-HOST-IP> \
-e MAPR_MOUNT_PATH=/mapr \
-e MAPR_TZ=Etc/UTC \
-p 1433:1433 \
mapr-azure/pacc-mssql:latest
Note that you can replace -it with -d in the first line to run the startup process in the background.

You can customize the environment variables above to fit your environment. The variable SA_PASSWORD is the password for the MSSQL admin user. MAPR_CLUSTER is the cluster name. MSSQL_BASE_DIR is the path on MapR-XD where MSSQL will persist its data; it usually takes the form /mapr/<cluster name>/<volume name>. MAPR_CLDB_HOSTS is the IP address of the CLDB hosts in the MapR cluster; in our case, we have a single-node cluster, so only one IP is used. Finally, the default MSSQL port is 1433. You can use the -p option in Docker to expose it on a port of your choice on the VM host; we kept port 1433 in the demo.


There are other environment variables you can pass into the MapR PACC; for more information, refer to the MapR PACC documentation.


In a few minutes, you should see a message like the one below that indicates the MSSQL server is ready:


2017-11-16 22:54:30.49 spid19s     SQL Server is now ready for client connections. This is an informational message; no user action is required.

Step 4 – Create a Table in MSSQL, and Insert Some Data

Now you are ready to insert some sample data into a test MSSQL database. To do so, find the container ID of the running MSSQL container by issuing this command:

# docker ps

Then use the docker exec command to log in to the container:

# docker exec -it <container-id> bash

Then, issue the command below to get to an MSSQL prompt, providing the admin password you set when you started the container in Step 3 above:

Issue the following MSSQL statements to populate an inventory table in a test database, then query the table:
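The exact statements were not preserved here; the following sketch shows the kind of CREATE/INSERT/SELECT involved. Table and column names are hypothetical, and sqlite3 stands in for MSSQL purely so the SQL is runnable anywhere:

```python
import sqlite3

# Hypothetical inventory table; sqlite3 stands in for MSSQL so this runs anywhere.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE Inventory (id INT, name NVARCHAR(50), quantity INT)")
cur.executemany("INSERT INTO Inventory VALUES (?, ?, ?)",
                [(1, "banana", 150), (2, "orange", 154)])
print(cur.execute("SELECT name, quantity FROM Inventory ORDER BY id").fetchall())
# [('banana', 150), ('orange', 154)]
```

In the real container you would run the equivalent T-SQL statements at the sqlcmd prompt.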

Success! This means the database has been persisted into the MapR volume and is now managed and protected by MapR-XD storage. You can verify by issuing the "ls" command in the container: the MSSQL log, secret, and data directories show up in vol1:

Step 5 – Destroy Current Container, Relaunch a New Container, and Access the Existing Table


Now let’s destroy the current container to simulate a server outage by issuing this command:


# docker rm -f c2e69e75b181


Repeat Step 3 above to launch a new container. As soon as the new container is up and running, log in to it and query the same inventory table:

With a huge sense of relief, you see the data previously entered is still there, thanks to MapR!


Step 6 – Scale It Up and Beyond


With the container technology know-how in place, it is extremely easy to spin up multiple containers all at once. Simply repeat steps 2 and 3 to assign each MSSQL container a new volume in MapR, and off you go.


In this blog, we demonstrated how to containerize MSSQL with the MapR PACC and persist its database into MapR for data protection and disaster recovery. MapR PACCs are a great fit for many other applications that require a scalable and robust storage layer to keep their data managed and distributed for DR and scalability. MapR PACCs can also be deployed at scale with an orchestrator, like Kubernetes, Mesos, or Docker, to achieve true scalability and high availability.

To learn how to create an HDInsight Spark cluster in the Microsoft Azure Portal, please refer to part one of my article. After creating the Spark cluster, I have highlighted the URL of my cluster.

Microsoft Azure


Microsoft Azure


A total of 4 nodes are created -- 2 Head Nodes and 2 Name Nodes -- for a total of 16 cores. Of the available capacity of 60 cores, 16 are used and 44 remain for scaling up. You can also click through to the Cluster Dashboard and Ambari Views, and you can scale the size of the cluster.

Apache Ambari provides management and monitoring of Hadoop clusters through a web UI and REST services. Ambari is used to monitor clusters and make configuration changes, and it simplifies provisioning, monitoring, and managing clusters. Using Ambari, you can manage central security setup and get full visibility into cluster health. The Ambari Dashboard looks like this:


Microsoft Azure


Using the Ambari Dashboard, you can manage and configure services, hosts, and alerts for critical conditions. Many services are also integrated into the Ambari web UI. Below is the Hive Query Editor in Ambari:


Microsoft Azure


You can write, run, and process Hive queries in the Ambari web UI, convert the results into charts, save queries, and manage query history.


Microsoft Azure


The snapshot above lists the services available in Ambari, and below is the HDInsight SuketuSpark clients list.


Microsoft Azure


You can type the notebook URL in a new browser tab, or click the Jupyter logo in the Azure portal, to open a Jupyter notebook. The Jupyter Notebook is a web application that allows you to create and share documents that contain live code, equations, visualizations, and explanatory text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, machine learning, and much more. Jupyter and Zeppelin are the two notebooks integrated with HDInsight.


Microsoft Azure

You can use Jupyter notebook to run Spark SQL queries against the Spark cluster. HDInsight Spark clusters provide two kernels that you can use with the Jupyter notebook.

  • PySpark (for applications written in Python)
  • Spark (for applications written in Scala)

PySpark is the Python binding for the Spark platform and API and is not much different from the Java/Scala versions. Learning Scala can be a better choice than Python: as a functional language, Scala makes it easier to parallelize code, which is a great feature when working with big data.

Like Java, Scala is object-oriented, and uses a curly-brace syntax reminiscent of the C programming language. Unlike Java, Scala has many features of functional programming languages like Scheme, Standard ML and Haskell, including currying, type inference, immutability, lazy evaluation, and pattern matching.

When you type the URL, or when you click the Zeppelin icon in the Azure portal, the Zeppelin notebook opens in a new browser tab. Below is a snapshot of it.


Microsoft Azure


Zeppelin is a web-based notebook that enables interactive data analytics. You can make beautiful data-driven, interactive, and collaborative documents with SQL, Scala, and more.

Flexible searching and indexing for web applications and sites is almost always useful and sometimes absolutely essential. While there are many complex solutions that manage data and allow you to retrieve and interact with it through HTTP methods, ElasticSearch has gained popularity due to its easy configuration and incredible malleability.

Elasticsearch is an open-source search engine built on top of Apache Lucene, a full-text search-engine library.


Basic Crud
CRUD stands for create, read, update, and delete. These are all operations that are needed to effectively administer persistent data storage. Luckily, these also have logical equivalents in HTTP methods, which makes it easy to interact using standard methods. The CRUD methods are implemented by the HTTP methods POST, GET, PUT, and DELETE respectively.
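That "respectively" mapping reads more clearly written out; a trivial sketch:

```python
# CRUD operations and the HTTP methods that implement them, respectively
CRUD_TO_HTTP = {
    "create": "POST",
    "read":   "GET",
    "update": "PUT",
    "delete": "DELETE",
}
print(CRUD_TO_HTTP["update"])  # PUT
```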

In order to use ElasticSearch for anything useful, such as searching, the first step is to populate an index with some data. This process is known as indexing.


Documents are indexed—stored and made searchable—by using the index API.

In ElasticSearch, indexing corresponds to both “Create” and “Update” in CRUD: if we index a document with a given type and ID that doesn’t already exist, it’s inserted; if a document with the same type and ID already exists, it’s overwritten.


From our perspective as users of ElasticSearch, a document is a JSON object. As such, a document can have fields in the form of JSON properties. Such properties can be simple values such as strings or numbers, but they can also be other JSON objects.


In order to create a document, we make a PUT request to the REST API at a URL made up of the index name, type name, and ID, that is: http://localhost:9200/<index>/<type>/[<id>], and include a JSON object as the PUT data.

Index and type are required, while the ID part is optional. If we don’t specify an ID, ElasticSearch will generate one for us; however, in that case we should use POST instead of PUT. The index name is arbitrary; if there isn’t an index with that name on the server already, one will be created using the default configuration.
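The rules above (index and type required, ID optional, POST when the ID is omitted) can be captured in a small helper; the function name is illustrative:

```python
def index_request(index, doc_type, doc_id=None):
    """Return (HTTP method, URL) for an indexing request, per the rules above."""
    base = "http://localhost:9200"
    if doc_id is None:
        # Without an ID, POST and let ElasticSearch generate one
        return "POST", f"{base}/{index}/{doc_type}"
    return "PUT", f"{base}/{index}/{doc_type}/{doc_id}"

print(index_request("movies", "movie", 1))  # PUT with an explicit ID
print(index_request("movies", "movie"))     # POST, server-generated ID
```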


As for the type name it too is arbitrary. It serves several purposes, including:

Each type has its own ID space.
Different types can have different mappings (“schema” that defines how properties/fields should be indexed).
Although it’s possible, and common, to search over multiple types, it’s easy to search only for one or more specific type(s).



Let’s index something! We can put just about anything into our index as long as it can be represented as a single JSON object. For the sake of having something to work with we’ll be indexing, and later searching for, movies. Here’s a classic one:

Sample JSON object
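The object itself was lost in this copy of the post; a movie document of the kind indexed below would look something like this (field names and values are illustrative):

```json
{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972
}
```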


To index the above JSON object we decide on an index name (“movies”), a type name (“movie”) and an ID (“1”) and make a request following the pattern described above with the JSON object in the body.

A request that indexes the sample JSON object as a document of type ‘movie’ in an index named ‘movies’

Execute the above request using cURL, or paste it into Sense and hit the green arrow to run it. After doing so, given that ElasticSearch is running, you should see a response like this:

Response from ElasticSearch to the indexing request.



The request for, and result of, indexing the movie in Sense.

As you can see, the response from ElasticSearch is also a JSON object. Its properties describe the result of the operation. The first three properties simply echo the information we specified in the URL of the request. While this can be convenient, it may seem redundant in some cases. However, remember that the ID part of the URL is optional; if we don’t specify an ID, the _id property is generated for us, and its value may then be of great interest.




The fourth property, _version, tells us that this is the first version of this document (the document with type “movie” with ID “1”) in the index. This is also confirmed by the fifth property, “created”, whose value is true.

Now that we’ve got a movie in our index, let’s look at how we can update it by adding a list of genres. To do that, we simply index it again using the same ID. In other words, we make the exact same indexing request as before but with an extended JSON object containing genres.

Indexing request with the same URL as before but with an updated JSON payload.


This time the response from ElasticSearch looks like this:

The response after performing the updated indexing request.



Not surprisingly, the first three properties are the same as before. However, the _version property now reflects that the document has been updated, as it now has a version number of 2. The created property is also different, now having the value false, which tells us that the document already existed and therefore wasn’t created from scratch.


It may seem that the created property is redundant. Wouldn’t it be enough to inspect the _version property to see if its value is greater than one? In many cases that would work. However, if we were to delete the document, the version number wouldn’t be reset, meaning that if we later indexed a document with the same ID, the version number would be greater than one.


So, what’s the purpose of the _version property then? While it can be used to track how many times a document has been modified, its primary purpose is to allow for optimistic concurrency control.


If we supply a version in an indexing request, ElasticSearch will only overwrite the document if the supplied version is the same as the document’s version in the index. To try this out, add a version query string parameter to the URL of the request with “1” as the value, making it look like this:

Indexing request with a ‘version’ query string parameter.


Now the response from ElasticSearch is different. This time it contains an error property with a message explaining that the indexing didn’t happen due to a version conflict.


Response from ElasticSearch indicating a version conflict.
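The version check that produces this conflict can be sketched as a compare-and-set. This is an illustrative model, not ElasticSearch's implementation:

```python
# Illustrative sketch of optimistic concurrency control: a write that supplies
# a version only succeeds if it matches the stored version.
class VersionConflict(Exception):
    pass

class TinyIndex:
    def __init__(self):
        self._docs = {}  # doc_id -> (version, body)

    def index(self, doc_id, body, version=None):
        current = self._docs.get(doc_id)
        if current is not None and version is not None and version != current[0]:
            raise VersionConflict(f"expected version {current[0]}, got {version}")
        new_version = 1 if current is None else current[0] + 1
        self._docs[doc_id] = (new_version, body)
        return new_version

idx = TinyIndex()
idx.index("1", {"title": "a movie"})                    # creates version 1
idx.index("1", {"title": "a movie", "genres": ["Crime"]})  # bumps to version 2
try:
    idx.index("1", {"title": "stale write"}, version=1)  # conflicts: doc is at version 2
except VersionConflict as e:
    print("conflict:", e)
```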


Getting by ID
We’ve seen how to index documents, both new and existing ones, and have looked at how ElasticSearch responds to such requests. However, we haven’t actually confirmed that the documents exist, only that ES tells us so.

So, how do we retrieve a document from an ElasticSearch index? Of course, we could search for it; however, that’s overkill if we only want to retrieve a single document with a known ID. A simpler and faster approach is to retrieve it by ID.


In order to do that, we make a GET request to the same URL as when we indexed it, only this time the ID part of the URL is mandatory. In other words, in order to retrieve a document by ID from ElasticSearch, we make a GET request to http://localhost:9200/<index>/<type>/<id>. Let’s try it with our movie using the following request:

curl -XGET "http://localhost:9200/movies/movie/1"

As you can see the result object contains similar meta data as we saw when indexing, such as index, type and version. Last but not least it has a property named _source which contains the actual document body. There’s not much more to say about GET as it’s pretty straightforward. Let’s move on to the final CRUD operation.


Deleting documents
In order to remove a single document from the index by ID we again use the same URL as for indexing and retrieving it, only this time we change the HTTP verb to DELETE.


Request for deleting the movie with ID 1.


curl -XDELETE "http://localhost:9200/movies/movie/1"


The response object contains some of the usual suspects in terms of metadata, along with a property named “found” indicating that the document was indeed found and that the operation was successful.

Response to the DELETE request.

{
    "found": true,
    "_index": "movies",
    "_type": "movie",
    "_id": "1",
    "_version": 3
}
If we, after executing the DELETE request, switch back to GET we can verify that the document has indeed been deleted:

Response to the GET request after the document has been deleted.

{
    "_index": "movies",
    "_type": "movie",
    "_id": "1",
    "found": false
}


Simple Spark Tips #1

Posted by MichaelSegel Oct 19, 2017

Many developers are switching over to Spark and Spark SQL as a way to ingest and use data. As an example, you could be asked to take a .csv file and convert it into a Parquet file or even a Hive or MapR-DB table.


With Spark, it's very easy to do this: you just load the file into a DataFrame/Dataset, then write it out as a Parquet file, and you're done. The code to create the DataFrame:

val used_car_databaseDF = spark.read
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .csv(used_cars_databaseURL)

where used_cars_databaseURL is a String holding the path to the file, created earlier in my code.


But suppose you want to work with the data as a SQL table? Spark allows you to create temporary tables/views of the data, giving the DataFrame (DF) a table 'alias'.


used_car_databaseDF.createOrReplaceTempView("used_cars")

Here I've created a table/view used_cars that I can now use in a spark.sql command.

spark.sql("SELECT COUNT(*)  FROM used_cars ").show();

Obviously, this is just a simple example to show that you can run a query and see its output.

If you're working with a lot of different tables, it's easy to lose track of the tables/views you've created.


But Spark does have a couple of commands that allow you to view the list of tables you have already set up: the spark.catalog API. Below is some sample code pulled from my notebook, where I have been experimenting with Spark and MapR.

import org.apache.spark.sql.catalog
import org.apache.spark.sql.SparkSession

//Note: We need to look at listing columns for each table...
spark.catalog.listColumns("creditcard").show // Another test table

// Now let's try to run thru the catalog
println("Testing walking thru the table catalog...")
val tableDS = spark.catalog.listTables
println("There are " + tableDS.count + " rows in the catalog...")

tableDS.printSchema // Prints the structure of the objects in the dataSet

// First pass: print just the name of each table
tableDS.collect.foreach { e => println( }

// Now trying a different way: print each table's columns as well
tableDS.collect.foreach { e =>
  val n =
  println("Table Name: " + n)
  spark.catalog.listColumns(n).collect.foreach { c => println("\t" + + "\t" + c.dataType) }
}

Note: While I am not sure whether I needed to pull in org.apache.spark.sql.SparkSession, it doesn't seem to hurt.


In the sample code, I use the show() method, which formats the output and displays it. However, show() is limited to the first 20 rows of output, regardless of the source. This can be problematic, especially if you have more than 20 temp tables in your session or more than 20 columns in a table you inspect.

For more information on the Spark catalog, see the Spark SQL Catalog API documentation.

listTables() is a method that returns a Dataset of Table objects describing the temporary views I have created.


As I said earlier, while show() will give you a nice formatted output, it limits you to the first 20 rows of output.

So let's instead print out our own list of tables. I used the printSchema() method to identify the elements of the Table object. The output looks like this:

|-- name: string (nullable = true)
|-- database: string (nullable = true)
|-- description: string (nullable = true)
|-- tableType: string (nullable = true)
|-- isTemporary: boolean (nullable = false)

For my first example, I'm walking through the list of tables and printing out the name of the table.


In my second example, for each table I want to print out the table's schema (in this example, only the column names and their data types). This works well when you have more than 20 tables or more than 20 columns in a table.


If we put this all together, we can load a file, apply filters, and then store the data. Even without knowing the schema in advance, it's still possible to determine the data set's schema and then use that information to build out a schema to dynamically create a Hive table or to put the data into a MapR-DB table.
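As an illustration of that last point, column metadata of the kind listColumns returns is enough to assemble a DDL string. This standalone sketch (table and column names are hypothetical) shows the idea:

```python
# Hypothetical sketch: turn (column, dataType) pairs -- the kind of information
# spark.catalog.listColumns returns -- into a Hive-style CREATE TABLE string.
def build_ddl(table, columns):
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns)
    return f"CREATE TABLE {table} ({cols})"

print(build_ddl("used_cars_archive", [("name", "string"), ("price", "int")]))
# CREATE TABLE used_cars_archive (name string, price int)
```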


Securing Zeppelin

Posted by MichaelSegel Oct 5, 2017

Zeppelin is an Apache open source notebook that supports multiple interpreters and seems to be one of the favorites for working with Spark.


Zeppelin is capable of running on a wide variety of platforms, so it’s possible to run tests and develop code either on a cluster or away from one.


As always, in today’s world, it is no longer paranoia to think about securing your environment. This article focuses on the setup required to secure Zeppelin and is meant as a supplement to the installation instructions, focusing on adding security.


Zeppelin itself is easy to install. You can ‘wget’ or download the pre-built binaries from the Apache Zeppelin site, follow the instructions and start using it right away.


The benefits are that you can build a small test environment on your laptop without having to work on a cluster. This reduces the impact on a shared environment.


However, it’s important to understand how to also set up and secure your environment.


Locking down the environment


Running Local

Zeppelin can run locally on your desktop. In most instances your desktop doesn’t have a static IP address. In securing your environment, you will want to force Zeppelin to only be available on the localhost/ interface.


In order to set this up, two entries in the $ZEPPELIN_HOME/conf/zeppelin-site.xml file have to be set.





<property>
  <name>zeppelin.server.addr</name>
  <value></value>
  <description>Server address</description>
</property>

<property>
  <name>zeppelin.server.port</name>
  <value>8180</value> <!-- example: any non-default port -->
  <description>Server port.</description>
</property>

Setting zeppelin.server.addr to listen only on the localhost address means that no one outside the desktop will be able to access the Zeppelin service.


Setting zeppelin.server.port to a value other than the default 8080 is done because that port is the default for many services. By moving to a different, unique port you can keep the setting consistent between the instances on your desktop and on a server. While this isn’t necessary, it does make life easier.


Beyond those two settings, there isn’t much else that you need to change.

Notice that there are properties that allow you to set up SSL. While the documentation contains directions on how to set up SSL directly, there is a bug where trying to run with PKCS12 certificates causes an error, and in following up, no resolution could be found. The recommendation is to use a proxy server, nginx, to manage the secure connection. (More on this later.)


Since the only listening interface is the loopback ( interface, SSL really isn’t required.


The next issue is that by default, there is no user authentication. Zeppelin provides this through Shiro. From the Apache Shiro website:

Apache Shiro™ is a powerful and easy-to-use Java security framework that performs authentication, authorization, cryptography, and session management. With Shiro’s easy-to-understand API, you can quickly and easily secure any application – from the smallest mobile applications to the largest web and enterprise applications.


While it may be OK to run local code as an anonymous user, it’s also possible for Zeppelin to run locally yet access a remotely maintained cluster that may not accept anonymous users.


In order to set up Shiro, just copy shiro.ini.template to shiro.ini.

Since my SOHO environment is rather small and limited to a handful of people, I am not yet running LDAP. (Maybe one day … ) So I practice the K.I.S.S. principle. The only thing I need from Shiro is to set up local users.


[users]
# List of users with their password allowed to access Zeppelin.
# To use a different strategy (LDAP / Database / ...) check the shiro doc at
admin = password1, admin
user1 = password2, role1, role2
user2 = password3, role3
user3 = password4, role2

If you find the [users] section, you’ll notice that the admin and userX entries are not commented out. Any entry in the form <user> = <password>, <role> [, <role2>, <role3> … ] will be active, so you really don’t want to leave an entry here that could give someone access.


So you will need to create an entry for yourself and other users.


Note: There are other entries below this section. One section details where you can use LDAP for authenticating users. If you are running LDAP, it would be a good idea to set this up to use LDAP.


Once you’ve made and saved the changes, that’s pretty much it. You can start/restart the service, and you will authenticate against the entries in shiro.ini.


Note the following:

While attempting to follow the SSL setup, I was directed to a Stack Overflow conversation on how to accomplish this. IMHO, it’s a major red flag when documentation references a Stack Overflow article on how to set up and configure anything.

Running on a Server

Suppose you want to run Zeppelin on your cluster? Zeppelin then becomes a shared resource. You can set Zeppelin to run spark contexts per user or per notebook instead of one instance for the entire service.

The larger issue is that you will need to use an external interface to gain access, and even if you’re behind a firewall, you will need SSL turned on. Because I want to be able to run notes from outside of my network, I have to have SSL in place. As I alluded to earlier, configuring SSL from within Zeppelin wasn’t working, and the only guidance was to set up a proxy using nginx instead. (This came from two reliable sources.)


With nginx, you can keep the same configuration we have already set up: Zeppelin listens only on the local host and relies on the proxy server to handle external connections. Since my Linux server is sitting next to me, I have a monitor set up so I can easily test connections to the local host, ensuring that my Zeppelin instance is up and running. I followed the same steps that I used to set up my desktop, and it ran without a hitch.


Unlike the instructions for setting up SSL directly, the information on the Zeppelin site about running behind nginx was very helpful.


There are various ways of obtaining nginx. Since I run CentOS, I could have pulled down a version via yum; if you run a different version of Linux, you can use its similar tool. Of course, downloading it from the official site will get you the latest stable release.

While the documentation for setting up nginx with Zeppelin is better, there are still gaps… yet it’s still pretty straightforward.


Nginx installs in the /etc/nginx directory; all of its configuration files are located in the ./conf.d directory underneath it. I should have taken better notes, but going from memory, there was one file: the default configuration file. I suggest that you leave it alone and copy it to another file name; I chose default.conf.ini. Based on the documentation, I was under the impression that nginx looks at all *.conf files for its setup data.


I then created a zeppelin.conf file and cut and pasted the section from the zeppelin documents.


upstream zeppelin {
    server [YOUR-ZEPPELIN-SERVER-IP]:[YOUR-ZEPPELIN-SERVER-PORT];   # For security, it is highly recommended to make this address/port non-public
}

# Zeppelin Website
server {
    listen 443 ssl;                                     # optional, to serve HTTPS connection
    server_name [YOUR-ZEPPELIN-SERVER-HOST];

    ssl_certificate [PATH-TO-YOUR-CERT-FILE];           # optional, to serve HTTPS connection
    ssl_certificate_key [PATH-TO-YOUR-CERT-KEY-FILE];   # optional, to serve HTTPS connection

    if ($ssl_protocol = "") {
        rewrite ^ https://$host$request_uri? permanent; # optional, to force use of HTTPS
    }

    location / {    # For regular webserver support
        proxy_pass http://zeppelin;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header X-NginX-Proxy true;
        proxy_redirect off;
        auth_basic "Restricted";
        auth_basic_user_file /etc/nginx/.htpasswd;
    }

    location /ws {  # For websocket support
        proxy_pass http://zeppelin/ws;
        proxy_http_version 1.1;
        proxy_set_header Upgrade websocket;
        proxy_set_header Connection upgrade;
        proxy_read_timeout 86400;
    }
}

As you can see, the configuration is pretty straightforward. Note the comment in the upstream zeppelin section; this is why using the loopback/localhost interface is a good idea.


Since the goal of using nginx is to create a secure (SSL) interface to Zeppelin, we need to create a public/private key pair. A simple Google search will turn up lots of options. Note: If you don't have OpenSSL already installed on your server, you should set it up ASAP. Using OpenSSL, the following command works:


openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365


Note that this will create a 4096-bit key. That's overkill for this implementation; however, the minimum key length for an SSL connection these days is 2048 bits, so it's not unreasonably long.


Note: If you use this command as is, you will be required to provide a simple password, which is used to encrypt the key. The downside is that each time you start or stop the web service, you will have to enter the passphrase manually. Using the -nodes option removes this requirement; however, the key is then stored unencrypted, so you should change the permissions on the key file to control access.
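For example, a passphrase-free key can be generated like this (a sketch; the -subj value is a placeholder so the command runs non-interactively, and the chmod restricts access to the unencrypted key):

```shell
# Self-signed cert with an unencrypted key (-nodes skips the passphrase prompt),
# then lock down read access on the key file.
openssl req -x509 -newkey rsa:2048 -nodes -keyout key.pem -out cert.pem \
        -days 365 -subj "/CN=localhost"
chmod 600 key.pem
```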


For nginx, I created the key pair in the ./conf.d directory and set their paths in the zeppelin.conf file.


After the edits, if you start the service, you’re up and running.

Well almost….


Further Tweaking


If you try to use the service, nginx asks you for a username and password.

   location / {   # For regular webserver support

       proxy_pass http://zeppelin;

       proxy_set_header X-Real-IP $remote_addr;

       proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

       proxy_set_header Host $http_host;

       proxy_set_header X-NginX-Proxy true;

       proxy_redirect off;

       auth_basic "Restricted";

       auth_basic_user_file /etc/nginx/.htpasswd;

   }

In this section, the auth_basic setting is set to “Restricted” indicating that a password check has to be performed.

The setting auth_basic_user_file is set to the path of the password file.
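If you do want to keep basic auth, one way to create that password file without Apache's htpasswd tool is with openssl (a hypothetical sketch; the username and password here are placeholders):

```shell
# Build an htpasswd-style entry with an apr1 hash (a format nginx accepts);
# written locally here, then moved into place as root.
printf 'zeppelin:%s\n' "$(openssl passwd -apr1 'changeme')" > htpasswd
# sudo mv htpasswd /etc/nginx/.htpasswd
```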


The instructions on how to set this up are also found on Zeppelin's setup page. While this is secure, think about it: you enter a password at the proxy before being able to reach the protected web service. Does this make sense? You need a password to access a website that then asks you to log in again before you can use it. Our goal in using nginx was to set up SSL so that traffic between the client and the server is encrypted rather than out in the open. For our use case, it makes more sense for a connection to establish an SSL socket first and then go straight to the Zeppelin service, where you can authenticate.


The simple fix is to set auth_basic to off. This still lets Zeppelin authenticate the user without making anyone log in twice, and your notebooks do not run as 'anonymous'.


In Summary


Running Zeppelin out of the box with no security is not a good idea. This article helps to demonstrate some simple tweaks that help lock down your environment so that you can run Zeppelin on a desktop or connect to a service running on the edge of your cluster.

I am sure that there are other things one could do to further lock down Zeppelin or tie it in to your network. At a minimum, you will want to authenticate your users via Shiro as well as offer an SSL connection.


With Zeppelin up and running, now the real fun can begin.

Multi-cloud environments can be an effective way to hedge risk and enable future flexibility for applications. With a secure VPN in between two or more sites, you can leverage the global namespace and high availability features in MapR (like mirroring and replication) to drive business continuity across a wide variety of use cases.


In this tutorial, I will walk through the steps of how to connect Microsoft Azure and Amazon AWS cloud using an IPsec VPN, which will enable secure IP connectivity between the two clouds and serve as a Layer-3 connection you can then use to connect two or more MapR clusters.


Multi-Cloud Architecture with Site-to-Site VPN


Let's first take a look at the end result for which we're aiming.



On the left side of the figure below is an Azure setup with a single Resource Group (MapR_Azure), which we'll set up in the 'US West' zone. On the right side is an Amazon EC2 network in a VPC, which we will deploy in the Northern Virginia zone. This is an example of using geographically dispersed regions to lessen the risk of disruptions to operations. After the VPN is complete, we can use MapR replication to ensure that data lives in the right place and applications can read/write to both sites seamlessly.


This "site-to-site" VPN will be encrypted and run over the open Internet, with a separate IP subnet in each cloud, using the VPN gateways to route traffic through a tunnel. Note that this is a "Layer 3" VPN, in that traffic is routed between two subnets using IP forwarding. It's also possible to do this at Layer 2, bridging Ethernet frames between the two networks, but I'll leave that for another post (or as an exercise for the curious reader).


Setting up the Azure Side


First, prepare the Resource Group; in our example, we called the group 'MapR_Azure.'

Select it, and find the 'Virtual Network' resource by selecting the '+' icon, then type 'virtual network' in the search box.




Select 'Resource Manager' as the deployment model and press 'Create.' We use the name 'clusternet' for the network.


We will create two subnets, one on each cloud side. On Azure, we create the address range of and a single subnet of within the range. We'll make the Azure and EC2 address prefixes 10.11 and 10.10, respectively, to make for easy identification and troubleshooting.



Create a public IP for the VPN connections. Select '+' to add a resource, then type 'public IP address' in the search box. Press 'Create,' then set up the IP as follows. Select 'Use existing' for the Resource Group, and keep 'Dynamic' for the IP address assignment. The name is 'AzureGatewayIP.'



Take note of this address; we will use it later.


Next, create a 'Virtual Network Gateway' the same way. This entity will serve as the VPN software endpoint.



Reference the public IP you created in the previous step (in our case, 'AzureGatewayIP').


Note the concept of a 'GatewaySubnet' here. This is a subnet that is to be used exclusively by the Azure VPN software. It must be within the configured address range, and you can't connect any other machines to it. Microsoft says "some configurations require more IP addresses to be allocated to the gateway services than do others."  It sounds a little mysterious, but allocating a /24 network seems to work fine for most scenarios.


Select 'clusternet' as the virtual network (what you created in the earlier step), use a Gateway subnet of, and use the 'AzureGatewayIP' address. This will create a new subnet entry called 'GatewaySubnet' for the network.


For testing purposes, select the VpnGw1 SKU. This allows up to 650 Mbps of network throughput, which is more than enough to connect a couple of small clusters, but you can go up to 1.25 Gbps with the VpnGw3 SKU.


This may take up to 45 minutes (according to Microsoft) but it usually completes in a few minutes.


Setting up the AWS Side


We need to pause here to set up a few things on AWS. First, create a VPC in the AWS Console VPC Dashboard. Here we set the IPv4 address range as



Navigate to 'Subnets,' and create a subnet in the VPC. Here we use



Next, create an Internet gateway to connect our VPC to the internet.  This step is important (and easily overlooked), otherwise traffic cannot be routed in between the Elastic IP and the subnet we just created.




Select 'Attach to VPC,' and use the new VPC 'clusterVPC'.
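For those who prefer the command line, the same three resources can also be created with the AWS CLI. A hypothetical equivalent (the 10.10.0.0/16 and 10.10.0.0/24 CIDRs are assumptions based on the 10.10 prefix chosen earlier):

```shell
# Create the VPC, a subnet inside it, and an attached internet gateway.
vpc_id=$(aws ec2 create-vpc --cidr-block 10.10.0.0/16 \
         --query 'Vpc.VpcId' --output text)
aws ec2 create-subnet --vpc-id "$vpc_id" --cidr-block 10.10.0.0/24
igw_id=$(aws ec2 create-internet-gateway \
         --query 'InternetGateway.InternetGatewayId' --output text)
aws ec2 attach-internet-gateway --internet-gateway-id "$igw_id" --vpc-id "$vpc_id"
```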




Go back to the EC2 Dashboard and select 'Launch Instance.' We will create an Amazon Linux instance to maintain the VPN. Select the 'Amazon Linux' AMI, and configure the instance details as follows:





Be sure to select the 'clusterVPC' we just created and 'clustersubnet' for the subnet. Select 'Disable' for 'Auto-assign Public IP' because we want to use an Elastic IP that we will associate later.


Under the last step, select 'Edit Security Groups,' and then select 'Create a new security group.' Open the group to all traffic coming from the AzureGatewayIP we configured previously (in this case, Also (optionally), add any rules that you need to connect to the instance via ssh.



Click on 'Launch,' and optionally create a new key pair or use an existing one for the instance.


While the instance launches, let's create an Elastic IP for the VPN endpoint. In the 'Network & Security' menu of the AWS Console, select 'Elastic IPs,' and then allocate an address.



Note this address (here, for later.


Associate the address with the instance we just created.



Finalizing the connection on the Azure side


Let's return to the Azure setup, and use the information from AWS to complete the connection. Add a Local Network Gateway.



The term 'local' is a bit of a misnomer here because the other network is not a local one; it's another cloud network.  Microsoft uses the term 'local' to refer to an on-premise network that you might want to connect to Azure.  For 'IP address,' use the Elastic IP you created in the previous section. For 'Address space,' use the range (the AWS subnet).


Next, add a Connection. Select 'Site-to-site' VPN, and fill in the remaining details for your Resource Group.



Select the AzureGateway we configured as well as the Local Network Gateway (AWSVPN). Enter a key that will be used for the session.



Now is a good time to launch an instance for testing. Type 'Ubuntu Server' into the search box, and select Ubuntu Server 14.04 LTS. Configure the instance details, size, and settings.



Under the last Settings window, configure the virtual network, subnet, and 'None' for a public IP address (we don't need one because the VPN will handle outbound/inbound connectivity). Select a new or existing network security group.



Finalizing the connection on the AWS side


It's a good time to make sure the subnet you created has a default route to the internet gateway. From the AWS Console, navigate to 'Route Tables' and find the subnet associated with your VPC, select the subnet and the 'Routes' tab, and add a default route:



Returning to AWS, ssh into the instance we just created and download/install strongswan along with some dependency packages.


sudo yum install gcc gmp-devel

# fetch the source tarball (URL as of version 5.6.0)
wget https://download.strongswan.org/strongswan-5.6.0.tar.bz2
bzip2 -d strongswan-5.6.0.tar.bz2
tar xvf strongswan-5.6.0.tar
cd strongswan-5.6.0
./configure && make && sudo make install


This should install strongswan in /usr/local, where we will edit the configuration files.


Edit the file /usr/local/etc/ipsec.conf and add the following entry:

conn azure


The 'left' and 'leftsubnet' options refer to the Amazon (local) side. Use the local private IP address and subnet. For the right side, use the AzureGatewayIP we configured ( and the 'clusternet' subnet.
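The full entry is not reproduced above; the following is a hypothetical sketch of what such a strongSwan connection definition can look like. The bracketed values are placeholders, and the subnets are assumptions based on the 10.10/10.11 prefixes used in this walkthrough:

```
conn azure
    authby=secret
    type=tunnel
    keyexchange=ikev2
    left=[AWS-INSTANCE-PRIVATE-IP]
    leftsubnet=10.10.0.0/24
    right=[AZURE-GATEWAY-PUBLIC-IP]
    rightsubnet=10.11.0.0/16
    auto=start
```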


Finally, edit the file /usr/local/etc/ipsec.secrets, and add your shared secret key:

: PSK "testing123"


Start the VPN with:


sudo /usr/local/sbin/ipsec start


You can run sudo tail -f /var/log/messages to check the status of the connection.


You should now be able to ping the Azure machine by running ping (or the address of the single interface of that machine). You can check it on the Azure side by viewing the Connection:



If you see 'Connected' as above, congratulations: you have a working two-cloud environment!


Additional Notes


Here are a few other concerns to watch when embarking on a multi-cloud adventure.


Ingress and Egress Data Transfers


Inter-site bandwidth is something to consider in your plan. At the time of this writing, most use cases of data transfer in to EC2 are free, with some exceptions. Data transfer out is free to most other Amazon services, like S3 and Glacier, and also free up to 1GB/month to other internet-connected sites, but costs a small amount per GB after that.


Data transfer in Azure is similar: all inbound data transfers are free, and there is a schedule of costs for outgoing transfers to the internet and other Azure zones.


Bandwidth and Latency


Amazon has a page on how to check bandwidth. Doing some quick tests with iperf between the two sites, here are some typical results:


Accepted connection from, port 35688
[ 5] local port 5201 connected to port 35690
[ ID] Interval Transfer Bitrate
[ 5] 0.00-1.00 sec 36.7 MBytes 308 Mbits/sec
[ 5] 1.00-2.00 sec 39.8 MBytes 334 Mbits/sec
[ 5] 2.00-3.00 sec 42.1 MBytes 353 Mbits/sec
[ 5] 3.00-4.00 sec 39.7 MBytes 333 Mbits/sec
[ 5] 4.00-5.00 sec 30.5 MBytes 256 Mbits/sec
[ 5] 5.00-6.00 sec 30.0 MBytes 252 Mbits/sec
[ 5] 6.00-7.00 sec 30.9 MBytes 259 Mbits/sec
[ 5] 7.00-8.00 sec 36.7 MBytes 308 Mbits/sec
[ 5] 8.00-9.00 sec 41.5 MBytes 348 Mbits/sec
[ 5] 9.00-10.00 sec 37.0 MBytes 311 Mbits/sec
[ 5] 10.00-10.03 sec 977 KBytes 245 Mbits/sec
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval Transfer Bitrate
[ 5] 0.00-10.03 sec 366 MBytes 306 Mbits/sec receiver


That's some pretty hefty bandwidth (306 Mbits/sec) between the two sites.


Ready to set up a MapR cluster?


By Ronald van Loon


Editor's Note: This post was originally published on LinkedIn on June 12, 2017.

The world is long past the Industrial Revolution, and now we are experiencing an era of Digital Revolution. Machine Learning, Artificial Intelligence, and Big Data Analysis are the reality of today’s world.

I recently had a chance to talk to Ciaran Dynes, Senior Vice President of Products at Talend and Justin Mullen, Managing Director at Datalytyx. Talend is a software integration vendor that provides Big Data solutions to enterprises, and Datalytyx is a leading provider of big data engineering, data analytics, and cloud solutions, enabling faster, more effective, and more profitable decision-making throughout an enterprise.

The Evolution of Big Data Operations

To understand more about the evolution of big data operations, I asked Justin Mullen about the challenges his company faced five years ago and why they were looking for modern integration platforms. He responded with, “We faced similar challenges to what our customers were facing. Before Big Data analytics, it was what I call ‘Difficult Data analytics.’ There was a lot of manual aggregation and crunching of data from largely on premise systems. And then the biggest challenge that we probably faced was centralizing and trusting the data before applying the different analytical algorithms available to analyze the raw data and visualize the results in meaningful ways for the business to understand.”


He further added that, “Our clients not only wanted this analysis once, but they wanted continuous refreshes of updates on KPI performance across months and years. With manual data engineering practices, it was very difficult for us to meet the requirements of our clients, and that is when we decided we needed a robust and trustworthy data management platform that solves these challenges.”

The Automation and Data Science

Many economists and social scientists are concerned about automation taking over manufacturing and commercial processes. If digitalization and automation continue to grow at their current pace, there is a high probability of machines partly replacing humans in the workforce. We are seeing some examples of this phenomenon in our world today, but it is predicted to be far more prominent in the future.

However, Dynes says, “Data scientists are providing solutions to intricate and complex problems confronted by various sectors today. They are utilizing useful information from data analysis to understand and fix things. Data science is an input and the output is yielded in the form of automation. Machines automate, but humans provide the necessary input to get the desired output.”

This creates a balance in the demand for human and machine services. Automation and data science go hand in hand; one process is incomplete without the other. Raw data is worth nothing if it cannot be manipulated to produce meaningful results, and similarly, machine learning cannot happen without sufficient and relevant data.

Start Incorporating Big Data and Machine Learning Solutions into Business Models

Dynes says, “Enterprises are realizing the importance of data, and are incorporating Big Data and Machine Learning solutions into their business models.” He further adds that, “We see automation happening all around us. It is evident in the ecommerce and manufacturing sectors, and has vast applications in mobile banking and finance.”

When I asked him about his opinion regarding the transformation in the demand of machine learning processes and platforms, he added that, “The demand has always been there. Data analysis was equally useful five years ago as it is now. The only difference is that five years ago there was entrepreneurial monopoly and data was stored secretively. Whoever had the data, had the power, and there were only a few prominent market players who had the access to data.”

Justin has worked with different companies; some of his most prominent clients were Calor Gas, Jaeger, and Wejo. When talking about the challenges those companies faced before implementing advanced analytics or machine learning, he said, “The biggest challenge most of my clients faced was the accumulation of the essential data in one place, so that the complex algorithms can be run simultaneously but the results can be viewed in one place for better analysis. The data plumbing and data pipelines were critical to enable data insights to become continuous rather than one-off.”

The Reasons for Rapid Digitalization

Dynes says, “We are experiencing rapid digitalization because of two major reasons. The technology has evolved at an exponential rate in the last couple of years and secondly, organization culture has evolved massively.” He adds, “With the advent of open source technologies and cloud platforms, data is now more accessible. More people now have access to information, and they are using this information to their benefit.”

In addition to the advancements and developments in the technology, “the new generation entering the workforce is also tech dependent. They rely heavily on the technology for their everyday mundane tasks. They are more open to transparent communication. Therefore, it is easier to gather data from this generation, because they are ready to talk about their opinions and preferences. They are ready to ask and answer impossible questions,” says Dynes.

Integrating a New World with the Old World

When talking about the challenges that companies face while opting for Big Data analytics solutions, Mullen adds, “The challenges currently faced by industry while employing machine learning are twofold. The first challenge they face is related to data collection, data ingestion, data curation (quality) and then data aggregation. The second challenge is to combat the lack of human skills in data-engineering, advanced analytics, and machine learning.”

“You need to integrate a new world with the old world.

The old world relied heavily on data collection in big batches

while the new world focuses mainly on the real-time data solutions”

Dynes says, “You need to integrate a new world with the old world. The old world relied heavily on data collection while the new world focuses mainly on the data solutions. There are limited solutions in the industry today that deliver on both these requirements at once right now.”


He concludes by saying that, “The importance of data engineering cannot be neglected, and machine learning is like Pandora’s Box. Its applications are widely seen in many sectors, and once you establish yourself as a quality provider, businesses will come to you for your services. Which is a good thing.”

Follow Ciaran Dynes, Justin Mullen, and Ronald van Loon on Twitter and LinkedIn for more interesting updates on Big Data solutions and machine learning.

Additional Resources

By Rachel Silver

Are you a data scientist, engineer, or researcher, just getting into distributed processing and PySpark, and you want to run some of the fancy new Python libraries you've heard about, like Matplotlib?

If so, you may have noticed that it's not as simple as installing it on your local machine and submitting jobs to the cluster. In order for the Spark executors to access these libraries, they have to live on each of the Spark worker nodes.

You could go through and manually install each of these environments using pip, but maybe you also want the ability to use multiple versions of Python or other libraries like pandas? Maybe you also want to allow other colleagues to specify their own environments and combinations?

If this is the case, then you should be looking toward using conda environments to provide specialized and personalized Python configurations that are accessible to Python programs. Conda is a tool that keeps track of conda packages and tarball files containing Python (or other) libraries and maintains the dependencies between packages and the platform.

Continuum Analytics provides an installer for conda called Miniconda, which contains only conda and its dependencies, and this installer is what we’ll be using today.

For this blog, we’ll focus on submitting jobs from spark-submit. In a later iteration of this blog, we’ll cover how to use these environments in notebooks like Apache Zeppelin and Jupyter.

Installing Miniconda and Python Libraries to All Nodes

If you have a larger cluster, I recommend using a tool like pssh (parallel SSH) to automate these steps across all nodes.

To begin, we’ll download and install the Miniconda installer for Linux (64-bit) on each node where Apache Spark is running. Please make sure, before beginning the install, that you have the bzip2 library installed on all hosts:

I recommend choosing /opt/miniconda3/ as the install directory, and, when the install completes, you need to close and reopen your terminal session.

If your install is successful, you should be able to run ‘conda list’ and see the following packages:


Miniconda installs an initial default conda, running Python 3.6.1. To make sure this installation worked, run a version command:

python -V
Python 3.6.1 :: Continuum Analytics, Inc.

To explain what's going on here: we haven't removed the previous default version of Python, which can still be found at the default path, /bin/python. We've simply added new Python environments (much like Java alternatives) that we can point to while submitting jobs, without disrupting our cluster environment. See:

/bin/python -V
Python 2.7.5

Now, let’s go ahead and create a test environment with access to Python 3.5 and NumPy libraries.

First, we create the conda and specify the Python version (do this as your cluster user):

conda create --name mapr_numpy python=3.5

Next, let’s go ahead and install NumPy to this environment:

conda install --name mapr_numpy numpy

Then, let’s activate this environment, and check the Python version:


Please complete these steps for all nodes that will run PySpark code.

Using Spark-Submit with Conda

Let’s begin with something very simple, referencing environments and checking the Python version to make sure it’s being set correctly. Here, I’ve made a tiny script that prints the Python version:
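The script itself is not reproduced in the post; a hypothetical stand-in (the file name check_python.py is an assumption) only needs to print the interpreter version, since that is exactly what the submit-time choice of Python controls:

```python
# check_python.py -- print which Python interpreter is executing this job
import sys

print("Python version: %s" % sys.version.split()[0])
```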


Testing NumPy

Now, let’s make sure this worked!

I'm creating a little test script containing the following:
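The body of the script is not shown in the post; a minimal stand-in (the file name test_numpy.py is an assumption) just needs to import NumPy and do a small computation, so that the import fails loudly on any node whose Python lacks the library:

```python
# test_numpy.py -- fails at import time if NumPy is absent from the active env
import numpy as np

a = np.arange(100).reshape(10, 10)
print(a.trace())  # sum of the diagonal: 0 + 11 + ... + 99 = 495
```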


If I were to run this script without activating or pointing to my conda with NumPy installed, I would see this error:


In order to get around this error, we’ll specify the Python environment in our submit statement:
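The exact submit line is not shown in the post; one common way to do this (a sketch, where the env path and script name are assumptions) is to point the PYSPARK_PYTHON environment variable at the conda environment's interpreter:

```shell
# Tell the Spark executors which Python interpreter to run.
export PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python
spark-submit --master yarn test_numpy.py
```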


Now for Something a Little More Advanced...

This example of PySpark, using the NLTK Library for Natural Language Processing, has been adapted from Continuum Analytics.

We’re going to run through a quick example of word tokenization to identify parts of speech that demonstrates the use of Python environments with Spark on YARN.

First, we’ll create a new conda, and add NLTK to it on all cluster nodes:

conda create --name mapr_nltk nltk python=3.5
source activate mapr_nltk

Note that some builds of PySpark are not compatible with Python 3.6, so we’ve specified an older version.

Next, we have to download the demo data from the NLTK repository:
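The download command itself is not reproduced; a hypothetical invocation (the target path is a placeholder for your cluster's NFS-mounted MapR-FS home directory) uses NLTK's built-in downloader module:

```
python -m nltk.downloader -d /mapr/my.cluster.com/user/mapr/nltk_data all
```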


This step will download all of the data to the directory that you specify–in this case, the default MapR-FS directory for the cluster user, accessible by all nodes in the cluster.

Next, create the following Python script:


Then, run the following as the cluster user to test:


Additional Resources

Editor's note: this blog post was originally published in the Converge Blog on June 29, 2017

By Carol McDonald


This post is the second in a series where we will go over examples of how MapR data scientist Joe Blue assisted MapR customers, in this case a regional bank, to identify new data sources and apply machine learning algorithms in order to better understand their customers. If you have not already read the first part of this customer 360° series, then it would be good to read that first. In this second part, we will cover a bank customer profitability 360° example, presenting the before, during and after.

Bank Customer Profitability - Initial State

The back story: a regional bank wanted to gain insights about what’s important to their customers based on their activity with the bank. They wanted to establish a digital profile via a customer 360 solution in order to enhance the customer experience, to tailor products, and to make sure customers have the right product for their banking style.

As you probably know, profit is equal to revenue minus cost. Customer profitability is the profit the firm makes from serving a customer or customer group, which is the difference between the revenues and the costs associated with the customer in a specified period.

Banks have a combination of fixed and variable costs. For example, a building is a fixed cost, but how often a person uses an ATM is a variable cost. The bank wanted to understand the link between their product offerings, customer behavior, and customer attitudes toward the bank in order to identify growth opportunities.

Bank Customer Profitability - During

The bank had a lot of different sources of customer data:

  • Data warehouse account information.
  • Debit card purchase information.
  • Loan information such as what kind of loan and how long it has been open.
  • Online transaction information such as who they’re paying, how often they’re online, or if they go online at all.
  • Survey data.

Analyzing data across multiple data sources

This data was isolated in silos, making it difficult to understand the relationships between the bank’s products and the customer’s activities. The first part of the solution workflow is to get the data into the MapR Platform, which is easy since MapR enables you to mount the cluster itself via NFS. Once all of the data is on the MapR Platform, the data relationships can be explored interactively using the Apache Drill schema-free SQL query engine.

A key data source was a survey that the bank had conducted in order to segment their customers based on their attitudes toward the bank. The survey asked questions like, “Are you embracing new technologies?” and “Are you trying to save?” The responses were then analyzed by a third party in order to define four customer personas. But the usefulness of this survey was limited because it was only performed on 2% of the customers. The question for the data scientist was, “How do I take this survey data and segment the other 98% of the customers, in order to make the customer experience with the bank more profitable?”

Feature extraction

Feature engineering is the process of transforming raw data into inputs for a machine learning algorithm. With data science you often hear about the algorithms that are used, but actually a bigger part—consuming about 80% of a data scientist’s time—is taking the raw data and combining it in a way that is most predictive.

The goal was to find interesting properties in the bank’s data that could be used to segment the customers into groups based on their activities. A key part of finding the interesting properties in the data was working with the bank’s domain experts, because they know their customers better than anyone.

Apache Drill was used to extract features such as:

  • What kind of loans, mix of accounts, and mix of loans does the customer have?
  • How often does the customer use a debit card?
  • How often does the customer go online?
  • What does the customer buy? How much do they spend? Where do they shop?
  • How often does the customer go into the branches?

Accentuate business expertise with new learning

After the behavior of the customers was extracted, it was possible to link these features by customer ID with the labeled survey data, in order to perform machine learning.

The statistical computing language R was used to build segmentation models using many machine learning classification techniques. The result was four independent ensembles, each predicting the likelihood of belonging to one persona, based on their banking activity.

The customer segments were then merged and tested against the labeled survey data, making it possible to link the survey “customer attitude” personas with their banking actions and provide insights.

Banking Customer 360° - After

The solution results of modeling with the customer data are displayed in the Customer Products heat map below. Each column is an “attitude”-based persona, and each row is a type of bank account or loan. Green indicates the persona is more likely to have this product, and red indicates less likely.

This graph helps define these personas by what kinds of products they like or don’t like.

This can give insight into:

  • How to price some products
  • How to generate fees
  • Gateway products, which help move a customer from a less profitable segment to a more profitable one.

In the Customer Payees heat map below, the rows are electronic payees. This heat map shows where customer personas are spending their money, which can give channels for targeting and attracting a certain persona to grow your business.

In this graph, the bright green blocks show that Fitness club A, Credit card/Bank A, and Credit card/Bank C are really strong for Persona D. Persona A is almost the opposite of the other personas. This “customer payees” heat map gives a strong signal about persona behavior. It’s hard to find signals like this, but they provide an additional way to look at customer data in a way that the bank could not conceive of before.

Bank Customer 360° Summary

In this example, we discussed how data science can link customer behavioral data with a small sample survey to find customer groups who share behavioral affinities, which can be used to better identify a customer base for growth. The bank now has the ability to project growth rates based on transition between personas over time and find direct channels that allow them to target personas through marketing channels. 


Editor's Note: this post was originally published in the Converge Blog on December 08, 2016. 


Related Resources:

How to Use Data Science and Machine Learning to Revolutionize 360° Customer Views 

Customer 360: Knowing Your Customer is Step One | MapR 

Applying Deep Learning to Time Series Forecasting with TensorFlow 

Churn Prediction with Apache Spark Machine Learning 


By Carol McDonald


More and more data is available that can help inform businesses about their customers, and those businesses that successfully utilize these new sources and quantities of data will be able to provide a superior customer experience. However, predicting customer behavior remains very challenging. This post is the first in a series where we will go over examples of how Joe Blue, a Data Scientist in MapR Professional Services, assisted MapR customers in identifying new data sources and applying machine learning algorithms in order to better understand their customers. The first example in the series is an advertising customer 360°; the next blog post in the series will cover banking and healthcare customer 360° examples.

Customer 360° Revolution: Before, During, and After

Icon_Use-Cases_Gray_Customer360_v1_300x300px_wht-bk (1).png


MapR works with companies who have solved business problems but are limited with what they can do with their data, and they are looking for the next step. Often, they are analyzing structured data in their data warehouse, but they want to be able to do more.


The goal of the customer 360° revolution is to transform your business by figuring out what customers are going to do—if you can predict what your customer is going to do next, then you can add value to your business.

During = Data Scientist

There is a lot of confusion about what a data scientist does. A data scientist is someone who can draw the lines between the before and after, and this involves:

  1. Identifying new data sources, which traditional analytics or databases are not using due to the format, size, or structure.
  2. Collecting, correlating, and analyzing data across multiple data sources.
  3. Knowing and applying the right kind of machine learning algorithms to get value out of the data.

The goal of the customer 360° revolution is generally to:

  • Find additional information
  • Accentuate business expertise with new learning
  • Use new learning to change your business

Use Case: Advertising Customer 360° Before

An example use case for this customer 360° is a mobile banking web site which displays ads related to what the customer bought with his/her debit card. A “card-to-link” company hosts the ads and determines which ad to display. The advertisers are pharmacies, restaurants, grocery stores, etc., and they want to get their ads to the people who would be interested in buying their products. In order to target ads to interested customers, you need to accurately predict the likelihood that a given ad will be clicked, also known as "click-through rate" (CTR).

The “card-to-link” company was already capable of displaying ads related to purchases, but they wanted to do a better job of targeting and measuring the success of their ads.

As an example, let’s take a look at a new campaign for Chris’s Crème doughnuts:

  • Chris’s Crème asks the “card-to-link” company to find people who like doughnuts but who are not already their customers.
  • Card-to-link first uses data warehouse debit card merchant information to find Dunkin’ Donuts customers and customers of other quick-service restaurants that sell breakfast.
  • Card-to-link also uses customer zip code information to only display Chris’s Crème ads to customers who live near a Chris’s Crème location.

This works great, but what if this method increased new customers to 600,000, but Chris’s Crème wanted to pay more to increase the number of new customers to 1,000,000? There is no way to do this with just the existing data warehouse data.

Advertising Customer 360° During Part 1

The first question for the data scientist is, "What kind of data does card-to-link have that they could use to augment new Chris’s Crème customers from 600,000 to 1,000,000, and therefore maximize the profit of this campaign?"

The New Data

The new data comes from Internet browsing history; it consists of billions of device IDs and the keyword content of the browsing history for that ID.

So how can the data scientist take this new data and find a bigger audience?

Advertising Customer 360° During Part 2

Get the Data on the MapR Platform (NFS)

The first part of the solution workflow is to get the data on the MapR Platform, which is easy via the Network File System (NFS) protocol. Unlike other Hadoop distributions that only allow cluster data import or import as a batch operation, MapR enables you to mount the cluster itself via NFS and the MapR File System enables direct file modification and multiple concurrent reads and writes via POSIX semantics. An NFS-mounted cluster allows easy data ingestion from other machines leveraging standard Linux commands, utilities, applications, and scripts.

The complete customer purchase and past campaign history data is exported from the traditional data warehouse and put on the MapR Platform as Parquet tables. The Internet browsing history is put on the MapR platform as text documents.

Once the browsing, campaign and purchase history data is on the MapR platform, Apache Drill is used for interactive exploration and preprocessing of the data with a schema-free SQL query engine.

Feature Extraction

Features are the interesting properties in the data that you can use to make predictions.

Feature engineering is the process of transforming raw data into inputs for a machine learning algorithm. In order to be used in Spark machine learning algorithms, features have to be put into Feature Vectors, which are vectors of numbers representing the value for each feature. To build a classifier model, you extract and test to find the features of interest that most contribute to the classification.

Apache Spark for Text Analytics

The TF-IDF (Term Frequency–Inverse Document Frequency) function in Spark MLlib can be used to convert text words into feature vectors. TF-IDF calculates the most important words in a document compared to a collection of documents. For each word in a collection of documents, it computes:

  • Term Frequency is the number of times a word occurs in a specific document.
  • Document Frequency is the number of documents in the collection that contain the word.
  • TF * IDF measures the significance of a word in a document (the word occurs a lot in that document, but is rare in the collection of documents).

For example, if you had a collection of documents about football, then the word concussion in a document would be more relevant for that document than the word football.
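As a toy illustration of the arithmetic (in the actual pipeline, Spark MLlib's TF-IDF transformers do this at scale; the documents below are invented):

```python
import math

def tf_idf(docs):
    """Compute TF-IDF scores for each word in each document.

    docs: list of documents, each a list of word tokens.
    Returns a list of {word: score} dicts, one per document.
    """
    n_docs = len(docs)
    # Document frequency: number of documents containing each word.
    df = {}
    for doc in docs:
        for word in set(doc):
            df[word] = df.get(word, 0) + 1

    scores = []
    for doc in docs:
        tf = {}
        for word in doc:                     # term frequency per document
            tf[word] = tf.get(word, 0) + 1
        # IDF dampens words that appear in many documents.
        scores.append({w: tf[w] * math.log(n_docs / df[w]) for w in tf})
    return scores

docs = [
    ["football", "concussion", "injury"],
    ["football", "score", "goal"],
    ["football", "fans", "stadium"],
]
scores = tf_idf(docs)
# "football" appears in every document, so its score is 0 everywhere;
# "concussion" is rare, so it scores high in the first document.
```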

Machine Learning and Classification

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (such as whether a customer likes doughnuts or not), based on labeled data (such as purchase history). Classification takes a set of data with labels and features and learns how to label new records based on that information. In this example, the purchase history is used to label customers who bought doughnuts. The browsing history of millions of text keywords, many of which have seemingly nothing to do with doughnuts, is used as the features to discover similarities and categorize customer segments.

  • Label → bought doughnuts → 1 or 0
  • Features → browsing history → TF-IDF features

Machine Learning

Once the browsing and purchase history data is represented as labeled feature vectors, Spark machine learning classification algorithms such as logistic regression, decision trees, and random forests can be used to return a model representing the learning decision for that data. This model can then be used to make predictions on new data.
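To make the idea concrete, here is a toy logistic regression trained with plain gradient descent on hand-made labeled feature vectors. In the real pipeline Spark MLlib would do this distributed and at scale; the keywords and numbers below are invented:

```python
import math

def train_logistic(examples, lr=0.5, epochs=200):
    """Train a logistic regression classifier with plain gradient descent.

    examples: list of (features, label) pairs, where features is a list of
    floats and label is 1 (bought doughnuts) or 0 (did not).
    """
    n = len(examples[0][0])
    w, b = [0.0] * n, 0.0
    for _ in range(epochs):
        for x, y in examples:
            z = sum(wi * xi for wi, xi in zip(w, x)) + b
            p = 1.0 / (1.0 + math.exp(-z))   # predicted probability
            err = p - y                       # gradient of the log loss
            w = [wi - lr * err * xi for wi, xi in zip(w, x)]
            b -= lr * err
    return w, b

def predict(w, b, x):
    z = sum(wi * xi for wi, xi in zip(w, x)) + b
    return 1.0 / (1.0 + math.exp(-z))

# Label -> bought doughnuts (1/0); features -> toy TF-IDF-style scores
# for the invented keywords ["bakery", "coffee", "car parts"].
training = [
    ([2.1, 1.3, 0.0], 1),
    ([1.8, 0.2, 0.1], 1),
    ([0.0, 0.1, 2.5], 0),
    ([0.2, 0.0, 1.9], 0),
]
w, b = train_logistic(training)
p = predict(w, b, [2.0, 1.0, 0.0])  # browsing profile heavy on "bakery"
# p should come out near 1: this device ID looks like a doughnut eater.
```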

After the feature extraction and model building, it is possible to use the model to rank the billions of device IDs, from highest to lowest, who are most likely to eat doughnuts. With that, the audience of potential doughnut eaters can be augmented to the goal of one million.

Advertising Customer 360° After

Each colored line in the graph below refers to the results of modeling with data sets from different categories of advertising campaigns: Quick Serve Restaurant (QSR), Full Serve Restaurant, Light Fare, Auto, Grocery, and Apparel. For each campaign:

1. The purchase history was used to label the population:

  • For the grocery campaign, the following modeling populations were created:
    • Those who shopped at grocery stores multiple times
    • Those who bought their food at other stores
  • For the Full Serve restaurant campaign, the following modeling populations were created:
    • Yes: had at least 2 visits to a full serve restaurant in the last 2 months (90th percentile)
    • No: had at least 18 visits to a quick serve restaurant in the last 2 months (90th percentile)
    • Excluded those who were in both (small percentage)

2. The browsing history from the labeled populations was used for the features.

3. The classification model was built from the labeled features.

4. The model was used to predict the Click-Through Rate.

The left side of this graph is the percentage of people visiting the banking web site who clicked on an advertisement. The right side is the predicted probability that a person would click on the ad. If the colored lines are continuously increasing from left to right, that means the machine learning model worked well at predicting the click-through rates.

For full-service restaurants (the yellow line), the probability of clicking tracks well with the click-through rate. At a low probability, the click-through rate was 30%, and at a high probability the model found populations with a 70% click-through rate. So the model really responded to the method of finding similar customer profiles using their Internet browsing history.

Some campaigns, like grocery, did not work. This shows that the browsing history does not really apply to increased click-through rates for grocery ads, but the card-to-link company did not know this before. These are insights that can be used, for example, if card-to-link has a new automotive, QSR, or full-serve campaign. This is a method that works well and can be used for advertisers that will pay to target more customers.
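This kind of evaluation amounts to bucketing predictions by probability and checking that the observed click-through rate rises with the predicted one. A minimal sketch with made-up scores and outcomes:

```python
def ctr_by_bucket(predictions, clicks, n_buckets=5):
    """Group (predicted probability, clicked) pairs into equal-width
    probability buckets and compute the observed CTR in each."""
    buckets = [[0, 0] for _ in range(n_buckets)]  # [clicks, impressions]
    for p, clicked in zip(predictions, clicks):
        i = min(int(p * n_buckets), n_buckets - 1)
        buckets[i][0] += clicked
        buckets[i][1] += 1
    return [c / n if n else None for c, n in buckets]

# Made-up scores and outcomes: a continuously increasing result means
# the model ranks likely clickers well.
preds  = [0.05, 0.15, 0.35, 0.55, 0.75, 0.95, 0.90, 0.10]
clicks = [0,    0,    0,    1,    1,    1,    1,    0]
print(ctr_by_bucket(preds, clicks))  # [0.0, 0.0, 1.0, 1.0, 1.0]
```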

Advertising Customer 360° Summary

In this example, we discussed how data science can use customer behavioral data to find customer groups who share behavioral affinities in order to better target advertisements.



Editor's note: This blog post was originally published in the Converge Blog on August 08, 2016. 

By Mathieu Dumoulin


Modern Open Source Complex Event Processing For IoT

This series of blog posts details my findings as I bring to production a fully modern take on Complex Event Processing, or CEP for short. In many applications, ranging from financials to retail and IoT, there is tremendous value in automating tasks that require taking action in real time. Even setting aside the IT systems and frameworks that would support it, this is clearly a useful capability.

In the first post of the series, I explain how CEP has evolved to meet this requirement and how the requirements for CEP can be met in a modern big data context. In short, I present an approach based on the best practices of modern architecture, microservices, and Kafka-style stream messaging, with an up-to-date open source business rule engine.

In this second part, I’ll get more concrete and work through a working example using the system I propose. Let’s get started.

Smart City Traffic Monitoring

We made a working demo, and the code will be released on the MapR GitHub. It is made to work on either the MapR Sandbox or a real MapR cluster. Please stay tuned and check for the announcement.

For this example, we’ll use a very simple “smart city” use case for traffic monitoring. In this case, we’ll model a single sensor that can measure the speed of cars that pass over it. Using such sensor data, it would be fairly easy to detect traffic jams in real time, and thus notify the police to take action much more quickly than they otherwise could.

Some other types of use cases are easy to envision. We can add data from a public, real-time weather feed and connect to information panels along the roads and show an advisory to drivers without any human intervention. By combining road condition sensors with the weather data, we can provide advisory feedback to drivers about road conditions and warn the public very quickly. Furthermore, by adding historical accident data and using predictive analytics, we can imagine road safety measures that can be deployed in real time to areas with a higher probability of accidents.

To be honest, I’m only scratching the surface of what this kind of smart road could do to make a commuter’s life both easier and safer while saving money for the city. But how would we build a prototype of such a system without it being hugely complicated and expensive?

How to Build a Rule-Engine Based CEP System

So we now have a proper, concrete target in mind. It turns out that if we decide to base our system around reading our data from a Kafka-style stream (i.e., persistent, scalable, and high performance), then we will naturally end up with a pretty cool, modern CEP microservice.

The important point here is not to show how to build a super complicated enterprise architecture, but rather that by making some simple technology choices and building our demo in a reasonable way, we naturally end up with this elegant, modern, and simple architecture.

For simplicity’s sake, I have decided to implement my prototype on a MapR Sandbox (get it free here). This is because it will include the stream messaging system, MapR Streams, which I can use through the Kafka 0.9 API with very little configuration and know it will work the same on a production MapR 5.1+ cluster.

Finally, it should be noted that an Apache Kafka cluster could be used as well with the same design and code, just with some additional work to get it up and running.

High-Level Architecture View

As shown in the diagram above, the flow is to have sensor data get aggregated to a producer gateway, which will forward the data to the stream with topic named “data.” The data will be in JSON format so it is easy to manipulate, human readable, and easily sent to Elasticsearch as-is for monitoring with a Kibana dashboard.

The consumer side will have two tasks: to read the data from the stream and host an instance of a KieSession where the rule engine can apply rules on the facts as they are added to it.

Rules are edited in the Workbench GUI, a Java webapp which can be run on a Java application server such as WildFly or Tomcat.

Rules are fetched from the Workbench by the consumer application using the appropriate methods provided by the Drools framework, which are built entirely on the Maven repository system.

We can now look at the proposed system in more detail in the following sections.

List of Technologies Used

The technology we’re going to use is as follows:

  • MapR Sandbox 5.2
  • The Java programming language (any JVM language is fine)
    • The Jackson 2 library to convert to and from JSON
  • MapR Streams or Apache Kafka stream messaging system
  • Wildfly 10 application server to host the Workbench
  • JBoss Drools as our choice of OSS business rule engine
  • Log Synth to generate some synthetic data for our prototype
  • Streamsets 1.6 to connect MapR Streams and Elasticsearch
  • Elasticsearch 2.4 and Kibana 4 for monitoring

Traffic Monitoring Prototype Architecture

As I built this demo to run on the MapR Sandbox, I’m using instructions for CentOS 6.X, an open source version of RHEL 6.X. Instructions for CentOS 7 are almost identical, and finding similar instructions for Ubuntu would be pretty straightforward and left up to the reader.

To build the core of the traffic monitoring system, we’re going to need two basic parts:

  • A program to feed sensor data into MapR Streams/Kafka. This part will be using fake data modeled by a vehicle simulation coded with Log Synth. We’ll use the MapR Kafka-rest proxy implementation (just introduced with MEP 2.0) to add the data with Python.
  • A JVM-language application that will read data from the stream and pass it to a KieSession. The minimal code to get this working is surprisingly small.

To edit the rules, we deploy the Workbench on Wildfly 10, which is a fairly straightforward process. Check this blog post for instructions, or read the Drools Documentation. Installing Wildfly is pretty simple; see this blog for great instructions on how to install it as a service on CentOS/RHEL (it’s written for an earlier Wildfly release, but the same instructions work for 9 and 10).

We made a single configuration change to Wildfly. We changed the port to 28080 instead of 8080, as it is already used by the sandbox. Wildfly runs in standalone mode, so the configuration file is in WILDFLY_HOME/standalone/configuration/standalone.xml.

For monitoring, we let the streaming architecture work for us. We use the open source StreamSets Data Collector to easily redirect the sensor data to Elasticsearch so that we can monitor the system with a nice Kibana dashboard. Setting up StreamSets with MapR Streams requires some extra work with version 1.6 (great blog post about it here, or see the official StreamSets documentation).

Finally, installation and setup of Elasticsearch and Kibana is well documented on Centos/RHEL.

For production, all those parts can be readily separated to run on separate servers. They can be run on either cluster nodes or edge nodes. If it’s a MapR cluster, installing the MapR Client and pointing it to the cluster CLDB nodes will be all the configuration needed for full access to the streams. For an Apache Kafka cluster, refer to the official Kafka documentation.

Traffic Monitoring Prototype - How To

Creating the Streams with maprcli

The first task is to create streams and topics for our application. To do this, it’s a best practice to create a volume for streams first. As user mapr, type from the command line:

maprcli volume create -name streams -path /streams
maprcli stream create -path /streams/traffic -produceperm p -consumeperm p
maprcli stream topic create -path /streams/traffic -topic data
maprcli stream topic create -path /streams/traffic -topic agenda
maprcli stream topic create -path /streams/traffic -topic rule-runtime

Note: MapR Streams is more of a superset of Kafka than simply a clone. In addition to being faster, MapR Streams can use all the advantages of the MapR File System, such as volumes (with permissions and quotas and so on) and replication. A cluster is not limited to just defining topics, but can define several streams which each can have several topics. Therefore, instead of a topic name, a MapR Stream has a path:topic notation. Here, our data stream’s full name is “/streams/traffic:data”. You can read more about the advantages of MapR Streams vs. Kafka in this whiteboard walkthrough by Jim Scott.

Generating Fake Data

I used the Log-Synth tool to generate data for this prototype. Log-Synth uses a schema combined with a Sampler class to generate data in a very flexible and simple manner.

My Schema:


[
  {
    "name": "traffic",
    "class": "cars",
    "speed": "70 kph",
    "variance": "10 kph",
    "arrival": "25/min",
    "sensors": {"locations": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "unit": "km"},
    "slowdown": [{"speed": "11 kph", "location": "2.9 km - 5.1 km", "time": "5min - 60min"}]
  }
]


The command to generate the data is:

synth -count 10K -schema my-schema.json >> output.json

Data is generated one car at a time, and each data point is a reading at a sensor. The data models a flow of cars driving at 70 km/h, arriving at a rate of 25 cars per minute. A slowdown happens between km 2.9 and 5.1, where speed is reduced to 11 km/h, from 5 minutes to 60 minutes after the start of the simulation. This is the traffic jam we wish to detect using our CEP system.

The generated data is a file where each line is the resulting list of sensor measurements for a single car:



A reading has a sensor ID, a speed in meters per second, and a time delta from time 0 (the moment the simulation starts) in seconds.

My producer code simply translates the readings into a list of sensor readings ordered by time, converting the speed into km/h and the time delta into a timestamp in milliseconds since the epoch.
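That transformation can be sketched in a few lines of Python; the raw field names and the simulation start time below are assumptions, and the m/s to km/h conversion matches the 70 kph speed in the schema:

```python
import json

START_MS = 1485000000000  # assumed simulation start, ms since epoch

def to_stream_records(car_readings, start_ms=START_MS):
    """Turn one car's raw Log-Synth readings into time-ordered JSON
    records ready to produce onto the stream.

    Each raw reading is assumed to look like
    {"sensor": 3, "speed": 19.4, "time": 42.0}   # speed m/s, time delta s
    """
    records = []
    for r in sorted(car_readings, key=lambda r: r["time"]):
        records.append(json.dumps({
            "sensor_id": r["sensor"],
            "speed_kmh": round(r["speed"] * 3.6, 1),       # m/s -> km/h
            "timestamp": start_ms + int(r["time"] * 1000)  # delta s -> epoch ms
        }))
    return records

raw = [{"sensor": 2, "speed": 19.4, "time": 42.0},
       {"sensor": 1, "speed": 19.6, "time": 12.0}]
recs = to_stream_records(raw)
```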

Sending the data onto the stream can be done one line at a time using standard producer code. The code in the sample Java producer works just fine.

Another exciting new possibility is to use the brand new Kafka Rest Proxy, which is also available on MapR from MEP 2.0 (MapR Ecosystem Pack). This means sensors can connect directly to Kafka from any language since HTTP-based REST API is a global standard.

Using the Workbench

We can log in to the Workbench with an admin user (a user with the role “admin”) created with the add-user script that ships with Wildfly.

Using the Workbench is beyond the scope of the article, but the general idea is to create an Organizational Unit and a Repository, and then create a project.

Data Objects

We will need to create facts for the rule engine to work with. Best practice for Drools is to use a separate Maven project for your data model. For the sake of simplicity, I created them right from the workbench.

The Measurement is a super generic bean which models the sensor speed measurements. For the prototype, I kept it as simple as the raw data with a timestamp, a sensor ID, and a speed.

The Sensor bean models the sensor itself and has an ID and the average speed of all cars it measures over a time window defined by our rules. This average speed will be used to trigger an alert for heavy traffic. The traffic String marks the current traffic level, which can be “NONE”, “LIGHT”, or “HEAVY.”

The Traffic Monitoring Rules

  • create sensors for new id
  • detect heavy traffic
  • detect light traffic
  • detect normal traffic
  • get average speed at sensor

The create sensors rule ensures that there are sensor objects available in memory. This is the fact we use to know the average speed at a certain sensor.

The detect heavy traffic is the key rule we want to use to send an alarm to the police if heavy traffic is detected at a certain sensor.

So when the average speed reaches 20 km/h or less, and the sensor isn’t already in the HEAVY traffic level, set the level to HEAVY and send an alert.

Which means we need to know the average speed. Here is the rule to compute it using the Drools rule DSL (domain specific language):
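A sketch of what the heavy-traffic and average-speed rules can look like in DRL, assuming Measurement events and Sensor facts with the fields described above; the one-minute sliding window, field names, and channel name are assumptions:

```
rule "detect heavy traffic"
when
    $sensor : Sensor( averageSpeed <= 20, traffic != "HEAVY" )
then
    modify( $sensor ) { setTraffic( "HEAVY" ) }
    channels["alerts"].send( $sensor );   // notify outside the session
end

rule "get average speed at sensor"
when
    $sensor : Sensor( $id : id )
    accumulate( Measurement( sensorId == $id, $speed : speed )
                    over window:time( 1m ),
                $avg : average( $speed ) )
then
    modify( $sensor ) { setAverageSpeed( $avg ) }
end
```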

This is not rocket science! These rules illustrate rather clearly how writing simple but useful rules can realistically be left to business analysts and developed separately from the stream and big data platform.

The Consumer Side

The consumer reads the data from the stream. The tutorial code in Java from the documentation is perfectly adequate. Jackson is used to convert the JSON to Measurement objects.

The consumer has an instance of the KieSession, and each measurement is added to the session using kieSession.insert(fact) and followed by a call to kieSession.fireAllRules(), which triggers the algorithm to check if any rules match with the new state of the session given the new data.

A channel, which is just a callback, is used to allow a rule to call a function “outside” of the KieSession. My prototype uses this method to log the alert. In a production system, we could easily change the code to send an email, an SMS, or take some other action.
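To make the insert-and-fire mechanics concrete, here is a toy Python re-implementation of the same loop and traffic rules; the three-reading window and in-memory callback are stand-ins for the Drools session and channel, and the thresholds follow the rules above:

```python
class Sensor:
    def __init__(self, sensor_id):
        self.id = sensor_id
        self.speeds = []        # recent measurements (the "window")
        self.traffic = "NONE"

alerts = []
sensors = {}

def alert_channel(message):
    """Stand-in for the Drools channel callback: here we just collect
    alerts, but this could send an email or SMS instead."""
    alerts.append(message)

def insert_and_fire(measurement):
    """Mimics kieSession.insert(fact) followed by fireAllRules()."""
    # Rule: create sensors for new id
    s = sensors.setdefault(measurement["sensor_id"],
                           Sensor(measurement["sensor_id"]))
    # Rule: get average speed at sensor (last 3 readings as the window)
    s.speeds = (s.speeds + [measurement["speed_kmh"]])[-3:]
    avg = sum(s.speeds) / len(s.speeds)
    # Rule: detect heavy traffic (fires once per level change)
    if avg <= 20 and s.traffic != "HEAVY":
        s.traffic = "HEAVY"
        alert_channel(f"HEAVY traffic at sensor {s.id} (avg {avg:.1f} km/h)")

# Free-flowing traffic, then the simulated slowdown kicks in.
for speed in [70, 65, 12, 11, 10, 11]:
    insert_and_fire({"sensor_id": 3, "speed_kmh": speed})
# One alert fires when the window average drops below 20 km/h,
# and the rule does not fire again while the level stays HEAVY.
```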

Importing the Rules into the Consumer Application

The way we get the rules into the running application is by fetching it from the Maven repository integrated into the Workbench.

KieServices kieServices = KieServices.Factory.get();  
ReleaseId releaseId = kieServices.newReleaseId( "org.mapr.demo", "smart-traffic-kjar", "1.0.0" ); 
KieContainer kContainer = kieServices.newKieContainer( releaseId );

KieSession kieSession = kContainer.newKieSession();


So the question becomes, how does the call to newReleaseId know to fetch the artifact with our rules from the Maven repository in the Workbench?

The answer is the ~/.m2/settings.xml file, where you add this information. We recommend using the user mapr for everything in the sandbox, so the full path is: /home/mapr/.m2/settings.xml

[mapr@maprdemo .m2]$ cat settings.xml  
<?xml version="1.0" encoding="UTF-8"?>
<settings>
  ...
        <name>Guvnor M2 Repo</name>
  ...
</settings>


The key information is the URL of the maven2 repository of the Drools Workbench. This information can be copied and pasted from the pom.xml, which can be seen by using the repository view:

So I just copy-pasted that and now everything works like magic.

Monitoring the Smart Traffic Prototype

We have one stream with data, and two other streams to monitor the rule engine internals. This is very easy with Drools because it uses Java listeners to report on its internal state. We simply provide a custom implementation of the listeners to produce the data to a stream, and then use StreamSets to redirect all of it to Elasticsearch.

Elasticsearch Mappings

The mappings are defined in a small script I created:

Streamsets for No-Code Stream To Elasticsearch Pipeline

Each stream has its own pipeline, where each looks like this:

The Jython Evaluator adds timestamp information if necessary.

Running the Prototype

Start the consumer:

Then start the producer:

In my prototype, I added code to control the rate at which the data is sent to the stream, to make the rule firing easier to observe. 10,000 events is a small volume for Drools and for MapR Streams/Kafka, so the whole demo would otherwise be over in less than a second. This is the meaning of the “-r 25”: a rate of 25 events per second.

The dashboard looks like this:

Once data starts to stream in:

The traffic jam is quite visible now:

As soon as a sensor average speed drops below 20 km/h, an alarm is fired:

And the dashboard will display a count of “1”.

The simulation will continue and the two other sensors will go below 20 in turn for a total of 3 alarms fired.

In Conclusion

This project turned into a great illustration of the power and ease of streaming architectures using microservices. A common misconception of microservices is that they are limited to synchronous REST services. This is not the case at all. Our system is a backend system and so the microservices (the producer, the consumer, streamsets, and ES/Kibana) are all run asynchronously, dealing with the data as it comes off the stream.

The project was fairly easy to build technically because every part is totally independent of the others, and can therefore be tested in isolation. Once the producer can send data to the stream properly, it doesn’t need to be tested ever again for issues that concern other parts of the system. Also, each part was added one at a time, making it easy to identify issues and fix them.

In total, with very little original code, we could implement a working, useful system that would need very little change to be put into production. Only the producer needs to be customized for the particular sensor or data source.

Rules are the most time consuming and error-prone part. This is no different than if the project had been done using custom-coded Spark code. The win for an approach based on a rule engine such as Drools and the Drools Workbench is that the rules can be edited, tested, and improved independently of how the code runs on the cluster. The work in Workbench has no dependency on the system at all, as it is pulled in by the consumer application automatically.

From a business value point of view, all the value is in the rules, assuming a stable production system. There is no reason for organizations not to take advantage of this quick editing capability to become ever more agile and responsive to evolving conditions, for the benefit of their customers… and the bottom line.


Editor's note: This blog post was originally published in the Converge Blog on January 10, 2017


By Ian Downard



A lot of people choose MapR as their core platform for processing and storing big data because of its advantages for speed and performance. MapR consistently performs faster than any other big data platform for all kinds of applications, including Hadoop, distributed file I/O, NoSQL data storage, and data streaming. In this post, I’m focusing on the latter to provide some perspective on how much better/faster/cheaper MapR Streams can be compared to Apache Kafka as a data streaming technology.

MapR Streams is a cluster-based messaging system for streaming data at scale. It’s integrated into the MapR Converged Data Platform and implements the Apache Kafka Java API, so applications written for Kafka can also run on MapR Streams. What differentiates the MapR Streams technology from Kafka are its built-in features for global replication, security, multi-tenancy, high availability, and disaster recovery, all of which it inherits from the MapR Converged Data Platform. From an operational perspective, these features make MapR Streams easier to manage than Kafka, but there are speed advantages, too. I’ve been looking at this a lot lately, trying to understand where and why MapR Streams outperforms Kafka. In this blog post, I will show how MapR Streams can transport a much faster stream of data, with much larger message sizes, and to far more topics than can be achieved with Kafka.

Test Strategy

In this study, I wanted to compare Kafka and MapR Streams as to how they perform “off the shelf” without the burden of tuning my test environment to perfectly optimize performance in each test scenario. So, I have pretty much stuck with the default settings for services and clients. The only exceptions are that I configured each Kafka topic with a replication factor of 3 and configured producers to send messages synchronously, since these are the default modes for MapR Streams. I also disabled stream compression in order to control message sizes and measure throughput more precisely.

Test Configurations

I measured performance from both producer and consumer perspectives. However, consumers run faster than producers, so I focused primarily on the producer side since the throughput of a stream is bounded by the throughput of its producers. I used two threads in my producer clients so that message generation could happen in parallel with sending messages and waiting for acknowledgments. I used the following properties for producers and topics:

acks = all
batch.size = 16384
linger.ms = 0
block.on.buffer.full = true
compression.type = none
default.replication.factor = 3


My test environment consisted of three Ubuntu servers running Kafka 2.11- or MapR 5.2 on Azure VMs sized with the following specs:

  • Intel Xeon CPU E5-2660 2.2 GHz processor with 16 cores
  • SSD disk storage with 64,000 Mbps cached / 51,200 uncached max disk throughput
  • 112GB of RAM
  • Virtual networking throughput between 1 and 2 Gbits/sec (I measured this quantitatively since I couldn’t easily find virtual network throughput specs from Microsoft).

Performance Metrics

Throughput, latency, and loss are the most important metrics for measuring the performance of a message bus system. MapR Streams and Kafka both guarantee zero loss through at-least-once semantics. MapR provides some advantages when it comes to latency, but typically both MapR Streams and Kafka deliver messages sufficiently quickly for real-time applications. For those reasons, I chose to focus on throughput in this study.

Throughput is important because if an application generates messages faster than a message bus can consume and deliver them, then those messages must be queued. Queueing increases end-to-end latency and destabilizes applications when queues grow too large.

Furthermore, throughput in Kafka and MapR Streams is sensitive to the size of the messages being sent and to the distribution of those messages into topics. So, I analyzed those two attributes independently in order to measure how message size and stream topics affect throughput.

Throughput Performance

To measure producer throughput, I measured how fast a single producer could publish a sustained flow of messages to a single topic with 1 partition and 3x replication. I ran this test for a variety of message sizes to see how that affects throughput. The results show MapR Streams consistently achieving much higher throughput than Kafka and having a much higher capacity for handling large message sizes, as shown below.

Throughput MB/s

MapR Streams doesn’t just move a larger volume of data than Kafka; it also has the capacity to send more records per second. We can see this by plotting throughput in terms of raw record count, as shown below:

Throughput Msgs/s
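The two throughput views are related by message size alone. A tiny helper makes the conversion explicit (the numbers below are illustrative, not measurements from these tests):

```scala
// Record rate implied by a byte throughput and a fixed message size.
def recordsPerSecond(bytesPerSec: Double, msgSizeBytes: Int): Double =
  bytesPerSec / msgSizeBytes

// A stream moving 10 MB/s of 100-byte messages carries 100,000 msgs/s.
val rate = recordsPerSecond(10.0 * 1000 * 1000, 100)
```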

I recorded these results with two different code bases. First, I used custom tests that I wrote using the Java unit test framework (JUnit); then I used the performance test scripts included with Kafka and MapR. These different approaches did not produce exactly the same results, but they were close, as shown below. This correlation helps validate the conclusions stated above: that MapR Streams can transport a larger volume of data and more frequent messages than Kafka.

Throughput Correlation

How does MapR Streams achieve more than 4x the throughput of Kafka?

There are a lot of reasons why MapR Streams is faster, and without getting too technical, I’ll mention just a few. First, the MapR Streams client more efficiently flushes data to the MapR Streams server. It spawns its own threads to do this work, whereas Kafka uses the client application threads directly to flush to a Kafka broker, which in many cases is limited to just a single thread.

On the server side, MapR Streams inherits efficient I/O patterns from the core MapR storage layer which keeps files coherent and clean so that I/O operations can be efficiently buffered and addressed to sequential locations on disk. Replication is more efficient, too, since the underlying MapR storage platform has distributed synchronous replication built in, along with other operational features that simply don’t exist in Kafka, such as snapshots, mirroring, quotas, access controls, etc.

Replicating this test

My JUnit tests for benchmarking Kafka and MapR Streams are available in the kafka_junit_tests repository. Here are the commands that I used to generate the data shown above:

git clone
cd kafka_junit_tests
# Create a Kafka topic...
/opt/kafka_2.11- --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic t-00000 --config compression.type=uncompressed
# or create a MapR Streams topic.
maprcli stream create -path /user/mapr/iantest -produceperm p -consumeperm p -topicperm p -defaultpartitions 1 -compression off
# Then compile.
mvn -e -Dtest=MessageSizeSpeedTest test
# Test data will be saved in size-count.csv

You can also measure throughput using the performance test utilities included with Kafka and MapR. Here are the commands that I used to do that:

Kafka script:

MapR script:

Topic Scalability

Another major advantage that MapR Streams holds over Kafka relates to how well it can handle large quantities of stream topics. Topics are the primary means of organizing stream data; however, there is overhead associated with categorizing streams into topics, and producer throughput is sensitive to that overhead. I quantified this by measuring how fast a single producer could publish a sustained flow of messages to an increasingly large quantity of topics. This is essentially a "fan-out" producer (illustrated below) and it is very common for fast data pipelines to use this pattern so that data can be more easily consumed downstream.

Fanout Producer
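The fan-out pattern itself is easy to sketch: a single producer spreads records round-robin across many topics. The sketch below only buffers messages per topic in memory, so the distribution logic can be exercised without a broker; the topic-naming scheme mirrors the t-00000 style used in the test commands:

```scala
import scala.collection.mutable

// Sketch of a fan-out producer: one source of records spread
// round-robin across many topics. "send" just buffers per topic
// here, standing in for a real producer client.
class FanoutProducer(topicCount: Int) {
  val topics: mutable.Map[String, mutable.Buffer[String]] =
    mutable.Map((0 until topicCount).map(i =>
      f"t-$i%05d" -> mutable.Buffer.empty[String]): _*)
  private var next = 0
  def send(record: String): Unit = {
    topics(f"t-$next%05d") += record
    next = (next + 1) % topicCount
  }
}

val p = new FanoutProducer(4)
(1 to 8).foreach(i => p.send(s"msg-$i"))
```

With four topics, consecutive messages land on t-00000 through t-00003 in turn, which is what makes per-topic overhead the dominant cost as the topic count grows.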

Each of the topics created for this scenario were configured with a single partition and 3x replication. Record size was held constant at 100 bytes.

It’s clear from the following graph that MapR Streams scales to a larger quantity of topics than Kafka.

Topic Scalability

How does MapR Streams handle so many more topics than Kafka?

A topic is just metadata in MapR Streams; it does not introduce overhead to normal operations. MapR Streams uses only one data structure for a stream, no matter how many topics it has, and the MapR storage system provides extremely fast and scalable storage for that data.

On the other hand, Kafka represents each topic by at least one directory and several files in a general purpose file system. The more topics/partitions Kafka has, the more files it creates. This makes it harder to buffer disk operations and perform sequential I/O, and it increases the complexity of what ZooKeeper must manage.

Replicating this test

This scenario can be run with another JUnit test from the same kafka_junit_tests repository, as follows:

git clone
cd kafka_junit_tests
# For MapR only, create the stream first:
maprcli stream create -path /user/mapr/taq -produceperm p -consumeperm p -topicperm p -compression off
mvn -e -Dtest=ThreadCountSpeedTest test
# Test data will be saved in thread-count.csv


Partition Scalability

Stream topics are often subdivided into partitions in order to allow multiple consumers to read from a topic simultaneously. Both Kafka and MapR Streams allow topics to be partitioned, but partitions in MapR Streams are much more powerful and easier to manage than partitions in Kafka. For example, Kafka requires partitions to fit within the disk space of a single cluster node and cannot split them across machines. MapR Streams is not limited by the storage capacity of any one node because the MapR storage system automatically grows (or shrinks) partitions across servers. I’ll talk more about these operational advantages later, but let’s consider the performance implications of partitioning now.

ZooKeeper elects separate nodes to be leaders for each partition. Leaders are responsible for processing the client reads and writes for their designated partition. This helps load balance client requests across the cluster, but it complicates the work that ZooKeeper must do to keep topics synchronized and replicated. Leader election takes time and does not scale well. In my tests, I saw leader election take at least 0.1 seconds per partition, and it ran serially. So, for example, it would take more than 10 seconds to configure a topic with 100 partitions—that is, if ZooKeeper didn’t crash, which it frequently did when I created topics with 100 or more partitions.
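The arithmetic behind that estimate is simple: with elections running serially at roughly 0.1 seconds per partition (the rate I observed), topic setup time grows linearly with partition count:

```scala
// Lower bound on topic-creation time when leader election runs
// serially at a fixed cost per partition (~0.1 s, as observed above).
def electionSeconds(partitions: Int, perPartitionSec: Double = 0.1): Double =
  partitions * perPartitionSec

val t100 = electionSeconds(100) // roughly 10 seconds for 100 partitions
```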

In MapR Streams, I had no problem streaming data to topics with thousands of partitions, as shown below. This graph shows the throughput for a producer sending synchronously to a 3x replicated topic subdivided into an increasingly large number of partitions. I could not run my test in Kafka beyond 400 partitions, so that line is cut short.

Partitioning Scalability

Replicating this test

I used the performance scripts included with Kafka and MapR to generate the partition vs. throughput data shown above. Here is the script I used to run this test in Kafka:

That script will silently freeze if ZooKeeper fails, but it will continue once ZooKeeper starts again. So in another terminal, I simultaneously ran the following script to automatically restart ZooKeeper if it fails (which it is likely to do during this test):

Here is the script I used to generate partitions vs. throughput data in MapR:

Operational Advantages for MapR Streams

Increasing throughput capacity and decreasing message latency can often be accomplished simply by adding nodes to your distributed messaging cluster. However, doing so costs money and complicates management. Saying that MapR Streams performs better than Kafka is, in effect, saying that a distributed messaging platform can be operated with less hardware on MapR than with Kafka.

However, unless you’re working on applications that operate at extreme scale, the challenges you face with Kafka are more likely to be operational rather than performance-related. And this is where the MapR total cost of ownership really shines.

Not only does MapR Streams execute with higher performance, it also addresses major operational deficiencies in Kafka. Here are three examples relating to replication, scaling, and mirroring:

  • Kafka requires that the MirrorMaker processes be manually configured in order to replicate across clusters. Replication is easy to configure with MapR Streams and supports unique capabilities for replicating streams across data centers and allowing streams to be updated in multiple locations at the same time.

  • Kafka’s mirroring design simply forwards messages to a mirror cluster. The offsets in the source cluster are useless in the mirror, which means consumers and producers cannot automatically failover from one cluster to a mirror. MapR continuously transfers updated records for near real-time replication and preserves message offsets in all replicated copies.

  • Kafka requires partitions to fit within the disk space of a single cluster node and cannot split them across machines. This is especially risky, because ZooKeeper could automatically assign multiple large partitions to a node that doesn’t have space for them. You can move them manually, but that can quickly become unmanageable. MapR Streams is not limited by the storage capacity of any one node because it distributes stream data across the cluster.


For more information about the operational advantages of MapR Streams, see Will Ochandarena’s blog post, Scaling with Kafka – Common Challenges Solved.

I also highly recommend reading Chapter 5 of Streaming Architecture: New Designs Using Apache Kafka and MapR Streams, by Ted Dunning & Ellen Friedman.


MapR Streams outperforms Kafka in big ways. I measured the performance of distributed streaming in a variety of cases that focused on the effects of message size and topic quantity, and I saw MapR Streams transport a much faster stream of data, with much larger message sizes, and to far more topics than what could be achieved with Kafka on a similarly sized cluster. Although performance isn’t the only thing that makes MapR Streams desirable over Kafka, it offers one compelling reason to consider it.


Editor's Note: This blog post was originally published in the Converge Blog on January 11, 2017. Since the publication of the original article, MapR Streams has been renamed MapR-ES.




Getting Started with MapR-ES Event Data Streams

By Carol McDonald

This is a 4-Part Series, see the previously published posts below:

Part 1 - Spark Machine Learning

Part 3 – Real-Time Dashboard Using Vert.x

This post is the second part in a series where we will build a real-time example for analysis and monitoring of Uber car GPS trip data. If you have not already read the first part of this series, you should read that first.

The first post discussed creating a machine learning model using Apache Spark’s K-means algorithm to cluster Uber data based on location. This second post will discuss using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.

Example Use Case: Real-Time Analysis of Geographically Clustered Vehicles/Items

The following figure depicts the architecture for the data pipeline:

  1. Uber trip data is published to a MapR Streams topic using the Kafka API
  2. A Spark streaming application subscribed to the first topic:
    1. Ingests a stream of uber trip events
    2. Identifies the location cluster corresponding to the latitude and longitude of the uber trip
    3. Adds the cluster location to the event and publishes the results in JSON format to another topic
  3. A Spark streaming application subscribed to the second topic:
    1. Analyzes the uber trip location clusters that are popular by date and time

Example Use Case Data

The example data set is Uber trip data, which you can read more about in part 1 of this series. The incoming data is in CSV format; an example is shown below, with the header:

date/time, latitude,longitude,base
2014-08-01 00:00:00,40.729,-73.9422,B02598

The enriched Data Records are in JSON format. An example line is shown below:

{"dt":"2014-08-01 00:00:00","lat":40.729,"lon":-73.9422,"base":"B02598","cluster":7}

Spark Kafka Consumer Producer Code

Parsing the Data Set Records

A Scala Uber case class defines the schema corresponding to the CSV records. The parseUber function parses the comma separated values into the Uber case class.
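The original source file isn’t reproduced in this post, so here is a minimal sketch of what that case class and parser might look like, with field names inferred from the JSON output later in the post (dt, lat, lon, base) and the CSV record shown above:

```scala
// Schema for one CSV trip record: date/time, latitude, longitude, base.
case class Uber(dt: String, lat: Double, lon: Double, base: String)

// Parse a comma-separated record into an Uber instance.
def parseUber(str: String): Uber = {
  val p = str.split(",")
  Uber(p(0), p(1).toDouble, p(2).toDouble, p(3))
}

val trip = parseUber("2014-08-01 00:00:00,40.729,-73.9422,B02598")
```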

Loading the K-Means Model

The Spark KMeansModel class is used to load the saved K-means model fitted on the historical Uber trip data.
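Conceptually, the prediction step of a fitted K-means model reduces to a nearest-center lookup. The sketch below shows that lookup in plain Scala; the centers are made-up New York coordinates, not the actual fitted centers:

```scala
// Index of the cluster center closest to a point, by squared
// Euclidean distance (what KMeansModel prediction computes).
def nearestCluster(lat: Double, lon: Double,
                   centers: Seq[(Double, Double)]): Int =
  centers.zipWithIndex.minBy { case ((cLat, cLon), _) =>
    math.pow(lat - cLat, 2) + math.pow(lon - cLon, 2)
  }._2

// Illustrative centers only; the real model has its own fitted values.
val centers = Seq((40.77, -73.96), (40.72, -74.00), (40.66, -73.78))
val cluster = nearestCluster(40.729, -73.9422, centers)
```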

Output of model clusterCenters:

Below the cluster centers are displayed on a google map:

Spark Streaming Code

These are the basic steps for the Spark Streaming Consumer Producer code:

  1. Configure Kafka Consumer Producer properties.
  2. Initialize a Spark StreamingContext object. Using this context, create a DStream which reads messages from a topic.
  3. Apply transformations (which create new DStreams).
  4. Write messages from the transformed DStream to a Topic.
  5. Start receiving data and processing. Wait for the processing to be stopped.

We will go through each of these steps with the example application code.

  1. Configure Kafka Consumer Producer properties

The first step is to set the KafkaConsumer and KafkaProducer configuration properties, which will be used later to create a DStream for receiving/sending messages to topics. You need to set the following parameters:

  • Key and value deserializers: for deserializing the message.
  • Auto offset reset: to start reading from the earliest or latest message.
  • Bootstrap servers: this can be set to a dummy host:port since the broker address is not actually used by MapR Streams.

For more information on the configuration parameters, see the MapR Streams documentation.
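As a sketch, the consumer side of that configuration might look like the following (the property keys are the standard Kafka client names; the host, port, and group id are placeholders):

```scala
import java.util.Properties

// Consumer properties for the direct stream. With MapR Streams the
// bootstrap address can be a dummy value, since it is not used.
val consumerProps = new Properties()
consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("auto.offset.reset", "earliest") // start at earliest message
consumerProps.put("bootstrap.servers", "maprdemo:9092") // placeholder host:port
consumerProps.put("group.id", "uber-stream-consumer")   // hypothetical group
```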

  2. Initialize a Spark StreamingContext object.

ConsumerStrategies.Subscribe, as shown below, is used to set the topics and Kafka configuration parameters. We use the KafkaUtils createDirectStream method with a StreamingContext, the consumer and location strategies, to create an input stream from a MapR Streams topic. This creates a DStream that represents the stream of incoming data, where each message is a key value pair. We use the DStream map transformation to create a DStream with the message values.

  3. Apply transformations (which create new DStreams)

We use the DStream foreachRDD method to apply processing to each RDD in this DStream. We parse the message values into Uber objects, with the map operation on the DStream. Then we convert the RDD to a DataFrame, which allows you to use DataFrames and SQL operations on streaming data.

Here is example output:

A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.

Then the model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster predictions.

The output of the model transform method is shown below:

The DataFrame is then registered as a table so that it can be used in SQL statements. The output of the SQL query is shown below:

  4. Write messages from the transformed DStream to a Topic

The Dataset result of the query is converted to JSON RDD Strings, then the RDD sendToKafka method is used to send the JSON key-value messages to a topic (the key is null in this case).

Example message values (the output of temp.take(2)) are shown below:

{"dt":"2014-08-01 00:00:00","lat":40.729,"lon":-73.9422,"base":"B02598","cluster":7}

{"dt":"2014-08-01 00:00:00","lat":40.7406,"lon":-73.9902,"base":"B02598","cluster":7}
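The enrichment step that produces records like those above can be sketched as plain string serialization (the case class is redefined locally here, and the field order follows the sample output):

```scala
// Build the enriched JSON record shown above from a trip record and
// its predicted cluster (case class local to this sketch).
case class UberTrip(dt: String, lat: Double, lon: Double, base: String)

def toEnrichedJson(u: UberTrip, cluster: Int): String =
  s"""{"dt":"${u.dt}","lat":${u.lat},"lon":${u.lon},"base":"${u.base}","cluster":$cluster}"""

val json = toEnrichedJson(
  UberTrip("2014-08-01 00:00:00", 40.729, -73.9422, "B02598"), 7)
```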

  5. Start receiving data and processing it. Wait for the processing to be stopped.

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

Spark Kafka Consumer Code

Next, we will go over some of the Spark streaming code which consumes the JSON-enriched messages.

We specify the schema with a Spark StructType:

Below is the code for:

  • Creating a Direct Kafka Stream
  • Converting the JSON message values to Dataset[Row] using the schema
  • Creating two temporary views for subsequent SQL queries
  • Using ssc.remember to cache data for queries

Now we can query the streaming data to ask questions like: which hours had the highest number of pickups? (Output is shown in a Zeppelin notebook):

spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")

How many pickups occurred in each cluster?



spark.sql("select cluster, count(cluster) as count from uber group by cluster")

Which hours of the day and which cluster had the highest number of pickups?

spark.sql("SELECT hour(uber.dt) as hr, cluster, count(cluster) as ct FROM uber group by hour(uber.dt), cluster")

Display datetime and cluster counts for Uber trips:

%sql select cluster, dt, count(cluster) as count from uber group by dt, cluster order by dt, cluster



In this blog post, you learned how to use a Spark machine learning model in a Spark Streaming application, and how to integrate Spark Streaming with MapR Streams to consume and produce messages using the Kafka API.

References and More Information:

Editor's note: This blog post was originally published in the MapR Converge Blog on January 05, 2017.

By Carol McDonald


A Formula 1 race is a high-speed example of the Internet of Things, where gathering, analyzing, and acting on tremendous amounts of data in real time is essential for staying competitive. The sport’s use of such information is so sophisticated that some teams are exporting their expertise to other industries, even for use on oil rigs. Within the industry, automobile companies such as Daimler and Audi are leveraging deep learning on the MapR Converged Data Platform to successfully analyze the continuous data generated from car sensors.

This blog discusses a proof of concept demo, which was developed as part of a pre-sales project to demonstrate an architecture to capture and distribute lots of data, really fast, for a Formula 1 team.

What’s the Point of Data in Motor Sports?

Formula 1 cars are some of the most heavily instrumented objects in the world.


Read more about The Formula 1 Data Journey

Formula 1 data engineers analyze data from ~150 sensors per car, tracking vital stats such as tire pressure, fuel efficiency, wind force, GPS location, and brake temperature in real time. Each sensor communicates with the track, the crew in the pit, a broadcast crew on-site, and a second team of engineers back in the factory. They can transmit 2GB of data in one lap and 3TB in a full race.


Watch WSJ's video "The F1 Big Data Explainer"

Data engineers can make sense of the car’s speed, stability, aerodynamics, and tire degradation around a race track. The graph below shows an example of what is being analyzed by race engineers: rpm, speed, acceleration, gear, throttle, brakes, by lap.


More analysis is also completed at the team’s manufacturing base. Below is an example race strategy, analyzing whether it would be faster to do a pit stop tire change at lap 13 and lap 35 vs. lap 17 and lap 38.


The Challenge: How Do You Capture, Analyze, and Store this Amount of Data in Real Time at Scale?

The 2014 U.S. Grand Prix collected more than 243 terabytes of data in a race weekend, and now there is even more data. Formula 1 teams are looking for newer technologies with faster ways to move and analyze their data in the cockpits and at the factory.

Below is a proposed proof of concept architecture for a Formula 1 team:


MapR Edge in the cars provides an on-board storage system that buffers the most recent data and retries when transmission fails. MapR Edge addresses the need to capture, process, and provide backup for data transmission close to the source. A radio frequency link publishes the data from the edge to the referee “FIA” topic, and from there it is published to each team’s topic, where local analytics is done. The “team engineers” Stream replicates to the “trackside” and “factory” Streams, so that the data is pushed in real time for analytics. In a sport where seconds make a difference, it’s crucial that the trackside team can communicate quickly with the engineers at headquarters and team members around the world. MapR Streams replication provides an unusually powerful way to handle data across distant data centers at large scale and low latency.

The Demo Architecture

The demo architecture is considerably simplified because it needs to be able to run on a single node MapR sandbox. The demo does not use real cars; it uses the Open Racing Car Simulator (TORCS), a race car simulator often used for AI race games and as a research platform. 


The demo architecture is shown below. Sensor data from the TORCS simulator is published to a MapR Streams topic using the Kafka API. Two consumers are subscribed to the topic. One Kafka API consumer, a web application, provides a real-time dashboard using websockets and HTML5. Another Kafka API consumer stores the data in MapR-DB JSON, where analytics with SQL are performed using Apache Drill. You can download the demo code here:


How Do You Capture this Amount of Data in Real Time at Scale?

Older messaging systems track message acknowledgements on a per-message, per-listener basis. A new approach is needed to handle the volume of data for the Internet of Things. 


MapR Streams is a distributed messaging system that enables producers and consumers to exchange events in real time via the Apache Kafka 0.9 API. In MapR Streams, topics are logical collections of messages, which organize events into categories and decouple producers from consumers.


Topics are partitioned for throughput and scalability. Producers are load balanced between partitions, and consumers can be grouped to read in parallel from multiple partitions within a topic for faster performance.


You can think of a partition like a queue; new messages are appended to the end, and messages are delivered in the order they are received.


Unlike a queue, events are not deleted when read; they remain on the partition, available to other consumers.
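Those read semantics can be modeled in a few lines: an append-only log plus an independent cursor per consumer (a toy in-memory model, not how either system is implemented):

```scala
import scala.collection.mutable

// Toy partition: appends go to the end of the log; each consumer
// advances its own cursor, and reads never delete messages.
class Partition {
  private val log = mutable.Buffer.empty[String]
  private val cursors = mutable.Map.empty[String, Int].withDefaultValue(0)
  def append(msg: String): Unit = log += msg
  def poll(consumer: String): Option[String] = {
    val off = cursors(consumer)
    if (off < log.size) { cursors(consumer) = off + 1; Some(log(off)) }
    else None
  }
}

val part = new Partition
Seq("m1", "m2").foreach(m => part.append(m))
val first  = part.poll("dashboard") // Some("m1")
val replay = part.poll("archiver")  // Some("m1"): same message, second consumer
```

Because a read only moves that consumer’s cursor, any number of consumers can process the same partition independently.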


A key to high performance at scale, in addition to partitioning, is minimizing the time spent on disk reads and writes. With Kafka and MapR Streams, messages are persisted sequentially as produced and read sequentially when consumed. These design decisions mean that non-sequential reading or writing is rare, allowing messages to be handled at very high speeds with scale. Not deleting messages when they are read also allows processing of the same messages by different consumers for different purposes. 


Example Producer Using the Kafka API

Below is an example producer using the Kafka Java API. For more information, see the MapR Streams Java applications documentation.


How to Store the Data

One of the challenges, when you have 243 terabytes of data every race weekend, is where to store it. With a relational database and a normalized schema, related data is stored in different tables. Queries joining this data together can cause bottlenecks with lots of data. For this application, MapR-DB JSON was chosen for its scalability and flexible ease of use with JSON. MapR-DB with a denormalized schema scales because data that is read together is stored together.


With MapR-DB (HBase API or JSON API), a table is automatically partitioned across a cluster by key range, providing for really fast reads and writes by row key.
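Range partitioning by row key can be illustrated with a simple lookup: sorted split points divide the key space, and a row key belongs to the tablet whose start key is the greatest one not exceeding it (the split values below are hypothetical, not MapR-DB internals):

```scala
// Illustrative range-partitioned lookup: split points divide the key
// space into tablets; a row key maps to the last split point <= key.
def tabletFor(key: String, splitPoints: Seq[String]): Int =
  splitPoints.lastIndexWhere(sp => sp <= key) match {
    case -1 => 0
    case i  => i
  }

// tablet 0: ["", "car2"), tablet 1: ["car2", "car4"), tablet 2: ["car4", ...)
val splits = Seq("", "car2", "car4")
val t = tabletFor("car3", splits) // key "car3" lands in tablet 1
```

Because the ranges are sorted, reads and writes for a given row key go straight to one tablet, which is what makes access by row key fast.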


JSON Schema Flexibility

JSON facilitates the natural evolution of your data schema during the life of your application. For example, in version 1, we have the following schema, where each JSON message groups sensor values for Speed, Distance, and RPM:


{  "_id":"1.458141858E9/0.324",
   "car" : "car1",

For version 2, you can quickly capture more data values without changing the architecture of your application, by adding attributes as shown below:


               "Throttle" : 37,
               "Gear" : 2

. . .

As discussed before, MapR Streams allows processing of the same messages for different purposes or views. With this type of architecture and flexible schema, you can easily create new services and new APIs–for example, by adding Apache Spark Streaming or an Apache Flink Kafka Consumer for in-stream analysis, such as aggregations, filtering, alerting, anomaly detection, and/or machine learning.


Processing the Data with Apache Flink

Apache Flink is an open source distributed data stream processor. Flink provides efficient, fast, consistent, and robust handling of massive streams of events as well as batch processing, a special case of stream processing. Stream processing of events is useful for filtering, creating counters and aggregations, correlating values, joining streams together, machine learning, and publishing to a different topic for pipelines.


The code snippet below uses the FlinkKafkaConsumer09 class to get a DataStream from the MapR Streams “sensors” topic. The DataStream is a potentially unbounded distributed collection of objects. The code then calculates the average speed with a time window of 10 seconds. 
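Without pulling in Flink, the 10-second average can be sketched as a grouping over (timestamp, speed) pairs; the Flink job computes the same thing incrementally over the stream:

```scala
// Tumbling 10-second windows over (timestampMillis, speed) readings:
// group readings by window start, then average each window's speeds.
def windowedAvgSpeed(readings: Seq[(Long, Double)],
                     windowMillis: Long = 10000L): Map[Long, Double] =
  readings
    .groupBy { case (ts, _) => ts / windowMillis * windowMillis }
    .map { case (start, rs) => start -> rs.map(_._2).sum / rs.size }

val avg = windowedAvgSpeed(Seq((1000L, 50.0), (4000L, 70.0), (12000L, 80.0)))
// windows: [0, 10000) -> 60.0, [10000, 20000) -> 80.0
```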


Querying the Data with Apache Drill

Apache Drill is an open source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine, built to perform distributed query processing across the various nodes in a cluster. 


With Drill, you can use SQL to query and join data from files in JSON, Parquet, or CSV format, Hive, and NoSQL stores, including HBase, MapR-DB, and Mongo, without defining schemas.

Below is an example query to ‘show all’ from the race car’s datastore:

SELECT *
FROM dfs.`/apps/racing/db/telemetry/all_cars`

And the results:


Below is an example query to show the average speed by car and race:

SELECT race_id, car,
ROUND(AVG( `t`.`records`.`sensors`.`Speed` ),2) as `mps`,
ROUND(AVG( `t`.`records`.`sensors`.`Speed` * 18 / 5 ), 2) as `kph`
FROM (
SELECT race_id, car, flatten(records) as `records`
FROM dfs.`/apps/racing/db/telemetry/all_cars`
) AS `t`
GROUP BY race_id, car


And the results:


All of the components of the use case architecture that we just discussed can run on the same cluster with the MapR Converged Data Platform.


You can download the code and instructions to run this example from here:

Editor's note: This blog post was originally shared in the Converge Blog on May 11, 2017. Since the publication of this blog, the MapR Streams product has been renamed MapR-ES.

Additional Resources

MapR Edge product information


MapR Streams Application Development

Getting Started with Apache Flink and MapR Streams

Big Data On The Road

NorCom selects MapR converged data platform for foundation of deep learning framework for Mercedes

Apache Drill