
Multi-cloud environments can be an effective way to hedge risk and enable future flexibility for applications. With a secure VPN between two or more sites, you can leverage the global namespace and high availability features in MapR (like mirroring and replication) to drive business continuity across a wide variety of use cases.

 

In this tutorial, I will walk through the steps of connecting the Microsoft Azure and Amazon AWS clouds using an IPsec VPN, which will enable secure IP connectivity between the two clouds and serve as a Layer-3 connection you can then use to connect two or more MapR clusters.

 

Multi-Cloud Architecture with Site-to-Site VPN

 

Let's first take a look at the end result for which we're aiming.

 

 

On the left side of the figure below is an Azure setup with a single Resource Group (MapR_Azure). We'll set this up in the 'US West' zone. On the right side is an Amazon EC2 network in a VPC, which we will deploy in the Northern Virginia zone. This is an example of using geographically dispersed regions to lessen the risk of disruptions to operations. After the VPN is complete, we can use MapR replication to ensure that data lives in the right place and that applications can read and write to both sites seamlessly.

 

This "site-to-site" VPN will be encrypted and run over the open Internet, with a separate IP subnet in each cloud, using the VPN gateways to route traffic through a tunnel.  Note that this is a "Layer 3" VPN, in that traffic is routed between two subnets using IP forwarding.  It's also possible to do this at Layer 2, bridging Ethernet frames between the two networks, but I'll leave that for another post (or an exercise for the curious reader. )

 

Setting up the Azure Side

 

First, prepare the Resource Group; in our example, we called the group 'MapR_Azure.'

Select it, and find the 'Virtual Network' resource by selecting the '+' icon, then type 'virtual network' in the search box.

 

 

 

Select 'Resource Manager' as the deployment model and press 'Create.' We use the name 'clusternet' for the network.

 

We will create two subnets, one on each cloud side. On Azure, we create the address range of 10.11.0.0/16 and a single subnet of 10.11.11.0/24 within the range. We'll make the Azure and EC2 address prefixes 10.11 and 10.10, respectively, to make for easy identification and troubleshooting.

 

 

Create a public IP for the VPN connections. Select '+' to add a resource, then type 'public IP address' in the search box. Press 'Create,' then set up the IP as follows. Select 'Use existing' for the Resource Group, and keep 'Dynamic' for the IP address assignment. The name is 'AzureGatewayIP.'

 

 

Take note of this address; we will use it later.

 

Next, create a 'Virtual Network Gateway' the same way. This entity will serve as the VPN software endpoint.

 

 

Reference the public IP you created in the previous step (in our case, 'AzureGatewayIP').

 

Note the concept of a 'GatewaySubnet' here. This is a subnet that is to be used exclusively by the Azure VPN software. It must be within the configured address range, and you can't connect any other machines to it. Microsoft says "some configurations require more IP addresses to be allocated to the gateway services than do others."  It sounds a little mysterious, but allocating a /24 network seems to work fine for most scenarios.

 

Select 'clusternet' as the virtual network (what you created in the earlier step), use a Gateway subnet of 10.11.0.0/24, and use the 'AzureGatewayIP' address. This will create a new subnet entry called 'GatewaySubnet' for the 10.11.0.0/24 network.

 

For testing purposes, select the VpnGw1 SKU. This allows up to 650 Mbps of network throughput, which is more than enough to connect a couple of small clusters, but you can go up to 1.25 Gbps with the VpnGw3 SKU.

 

This may take up to 45 minutes (according to Microsoft) but it usually completes in a few minutes.

 

Setting up the AWS Side

 

We need to pause here to set up a few things on AWS. First, create a VPC in the AWS Console VPC Dashboard. Here we set the IPv4 address range as 10.10.0.0/16.

 

 

Navigate to 'Subnets,' and create a subnet in the VPC. Here we use 10.10.10.0/24.

 

 

Next, create an Internet gateway to connect our VPC to the internet. This step is important (and easily overlooked); otherwise, traffic cannot be routed between the Elastic IP and the subnet we just created.

 

 

 

Select 'Attach to VPC,' and use the new VPC 'clusterVPC'.

 

 

 

Go back to the EC2 Dashboard and select 'Launch Instance.' We will create an Amazon Linux instance to maintain the VPN. Select the 'Amazon Linux' AMI, and configure the instance details as follows:

 

 

 

 

Be sure to select the 'clusterVPC' we just created and 'clustersubnet' for the subnet. Select 'Disable' for 'Auto-assign Public IP' because we want to use an Elastic IP that we will associate later.

 

Under the last step, select 'Edit Security Groups,' and then select 'Create a new security group.' Open the group to all traffic coming from the AzureGatewayIP we configured previously (in this case, 40.80.158.169). Also (optionally), add any rules that you need to connect to the instance via ssh.

 

 

Click on 'Launch,' and optionally create a new key pair or use an existing one for the instance.

 

While the instance launches, let's create an Elastic IP for the VPN endpoint. In the 'Network & Security' menu of the AWS Console, select 'Elastic IPs,' and then allocate an address.

 

 

Note this address (here, 34.231.217.197) for later.

 

Associate the address with the instance we just created.

 

 

Finalizing the connection on the Azure side

 

Let's return to the Azure setup, and use the information from AWS to complete the connection. Add a Local Network Gateway.

 

 

The term 'local' is a bit of a misnomer here because the other network is not a local one; it's another cloud network. Microsoft uses the term 'local' to refer to an on-premises network that you might want to connect to Azure. For 'IP address,' use the Elastic IP you created in the previous section. For 'Address space,' use the 10.10.10.0/24 range (the AWS subnet).

 

Next, add a Connection. Select 'Site-to-site' VPN, and fill in the remaining details for your Resource Group.

 

 

Select the AzureGateway we configured as well as the Local Network Gateway (AWSVPN). Enter a key that will be used for the session.

 

 

Now is a good time to launch an instance for testing. Type 'Ubuntu Server' into the search box, and select Ubuntu Server 14.04 LTS. Configure the instance details, size, and settings.

 

 

Under the last Settings window, configure the virtual network, subnet, and 'None' for a public IP address (we don't need one because the VPN will handle outbound/inbound connectivity). Select a new or existing network security group.

 

 

Finalizing the connection on the AWS side

 

It's a good time to make sure the subnet you created has a default route to the internet gateway. From the AWS Console, navigate to 'Route Tables' and find the subnet associated with your VPC, select the subnet and the 'Routes' tab, and add a default route:
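If you prefer the command line, the equivalent route can be added with the AWS CLI; this is just a sketch, and the route table and gateway IDs below are placeholders for your own:

aws ec2 create-route --route-table-id rtb-0123456789abcdef0 --destination-cidr-block 0.0.0.0/0 --gateway-id igw-0123456789abcdef0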

 

 

Returning to AWS, ssh into the instance we just created and download and install strongSwan along with some dependency packages.

 

sudo yum install gcc gmp-devel

wget https://download.strongswan.org/strongswan-5.6.0.tar.bz2

bzip2 -d strongswan-5.6.0.tar.bz2

tar xvf strongswan-5.6.0.tar

cd strongswan-5.6.0

./configure && make && sudo make install

 

This should install strongswan in /usr/local, where we will edit the configuration files.

 

Edit the file /usr/local/etc/ipsec.conf and add the following entry:

conn azure
   authby=secret
   type=tunnel
   leftsendcert=never
   left=10.10.10.222
   leftsubnet=10.10.10.0/24
   right=40.80.158.169
   rightsubnet=10.11.11.0/24
   keyexchange=ikev2
   ikelifetime=10800s
   keylife=57m
   keyingtries=1
   rekeymargin=3m
   compress=no
   auto=start

 

The 'left' and 'leftsubnet' options refer to the Amazon (local) side. Use the local private IP address and subnet. For the right side, use the AzureGatewayIP we configured (40.80.158.169) and the 'clusternet' subnet.

 

Finally, edit the file /usr/local/etc/ipsec.secrets, and add your shared secret key.

 

10.10.10.222 40.80.158.169 : PSK "testing123"

 

Start the VPN with:

 

sudo /usr/local/sbin/ipsec start

 

You can run sudo tail -f /var/log/messages to check the status of the connection.
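strongSwan can also report the tunnel state directly. Assuming the /usr/local install prefix used above, a quick check looks like this:

sudo /usr/local/sbin/ipsec statusall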

 

You should now be able to ping the Azure machine by running ping 10.11.11.4 (or the address of the single interface of that machine). You can check it on the Azure side by viewing the Connection:

 

 

If you see 'Connected' as above, congratulations: you have a working two-cloud environment!

 

Additional Notes

 

Here are a few other concerns to watch when embarking on a multi-cloud adventure.

 

Ingress and Egress Data Transfers

 

Inter-site bandwidth is something to consider in your plan. At the time of this writing, most use cases of data transfer in to EC2 are free, with some exceptions. Data transfer out is free to most other Amazon services, like S3 and Glacier, and also free up to 1GB/month to other internet-connected sites, but costs a small amount per GB after that.

 

Data transfer in Azure is similar: all inbound data transfers are free, and there is a schedule of costs for outgoing transfers to the internet and other Azure zones.

 

Bandwidth and Latency

 

Amazon has a page on how to check bandwidth. Doing some quick tests with iperf between the two sites, here are some typical results:

 

Accepted connection from 10.99.1.237, port 35688
[ 5] local 10.10.10.4 port 5201 connected to 10.99.1.237 port 35690
[ ID] Interval Transfer Bitrate
[ 5] 0.00-1.00 sec 36.7 MBytes 308 Mbits/sec
[ 5] 1.00-2.00 sec 39.8 MBytes 334 Mbits/sec
[ 5] 2.00-3.00 sec 42.1 MBytes 353 Mbits/sec
[ 5] 3.00-4.00 sec 39.7 MBytes 333 Mbits/sec
[ 5] 4.00-5.00 sec 30.5 MBytes 256 Mbits/sec
[ 5] 5.00-6.00 sec 30.0 MBytes 252 Mbits/sec
[ 5] 6.00-7.00 sec 30.9 MBytes 259 Mbits/sec
[ 5] 7.00-8.00 sec 36.7 MBytes 308 Mbits/sec
[ 5] 8.00-9.00 sec 41.5 MBytes 348 Mbits/sec
[ 5] 9.00-10.00 sec 37.0 MBytes 311 Mbits/sec
[ 5] 10.00-10.03 sec 977 KBytes 245 Mbits/sec
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval Transfer Bitrate
[ 5] 0.00-10.03 sec 366 MBytes 306 Mbits/sec receiver

 

That's some pretty hefty bandwidth (306 Mbits/sec) between the two sites.

 

Ready to set up a MapR cluster?

 

By Ronald van Loon

 

Editor's Note: This post was originally published on LinkedIn on June 12, 2017.

The world is long past the Industrial Revolution, and now we are experiencing an era of Digital Revolution. Machine Learning, Artificial Intelligence, and Big Data Analysis are the reality of today’s world.

I recently had a chance to talk to Ciaran Dynes, Senior Vice President of Products at Talend and Justin Mullen, Managing Director at Datalytyx. Talend is a software integration vendor that provides Big Data solutions to enterprises, and Datalytyx is a leading provider of big data engineering, data analytics, and cloud solutions, enabling faster, more effective, and more profitable decision-making throughout an enterprise.

The Evolution of Big Data Operations

To understand more about the evolution of big data operations, I asked Justin Mullen about the challenges his company faced five years ago and why they were looking for modern integration platforms. He responded with, “We faced similar challenges to what our customers were facing. Before Big Data analytics, it was what I call ‘Difficult Data analytics.’ There was a lot of manual aggregation and crunching of data from largely on premise systems. And then the biggest challenge that we probably faced was centralizing and trusting the data before applying the different analytical algorithms available to analyze the raw data and visualize the results in meaningful ways for the business to understand.”


He further added that, “Our clients not only wanted this analysis once, but they wanted continuous refreshes of updates on KPI performance across months and years. With manual data engineering practices, it was very difficult for us to meet the requirements of our clients, and that is when we decided we needed a robust and trustworthy data management platform that solves these challenges.”

Automation and Data Science

Many economists and social scientists are concerned about automation taking over manufacturing and commercial processes. If digitalization and automation continue to grow at their current pace, there is a high probability of machines partly replacing humans in the workforce. We are already seeing examples of this phenomenon today, but it is predicted to become far more prominent in the future.

However, Dynes says, “Data scientists are providing solutions to intricate and complex problems confronted by various sectors today. They are utilizing useful information from data analysis to understand and fix things. Data science is an input and the output is yielded in the form of automation. Machines automate, but humans provide the necessary input to get the desired output.”

This creates a balance in the demand for human and machine services. Automation and data science go hand in hand; one process is incomplete without the other. Raw data is worth nothing if it cannot be manipulated to produce meaningful results, and, similarly, machine learning cannot happen without sufficient and relevant data.

Start Incorporating Big Data and Machine Learning Solutions into Business Models

Dynes says, “Enterprises are realizing the importance of data, and are incorporating Big Data and Machine Learning solutions into their business models.” He further adds, “We see automation happening all around us. It is evident in the ecommerce and manufacturing sectors, and has vast applications in mobile banking and finance.”

When I asked for his opinion on the transformation in demand for machine learning processes and platforms, he added, “The demand has always been there. Data analysis was equally useful five years ago as it is now. The only difference is that five years ago there was entrepreneurial monopoly and data was stored secretively. Whoever had the data, had the power, and there were only a few prominent market players who had the access to data.”

Justin has worked with different companies. Some of his most prominent clients were Calor Gas, Jaeger and Wejo. When talking about the challenges those companies faced before implementing advanced analytics or machine learning he said, “The biggest challenges most of my clients face was the accumulation of the essential data at one place so that the complex algorithms can be run simultaneously but the results can be viewed in one place for better analysis. The data plumbing and data pipelines were critical to enable data insights to become continuous rather than one-off.”

The Reasons for Rapid Digitalization

Dynes says, “We are experiencing rapid digitalization because of two major reasons. The technology has evolved at an exponential rate in the last couple of years and secondly, organization culture has evolved massively.” He adds, “With the advent of open source technologies and cloud platforms, data is now more accessible. More people have now access to information, and they are using this information to their benefits.”

In addition to the advancements and developments in the technology, “the new generation entering the workforce is also tech dependent. They rely heavily on the technology for their everyday mundane tasks. They are more open to transparent communication. Therefore, it is easier to gather data from this generation, because they are ready to talk about their opinions and preferences. They are ready to ask and answer impossible questions,” says Dynes.

Integrating a New World with the Old World

When talking about the challenges that companies face while opting for Big Data analytics solutions, Mullen adds, “The challenges currently faced by industry while employing machine learning are twofold. The first challenge they face is related to data collection, data ingestion, data curation (quality), and then data aggregation. The second challenge is to combat the lack of human skills in data engineering, advanced analytics, and machine learning.”

“You need to integrate a new world with the old world.

The old world relied heavily on data collection in big batches

while the new world focuses mainly on the real-time data solutions”

Dynes says, “You need to integrate a new world with the old world. The old world relied heavily on data collection while the new world focuses mainly on the data solutions. There are limited solutions in the industry today that deliver on both these requirements at once right now.”


He concludes by saying that, “The importance of data engineering cannot be neglected, and machine learning is like Pandora’s Box. Its applications are widely seen in many sectors, and once you establish yourself as a quality provider, businesses will come to you for your services. Which is a good thing.”

Follow Ciaran Dynes, Justin Mullen, and Ronald van Loon on Twitter and LinkedIn for more interesting updates on Big Data solutions and machine learning.

Additional Resources

By Rachel Silver

Are you a data scientist, engineer, or researcher, just getting into distributed processing and PySpark, and you want to run some of the fancy new Python libraries you've heard about, like Matplotlib?

If so, you may have noticed that it's not as simple as installing it on your local machine and submitting jobs to the cluster. In order for the Spark executors to access these libraries, they have to live on each of the Spark worker nodes.

You could go through and manually install each of these environments using pip, but maybe you also want the ability to use multiple versions of Python or other libraries like pandas? Maybe you also want to allow other colleagues to specify their own environments and combinations?

If this is the case, then you should look toward using conda environments to provide specialized and personalized Python configurations that are accessible to Python programs. Conda is a tool that keeps track of conda packages and tarball files containing Python (or other) libraries and maintains the dependencies between packages and the platform.

Continuum Analytics provides an installer for conda called Miniconda, which contains only conda and its dependencies, and this installer is what we’ll be using today.

For this blog, we’ll focus on submitting jobs from spark-submit. In a later iteration of this blog, we’ll cover how to use these environments in notebooks like Apache Zeppelin and Jupyter.

Installing Miniconda and Python Libraries to All Nodes

If you have a larger cluster, I recommend using a tool like pssh (parallel SSH) to automate these steps across all nodes.

To begin, we’ll download and install the Miniconda installer for Linux (64-bit) on each node where Apache Spark is running. Please make sure, before beginning the install, that you have the bzip2 library installed on all hosts:
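As a rough sketch of that step (the installer URL and the -y flag are assumptions; use whatever 64-bit Linux installer Continuum currently publishes), the per-node install looks something like:

sudo yum install -y bzip2
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh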


I recommend choosing /opt/miniconda3/ as the install directory, and, when the install completes, you need to close and reopen your terminal session.

If your install is successful, you should be able to run ‘conda list’ and see the following packages:

 

Miniconda installs an initial default conda, running Python 3.6.1. To make sure this installation worked, run a version command:

python -V
Python 3.6.1 :: Continuum Analytics, Inc.

To explain what’s going on here: we haven’t removed the previous default version of Python, and it can still be found by referencing the default path: /bin/python. We’ve simply added new Python environments, much like Java alternatives, that we can point to while submitting jobs without disrupting our cluster environment. See:

/bin/python -V
Python 2.7.5

Now, let’s go ahead and create a test environment with access to Python 3.5 and NumPy libraries.

First, we create the conda and specify the Python version (do this as your cluster user):

conda create --name mapr_numpy python=3.5

Next, let’s go ahead and install NumPy to this environment:

conda install --name mapr_numpy numpy

Then, let’s activate this environment, and check the Python version:
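A minimal check, assuming the /opt/miniconda3 install location suggested earlier:

source activate mapr_numpy
python -V
python -c "import numpy; print(numpy.__version__)"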

 

Please complete these steps for all nodes that will run PySpark code.

Using Spark-Submit with Conda

Let’s begin with something very simple, referencing environments and checking the Python version to make sure it’s being set correctly. Here, I’ve made a tiny script that prints the Python version:
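The original script isn't reproduced here, so the following is a minimal sketch (the file name check_python.py is my own placeholder). It runs one trivial task so the version is reported by an executor rather than only the driver:

# check_python.py - print the Python version the Spark executors are using
import sys
from pyspark import SparkContext

sc = SparkContext(appName="python-version-check")
# run a single-partition job so sys.version comes back from an executor
print(sc.parallelize([0], 1).map(lambda _: sys.version).collect()[0])
sc.stop()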

 

Testing NumPy

Now, let’s make sure this worked!

I’m creating a little test script called spark_numpy_test.py, containing the following:
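A hedged sketch of that test script (these contents are my own stand-in for the original):

# spark_numpy_test.py - verify that the executors can import and use NumPy
import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="numpy-test")
# build a small NumPy array inside each task and collect the results
sums = sc.parallelize([0, 1, 2, 3], 2).map(lambda x: float(np.arange(10).sum() + x)).collect()
print(sums)
sc.stop()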

 

If I were to run this script without activating or pointing to my conda with NumPy installed, I would see this error:

 

In order to get around this error, we’ll specify the Python environment in our submit statement:
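A sketch of such a submit statement, assuming the conda environment lives under /opt/miniconda3 on every node (client mode shown; for cluster mode the appMasterEnv setting matters as well):

PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python \
  spark-submit --master yarn --deploy-mode client \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python \
  spark_numpy_test.py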

 

Now for Something a Little More Advanced...

This example of PySpark, using the NLTK Library for Natural Language Processing, has been adapted from Continuum Analytics.

We’re going to run through a quick example of word tokenization to identify parts of speech that demonstrates the use of Python environments with Spark on YARN.

First, we’ll create a new conda, and add NLTK to it on all cluster nodes:

conda create --name mapr_nltk nltk python=3.5
source activate mapr_nltk

Note that some builds of PySpark are not compatible with Python 3.6, so we’ve specified an older version.

Next, we have to download the demo data from the NLTK repository:
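A sketch of that download step; the target directory is a placeholder for the NFS-mounted MapR-FS home directory mentioned below:

source activate mapr_nltk
python -m nltk.downloader -d /mapr/my.cluster.com/user/mapr/nltk_data all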

 

This step will download all of the data to the directory that you specify–in this case, the default MapR-FS directory for the cluster user, accessible by all nodes in the cluster.

Next, create the following Python script: nltk_test.py
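The original script isn't shown here, so the following is a hedged stand-in; the shared nltk_data path is the same placeholder as above:

# nltk_test.py - tokenize sentences and tag parts of speech with NLTK on Spark
from pyspark import SparkContext

def tag(sentence):
    # runs on the executors: point NLTK at the shared data directory, then tag
    import nltk
    nltk.data.path.append("/mapr/my.cluster.com/user/mapr/nltk_data")
    return nltk.pos_tag(nltk.word_tokenize(sentence))

sc = SparkContext(appName="nltk-test")
sentences = sc.parallelize([
    "The quick brown fox jumps over the lazy dog.",
    "Complex event processing pairs nicely with stream messaging."
])
for tagged in sentences.map(tag).collect():
    print(tagged)
sc.stop()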

 

Then, run the following as the cluster user to test:
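A sketch of the submit command, mirroring the earlier NumPy example but pointing at the mapr_nltk environment:

PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_nltk/bin/python \
  spark-submit --master yarn --deploy-mode client nltk_test.py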

 

Additional Resources

Editor's note: this blog post was originally published in the Converge Blog on June 29, 2017

By Carol McDonald

 

This post is the second in a series where we will go over examples of how MapR data scientist Joe Blue assisted MapR customers, in this case a regional bank, in identifying new data sources and applying machine learning algorithms in order to better understand their customers. If you have not already read the first part of this customer 360° series, it would be good to read that first. In this second part, we will cover a bank customer profitability 360° example, presenting the before, during, and after.

Bank Customer Profitability - Initial State

The back story: a regional bank wanted to gain insights about what’s important to their customers based on their activity with the bank. They wanted to establish a digital profile via a customer 360 solution in order to enhance the customer experience, to tailor products, and to make sure customers have the right product for their banking style.


As you probably know, profit is equal to revenue minus cost. Customer profitability is the profit the firm makes from serving a customer or customer group, which is the difference between the revenues and the costs associated with the customer in a specified period.

Banks have a combination of fixed and variable costs. For example, a building is a fixed cost, but how often a person uses an ATM is a variable cost. The bank wanted to understand the link between their product offerings, customer behavior, and customer attitudes toward the bank in order to identify growth opportunities.

Bank Customer Profitability - During

The bank had a lot of different sources of customer data:

  • Data warehouse account information.
  • Debit card purchase information.
  • Loan information such as what kind of loan and how long it has been open.
  • Online transaction information such as who they’re paying, how often they’re online, or if they go online at all.
  • Survey data.

Analyzing data across multiple data sources

This data was isolated in silos, making it difficult to understand the relationships between the bank’s products and the customer’s activities. The first part of the solution workflow is to get the data into the MapR Platform, which is easy since MapR enables you to mount the cluster itself via NFS. Once all of the data is on the MapR Platform, the data relationships can be explored interactively using the Apache Drill schema-free SQL query engine.

A key data source was a survey that the bank had conducted in order to segment their customers based on their attitudes toward the bank. The survey asked questions like, “Are you embracing new technologies?” and “Are you trying to save?” The responses were then analyzed by a third party in order to define four customer personas. But the usefulness of this survey was limited because it was only performed on 2% of the customers. The question for the data scientist was, “How do I take this survey data and segment the other 98% of the customers, in order to make the customer experience with the bank more profitable?”

Feature extraction

Feature engineering is the process of transforming raw data into inputs for a machine learning algorithm. With data science you often hear about the algorithms that are used, but actually a bigger part—consuming about 80% of a data scientist’s time—is taking the raw data and combining it in a way that is most predictive.

The goal was to find interesting properties in the bank’s data that could be used to segment the customers into groups based on their activities. A key part of finding the interesting properties in the data was working with the bank’s domain experts, because they know their customers better than anyone.

Apache Drill was used to extract features such as:

  • What kind of loans, mix of accounts, and mix of loans does the customer have?
  • How often does the customer use a debit card?
  • How often does the customer go online?
  • What does the customer buy? How much do they spend? Where do they shop?
  • How often does the customer go into the branches?
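To make that concrete, here is a hedged sketch of the kind of Drill query that can compute such a feature directly against raw files; the path and column names are invented for illustration and are not the bank's actual schema:

SELECT t.customer_id,
       COUNT(*) AS debit_purchases,
       SUM(CAST(t.amount AS DOUBLE)) AS total_debit_spend
FROM dfs.`/data/bank/debit_card_transactions` t
GROUP BY t.customer_id;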

Accentuate business expertise with new learning

After the behavior of the customers was extracted, it was possible to link these features by customer ID with the labeled survey data, in order to perform machine learning.

The statistical computing language R was used to build segmentation models using many machine learning classification techniques. The result was four independent ensembles, each predicting the likelihood of belonging to one persona, based on their banking activity.

The customer segments were merged and tested with the labeled survey data, making it possible to link the survey “customer attitude” personas with their banking actions and provide insights.

Banking Customer 360° - After

The solution results of modeling with the customer data are displayed in the Customer Products heat map below. Each column is an “attitude”-based persona, and each row is a type of bank account or loan. Green indicates the persona is more likely to have this product, and red indicates less likely.

This graph helps define these personas by what kinds of products they like or don’t like.

This can give insight into:

  • How to price some products
  • How to generate fees
  • Gateway products, which allow a customer to move from a less profitable segment to a more profitable one.

In the Customer Payees heat map below, the rows are electronic payees. This heat map shows where customer personas are spending their money, which can give channels for targeting and attracting a certain persona to grow your business.

In this graph, the bright green blocks show that Fitness club A, Credit card/Bank A, and Credit card/Bank C are really strong for Persona D. Persona A is almost the opposite of the other personas. This “customer payees” heat map gives a strong signal about persona behavior. It’s hard to find signals like this, but they provide an additional way to look at customer data in a way that the bank could not conceive of before.

Bank Customer 360° Summary

In this example, we discussed how data science can link customer behavioral data with a small sample survey to find customer groups who share behavioral affinities, which can be used to better identify a customer base for growth. The bank now has the ability to project growth rates based on transition between personas over time and find direct channels that allow them to target personas through marketing channels. 

 

Editor's Note: this post was originally published in the Converge Blog on December 08, 2016. 

 

Related Resources:

How to Use Data Science and Machine Learning to Revolutionize 360° Customer Views 

Customer 360: Knowing Your Customer is Step One | MapR 

Applying Deep Learning to Time Series Forecasting with TensorFlow 

Churn Prediction with Apache Spark Machine Learning 


By Carol McDonald

 

More and more data is available that can help inform businesses about their customers, and the businesses that successfully utilize these new sources and quantities of data will be able to provide a superior customer experience. However, predicting customer behavior remains very challenging. This post is the first in a series where we will go over examples of how Joe Blue, a Data Scientist in MapR Professional Services, assisted MapR customers in identifying new data sources and applying machine learning algorithms in order to better understand their customers. The first example in the series is an advertising customer 360°; the next blog post in the series will cover banking and healthcare customer 360° examples.

Customer 360° Revolution: Before, During, and After


Before

MapR works with companies who have solved business problems but are limited with what they can do with their data, and they are looking for the next step. Often, they are analyzing structured data in their data warehouse, but they want to be able to do more.

After

The goal of the customer 360° revolution is to transform your business by figuring out what customers are going to do—if you can predict what your customer is going to do next, then you can add value to your business.

During = Data Scientist

There is a lot of confusion about what a data scientist does. A data scientist is someone who can draw the lines between the before and after, and this involves:

  1. Identifying new data sources, which traditional analytics or databases are not using due to the format, size, or structure.
  2. Collecting, correlating, and analyzing data across multiple data sources.
  3. Knowing and applying the right kind of machine learning algorithms to get value out of the data.

The goal of the customer 360° revolution is generally to:

  • Find additional information
  • Accentuate business expertise with new learning
  • Use new learning to change your business

Use Case: Advertising Customer 360° Before

An example use case for this customer 360° is a mobile banking web site which displays ads related to what the customer bought with his/her debit card. A “card-to-link” company hosts the ads and determines which ad to display. The advertisers are pharmacies, restaurants, grocery stores, etc., and they want to get their ads to the people who would be interested in buying their products. In order to target ads to interested customers, you need to accurately predict the likelihood that a given ad will be clicked, also known as "click-through rate" (CTR).

The “card-to-link” company was already capable of displaying ads related to purchases, but they wanted to do a better job of targeting and measuring the success of their ads.

As an example, let’s take a look at a new campaign for Chris’s Crème doughnuts:

  • Chris’s Crème asks the “card-to-link” company to find people who like doughnuts but who are not already their customers.
  • Card-to-link first uses data warehouse debit card merchant information to find Dunkin Doughnut customers and customers of other quick-service restaurants that sell breakfast.
  • Card-to-link also uses customer zip code information to only display Chris’s Crème ads to customers who live near a Chris’s Crème location.

This works great, but what if this method increased new customers to 600,000, but Chris’s Crème wanted to pay more to increase the number of new customers to 1,000,000? There is no way to do this with just the existing data warehouse data.

Advertising Customer 360° During Part 1

The first question for the data scientist is, "What kind of data does card-to-link have that they could use to augment new Chris’s Crème customers from 600,000 to 1,000,000, and therefore maximize the profit of this campaign?"

The New Data

The new data comes from Internet browsing history; it consists of billions of device IDs and the keyword content of the browsing history for that ID.

So how can the data scientist take this new data and find a bigger audience?

Advertising Customer 360° During Part 2

Get the Data on the MapR Platform (NFS)

The first part of the solution workflow is to get the data on the MapR Platform, which is easy via the Network File System (NFS) protocol. Unlike other Hadoop distributions that only allow cluster data import or import as a batch operation, MapR enables you to mount the cluster itself via NFS and the MapR File System enables direct file modification and multiple concurrent reads and writes via POSIX semantics. An NFS-mounted cluster allows easy data ingestion from other machines leveraging standard Linux commands, utilities, applications, and scripts.
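As a rough sketch (the NFS node and cluster name are placeholders), landing a file can be as simple as mounting the cluster and copying with standard tools:

sudo mkdir -p /mapr
sudo mount -t nfs -o nolock mapr-nfs-node:/mapr /mapr
cp campaign_history.parquet /mapr/my.cluster.com/data/warehouse/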

The complete customer purchase and past campaign history data is exported from the traditional data warehouse and put on the MapR Platform as Parquet tables. The Internet browsing history is put on the MapR platform as text documents.

Once the browsing, campaign and purchase history data is on the MapR platform, Apache Drill is used for interactive exploration and preprocessing of the data with a schema-free SQL query engine.

Feature Extraction

Features are the interesting properties in the data that you can use to make predictions.

Feature engineering is the process of transforming raw data into inputs for a machine learning algorithm. In order to be used in Spark machine learning algorithms, features have to be put into Feature Vectors, which are vectors of numbers representing the value for each feature. To build a classifier model, you extract and test to find the features of interest that most contribute to the classification.

Apache Spark for text Analytics

The TF-IDF (Term Frequency–Inverse Document Frequency) function in Spark MLlib can be used to convert text words into feature vectors. TF-IDF calculates the most important words in a document compared to a collection of documents. For each word in a collection of documents, it computes:

  • Term Frequency is the number of times a word occurs in a specific document.
  • Document Frequency is the number of times a word occurs in a collection of documents.
  • TF * IDF measures the significance of a word in a document (the word occurs a lot in that document, but is rare in the collection of documents).

For example, if you had a collection of documents about football, then the word concussion in a document would be more relevant for that document than the word football.
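A hedged PySpark sketch of that featurization using the MLlib Tokenizer, HashingTF, and IDF transformers; the toy DataFrame and column names below are placeholders for the real browsing-history data:

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

spark = SparkSession.builder.appName("tfidf-sketch").getOrCreate()

# toy stand-in for the browsing history: (device_id, keyword text)
browsing_df = spark.createDataFrame(
    [("dev-001", "doughnuts coffee breakfast coupons"),
     ("dev-002", "tires oil change suv lease")],
    ["device_id", "keywords"])

words_df = Tokenizer(inputCol="keywords", outputCol="words").transform(browsing_df)
tf_df = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1 << 18).transform(words_df)
tfidf_df = IDF(inputCol="raw_features", outputCol="features").fit(tf_df).transform(tf_df)
tfidf_df.select("device_id", "features").show(truncate=False)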

Machine Learning and Classification

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (such as whether a customer likes doughnuts or not), based on labeled data (such as purchase history). Classification takes a set of data with labels and features and learns how to label new records based on that information. In this example, the purchase history is used to label customers who bought doughnuts. The browsing history of millions of text keywords, many of which have seemingly nothing to do with doughnuts, is used as the features to discover similarities and categorize customer segments.

  • Label → bought doughnuts → 1 or 0
  • Features → browsing history → TF-IDF features

Machine Learning

Once the browsing and purchase history data is represented as labeled feature vectors, Spark machine learning classification algorithms such as logistic regression, decision trees, and random forests can be used to return a model representing the learning decision for that data. This model can then be used to make predictions on new data.
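Continuing the sketch above, a minimal and purely illustrative model-building step with logistic regression could look like this; the label column here is a made-up stand-in for the purchase-history label:

from pyspark.sql import functions as F
from pyspark.ml.classification import LogisticRegression

# pretend dev-001 bought doughnuts (label 1.0) and dev-002 did not (label 0.0)
labeled_df = tfidf_df.withColumn("label", (F.col("device_id") == "dev-001").cast("double"))

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
model = lr.fit(labeled_df)

# the probability column is what would be used to rank device IDs for the campaign
model.transform(labeled_df).select("device_id", "probability", "prediction").show(truncate=False)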

After the feature extraction and model building, it is possible to use the model to rank the billions of device IDs, from highest to lowest, who are most likely to eat doughnuts. With that, the audience of potential doughnut eaters can be augmented to the goal of one million.

Advertising Customer 360° After

Each colored line in the graph below refers to the results of modeling with data sets from different categories of advertising campaigns: Quick Serve Restaurant (QSR), Full Serve Restaurant, Light Fare, Auto, Grocery, and Apparel. For each campaign:

1. The purchase history was used to label the population:

  • For the grocery campaign, the following modeling populations were created:

  • Those who shopped at grocery stores multiple times

  • Those who bought their food at other stores

  • For the Full Serve restaurant campaign, the following modeling populations were created:

  • Yes: had at least 2 visits to a full serve restaurant in the last 2 months (90th percentile)

  • No: had at least 18 visits to a quick serve restaurant in last 2 months (90th percentile)

  • Excluded those who were in both (small percentage)

2. The browsing history from the labeled populations was used for the features.

3. The classification model was built from the labeled features.

4. The model was used to predict the Click-Through Rate.

The left side of this graph is the percentage of people visiting the banking web site who clicked on an advertisement. The right side is the predicted probability that a person would click on the ad. If the colored lines are continuously increasing from left to right, that means the machine learning model worked well at predicting the click-through rates.

For full-service restaurants (the yellow line), the probability of clicking tracks well with the click-through rate. At a low probability, the click-through rate was 30%, and at a high probability the model found populations with a 70% click-through rate. So the model really responded to the method of finding similar customer profiles using their Internet browsing history.

Some campaigns, like grocery, did not work. This shows that the browsing history does not really apply to increased click-through rates for grocery ads, but the card-to-link company did not know this. These are insights that can be used, for example, if card-to-link has a new automotive, QSR, or full-serve campaign. This is a method that works well and can be used for advertisers that will pay to target more customers.

Advertising Customer 360° Summary

In this example, we discussed how data science can use customer behavioral data to find customer groups who share behavioral affinities in order to better target advertisements. For more information review the following resources.

Want to learn more?

 

Editor's note: This blog post was originally published in the Converge Blog on August 08, 2016. 

By Nick Amato

 

If you’ve had a chance to work with Hadoop or Spark a little, you probably already know that HDFS doesn't support full random read-writes or many other capabilities typically required in a production-ready file system. Attempting to use HDFS as an enterprise filestore for landing and moving data through transformation pipelines means trying to get various hacks to work or employing additional nodes acting as gateways, getting you only halfway there.

This post isn’t about how MapR solves the problem and has a much easier way of landing data than our competitors (even though we do). Instead, I’ll show you a concrete example that both illustrates the benefits of using MapR-FS in a way that you might be able to relate to your own workflows, and also serves as a quick "how-to" for another way to leverage all that big storage in a MapR cluster if you happen to have one running or are considering it.

Using a MapR Cluster as a Datastore for VMware ESXi

Gartner estimates that around 75% of server workloads are now virtualized. If you have VMware in your datacenter, you might be aware that it supports the use of datastores over NFS (as well as iSCSI, but let’s leave that for another article). It's often a multi-faceted, complex effort to support this in a big environment, having to plan ahead for things like backups, having enough space and cycles for snapshots, and handling both the known and unknown scale-out behaviors.

Virtual machine files and their associated disks (one or more .vmdk files) also tend to be large and consume a lot of space. VM "sprawl" and the sheer space required to host large environments can compound the problem.

What if you could put all that scalable, fast, transparently compressed, replicated space in a MapR cluster to work by using it as a datastore in ESXi? Of course you can! You can even configure volumes and policies that make it pretty easy to dedicate a portion or all of the storage to your virtualized store.

One last thing before we jump in: this particular example involves VMware, but if you are a Docker user, check out the recent announcements around our platform’s unique ability to provide a data services layer for Docker containers.

Let's start hosting some virtual machines.

By the way, don't try this with just any Hadoop cluster, as you'll quickly find it doesn't work!

Prerequisites

In this example we'll use the following components:

  • A 10-node Hadoop cluster running MapR and YARN. The cluster has NFS enabled.
  • An ESXi server running ESXi-6.0.0-2494585-standard
  • The individual node hardware consists of HP ProLiant DL380G6 servers with 12 cores per node (Xeon X5670), as well as 6 7200RPM disks and 128G RAM per node.

This is a snapshot of the current parameters we've used in our lab, and this will most likely work with any version of ESXi that supports NFS and any recent (4.x or later) MapR version. The same goes for the hardware.

Video Demo and Tutorial

Watch the below video for a live example of how this works, or follow the steps in the next section for the quick version.

Steps

1) In the MapR MCS console, configure one or more volumes (by selecting Volumes on the left side, then New Volume) to hold the VM data. 


2) Ensure that NFS services are configured on at least one node. Consult the documentation steps here.
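A quick sanity check from any Linux host that the gateway is exporting the cluster (the host name is a placeholder; MapR normally exports /mapr):

showmount -e mapr-nfs-node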

3) In the VMware VSphere Client (or web interface), select the ESXi server where you want to mount the datastore.

4) Select the ‘Configuration’ tab, then ‘Add Storage’ on the far right side.

5) Select ‘NFS’ then enter the MapR server name or address running NFS services. In the below example the server is 10.200.1.101. For ‘Folder’ select the mount point of the volume you just created.


You’re done! The datastore will now appear in the main list of datastores.

You can now use the distributed storage in the MapR cluster as you would any other NFS-mounted datastore (for both read-only and random read-write applications).

Conclusion

This example highlights the benefits of having an underlying random read/write capable filesystem for your data platform -- you can leverage it in ways that are highly compatible with your existing environment and applications. Having MapR-FS at the center of the architecture allows you to harness distributed storage capacity and enterprise filesystem capabilities as part of your virtualized infrastructure or enable additional services on top of your cluster.

With a few simple steps you immediately get access to a huge amount of ready-to-use space that can be efficiently snapshotted, compressed and even mirrored to other locations as part of a DR strategy. In this article, I didn't touch upon other uses of the data (in an analytical pipeline, for example) but many are possible. If you have use cases like this, leave a comment for us below.

Give this a run in your own VMware environment with these next steps:

  • Fire up the quick installer for a free, unlimited production use version of the MapR Converged Data Platform. Take a look here to compare editions of the platform.
  • For a single-node example, you can download the sandbox (for either VMware or VirtualBox) for a single-node Hadoop cluster in just a few minutes.

 

Editor's note: Post originally shared in the MapR Converge Blog on May 04, 2016

 

Related:

MapR-FS

By Mathieu Dumoulin

 

Modern Open Source Complex Event Processing For IoT

This series of blog posts details my findings as I bring to production a fully modern take on Complex Event Processing, or CEP for short. In many applications, ranging from financials to retail and IoT, there is tremendous value in automating tasks that require taking action in real time. Setting aside the IT systems and frameworks that would support it, this is clearly a useful capability.

In the first post of the series, I explain how CEP has evolved to meet this requirement and how the requirements for CEP can be met in a modern big data context. In short, I present an approach based on the best practices of modern architecture, microservices, and Kafka-style stream messaging, with an up-to-date open source business rule engine.

In this second part, I’ll get more concrete and work through a working example using the system I propose. Let’s get started.

Smart City Traffic Monitoring

We made a working demo for which the code will be released on the MapR GitHub. It is made to work on either the MapR Sandbox or a real MapR cluster. Our demo is planned to be released, so please stay tuned and check www.mapr.com for an announcement.

For this example, we’ll use a very simple “smart city” use case for traffic monitoring. In this case, we’ll model a single sensor that can measure the speed of cars that pass over it. Using such sensor data, it would be fairly easy to detect traffic jams in real time, and thus notify the police to take action much more quickly than they otherwise could.

Some other types of use cases are easy to envision. We can add data from a public, real-time weather feed and connect to information panels along the roads and show an advisory to drivers without any human intervention. By combining road condition sensors with the weather data, we can provide advisory feedback to drivers about road conditions and warn the public very quickly. Furthermore, by adding historical accident data and using predictive analytics, we can imagine road safety measures that can be deployed in real time to areas with a higher probability of accidents.

To be honest, I’m only scratching the surface of what this kind of smart road could do to make a commuter’s life both easier and safer while saving money for the city. But how would we build a prototype of such a system without it being hugely complicated and expensive?

How to Build a Rule-Engine Based CEP System

So we now have a proper, concrete target in mind. It turns out that if we decide to base our system around reading our data from a Kafka-style stream (i.e., persistent, scalable, and high performance), then we will naturally end up with a pretty cool, modern CEP microservice.

The important point here is not to show how to build a super complicated enterprise architecture, but rather that by making some simple technology choices and building our demo in a reasonable way, we naturally end up with this elegant, modern, and simple architecture.

For simplicity’s sake, I have decided to implement my prototype on a MapR Sandbox (get it free here). This is because it will include the stream messaging system, MapR Streams, which I can use through the Kafka 0.9 API with very little configuration and know it will work the same on a production MapR 5.1+ cluster.

Finally, it should be noted that an Apache Kafka cluster could be used as well with the same design and code, just with some additional work to get it up and running.

High-Level Architecture View

As shown in the diagram above, the flow is to have sensor data get aggregated to a producer gateway, which will forward the data to the stream with topic named “data.” The data will be in JSON format so it is easy to manipulate, human readable, and easily sent to Elasticsearch as-is for monitoring with a Kibana dashboard.

The consumer side will have two tasks: to read the data from the stream and host an instance of a KieSession where the rule engine can apply rules on the facts as they are added to it.

Rules are edited in the Workbench GUI, a Java webapp which can be run on a Java application server such as WildFly or Tomcat.

Rules are fetched from the workbench by the consumer application using the appropriate methods provided by the Drools framework, which are entirely based on Maven Repository.

We can now look at the proposed system in more detail in the following sections.

List of Technologies Used

The technology we’re going to use is as follows:

  • MapR Sandbox 5.2
  • The Java programming language (any JVM language is fine)
    • The Jackson 2 library to convert to and from JSON
  • MapR Streams or Apache Kafka stream messaging system
  • Wildfly 10 application server to host the Workbench
  • JBoss Drools as our choice of OSS business rule engine
  • Log Synth to generate some synthetic data for our prototype
  • Streamsets 1.6 to connect MapR Streams and Elasticsearch
  • Elasticsearch 2.4 and Kibana 4 for monitoring

Traffic Monitoring Prototype Architecture

As I built this demo to run on the MapR Sandbox, I’m using instructions for CentOS 6.X, an open source version of RHEL 6.X. Instructions for CentOS 7 are almost identical, and finding similar instructions for Ubuntu would be pretty straightforward and left up to the reader.

To build the core of the traffic monitoring system, we’re going to need two basic parts:

  • A program to feed sensor data into MapR Streams/Kafka. This part will be using fake data modeled by a vehicle simulation coded with Log Synth. We’ll use the MapR Kafka-rest proxy implementation (just introduced with MEP 2.0) to add the data with Python.
  • A JVM-language application that will read data from the stream and pass it to a KieSession. The minimal code to get this working is surprisingly small.

To edit the rules, we deploy the Workbench on Wildfly 10, which is a fairly straightforward process. Check this blog post for instructions, or read the Drools Documentation. Installing Wildfly is pretty simple; see this blog for great instructions on how to install it as a service on Centos/RHEL (it’s for Wildfly, but the same instructions work for 9 and 10).

We made a single configuration change to Wildfly. We changed the port to 28080 instead of 8080, as it is already used by the sandbox. Wildfly runs in standalone mode, so the configuration file is in WILDFLY_HOME/standalone/configuration/standalone.xml.

For monitoring, we let the streaming architecture work for us. We use the open source StreamSets Data Collector to easily redirect the sensor data to Elasticsearch so that we can monitor the system with a nice Kibana dashboard. Setting up StreamSets with MapR Streams requires some work with version 1.6 (there is a great blog post about it here, or see the official StreamSets documentation).

Finally, installation and setup of Elasticsearch and Kibana is well documented on Centos/RHEL.

For production, all those parts can be readily separated to run on separate servers. They can be run on either cluster nodes or edge nodes. If it’s a MapR cluster, installing the MapR Client and pointing it to the cluster CLDB nodes will be all the configuration needed for full access to the streams. For an Apache Kafka cluster, refer to the official Kafka documentation.

Traffic Monitoring Prototype - How To

Creating the Streams with maprcli

The first task is to create streams and topics for our application. To do this, it’s a best practice to create a volume for streams first. As user mapr, type from the command line:

maprcli volume create -name streams -path /streams
maprcli stream create -path /streams/traffic -produceperm p -consumeperm p
maprcli stream topic create -path /streams/traffic -topic data
maprcli stream topic create -path /streams/traffic -topic agenda
maprcli stream topic create -path /streams/traffic -topic rule-runtime

Note: MapR Streams is more of a superset of Kafka than simply a clone. In addition to being faster, MapR Streams can use all the advantages of the MapR File System, such as volumes (with permissions and quotas and so on) and replication. A cluster is not limited to just defining topics, but can define several streams which each can have several topics. Therefore, instead of a topic name, a MapR Stream has a path:topic notation. Here, our data stream’s full name is “/streams/traffic:data”. You can read more about the advantages of MapR Streams vs. Kafka in this whiteboard walkthrough by Jim Scott.

Generating Fake Data

I used the Log-Synth tool to generate data for this prototype. Log-Synth uses a schema combined with a Sampler class to generate data in a very flexible and simple manner.

My Schema:

 

[
  {"name": "traffic", "class": "cars", "speed": "70 kph", "variance": "10 kph", "arrival": "25/min",
   "sensors": {"locations": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "unit": "km"},
   "slowdown": [{"speed": "11 kph", "location": "2.9 km - 5.1 km", "time": "5min - 60min"}]}
]

 

The command to generate the data is:

synth -count 10K -schema my-schema.json >> output.json

Data is generated one car at a time, and each data point is a reading at a sensor. The data will model a flow of cars driving at 70 km/h, arriving at a rate of 25 cars per minute. A slowdown will happen between km 2.9 and 5.1, where speed will be reduced to 11 km/h from 5 minutes to 60 minutes after the start of the simulation. This will be the traffic jam we wish to detect using our CEP system.

The generated data is a file where each line is the resulting list of sensor measurements for a single car:

[{"id":"s01-b648b87c-848d131","time":52.565782936267404,"speed":19.62484385513174},{"id":"s02-4ec04b36-2dc4a6c0","time":103.5216023752337,"speed":19.62484385513174},{"id":"s03-e06eb821-cda86389","time":154.4774218142,"speed":19.62484385513174},{"id":"s04-c44b23f0-3f3e0b9e","time":205.43324125316627,"speed":19.62484385513174},{"id":"s05-f57b9004-9f884721","time":256.38906069213255,"speed":19.62484385513174},{"id":"s06-567ebda7-f3d1013b","time":307.3448801310988,"speed":19.62484385513174},{"id":"s07-3dd6ca94-81ca8132","time":358.3006995700651,"speed":19.62484385513174},{"id":"s08-2d1ca66f-65696817","time":409.25651900903136,"speed":19.62484385513174},{"id":"s09-d3eded13-cf6294d6","time":460.21233844799764,"speed":19.62484385513174},{"id":"s0a-1cbe97e8-3fc279c0","time":511.1681578869639,"speed":19.62484385513174}]

 

A reading has a sensor ID, a speed in meters per second, and a time delta from time 0 (the moment the simulation starts) in seconds.

My producer code simply translates the readings into a list of sensor readings ordered by time, and I transform the speed into km/h and the time into a timestamp as milliseconds from epoch.

Sending the data onto the stream can be done one line at a time using standard producer code. The code in the sample Java producer works just fine.

Another exciting new possibility is to use the brand new Kafka Rest Proxy, which is also available on MapR from MEP 2.0 (MapR Ecosystem Pack). This means sensors can connect directly to Kafka from any language since HTTP-based REST API is a global standard.
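As a hedged sketch of that idea in Python (the proxy host and port, the URL encoding of the stream path, and the record layout are my assumptions, not a tested endpoint):

import json
import requests
from urllib.parse import quote

# one sensor reading, shaped like the producer's JSON records
reading = {"id": "s03-e06eb821-cda86389", "time": 1480000000000, "speed": 70.6}

# MapR stream topics are addressed as path:topic; URL-encode it for the REST proxy
topic = quote("/streams/traffic:data", safe="")
url = "http://localhost:8082/topics/" + topic

resp = requests.post(
    url,
    headers={"Content-Type": "application/vnd.kafka.json.v1+json"},
    data=json.dumps({"records": [{"value": reading}]}))
print(resp.status_code, resp.text)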

Using the Workbench

We can log in to the Workbench with an admin user (a user with the role “admin”) created with the add-user.sh script from Wildfly.

Using the Workbench is beyond the scope of the article, but the general idea is to create an Organizational Unit and a Repository, and then create a project.

Data Objects

We will need to create facts for the rule engine to work with. Best practice for Drools is to use a separate Maven project for your data model. For the sake of simplicity, I created them right from the workbench.

The Measurement is a super generic bean which models the sensor speed measurements. For the prototype, I kept it as simple as the raw data with a timestamp, a sensor ID, and a speed.

The Sensor bean models the sensor itself and has an ID and the average speed of all cars that it measures over a time window defined by our rules. This average speed is used to trigger an alert for heavy traffic. The traffic String marks the current traffic level, which can be “NONE”, “LIGHT” or “HEAVY.”
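The bean definitions themselves aren't shown in the article; as a rough sketch (written here in Scala with JavaBean-style properties so the rules can read them, whereas the actual facts were created as Java beans in the Workbench), they could look like this. Field names are assumptions:

import scala.beans.BeanProperty

// One speed reading from one sensor: sensor ID, speed in km/h, timestamp in ms since epoch.
class Measurement(@BeanProperty var sensorId: String,
                  @BeanProperty var speed: Double,
                  @BeanProperty var timestamp: Long)

// One sensor, with the average speed maintained by the rules and the current
// traffic level: "NONE", "LIGHT" or "HEAVY".
class Sensor(@BeanProperty var id: String,
             @BeanProperty var averageSpeed: Double,
             @BeanProperty var traffic: String)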

The Traffic Monitoring Rules

  • create sensors for new id
  • detect heavy traffic
  • detect light traffic
  • detect normal traffic
  • get average speed at sensor

The create sensors rule ensures that there are sensor objects available in memory. This is the fact we use to know the average speed at a certain sensor.

The detect heavy traffic rule is the key rule we use to send an alarm to the police if heavy traffic is detected at a certain sensor.

So when the average speed reaches 20 km/h or less, and the sensor isn’t already in the HEAVY traffic level, set the level to HEAVY and send an alert.

This means we need to know the average speed. Here is the rule that computes it, using the Drools rule DSL (domain-specific language):

This is not rocket science! These rules illustrate clearly how writing simple but useful rules can realistically be left to business analysts and developed separately from the streaming and big data platform.

The Consumer Side

The consumer reads the data from the stream. The tutorial code in Java from the documentation is perfectly adequate. Jackson is used to convert the JSON to Measurement objects.

The consumer has an instance of the KieSession, and each measurement is added to the session using kieSession.insert(fact), followed by a call to kieSession.fireAllRules(), which triggers the engine to check whether any rules match the new state of the session given the new data.

A channel, which is just a callback, allows a rule to call a function “outside” of the KieSession. My prototype uses this method to log the alert; in a production system, we could easily change the code to send an email, an SMS, or take some other action.
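Putting those pieces together, a condensed sketch of the consumer loop could look like the following (Scala rather than the article's Java; the channel name, JSON field names, and group ID are assumptions, and error handling is omitted):

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.kie.api.runtime.{Channel, KieSession}

def runConsumer(kieSession: KieSession): Unit = {
  // The "detect heavy traffic" rule sends its alert to this channel.
  kieSession.registerChannel("alerts", new Channel {
    override def send(obj: AnyRef): Unit = println(s"ALERT: $obj") // or send an email/SMS
  })

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // ignored by MapR Streams
  props.put("group.id", "traffic-cep")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Arrays.asList("/streams/traffic:data"))

  val mapper = new ObjectMapper()
  while (true) {
    val records = consumer.poll(200)
    records.asScala.foreach { record =>
      // Turn the JSON reading into a Measurement fact, insert it, and re-evaluate the rules.
      val json = mapper.readTree(record.value())
      val m = new Measurement(json.get("sensorId").asText(),
                              json.get("speed").asDouble(),
                              json.get("timestamp").asLong())
      kieSession.insert(m)
      kieSession.fireAllRules()
    }
  }
}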

Importing the Rules into the Consumer Application

The way we get the rules into the running application is by fetching it from the Maven repository integrated into the Workbench.

KieServices kieServices = KieServices.Factory.get();  
ReleaseId releaseId = kieServices.newReleaseId( "org.mapr.demo", "smart-traffic-kjar", "1.0.0" ); 
KieContainer kContainer = kieServices.newKieContainer( releaseId );

KieSession kieSession = kContainer.newKieSession();

 

So the question becomes, how does the call to newReleaseId know to fetch the artifact with our rules from the Maven repository in the Workbench?

The answer is in the ~/.m2/settings.xml file, where you add this information. We recommend using the user mapr for everything in the sandbox, so the full path is /home/mapr/.m2/settings.xml.

[mapr@maprdemo .m2]$ cat settings.xml  
<?xml version="1.0" encoding="UTF-8"?> 
<settings> 
   <servers> 
       <server> 
           <id>guvnor-m2-repo</id> 
           <username>admin</username> 
           <password>admin</password> 
       </server> 
   </servers> 
   <profiles> 
       <profile> 
           <id>cep</id> 
           <repositories> 
               <repository> 
                   <id>guvnor-m2-repo</id> 
                   <name>Guvnor M2 Repo</name> 
                    <url>http://127.0.0.1:28080/drools-wb-webapp/maven2/</url> 
                   <releases> 
                       <enabled>true</enabled> 
            <updatePolicy>interval:1</updatePolicy> 
                   </releases> 
                   <snapshots> 
                       <enabled>true</enabled> 
            <updatePolicy>interval:1</updatePolicy> 
                   </snapshots> 
               </repository> 
           </repositories> 
       </profile> 
   </profiles> 
   <activeProfiles> 
       <activeProfile>cep</activeProfile> 
   </activeProfiles> 
</settings>

 

The key piece of information is the URL of the maven2 repository of the Drools Workbench. It can be copied and pasted from the pom.xml, which can be seen by using the repository view:

So I just copy-pasted that and now everything works like magic.

Monitoring the Smart Traffic Prototype

We have one stream with data and two other streams to monitor the rule engine internals. This is very easy with Drools because it uses Java listeners to report on its internal state. We simply provide a custom implementation of the listeners to produce the data to a stream, and then use Streamsets to redirect everything to Elasticsearch.

Elasticsearch Mappings

The mappings are defined in a small script I created:

http://pastebin.com/kRCbvAkU

Streamsets for No-Code Stream To Elasticsearch Pipeline

Each stream has its own pipeline, where each looks like this:

The Jython Evaluator adds timestamp information if necessary.

Running the Prototype

Start the consumer:

Then start the producer:

In my prototype, I added code to control the rate at which the data is sent to the stream, to make the rules firing easier to see. 10,000 events is a very small volume for Drools and for MapR Streams/Kafka, so the whole demo would otherwise be over in less than a second; the “-r 25” flag limits the rate to 25 events per second.

The dashboard looks like this:

Once data starts to stream in:

The traffic jam is quite visible now:

As soon as a sensor average speed drops below 20 km/h, an alarm is fired:

And the dashboard will display a count of “1”

The simulation will continue and the two other sensors will go below 20 in turn for a total of 3 alarms fired.

In Conclusion

This project turned into a great illustration of the power and ease of streaming architectures using microservices. A common misconception of microservices is that they are limited to synchronous REST services. This is not the case at all. Our system is a backend system and so the microservices (the producer, the consumer, streamsets, and ES/Kibana) are all run asynchronously, dealing with the data as it comes off the stream.

The project was fairly easy to build technically because every part is totally independent of the others, and can therefore be tested in isolation. Once the producer can send data to the stream properly, it doesn’t need to be tested ever again for issues that concern other parts of the system. Also, each part was added one at a time, making it easy to identify issues and fix them.

In total, with very little original code, we could implement a working, useful system that would need very little change to be put into production. Only the producer needs to be customized for the particular sensor or data source.

Rules are the most time-consuming and error-prone part. This would be no different if the project had been done with custom-coded Spark jobs. The win for an approach based on a rule engine such as Drools and the Drools Workbench is that the rules can be edited, tested, and improved independently of how the code runs on the cluster. The work in the Workbench has no dependency on the rest of the system at all, as it is pulled in by the consumer application automatically.

From a business value point of view, all the value is in the rules, assuming a stable production system. There is no reason for organizations not to take advantage of this quick editing capability to become ever more agile and responsive to evolving conditions, for the benefit of their customers… and the bottom line.

 

Editor's note: This blog post was originally published in the Converge Blog on January 10, 2017

Related:

By Ian Downard

 

 

A lot of people choose MapR as their core platform for processing and storing big data because of its advantages for speed and performance. MapR consistently performs faster than any other big data platform for all kinds of applications, including Hadoop, distributed file I/O, NoSQL data storage, and data streaming. In this post, I’m focusing on the latter to provide some perspective on how much better/faster/cheaper MapR Streams can be compared to Apache Kafka as a data streaming technology.

MapR Streams is a cluster-based messaging system for streaming data at scale. It’s integrated into the MapR Converged Data Platform and implements the Apache Kafka Java API so applications written for Kafka can also run on MapR Streams. What differentiates the MapR Streams technology from Kafka are its built-in features for global replication, security, multi-tenancy, high availability, and disaster recovery—all of which it inherits from the MapR Converged Data Platform. From an operational perspective, these features make MapR Streams easier to manage than Kafka, but there are speed advantages, too. I’ve been looking at this a lot lately, trying to understand where and why MapR Streams outperforms Kafka. In this blog post, I will show how MapR Streams can transport a much faster stream of data, with much larger message sizes, and to far more topics than can be achieved with Kafka.

Test Strategy

In this study, I wanted to compare Kafka and MapR Streams as to how they perform “off the shelf” without the burden of tuning my test environment to perfectly optimize performance in each test scenario. So, I have pretty much stuck with the default settings for services and clients. The only exceptions are that I configured each Kafka topic with a replication factor of 3 and configured producers to send messages synchronously, since these are the default modes for MapR Streams. I also disabled stream compression in order to control message sizes and measure throughput more precisely.

Test Configurations

I measured performance from both producer and consumer perspectives. However, consumers run faster than producers, so I focused primarily on the producer side since the throughput of a stream is bounded by the throughput of its producers. I used two threads in my producer clients so that message generation could happen in parallel with sending messages and waiting for acknowledgments. I used the following properties for producers and topics:

acks = all
batch.size = 16384
latency.ms = 0ms
block.on.buffer.full = true
compression = none
default.replication.factor = 3

 

My test environment consisted of three Ubuntu servers running Kafka 2.11-0.10.0.1 or MapR 5.2 on Azure VMs sized with the following specs:

  • Intel Xeon CPU E5-2660 2.2 GHz processor with 16 cores
  • SSD disk storage with 64,000 Mbps cached / 51,200 uncached max disk throughput
  • 112GB of RAM
  • Virtual networking throughput between 1 and 2 Gbits/sec (I measured this quantitatively since I couldn’t easily find virtual network throughput specs from Microsoft).

Performance Metrics

Throughput, latency, and loss are the most important metrics for measuring the performance of a message bus system. MapR Streams and Kafka both guarantee zero loss through at-least-once semantics. MapR provides some advantages when it comes to latency, but typically both MapR Streams and Kafka deliver messages sufficiently quickly for real-time applications. For those reasons, I chose to focus on throughput in this study.

Throughput is important because if an application generates messages faster than a message bus can consume and deliver them, then those messages must be queued. Queueing increases end-to-end latency and destabilizes applications when queues grow too large.

Furthermore, throughput in Kafka and MapR Streams is sensitive to the size of the messages being sent and to the distribution of those messages into topics. So, I analyzed those two attributes independently in order to measure how message size and stream topics affect throughput.

Throughput Performance

To measure producer throughput, I measured how fast a single producer could publish a sustained flow of messages to a single topic with 1 partition and 3x replication. I ran this test for a variety of message sizes to see how that affects throughput. The results show MapR Streams consistently achieving much higher throughput than Kafka and having a much higher capacity for handling large message sizes, as shown below.

Throughput MB/s

MapR Streams doesn’t just move a larger volume of data than Kafka; it also has the capacity to send more records per second. We can see this by plotting throughput in terms of raw record count, as shown below:

Throughput Msgs/s

I recorded these results with two different code bases. First, I used custom tests that I wrote using the Java unit test framework (JUnit), then I used the performance test scripts included with Kafka and MapR. These different approaches did not produce exactly the same results but they were close, as shown below. This correlation helps validate the conclusions stated above, that MapR Streams can transport a larger volume of data and more frequent messages than Kafka.

Throughput Correlation

How does MapR Streams achieve more than 4x the throughput of Kafka?

There are a lot of reasons why MapR Streams is faster, and without getting too technical, I’ll mention just a few. First, the MapR Streams client more efficiently flushes data to the MapR Streams server. It spawns its own threads to do this work, whereas Kafka uses the client application threads directly to flush to a Kafka broker, which in many cases is limited to just a single thread.

On the server side, MapR Streams inherits efficient I/O patterns from the core MapR storage layer which keeps files coherent and clean so that I/O operations can be efficiently buffered and addressed to sequential locations on disk. Replication is more efficient, too, since the underlying MapR storage platform has distributed synchronous replication built in, along with other operational features that simply don’t exist in Kafka, such as snapshots, mirroring, quotas, access controls, etc.

Replicating this test

My JUnit tests for benchmarking Kafka and MapR Streams are available at https://github.com/iandow/kafka_junit_tests. Here are the commands that I used to generate the data shown above:

git clone https://github.com/iandow/kafka_junit_tests
cd kafka_junit_tests
# Create a Kafka topic...
/opt/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic t-00000 --config compression.type=uncompressed
# or create a MapR Streams topic.
maprcli stream create -path /user/mapr/iantest -produceperm p -consumeperm p -topicperm p -defaultpartitions 1 -compression off
# Then compile.
mvn -e -Dtest=MessageSizeSpeedTest test
# Test data will be saved in size-count.csv
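If you just want a feel for the kind of measurement involved without cloning the repository, here is a standalone sketch (not the author's JUnit code) that times a synchronous producer for several message sizes; the topic name and counts are arbitrary:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MessageSizeSpeedSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // ignored by MapR Streams, required by the API
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

    val topic = "t-00000"      // or "/user/mapr/iantest:t-00000" on MapR Streams
    val numMessages = 100000

    for (size <- Seq(100, 1000, 10000, 100000)) {
      val payload = new Array[Byte](size)
      val start = System.nanoTime()
      for (_ <- 1 to numMessages) {
        // Blocking on the returned future makes each send synchronous.
        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, payload)).get()
      }
      val seconds = (System.nanoTime() - start) / 1e9
      val mb = numMessages.toLong * size / 1e6
      println(f"$size%6d-byte messages: ${numMessages / seconds}%.0f msgs/s, ${mb / seconds}%.1f MB/s")
    }
    producer.close()
  }
}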

You can also measure throughput using the performance test utilities included with Kafka and MapR. Here are the commands that I used to do that:

Kafka script:

https://gist.github.com/iandow/bf5df0f9b4f19e6a19aa5a7a93b7c81c

MapR script:

https://gist.github.com/iandow/0750185f1d3631301d476b426c109a50

Topic Scalability

Another major advantage that MapR Streams holds over Kafka relates to how well it can handle large quantities of stream topics. Topics are the primary means of organizing stream data; however, there is overhead associated with categorizing streams into topics, and producer throughput is sensitive to that overhead. I quantified this by measuring how fast a single producer could publish a sustained flow of messages to an increasingly large quantity of topics. This is essentially a "fan-out" producer (illustrated below) and it is very common for fast data pipelines to use this pattern so that data can be more easily consumed downstream.

Fanout Producer

Each of the topics created for this scenario were configured with a single partition and 3x replication. Record size was held constant at 100 bytes.

It’s clear from the following graph that MapR Streams scales to a larger quantity of topics than Kafka.

Topic Scalability

How does MapR Streams handle so many more topics than Kafka?

A topic is just metadata in MapR Streams; it does not introduce overhead to normal operations. MapR Streams uses only one data structure for a stream, no matter how many topics it has, and the MapR storage system provides extremely fast and scalable storage for that data.

On the other hand, Kafka represents each topic by at least one directory and several files in a general-purpose file system. The more topics/partitions Kafka has, the more files it creates. This makes it harder to buffer disk operations and perform sequential I/O, and it increases the complexity of what ZooKeeper must manage.

Replicating this test

This scenario can be run with another JUnit test from https://github.com/iandow/kafka_junit_tests, as follows:

git clone https://github.com/iandow/kafka_junit_tests
cd kafka_junit_tests
# For MapR only, create the stream first:
maprcli stream create -path /user/mapr/taq -produceperm p -consumeperm p -topicperm p -compression off
mvn -e -Dtest=ThreadCountSpeedTest test
# Test data will be saved in thread-count.csv

 

Partition Scalability

Stream topics are often subdivided into partitions in order to allow multiple consumers to read from a topic simultaneously. Both Kafka and MapR Streams allow topics to be partitioned, but partitions in MapR Streams are much more powerful and easier to manage than partitions in Kafka. For example, Kafka requires partitions to fit within the disk space of a single cluster node, and they cannot be split across machines. MapR Streams is not limited by the storage capacity of any one node because the MapR storage system automatically grows (or shrinks) partitions across servers. I’ll talk more about these operational advantages later, but let’s consider the performance implications of partitioning now.

ZooKeeper elects separate nodes to be leaders for each partition. Leaders are responsible for processing the client reads and writes for their designated partition. This helps load balance client requests across the cluster, but it complicates the work that ZooKeeper must do to keep topics synchronized and replicated. Leader election takes time and does not scale well. In my tests, I saw leader election take at least 0.1 seconds per partition, and it ran serially. So, for example, it would take more than 10 seconds to configure a topic with 100 partitions, that is, if ZooKeeper didn’t crash, which it frequently did when I created topics with 100 or more partitions.

In MapR Streams, I had no problem streaming data to topics with thousands of partitions, as shown below. This graph shows the throughput for a producer sending synchronously to a 3x replicated topic subdivided into an increasingly large number of partitions. I could not run my test in Kafka beyond 400 partitions, so that line is cut short.

Partitioning Scalability

Replicating this test

I used the performance scripts included with Kafka and MapR to generate the partition vs. throughput data shown above. Here is the script I used to run this test in Kafka:

https://gist.github.com/iandow/625d783333a53b592f0381e6b37ee9ab

That script will silently freeze if ZooKeeper fails, but it will continue once ZooKeeper starts again. So in another terminal, I simultaneously ran the following script to automatically restart ZooKeeper if it fails (which it is likely to do during this test):

https://gist.github.com/iandow/2dc07bde132669706467e8ee45507561

Here is the script I used to generate partitions vs. throughput data in MapR:

https://gist.github.com/iandow/8074962f6205552c9cdc3fceccdd9793

Operational Advantages for MapR Streams

Increasing throughput capacity and decreasing message latency can often be accomplished simply by adding nodes to your distributed messaging cluster. However, doing so costs money and complicates management, so essentially saying that MapR Streams performs better than Kafka is another way of saying that operating a distributed messaging platform can be done with less hardware on MapR than with Kafka.

However, unless you’re working on applications at extreme scale, the challenges you face with Kafka are more likely to be operational rather than performance-related. And this is where the MapR total cost of ownership really shines.

Not only does MapR Streams execute with higher performance, it also addresses major operational deficiencies in Kafka. Here are three examples relating to replication, scaling, and mirroring:

  • Kafka requires that the MirrorMaker processes be manually configured in order to replicate across clusters. Replication is easy to configure with MapR Streams and supports unique capabilities for replicating streams across data centers and allowing streams to be updated in multiple locations at the same time.

  • Kafka’s mirroring design simply forwards messages to a mirror cluster. The offsets in the source cluster are useless in the mirror, which means consumers and producers cannot automatically failover from one cluster to a mirror. MapR continuously transfers updated records for near real-time replication and preserves message offsets in all replicated copies.

  • Kafka requires partitions to fit within the disk space of a single cluster node, and they cannot be split across machines. This is especially risky, because ZooKeeper could automatically assign multiple large partitions to a node that doesn’t have space for them. You can move them manually, but that can quickly become unmanageable. MapR Streams is not limited by the storage capacity of any one node because it distributes stream data across the cluster.

References

For more information about the operational advantages of MapR Streams, see Will Ochandarena’s blog post, Scaling with Kafka – Common Challenges Solved.

I also highly recommend reading Chapter 5 of Streaming Architecture: New Designs Using Apache Kafka and MapR Streams, by Ted Dunning & Ellen Friedman.

Conclusion

MapR Streams outperforms Kafka in big ways. I measured the performance of distributed streaming in a variety of cases that focused on the effects of message size and topic quantity, and I saw MapR Streams transport a much faster stream of data, with much larger message sizes, and to far more topics than what could be achieved with Kafka on a similarly sized cluster. Although performance isn’t the only thing that makes MapR Streams desirable over Kafka, it offers one compelling reason to consider it.

 

Editor's Note: This blog post was originally published in the Converge Blog on January 11, 2017. Since the publication of the original article MapR Streams has been renamed as MapR-ES

 

Related

MapR-ES

Getting Started with MapR-ES Event Data Streams

By Carol McDonald

This is a 4-Part Series, see the previously published posts below:

Part 1 - Spark Machine Learning

Part 3 – Real-Time Dashboard Using Vert.x

This post is the second part in a series where we will build a real-time example for analysis and monitoring of Uber car GPS trip data. If you have not already read the first part of this series, you should read that first.

The first post discussed creating a machine learning model using Apache Spark’s K-means algorithm to cluster Uber data based on location. This second post will discuss using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.

Example Use Case: Real-Time Analysis of Geographically Clustered Vehicles/Items

The following figure depicts the architecture for the data pipeline:

  1. Uber trip data is published to a MapR Streams topic using the Kafka API
  2. A Spark streaming application subscribed to the first topic:
    1. Ingests a stream of uber trip events
    2. Identifies the location cluster corresponding to the latitude and longitude of the uber trip
    3. Adds the cluster location to the event and publishes the results in JSON format to another topic
  3. A Spark streaming application subscribed to the second topic:
    1. Analyzes the uber trip location clusters that are popular by date and time

Example Use Case Data

The example data set is Uber trip data, which you can read more about in part 1 of this series. The incoming data is in CSV format; an example is shown below, with the header:

date/time, latitude,longitude,base
2014-08-01 00:00:00,40.729,-73.9422,B02598

The enriched Data Records are in JSON format. An example line is shown below:

Spark Kafka Consumer Producer Code

Parsing the Data Set Records

A Scala Uber case class defines the schema corresponding to the CSV records. The parseUber function parses the comma separated values into the Uber case class.
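The code images from the original post don't carry over here, so here is a small sketch of what that case class and parser look like; the field names simply follow the CSV header shown above:

// Schema matching the CSV header: date/time, latitude, longitude, base
case class Uber(dt: String, lat: Double, lon: Double, base: String)

// Parse one comma-separated line into an Uber object
def parseUber(str: String): Uber = {
  val p = str.split(",")
  Uber(p(0), p(1).toDouble, p(2).toDouble, p(3))
}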

Loading the K-Means Model

The Spark KMeansModel class is used to load the saved K-means model fitted on the historical Uber trip data.
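A brief sketch of that step; the model path is a placeholder for wherever the model was saved in part 1:

import org.apache.spark.ml.clustering.KMeansModel

// Load the K-means model fitted on the historical Uber trip data in part 1
val model = KMeansModel.load("/user/mapr/data/savemodel")
model.clusterCenters.foreach(println)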

Output of model clusterCenters:

Below the cluster centers are displayed on a google map:

Spark Streaming Code

These are the basic steps for the Spark Streaming Consumer Producer code:

  1. Configure Kafka Consumer Producer properties.
  2. Initialize a Spark StreamingContext object. Using this context, create a DStream which reads messages from a Topic.
  3. Apply transformations (which create new DStreams).
  4. Write messages from the transformed DStream to a Topic.
  5. Start receiving data and processing. Wait for the processing to be stopped.

We will go through each of these steps with the example application code.

  1. Configure Kafka Consumer Producer properties

The first step is to set the KafkaConsumer and KafkaProducer configuration properties, which will be used later to create a DStream for receiving/sending messages to topics. You need to set the following parameters:

  • Key and value deserializers: for deserializing the message.
  • Auto offset reset: to start reading from the earliest or latest message.
  • Bootstrap servers: this can be set to a dummy host:port since the broker address is not actually used by MapR Streams.

For more information on the configuration parameters, see the MapR Streams documentation.
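A sketch of those consumer properties (the producer side is analogous); the group ID is a placeholder, and on MapR Streams the bootstrap servers value is a dummy:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest",
  ConsumerConfig.GROUP_ID_CONFIG                 -> "uber-trip-consumer",
  // Not actually used by MapR Streams, so a dummy host:port is fine
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "dummy:9092"
)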

  2. Initialize a Spark StreamingContext object.

ConsumerStrategies.Subscribe, as shown below, is used to set the topics and Kafka configuration parameters. We use the KafkaUtils createDirectStream method with a StreamingContext, the consumer and location strategies, to create an input stream from a MapR Streams topic. This creates a DStream that represents the stream of incoming data, where each message is a key value pair. We use the DStream map transformation to create a DStream with the message values.
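A sketch of that step, assuming the standard spark-streaming-kafka-0-10 connector (MapR ships its own packaging of the Kafka 0.9 API, but the calls have the same shape) and a hypothetical stream:topic path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val ssc = new StreamingContext(new SparkConf().setAppName("UberStream"), Seconds(5))

val topics = Set("/apps/uberstream:ubers") // hypothetical stream:topic path

val messagesDStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

// Keep only the message values (the CSV payload) from each key-value pair
val valuesDStream = messagesDStream.map(_.value())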

  3. Apply transformations (which create new DStreams)

We use the DStream foreachRDD method to apply processing to each RDD in this DStream. We parse the message values into Uber objects, with the map operation on the DStream. Then we convert the RDD to a DataFrame, which allows you to use DataFrames and SQL operations on streaming data.
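Roughly, continuing from the DStream and parseUber function sketched above:

import org.apache.spark.sql.SparkSession

valuesDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // Parse the CSV lines into Uber objects and convert the RDD to a DataFrame
    val df = rdd.map(parseUber).toDF()
    df.show()
    // feature assembly, clustering, and publishing follow (see below)
  }
}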

Here is example output from the df.show:

A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.

Then the model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster predictions.
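In code, those two steps amount to something like this, continuing with the df from the previous step (column names are assumptions):

import org.apache.spark.ml.feature.VectorAssembler

// Combine the latitude and longitude columns into a single "features" vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("lat", "lon"))
  .setOutputCol("features")
val featureDF = assembler.transform(df)

// The loaded K-means model appends a cluster prediction column
val categories = model.transform(featureDF)
categories.show()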

The output of categories.show is below:

The DataFrame is then registered as a table so that it can be used in SQL statements. The output of the SQL query is shown below:

  4. Write messages from the transformed DStream to a Topic

The Dataset result of the query is converted to JSON RDD Strings, then the RDD sendToKafka method is used to send the JSON key-value messages to a topic (the key is null in this case).

Example message values (the output for temp.take(2) ) are shown below:

{"dt":"2014-08-01 00:00:00","lat":40.729,"lon":-73.9422,"base":"B02598","cluster":7}

{"dt":"2014-08-01 00:00:00","lat":40.7406,"lon":-73.9902,"base":"B02598","cluster":7}

  5. Start receiving data and processing it. Wait for the processing to be stopped.

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

Spark Kafka Consumer Code

Next, we will go over some of the Spark streaming code which consumes the JSON-enriched messages.

We specify the schema with a Spark StructType:

Below is the code for:

  • Creating a Direct Kafka Stream
  • Converting the JSON message values to Dataset[Row] using spark.read.json with the schema
  • Creating two temporary views for subsequent SQL queries
  • Using ssc.remember to cache data for queries
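Since the code image isn't reproduced here, a rough sketch of the JSON-parsing portion follows. Here enrichedDStream stands for the DStream of message values read from the second topic (created the same way as the direct stream shown earlier), and the dt column is kept as a string, which Spark casts when hour() is applied:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Schema for the enriched JSON messages published by the first application
val schema = StructType(Array(
  StructField("dt",      StringType,  true),
  StructField("lat",     DoubleType,  true),
  StructField("lon",     DoubleType,  true),
  StructField("base",    StringType,  true),
  StructField("cluster", IntegerType, true)
))

enrichedDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    // Convert the JSON strings to rows with the schema, then register a view for SQL
    val df = spark.read.schema(schema).json(rdd)
    df.createOrReplaceTempView("uber")
  }
}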

Now we can query the streaming data to ask questions like: which hours had the highest number of pickups? (Output is shown in a Zeppelin notebook):

spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")

How many pickups occurred in each cluster?

df.groupBy("cluster").count().show()

or

spark.sql("select cluster, count(cluster) as count from uber group by cluster")

Which hours of the day and which cluster had the highest number of pickups?

spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")

Display datetime and cluster counts for Uber trips:

%sql select cluster, dt, count(cluster) as count from uber group by dt, cluster order by dt, cluster

Software

Summary

In this blog post, you learned how to use a Spark machine learning model in a Spark Streaming application, and how to integrate Spark Streaming with MapR Streams to consume and produce messages using the Kafka API.

References and More Information:

Editor's note: This blog post was originally published in the MapR Converge Blog on January 05, 2017.

By Carol McDonald

 

A Formula 1 race is a high-speed example of the Internet of Things, where gathering, analyzing, and acting on tremendous amounts of data in real time is essential for staying competitive. The sport’s use of such information is so sophisticated that some teams are exporting their expertise to other industries, even for use on oil rigs. Within the industry, automobile companies such as Daimler and Audi are leveraging deep learning on the MapR Converged Data Platform to successfully analyze the continuous data generated from car sensors.

This blog discusses a proof of concept demo, which was developed as part of a pre-sales project to demonstrate an architecture to capture and distribute lots of data, really fast, for a Formula 1 team.

What’s the Point of Data in Motor Sports?

Formula 1 cars are some of the most heavily instrumented objects in the world.

Picture1

Read more about The Formula 1 Data Journey

Formula 1 data engineers analyze data from ~150 sensors per car, tracking vital stats such as tire pressure, fuel efficiency, wind force, GPS location, and brake temperature in real time. Each sensor communicates with the track, the crew in the pit, a broadcast crew on-site, and a second team of engineers back in the factory. They can transmit 2GB of data in one lap and 3TB in a full race.

Picture2

Watch WSJ's video "The F1 Big Data Explainer"

Data engineers can make sense of the car’s speed, stability, aerodynamics, and tire degradation around a race track. The graph below shows an example of what is being analyzed by race engineers: rpm, speed, acceleration, gear, throttle, brakes, by lap.

Picture3

More analysis is also completed at the team’s manufacturing base. Below is an example race strategy, analyzing whether it would be faster to do a pit stop tire change at lap 13 and lap 35 vs. lap 17 and lap 38.

Picture4

The Challenge: How Do You Capture, Analyze, and Store this Amount of Data in Real Time at Scale?

The 2014 U.S. Grand Prix collected more than 243 terabytes of data in a race weekend, and now there is even more data. Formula 1 teams are looking for newer technologies with faster ways to move and analyze their data in the cockpits and at the factory.

Below is a proposed proof of concept architecture for a Formula 1 team:

Picture5

MapR Edge in the cars provides an on-board storage system that buffers the most recent data and retries when transmission fails. MapR Edge addresses the need to capture, process, and provide backup for data transmission close to the source. A radio frequency link publishes the data from the edge to the referee “FIA” topic, and from there it is published to each team’s topic, where local analytics is done. The “team engineers” Stream replicates to the “trackside” and “factory” Streams, so that the data is pushed in real time for analytics. In a sport where seconds make a difference, it’s crucial that the trackside team can communicate quickly with the engineers at headquarters and team members around the world. MapR Streams replication provides an unusually powerful way to handle data across distant data centers at large scale and low latency.

The Demo Architecture

The demo architecture is considerably simplified because it needs to be able to run on a single node MapR sandbox. The demo does not use real cars; it uses the Open Racing Car Simulator (TORCS), a race car simulator often used for AI race games and as a research platform. 

Picture6

The demo architecture is shown below. Sensor data from the TORCS simulator is published to a MapR Streams topic using the Kafka API. Two consumers are subscribed to the topic. One Kafka API consumer, a web application, provides a real-time dashboard using websockets and HTML5. Another Kafka API consumer stores the data in MapR-DB JSON, where analytics with SQL are performed using Apache Drill. You can download the demo code here:  https://github.com/mapr-demos/racing-time-series.

Picture7

How Do You Capture this Amount of Data in Real Time at Scale?

Older messaging systems track message acknowledgements on a per-message, per-listener basis. A new approach is needed to handle the volume of data for the Internet of Things. 

Picture8

MapR Streams is a distributed messaging system that enables producers and consumers to exchange events in real time via the Apache Kafka 0.9 API. In MapR Streams, topics are logical collections of messages, which organize events into categories and decouple producers from consumers.

Picture9

Topics are partitioned for throughput and scalability. Producers are load balanced between partitions, and consumers can be grouped to read in parallel from multiple partitions within a topic for faster performance.

Picture10

You can think of a partition like a queue; new messages are appended to the end, and messages are delivered in the order they are received.

Picture11

Unlike a queue, events are not deleted when read; they remain on the partition, available to other consumers.

Picture12

A key to high performance at scale, in addition to partitioning, is minimizing the time spent on disk reads and writes. With Kafka and MapR Streams, messages are persisted sequentially as produced and read sequentially when consumed. These design decisions mean that non-sequential reading or writing is rare, allowing messages to be handled at very high speeds with scale. Not deleting messages when they are read also allows processing of the same messages by different consumers for different purposes. 

Picture13

Example Producer Using the Kafka API

Below is an example producer using the Kafka Java API. For more information, see the MapR Streams Java applications documentation.

Picture14
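The screenshot isn't reproduced here; a minimal sketch of such a producer (Scala calling the Kafka Java API, with a hypothetical stream:topic path) looks like this:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // ignored by MapR Streams, required by the API
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

val topic = "/apps/racing/stream:sensors" // hypothetical stream:topic path

// sensorJson is one JSON document like the examples in the next section
def publish(sensorJson: String): Unit =
  producer.send(new ProducerRecord[String, String](topic, sensorJson))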

How to Store the Data

One of the challenges, when you have 243 terabytes of data every race weekend, is where do you want to store it? With a relational database and a normalized schema, related data is stored in different tables. Queries joining this data together can cause bottlenecks with lots of data. For this application, MapR-DB JSON was chosen for its scalability and flexible ease of use with JSON. MapR-DB and a denormalized schema scale, because data that is read together is stored together.

Picture15

With MapR-DB (HBase API or JSON API), a table is automatically partitioned across a cluster by key range, providing for really fast reads and writes by row key.

Picture16

JSON Schema Flexibility

JSON facilitates the natural evolution of your data schema during the life of your application. For example, in version 1, we have the following schema, where each JSON message groups sensor values for Speed, Distance, and RPM:

 

{
  "_id":"1.458141858E9/0.324",
  "car":"car1",
  "timestamp":1458141858,
  "racetime":0.324,
  "records":
     [
        {
           "sensors":{
              "Speed":3.588583,
              "Distance":2003.023071,
              "RPM":1896.575806
           }
        },
        {
           "sensors":{
              "Speed":6.755624,
              "Distance":2004.084717,
              "RPM":1673.264526
           }
        }
     ]
}

For version 2, you can easily capture more data values quickly without changing the architecture of your application, by adding attributes as shown below:

 

...
"records":
   [
      {
         "sensors":{
            "Speed":3.588583,
            "Distance":2003.023071,
            "RPM":1896.575806,
            "Throttle":37,
            "Gear":2
         }
      },
. . .

As discussed before, MapR Streams allows processing of the same messages for different purposes or views. With this type of architecture and flexible schema, you can easily create new services and new APIs; for example, by adding Apache Spark Streaming or an Apache Flink Kafka consumer for in-stream analysis, such as aggregations, filtering, alerting, anomaly detection, and/or machine learning.

Picture17

Processing the Data with Apache Flink

Apache Flink is an open source distributed data stream processor. Flink provides efficient, fast, consistent, and robust handling of massive streams of events as well as batch processing, a special case of stream processing. Stream processing of events is useful for filtering, creating counters and aggregations, correlating values, joining streams together, machine learning, and publishing to a different topic for pipelines.

Picture18

The code snippet below uses the FlinkKafkaConsumer09 class to get a DataStream from the MapR Streams “sensors” topic. The DataStream is a potentially unbounded distributed collection of objects. The code then calculates the average speed with a time window of 10 seconds. 

Picture19Picture20
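The screenshots aren't reproduced here; as a rough sketch of the same idea (using Flink's Scala API, with a hypothetical topic path, a made-up parsing helper, and import paths that vary by Flink version), the windowed average could be computed like this:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object AverageSpeed {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // can be a dummy value on MapR Streams
    props.setProperty("group.id", "flink-speed")

    // Raw sensor JSON from the (hypothetical) stream:topic path
    val raw: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer09[String]("/apps/racing/stream:sensors", new SimpleStringSchema(), props))

    // extractSpeed is a made-up helper that pulls the Speed value out of one JSON message
    val speeds: DataStream[Double] = raw.map(msg => extractSpeed(msg))

    // Average speed over 10-second windows across all cars
    val avgSpeed = speeds
      .map(s => (s, 1L))
      .timeWindowAll(Time.seconds(10))
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (sum, count) => sum / count }

    avgSpeed.print()
    env.execute("average speed over 10s windows")
  }

  private val speedPattern = """"Speed":([0-9.]+)""".r
  def extractSpeed(json: String): Double =
    speedPattern.findFirstMatchIn(json).map(_.group(1).toDouble).getOrElse(0.0)
}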

Querying the Data with Apache Drill

Apache Drill is an open source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine, built to perform distributed query processing across the various nodes in a cluster. 

Picture21

With Drill, you can use SQL to query and join data from files in JSON, Parquet, or CSV format, Hive, and NoSQL stores, including HBase, MapR-DB, and Mongo, without defining schemas.

Below is an example query to ‘show all’ from the race car’s datastore:

SELECT * 
FROM dfs.`/apps/racing/db/telemetry/all_cars`
LIMIT 10

And the results:

Picture22

Below is an example query to show the average speed by car and race:

SELECT race_id, car, 
ROUND(AVG( `t`.`records`.`sensors`.`Speed` ),2) as `mps`,
ROUND(AVG( `t`.`records`.`sensors`.`Speed` * 18 / 5 ), 2) as `kph`
FROM
(
SELECT race_id, car, flatten(records) as `records`
FROM dfs.`/apps/racing/db/telemetry/all_cars`
) AS `t`
GROUP BY race_id, car

 

And the results:

Picture23

All of the components of the use case architecture that we just discussed can run on the same cluster with the MapR Converged Data Platform.

Software

You can download the code and instructions to run this example from here: https://github.com/mapr-demos/racing-time-series.

Editor's note: This blog post was originally shared in the Converge Blog on May 11, 2017. Since the publication of this blog, the MapR Streams product name has been changed to MapR-ES.

Additional Resources

MapR Edge product information

 

MapR Streams Application Development

Getting Started with Apache Flink and MapR Streams

Big Data On The Road

NorCom selects MapR converged data platform for foundation of deep learning framework for Mercedes

Apache Drill

MapR-ES

MapR-DB

by Karen Whipple

 

Human trafficking is the third largest crime industry in the world today. India has an estimated 18.3 million slaves, according to the Global Slavery Index, and every year 200,000 Indian children are tricked, kidnapped, or coerced into slavery. Every three minutes, a girl is sold into slavery. The average age of these girls is 12 years old, with children as young as six trafficked for sexual exploitation, and less than one percent of them are ever rescued. Public awareness is the most effective barrier to human trafficking, and MapR is proud to be part of the big data analytics solution, developed by Quantium for My Choices Foundation, to help alleviate one of the world’s worst social problems.

While many organizations work to rescue girls and prosecute the traffickers, Operation Red Alert, a program of My Choices Foundation, is a prevention program designed to help parents, teachers, village leaders, and children understand how the traffickers work, so they can block their efforts. Poor village girls are typically targeted, with promises that the girls are being offered wonderful opportunities for an education, jobs, or marriage. But with over 600,000 villages in India, Operation Red Alert needed help to determine which areas were most at risk to prioritize their education efforts.

 

Watch the video 'Red Alert Saves Lives'

As a pro bono project, the Australian analytics firm Quantium developed a big data solution, which runs on Cisco UCS infrastructure and uses the MapR Converged Data Platform. The solution analyzes India’s census data, government education data, and other sources for factors such as drought, poverty level, proximity to transportation stations, educational opportunities, population, and distance to police stations, so as to identify the villages and towns that are most at risk of human trafficking.

Elca Grobler, the founder of My Choices Foundation, explains the significance of this work: “The general Indian public is still largely unaware that trafficking exists, and most parents have no idea that their children are actually being sold into slavery. That’s why grassroots awareness and education at the village level is so important to ending the human traffic trade.”

Operation Red Alert

With a vision to end sex trafficking in India by preventing girls from ever entering it, Operation Red Alert began its efforts in 2014 by conducting research on root causes and by building relationships with long-standing non-government organizations (NGOs) in the field. Working with Quantium, they analyzed data while constantly refining and iterating to define a model that could identify villages most at risk. Red Alert now has 40 NGO partners, who have helped conduct the Safe Village education program in 600 villages throughout four states in India, reaching over 600,000 villagers. Red Alert also created India’s first national anti-trafficking helpline and is conducting mass media awareness campaigns.

As Grobler explains, “We are helping to banish human trafficking, one village at a time, through a combination of highly sophisticated technology and grassroots Safe Village education implemented through our NGO partners. The national helpline gives villagers a link to continual support, and with 95 million mobile phones in India that gives us a very broad reach. We’re also adding data and refining our predictive analytics, and we’re expanding our education efforts to cover more states this year.”

MapR Technology Behind the Scenes

Quantium brings together proprietary data, technology, and innovative data scientists to enable the development of groundbreaking analytical applications and develops insights into consumer needs, behaviors, and media consumption by analyzing consumer transaction data. Quantium upgraded its legacy server platform with Cisco UCS to gain centralized management and the computing power needed to process complex algorithms in a dense, scalable form factor that also reduces power consumption. Cisco Nexus 9000 Switches provide a simplified network with the scalable bandwidth to meet their current and future requirements. The MapR Converged Data Platform makes this enterprise possible by enabling organizations to create intelligent applications that fully integrate analytics with operational processes in real time.

Rigorous testing by Quantium demonstrated that the MapR-Cisco platform decreased query processing time by 92%, a performance increase of 12.5 times the legacy platform. With the Cisco-MapR solution, Quantium’s data scientists can design complex queries that run against multi-terabyte data sets and get more accurate results in minutes, rather than hours or days. In addition, the more powerful platform drives innovation because scientists can shorten development time by testing alternative scenarios quickly and accurately.

“UCS gives us the agility that’s key to supporting our iterative approach to analytics,” explains Simon Reid, Group Executive for Technology at Quantium. “For example, with the analytics for Operation Red Alert, we’re fine-tuning the algorithm, adding more hypotheses and more granular data to improve our predictive capabilities. MapR adds performance, security, and the ability to segregate multiple data sets from multiple data partners for Operation Red Alert.” MapR is proud that the unique multi-tenancy and high-speed performance and scale of the MapR Platform serves as the underlying technology to power the Operation Red Alert data platform.

Editor's note: This blog post was originally posted in the Converge Blog on May 30, 2017

Additional Resources

I'll guess that many people reading this have spent time wrestling with configuration to get Python and Spark to play nicely. Having gone through the process myself, I've documented my steps and am sharing the knowledge, hoping it will save some time and frustration for some of you.

 

This article targets the latest releases of MapR 5.2.1 and the MEP 3.0 version of Spark 2.1.0. It should work equally well for earlier releases of MapR 5.0 and 5.1. In fact, I've tested this to work with MapR 5.0 with MEP 1.1.2 (Spark 1.6.1) for a customer.  

 

The version of Jupyter is 4.3. It has changed quite a bit since earlier versions, so most of the information I found in blogs was pretty outdated, which is why I had so much trouble getting everything working to my satisfaction.

 

My goals:

  • Running PySpark successfully from either a cluster node or an edge node
  • Running python code in YARN distributed mode
  • Have access to modules like numpy, scipy, pandas and others.
  • Do all this using Jupyter in server mode that I access from my own laptop

 

I'm leaving out Jupyter server mode security, which could be the topic of a separate blog post. I've implemented it before and found the Jupyter documentation on setting up encryption (HTTPS) and authentication to be pretty good.

Installing an updated version of Python

Verify your version of Python:

python --version

 

If it's Python 2.6.x, it's probably a good idea to use a recent build of Python 2.7. If it's Python 2.7.x, then you'll need to decide whether to use the system Python or not.

  • The system Python is easier to make work; it's already there and shared everywhere.
  • An isolated, separate Python (Anaconda or a standalone build) is harder to get working but provides a more consistent environment where each user can have their own (and only their own) modules installed.

 

I will use Miniconda for Python 2.7 (64-bit) throughout. It works very well. Using Python 3 would be just the same, with the only differences being in terms of code and module compatibility. Either works fine with Spark.

 

Note: Python 3.6 doesn't work with Spark 1.6.1; see SPARK-19019.

Installing Anaconda

There is a choice between Anaconda and Miniconda, as well as between python 2.7 and Python 3.6.

Miniconda is very nice because the download is small and you only install what you need. Anaconda is very nice because everything is installed from the start, so all the modules you are likely to need will already be there.

 

Here, we show installing miniconda and Python 2.7 (64bits):

wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh
bash Miniconda2-latest-Linux-x86_64.sh -b -p /opt/miniconda2

 

To install it on all nodes at once, we recommend checking out Clustershell.

 

#copy the file to all nodes

clush -ac Miniconda2-latest-Linux-x86_64.sh



#install on all nodes at same time:

clush -aB bash Miniconda2-latest-Linux-x86_64.sh -b -p /opt/miniconda2

 

Important: Get all nodes into exactly the same state, with Python/Anaconda installed in exactly the same location and with exactly the same modules installed on every node. Missing this practically guarantees weird errors that will be hard to diagnose.

 

Update Spark environment to use Python 2.7

Add to /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh:

    export PYSPARK_PYTHON=/opt/miniconda2/bin/python
    export PYSPARK_DRIVER_PYTHON=/opt/miniconda2/bin/python

 

update file on all nodes:

# using clustershell to copy file ("c") to all nodes ("a")

clush -ac /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh

 

Note: this is known to work on previous MEP versions. I have also tested it with MEP 1.1.2 (Spark 1.6.1), and it worked very well. Just use the correct path to Spark and it will work just fine.

 

Testing

For testing, let's use the data from the MapR Academy Spark Essentials course, specifically the eBay auction data.

Copy the data into the folder: /user/mapr/data

 

Start pyspark and run the following code:

>>> auctionRDD = sc.textFile("/user/mapr/data/auctiondata.csv").map(lambda line:line.split(","))
>>> auctionRDD.first()
[u'8213034705', u'95', u'2.927373', u'jake7870', u'0', u'95', u'117.5', u'xbox', u'3']
>>> auctionRDD.count()
10654

OK, so now we have a working pyspark shell!

Note: don't do this as root or as user mapr on a production cluster. However, for doing tutorials, user mapr is convenient as it is a superuser and you don't need to worry about file permissions on MapR.

 

Errors:

  • pyspark java.io.IOException: Cannot run program "python2.7": error=2, No such file or directory
  • This error is because the driver and/or the executors can't find the python executable. It's fixed by setting the PYSPARK_PYTHON (and PYSPARK_DRIVER_PYTHON) variables in spark-env.sh (see above)

ipython Notebook

If you want to be able to choose to use Spark when launching the ipython shell:

  1.         Ensure the SPARK_HOME env variable is defined:
export SPARK_HOME=/opt/mapr/spark/spark-2.1.0
  2.         Install ipython with Anaconda:
/opt/miniconda2/bin/conda install jupyter
  3.         Add an ipython profile named pyspark:
ipython profile create pyspark
  4.         Add ~/.ipython/profile_pyspark/startup/00-pyspark-setup.py:
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
  raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
  5.         Launch:
/opt/miniconda2/bin/ipython --profile=pyspark

 

Try the sample code above it should also work without issue.

Jupyter Notebook

Now on to Jupyter. In this case, we're looking to have the notebook run on an edge node (less ideally, on a cluster node) in server mode and access it from our development laptop.

The following instructions assume the user mapr, but should work equally well for any other user. For production use, never use mapr user as it is a superuser with read-write access to all data in MapR.

With Anaconda:

  1.         Install Jupyter on all nodes:
clush -aB /opt/miniconda2/bin/conda install jupyter -y
  2.         Generate a profile:
/opt/miniconda2/bin/jupyter notebook --generate-config

 

We need to update the profile so you can log on to the notebook from your local computer, not just from localhost.

This generates the following file: $HOME/.jupyter/jupyter_notebook_config.py. In this file, we're going to update the following setting: c.NotebookApp.ip

 

# here it's possible to use the server's IP address as well, ex: '10.0.0.111'
# the important point is that leaving it at the default (i.e. 'localhost') prevents any remote connection
c.NotebookApp.ip = '*'

 

This is a good time for a reminder about security: it's pretty easy to configure Jupyter to use HTTPS and require a password. See the Jupyter documentation.

  3.        The startup script from the ipython step is helpful:

      [mapr@ip-10-0-0-180 ~]$ cat .ipython/profile_default/startup/00-default-setup.py

import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
  raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

       This step is essential, or else the kernel won't initialize properly. Alternatively, you can paste the code above into the first cell to initialize pyspark first. Another alternative is to use the findspark module, which probably does something similar to this, but with less code.

  4.         Add a PySpark kernel. Create the JSON file in the location shown below:
[mapr@ip-10-0-0-20 ~]$ cat .ipython/kernels/pyspark/kernel.json
{
"display_name": "pySpark (Spark 2.1.0)",
 "language": "python",
 "argv": [
  "/opt/miniconda2/bin/python",
  "-m",
  "ipykernel",
  "-f",
  "{connection_file}"
 ],
"env": {
        "CAPTURE_STANDARD_OUT": "true",
        "CAPTURE_STANDARD_ERR": "true",
        "SEND_EMPTY_OUTPUT": "false",
        "SPARK_HOME": "/opt/mapr/spark/spark-2.1.0"
    }
}
  5.         Start a notebook and have fun with Spark and Python!
jupyter notebook --no-browser

 

Open your browser to the indicated link and... Success!

 

Launch Jupyter notebook instead of pyspark

  1.         Update $SPARK_HOME/conf/spark-env.sh:
[mapr@ip-10-0-0-20 ~]$ tail /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh
    export PYSPARK_DRIVER_PYTHON=/opt/miniconda2/bin/jupyter

    # Setup env variable for Jupyter + Pyspark
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser"
  2.         Launch pyspark; it will launch a Jupyter notebook:
$SPARK_HOME/bin/pyspark

 

Conclusion

Getting this working is harder than it ought to be. Jupyter has changed a lot over the versions, especially in the move from IPython to Jupyter. The change is worth it, but the configuration is a big pain.

 

Getting the information all together into a repeatable, tested process was also quite a bit of work. Hopefully this will save some time for some people out there.

 

Thanks

Dong Meng's blog proved to be a life saver. Check him out: https://mengdong.github.io

By Mathieu Dumoulin

 

Goal of the System

The Kafka/Spark Streaming system aims to provide better customer support by giving the support staff always up-to-date call quality information for all of the operator's mobile customers.

Mobile customers, while making calls and using data, connect to the operator’s infrastructure and generate logs in many different systems. Three specific logs were identified that, if correlated with each other, give visibility in the actual quality of service experienced by each individual customer. The three logs were selected because they can be correlated through a simple relational database-like join operation.

To improve customer support, the call quality information needs to be kept up to date in near real time; otherwise, it has no value. This led, down the road, to building a streaming architecture rather than a batch job. The data volume at production load reaches several GB/s, generated by several million mobile customers, 24 hours a day, 365 days a year. Performance and stability at that scale are required for the system to reach production.

Project SLA Goals

The application has clear performance requirements based on the known worst-case throughput of the input data. This log data is generated by real-world use of the services of the company. If the application is to be useful at all, as a real-time streaming application, it must be able to handle this data without getting behind.

In terms of numbers, the goal is to handle up to 3 GB/min of input data. For this large mobile operator, such throughput represents about 150,000-200,000 events/second. Ordinarily, the throughput is about half of that value, or 1.5 GB/min and 60,000-80,000 events/second.

Data Sources

The raw data sources are the logs of three remote systems, labeled A, B, and C here: the log from A comprises about 84-85% of the entries, the log from B about 1-2%, and the log from C about 14-15%. The fact that the data is unbalanced is one of the (many) sources of difficulty in this application.

Picture1

The raw data is ingested into the system by a single Kafka producer into a Kafka cluster running on 6 servers. The producer reads the various logs and adds each log's records to its own topic. As there are three logs, there are three Kafka topics.

Picture2

The data is consumed by a Spark Streaming application, which picks up each topic, does a simple filter to cut out unnecessary fields, a map operation to transform the data, and then a foreachRDD operation (each micro-batch generates an RDD in Spark Streaming) that saves the data to Ignite and to HDFS as Hive tables for backup.
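The production consumer is a Scala application, but the shape of the job is easy to see in a short sketch. The following is a minimal, hypothetical PySpark Streaming version using Kafka's direct approach; the broker addresses, topic names, field positions, and the placeholder sink are illustrative, not the project's actual code.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(conf=SparkConf().setAppName("streaming-etl"))
ssc = StreamingContext(sc, 30)  # 30-second micro-batches, as in the original design

# One direct stream over the three topics (topic names and brokers are made up here)
kafka_params = {"metadata.broker.list": "broker1:9092,broker2:9092"}
stream = KafkaUtils.createDirectStream(ssc, ["logA", "logB", "logC"], kafka_params)

parsed = (stream.map(lambda kv: kv[1].split("|"))          # assume pipe-delimited records
                .filter(lambda fields: len(fields) > 10)   # cut out malformed rows
                .map(lambda fields: (fields[0], fields[3], fields[7])))  # keep only useful fields

def save_batch(rdd):
    # Placeholder: the real job writes each micro-batch to Ignite and to Hive/ORC tables on HDFS.
    print("records in this batch: %d" % rdd.count())

parsed.foreachRDD(save_batch)
ssc.start()
ssc.awaitTermination()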

A second batch Spark application runs once per hour on the data stored in-memory in Ignite to join the records from the three separate logs into a single table. The batch job has a maximum data size of about 100GB. The cluster CPU resources should be sufficient to process this amount of data in one hour or less.

Picture3

Ignite stores 3 hours’ worth of data at all times to account for calls that begin in an earlier hour and end in the hour being processed, as well as calls that begin in the target hour and end in the next one. The telecom operator judges that calls so long they aren’t captured by this scheme can be ignored, as they are very rare.

It’s worth noting that a better all-streaming architecture could have avoided the whole issue of the intermediate representation in the first place. This is an illustrative, real-world case where more time and thought spent upfront could have made the entire project finish faster than rushing headlong into coding the first working solution that came to mind.

System Hardware and Software: At the Bleeding Edge of Open Source Big Data

The cluster has a lot of CPU and memory resources. It has 12 enterprise-grade server nodes, each equipped with two E5 Xeon CPUs (16 physical cores), 256GB of memory, and eight 6TB spinning HDDs (two used for the OS in RAID 1). Each server has one 10GbE network interface.

Picture4

The technology stack selected for this project is centered around Kafka 0.8 for streaming the data into the system, Apache Spark 1.6 for the ETL operations (essentially a bit of filtering and transformation of the input, then a join), and Apache Ignite 1.6 as an in-memory shared cache that makes it easy to connect the streaming input part of the application with the part that joins the data. Backups are done to HDFS as Hive ORC tables, which serve as a just-in-case fallback for Ignite and cover possible future analytics use cases (none existed at the time).

The Spark applications are both coded in Scala 2.10 and use Kafka’s direct approach (no receivers). Apache Ignite has a really nice Scala API with a magic IgniteRDD that allows applications to share in-memory data, a key feature for this system to reduce coding complexity.

The cluster is running Apache Hadoop's HDFS as a distributed storage layer, with resources managed by Mesos 0.28. Finally, HBase is used as the ultimate data store for the final joined data. It will be queried by other systems outside the scope of this project. The cluster design with all relevant services is shown in the table above.

Performance Issues

The original system had several issues:

  1. Performance
    • First Spark Streaming job is not stable
    • Second Spark batch job can’t process 1 hour of data before the next hour of data arrives
  2. Stability: The application crashes under load

A Spark Streaming application is said to be stable if the processing time of each micro-batch is less than or equal to that micro-batch time. In this case, the application processes each 30 seconds of data in as much as 6 minutes. We need a 12x speedup.

Second, there is a batch process to join data one hour at a time that was targeted to run in 30 minutes but was taking over 2 hours to complete.

Third, the application was randomly crashing after running for a few hours. Stability of such a complex, fully open-source stack should never be assumed. Rather, it is the result of a constant effort by the team to better understand the system. We can expect that there will still be a lot of learning required to keep the system up and running once it is moved to production as well.

Performance Tuning

In my opinion, all of the performance and stability issues stem from management’s terrible decision to push a very good POC project developed on AWS into production on on-premises hardware. It’s hard to believe, but they fully expected the POC code to run as-is on a production system it was never tested on.

Regardless, the task was set, and we had only a few short days to identify what could be done and get the system up to production speed. Final QA testing of the system was barely 1 week away, and management wasn’t in the mood to accept delays. We got to work...

First target: Improve Spark Streaming Performance

At maximum load, the Spark Streaming application is taking between 4.5 to 6 minutes for each micro-batch of 30 seconds. We need to find 9-12x speedup worth of improvements.

Spark has a lot of moving parts, but it will always be true that fast algorithms beat configuration tweaks. In this case, there is nothing to gain from the code; it’s all very parallelizable with no obvious issues, like doing two computations separately when they could be combined, or any O(n²) loop-within-a-loop issues. The job is nothing more than a filter and a map.

What we need to determine, then, is whether the job is indeed being processed in parallel to make the most of all those CPU cores. In a Spark Streaming job, Kafka partitions map 1 to 1 with Spark partitions.

Increase Parallelism: Increase Number of Partitions in Kafka

A quick check of the Spark UI shows 36 partitions. As each server has 6 physical disks, I assume the partition count was chosen by the formula nodes × physical disks = partitions per topic. A quick check online reveals that choosing a partition count is quite a bit more complex than that, and that this formula doesn’t come from any known Kafka best practices guide. (Ref: https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/)

The input data was unbalanced, and most of the application processing time was spent on Topic 1 (which carries 85% of the throughput). Kafka partitions are matched 1:1 with the partitions in the input RDD, leading to only 36 partitions, meaning we could only keep 36 cores busy on this task. To increase the parallelism, we needed to increase the number of partitions. What we did was split topic 1 into 12 topics, each with 6 partitions, for a total of 72 partitions. The change was a simple modification to the producer to evenly divide the data from the first log into 12 topics instead of just one. Zero code needed to be modified on the consumer side.

We also right-sized the number of partitions for the two other topics, in proportion to their relative importance in the input data, so we set topic 2 to two partitions and topic 3 to eight partitions.
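The producer itself is a custom component we don't show here, but the splitting logic is simple. A hypothetical sketch using the kafka-python client (the topic names and broker address are illustrative):

from kafka import KafkaProducer  # kafka-python client, used here purely for illustration

NUM_SUBTOPICS = 12  # the heavy log is now spread over topic1-0 ... topic1-11, 6 partitions each

producer = KafkaProducer(bootstrap_servers="broker1:9092")

def send_log_a_record(i, record_bytes):
    # Round-robin the records of log A across the 12 sub-topics, so that
    # 12 x 6 = 72 partitions (and up to 72 cores) can work on them in Spark.
    producer.send("topic1-%d" % (i % NUM_SUBTOPICS), record_bytes)

# usage sketch:
# for i, line in enumerate(open("log_a.records", "rb")):
#     send_log_a_record(i, line.rstrip())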

Picture5: Running more tasks in parallel. Before tuning, each stage always had 36 partitions!

Fix RPC Timeout Exceptions

When looking at the application logs, we could see a lot of RPC timeout exceptions. A web search turned up what we believe is the relevant issue, SPARK-14140. The recommended fix is to increase spark.executor.heartbeatInterval from the default of 10s to 20s.

I think this could be caused by nodes getting busy from disk or CPU spikes because of Kafka, Ignite, or garbage collector pauses. Since Spark runs on all nodes, the issue was random (see the cluster services layout table in the first section).

The configuration change fixed this issue completely. We haven’t seen it happen since. (Yay!)
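The applications are Scala, but the fix is plain Spark configuration and can be set wherever the SparkConf is built, or passed with --conf at submit time. A minimal PySpark-flavored sketch, for illustration only:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("streaming-etl")
        .set("spark.executor.heartbeatInterval", "20s"))  # default is 10s
sc = SparkContext(conf=conf)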

Increase Driver and Executor Memory

Out of memory issues and random crashes of the application were solved by increasing the memory from 20g per executor to 40g per executor as well as 40g for the driver. Happily, the machines in the production cluster were heavily provisioned with memory. This is a good practice with a new application, since you don’t know how much you will need at first.

The issue was difficult to debug with precision and reliable information, since the Spark UI reports very little memory consumption. In practice, as this setting is easy to change, we empirically settled on 40g being the smallest memory size for the application to run stably.

Right Size the Executors

The original application was running only 3 executors with 72 total cores. We configured the application to run with 80 cores with a maximum of 10 cores per executor, for a total of 8 executors. Note that with 16 real cores per node on a 10 node cluster, we’re leaving plenty of resources for Kafka brokers, Ignite, and HDFS/NN to run on.
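Putting the memory and core settings from the last two sections together, the earlier configuration sketch might grow to something like the following. It is illustrative only; on a real submit, driver memory is usually passed with --driver-memory, since the driver JVM is already running by the time a programmatic conf is read.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("streaming-etl")
        .set("spark.executor.heartbeatInterval", "20s")
        .set("spark.executor.memory", "40g")   # was 20g
        .set("spark.driver.memory", "40g")
        .set("spark.cores.max", "80")          # total cores for the application
        .set("spark.executor.cores", "10"))    # at most 10 cores per executor -> 8 executors
sc = SparkContext(conf=conf)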

Increase the Batch Window from 30s to 1m

The data is pushed into Kafka by the producer as batches every 30s, as it is gathered by FTP batches from the remote systems. Such an arrangement is common in telecom applications due to a need to deal with equipment and systems from a bewildering range of manufacturers, technology, and age.

This meant that the input stream was very spiky, which was clearly visible in the processing times shown on the Spark UI’s streaming tab.

Increasing the window to 1m allowed us to smooth out the input and gave the system a chance to process the data in 1 minute or less and still be stable.
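In code, this is a one-line change where the streaming context is created. A sketch (PySpark shown for brevity; the real application is Scala):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(conf=SparkConf().setAppName("streaming-etl"))
ssc = StreamingContext(sc, 60)  # micro-batch interval widened from 30 to 60 seconds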

To make sure, the team used test data that simulated the known worst-case load, and with the new settings, the Spark Streaming job was now indeed stable. We also tried it on real production data, and everything looked good. Win!

Drop Requirement to Save Hive Tables to HDFS

Discussion with the project managers revealed that Hive was not actually part of the requirements for the streaming application! Mainly, this is because the other analytics, mostly SQL requests, could be serviced from the data in HBase.

Considering the goal of the system, the worst-case scenario for missing data is that a customer's call quality information cannot be found... which is already the case today. In other words, losing some data has no real negative consequence, while gaining data provides additional insight. If the great majority of the data is processed and stored, the business goals can be reached.

There wasn’t much point in saving the data to Hive mid-flight for increased fault tolerance either: once the data is in Ignite, it’s safe even if the Spark application crashes. This made Ignite an even more critical part of the application, despite it having some issues of its own. It was a difficult decision, made entirely because of the advanced stage of the project. As we’ll explain in more detail in the conclusion, the architecture itself was problematic, and a week or two from production is no time to play with the architecture.

Spark Performance Tuning Results

The Spark Streaming application finally became stable, with an optimized runtime of 30-35s.

As it turns out, cutting out Hive also sped up the second Spark application that joins the data together, so that it now runs in 35 minutes, putting both jobs well within the project requirements.

With the improvements from the next part, the final processing time of the Spark Streaming job went down into the low 20-second range, for a final speedup of a bit over 12x.

Second target: Improve System Stability

We had to work quite hard on stability. Several strategies were required, as we will explain below.

Make the Spark Streaming Application Stable

The work we did to fix the performance had a direct impact on system stability. If both applications are stable themselves and running on right-sized resources, then the system has the best chance to be stable overall.

Remove Mesos and Use Spark Standalone

The initial choice of Mesos to manage resources was forward-looking, but ultimately we decided to drop it from the final production system. At the outset, the plan was to have Mesos manage all the applications. But the team could never get Kafka and Ignite to play nicely with Mesos, so they were running in standalone mode, leaving only Spark to be managed by Mesos. With more time, there is little doubt that all the applications could have been properly configured to work with Mesos.

Proposing to remove Mesos was a bit controversial, as Mesos is much more advanced and cool than Spark running in standalone mode.

But the issue with Mesos was twofold:

  1. Control over executor size and number was poor, a known issue (SPARK-5095) with Spark 1.6 and now fixed in Spark 2.X.
  2. Ignite and Kafka aren’t running on Mesos, only Spark is. Given the schedule pressure, the team had given up trying to get those two services running in Mesos.

Mesos can only allocate resources well if it actually controls those resources. In this system, Kafka and Ignite run outside of Mesos’ knowledge, meaning it will assign resources to the Spark applications incorrectly.

In addition, it’s a single purpose cluster, so we can live with customizing the sizing of the resources for each application with a global view of the system’s resources. There is little need for dynamic resource allocations, scheduling queues, multi-tenancy, and other buzzwords.

Change the Ignite Memory Model

It is a known issue that when the heap controlled by the JVM gets very big (> 32GB), the cost of garbage collection is quite large. We could indeed see this when the join application ran: the stages with a 25GB shuffle had rows with GC time spiking from the 10-second range up to more than a minute.

The initial configuration of Ignite was to run ONHEAP_TIERED, with 48GB worth of data cached on heap and overflow dropping to 12GB of off-heap memory. That setting was changed to the OFFHEAP_TIERED model. While slightly slower due to serialization costs, OFFHEAP_TIERED doesn't rely on the JVM’s garbage collection. It still runs in memory, so we estimated it would be a net gain.

With this change, the run time for each batch dutifully came down by about five seconds, from 30 seconds to about 25 seconds. In addition, successive batches tended to have much more similar processing times, with a delta of 1-3 seconds, whereas previously it would vary by 5 to 10 seconds or more.

Update the Ignite JVM Settings

We followed the recommended JVM options as found in Ignite documentation’s performance tuning section (http://apacheignite.gridgain.org/docs/jvm-and-system-tuning).

Improve the Spark Code

Some parts of the code assumed reliability, such as queries to Ignite, when in fact there was a possibility of the operations failing. These problems can be fixed in the code, which now handles exceptions more gracefully, though there is probably work left to do to increase its robustness. We can only find those spots by letting the application run.

Reassign ZooKeeper to Nodes 10-12

Given the cluster is of medium size, it’s worth spreading the services as much as possible. We moved the ZooKeeper services from nodes 1-3 to nodes 10-12.

Final System Architecture

Picture6

Conclusion

Tuning this application took about 1 week of full-time work. The main information we used was the Spark UI and the Spark logs, which are easily accessible from the Spark UI. The Jobs and Stages views, as well as the streaming UI, are really very useful.

Essential Takeaways

  • Migrating a streaming application from a prototype on AWS to an on-site cluster requires scheduling time for testing
  • Not testing the AWS prototype with realistic data was a big mistake
  • Including many “bleeding-edge” OSS components (Apache Ignite and Mesos) with expectations of very high reliability is unrealistic
  • A better architecture design could have simplified the system tremendously
  • Tuning a Kafka/Spark Streaming application requires a holistic understanding of the entire system; it’s not just about changing parameter values of Spark. It’s a combination of the data flow characteristics, the application goals and value to the customer, the hardware and services, the application code, and then playing with Spark parameters.
  • The MapR Converged Data Platform would have cut the development time, complexity, and cost for this project.

This project was a hell of a dive into the deep end of the pool for a telecom operator with very little experience in the open-source enterprise big data world. They should be applauded for the ambition and desire to take on such a challenge with the goal of benefiting their customers. But a better choice of platform and application architecture could have made their lives a lot easier.

A Converged Platform is the Correct Approach

In fact, the requirements of this project show the real-world business need for a state-of-the-art converged platform with a fast distributed file system, a high-performance key-value store for persistence and real-time access, and high-performance streaming capabilities.

A MapR-based solution would have been a lot easier to build and maintain. Since MapR Streams is built in, there is one less cluster to manage (bye-bye, Kafka brokers). The Spark application could run with the same code but without needing to rely on a speculative open-source project like Apache Ignite.

Saving to MapR-DB uses the same HBase API, so there is likely no code change there either, and you’re saving to a DB that’s built into the native C MapR-FS, so it’s going to be very fast as well. Finally, sharing resources is simplified by running only Spark on YARN or in standalone mode, while the platform deals with the resource requirements of MapR Streams, MapR-FS, and MapR-DB with reliability and performance guaranteed, since highly trained support engineers are available 24/7 to support every single part of this application.

Given that this system is heading into production for a telecom operator with 24/7 reliability expectations, I’d argue that built-in simplicity, performance, and support are pretty compelling, and hopefully they will be adopted by this customer for the next iteration of the system. (Stay tuned!)

 

Editor's note: This blog post was originally published in the Converge Blog on May 31, 2017.

 

Related

MapR Guide to Big Data in Telecommunications | MapR 

Big Data Opportunities for Telecommunications | MapR 

Apache Apex on MapR Converged Platform | MapR 

How To Get Started With Spark Streaming And MapR Streams Using The Kafka API 


By Justin Brandenburg

 

Time series analysis has significance in econometrics and financial analytics, but it can be utilized in any field where understanding trends is important to decision making and to reacting to changes in behavioral patterns. For example, a MapR Converged Data Platform customer, a major oil and gas provider, places sensors on wells, sending data to MapR Streams that is then used for monitoring trends in well conditions, such as volume and temperature. In finance, time series analytics is used for forecasting stock prices, assets, and commodities. Econometricians have long leveraged “autoregressive integrated moving average” (ARIMA) models to perform univariate forecasts.

ARIMA models have been used for decades and are well understood. However, with the rise of machine learning and, more recently, deep learning, other models are being explored and utilized, either to support ARIMA results or replace them.

Deep learning (DL) is a branch of machine learning based on a set of algorithms that attempts to model high-level abstractions in data by using artificial neural network (ANN) architectures composed of multiple non-linear transformations. One of the more popular DL architectures is the Recurrent Neural Network (RNN). RNNs are a class of neural networks that depend on the sequential nature of their input. Such inputs could be text, speech, time series, or anything else in which the occurrence of an element in the sequence depends on the elements that appeared before it. For example, if someone writes “the grocery…”, the next word in the sentence is most likely to be “store” rather than “school,” and given this sequence, an RNN would likely predict “store.”

Artificial Neural Networks

Actually, it turns out that while neural networks are sometimes intimidating structures, the mechanism for making them work is surprisingly simple: stochastic gradient descent. For each of the parameters in our network (such as weights or biases), all we have to do is calculate the derivative of the parameter with respect to the loss, and nudge it a little bit in the opposite direction.
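As a tiny, purely illustrative sketch of that update rule (the values here are made up):

import numpy as np

learning_rate = 0.01                   # illustrative step size
params = np.array([0.5, -1.2, 3.0])    # toy parameters (weights/biases)
grads = np.array([0.1, -0.4, 0.05])    # derivative of the loss w.r.t. each parameter

# Nudge every parameter a little bit in the direction opposite its gradient.
params -= learning_rate * grads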

ANNs use a method known as backpropagation to tune and optimize the results. Backpropagation is a two-step process: the inputs are fed into the neural network via forward propagation and multiplied with (initially random) weights and biases before being transformed via an activation function. The depth of your neural network depends on how many transformations your inputs should go through. Once forward propagation is complete, the backpropagation step measures the error between your final output and the expected output by calculating the partial derivatives of the weights generating the error, and adjusts them. Once the weights are adjusted, the model repeats the forward and backpropagation steps to minimize the error rate until convergence. If you notice how the inputs are aligned in Fig. 1, you will see that this is an ANN with only one hidden layer, so the backpropagation will not need to perform multiple gradient descent calculations.

Picture1: Figure 1

Recurrent Neural Networks

Recurrent Neural Networks (RNNs) are called recurrent because they perform the same computations for all elements in a sequence of inputs. RNNs are becoming very popular due to their wide utility. They can analyze time series data, such as stock prices, and provide forecasts. In autonomous driving systems, they can anticipate car trajectories and help avoid accidents. They can take sentences, documents, or audio samples as input, making them extremely useful for natural language processing (NLP) systems, such as automatic translation, speech-to-text, or sentiment analysis. In short, they can be applied in any situation where you have a sequence of “events,” with each event being a data point.

Picture2: Figure 2

Fig. 2 shows an example of an RNN architecture, where xt is the input at time step t. For example, x1 could be the first price of a stock in time period one. st is the hidden state at time step t and is calculated from the previous hidden state and the input at the current step, using an activation function. The initial hidden state, needed to compute the first step, is usually initialized to zero. ot is the output at step t. For example, if we wanted to predict the next value in a sequence, it would be a vector of probabilities across our time series.

RNN cells are developed on the notion that one input is dependent on the previous input by having a hidden state, or memory, that captures what has been seen so far. The value of the hidden state at any point in time is a function of the value of the hidden state at the previous time step and the value of the input at the current time step. RNNs have a different structure than ANNs and use backpropagation through time (BPTT) to compute the gradient descent after each iteration.
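One common way to write this down (a simplified illustration, with weight matrices U, W, and V that are not named in the original post) is st = f(U·xt + W·st-1) and ot = V·st. In NumPy, a single step might look like:

import numpy as np

def rnn_step(x_t, s_prev, U, W, V):
    s_t = np.tanh(U @ x_t + W @ s_prev)  # new hidden state from the current input and the previous state
    o_t = V @ s_t                        # output at time step t
    return s_t, o_t

# usage sketch: hidden state of size 4, a single input feature per step
U, W, V = np.random.randn(4, 1), np.random.randn(4, 4), np.random.randn(1, 4)
s = np.zeros(4)                          # the initial hidden state starts at zero
for x_t in [np.array([0.1]), np.array([0.5]), np.array([0.2])]:
    s, o = rnn_step(x_t, s, U, W, V)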

Example

This example was done with a small MapR cluster of 3 nodes. This example will use the following:

  • Python 3.5
  • TensorFlow 1.0.1
  • Red Hat 6.9

If you are using Anaconda, you should be able to install TensorFlow version 1.0.1 on your local machine along with Jupyter Notebook. This code will not work with versions of TensorFlow earlier than 1.0. It can be run on your local machine and transferred to a cluster if the TensorFlow versions are the same or later. Other deep learning libraries to consider for RNNs are MXNet, Caffe2, Torch, and Theano. Keras is another library that provides a Python wrapper for TensorFlow or Theano.

Picture3

MapR provides the ability to integrate Jupyter Notebook (or Zeppelin) at the user’s preference. What we are showing here would be the end of a data pipeline. The true value of running an RNN time series model in a distributed environment is the data pipelines you can construct to push your aggregated series data into a format that can be fed into the TensorFlow computational graph.

If I am aggregating network flows from multiple devices (IDS, syslogs, etc.), and I want to forecast future network traffic pattern behavior, I could set up a real-time data pipeline using MapR Streams that aggregates this data into a queue that can be fed into my TensorFlow model. For this example, I am using only a single node on my cluster, but I could have installed TensorFlow on the two other nodes and could have three TF models running with different hyper-parameters.

For this example, I generated some dummy data.

Picture4Picture5

We have 209 total observations in our data. I want to make sure I have the same number of observations for each of my batch inputs.

What we see is that our training data set is made up of 10 batches, each containing 20 observations. Each observation is a sequence of a single value.

Picture6
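The exact data generation and reshaping code is in the screenshots above; a comparable sketch with made-up values, matching the shapes described (10 training batches of 20 steps, one value per step, plus the last 20 periods held out for testing), might look like this:

import numpy as np

np.random.seed(1)
ts = np.random.normal(10, 2, 209).cumsum()   # 209 dummy observations of a single value

num_periods = 20
x_batches = ts[:200].reshape(-1, num_periods, 1)    # 10 training batches of 20 steps each
y_batches = ts[1:201].reshape(-1, num_periods, 1)   # targets: the same series shifted one step ahead

x_test = ts[-(num_periods + 1):-1].reshape(1, num_periods, 1)  # last 20 periods as test input
y_test = ts[-num_periods:].reshape(1, num_periods, 1)          # and their next-step targets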

Now that we have our data, let’s create our TensorFlow graph that will do the computation. ^1^

Picture7

There is a lot going on there, so let's examine it one step at a time. We specify the number of periods we are using to predict; in this case, it is the number of sequence steps we feed into the model as a single input. We specify our variable placeholders. We initialize the type of RNN cell to use (size 100) and the type of activation function we want. ReLU stands for “Rectified Linear Unit” and is the activation function used here, but it can be changed to Sigmoid, Hyperbolic Tangent (Tanh), and others, if desired.

We want our outputs to be in the same format as our inputs so that we can compare our results using the loss function. In this case, we use mean squared error (MSE), since this is a regression problem in which our goal is to minimize the difference between the actual and the predicted values. If we were dealing with a classification outcome, we might use cross-entropy. Now that we have this loss function defined, it is possible to define the training operation in TensorFlow that will optimize our network of inputs and outputs. To execute the optimization, we will use the Adam optimizer, a great general-purpose optimizer that performs our gradient descent via backpropagation through time, allowing faster convergence at the cost of more computation.
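The original code is in the screenshot above; a minimal sketch of a comparable TensorFlow 1.0 graph, consistent with the description (cell size 100, ReLU, MSE loss, Adam), could look like the following. The output projection wrapper and the 0.001 learning rate are assumptions on my part, not necessarily what the original used.

import tensorflow as tf

num_periods = 20   # steps per input sequence
inputs = 1         # one value per time step
hidden = 100       # RNN cell size
output = 1

X = tf.placeholder(tf.float32, [None, num_periods, inputs])
y = tf.placeholder(tf.float32, [None, num_periods, output])

# Basic RNN cell with ReLU activation; the wrapper projects each time step's
# state down to a single value so the outputs have the same shape as the inputs.
cell = tf.contrib.rnn.OutputProjectionWrapper(
    tf.contrib.rnn.BasicRNNCell(num_units=hidden, activation=tf.nn.relu),
    output_size=output)
outputs, states = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)

loss = tf.reduce_mean(tf.square(outputs - y))                             # mean squared error
training_op = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)  # Adam via BPTT
init = tf.global_variables_initializer()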

Now it is time to implement this model on our training data.

Picture8

We specify the number of iterations/epochs that will cycle through our batches of training sequences. We create a session (tf.Session()) and initialize our data to be fed into the model as we cycle through the epochs. The abbreviated output shows the MSE after every 100 epochs. As our model feeds the data forward and backpropagation runs, it adjusts the weights applied to the inputs and runs another training epoch, and our MSE continues to improve (decrease). Finally, once the model is done, it takes the learned parameters and applies them to the test data to give us our predicted output for Y.
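A sketch of that training loop, reusing the names from the earlier sketches (the epoch count is an assumption):

epochs = 1000

with tf.Session() as sess:
    init.run()
    for ep in range(epochs):
        sess.run(training_op, feed_dict={X: x_batches, y: y_batches})
        if ep % 100 == 0:
            mse = loss.eval(feed_dict={X: x_batches, y: y_batches})
            print(ep, "\tMSE:", mse)
    # apply the trained parameters to the held-out last 20 periods
    y_pred = sess.run(outputs, feed_dict={X: x_test})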

Let’s check our predicted versus actual. For our test data, we were focused on the last 20 periods of the entire 209 periods.

Picture9Picture10

It would appear there is some room for improvement ☺. However, this can be done by changing the number of hidden neurons and/or increasing the number of epochs. Optimizing our model is a process of trial and error, but we have a great start. This is random data, so we weren’t expecting great results, but perhaps applying this model to a real time series would give the ARIMA models some quality competition.

RNNs (and deep learning in general) are expanding the options available to data scientists for solving interesting problems. One issue that many data scientists face is how to automate their analysis to run once it has been optimized. Having a platform like MapR allows for this, because you can construct, train, test, and optimize your model in a big data environment. In this example, we only used 10 training batches. What if my data allowed me to leverage hundreds of batches, not merely of 20 periods, but of 50 or 100 or 500? I think I could definitely improve this model’s performance. Once I did, I could package it up into an automated script to run on an individual node, a GPU node, in a Docker container, or all of the above. That’s the power of doing data science and deep learning on a converged data platform.

^1^ Portions of this model were taken from the fantastic book Hands-On Machine Learning with Scikit-Learn and TensorFlow, 1st Edition, by Aurélien Géron.

By Carol McDonald

 

Churn prediction is big business. It minimizes customer defection by predicting which customers are likely to cancel a subscription to a service. Though originally used within the telecommunications industry, it has become common practice across banks, ISPs, insurance firms, and other verticals.

The prediction process is heavily data-driven and often utilizes advanced machine learning techniques. In this post, we'll take a look at what types of customer data are typically used, do some preliminary analysis of the data, and generate churn prediction models–all with Spark and its machine learning frameworks.

Customer 360

Using data science to better understand and predict customer behavior is an iterative process, which involves:

  1. Data discovery and model creation:
    • Analysis of historical data
    • Identifying new data sources, which traditional analytics or databases are not using, due to the format, size, or structure
    • Collecting, correlating, and analyzing data across multiple data sources
    • Knowing and applying the right kind of machine learning algorithms to get value out of the data
  2. Using the model in production to make predictions
  3. Data discovery and updating the model with new data

Picture1

In order to understand the customer, a number of factors can be analyzed, such as:

  • Customer demographic data (age, marital status, etc.)
  • Sentiment analysis of social media
  • Customer usage patterns and geographical usage trends
  • Calling-circle data
  • Browsing behavior from clickstream logs
  • Support call center statistics
  • Historical data that show patterns of behavior that suggest churn

With this analysis, telecom companies can gain insights to predict and enhance the customer experience, prevent churn, and tailor marketing campaigns.

CLASSIFICATION

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (e.g., whether a transaction is fraud or not fraud), based on labeled examples of known items (e.g., transactions known to be fraud or not). Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask. The label is the answer to those questions. In the example below, if it walks, swims, and quacks like a duck, then the label is "duck."

Picture2

Let’s go through an example of telecom customer churn:

  • What are we trying to predict?
    • Whether a customer has a high probability of unsubscribing from the service or not
    • Churn is the Label: True or False
  • What are the “if questions” or properties that you can use to make predictions?
    • Call statistics, customer service calls, etc.
    • To build a classifier model, you extract the features of interest that most contribute to the classification.

DECISION TREES

Decision trees create a model that predicts the class or label, based on several input features. Decision trees work by evaluating an expression containing a feature at every node and selecting a branch to the next node, based on the answer. A possible decision tree for predicting credit risk is shown below. The feature questions are the nodes, and the answers “yes” or “no” are the branches in the tree to the child nodes.

  • Q1: Is checking account balance > 200DM?
    • No
    • Q2: Is length of current employment > 1 year?
      • No
      • Not Creditable

Picture3

EXAMPLE USE CASE DATA SET

For this tutorial, we'll be using the Orange Telecoms Churn Dataset. It consists of cleaned customer activity data (features), along with a churn label specifying whether the customer canceled the subscription or not. The data can be fetched from BigML's S3 bucket, churn-80 and churn-20. The two sets are from the same batch but have been split by an 80/20 ratio. We'll use the larger set for training and cross-validation purposes and the smaller set for final testing and model performance evaluation. The two data sets have been included with the complete code in this repository for convenience. The data set has the following schema:

1. State: string
2. Account length: integer
3. Area code: integer
4. International plan: string
5. Voice mail plan: string
6. Number vmail messages: integer
7. Total day minutes: double
8. Total day calls: integer
9. Total day charge: double
10. Total eve minutes: double
11. Total eve calls: integer
12. Total eve charge: double
13. Total night minutes: double
14. Total night calls: integer
15. Total night charge: double
16. Total intl minutes: double
17. Total intl calls: integer
18. Total intl charge: double
19. Customer service calls: integer

 

The CSV file has the following format:

LA,117,408,No,No,0,184.5,97,31.37,351.6,80,29.89,215.8,90,9.71,8.7,4,2.35,1,False
IN,65,415,No,No,0,129.1,137,21.95,228.5,83,19.42,208.8,111,9.4,12.7,6,3.43,4,True

The image below shows the first few rows of the data set:

Picture4

SOFTWARE

This tutorial will run on Spark 2.0.1 and above.

LOAD THE DATA FROM A CSV FILE

Picture5

First, we will import the SQL and machine learning packages.

import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.feature.VectorAssembler

 

We use a Scala case class and StructType to define the schema, corresponding to a line in the CSV data file.

// define the Churn Schema
case class Account(state: String, len: Integer, acode: String,
    intlplan: String, vplan: String, numvmail: Double,
    tdmins: Double, tdcalls: Double, tdcharge: Double,
    temins: Double, tecalls: Double, techarge: Double,
    tnmins: Double, tncalls: Double, tncharge: Double,
    timins: Double, ticalls: Double, ticharge: Double,
    numcs: Double, churn: String)

val schema = StructType(Array(
    StructField("state", StringType, true),
    StructField("len", IntegerType, true),
    StructField("acode", StringType, true),
    StructField("intlplan", StringType, true),
    StructField("vplan", StringType, true),
    StructField("numvmail", DoubleType, true),
    StructField("tdmins", DoubleType, true),
    StructField("tdcalls", DoubleType, true),
    StructField("tdcharge", DoubleType, true),
    StructField("temins", DoubleType, true),
    StructField("tecalls", DoubleType, true),
    StructField("techarge", DoubleType, true),
    StructField("tnmins", DoubleType, true),
    StructField("tncalls", DoubleType, true),
    StructField("tncharge", DoubleType, true),
    StructField("timins", DoubleType, true),
    StructField("ticalls", DoubleType, true),
    StructField("ticharge", DoubleType, true),
    StructField("numcs", DoubleType, true),
    StructField("churn", StringType, true)
  ))

 

Using Spark 2.0, we specify the data source and schema to load into a Dataset. Note that with Spark 2.0, specifying the schema when loading data into a DataFrame will give better performance than schema inference. We cache the Datasets for quick, repeated access. We also print the schema of the Datasets.

val train: Dataset[Account] = spark.read.option("inferSchema", "false")
      .schema(schema).csv("/user/user01/data/churn-bigml-80.csv").as[Account]
train.cache

val test: Dataset[Account] = spark.read.option("inferSchema", "false")
      .schema(schema).csv("/user/user01/data/churn-bigml-20.csv").as[Account]
test.cache
train.printSchema()

 

root
|-- state: string (nullable = true)
|-- len: integer (nullable = true)
|-- acode: string (nullable = true)
|-- intlplan: string (nullable = true)
|-- vplan: string (nullable = true)
|-- numvmail: double (nullable = true)
|-- tdmins: double (nullable = true)
|-- tdcalls: double (nullable = true)
|-- tdcharge: double (nullable = true)
|-- temins: double (nullable = true)
|-- tecalls: double (nullable = true)
|-- techarge: double (nullable = true)
|-- tnmins: double (nullable = true)
|-- tncalls: double (nullable = true)
|-- tncharge: double (nullable = true)
|-- timins: double (nullable = true)
|-- ticalls: double (nullable = true)
|-- ticharge: double (nullable = true)
|-- numcs: double (nullable = true)
|-- churn: string (nullable = true)

//display the first 20 rows:
train.show

Picture6

SUMMARY STATISTICS

Spark DataFrames include some built-in functions for statistical processing. The describe() function performs summary statistics calculations on all numeric columns and returns them as a DataFrame.

train.describe()

Out:

Picture7

Data Exploration

We can use Spark SQL to explore the dataset. Here are some example queries using the Scala DataFrame API:

train.groupBy("churn").sum("numcs").show
+-----+----------+
|churn|sum(numcs)|
+-----+----------+
|False|    3310.0|
| True|     856.0|
+-----+----------+

train.createOrReplaceTempView("account")
spark.catalog.cacheTable("account")

 

Picture8

Total day minutes and Total day charge are highly correlated fields. Such correlated data won't be very beneficial for our model training runs, so we're going to remove them. We'll do so by dropping one column of each pair of correlated fields, along with the State, Area code, and Voice mail plan columns, which we also won’t use.

val dtrain = train.drop("state").drop("acode").drop("vplan")
  .drop("tdcharge").drop("techarge")

Picture9

Grouping the data by the Churn field and counting the number of instances in each group shows that there are roughly 6 times as many false churn samples as true churn samples.

dtrain.groupBy("churn").count.show

Out:

+-----+-----+
|churn|count|
+-----+-----+
|False| 2278|
| True|  388|
+-----+-----+

Business decisions will be made to retain the customers most likely to leave, not those who are likely to stay. Thus, we need to ensure that our model is sensitive to the Churn=True samples.

STRATIFIED SAMPLING

We can put the two sample types on the same footing using stratified sampling. The DataFrame sampleBy() function does this when provided with the fraction of each sample type to be returned. Here, we're keeping all instances of the Churn=True class but downsampling the Churn=False class to a fraction of 388/2278.

val fractions = Map("False" -> .17, "True" -> 1.0) 
val strain = dtrain.stat.sampleBy("churn", fractions, 36L)
strain.groupBy("churn").count.show

Out:

+-----+-----+
|churn|count|
+-----+-----+
|False|  379|
| True|  388|
+-----+-----+

FEATURES ARRAY

To build a classifier model, you extract the features that most contribute to the classification. The features for each item consist of the fields shown below:

  • Label → churn: True or False
  • Features → {"len", "iplanIndex", "numvmail", "tdmins", "tdcalls", "temins", "tecalls", "tnmins", "tncalls", "timins", "ticalls", "numcs"}

In order for the features to be used by a machine learning algorithm, they are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature.

Picture10

Reference: Learning Spark

USING THE SPARK ML PACKAGE

The ML package is the newer library of machine learning routines. Spark ML provides a uniform set of high-level APIs built on top of DataFrames.

Picture11

We will use an ML Pipeline to pass the data through transformers in order to extract the features and an estimator to produce the model.

  • Transformer: A Transformer is an algorithm which transforms one DataFrame into another DataFrame. We will use a transformer to get a DataFrame with a features vector column.
  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer (e.g., training/tuning on a DataFrame and producing a model).
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify a ML workflow.

FEATURE EXTRACTION AND PIPELINING

The ML package needs data to be put in a (label: Double, features: Vector) DataFrame format with correspondingly named fields. We set up a pipeline to pass the data through 3 transformers in order to extract the features: 2 StringIndexers and a VectorAssembler. We use the StringIndexers to convert the string categorical feature intlplan and the label into number indices. Indexing categorical features allows decision trees to treat categorical features appropriately, improving performance.

Picture12

// set up StringIndexer transformers for label and string feature 
val ipindexer = new StringIndexer()
       .setInputCol("intlplan")
       .setOutputCol("iplanIndex")
val labelindexer = new StringIndexer()
       .setInputCol("churn")
       .setOutputCol("label")

The VectorAssembler combines a given list of columns into a single feature vector column.

// set up a VectorAssembler transformer 
val featureCols = Array("len", "iplanIndex", "numvmail", "tdmins",
      "tdcalls", "temins", "tecalls", "tnmins", "tncalls", "timins",
      "ticalls", "numcs") 
val assembler = new VectorAssembler()
       .setInputCols(featureCols)
       .setOutputCol("features")

The final element in our pipeline is an estimator (a decision tree classifier), training on the vector of labels and features.

Picture13

// set up a DecisionTreeClassifier estimator 
val dTree = new DecisionTreeClassifier().setLabelCol("label")
       .setFeaturesCol("features") 
// Chain indexers and tree in a Pipeline
val pipeline = new Pipeline()
       .setStages(Array(ipindexer, labelindexer, assembler, dTree))

TRAIN THE MODEL

Picture14

We would like to determine which parameter values of the decision tree produce the best model. A common technique for model selection is k-fold cross validation, where the data is randomly split into k partitions. Each partition is used once as the testing data set, while the rest are used for training. Models are then generated using the training sets and evaluated with the testing sets, resulting in k model performance measurements. The average of the performance scores is often taken to be the overall score of the model, given its build parameters. For model selection we can search through the model parameters, comparing their cross validation performances. The model parameters leading to the highest performance metric produce the best model.

Spark ML supports k-fold cross validation with a transformation/estimation pipeline to try out different combinations of parameters, using a process called grid search, where you set up the parameters to test, and a cross validation evaluator to construct a model selection workflow.

Below, we use a ParamGridBuilder to construct the parameter grid.

// Search through decision tree's maxDepth parameter for best model 
val paramGrid = new ParamGridBuilder().addGrid(dTree.maxDepth,
Array(2, 3, 4, 5, 6, 7)).build()

We define a BinaryClassificationEvaluator, which will evaluate the model by comparing the test label column with the test prediction column. The default evaluation metric is the area under the ROC curve.

// Set up Evaluator (prediction, true label) 
val evaluator = new BinaryClassificationEvaluator()
       .setLabelCol("label")
       .setRawPredictionCol("prediction")

We use a CrossValidator for model selection. The CrossValidator uses the Estimator Pipeline, the Parameter Grid, and the Classification Evaluator. The CrossValidator uses the ParamGridBuilder to iterate through the maxDepth parameter of the decision tree and evaluate the models, repeating 3 times per parameter value for reliable results.

// Set up 3-fold cross validation  
val crossval = new CrossValidator().setEstimator(pipeline)
       .setEvaluator(evaluator)
       .setEstimatorParamMaps(paramGrid).setNumFolds(3) 
val cvModel = crossval.fit(strain)  // fit on the stratified training sample

We get the best decision tree model, in order to print out the decision tree and parameters.

// Fetch best model 
val bestModel = cvModel.bestModel
val treeModel = bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
.stages(3).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)

Out:

Picture15

//0-11 feature columns: len, iplanIndex, numvmail, tdmins, tdcalls, temins, tecalls, tnmins, tncalls, timins, ticalls, numcs 
println( "Feature 11:" +  featureCols(11))
println( "Feature 3:" +  featureCols(3)) 

Feature 11:numcs
Feature 3:tdmins

We find that the best tree model produced using the cross-validation process is one with a depth of 5. The toDebugString() function provides a print of the tree's decision nodes and final prediction outcomes at the end leaves. We can see that features 11 and 3 are used for decision making and should thus be considered as having high predictive power to determine a customer's likeliness to churn. It's not surprising that these feature numbers map to the fields Customer service calls and Total day minutes. Decision trees are often used for feature selection because they provide an automated mechanism for determining the most important features (those closest to the tree root).

PREDICTIONS AND MODEL EVALUATION

Picture16

The actual performance of the model can be determined using the test data set that has not been used for any training or cross-validation activities. We'll transform the test set with the model pipeline, which will map the features according to the same recipe.

val predictions = cvModel.transform(test)

Picture17

The evaluator will provide us with the score of the predictions, and then we'll print them along with their probabilities.

val accuracy = evaluator.evaluate(predictions)
evaluator.explainParams()
val result = predictions.select("label", "prediction", "probability")
result.show

Out:

accuracy: Double = 0.8484817813765183 
metric name in evaluation (default: areaUnderROC)

Picture18

In this case, the evaluation returns an area under the ROC curve of 84.8%. The prediction probabilities can be very useful in ranking customers by their likelihood to defect. That way, the limited resources available to the business for retention can be focused on the appropriate customers.

Below, we calculate some more metrics. The number of false/true positive and negative predictions is also useful:

  • True positives are how often the model correctly predicted subscription canceling.
  • False positives are how often the model incorrectly predicted subscription canceling.
  • True negatives indicate how often the model correctly predicted no canceling.
  • False negatives indicate how often the model incorrectly predicted no canceling.

 

val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble
val truep = lp.filter($"prediction" === 0.0)
.filter($"label" === $"prediction").count() / counttotal.toDouble
val truen = lp.filter($"prediction" === 1.0)
.filter($"label" === $"prediction").count() / counttotal.toDouble
val falsep = lp.filter($"prediction" === 1.0)
.filter(not($"label" === $"prediction")).count() / counttotal.toDouble
val falsen = lp.filter($"prediction" === 0.0)
.filter(not($"label" === $"prediction")).count() / counttotal.toDouble

println("counttotal : " + counttotal)
println("correct : " + correct)
println("wrong: " + wrong)
println("ratio wrong: " + ratioWrong)
println("ratio correct: " + ratioCorrect)
println("ratio true positive : " + truep)
println("ratio false positive : " + falsep)
println("ratio true negative : " + truen)
println("ratio false negative : " + falsen)

counttotal : 667
correct : 574
wrong: 93
ratio wrong: 0.13943028485757122
ratio correct: 0.8605697151424287
ratio true positive : 0.1184407796101949
ratio false positive : 0.0239880059970015
ratio true negative : 0.7421289355322339
ratio false negative : 0.11544227886056972

CODE

WANT TO LEARN MORE?

In this blog post, we showed you how to get started using Apache Spark’s machine learning decision trees and ML Pipelines for classification. If you have any further questions about this tutorial, please ask them in the comments section below.

Editor's note: This blog post was originally posted in the Converge Blog on June 05, 2017
