To learn how to create an HDInsight Spark cluster in the Microsoft Azure portal, please refer to part one of my article. After creating the Spark cluster named suketucluster.azurehdinsight.net, I have highlighted the URL of my cluster.

[Screenshots: HDInsight Spark cluster overview in the Azure portal]

A total of four nodes are created -- two head nodes and two name nodes -- with 16 cores in use out of an available cluster capacity of 60 cores, leaving 44 cores for scaling up. You can also click through to the Cluster Dashboard and Ambari Views, and you can scale the size of the cluster.

Apache Ambari provides management and monitoring of Hadoop clusters through a web UI and REST services. It is used to provision and monitor clusters, make configuration changes, manage a central security setup, and gain full visibility into cluster health. The Ambari dashboard looks like the screenshot below.

 

[Screenshot: Ambari dashboard]

 

Using the Ambari dashboard you can manage and configure services and hosts, and set alerts for critical conditions. Many services are also integrated into the Ambari web UI. Below is the Hive query editor in Ambari.

 

[Screenshot: Hive query editor in the Ambari web UI]

 

You can write and run Hive queries in the Ambari web UI, convert the results into charts, save queries, and manage your query history.

 

[Screenshot: list of services available in Ambari]

 

The snapshot above lists the services available in Ambari, and below is the list of clients on the HDInsight SuketuSpark cluster.

 

[Screenshot: HDInsight SuketuSpark clients list]

 

In a new browser tab you can go to https://CLUSTERNAME.azurehdinsight.net/jupyter/tree, or click the Jupyter logo in the Azure portal, to open a Jupyter notebook. The Jupyter Notebook is a web application that allows you to create and share documents that contain live code, equations, visualizations, and explanatory text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, machine learning, and much more. Jupyter and Zeppelin are the two notebooks integrated with HDInsight.

 

[Screenshot: Jupyter notebook home on the HDInsight cluster]

You can use Jupyter notebook to run Spark SQL queries against the Spark cluster. HDInsight Spark clusters provide two kernels that you can use with the Jupyter notebook.

  • PySpark (for applications written in Python)
  • Spark (for applications written in Scala)

PySpark is the Python binding for the Spark platform and API and is not much different from the Java/Scala versions. Learning Scala can be a better choice than Python, as Scala's functional style makes it easier to parallelize code, which is a great feature when working with big data.

Like Java, Scala is object-oriented, and uses a curly-brace syntax reminiscent of the C programming language. Unlike Java, Scala has many features of functional programming languages like Scheme, Standard ML and Haskell, including currying, type inference, immutability, lazy evaluation, and pattern matching.

When you go to https://CLUSTERNAME.azurehdinsight.net/zeppelin, or click the Zeppelin icon in the Azure portal, the Zeppelin notebook opens in a new browser tab. Below is a snapshot of that.

 

[Screenshot: Zeppelin notebook on the HDInsight cluster]

 

Zeppelin is a web-based notebook that enables interactive data analytics. You can make beautiful data-driven, interactive, and collaborative documents with SQL, Scala, and more.

Flexible searching and indexing for web applications and sites is almost always useful and sometimes absolutely essential. While there are many complex solutions that manage data and allow you to retrieve and interact with it through HTTP methods, ElasticSearch has gained popularity due to its easy configuration and incredible malleability.

Elasticsearch is an open-source search engine built on top of Apache Lucene, a full-text search-engine library.

 

Basic CRUD
CRUD stands for create, read, update, and delete. These are all operations that are needed to effectively administer persistent data storage. Luckily, these also have logical equivalents in HTTP methods, which makes it easy to interact using standard methods. The CRUD methods are implemented by the HTTP methods POST, GET, PUT, and DELETE respectively.

In order to use ElasticSearch for anything useful, such as searching, the first step is to populate an index with some data. This process is known as indexing.

 

Indexing
Documents are indexed—stored and made searchable—by using the index API.

In ElasticSearch, indexing corresponds to both "Create" and "Update" in CRUD: if we index a document with a given type and ID that doesn't already exist, it's inserted. If a document with the same type and ID already exists, it's overwritten.

 

From our perspective as users of ElasticSearch, a document is a JSON object. As such, a document can have fields in the form of JSON properties. Such properties can be simple values such as strings or numbers, but they can also be other JSON objects.

 

In order to create a document, we make a PUT request to the REST API at a URL made up of the index name, type name, and ID, that is: http://localhost:9200/<index>/<type>/[<id>], and include a JSON object as the PUT data.

Index and type are required, while the ID part is optional. If we don't specify an ID, ElasticSearch will generate one for us; however, in that case we should use POST instead of PUT. The index name is arbitrary: if there isn't an index with that name on the server already, one will be created using the default configuration.
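For illustration only (the index and type names here are hypothetical, and this assumes ElasticSearch is listening on localhost:9200), a create request without an explicit ID would use POST and let ElasticSearch generate the ID:

curl -XPOST "http://localhost:9200/myindex/mytype" -d '{"field": "value"}'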

 

As for the type name, it too is arbitrary. It serves several purposes, including:

  • Each type has its own ID space.
  • Different types can have different mappings (a "schema" that defines how properties/fields should be indexed).
  • Although it's possible, and common, to search over multiple types, it's easy to search for only one or more specific type(s).

 

 

Let’s index something! We can put just about anything into our index as long as it can be represented as a single JSON object. For the sake of having something to work with we’ll be indexing, and later searching for, movies. Here’s a classic one:

Sample JSON object
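As a stand-in for the sample object, a movie document of the kind discussed below might look like the following (the exact title and fields are illustrative):

{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972
}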

 

To index the above JSON object we decide on an index name (“movies”), a type name (“movie”) and an ID (“1”) and make a request following the pattern described above with the JSON object in the body.

A request that indexes the sample JSON object as a document of type ‘movie’ in an index named ‘movies’
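A sketch of such a request using cURL (assuming ElasticSearch on localhost:9200 and the illustrative document above):

curl -XPUT "http://localhost:9200/movies/movie/1" -d '{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972
}'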

Execute the above request using cURL, or paste it into Sense and hit the green arrow to run it. After doing so, given that ElasticSearch is running, you should see a response looking like this:

Response from ElasticSearch to the indexing request.
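Based on the properties discussed below, the response has roughly this shape (values illustrative):

{
    "_index": "movies",
    "_type": "movie",
    "_id": "1",
    "_version": 1,
    "created": true
}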

 

 

The request for, and result of, indexing the movie in Sense.

As you can see, the response from ElasticSearch is also a JSON object. Its properties describe the result of the operation. The first three properties simply echo the information that we specified in the URL of the request. While this can be convenient in some cases, it may seem redundant. However, remember that the ID part of the URL is optional; if we don't specify an ID, the _id property is generated for us, and its value may then be of great interest to us.

 


 

The fourth property, _version, tells us that this is the first version of this document (the document with type “movie” with ID “1”) in the index. This is also confirmed by the fifth property, “created”, whose value is true.

Now that we've got a movie in our index, let's look at how we can update it, adding a list of genres to it. In order to do that, we simply index it again using the same ID. In other words, we make the exact same indexing request as before but with an extended JSON object containing genres.

Indexing request with the same URL as before but with an updated JSON payload.
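A sketch of the updated request (the genre values are illustrative):

curl -XPUT "http://localhost:9200/movies/movie/1" -d '{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972,
    "genres": ["Crime", "Drama"]
}'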

 

This time the response from ElasticSearch looks like this:

The response after performing the updated indexing request.
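Again, based on the properties discussed below, it has roughly this shape:

{
    "_index": "movies",
    "_type": "movie",
    "_id": "1",
    "_version": 2,
    "created": false
}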

 

 

Not surprisingly, the first three properties are the same as before. However, the _version property now reflects that the document has been updated, as it now has a version number of 2. The created property is also different, now having the value false. This tells us that the document already existed and therefore wasn't created from scratch.

 

It may seem that the created property is redundant. Wouldn't it be enough to inspect the _version property to see if its value is greater than one? In many cases that would work. However, if we were to delete the document, the version number wouldn't be reset, meaning that if we later indexed a document with the same ID the version number would be greater than one.

 

So, what's the purpose of the _version property then? While it can be used to track how many times a document has been modified, its primary purpose is to allow for optimistic concurrency control.

 

If we supply a version in indexing requests, ElasticSearch will then only overwrite the document if the supplied version is the same as for the document in the index. To try this out, add a version query string parameter to the URL of the request with "1" as the value, making it look like this:

Indexing request with a ‘version’ query string parameter.
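A sketch of the same request with the version parameter added (same illustrative body as before):

curl -XPUT "http://localhost:9200/movies/movie/1?version=1" -d '{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972,
    "genres": ["Crime", "Drama"]
}'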

 

Now the response from ElasticSearch is different. This time it contains an error property with a message explaining that the indexing didn’t happen due to a version conflict.

 

Response from ElasticSearch indicating a version conflict.

 

Getting by ID
We've seen how to index documents, both new ones and existing ones, and have looked at how ElasticSearch responds to such requests. However, we haven't actually confirmed that the documents exist, only that ES tells us so.

So, how do we retrieve a document from an ElasticSearch index? Of course we could search for it. However, that's overkill if we only want to retrieve a single document with a known ID. A simpler and faster approach is to retrieve it by ID.

 

In order to do that, we make a GET request to the same URL as when we indexed the document, only this time the ID part of the URL is mandatory. In other words, to retrieve a document by ID from ElasticSearch we make a GET request to http://localhost:9200/<index>/<type>/<id>. Let's try it with our movie using the following request:
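A sketch of the request and a response of the shape described below (the document body reflects the illustrative movie used earlier):

curl -XGET "http://localhost:9200/movies/movie/1"

{
    "_index": "movies",
    "_type": "movie",
    "_id": "1",
    "_version": 2,
    "found": true,
    "_source": {
        "title": "The Godfather",
        "director": "Francis Ford Coppola",
        "year": 1972,
        "genres": ["Crime", "Drama"]
    }
}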

 

 


As you can see, the result object contains similar metadata to what we saw when indexing, such as index, type, and version. Last but not least, it has a property named _source, which contains the actual document body. There's not much more to say about GET, as it's pretty straightforward. Let's move on to the final CRUD operation.

 

Deleting documents
In order to remove a single document from the index by ID we again use the same URL as for indexing and retrieving it, only this time we change the HTTP verb to DELETE.

 

Request for deleting the movie with ID 1.

 

curl -XDELETE "http://localhost:9200/movies/movie/1"

 

The response object contains some of the usual suspects in terms of metadata, along with a property named "found" indicating that the document was indeed found and that the operation was successful.

Response to the DELETE request.

{
"found": true,
"_index": "movies",
"_type": "movie",
"_id": "1",
"_version": 3
}
If we, after executing the DELETE request, switch back to GET we can verify that the document has indeed been deleted:

Response to the GET request after the document has been deleted.

{
"_index": "movies",
"_type": "movie",
"_id": "1",
"found": false
}


Simple Spark Tips #1

Posted by MichaelSegel Oct 19, 2017

Many developers are switching over to using Spark and Spark SQL as a way to ingest and use data.  As an example, you could be asked to take a .csv file and convert it into a Parquet file or even a Hive or MapR-DB table.

 

With Spark, it's very easy to do this: you just load the file into a DataFrame/Dataset, write it out as a Parquet file, and you're done. The code to create the DataFrame:

val used_car_databaseDF = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load(used_cars_databaseURL);

where the used_cars_databaseURL is a String of the path to the file that I had created earlier in my code.

 

But suppose you want to work with the data as a SQL table? Spark allows you to create temporary tables/views of the data and rename the DataFrame (DF) into a table 'alias'. 

used_car_databaseDF.createOrReplaceTempView("used_cars");

Here I've created a table/view called used_cars that I can now use in a spark.sql command.

spark.sql("SELECT COUNT(*)  FROM used_cars ").show();

Obviously this is just a simple example to show that you can run a query and see its output.

If you're working with a lot of different tables, it's easy to lose track of the tables/views that you've created.

 

But Spark does have a couple of commands that will let you view the list of tables you have already set up for use: the spark.catalog interface. Below is some sample code I pulled from my notebook, where I have been experimenting with Spark and MapR.

import org.apache.spark.sql.catalog
import org.apache.spark.sql.SparkSession

spark.catalog
spark.catalog.listTables.show
//Note: We need to look at listing columns for each table...
spark.catalog.listColumns("creditcard").show // Another test table

// Now lets try to run thru catalog
println("Testing walking thru the table catalog...")
val tableDS = spark.catalog.listTables
println("There are "+ tableDS.count + " rows in the catalog...")

tableDS.printSchema // Prints the structure of the objects in the dataSet

tableDS.collect.foreach{
    e => println(e.name) }

// Now trying a different way...

spark.catalog.listTables.collect.foreach{
    e =>
    val n = e.name
    println("Table Name: "+n)
    spark.catalog.listColumns(n).collect.foreach{ e => println("\t"+e.name+"\t"+e.dataType) }
}
   

Note: While I am not sure if I needed to pull in org.apache.spark.sql.SparkSession , it doesn't seem to hurt.

 

In the sample code, I use the method show(), which formats the output and displays it.  However, by default show() displays only the first 20 rows of output, regardless of the source. This can be problematic, especially if you have more than 20 temp tables in your session, or if a table has more than 20 columns when we inspect it.

For more information on the spark catalog, please see:

https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.sql.catalog.Catalog 

listTables() is a method that returns a list of Table objects describing the temporary views that I have created.

 

As I said earlier, while show() will give you a nice formatted output, it limits you to the first 20 rows of output.

So let's instead print out our own list of tables.  I used the printSchema() method to identify the elements of the Table object. The output looks like this:

root
|-- name: string (nullable = true)
|-- database: string (nullable = true)
|-- description: string (nullable = true)
|-- tableType: string (nullable = true)
|-- isTemporary: boolean (nullable = false)

For my first example, I'm walking through the list of tables and printing out the name of the table.

 

In my second example, for each table I want to print out the table's schema (in this example, only the column name and its data type). This works well when you have more than 20 tables or more than 20 columns in a table.

 

If we put this all together, we can load a file, apply filters, and then store the data.  Even without knowing the schema up front, it's still possible to determine the data set's schema and then use that information to dynamically create a Hive table or to put the data into a MapR-DB table.


Securing Zeppelin

Posted by MichaelSegel Oct 5, 2017

Zeppelin is an Apache open source notebook that supports multiple interpreters and seems to be one of the favorites for working with Spark.

 

Zeppelin can run on a wide variety of platforms, so it's possible to run tests and develop code either on a cluster or away from one.

 

In today's world it is no longer paranoia to think about securing your environment. This article focuses on the setup needed to secure Zeppelin and is meant as a supplement focused on adding security.

 

Zeppelin itself is easy to install. You can ‘wget’ or download the pre-built binaries from the Apache Zeppelin site, follow the instructions and start using it right away.

 

The benefits are that you can build a small test environment on your laptop without having to work on a cluster. This reduces the impact on a shared environment.

 

However, it's important to understand how to set up and secure your environment as well.

 

Locking down the environment

 

Running Local

Zeppelin can run locally on your desktop. In most instances your desktop doesn't have a static IP address. In securing your environment, you will want to force Zeppelin to be available only on the localhost/127.0.0.1 interface.

 

In order to set this up, two entries in the $ZEPPELIN_HOME/conf/zeppelin-site.xml file have to be set.

 

<property>
  <name>zeppelin.server.addr</name>
  <value>127.0.0.1</value>
  <description>Server address</description>
</property>

<property>
  <name>zeppelin.server.port</name>
  <value>9080</value>
  <description>Server port.</description>
</property>

 

Setting zeppelin.server.addr to listen only on the localhost address means that no one outside of the desktop will be able to access the Zeppelin service.

 

Setting zeppelin.server.port to a value other than the default of 8080 is done because that port is the default for many services. By going to a different, unique port you can keep this consistent between the instances on your desktop and on a server. While this isn't necessary, it does make life easier.

 

Beyond those two settings, there isn’t much else that you need to change.

Notice that there are properties that allow you to set up SSL. While the documentation contains directions on how to set up SSL directly, there is a bug where trying to run with PKCS12 certificates causes an error, and in following up, no resolution could be found. The recommendation is to use an nginx proxy server to manage the secure connection. (More on this later.)

 

Since Zeppelin is listening only on the 127.0.0.1 interface, SSL really isn't required here.

 

The next issue is that by default, there is no user authentication. Zeppelin provides this through Shiro. From the Apache Shiro website:

Apache Shiro™ is a powerful and easy-to-use Java security framework that performs authentication, authorization, cryptography, and session management. With Shiro’s easy-to-understand API, you can quickly and easily secure any application – from the smallest mobile applications to the largest web and enterprise applications.

(see: https://shiro.apache.org/)


While it may be OK to run local code as an anonymous user, it's also possible for Zeppelin to run locally yet access a remotely maintained cluster that may not accept anonymous users.

 

In order to set up Shiro, just copy shiro.ini.template to shiro.ini.

Since my SOHO environment is rather small and limited to a handful of people, I am not yet running LDAP (maybe one day), so I practice the K.I.S.S. principle. The only thing I need from Shiro is to set up local users.

[users]
# List of users with their password allowed to access Zeppelin.
# To use a different strategy (LDAP / Database / ...) check the shiro doc at http://shiro.apache.org/configuration.html#Configuration-INISections
admin = password1, admin
user1 = password2, role1, role2
user2 = password3, role3
user3 = password4, role2

When you find the [users] section, you'll notice that the admin and userX entries are not commented out. Any entry in the form <user> = <password>, <role> [, <role2>, <role3> ...] will be active, so you really don't want to leave an entry here that would give someone else access.

 

So you will need to create an entry for yourself and other users.
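For example, a minimal entry of that form (the user name, password, and role below are placeholders to replace with your own):

myuser = mySecretPassword, admin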

 

Note: There are other entries below this section. One section details how you can use LDAP for authenticating users; if you are running LDAP, it would be a good idea to set that up.

 

Once you’ve made and saved the changes, that’s pretty much it. You can start/restart the service and you will authenticate against the entry in shiro.

 

Note the following:

While attempting to follow the SSL setup, I was directed to a Stack Overflow conversation on how to accomplish this. IMHO, it's a major red flag when documentation references a Stack Overflow article for how to set up and configure anything.

Running on a Server


Suppose you want to run Zeppelin on your cluster? Zeppelin then becomes a shared resource. You can set Zeppelin to run Spark contexts per user or per notebook instead of one instance for the entire service.

The larger issue is that you will need to use an external interface to gain access, and even if you're behind a firewall, you will need to have SSL turned on. Because I want to be able to run notes from outside of my network, I have to have SSL in place. As I alluded to earlier, the ability to configure SSL from within Zeppelin wasn't working, and the only guidance was to instead set up a proxy using nginx (this came from two reliable sources).

 

With nginx, you can use the same configuration that we have already set up: Zeppelin will listen only on localhost and rely on the proxy server to handle external connections. Since my Linux server is sitting next to me, I have a monitor set up so I can easily test connections to localhost, ensuring that my Zeppelin instance is up and running. I followed the same steps that I used to set up my desktop, and it ran without a hitch.

 

Unlike the instructions for trying to set up SSL directly, the information found on the Zeppelin site was very helpful. You can find a link to it here:

https://zeppelin.apache.org/docs/0.7.3/security/authentication.html#http-basic-authentication-using-nginx

 

There are various ways of obtaining nginx. Since I run CentOS, I could have pulled down a version via yum, and if you run a different Linux distribution, you can use its similar package tool. Of course, downloading it from the official site will get you the latest stable release.

While the documentation for setting up nginx with Zeppelin is better, there are still gaps, yet it's still pretty straightforward.

 

Nginx installs into the /etc/nginx directory. Under this directory, all of the configuration files are located in ./conf.d. I should have taken better notes, but going from memory, there was one file there: the default configuration file. I suggest that you set it aside by copying it to another file name; I chose default.conf.ini. Based on the documentation, I was under the impression that nginx will read all *.conf files for its setup data.

 

I then created a zeppelin.conf file and cut and pasted the section from the zeppelin documents.

 

upstream zeppelin {
    server [YOUR-ZEPPELIN-SERVER-IP]:[YOUR-ZEPPELIN-SERVER-PORT];   # For security, it is highly recommended to make this address/port non-publicly accessible
}

# Zeppelin Website
server {
    listen [YOUR-ZEPPELIN-WEB-SERVER-PORT];
    listen 443 ssl;                                      # optional, to serve HTTPS connections
    server_name [YOUR-ZEPPELIN-SERVER-HOST];             # for example: zeppelin.mycompany.com

    ssl_certificate [PATH-TO-YOUR-CERT-FILE];            # optional, to serve HTTPS connections
    ssl_certificate_key [PATH-TO-YOUR-CERT-KEY-FILE];    # optional, to serve HTTPS connections

    if ($ssl_protocol = "") {
        rewrite ^ https://$host$request_uri? permanent;  # optional, to force use of HTTPS
    }

    location / {    # For regular web server support
        proxy_pass http://zeppelin;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header X-NginX-Proxy true;
        proxy_redirect off;
        auth_basic "Restricted";
        auth_basic_user_file /etc/nginx/.htpasswd;
    }

    location /ws {  # For websocket support
        proxy_pass http://zeppelin/ws;
        proxy_http_version 1.1;
        proxy_set_header Upgrade websocket;
        proxy_set_header Connection upgrade;
        proxy_read_timeout 86400;
    }
}

As you can see, the configuration is pretty straightforward. Note the comment in the upstream zeppelin section: this is why using the loopback/localhost interface is a good idea.

 

Since the goal of using nginx is to create a secure (SSL) interface to Zeppelin, we need to create a public/private key pair. A simple Google search will turn up lots of options. Note: If you don't have OpenSSL already installed on your server, you should set it up ASAP. Using OpenSSL, the following command works:

 

openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365

 

Note that this will create a 4096-bit key. That's a bit of overkill for this implementation; however, the minimum key length for SSL connections these days is 2048 bits, so it's not unreasonable.

 

Note: If you use this command as is, you will be required to provide a simple password, which is used to encrypt the key. The downside is that each time you want to start or stop the web service, you will have to manually enter the passphrase. Using the -nodes option removes this requirement; however, the key is then stored unencrypted. You can change the permissions on the key file to control access.

 

For nginx, I created the key pair in the ./conf.d directory and set the paths in the zeppelin.conf file.

 

After the edits, if you start the service, you’re up and running.
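As a rough sketch (assuming nginx was installed as a system service on CentOS 7 and that ZEPPELIN_HOME points at your Zeppelin install), starting or restarting the two services looks something like this:

sudo systemctl restart nginx
$ZEPPELIN_HOME/bin/zeppelin-daemon.sh restart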

Well almost….

 

Further Tweaking

 

If you try to use the service, nginx asks you for a user name and password.

    location / {    # For regular web server support
        proxy_pass http://zeppelin;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header X-NginX-Proxy true;
        proxy_redirect off;
        auth_basic "Restricted";
        auth_basic_user_file /etc/nginx/.htpasswd;
    }

In this section, the auth_basic setting is set to "Restricted", indicating that a password check has to be performed.

The setting auth_basic_user_file is set to the path of the password file.
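For reference, that password file is typically created with the htpasswd utility from the httpd-tools package on CentOS; this is standard nginx basic-auth practice rather than anything Zeppelin-specific, and the user name below is a placeholder:

sudo yum install httpd-tools
sudo htpasswd -c /etc/nginx/.htpasswd myuser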

 

The instructions on how to set this up are also found on Zeppelin's setup page. While this is secure (you enter a password at the proxy before being able to reach the protected web service), does it make sense? You need a password to access a website that then asks you to log in again before you can use it. Our goal in using nginx was to set up SSL so that any traffic between the client and the server is encrypted and not out in the open. For our use case, it makes more sense that connecting to the service establishes an SSL socket and then takes you to the Zeppelin service, where you can then authenticate.

 

The simple fix is to set auth_basic to off. This still lets Zeppelin authenticate the user via Shiro without having to log in twice, and your notebooks do not run as 'anonymous'.
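In the location / block of zeppelin.conf, the change amounts to a single line (the auth_basic_user_file line can then be removed as well):

auth_basic off;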

 

In Summary

 

Running Zeppelin out of the box with no security is not a good idea. This article helps to demonstrate some simple tweaks that help lock down your environment so that you can run Zeppelin on a desktop or connect to a service running on the edge of your cluster.


I am sure that there are other things one could do to further lock down Zeppelin or tie it in to your network. At a minimum you will want to authenticate your users via Shiro as well as offer an SSL connection.

 

With Zeppelin up and running, now the real fun can begin.

Multi-cloud environments can be an effective way to hedge risk and enable future flexibility for applications. With a secure VPN in between two or more sites, you can leverage the global namespace and high availability features in MapR (like mirroring and replication) to drive business continuity across a wide variety of use cases.

 

In this tutorial, I will walk through the steps of how to connect Microsoft Azure and Amazon AWS cloud using an IPsec VPN, which will enable secure IP connectivity between the two clouds and serve as a Layer-3 connection you can then use to connect two or more MapR clusters.

 

Multi-Cloud Architecture with Site-to-Site VPN

 

Let's first take a look at the end result for which we're aiming.

 

 

On the left side of the figure below is an Azure setup with a single Resource Group (MapR_Azure). We'll set this up in the 'US West' zone. On the right side is an Amazon EC2 network in a VPC, which we will deploy in the Northern Virginia zone. This is an example of using geographically dispersed regions to lessen the risk of disruptions to operations. After the VPN is complete, we can use MapR replication to ensure that data lives in the right place and applications can read/write to both sites seamlessly.

 

This "site-to-site" VPN will be encrypted and run over the open Internet, with a separate IP subnet in each cloud, using the VPN gateways to route traffic through a tunnel.  Note that this is a "Layer 3" VPN, in that traffic is routed between two subnets using IP forwarding.  It's also possible to do this at Layer 2, bridging Ethernet frames between the two networks, but I'll leave that for another post (or an exercise for the curious reader. )

 

Setting up the Azure Side

 

First, prepare the Resource Group; in our example, we called the group 'MapR_Azure.'

Select it, and find the 'Virtual Network' resource by selecting the '+' icon, then type 'virtual network' in the search box.

 

 

 

Select 'Resource Manager' as the deployment model and press 'Create.' We use the name 'clusternet' for the network.

 

We will create two subnets, one on each cloud side. On Azure, we create the address range of 10.11.0.0/16 and a single subnet of 10.11.11.0/24 within the range. We'll make the Azure and EC2 address prefixes 10.11 and 10.10, respectively, to make for easy identification and troubleshooting.

 

 

Create a public IP for the VPN connections. Select '+' to add a resource, then type 'public IP address' in the search box. Press 'Create,' then set up the IP as follows. Select 'Use existing' for the Resource Group, and keep 'Dynamic' for the IP address assignment. The name is 'AzureGatewayIP.'

 

 

Take note of this address; we will use it later.

 

Next, create a 'Virtual Network Gateway' the same way. This entity will serve as the VPN software endpoint.

 

 

Reference the public IP you created in the previous step (in our case, 'AzureGatewayIP').

 

Note the concept of a 'GatewaySubnet' here. This is a subnet that is to be used exclusively by the Azure VPN software. It must be within the configured address range, and you can't connect any other machines to it. Microsoft says "some configurations require more IP addresses to be allocated to the gateway services than do others."  It sounds a little mysterious, but allocating a /24 network seems to work fine for most scenarios.

 

Select 'clusternet' as the virtual network (what you created in the earlier step), use a Gateway subnet of 10.11.0.0/24, and use the 'AzureGatewayIP' address. This will create a new subnet entry called 'GatewaySubnet' for the 10.11.0.0/24 network.

 

For testing purposes, select the VpnGw1 SKU. This allows up to 650 Mbps of network throughput, which is more than enough to connect a couple of small clusters, but you can go up to 1.25 Gbps with the VpnGw3 SKU.

 

This may take up to 45 minutes (according to Microsoft) but it usually completes in a few minutes.

 

Setting up the AWS Side

 

We need to pause here to set up a few things on AWS. First, create a VPC in the AWS Console VPC Dashboard. Here we set the IPv4 address range as 10.10.0.0/16.

 

 

Navigate to 'Subnets,' and create a subnet in the VPC. Here we use 10.10.10.0/24.

 

 

Next, create an Internet gateway to connect our VPC to the internet.  This step is important (and easily overlooked), otherwise traffic cannot be routed in between the Elastic IP and the subnet we just created.

 

 

 

Select 'Attach to VPC,' and use the new VPC 'clusterVPC'.

 

 

 

Go back to the EC2 Dashboard and select 'Launch Instance.' We will create an Amazon Linux instance to maintain the VPN. Select the 'Amazon Linux' AMI, and configure the instance details as follows:

 

 

 

 

Be sure to select the 'clusterVPC' we just created and 'clustersubnet' for the subnet. Select 'Disable' for 'Auto-assign Public IP' because we want to use an Elastic IP that we will associate later.

 

Under the last step, select 'Edit Security Groups,' and then select 'Create a new security group.' Open the group to all traffic coming from the AzureGatewayIP we configured previously (in this case, 40.80.158.169). Also (optionally), add any rules that you need to connect to the instance via ssh.

 

 

Click on 'Launch,' and optionally create a new key pair or use an existing one for the instance.

 

While the instance launches, let's create an Elastic IP for the VPN endpoint. In the 'Network & Security' menu of the AWS Console, select 'Elastic IPs,' and then allocate an address.

 

 

Note this address (here, 34.231.217.197) for later.

 

Associate the address with the instance we just created.

 

 

Finalizing the connection on the Azure side

 

Let's return to the Azure setup, and use the information from AWS to complete the connection. Add a Local Network Gateway.

 

 

The term 'local' is a bit of a misnomer here because the other network is not a local one; it's another cloud network.  Microsoft uses the term 'local' to refer to an on-premise network that you might want to connect to Azure.  For 'IP address,' use the Elastic IP you created in the previous section. For 'Address space,' use the 10.10.10.0/24 range (the AWS subnet).

 

Next, add a Connection. Select 'Site-to-site' VPN, and fill in the remaining details for your Resource Group.

 

 

Select the AzureGateway we configured as well as the Local Network Gateway (AWSVPN). Enter a key that will be used for the session.

 

 

Now is a good time to launch an instance for testing. Type 'Ubuntu Server' into the search box, and select Ubuntu Server 14.04 LTS. Configure the instance details, size, and settings.

 

 

Under the last Settings window, configure the virtual network, subnet, and 'None' for a public IP address (we don't need one because the VPN will handle outbound/inbound connectivity). Select a new or existing network security group.

 

 

Finalizing the connection on the AWS side

 

It's a good time to make sure the subnet you created has a default route to the internet gateway. From the AWS Console, navigate to 'Route Tables' and find the subnet associated with your VPC, select the subnet and the 'Routes' tab, and add a default route:

 

 

Returning to AWS, ssh into the instance we just created and download/install strongswan along with some dependency packages.

 

sudo yum install gcc gmp-devel
wget https://download.strongswan.org/strongswan-5.6.0.tar.bz2
bzip2 -d strongswan-5.6.0.tar.bz2
tar xvf strongswan-5.6.0.tar
cd strongswan-5.6.0
./configure && make && sudo make install

 

This should install strongswan in /usr/local, where we will edit the configuration files.

 

Edit the file /usr/local/etc/ipsec.conf and add the following entry:

conn azure
   authby=secret
   type=tunnel
   leftsendcert=never
   left=10.10.10.222
   leftsubnet=10.10.10.0/24
   right=40.80.158.169
   rightsubnet=10.11.11.0/24
   keyexchange=ikev2
   ikelifetime=10800s
   keylife=57m
   keyingtries=1
   rekeymargin=3m
   compress=no
   auto=start

 

The 'left' and 'leftsubnet' options refer to the Amazon (local) side. Use the local private IP address and subnet. For the right side, use the AzureGatewayIP we configured (40.80.158.169) and the 'clusternet' subnet.

 

Finally, edit the file /usr/local/etc/ipsec.secrets, and add your shared secret key.

 

10.10.10.222 40.80.158.169 : PSK "testing123"

 

Start the VPN with:

 

sudo /usr/local/sbin/ipsec start

 

You can run sudo tail -f /var/log/messages to check the status of the connection.

 

You should now be able to ping the Azure machine by running ping 10.11.11.4 (or the address of the single interface of that machine). You can check it on the Azure side by viewing the Connection:

 

 

If you see 'Connected' as above, congratulations: you have a working two-cloud environment!

 

Additional Notes

 

Here are a few other concerns to watch when embarking on a multi-cloud adventure.

 

Ingress and Egress Data Transfers

 

Inter-site bandwidth is something to consider in your plan. At the time of this writing, most use cases of data transfer in to EC2 are free, with some exceptions. Data transfer out is free to most other Amazon services, like S3 and Glacier, and also free up to 1GB/month to other internet-connected sites, but costs a small amount per GB after that.

 

Data transfer in Azure is similar: all inbound data transfers are free, and there is a schedule of costs for outgoing transfers to the internet and other Azure zones.

 

Bandwidth and Latency

 

Amazon has a page on how to check bandwidth. Doing some quick tests with iperf between the two sites, here are some typical results:

 

Accepted connection from 10.99.1.237, port 35688
[ 5] local 10.10.10.4 port 5201 connected to 10.99.1.237 port 35690
[ ID] Interval Transfer Bitrate
[ 5] 0.00-1.00 sec 36.7 MBytes 308 Mbits/sec
[ 5] 1.00-2.00 sec 39.8 MBytes 334 Mbits/sec
[ 5] 2.00-3.00 sec 42.1 MBytes 353 Mbits/sec
[ 5] 3.00-4.00 sec 39.7 MBytes 333 Mbits/sec
[ 5] 4.00-5.00 sec 30.5 MBytes 256 Mbits/sec
[ 5] 5.00-6.00 sec 30.0 MBytes 252 Mbits/sec
[ 5] 6.00-7.00 sec 30.9 MBytes 259 Mbits/sec
[ 5] 7.00-8.00 sec 36.7 MBytes 308 Mbits/sec
[ 5] 8.00-9.00 sec 41.5 MBytes 348 Mbits/sec
[ 5] 9.00-10.00 sec 37.0 MBytes 311 Mbits/sec
[ 5] 10.00-10.03 sec 977 KBytes 245 Mbits/sec
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval Transfer Bitrate
[ 5] 0.00-10.03 sec 366 MBytes 306 Mbits/sec receiver
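For reference, output like this is typically produced by running an iperf3 server on one instance and pointing a client at it from the other; the address below is the private IP of the receiving side in this setup, so adjust it for your own environment:

iperf3 -s                # on the receiving instance
iperf3 -c 10.10.10.4     # on the sending instance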

 

That's some pretty hefty bandwidth (306 Mbits/sec) between the two sites.

 

Ready to set up a MapR cluster?

 

By Ronald van Loon

 

Editor's Note: This post was originally published on LinkedIn on June 12, 2017.

The world is long past the Industrial Revolution, and now we are experiencing an era of Digital Revolution. Machine Learning, Artificial Intelligence, and Big Data Analysis are the reality of today’s world.

I recently had a chance to talk to Ciaran Dynes, Senior Vice President of Products at Talend and Justin Mullen, Managing Director at Datalytyx. Talend is a software integration vendor that provides Big Data solutions to enterprises, and Datalytyx is a leading provider of big data engineering, data analytics, and cloud solutions, enabling faster, more effective, and more profitable decision-making throughout an enterprise.

The Evolution of Big Data Operations

To understand more about the evolution of big data operations, I asked Justin Mullen about the challenges his company faced five years ago and why they were looking for modern integration platforms. He responded with, “We faced similar challenges to what our customers were facing. Before Big Data analytics, it was what I call ‘Difficult Data analytics.’ There was a lot of manual aggregation and crunching of data from largely on premise systems. And then the biggest challenge that we probably faced was centralizing and trusting the data before applying the different analytical algorithms available to analyze the raw data and visualize the results in meaningful ways for the business to understand.”


He further added that, “Our clients not only wanted this analysis once, but they wanted continuous refreshes of updates on KPI performance across months and years. With manual data engineering practices, it was very difficult for us to meet the requirements of our clients, and that is when we decided we needed a robust and trustworthy data management platform that solves these challenges.”

Automation and Data Science

Many economists and social scientists are concerned about the automation that is taking over manufacturing and commercial processes. If digitalization and automation continue to grow at their current pace, there is a high probability that machines will partly replace humans in the workforce. We are seeing some examples of this phenomenon in our world today, but it is predicted to be far more prominent in the future.

However, Dynes says, “Data scientists are providing solutions to intricate and complex problems confronted by various sectors today. They are utilizing useful information from data analysis to understand and fix things. Data science is an input and the output is yielded in the form of automation. Machines automate, but humans provide the necessary input to get the desired output.”

This creates a balance in the demand for human and machine services. Automation and data science go hand in hand; one process is incomplete without the other. Raw data is worth nothing if it cannot be manipulated to produce meaningful results, and similarly, machine learning cannot happen without sufficient and relevant data.

Start Incorporating Big Data and Machine Learning Solutions into Business Models

Dynes says, “Enterprises are realizing the importance of data, and are incorporating Big Data and Machine Learning solutions into their business models.” He further adds that, “We see automation happening all around us. It is evident in the ecommerce and manufacturing sectors, and has vast applications in the mobile banking and finance.”

When I asked him about his opinion regarding the transformation in the demand of machine learning processes and platforms, he added that, “The demand has always been there. Data analysis was equally useful five years ago as it is now. The only difference is that five years ago there was entrepreneurial monopoly and data was stored secretively. Whoever had the data, had the power, and there were only a few prominent market players who had the access to data.”

Justin has worked with different companies. Some of his most prominent clients were Calor Gas, Jaeger and Wejo. When talking about the challenges those companies faced before implementing advanced analytics or machine learning he said, “The biggest challenges most of my clients face was the accumulation of the essential data at one place so that the complex algorithms can be run simultaneously but the results can be viewed in one place for better analysis. The data plumbing and data pipelines were critical to enable data insights to become continuous rather than one-off.”

The Reasons for Rapid Digitalization

Dynes says, “We are experiencing rapid digitalization because of two major reasons. The technology has evolved at an exponential rate in the last couple of years and secondly, organization culture has evolved massively.” He adds, “With the advent of open source technologies and cloud platforms, data is now more accessible. More people have now access to information, and they are using this information to their benefits.”

In addition to the advancements and developments in the technology, “the new generation entering the workforce is also tech dependent. They rely heavily on the technology for their everyday mundane tasks. They are more open to transparent communication. Therefore, it is easier to gather data from this generation, because they are ready to talk about their opinions and preferences. They are ready to ask and answer impossible questions,” says Dynes.

Integrating a New World with the Old World

When talking about the challenges that companies face while opting for Big Data analytics solutions Mullen adds, “The challenges currently faced by industry while employing machine learning are twofold. The first challenge they face is related to data collection, data ingestion, data curation (quality) and then data aggregation. The second challenge is to combat the lack of human skills in data-engineering, advanced analytics, and machine learning”

“You need to integrate a new world with the old world.

The old world relied heavily on data collection in big batches

while the new world focuses mainly on the real-time data solutions”

Dynes says, “You need to integrate a new world with the old world. The old world relied heavily on data collection while the new world focuses mainly on the data solutions. There are limited solutions in the industry today that deliver on both these requirements at once right now.”


He concludes by saying that, “The importance of data engineering cannot be neglected, and machine learning is like Pandora’s Box. Its applications are widely seen in many sectors, and once you establish yourself as a quality provider, businesses will come to you for your services. Which is a good thing.”

Follow Ciaran Dynes, Justin Mullen, and Ronald van Loon on Twitter and LinkedIn for more interesting updates on Big Data solutions and machine learning.


By Rachel Silver

Are you a data scientist, engineer, or researcher, just getting into distributed processing and PySpark, and you want to run some of the fancy new Python libraries you've heard about, like Matplotlib?

If so, you may have noticed that it's not as simple as installing it on your local machine and submitting jobs to the cluster. In order for the Spark executors to access these libraries, they have to live on each of the Spark worker nodes.

You could go through and manually install each of these environments using pip, but maybe you also want the ability to use multiple versions of Python or other libraries like pandas? Maybe you also want to allow other colleagues to specify their own environments and combinations?

If this is the case, then you should be looking toward using conda environments to provide specialized, personalized Python configurations that are accessible to your Python programs. Conda is a tool that keeps track of conda packages and tarball files containing Python (or other) libraries and maintains the dependencies between packages and the platform.

Continuum Analytics provides an installer for conda called Miniconda, which contains only conda and its dependencies, and this installer is what we’ll be using today.

For this blog, we’ll focus on submitting jobs from spark-submit. In a later iteration of this blog, we’ll cover how to use these environments in notebooks like Apache Zeppelin and Jupyter.

Installing Miniconda and Python Libraries to All Nodes

If you have a larger cluster, I recommend using a tool like pssh (parallel SSH) to automate these steps across all nodes.

To begin, we’ll download and install the Miniconda installer for Linux (64-bit) on each node where Apache Spark is running. Please make sure, before beginning the install, that you have the bzip2 library installed on all hosts:
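A sketch of that preparation step (the installer URL and version may differ from what is current; this assumes a yum-based distribution such as CentOS):

sudo yum install -y bzip2
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh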


I recommend choosing /opt/miniconda3/ as the install directory, and, when the install completes, you need to close and reopen your terminal session.

If your install is successful, you should be able to run ‘conda list’ and see the following packages:

 

Miniconda installs an initial default conda, running Python 3.6.1. To make sure this installation worked, run a version command:

python -V
Python 3.6.1 :: Continuum Analytics, Inc.

To explain what's going on here: we haven't removed the previous default version of Python, and it can still be found at the default path, /bin/python. We've simply added a new Python installation, much like Java alternatives, that we can point to when submitting jobs without disrupting our cluster environment. See:

/bin/python -V
Python 2.7.5

Now, let’s go ahead and create a test environment with access to Python 3.5 and NumPy libraries.

First, we create the conda and specify the Python version (do this as your cluster user):

conda create --name mapr_numpy python=3.5

Next, let’s go ahead and install NumPy to this environment:

conda install --name mapr_numpy numpy

Then, let’s activate this environment, and check the Python version:
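Roughly, assuming the default conda activation scripts are on your PATH:

source activate mapr_numpy
python -V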

 

Please complete these steps for all nodes that will run PySpark code.

Using Spark-Submit with Conda

Let’s begin with something very simple, referencing environments and checking the Python version to make sure it’s being set correctly. Here, I’ve made a tiny script that prints the Python version:

 

Testing NumPy

Now, let’s make sure this worked!

I’m creating a little test script called spark_numpy_test.py, containing the following:

 

If I were to run this script without activating or pointing to my conda with NumPy installed, I would see this error:

 

In order to get around this error, we’ll specify the Python environment in our submit statement:
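One way to do this (a sketch, assuming Miniconda was installed under /opt/miniconda3/ as suggested above and that the job runs on YARN) is to point PySpark at the environment's interpreter when submitting:

PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python \
spark-submit \
  --master yarn \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python \
  spark_numpy_test.py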

 

Now for Something a Little More Advanced...

This example of PySpark, using the NLTK Library for Natural Language Processing, has been adapted from Continuum Analytics.

We’re going to run through a quick example of word tokenization to identify parts of speech that demonstrates the use of Python environments with Spark on YARN.

First, we’ll create a new conda, and add NLTK to it on all cluster nodes:

conda create --name mapr_nltk nltk python=3.5
source activate mapr_nltk

Note that some builds of PySpark are not compatible with Python 3.6, so we’ve specified an older version.

Next, we have to download the demo data from the NLTK repository:
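A sketch of that download using NLTK's downloader module (the target directory is a placeholder for a MapR-FS path visible to all nodes):

source activate mapr_nltk
python -m nltk.downloader -d /mapr/my.cluster.com/user/mapr/nltk_data all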

 

This step will download all of the data to the directory that you specify–in this case, the default MapR-FS directory for the cluster user, accessible by all nodes in the cluster.

Next, create the following Python script: nltk_test.py

 

Then, run the following as the cluster user to test:

 


Editor's note: this blog post was originally published in the Converge Blog on June 29, 2017

By Carol McDonald

 

This post is the second in a series where we will go over examples of how MapR data scientist Joe Blue assisted MapR customers, in this case a regional bank, to identify new data sources and apply machine learning algorithms in order to better understand their customers. If you have not already read the first part of this customer 360° series, then it would be good to read that first. In this second part, we will cover a bank customer profitability 360° example, presenting the before, during and after.

Bank Customer Profitability - Initial State

The back story: a regional bank wanted to gain insights about what’s important to their customers based on their activity with the bank. They wanted to establish a digital profile via a customer 360 solution in order to enhance the customer experience, to tailor products, and to make sure customers have the right product for their banking style.


As you probably know, profit is equal to revenue minus cost. Customer profitability is the profit the firm makes from serving a customer or customer group, which is the difference between the revenues and the costs associated with the customer in a specified period.

Banks have a combination of fixed and variable costs. For example, a building is a fixed cost, but how often a person uses an ATM is a variable cost. The bank wanted to understand the link between their product offerings, customer behavior, and customer attitudes toward the bank in order to identify growth opportunities.

Bank Customer Profitability - During

The bank had a lot of different sources of customer data:

  • Data warehouse account information.
  • Debit card purchase information.
  • Loan information such as what kind of loan and how long it has been open.
  • Online transaction information such as who they’re paying, how often they’re online, or if they go online at all.
  • Survey data.

Analyzing data across multiple data sources

This data was isolated in silos, making it difficult to understand the relationships between the bank’s products and the customer’s activities. The first part of the solution workflow is to get the data into the MapR Platform, which is easy since MapR enables you to mount the cluster itself via NFS. Once all of the data is on the MapR Platform, the data relationships can be explored interactively using the Apache Drill schema-free SQL query engine.

A key data source was a survey that the bank had conducted in order to segment their customers based on their attitudes toward the bank. The survey asked questions like, “Are you embracing new technologies?” and “Are you trying to save?” The responses were then analyzed by a third party in order to define four customer personas. But the usefulness of this survey was limited because it was only performed on 2% of the customers. The question for the data scientist was, “How do I take this survey data and segment the other 98% of the customers, in order to make the customer experience with the bank more profitable?”

Feature extraction

Feature engineering is the process of transforming raw data into inputs for a machine learning algorithm. With data science you often hear about the algorithms that are used, but actually a bigger part—consuming about 80% of a data scientist’s time—is taking the raw data and combining it in a way that is most predictive.

The goal was to find interesting properties in the bank’s data that could be used to segment the customers into groups based on their activities. A key part of finding the interesting properties in the data was working with the bank’s domain experts, because they know their customers better than anyone.

Apache Drill was used to extract features such as:

  • What kind of loans, mix of accounts, and mix of loans does the customer have?
  • How often does the customer use a debit card?
  • How often does the customer go online?
  • What does the customer buy? How much do they spend? Where do they shop?
  • How often does the customer go into the branches?

Accentuate business expertise with new learning

After the behavior of the customers was extracted, it was possible to link these features by customer ID with the labeled survey data, in order to perform machine learning.

The statistical computing language R was used to build segmentation models using many machine learning classification techniques. The result was four independent ensembles, each predicting the likelihood of belonging to one persona, based on their banking activity.

The customer segments were merged and tested with the labeled survey data, making it possible to link the survey "customer attitude" personas with their banking actions and provide insights.

Banking Customer 360° - After

The solution results of modeling with the customer data are displayed in the Customer Products heat map below. Each column is an “attitude”-based persona, and each row is a type of bank account or loan. Green indicates the persona is more likely to have this product, and red indicates less likely.

This graph helps define these personas by what kinds of products they like or don’t like.

This can give insight into:

  • How to price some products
  • How to generate fees
  • Gateway products, which allow a customer to move from a less profitable segment to a more profitable one.

In the Customer Payees heat map below, the rows are electronic payees. This heat map shows where customer personas are spending their money, which can give channels for targeting and attracting a certain persona to grow your business.

In this graph, the bright green blocks show that Fitness club A, Credit card/Bank A, and Credit card/Bank C are really strong for Persona D. Persona A is almost the opposite of the other personas. This “customer payees” heat map gives a strong signal about persona behavior. It’s hard to find signals like this, but they provide an additional way to look at customer data in a way that the bank could not conceive of before.

Bank Customer 360° Summary

In this example, we discussed how data science can link customer behavioral data with a small sample survey to find customer groups who share behavioral affinities, which can be used to better identify a customer base for growth. The bank now has the ability to project growth rates based on transition between personas over time and find direct channels that allow them to target personas through marketing channels. 

 

Editor's Note: this post was originally published in the Converge Blog on December 08, 2016. 

 

Related Resources:

How to Use Data Science and Machine Learning to Revolutionize 360° Customer Views 

Customer 360: Knowing Your Customer is Step One | MapR 

Applying Deep Learning to Time Series Forecasting with TensorFlow 

Churn Prediction with Apache Spark Machine Learning 


By Carol McDonald

 

More and more data is available that can help inform businesses about their customers, and the businesses that successfully utilize these new sources and quantities of data will be able to provide a superior customer experience. However, predicting customer behavior remains very challenging. This post is the first in a series where we will go over examples of how Joe Blue, a Data Scientist in MapR Professional Services, assisted MapR customers in identifying new data sources and applying machine learning algorithms in order to better understand their customers. The first example in the series is an advertising customer 360°; the next blog post in the series will cover banking and healthcare customer 360° examples.

Customer 360° Revolution: Before, During, and After


Before

MapR works with companies who have solved business problems but are limited in what they can do with their data, and they are looking for the next step. Often, they are analyzing structured data in their data warehouse, but they want to be able to do more.

After

The goal of the customer 360° revolution is to transform your business by figuring out what customers are going to do—if you can predict what your customer is going to do next, then you can add value to your business.

During = Data Scientist

There is a lot of confusion about what a data scientist does. A data scientist is someone who can draw the lines between the before and after, and this involves:

  1. Identifying new data sources, which traditional analytics or databases are not using due to the format, size, or structure.
  2. Collecting, correlating, and analyzing data across multiple data sources.
  3. Knowing and applying the right kind of machine learning algorithms to get value out of the data.

The goal of the customer 360° revolution is generally to:

  • Find additional information
  • Accentuate business expertise with new learning
  • Use new learning to change your business

Use Case: Advertising Customer 360° Before

An example use case for this customer 360° is a mobile banking web site which displays ads related to what the customer bought with his/her debit card. A “card-to-link” company hosts the ads and determines which ad to display. The advertisers are pharmacies, restaurants, grocery stores, etc., and they want to get their ads to the people who would be interested in buying their products. In order to target ads to interested customers, you need to accurately predict the likelihood that a given ad will be clicked, also known as "click-through rate" (CTR).

The “card-to-link” company was already capable of displaying ads related to purchases, but they wanted to do a better job of targeting and measuring the success of their ads.

As an example, let’s take a look at a new campaign for Chris’s Crème doughnuts:

  • Chris’s Crème asks the “card-to-link” company to find people who like doughnuts but who are not already their customers.
  • Card-to-link first uses data warehouse debit card merchant information to find Dunkin Doughnut customers and customers of other quick-service restaurants that sell breakfast.
  • Card-to-link also uses customer zip code information to only display Chris’s Crème ads to customers who live near a Chris’s Crème location.

This works great, but what if this method increased new customers to 600,000, but Chris’s Crème wanted to pay more to increase the number of new customers to 1,000,000? There is no way to do this with just the existing data warehouse data.

Advertising Customer 360° During Part 1

The first question for the data scientist is, "What kind of data does card-to-link have that they could use to augment new Chris’s Crème customers from 600,000 to 1,000,000, and therefore maximize the profit of this campaign?"

The New Data

The new data comes from Internet browsing history; it consists of billions of device IDs and the keyword content of the browsing history for that ID.

So how can the data scientist take this new data and find a bigger audience?

Advertising Customer 360° During Part 2

Get the Data on the MapR Platform (NFS)

The first part of the solution workflow is to get the data on the MapR Platform, which is easy via the Network File System (NFS) protocol. Unlike other Hadoop distributions that only allow cluster data import or import as a batch operation, MapR enables you to mount the cluster itself via NFS and the MapR File System enables direct file modification and multiple concurrent reads and writes via POSIX semantics. An NFS-mounted cluster allows easy data ingestion from other machines leveraging standard Linux commands, utilities, applications, and scripts.

The complete customer purchase and past campaign history data is exported from the traditional data warehouse and put on the MapR Platform as Parquet tables. The Internet browsing history is put on the MapR platform as text documents.

Once the browsing, campaign and purchase history data is on the MapR platform, Apache Drill is used for interactive exploration and preprocessing of the data with a schema-free SQL query engine.

Feature Extraction

Features are the interesting properties in the data that you can use to make predictions.

Feature engineering is the process of transforming raw data into inputs for a machine learning algorithm. In order to be used in Spark machine learning algorithms, features have to be put into Feature Vectors, which are vectors of numbers representing the value for each feature. To build a classifier model, you extract and test to find the features of interest that most contribute to the classification.

Apache Spark for text Analytics

The TF-IDF (Term Frequency–Inverse Document Frequency) function in Spark MLlib can be used to convert text words into feature vectors. TF-IDF calculates the most important words in a document compared to a collection of documents. For each word in a collection of documents, it computes:

  • Term Frequency is the number of times a word occurs in a specific document.
  • Document Frequency is the number of times a word occurs in a collection of documents.
  • TF * IDF measures the significance of a word in a document (the word occurs a lot in that document, but is rare in the collection of documents).

For example, if you had a collection of documents about football, then the word concussion in a document would be more relevant for that document than the word football.
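To make this concrete, here is a minimal Scala sketch of TF-IDF feature extraction with Spark. The table, column names, and sample rows are invented for illustration; the original project's code is not shown here.

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.sql.SparkSession

object TfIdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("tfidf-sketch").getOrCreate()

    // Hypothetical browsing history: one row per device ID with its keyword text
    val browsing = spark.createDataFrame(Seq(
      ("device-1", "doughnuts coffee breakfast commute"),
      ("device-2", "football concussion helmet scores")
    )).toDF("deviceId", "keywords")

    // Split the keyword text into individual terms
    val words = new Tokenizer().setInputCol("keywords").setOutputCol("words").transform(browsing)

    // Term frequency: hash each word into a fixed-size count vector
    val tf = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
      .setNumFeatures(10000).transform(words)

    // Inverse document frequency: down-weight words that appear in many documents
    val tfidf = new IDF().setInputCol("rawFeatures").setOutputCol("features").fit(tf).transform(tf)

    tfidf.select("deviceId", "features").show(truncate = false)
  }
}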

Machine Learning and Classification

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (such as whether a customer likes doughnuts or not), based on labeled data (such as purchase history). Classification takes a set of data with labels and features and learns how to label new records based on that information. In this example, the purchase history is used to label customers who bought doughnuts. The browsing history of millions of text keywords, many of which have seemingly nothing to do with doughnuts, is used as the features to discover similarities and categorize customer segments.

  • Label → bought doughnuts → 1 or 0
  • Features → browsing history → TF-IDF features

Machine Learning

Once the browsing and purchase history data is represented as labeled feature vectors, Spark machine learning classification algorithms such as logistic regression, decision trees, and random forests can be used to return a model representing the learning decision for that data. This model can then be used to make predictions on new data.
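As a rough sketch of that step, assuming a DataFrame of labels (1.0 = bought doughnuts) and TF-IDF feature vectors, the Spark code might look like the following; the tiny dense vectors stand in for real TF-IDF features.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object DoughnutClassifierSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("doughnut-classifier-sketch").getOrCreate()

    // Hypothetical labeled data: label 1.0 = bought doughnuts, 0.0 = did not
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.9, 0.1, 0.4)),
      (0.0, Vectors.dense(0.0, 0.8, 0.2)),
      (1.0, Vectors.dense(0.7, 0.2, 0.5))
    )).toDF("label", "features")

    // Logistic regression is shown here; decision trees or random forests plug in the same way
    val model = new LogisticRegression().setMaxIter(10).setRegParam(0.01).fit(training)

    // The "probability" column can be used to rank device IDs from most to least likely doughnut eaters
    model.transform(training).select("label", "probability", "prediction").show(truncate = false)
  }
}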

After the feature extraction and model building, it is possible to use the model to rank the billions of device IDs from most to least likely to eat doughnuts. With that, the audience of potential doughnut eaters can be augmented to the goal of one million.

Advertising Customer 360° After

Each colored line in the graph below refers to the results of modeling with data sets from different categories of advertising campaigns: Quick Serve Restaurant (QSR), Full Serve Restaurant, Light Fare, Auto, Grocery, and Apparel. For each campaign:

  1. The purchase history was used to label the population:
    • For the grocery campaign, the following modeling populations were created:
      • Those who shopped at grocery stores multiple times
      • Those who bought their food at other stores
    • For the Full Serve restaurant campaign, the following modeling populations were created:
      • Yes: had at least 2 visits to a full serve restaurant in the last 2 months (90th percentile)
      • No: had at least 18 visits to a quick serve restaurant in the last 2 months (90th percentile)
      • Excluded those who were in both (a small percentage)
  2. The browsing history from the labeled populations was used for the features.
  3. The classification model was built from the labeled features.
  4. The model was used to predict the Click-Through Rate.

The left side of this graph is the percentage of people visiting the banking web site who clicked on an advertisement. The right side is the predicted probability that a person would click on the ad. If the colored lines are continuously increasing from left to right, that means the machine learning model worked well at predicting the click-through rates.

For full-service restaurants (the yellow line), the probability of clicking tracks well with the click-through rate. At a low probability, the click-through rate was 30%, and at a high probability the model found populations with a 70% click-through rate. So the model really responded to the method of finding similar customer profiles using their Internet browsing history.

Some campaigns, like grocery, did not work. This shows that the browsing history does not really apply to increased click-through rates for grocery ads, but the card-to-link company did not know this. These are insights that can be used, for example, if card-to-link has a new automotive, QSR, or full-serve campaign. This is a method that works well and can be used for advertisers that will pay to target more customers.

Advertising Customer 360° Summary

In this example, we discussed how data science can use customer behavioral data to find customer groups who share behavioral affinities in order to better target advertisements. For more information review the following resources.

Want to learn more?

 

Editor's note: This blog post was originally published in the Converge Blog on August 08, 2016. 

By Mathieu Dumoulin

 

Modern Open Source Complex Event Processing For IoT

This series of blog posts details my findings as I bring to production a fully modern take on Complex Event Processing, or CEP for short. In many applications, ranging from financials to retail and IoT, there is tremendous value in automating tasks that require taking action in real time. Even setting aside the IT systems and frameworks that would support it, this is clearly a useful capability.

In the first post of the series, I explain how CEP has evolved to meet this requirement and how the requirements for CEP can be met in a modern big data context. In short, I present an approach based on the best practices of modern architecture, microservices, and Kafka-style stream messaging, with an up-to-date open source business rule engine.

In this second part, I’ll get more concrete and work through a working example using the system I propose. Let’s get started.

Smart City Traffic Monitoring

We made a working demo for which the code will be released on the MapR GitHub. It is made to work on either the MapR Sandbox or a real MapR cluster. Please stay tuned and check www.mapr.com for an announcement of the release.

For this example, we’ll use a very simple “smart city” use case for traffic monitoring. In this case, we’ll model a single sensor that can measure the speed of the cars that pass over it. Using such sensor data, it would be fairly easy to detect traffic jams in real time and thus notify the police to take action much more quickly than they otherwise could.

Some other types of use cases are easy to envision. We can add data from a public, real-time weather feed and connect to information panels along the roads and show an advisory to drivers without any human intervention. By combining road condition sensors with the weather data, we can provide advisory feedback to drivers about road conditions and warn the public very quickly. Furthermore, by adding historical accident data and using predictive analytics, we can imagine road safety measures that can be deployed in real time to areas with a higher probability of accidents.

To be honest, I’m only scratching the surface of what this kind of smart road could do to make a commuter’s life both easier and safer while saving money for the city. But how would we build a prototype of such a system without it being hugely complicated and expensive?

How to Build a Rule-Engine Based CEP System

So we now have a proper, concrete target in mind. It turns out that if we decide to base our system around reading our data from a Kafka-style stream (i.e., persistent, scalable, and high performance), then we will naturally end up with a pretty cool, modern CEP microservice.

The important point here is not to show how to build a super complicated enterprise architecture, but rather that by making some simple technology choices and building our demo in a reasonable way, we naturally end up with this elegant, modern, and simple architecture.

For simplicity’s sake, I have decided to implement my prototype on a MapR Sandbox (get it free here). This is because it will include the stream messaging system, MapR Streams, which I can use through the Kafka 0.9 API with very little configuration and know it will work the same on a production MapR 5.1+ cluster.

Finally, it should be noted that an Apache Kafka cluster could be used as well with the same design and code, just with some additional work to get it up and running.

High-Level Architecture View

As shown in the diagram above, the flow is to have sensor data aggregated at a producer gateway, which forwards the data to the stream topic named “data.” The data is in JSON format so it is easy to manipulate, human readable, and easily sent to Elasticsearch as-is for monitoring with a Kibana dashboard.

The consumer side will have two tasks: to read the data from the stream and host an instance of a KieSession where the rule engine can apply rules on the facts as they are added to it.

Rules are edited in the Workbench GUI, a Java webapp which can be run on a Java application server such as WildFly or Tomcat.

Rules are fetched from the Workbench by the consumer application using the appropriate methods provided by the Drools framework, which are based entirely on the Maven repository.

We can now look at the proposed system in more detail in the following sections.

List of Technologies Used

The technology we’re going to use is as follows:

  • MapR Sandbox 5.2
  • The Java programming language (any JVM language is fine)
    • The Jackson 2 library to convert to and from JSON
  • MapR Streams or Apache Kafka stream messaging system
  • Wildfly 10 application server to host the Workbench
  • JBoss Drools as our choice of OSS business rule engine
  • Log Synth to generate some synthetic data for our prototype
  • Streamsets 1.6 to connect MapR Streams and Elasticsearch
  • Elasticsearch 2.4 and Kibana 4 for monitoring

Traffic Monitoring Prototype Architecture

As I built this demo to run on the MapR Sandbox, I’m using instructions for CentOS 6.X, an open source version of RHEL 6.X. Instructions for CentOS 7 are almost identical, and finding similar instructions for Ubuntu would be pretty straightforward and left up to the reader.

To build the core of the traffic monitoring system, we’re going to need two basic parts:

  • A program to feed sensor data into MapR Streams/Kafka. This part will be using fake data modeled by a vehicle simulation coded with Log Synth. We’ll use the MapR Kafka-rest proxy implementation (just introduced with MEP 2.0) to add the data with Python.
  • A JVM-language application that will read data from the stream and pass it to a KieSession. The minimal code to get this working is surprisingly small.

To edit the rules, we deploy the Workbench on Wildfly 10, which is a fairly straightforward process. Check this blog post for instructions, or read the Drools Documentation. Installing Wildfly is pretty simple; see this blog for great instructions on how to install it as a service on Centos/RHEL (it’s for Wildfly, but the same instructions work for 9 and 10).

We made a single configuration change to Wildfly. We changed the port to 28080 instead of 8080, as it is already used by the sandbox. Wildfly runs in standalone mode, so the configuration file is in WILDFLY_HOME/standalone/configuration/standalone.xml.

For monitoring, we let the streaming architecture work for us. We use the open source StreamSets Data Collector to easily redirect the sensor data to Elasticsearch so that we can actually monitor the system with a nice Kibana dashboard. Setting up StreamSets with MapR Streams requires some work with version 1.6 (there is a great blog post about it here, or see the official StreamSets documentation).

Finally, installation and setup of Elasticsearch and Kibana is well documented on Centos/RHEL.

For production, all those parts can be readily separated to run on separate servers. They can be run on either cluster nodes or edge nodes. If it’s a MapR cluster, installing the MapR Client and pointing it to the cluster CLDB nodes will be all the configuration needed for full access to the streams. For an Apache Kafka cluster, refer to the official Kafka documentation.

Traffic Monitoring Prototype - How To

Creating the Streams with maprcli

The first task is to create streams and topics for our application. To do this, it’s a best practice to create a volume for streams first. As user mapr, type from the command line:

maprcli volume create -name streams -path /streams
maprcli stream create -path /streams/traffic -produceperm p -consumeperm p
maprcli stream topic create -path /streams/traffic -topic data
maprcli stream topic create -path /streams/traffic -topic agenda
maprcli stream topic create -path /streams/traffic -topic rule-runtime

Note: MapR Streams is more of a superset of Kafka than simply a clone. In addition to being faster, MapR Streams can use all the advantages of the MapR File System, such as volumes (with permissions and quotas and so on) and replication. A cluster is not limited to just defining topics, but can define several streams which each can have several topics. Therefore, instead of a topic name, a MapR Stream has a path:topic notation. Here, our data stream’s full name is “/streams/traffic:data”. You can read more about the advantages of MapR Streams vs. Kafka in this whiteboard walkthrough by Jim Scott.

Generating Fake Data

I used the Log-Synth tool to generate data for this prototype. Log-Synth uses a schema combined with a Sampler class to generate data in a very flexible and simple manner.

My Schema:

 

[    {"name":"traffic", "class":"cars", "speed": "70 kph", "variance": "10 kph", "arrival": "25/min", "sensors": {"locations":[1, 2, 3, 4, 5, 6, 7,8,9,10], "unit":"km"},      "slowdown":[{"speed":"11 kph", "location":"2.9 km - 5.1 km", "time": "5min - 60min"}]}   ]

 

The command to generate the data is:

synth -count 10K -schema my-schema.json >> output.json

Data is generated one car at a time, and each data point is a reading at a sensor. The data will model a flow of cars driving at 70 km/h, arriving at a rate of 25 cars per minute. A slowdown will happen between km 2.9 and 5.1, where speed will be reduced to 11 km/h, from 5 minutes to 60 minutes after the start of the simulation. This will be the traffic jam we wish to detect using our CEP system.

The generated data is a file where each line is the resulting list of sensor measurements for a single car:

[{"id":"s01-b648b87c-848d131","time":52.565782936267404,"speed":19.62484385513174},{"id":"s02-4ec04b36-2dc4a6c0","time":103.5216023752337,"speed":19.62484385513174},{"id":"s03-e06eb821-cda86389","time":154.4774218142,"speed":19.62484385513174},{"id":"s04-c44b23f0-3f3e0b9e","time":205.43324125316627,"speed":19.62484385513174},{"id":"s05-f57b9004-9f884721","time":256.38906069213255,"speed":19.62484385513174},{"id":"s06-567ebda7-f3d1013b","time":307.3448801310988,"speed":19.62484385513174},{"id":"s07-3dd6ca94-81ca8132","time":358.3006995700651,"speed":19.62484385513174},{"id":"s08-2d1ca66f-65696817","time":409.25651900903136,"speed":19.62484385513174},{"id":"s09-d3eded13-cf6294d6","time":460.21233844799764,"speed":19.62484385513174},{"id":"s0a-1cbe97e8-3fc279c0","time":511.1681578869639,"speed":19.62484385513174}]

 

A reading has a sensor ID, a speed in meters per second, and a time delta from time 0 (the moment the simulation starts) in seconds.

My producer code simply translates the readings into a list of sensor readings ordered by time, and I transform the speed into km/h and the time into a timestamp as milliseconds from epoch.

Sending the data onto the stream can be done one line at a time using standard producer code. The code in the sample Java producer works just fine.

Another exciting new possibility is to use the brand new Kafka Rest Proxy, which is also available on MapR from MEP 2.0 (MapR Ecosystem Pack). This means sensors can connect directly to Kafka from any language since HTTP-based REST API is a global standard.

Using the Workbench

We can log in to the Workbench with an admin user (a user with the role “admin”) created with the add-user.sh script from Wildfly.

Using the Workbench is beyond the scope of the article, but the general idea is to create an Organizational Unit and a Repository, and then create a project.

Data Objects

We will need to create facts for the rule engine to work with. Best practice for Drools is to use a separate Maven project for your data model. For the sake of simplicity, I created them right from the workbench.

The Measurement is a super generic bean which models the sensor speed measurements. For the prototype, I kept it as simple as the raw data with a timestamp, a sensor ID, and a speed.

The Sensor bean models the sensor itself and will have an ID and the average speed of all cars that it measures over a time window defined by our rules. This average speed will be used to trigger an alert for heavy traffic. The traffic String is to mark the current traffic level which can be “NONE”, “LIGHT” or “HEAVY.”

The Traffic Monitoring Rules

  • create sensors for new id
  • detect heavy traffic
  • detect light traffic
  • detect normal traffic
  • get average speed at sensor

The create sensors rule ensures that there are sensor objects available in memory. This is the fact we use to know the average speed at a certain sensor.

The detect heavy traffic rule is the key rule we want to use to send an alarm to the police if heavy traffic is detected at a certain sensor.

So when the average speed reaches 20 km/h or less, and the sensor isn’t already in the HEAVY traffic level, set the level to HEAVY and send an alert.

That means we need to know the average speed. Here is the rule to compute it using the Drools rule DSL (domain-specific language):

This is not rocket science! These rules illustrate rather clearly how making up simple but useful rules can be realistically left up to business analysts and developed separately from the whole stream and big data platform.

The Consumer Side

The consumer reads the data from the stream. The tutorial code in Java from the documentation is perfectly adequate. Jackson is used to convert the JSON to Measurement objects.

The consumer has an instance of the KieSession, and each measurement is added to the session using kieSession.insert(fact), followed by a call to kieSession.fireAllRules(), which triggers the algorithm to check if any rules match the new state of the session given the new data.

A channel, which is just a callback, is used to allow a rule to call a function “outside” of the KieSession. My prototype uses this method to log the alert. In a production system, we could easily change the code to send an email, an SMS, or take some other action.
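To illustrate, here is a minimal Scala sketch of that consumer loop. The topic name, the field names of the Measurement bean, and the channel name are assumptions; the actual demo code lives in the MapR GitHub repository.

import java.util.{Arrays, Properties}
import scala.beans.BeanProperty
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.kie.api.runtime.{Channel, KieSession}

// Stand-in for the Measurement bean defined in the Workbench (field names are assumptions)
class Measurement {
  @BeanProperty var sensorId: String = _
  @BeanProperty var speed: Double = _
  @BeanProperty var timestamp: Long = _
}

object TrafficConsumerSketch {
  def run(kieSession: KieSession): Unit = {
    // A rule can "call out" of the session through a named channel; here we just log the alert
    kieSession.registerChannel("alerts", new Channel {
      override def send(obj: Object): Unit = println(s"ALERT: $obj")
    })

    val props = new Properties()
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "traffic-consumer")
    props.put("bootstrap.servers", "localhost:9092") // ignored by MapR Streams, needed by plain Kafka

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("/streams/traffic:data"))

    val mapper = new ObjectMapper()
    while (true) {
      for (record <- consumer.poll(200).asScala) {
        val measurement = mapper.readValue(record.value(), classOf[Measurement])
        kieSession.insert(measurement) // add the new fact to working memory
        kieSession.fireAllRules()      // evaluate the rules against the updated session
      }
    }
  }
}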

Importing the Rules into the Consumer Application

The way we get the rules into the running application is by fetching them from the Maven repository integrated into the Workbench.

KieServices kieServices = KieServices.Factory.get();  
ReleaseId releaseId = kieServices.newReleaseId( "org.mapr.demo", "smart-traffic-kjar", "1.0.0" ); 
KieContainer kContainer = kieServices.newKieContainer( releaseId );

KieSession kieSession = kContainer.newKieSession();

 

So the question becomes, how does the call to newReleaseId know to fetch the artifact with our rules from the Maven repository in the Workbench?

The answer is the ~/.m2/settings.xml file, where you add this information. We recommend using the user mapr for everything in the sandbox, so the full path is: /home/mapr/.m2/settings.xml

[mapr@maprdemo .m2]$ cat settings.xml  
<?xml version="1.0" encoding="UTF-8"?> 
<settings> 
   <servers> 
       <server> 
           <id>guvnor-m2-repo</id> 
           <username>admin</username> 
           <password>admin</password> 
       </server> 
   </servers> 
   <profiles> 
       <profile> 
           <id>cep</id> 
           <repositories> 
               <repository> 
                   <id>guvnor-m2-repo</id> 
                   <name>Guvnor M2 Repo</name> 
                    <url>http://127.0.0.1:28080/drools-wb-webapp/maven2/</url> 
                    <releases> 
                        <enabled>true</enabled> 
                        <updatePolicy>interval:1</updatePolicy> 
                    </releases> 
                    <snapshots> 
                        <enabled>true</enabled> 
                        <updatePolicy>interval:1</updatePolicy> 
                    </snapshots> 
               </repository> 
           </repositories> 
       </profile> 
   </profiles> 
   <activeProfiles> 
       <activeProfile>cep</activeProfile> 
   </activeProfiles> 
</settings>

 

The key information is the <url> element, which corresponds to the URL of the maven2 repository of the Drools Workbench. This information can be copied and pasted from the pom.xml, which can be seen by using the repository view:

So I just copy-pasted that and now everything works like magic.

Monitoring the Smart Traffic Prototype

We have one stream with data, and two other streams to monitor the rule engine internals. This is very easy with Drools because it uses Java Listeners to report on its internal state. We simply provide a custom implementation of the listeners to produce the data to a stream, and then use Streamsets to redirect everybody to Elasticsearch.

Elasticsearch Mappings

The mappings are defined in a small script I created:

http://pastebin.com/kRCbvAkU

Streamsets for No-Code Stream To Elasticsearch Pipeline

Each stream has its own pipeline, where each looks like this:

The Jython Evaluator adds timestamp information if necessary.

Running the Prototype

Start the consumer:

Then start the producer:

In my prototype, I added code to control the rate at which the data is sent to the stream, to make the firing of the rules easier to see. 10,000 events is a small number for Drools and for MapR Streams/Kafka, so the whole demo would otherwise be over in less than a second. This is the meaning of “-r 25”: a rate of 25 events per second.

The dashboard looks like this:

Once data starts to stream in:

The traffic jam is quite visible now:

As soon as a sensor average speed drops below 20 km/h, an alarm is fired:

And the dashboard will display a count of “1”

The simulation will continue and the two other sensors will go below 20 in turn for a total of 3 alarms fired.

In Conclusion

This project turned into a great illustration of the power and ease of streaming architectures using microservices. A common misconception of microservices is that they are limited to synchronous REST services. This is not the case at all. Our system is a backend system and so the microservices (the producer, the consumer, streamsets, and ES/Kibana) are all run asynchronously, dealing with the data as it comes off the stream.

The project was fairly easy to build technically because every part is totally independent of the others, and can therefore be tested in isolation. Once the producer can send data to the stream properly, it doesn’t need to be tested ever again for issues that concern other parts of the system. Also, each part was added one at a time, making it easy to identify issues and fix them.

In total, with very little original code, we could implement a working, useful system that would need very little change to be put into production. Only the producer needs to be customized for the particular sensor or data source.

Rules are the most time-consuming and error-prone part. This is no different than if the project had been done using custom-coded Spark code. The win for an approach based on a rule engine such as Drools and the Drools Workbench is that the rules can be edited, tested, and improved independently of how the code runs on the cluster. The work in the Workbench has no dependency on the system at all, as it is pulled in by the consumer application automatically.

From a business value point of view, all the value is in the rules, assuming a stable production system. There is no reason for organizations not to take advantage of this quick rule-editing capability to become ever more agile and responsive to evolving conditions, for the benefit of their customers… and the bottom line.

 

Editor's note: This blog post was originally published in the Converge Blog on January 10, 2017

Related:

By Ian Downard

 

 

A lot of people choose MapR as their core platform for processing and storing big data because of its advantages for speed and performance. MapR consistently performs faster than any other big data platform for all kinds of applications, including Hadoop, distributed file I/O, NoSQL data storage, and data streaming. In this post, I’m focusing on the latter to provide some perspective on how much better/faster/cheaper MapR Streams can be compared to Apache Kafka as a data streaming technology.

MapR Streams is a cluster-based messaging system for streaming data at scale. It’s integrated into the MapR Converged Data Platform and implements the Apache Kafka Java API so applications written for Kafka can also run on MapR Streams. What differentiates the MapR Streams technology from Kafka are its built-in features for global replication, security, multi-tenancy, high availability, and disaster recovery—all of which it inherits from the MapR Converged Data Platform. From an operational perspective, these features make MapR Streams easier to manage than Kafka, but there are speed advantages, too. I’ve been looking at this a lot lately, trying to understand where and why MapR Streams outperforms Kafka. In this blog post, I will share with you how clearly MapR Streams can transport a much faster stream of data, with much larger message sizes, and to far more topics than what can be achieved with Kafka.

Test Strategy

In this study, I wanted to compare Kafka and MapR Streams as to how they perform “off the shelf” without the burden of tuning my test environment to perfectly optimize performance in each test scenario. So, I have pretty much stuck with the default settings for services and clients. The only exceptions are that I configured each Kafka topic with a replication factor of 3 and configured producers to send messages synchronously, since these are the default modes for MapR Streams. I also disabled stream compression in order to control message sizes and measure throughput more precisely.

Test Configurations

I measured performance from both producer and consumer perspectives. However, consumers run faster than producers, so I focused primarily on the producer side since the throughput of a stream is bounded by the throughput of its producers. I used two threads in my producer clients so that message generation could happen in parallel with sending messages and waiting for acknowledgments. I used the following properties for producers and topics:

acks = all
batch.size = 16384
latency.ms = 0ms
block.on.buffer.full = true
compression = none
default.replication.factor = 3
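For reference, a simplified Scala sketch of the kind of measurement loop involved is shown below. It is not the actual JUnit test code, and the topic name, message size, and message count are arbitrary.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ThroughputSketch extends App {
  val messageSize = 1000    // bytes per message
  val numMessages = 100000

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("acks", "all")
  props.put("batch.size", "16384")
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val payload = new Array[Byte](messageSize)

  val start = System.nanoTime()
  for (_ <- 1 to numMessages) {
    // .get() blocks until the broker acknowledges the write, i.e., a synchronous send
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("t-00000", payload)).get()
  }
  producer.close()

  val seconds = (System.nanoTime() - start) / 1e9
  println(f"${numMessages / seconds}%.0f msgs/s, ${numMessages * messageSize / seconds / 1e6}%.1f MB/s")
}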

 

My test environment consisted of three Ubuntu servers running Kafka 2.11-0.10.0.1 or MapR 5.2 on Azure VMs sized with the following specs:

  • Intel Xeon CPU E5-2660 2.2 GHz processor with 16 cores
  • SSD disk storage with 64,000 Mbps cached / 51,200 uncached max disk throughput
  • 112GB of RAM
  • Virtual networking throughput between 1 and 2 Gbits/sec (I measured this quantitatively since I couldn’t easily find virtual network throughput specs from Microsoft).

Performance Metrics

Throughput, latency, and loss are the most important metrics for measuring the performance of a message bus system. MapR Streams and Kafka both guarantee zero loss through at-least-once semantics. MapR provides some advantages when it comes to latency, but typically both MapR Streams and Kafka deliver messages sufficiently quickly for real-time applications. For those reasons, I chose to focus on throughput in this study.

Throughput is important because if an application generates messages faster than a message bus can consume and deliver them, then those messages must be queued. Queueing increases end-to-end latency and destabilizes applications when queues grow too large.

Furthermore, throughput in Kafka and MapR Streams is sensitive to the size of the messages being sent and to the distribution of those messages into topics. So, I analyzed those two attributes independently in order to measure how message size and stream topics affect throughput.

Throughput Performance

To measure producer throughput, I measured how fast a single producer could publish a sustained flow of messages to a single topic with 1 partition and 3x replication. I ran this test for a variety of message sizes to see how that affects throughput. The results show MapR Streams consistently achieving much higher throughput than Kafka and having a much higher capacity for handling large message sizes, as shown below.

Throughput MB/s

MapR Streams doesn’t just move a larger volume of data than Kafka; it also has the capacity to send more records per second. We can see this by plotting throughput in terms of raw record count, as shown below:

Throughput Msgs/s

I recorded these results with two different code bases. First, I used custom tests that I wrote using the Java unit test framework (JUnit), then I used the performance test scripts included with Kafka and MapR. These different approaches did not produce exactly the same results but they were close, as shown below. This correlation helps validate the conclusions stated above, that MapR Streams can transport a larger volume of data and more frequent messages than Kafka.

Throughput Correlation

How does MapR Streams achieve more than 4x the throughput of Kafka?

There are a lot of reasons why MapR Streams is faster, and without getting too technical, I’ll mention just a few. First, the MapR Streams client more efficiently flushes data to the MapR Streams server. It spawns its own threads to do this work, whereas Kafka uses the client application threads directly to flush to a Kafka broker, which in many cases is limited to just a single thread.

On the server side, MapR Streams inherits efficient I/O patterns from the core MapR storage layer which keeps files coherent and clean so that I/O operations can be efficiently buffered and addressed to sequential locations on disk. Replication is more efficient, too, since the underlying MapR storage platform has distributed synchronous replication built in, along with other operational features that simply don’t exist in Kafka, such as snapshots, mirroring, quotas, access controls, etc.

Replicating this test

My JUnit tests for benchmarking Kafka and MapR Streams are available at https://github.com/iandow/kafka_junit_tests. Here are the commands that I used to generate the data shown above:

git clone https://github.com/iandow/kafka_junit_tests
cd kafka_junit_tests
# Create a Kafka topic...
/opt/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic t-00000 --config compression.type=uncompressed
# or create a MapR Streams topic.
maprcli stream create -path /user/mapr/iantest -produceperm p -consumeperm p -topicperm p -defaultpartitions 1 -compression off
# Then compile.
mvn -e -Dtest=MessageSizeSpeedTest test
# Test data will be saved in size-count.csv

You can also measure throughput using the performance test utilities included with Kafka and MapR. Here are the commands that I used to do that:

Kafka script:

https://gist.github.com/iandow/bf5df0f9b4f19e6a19aa5a7a93b7c81c

MapR script:

https://gist.github.com/iandow/0750185f1d3631301d476b426c109a50

Topic Scalability

Another major advantage that MapR Streams holds over Kafka relates to how well it can handle large quantities of stream topics. Topics are the primary means of organizing stream data; however, there is overhead associated with categorizing streams into topics, and producer throughput is sensitive to that overhead. I quantified this by measuring how fast a single producer could publish a sustained flow of messages to an increasingly large quantity of topics. This is essentially a "fan-out" producer (illustrated below) and it is very common for fast data pipelines to use this pattern so that data can be more easily consumed downstream.

Fanout Producer

Each of the topics created for this scenario was configured with a single partition and 3x replication. Record size was held constant at 100 bytes.

It’s clear from the following graph that MapR Streams scales to a larger quantity of topics than Kafka.

Topic Scalability

How does MapR Streams handle so many more topics than Kafka?

A topic is just metadata in MapR Streams; it does not introduce overhead to normal operations. MapR Streams uses only one data structure for a stream, no matter how many topics it has, and the MapR storage system provides extremely fast and scalable storage for that data.

On the other hand, Kafka represents each topic by at least one directory and several files in a general purpose file system. The more topics/partitions Kafka has, the more files it creates. This makes it harder to buffer disk operations and perform sequential I/O, and it increases the complexity of what ZooKeeper must manage.

Replicating this test

This scenario can be run with another JUnit test from https://github.com/iandow/kafka_junit_tests, as follows:

git clone https://github.com/iandow/kafka_junit_tests
cd kafka_junit_tests
# For MapR only, create the stream first:
maprcli stream create -path /user/mapr/taq -produceperm p -consumeperm p -topicperm p -compression off
mvn -e -Dtest=ThreadCountSpeedTest test
# Test data will be saved in thread-count.csv

 

Partition Scalability

Stream topics are often subdivided into partitions in order to allow multiple consumers to read from a topic simultaneously. Both Kafka and MapR Streams allow topics to be partitioned, but partitions in MapR Streams are much more powerful and easier to manage than partitions in Kafka. For example, Kafka requires partitions to fit within the disk space of a single cluster node and cannot split them across machines. MapR Streams is not limited by the storage capacity of any one node because the MapR storage system automatically grows (or shrinks) partitions across servers. I’ll talk more about these operational advantages later, but let’s consider the performance implications of partitioning now.

ZooKeeper elects separate nodes to be leaders for each partition. Leaders are responsible for processing the client reads and writes for their designated partition. This helps load balance client requests across the cluster, but it complicates the work that ZooKeeper must do to keep topics synchronized and replicated. Leader election takes time and does not scale well. In my tests, I saw leader election take at least 0.1 seconds per partition, and it ran serially. So, for example, it would take more than 10 seconds to configure a topic with 100 partitions, that is, if ZooKeeper didn’t crash, which it frequently did when I created topics with 100 or more partitions.

In MapR Streams, I had no problem streaming data to topics with thousands of partitions, as shown below. This graph shows the throughput for a producer sending synchronously to a 3x replicated topic subdivided into an increasingly large number of partitions. I could not run my test in Kafka beyond 400 partitions, so that line is cut short.

Partitioning Scalability

Replicating this test

I used the performance scripts included with Kafka and MapR to generate the partition vs. throughput data shown above. Here is the script I used to run this test in Kafka:

https://gist.github.com/iandow/625d783333a53b592f0381e6b37ee9ab

That script will silently freeze if ZooKeeper fails, but it will continue once ZooKeeper starts again. So in another terminal, I simultaneously ran the following script to automatically restart ZooKeeper if it fails (which it is likely to do during this test):

https://gist.github.com/iandow/2dc07bde132669706467e8ee45507561

Here is the script I used to generate partitions vs. throughput data in MapR:

https://gist.github.com/iandow/8074962f6205552c9cdc3fceccdd9793

Operational Advantages for MapR Streams

Increasing throughput capacity and decreasing message latency can often be accomplished simply by adding nodes to your distributed messaging cluster. However, doing so costs money and complicates management, so essentially saying that MapR Streams performs better than Kafka is another way of saying that operating a distributed messaging platform can be done with less hardware on MapR than with Kafka.

However, unless you’re working on applications that scale to extreme levels, the challenges you face with Kafka are more likely to be operational rather than performance-related. And this is where the MapR total cost of ownership really shines.

Not only does MapR Streams execute with higher performance, it also addresses major operational deficiencies in Kafka. Here are three examples relating to replication, scaling, and mirroring:

  • Kafka requires that the MirrorMaker processes be manually configured in order to replicate across clusters. Replication is easy to configure with MapR Streams and supports unique capabilities for replicating streams across data centers and allowing streams to be updated in multiple locations at the same time.

  • Kafka’s mirroring design simply forwards messages to a mirror cluster. The offsets in the source cluster are useless in the mirror, which means consumers and producers cannot automatically failover from one cluster to a mirror. MapR continuously transfers updated records for near real-time replication and preserves message offsets in all replicated copies.

  • Kafka requires partitions to fit within the disk space of a single cluster node and cannot split them across machines. This is especially risky, because ZooKeeper could automatically assign multiple large partitions to a node that doesn’t have space for them. You can move them manually, but that can quickly become unmanageable. MapR Streams is not limited by the storage capacity of any one node because it distributes stream data across the cluster.

References

For more information about the operational advantages of MapR Streams, see Will Ochandarena’s blog post, Scaling with Kafka – Common Challenges Solved.

I also highly recommend reading Chapter 5 of Streaming Architecture: New Designs Using Apache Kafka and MapR Streams, by Ted Dunning & Ellen Friedman.

Conclusion

MapR Streams outperforms Kafka in big ways. I measured the performance of distributed streaming in a variety of cases that focused on the effects of message size and topic quantity, and I saw MapR Streams transport a much faster stream of data, with much larger message sizes, and to far more topics than what could be achieved with Kafka on a similarly sized cluster. Although performance isn’t the only thing that makes MapR Streams desirable over Kafka, it offers one compelling reason to consider it.

 

Editor's Note: This blog post was originally published in the Converge Blog on January 11, 2017. Since the publication of the original article MapR Streams has been renamed as MapR-ES

 

Related

MapR-ES

Getting Started with MapR-ES Event Data Streams

By Carol McDonald

This is a 4-Part Series, see the previously published posts below:

Part 1 - Spark Machine Learning

Part 3 – Real-Time Dashboard Using Vert.x

This post is the second part in a series where we will build a real-time example for analysis and monitoring of Uber car GPS trip data. If you have not already read the first part of this series, you should read that first.

The first post discussed creating a machine learning model using Apache Spark’s K-means algorithm to cluster Uber data based on location. This second post will discuss using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.

Example Use Case: Real-Time Analysis of Geographically Clustered Vehicles/Items

The following figure depicts the architecture for the data pipeline:

  1. Uber trip data is published to a MapR Streams topic using the Kafka API
  2. A Spark streaming application subscribed to the first topic:
    1. Ingests a stream of uber trip events
    2. Identifies the location cluster corresponding to the latitude and longitude of the uber trip
    3. Adds the cluster location to the event and publishes the results in JSON format to another topic
  3. A Spark streaming application subscribed to the second topic:
    1. Analyzes the uber trip location clusters that are popular by date and time

Example Use Case Data

The example data set is Uber trip data, which you can read more about in part 1 of this series. The incoming data is in CSV format; an example is shown below, with the header:

date/time,latitude,longitude,base
2014-08-01 00:00:00,40.729,-73.9422,B02598

The enriched Data Records are in JSON format. An example line is shown below:

Spark Kafka Consumer Producer Code

Parsing the Data Set Records

A Scala Uber case class defines the schema corresponding to the CSV records. The parseUber function parses the comma separated values into the Uber case class.
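A minimal sketch of what that looks like (field names follow the CSV header above):

// Uber case class matching the CSV fields: date/time, latitude, longitude, base
case class Uber(dt: String, lat: Double, lon: Double, base: String)

// Parse a comma-separated line into an Uber object
def parseUber(str: String): Uber = {
  val p = str.split(",")
  Uber(p(0), p(1).toDouble, p(2).toDouble, p(3))
}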

Loading the K-Means Model

The Spark KMeansModel class is used to load the saved K-means model fitted on the historical Uber trip data.
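For example (the model path is an assumption; it is wherever part 1 saved the fitted model):

import org.apache.spark.ml.clustering.KMeansModel

// load the K-means model that was fitted and saved in part 1 of this series
val model = KMeansModel.load("/user/user01/data/uberKMeansModel")
model.clusterCenters.foreach(println)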

Output of model clusterCenters:

Below, the cluster centers are displayed on a Google map:

Spark Streaming Code

These are the basic steps for the Spark Streaming Consumer Producer code:

  1. Configure Kafka Consumer Producer properties.
  2. Initialize a Spark StreamingContext object. Using this context, create a DStream which reads messages from a Topic.
  3. Apply transformations (which create new DStreams).
  4. Write messages from the transformed DStream to a Topic.
  5. Start receiving data and processing. Wait for the processing to be stopped.

We will go through each of these steps with the example application code.

  1. Configure Kafka Consumer Producer properties

The first step is to set the KafkaConsumer and KafkaProducer configuration properties, which will be used later to create a DStream for receiving/sending messages to topics. You need to set the following parameters:

  • Key and value deserializers: for deserializing the message.
  • Auto offset reset: to start reading from the earliest or latest message.
  • Bootstrap servers: this can be set to a dummy host:port since the broker address is not actually used by MapR Streams.

For more information on the configuration parameters, see the MapR Streams documentation.

  2. Initialize a Spark StreamingContext object.

ConsumerStrategies.Subscribe, as shown below, is used to set the topics and Kafka configuration parameters. We use the KafkaUtils createDirectStream method with a StreamingContext, the consumer and location strategies, to create an input stream from a MapR Streams topic. This creates a DStream that represents the stream of incoming data, where each message is a key value pair. We use the DStream map transformation to create a DStream with the message values.
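Here is a minimal Scala sketch of these first two steps, written against the standard Apache Kafka 0.10 integration for Spark Streaming; the package name may differ slightly on MapR, and the topic name, group ID, and batch interval are assumptions.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("uber-stream-sketch")
val ssc = new StreamingContext(conf, Seconds(5))

// consumer properties (step 1); the broker address is not actually used by MapR Streams
val kafkaParams = Map[String, Object](
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset"  -> "earliest",
  "group.id"           -> "uber-consumer-group",
  "bootstrap.servers"  -> "localhost:9092"
)

// create a direct stream subscribed to the trip topic (step 2)
val topics = List("/user/user01/stream:ubers")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// keep just the message values, i.e., the CSV lines
val valuesDStream = messages.map(_.value())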

  3. Apply transformations (which create new DStreams)

We use the DStream foreachRDD method to apply processing to each RDD in this DStream. We parse the message values into Uber objects, with the map operation on the DStream. Then we convert the RDD to a DataFrame, which allows you to use DataFrames and SQL operations on streaming data.

Here is example output from the df.show:

A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.

Then the model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster predictions.
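Putting those pieces together, a sketch of the processing inside foreachRDD might look like this; it assumes the valuesDStream, parseUber, and loaded K-means model from the sketches above, and the column names are assumptions.

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

valuesDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // parse the CSV lines into Uber objects and convert to a DataFrame
    val df = rdd.map(parseUber).toDF()
    df.show()

    // put latitude and longitude into a single vector column of features
    val assembler = new VectorAssembler().setInputCols(Array("lat", "lon")).setOutputCol("features")
    val featureDF = assembler.transform(df)

    // use the saved K-means model to predict the location cluster for each trip
    val categories = model.transform(featureDF)
    categories.show()

    // register as a table for the SQL queries that follow
    categories.createOrReplaceTempView("uber")
  }
}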

The output of categories.show is below:

The DataFrame is then registered as a table so that it can be used in SQL statements. The output of the SQL query is shown below:

  4. Write messages from the transformed DStream to a Topic

The Dataset result of the query is converted to JSON RDD Strings, then the RDD sendToKafka method is used to send the JSON key-value messages to a topic (the key is null in this case).

Example message values (the output of temp.take(2)) are shown below:

{"dt":"2014-08-01 00:00:00","lat":40.729,"lon":-73.9422,"base":"B02598","cluster":7}

{"dt":"2014-08-01 00:00:00","lat":40.7406,"lon":-73.9902,"base":"B02598","cluster":7}

  5. Start receiving data and processing it. Wait for the processing to be stopped.

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.
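Continuing the sketch, those last two calls are simply:

ssc.start()            // start consuming and processing data from the stream
ssc.awaitTermination() // block until the streaming computation is stopped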

Spark Kafka Consumer Code

Next, we will go over some of the Spark streaming code which consumes the JSON-enriched messages.

We specify the schema with a Spark StructType:
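A sketch of such a schema for the enriched JSON messages shown earlier might look like this; dt is kept as a string here and cast to a timestamp in the SQL queries.

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("dt", StringType, true),
  StructField("lat", DoubleType, true),
  StructField("lon", DoubleType, true),
  StructField("base", StringType, true),
  StructField("cluster", IntegerType, true)
))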

Below is the code for:

  • Creating a Direct Kafka Stream
  • Converting the JSON message values to Dataset[Row] using spark.read.json with the schema
  • Creating two temporary views for subsequent SQL queries
  • Using ssc.remember to cache data for queries
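A condensed sketch of those steps, building on the direct stream and schema sketches above; the view name and remember window are assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Minutes

messages.map(_.value()).foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

    // convert the JSON message values to a DataFrame using the schema defined above
    val df = spark.read.schema(schema).json(rdd)

    // register a temporary view for the SQL queries below
    df.createOrReplaceTempView("uber")
  }
}

// keep batch data around long enough for interactive queries
ssc.remember(Minutes(60))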

Now we can query the streaming data to ask questions like: which hours had the highest number of pickups? (Output is shown in a Zeppelin notebook):

spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")

How many pickups occurred in each cluster?

df.groupBy("cluster").count().show()

or

spark.sql("select cluster, count(cluster) as count from uber group by cluster")

Which hours of the day and which cluster had the highest number of pickups?

spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")

Display datetime and cluster counts for Uber trips:

%sql select cluster, dt, count(cluster) as count from uber group by dt, cluster order by dt, cluster

Software

Summary

In this blog post, you learned how to use a Spark machine learning model in a Spark Streaming application, and how to integrate Spark Streaming with MapR Streams to consume and produce messages using the Kafka API.

References and More Information:

Editor's note: This blog post was originally published in the MapR Converge Blog on January 05, 2017.

By Carol McDonald

 

A Formula 1 race is a high-speed example of the Internet of Things, where gathering, analyzing, and acting on tremendous amounts of data in real time is essential for staying competitive. The sport’s use of such information is so sophisticated that some teams are exporting their expertise to other industries, even for use on oil rigs. Within the industry, automobile companies such as Daimler and Audi are leveraging deep learning on the MapR Converged Data Platform to successfully analyze the continuous data generated from car sensors.

This blog discusses a proof of concept demo, which was developed as part of a pre-sales project to demonstrate an architecture to capture and distribute lots of data, really fast, for a Formula 1 team.

What’s the Point of Data in Motor Sports?

Formula 1 cars are some of the most heavily instrumented objects in the world.

Picture1

Read more about The Formula 1 Data Journey

Formula 1 data engineers analyze data from ~150 sensors per car, tracking vital stats such as tire pressure, fuel efficiency, wind force, GPS location, and brake temperature in real time. Each sensor communicates with the track, the crew in the pit, a broadcast crew on-site, and a second team of engineers back in the factory. They can transmit 2GB of data in one lap and 3TB in a full race.

Picture2

Watch WSJ's video "The F1 Big Data Explainer"

Data engineers can make sense of the car’s speed, stability, aerodynamics, and tire degradation around a race track. The graph below shows an example of what is being analyzed by race engineers: rpm, speed, acceleration, gear, throttle, brakes, by lap.

Picture3

More analysis is also completed at the team’s manufacturing base. Below is an example race strategy, analyzing whether it would be faster to do a pit stop tire change at lap 13 and lap 35 vs. lap 17 and lap 38.

Picture4

The Challenge: How Do You Capture, Analyze, and Store this Amount of Data in Real Time at Scale?

The 2014 U.S. Grand Prix collected more than 243 terabytes of data in a race weekend, and now there is even more data. Formula 1 teams are looking for newer technologies with faster ways to move and analyze their data in the cockpits and at the factory.

Below is a proposed proof of concept architecture for a Formula 1 team:

Picture5

MapR Edge in the cars provides an on-board storage system that buffers the most recent data and retries when transmission fails. MapR Edge addresses the need to capture, process, and provide backup for data transmission close to the source. A radio frequency link publishes the data from the edge to the referee “FIA” topic, and from there it is published to each team’s topic, where local analytics is done. The “team engineers” Stream replicates to the “trackside” and “factory” Streams, so that the data is pushed in real time for analytics. In a sport where seconds make a difference, it’s crucial that the trackside team can communicate quickly with the engineers at headquarters and team members around the world. MapR Streams replication provides an unusually powerful way to handle data across distant data centers at large scale and low latency.

The Demo Architecture

The demo architecture is considerably simplified because it needs to be able to run on a single node MapR sandbox. The demo does not use real cars; it uses the Open Racing Car Simulator (TORCS), a race car simulator often used for AI race games and as a research platform. 

Picture6

The demo architecture is shown below. Sensor data from the TORCS simulator is published to a MapR Streams topic using the Kafka API. Two consumers are subscribed to the topic. One Kafka API consumer, a web application, provides a real-time dashboard using websockets and HTML5. Another Kafka API consumer stores the data in MapR-DB JSON, where analytics with SQL are performed using Apache Drill. You can download the demo code here:  https://github.com/mapr-demos/racing-time-series.

Picture7

How Do You Capture this Amount of Data in Real Time at Scale?

Older messaging systems track message acknowledgements on a per-message, per-listener basis. A new approach is needed to handle the volume of data for the Internet of Things. 

Picture8

MapR Streams is a distributed messaging system that enables producers and consumers to exchange events in real time via the Apache Kafka 0.9 API. In MapR Streams, topics are logical collections of messages, which organize events into categories and decouple producers from consumers.

Picture9

Topics are partitioned for throughput and scalability. Producers are load balanced between partitions, and consumers can be grouped to read in parallel from multiple partitions within a topic for faster performance.

Picture10

You can think of a partition like a queue; new messages are appended to the end, and messages are delivered in the order they are received.

Picture11

Unlike a queue, events are not deleted when read; they remain on the partition, available to other consumers.

Picture12

A key to high performance at scale, in addition to partitioning, is minimizing the time spent on disk reads and writes. With Kafka and MapR Streams, messages are persisted sequentially as produced and read sequentially when consumed. These design decisions mean that non-sequential reading or writing is rare, allowing messages to be handled at very high speeds with scale. Not deleting messages when they are read also allows processing of the same messages by different consumers for different purposes. 

Picture13

Example Producer Using the Kafka API

Below is an example producer using the Kafka Java API. For more information, see the MapR Streams Java applications documentation.

Picture14
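In case the screenshot above is hard to read, here is a minimal sketch of what such a producer can look like with the Kafka 0.9 Java API. The stream path /apps/racing:sensors, the JSON payload, and the absence of broker configuration are illustrative assumptions (MapR Streams resolves the stream from its path; plain Apache Kafka would also need bootstrap.servers); the actual producer code is in the demo repository linked above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SensorProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Serialize both key and value as plain strings (the value is a JSON document).
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "/apps/racing:sensors";   // assumed stream:topic path
        String json = "{\"car\":\"car1\",\"sensors\":{\"Speed\":3.588583,\"RPM\":1896.575806}}";

        // Asynchronous send; the callback surfaces transmission failures instead of dropping them silently.
        producer.send(new ProducerRecord<String, String>(topic, json), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            }
        });

        producer.flush();
        producer.close();
    }
}

The asynchronous send with a callback keeps the producer fast enough for per-lap sensor rates while still reporting failed transmissions.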

How to Store the Data

One of the challenges, when you have 243 terabytes of data every race weekend, is deciding where to store it. With a relational database and a normalized schema, related data is stored in different tables, and queries that join this data together can become a bottleneck at this volume. For this application, MapR-DB JSON was chosen for its scalability and its flexibility with JSON documents. MapR-DB and a denormalized schema scale because data that is read together is stored together.

Picture15

With MapR-DB (HBase API or JSON API), a table is automatically partitioned across a cluster by key range, providing for really fast reads and writes by row key.

Picture16
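To make the storage path concrete, here is a rough sketch of the second consumer from the demo architecture: it reads the JSON messages with the Kafka 0.9 API and writes each document into the MapR-DB JSON table keyed by its _id. The topic and table paths, the consumer group name, and the use of the com.mapr.db Java API are assumptions for illustration; the real implementation is in the linked GitHub repository.

import java.util.Arrays;
import java.util.Properties;

import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TelemetryToMapRDB {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "telemetry-writers");   // assumed consumer group name
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // With plain Apache Kafka you would also set bootstrap.servers here.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("/apps/racing:sensors"));   // assumed stream:topic path

        // Open the JSON table; each message already carries an _id, which becomes the row key.
        Table table = MapRDB.getTable("/apps/racing/db/telemetry/all_cars");

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                // The message value is a JSON document like the one shown in the next section.
                table.insertOrReplace(MapRDB.newDocument(record.value()));
            }
            table.flush();
        }
    }
}

Because insertOrReplace is keyed by _id, replaying the same messages (for example, after a consumer restart) overwrites the same rows rather than creating duplicates.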

JSON Schema Flexibility

JSON facilitates the natural evolution of your data schema during the life of your application. For example, in version 1, we have the following schema, where each JSON message groups sensor values for Speed, Distance, and RPM:

 

{
   "_id": "1.458141858E9/0.324",
   "car": "car1",
   "timestamp": 1458141858,
   "racetime": 0.324,
   "records": [
      {
         "sensors": {
            "Speed": 3.588583,
            "Distance": 2003.023071,
            "RPM": 1896.575806
         }
      },
      {
         "sensors": {
            "Speed": 6.755624,
            "Distance": 2004.084717,
            "RPM": 1673.264526
         }
      },
      ...
   ]
}

For version 2, you can capture more data values without changing the architecture of your application, simply by adding attributes as shown below:

 

...
"records": [
   {
      "sensors": {
         "Speed": 3.588583,
         "Distance": 2003.023071,
         "RPM": 1896.575806,
         "Throttle": 37,
         "Gear": 2
      }
   },
...

As discussed before, MapR Streams allows processing of the same messages for different purposes or views. With this type of architecture and flexible schema, you can easily create new services and new APIs, for example by adding Apache Spark Streaming or an Apache Flink Kafka consumer for in-stream analysis, such as aggregations, filtering, alerting, anomaly detection, and/or machine learning.

Picture17

Processing the Data with Apache Flink

Apache Flink is an open source distributed data stream processor. Flink provides efficient, fast, consistent, and robust handling of massive streams of events as well as batch processing, a special case of stream processing. Stream processing of events is useful for filtering, creating counters and aggregations, correlating values, joining streams together, machine learning, and publishing to a different topic for pipelines.

Picture18

The code snippet below uses the FlinkKafkaConsumer09 class to get a DataStream from the MapR Streams “sensors” topic. The DataStream is a potentially unbounded distributed collection of objects. The code then calculates the average speed with a time window of 10 seconds. 

Picture19Picture20
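For readers who cannot zoom into the screenshots, here is a simplified sketch of that logic using the Flink Java API, assuming each message is a JSON string containing a Speed field. The topic path, consumer group, and the regex-based field extraction are illustrative stand-ins; the actual job is in the demo repository.

import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AverageSpeedJob {
    private static final Pattern SPEED = Pattern.compile("\"Speed\"\\s*:\\s*([0-9.]+)");

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("group.id", "flink-average-speed");   // assumed consumer group

        // Read the raw JSON sensor messages from the "sensors" topic.
        DataStream<String> messages = env.addSource(
                new FlinkKafkaConsumer09<>("/apps/racing:sensors", new SimpleStringSchema(), props));

        // Average speed over a 10-second window: accumulate (sum, count), then divide.
        DataStream<Double> averageSpeed = messages
                .map(new MapFunction<String, Tuple2<Double, Integer>>() {
                    @Override
                    public Tuple2<Double, Integer> map(String json) {
                        Matcher m = SPEED.matcher(json);
                        double speed = m.find() ? Double.parseDouble(m.group(1)) : 0.0;
                        return new Tuple2<>(speed, 1);
                    }
                })
                .timeWindowAll(Time.seconds(10))
                .reduce(new ReduceFunction<Tuple2<Double, Integer>>() {
                    @Override
                    public Tuple2<Double, Integer> reduce(Tuple2<Double, Integer> a,
                                                          Tuple2<Double, Integer> b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                })
                .map(new MapFunction<Tuple2<Double, Integer>, Double>() {
                    @Override
                    public Double map(Tuple2<Double, Integer> sumAndCount) {
                        return sumAndCount.f0 / sumAndCount.f1;
                    }
                });

        averageSpeed.print();
        env.execute("Average speed over 10-second windows");
    }
}

The same job could just as easily publish the windowed averages to another topic instead of printing them, which is the pipeline pattern described above.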

Querying the Data with Apache Drill

Apache Drill is an open source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine, built to perform distributed query processing across the various nodes in a cluster. 

Picture21

With Drill, you can use SQL to query and join data from files in JSON, Parquet, or CSV format, Hive, and NoSQL stores, including HBase, MapR-DB, and Mongo, without defining schemas.

Below is an example query to show all data from the race car’s datastore:

SELECT * 
FROM dfs.`/apps/racing/db/telemetry/all_cars`
LIMIT 10

And the results:

Picture22

Below is an example query to show the average speed by car and race:

SELECT race_id, car, 
ROUND(AVG( `t`.`records`.`sensors`.`Speed` ),2) as `mps`,
ROUND(AVG( `t`.`records`.`sensors`.`Speed` * 18 / 5 ), 2) as `kph`
FROM
(
SELECT race_id, car, flatten(records) as `records`
FROM dfs.`/apps/racing/db/telemetry/all_cars`
) AS `t`
GROUP BY race_id, car

 

And the results:

Picture23

All of the components of the use case architecture that we just discussed can run on the same cluster with the MapR Converged Data Platform.

Software

You can download the code and instructions to run this example from here: https://github.com/mapr-demos/racing-time-series.

Editor's note: This blog post was originally shared in the Converge Blog on May 11, 2017. Since the publication of this blog, the MapR Streams product name has been changed to MapR-ES.

Additional Resources

MapR Edge product information

 

MapR Streams Application Development

Getting Started with Apache Flink and MapR Streams

Big Data On The Road

NorCom selects MapR converged data platform for foundation of deep learning framework for Mercedes

Apache Drill

MapR-ES

MapR-DB

by Karen Whipple

 

Human trafficking is the third largest crime industry in the world today. India has an estimated 18.3 million slaves, according to the Global Slavery Index, and every year 200,000 Indian children are tricked, kidnapped, or coerced into slavery. Every three minutes, a girl is sold into slavery. The average age of these girls is 12 years old, with children as young as six trafficked for sexual exploitation, and less than one percent of them are ever rescued. Public awareness is the most effective barrier to human trafficking, and MapR is proud to be part of the big data analytics solution, developed by Quantium for My Choices Foundation, to help alleviate one of the world’s worst social problems.

While many organizations work to rescue girls and prosecute the traffickers, Operation Red Alert, a program of My Choices Foundation, is a prevention program designed to help parents, teachers, village leaders, and children understand how the traffickers work, so they can block their efforts. Poor village girls are typically targeted, with promises that the girls are being offered wonderful opportunities for an education, jobs, or marriage. But with over 600,000 villages in India, Operation Red Alert needed help to determine which areas were most at risk to prioritize their education efforts.

 

Watch the video 'Red Alert Saves Lives'

As a pro bono project, the Australian analytics firm Quantium developed a big data solution, which runs on Cisco UCS infrastructure and uses the MapR Converged Data Platform. The solution analyzes India’s census data, government education data, and other sources for factors such as drought, poverty level, proximity to transportation stations, educational opportunities, population, and distance to police stations, so as to identify the villages and towns that are most at risk of human trafficking.

Elca Grobler, the founder of My Choices Foundation, explains the significance of this work: “The general Indian public is still largely unaware that trafficking exists, and most parents have no idea that their children are actually being sold into slavery. That’s why grassroots awareness and education at the village level is so important to ending the human traffic trade.”

Operation Red Alert

With a vision to end sex trafficking in India by preventing girls from ever entering it, Operation Red Alert began its efforts in 2014 by conducting research on root causes and by building relationships with long-standing non-government organizations (NGOs) in the field. Working with Quantium, they analyzed data while constantly refining and iterating to define a model that could identify villages most at risk. Red Alert now has 40 NGO partners, who have helped conduct the Safe Village education program in 600 villages throughout four states in India, reaching over 600,000 villagers. Red Alert also created India’s first national anti-trafficking helpline and is conducting mass media awareness campaigns.

As Grobler explains, “We are helping to banish human trafficking, one village at a time, through a combination of highly sophisticated technology and grassroots Safe Village education implemented through our NGO partners. The national helpline gives villagers a link to continual support, and with 95 million mobile phones in India that gives us a very broad reach. We’re also adding data and refining our predictive analytics, and we’re expanding our education efforts to cover more states this year.”

MapR Technology Behind the Scenes

Quantium brings together proprietary data, technology, and innovative data scientists to enable the development of groundbreaking analytical applications and develops insights into consumer needs, behaviors, and media consumption by analyzing consumer transaction data. Quantium upgraded its legacy server platform with Cisco UCS to gain centralized management and the computing power needed to process complex algorithms in a dense, scalable form factor that also reduces power consumption. Cisco Nexus 9000 Switches provide a simplified network with the scalable bandwidth to meet their current and future requirements. The MapR Converged Data Platform makes this enterprise possible by enabling organizations to create intelligent applications that fully integrate analytics with operational processes in real time.

Rigorous testing by Quantium demonstrated that the MapR-Cisco platform decreased query processing time by 92%, a performance increase of 12.5 times the legacy platform. With the Cisco-MapR solution, Quantium’s data scientists can design complex queries that run against multi-terabyte data sets and get more accurate results in minutes, rather than hours or days. In addition, the more powerful platform drives innovation because scientists can shorten development time by testing alternative scenarios quickly and accurately.

“UCS gives us the agility that’s key to supporting our iterative approach to analytics,” explains Simon Reid, Group Executive for Technology at Quantium. “For example, with the analytics for Operation Red Alert, we’re fine-tuning the algorithm, adding more hypotheses and more granular data to improve our predictive capabilities. MapR adds performance, security, and the ability to segregate multiple data sets from multiple data partners for Operation Red Alert.” MapR is proud that the unique multi-tenancy, high-speed performance, and scale of the MapR Platform serve as the underlying technology powering the Operation Red Alert data platform.

Editor's note: This blog post was originally posted in the Converge Blog on May 30, 2017


I'll guess that many people reading this have spent time wrestling with configuration to get Python and Spark to play nicely. Having gone through the process myself, I've documented my steps and am sharing them here, hoping to save some of you time and frustration.

 

This article targets the latest releases of MapR 5.2.1 and the MEP 3.0 version of Spark 2.1.0. It should work equally well for earlier releases of MapR 5.0 and 5.1. In fact, I've tested it with MapR 5.0 and MEP 1.1.2 (Spark 1.6.1) for a customer.

 

The version of Jupyter used here is 4.3. Jupyter has changed quite a bit since earlier versions, so most of the information I found in blogs was pretty outdated, which is why getting everything working to my satisfaction took so much trouble.

 

My goals:

  • Running PySpark successfully from either a cluster node or an edge node
  • Running python code in YARN distributed mode
  • Have access to modules like numpy, scipy, pandas and others.
  • Do all this using Jupyter in server mode that I access from my own laptop

 

I'm leaving out Jupyter server-mode security, which could potentially be the topic of a separate blog post. I've implemented it before and found the Jupyter documentation on setting up encryption (HTTPS) and authentication to be pretty good.

Installing an updated version of Python

Verify your version of Python:

python --version

 

If it's Python 2.6.x, it's probably a good idea to use a recent build of Python 2.7 instead. If it's Python 2.7.x, then you'll need to decide whether or not to use the system Python:

  • The system Python is easier to get working; it's already there and shared everywhere.
  • An isolated Python (Anaconda or a separate install) is harder to get working, but it provides a more consistent environment where each user can have their own (and only their own) modules installed.

 

I will use Miniconda for Python 2.7 (64-bit) throughout; it works very well. Using Python 3 would be just the same, with the only differences being code and module compatibility. Either works fine with Spark.

 

Note: Python 3.6 doesn't work with Spark 1.6.1; see SPARK-19019.

Installing Anaconda

There is a choice between Anaconda and Miniconda, as well as between python 2.7 and Python 3.6.

Miniconda is very nice because the download is small and you only install what you need. Anaconda is nice because everything is installed from the start, so for most needs all required modules will already be there.

 

Here, we show installing Miniconda with Python 2.7 (64-bit):

wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh
bash Miniconda2-latest-Linux-x86_64.sh -b -p /opt/miniconda2

 

To install it on all nodes at once, we recommend checking out ClusterShell.

 

#copy the file to all nodes

clush -ac Miniconda2-latest-Linux-x86_64.sh



#install on all nodes at same time:

clush -aB bash Miniconda2-latest-Linux-x86_64.sh -b -p /opt/miniconda2

 

Important: Get all nodes into exactly the same state, with Python/Anaconda installed in exactly the same location and with exactly the same modules installed on every node. Missing this guarantees weird errors that will be hard to diagnose.

 

Update Spark environment to use Python 2.7

Add to /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh:

    export PYSPARK_PYTHON=/opt/miniconda2/bin/python
    export PYSPARK_DRIVER_PYTHON=/opt/miniconda2/bin/python

 

Update the file on all nodes:

# using clustershell to copy file ("c") to all nodes ("a")

clush -ac /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh

 

Note: this is known to work on previous MEP versions as well. I have also tested it with MEP 1.1.2 (Spark 1.6.1) and it worked very well; just use the correct path to Spark and it will work fine.

 

Testing

For testing, let's use the data from MapR Academy's Spark Essentials course, specifically the eBay auction data.

Copy the data into the folder /user/mapr/data.

 

Start pyspark and run the following code:

>>> auctionRDD = sc.textFile("/user/mapr/data/auctiondata.csv").map(lambda line: line.split(","))
>>> auctionRDD.first()
[u'8213034705', u'95', u'2.927373', u'jake7870', u'0', u'95', u'117.5', u'xbox', u'3']
>>> auctionRDD.count()
10654

OK, so now we have a working pyspark shell!

Note: don't do this as root or as user mapr on a production cluster. However, for doing tutorials, user mapr is convenient as it is a superuser and you don't need to worry about file permissions on MapR.

 

Errors:

  • pyspark java.io.IOException: Cannot run program "python2.7": error=2, No such file or directory
  • This error is because the driver and/or the executors can't find the python executable. It's fixed by setting the PYSPARK_PYTHON (and PYSPARK_DRIVER_PYTHON) variables in spark-env.sh (see above)

IPython Notebook

If you want to be able to choose whether to use Spark when launching the IPython shell:

  1. Ensure the SPARK_HOME environment variable is defined:
export SPARK_HOME=/opt/mapr/spark/spark-2.1.0
  2. Install Jupyter/IPython with Anaconda:
/opt/miniconda2/bin/conda install jupyter
  3. Add an ipython profile named pyspark:
ipython profile create pyspark
  4. Add ~/.ipython/profile_pyspark/startup/00-pyspark-setup.py:
import os
import sys

# Locate Spark and put its Python libraries on the path before starting the shell
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
  5. Launch:
/opt/miniconda2/bin/ipython --profile=pyspark

 

Try the sample code above; it should also work without issue.

Jupyter Notebook

Now on to Jupyter. In this case, we're looking to have the notebook run on an edge node (less ideally, on a cluster node) in server mode and access it from our development laptop.

The following instructions assume the user mapr but should work equally well for any other user. For production use, never use the mapr user, as it is a superuser with read-write access to all data in MapR.

With Anaconda:

  1. Install Jupyter on all nodes:
clush -aB /opt/miniconda2/bin/conda install jupyter -y
  2. Generate a configuration:
/opt/miniconda2/bin/jupyter notebook --generate-config

 

We need to update the configuration so you can log on to the notebook from your local computer, not just from localhost.

This generates the file $HOME/.jupyter/jupyter_notebook_config.py. In this file, we're going to update the NotebookApp.ip setting:

# here it's possible to use the server's IP address instead, e.g. '10.0.0.111'
# the important point is that leaving it at the default ('localhost') prevents any remote connection
c.NotebookApp.ip = '*'

 

This is a good time for a reminder about security: it's pretty easy to configure Jupyter to use HTTPS and require a password. See the Jupyter documentation.

  3. The startup script from the ipython step is helpful:

      [mapr@ip-10-0-0-180 ~]$ cat .ipython/profile_default/startup/00-default-setup.py

import os
import sys

# Locate Spark and put its Python libraries on the path before launching the shell
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

       This step is essential, or else the kernel won't initialize properly. Alternatively, you can paste the code above into the first cell of the notebook to initialize pyspark. Another alternative is to use the findspark module, which probably does something similar with less code.

  4. Add a PySpark kernel. Create the JSON kernel file in the location shown below:
[mapr@ip-10-0-0-20 ~]$ cat .ipython/kernels/pyspark/kernel.json
{
"display_name": "pySpark (Spark 2.1.0)",
 "language": "python",
 "argv": [
  "/opt/miniconda2/bin/python",
  "-m",
  "ipykernel",
  "-f",
  "{connection_file}"
 ],
"env": {
        "CAPTURE_STANDARD_OUT": "true",
        "CAPTURE_STANDARD_ERR": "true",
        "SEND_EMPTY_OUTPUT": "false",
        "SPARK_HOME": "/opt/mapr/spark/spark-2.1.0"
    }
}
  5. Start a notebook and have fun with Spark and Python!
 jupyter notebook --no-browser

 

Open your browser to the indicated link and... Success!

 

Launch Jupyter notebook instead of pyspark

  1. Update $SPARK_HOME/conf/spark-env.sh:

[mapr@ip-10-0-0-20 ~]$ tail /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh

    # Setup env variables for Jupyter + Pyspark
    export PYSPARK_DRIVER_PYTHON=/opt/miniconda2/bin/jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser"

  2. Launch pyspark; it will start a Jupyter notebook:

$SPARK_HOME/bin/pyspark

 

Conclusion

Getting this working is harder than it ought to be. Jupyter has changed its usage a lot over the versions, especially in the move from IPython to Jupyter. The change is worth it, but the configuration is a big pain.

 

Getting all of the information together into a repeatable, tested process was also quite a bit of work. Hopefully this will save some time for some people out there.

 

Thanks

Dong Meng's blog proved to be a life saver. Check him out: https://mengdong.github.io