By Ian Downard

 

 

A lot of people choose MapR as their core platform for processing and storing big data because of its advantages for speed and performance. MapR consistently performs faster than any other big data platform for all kinds of applications, including Hadoop, distributed file I/O, NoSQL data storage, and data streaming. In this post, I’m focusing on the latter to provide some perspective on how much better/faster/cheaper MapR Streams can be compared to Apache Kafka as a data streaming technology.

MapR Streams is a cluster-based messaging system for streaming data at scale. It’s integrated into the MapR Converged Data Platform and implements the Apache Kafka Java API so applications written for Kafka can also run on MapR Streams. What differentiates the MapR Streams technology from Kafka are its built-in features for global replication, security, multi-tenancy, high availability, and disaster recovery—all of which it inherits from the MapR Converged Data Platform. From an operational perspective, these features make MapR Streams easier to manage than Kafka, but there are speed advantages, too. I’ve been looking at this a lot lately, trying to understand where and why MapR Streams outperforms Kafka. In this blog post, I will show that MapR Streams can transport a much faster stream of data, with much larger message sizes, and to far more topics than can be achieved with Kafka.

Test Strategy

In this study, I wanted to compare how Kafka and MapR Streams perform “off the shelf,” without the burden of tuning my test environment to perfectly optimize performance in each test scenario. So, I have pretty much stuck with the default settings for services and clients. The only exceptions are that I configured each Kafka topic with a replication factor of 3 and configured producers to send messages synchronously, since these are the default modes for MapR Streams. I also disabled stream compression in order to control message sizes and measure throughput more precisely.

Test Configurations

I measured performance from both producer and consumer perspectives. However, consumers run faster than producers, so I focused primarily on the producer side since the throughput of a stream is bounded by the throughput of its producers. I used two threads in my producer clients so that message generation could happen in parallel with sending messages and waiting for acknowledgments. I used the following properties for producers and topics:

acks = all
batch.size = 16384
linger.ms = 0
block.on.buffer.full = true
compression.type = none
default.replication.factor = 3
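To make these settings concrete, here is a minimal sketch of a synchronous producer configured this way, written in Scala against the standard Kafka 0.10 Java client. The topic name, broker address, and 100-byte payload are assumptions; the deprecated block.on.buffer.full flag and the broker-side default.replication.factor are left out because they are not client properties, and replication is set at topic creation time.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SyncProducerSketch extends App {
  val props = new Properties()
  // The broker list is required by the Kafka client; MapR Streams ignores it.
  props.put("bootstrap.servers", "localhost:9092")
  props.put("acks", "all")                 // wait for all in-sync replicas to acknowledge
  props.put("batch.size", "16384")
  props.put("linger.ms", "0")
  props.put("compression.type", "none")    // compression disabled to control message sizes
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val payload = new Array[Byte](100)       // hypothetical 100-byte message

  // Synchronous send: block on the returned Future until the acknowledgment arrives.
  producer.send(new ProducerRecord("t-00000", payload)).get()
  producer.close()
}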

 

My test environment consisted of three Ubuntu servers running Kafka 0.10.0.1 (the kafka_2.11-0.10.0.1 build) or MapR 5.2 on Azure VMs sized with the following specs:

  • Intel Xeon CPU E5-2660 2.2 GHz processor with 16 cores
  • SSD disk storage with 64,000 Mbps cached / 51,200 uncached max disk throughput
  • 112GB of RAM
  • Virtual networking throughput between 1 and 2 Gbits/sec (I measured this quantitatively since I couldn’t easily find virtual network throughput specs from Microsoft).

Performance Metrics

Throughput, latency, and loss are the most important metrics for measuring the performance of a message bus system. MapR Streams and Kafka both guarantee zero loss through at-least-once semantics. MapR provides some advantages when it comes to latency, but typically both MapR Streams and Kafka deliver messages sufficiently quickly for real-time applications. For those reasons, I chose to focus on throughput in this study.

Throughput is important because if an application generates messages faster than a message bus can consume and deliver them, then those messages must be queued. Queueing increases end-to-end latency and destabilizes applications when queues grow too large.

Furthermore, throughput in Kafka and MapR Streams is sensitive to the size of the messages being sent and to the distribution of those messages into topics. So, I analyzed those two attributes independently in order to measure how message size and stream topics affect throughput.

Throughput Performance

To measure producer throughput, I measured how fast a single producer could publish a sustained flow of messages to a single topic with 1 partition and 3x replication. I ran this test for a variety of message sizes to see how that affects throughput. The results show MapR Streams consistently achieving much higher throughput than Kafka and having a much higher capacity for handling large message sizes, as shown below.

Throughput MB/s

MapR Streams doesn’t just send a faster volume of data than Kafka; it also has the capacity to send more records per second. We can see this by plotting throughput in terms of raw record count, as shown below:

Throughput Msgs/s

I recorded these results with two different code bases. First, I used custom tests that I wrote using the Java unit test framework (JUnit), then I used the performance test scripts included with Kafka and MapR. These different approaches did not produce exactly the same results but they were close, as shown below. This correlation helps validate the conclusions stated above, that MapR Streams can transport a larger volume of data and more frequent messages than Kafka.

Throughput Correlation

How does MapR Streams achieve more than 4x the throughput of Kafka?

There are a lot of reasons why MapR Streams is faster, and without getting too technical, I’ll mention just a few. First, the MapR Streams client more efficiently flushes data to the MapR Streams server. It spawns its own threads to do this work, whereas Kafka uses the client application threads directly to flush to a Kafka broker, which in many cases is limited to just a single thread.

On the server side, MapR Streams inherits efficient I/O patterns from the core MapR storage layer which keeps files coherent and clean so that I/O operations can be efficiently buffered and addressed to sequential locations on disk. Replication is more efficient, too, since the underlying MapR storage platform has distributed synchronous replication built in, along with other operational features that simply don’t exist in Kafka, such as snapshots, mirroring, quotas, access controls, etc.

Replicating this test

My JUnit tests for benchmarking Kafka and MapR Streams are available at https://github.com/iandow/kafka_junit_tests. Here are the commands that I used to generate the data shown above:

git clone https://github.com/iandow/kafka_junit_tests
cd kafka_junit_tests
# Create a Kafka topic...
/opt/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic t-00000 --config compression.type=uncompressed
# or create a MapR Streams topic.
maprcli stream create -path /user/mapr/iantest -produceperm p -consumeperm p -topicperm p -defaultpartitions 1 -compression off
# Then compile and run the message size test.
mvn -e -Dtest=MessageSizeSpeedTest test
# Test data will be saved in size-count.csv

You can also measure throughput using the performance test utilities included with Kafka and MapR. Here are the commands that I used to do that:

Kafka script:

https://gist.github.com/iandow/bf5df0f9b4f19e6a19aa5a7a93b7c81c

MapR script:

https://gist.github.com/iandow/0750185f1d3631301d476b426c109a50

Topic Scalability

Another major advantage that MapR Streams holds over Kafka relates to how well it can handle large quantities of stream topics. Topics are the primary means of organizing stream data; however, there is overhead associated with categorizing streams into topics, and producer throughput is sensitive to that overhead. I quantified this by measuring how fast a single producer could publish a sustained flow of messages to an increasingly large quantity of topics. This is essentially a "fan-out" producer (illustrated below) and it is very common for fast data pipelines to use this pattern so that data can be more easily consumed downstream.

Fanout Producer

Each of the topics created for this scenario was configured with a single partition and 3x replication. Record size was held constant at 100 bytes.
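To illustrate the fan-out pattern, here is a hedged sketch of a producer that publishes a constant 100-byte record round-robin across an increasing number of topics. The topic naming scheme and message count are assumptions, not the exact code behind the published results:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object FanoutProducerSketch extends App {
  val topicCount = if (args.nonEmpty) args(0).toInt else 100
  val messages   = 100000

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")   // ignored by MapR Streams
  props.put("acks", "all")
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val record   = new Array[Byte](100)                // constant 100-byte payload

  val start = System.nanoTime()
  for (i <- 0 until messages) {
    // Round-robin across topics t-00000, t-00001, ... so every topic receives traffic.
    val topic = f"t-${i % topicCount}%05d"
    producer.send(new ProducerRecord(topic, record)).get()   // synchronous send
  }
  producer.close()

  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$messages%d msgs across $topicCount%d topics: ${messages / seconds}%.0f msgs/sec")
}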

It’s clear from the following graph that MapR Streams scales to a larger quantity of topics than Kafka.

Topic Scalability

How does MapR Streams handle so many more topics than Kafka?

A topic is just metadata in MapR Streams; it does not introduce overhead to normal operations. MapR Streams uses only one data structure for a stream, no matter how many topics it has, and the MapR storage system provides extremely fast and scalable storage for that data.

On the other hand, Kafka represents each topic by at least one directory and several files in a general purpose file system. The more topics and partitions Kafka has, the more files it creates. This makes it harder to buffer disk operations and perform sequential I/O, and it increases the complexity of what ZooKeeper must manage.

Replicating this test

This scenario can be run with another JUnit test from https://github.com/iandow/kafka_junit_tests, as follows:

git clone https://github.com/iandow/kafka_junit_tests
cd kafka_junit_tests
# For MapR only, create the stream first:
maprcli stream create -path /user/mapr/taq -produceperm p -consumeperm p -topicperm p -compression off
mvn -e -Dtest=ThreadCountSpeedTest test
# Test data will be saved in thread-count.csv

 

Partition Scalability

Stream topics are often subdivided into partitions in order to allow multiple consumers to read from a topic simultaneously. Both Kafka and MapR Streams allow topics to be partitioned, but partitions in MapR Streams are much more powerful and easier to manage than partitions in Kafka. For example, Kafka requires partitions to fit within the disk space of a single cluster node and cannot be split across machines. MapR Streams is not limited by the storage capacity of any one node because the MapR storage system automatically grows (or shrinks) partitions across servers. I’ll talk more about these operational advantages later, but let’s consider the performance implications of partitioning now.

ZooKeeper elects separate nodes to be leaders for each partition. Leaders are responsible for processing the client reads and writes for their designated partition. This helps load balance client requests across the cluster, but it complicates the work that ZooKeeper must do to keep topics synchronized and replicated. Leader election takes time and does not scale well. In my tests, I saw leader election take at least 0.1 seconds per partition, and it ran serially. So, for example, it would take more than 10 seconds to configure a topic with 100 partitions, assuming ZooKeeper didn’t crash, which it frequently did when I created topics with 100 or more partitions.

In MapR Streams, I had no problem streaming data to topics with thousands of partitions, as shown below. This graph shows the throughput for a producer sending synchronously to a 3x replicated topic subdivided into an increasingly large number of partitions. I could not run my test in Kafka beyond 400 partitions, so that line is cut short.

Partitioning Scalability

Replicating this test

I used the performance scripts included with Kafka and MapR to generate the partition vs. throughput data shown above. Here is the script I used to run this test in Kafka:

https://gist.github.com/iandow/625d783333a53b592f0381e6b37ee9ab

That script will silently freeze if ZooKeeper fails, but it will continue once ZooKeeper starts again. So in another terminal, I simultaneously ran the following script to automatically restart ZooKeeper if it fails (which it is likely to do during this test):

https://gist.github.com/iandow/2dc07bde132669706467e8ee45507561

Here is the script I used to generate partitions vs. throughput data in MapR:

https://gist.github.com/iandow/8074962f6205552c9cdc3fceccdd9793

Operational Advantages for MapR Streams

Increasing throughput capacity and decreasing message latency can often be accomplished simply by adding nodes to your distributed messaging cluster. However, doing so costs money and complicates management. So saying that MapR Streams performs better than Kafka is another way of saying that a distributed messaging platform can be operated with less hardware on MapR than with Kafka.

However, unless you’re working on applications that scale to extreme lengths, the challenges you face with Kafka are more likely to be operational than performance-related. And this is where the MapR total cost of ownership really shines.

Not only does MapR Streams execute with higher performance, it also addresses major operational deficiencies in Kafka. Here are three examples relating to replication, scaling, and mirroring:

  • Kafka requires that the MirrorMaker processes be manually configured in order to replicate across clusters. Replication is easy to configure with MapR Streams and supports unique capabilities for replicating streams across data centers and allowing streams to be updated in multiple locations at the same time.

  • Kafka’s mirroring design simply forwards messages to a mirror cluster. The offsets in the source cluster are useless in the mirror, which means consumers and producers cannot automatically failover from one cluster to a mirror. MapR continuously transfers updated records for near real-time replication and preserves message offsets in all replicated copies.

  • Kafka requires partitions to fit within the disk space of a single cluster node and cannot be split across machines. This is especially risky, because ZooKeeper could automatically assign multiple large partitions to a node that doesn’t have space for them. You can move them manually, but that can quickly become unmanageable. MapR Streams is not limited by the storage capacity of any one node because it distributes stream data across the cluster.

References

For more information about the operational advantages of MapR Streams, see Will Ochandarena’s blog post, Scaling with Kafka – Common Challenges Solved.

I also highly recommend reading Chapter 5 of Streaming Architecture: New Designs Using Apache Kafka and MapR Streams, by Ted Dunning & Ellen Friedman.

Conclusion

MapR Streams outperforms Kafka in big ways. I measured the performance of distributed streaming in a variety of cases that focused on the effects of message size and topic quantity, and I saw MapR Streams transport a much faster stream of data, with much larger message sizes, and to far more topics than what could be achieved with Kafka on a similarly sized cluster. Although performance isn’t the only thing that makes MapR Streams desirable over Kafka, it offers one compelling reason to consider it.

 

Editor's Note: This blog post was originally published in the Converge Blog on January 11, 2017. Since the publication of the original article, MapR Streams has been renamed MapR-ES.

 

Related

MapR-ES

Getting Started with MapR-ES Event Data Streams

By Carol McDonald

This is a 4-Part Series, see the previously published posts below:

Part 1 - Spark Machine Learning

Part 3 – Real-Time Dashboard Using Vert.x

This post is the second part in a series where we will build a real-time example for analysis and monitoring of Uber car GPS trip data. If you have not already read the first part of this series, you should read that first.

The first post discussed creating a machine learning model using Apache Spark’s K-means algorithm to cluster Uber data based on location. This second post will discuss using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.

Example Use Case: Real-Time Analysis of Geographically Clustered Vehicles/Items

The following figure depicts the architecture for the data pipeline:

  1. Uber trip data is published to a MapR Streams topic using the Kafka API
  2. A Spark streaming application subscribed to the first topic:
    1. Ingests a stream of uber trip events
    2. Identifies the location cluster corresponding to the latitude and longitude of the uber trip
    3. Adds the cluster location to the event and publishes the results in JSON format to another topic
  3. A Spark streaming application subscribed to the second topic:
    1. Analyzes the uber trip location clusters that are popular by date and time

Example Use Case Data

The example data set is Uber trip data, which you can read more about in Part 1 of this series. The incoming data is in CSV format; an example with the header is shown below:

date/time,latitude,longitude,base
2014-08-01 00:00:00,40.729,-73.9422,B02598

The enriched Data Records are in JSON format. An example line is shown below:

Spark Kafka Consumer Producer Code

Parsing the Data Set Records

A Scala Uber case class defines the schema corresponding to the CSV records. The parseUber function parses the comma separated values into the Uber case class.
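The original listing was published as an image, so here is a minimal sketch of what the case class and parser described above might look like; the field names simply follow the CSV header:

// Field names follow the CSV header: date/time, latitude, longitude, base.
case class Uber(dt: String, lat: Double, lon: Double, base: String)

object Uber {
  // Parse one CSV line, e.g. "2014-08-01 00:00:00,40.729,-73.9422,B02598", into an Uber object.
  def parseUber(line: String): Uber = {
    val p = line.split(",")
    Uber(p(0), p(1).toDouble, p(2).toDouble, p(3))
  }
}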

Loading the K-Means Model

The Spark KMeansModel class is used to load the saved K-means model fitted on the historical Uber trip data.
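A hedged sketch of that step, assuming the model was saved with the spark.ml API in Part 1 (the path is an assumption):

import org.apache.spark.ml.clustering.KMeansModel

// Load the K-means model saved in Part 1.
val model = KMeansModel.load("/user/mapr/data/savemodel")
model.clusterCenters.foreach(println)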

Output of model clusterCenters:

Below, the cluster centers are displayed on a Google map:

Spark Streaming Code

These are the basic steps for the Spark Streaming Consumer Producer code:

  1. Configure Kafka Consumer Producer properties.
  2. Initialize a Spark StreamingContext object. Using this context, create a DStream which reads messages from a Topic.
  3. Apply transformations (which create new DStreams).
  4. Write messages from the transformed DStream to a Topic.
  5. Start receiving data and processing. Wait for the processing to be stopped.

We will go through each of these steps with the example application code.

  1. Configure Kafka Consumer Producer properties

The first step is to set the KafkaConsumer and KafkaProducer configuration properties, which will be used later to create a DStream for receiving/sending messages to topics. You need to set the following parameters:

  • Key and value deserializers: for deserializing the message.
  • Auto offset reset: to start reading from the earliest or latest message.
  • Bootstrap servers: this can be set to a dummy host:port since the broker address is not actually used by MapR Streams.

For more information on the configuration parameters, see the MapR Streams documentation.
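As a rough illustration, the consumer configuration might be expressed as a map like the following; all of the values are assumptions to adapt to your environment:

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "dummyhost:9092",   // not actually used by MapR Streams
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "uber-consumer-group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)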

  2. Initialize a Spark StreamingContext object.

ConsumerStrategies.Subscribe, as shown below, is used to set the topics and Kafka configuration parameters. We use the KafkaUtils createDirectStream method with a StreamingContext, the consumer and location strategies, to create an input stream from a MapR Streams topic. This creates a DStream that represents the stream of incoming data, where each message is a key value pair. We use the DStream map transformation to create a DStream with the message values.
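A minimal sketch of that setup, building on the kafkaParams map above and assuming a SparkSession named spark (as in the spark-shell), a 5-second batch interval, and a hypothetical MapR Streams topic path:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val topics = Array("/user/mapr/stream:ubers")        // hypothetical topic path

val messagesDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

// Keep just the message values (the CSV lines).
val valuesDStream = messagesDStream.map(_.value())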

  3. Apply transformations (which create new DStreams)

We use the DStream foreachRDD method to apply processing to each RDD in this DStream. We parse the message values into Uber objects, with the map operation on the DStream. Then we convert the RDD to a DataFrame, which allows you to use DataFrames and SQL operations on streaming data.

Here is example output from the df.show:

A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.

Then the model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster predictions.
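Putting those steps together, here is a hedged sketch of the per-RDD processing, reusing the parseUber and model sketches above; the column names are assumptions:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

valuesDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // Parse the CSV lines and convert the RDD to a DataFrame.
    val df = rdd.map(Uber.parseUber).toDF()

    // Put the latitude/longitude columns into a single feature vector.
    val assembler = new VectorAssembler()
      .setInputCols(Array("lat", "lon"))
      .setOutputCol("features")
    val df2 = assembler.transform(df)

    // Ask the saved K-means model which cluster each trip belongs to.
    val categories = model.transform(df2).withColumnRenamed("prediction", "cluster")
    categories.show()
  }
}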

The output of categories.show is below:

The DataFrame is then registered as a table so that it can be used in SQL statements. The output of the SQL query is shown below:

  4. Write messages from the transformed DStream to a Topic

The Dataset result of the query is converted to JSON RDD Strings, then the RDD sendToKafka method is used to send the JSON key-value messages to a topic (the key is null in this case).
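The sendToKafka helper comes from MapR's Spark/Kafka producer integration, and its exact signature is not shown here. As a rough, generic stand-in, the same step can be sketched with a plain KafkaProducer inside foreachPartition, run inside the same foreachRDD as above; the topic path and broker setting are assumptions:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Inside the foreachRDD block above, after building `categories`:
categories.toJSON.rdd.foreachPartition { partition =>
  val props = new Properties()
  props.put("bootstrap.servers", "dummyhost:9092")   // ignored by MapR Streams
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  // The key is null; the JSON document is the value, as in the example messages below.
  partition.foreach(json => producer.send(new ProducerRecord("/user/mapr/stream:uberp", null, json)))
  producer.close()
}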

Example message values (the output for temp.take(2) ) are shown below:

{"dt":"2014-08-01 00:00:00","lat":40.729,"lon":-73.9422,"base":"B02598","cluster":7}

{"dt":"2014-08-01 00:00:00","lat":40.7406,"lon":-73.9902,"base":"B02598","cluster":7}

  5. Start receiving data and processing it. Wait for the processing to be stopped.

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

Spark Kafka Consumer Code

Next, we will go over some of the Spark streaming code which consumes the JSON-enriched messages.

We specify the schema with a Spark StructType:

Below is the code for:

  • Creating a Direct Kafka Stream
  • Converting the JSON message values to Dataset[Row] using spark.read.json with the schema
  • Creating two temporary views for subsequent SQL queries
  • Using ssc.remember to cache data for queries
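Since the original listing was an image, here is a hedged sketch of that consumer-side code; it reuses the kafkaParams sketch from earlier, shows a single temporary view rather than two, and treats the topic path, batch interval, and remember window as assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val schema = StructType(Array(
  StructField("dt", StringType, true),        // "2014-08-01 00:00:00"
  StructField("lat", DoubleType, true),
  StructField("lon", DoubleType, true),
  StructField("base", StringType, true),
  StructField("cluster", IntegerType, true)
))

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
ssc.remember(Minutes(120))                    // keep recent batches around for interactive SQL

val enriched = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("/user/mapr/stream:uberp"), kafkaParams))

enriched.map(_.value()).foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    // Convert the JSON strings into a DataFrame with the declared schema.
    val df = spark.read.schema(schema).json(rdd)
    df.createOrReplaceTempView("uber")
  }
}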

Now we can query the streaming data to ask questions like: which hours had the highest number of pickups? (Output is shown in a Zeppelin notebook):

spark.sql("SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)")

How many pickups occurred in each cluster?

df.groupBy("cluster").count().show()

or

spark.sql("select cluster, count(cluster) as count from uber group by cluster")

Which hours of the day and which cluster had the highest number of pickups?

spark.sql("SELECT hour(uber.dt) as hr, cluster, count(cluster) as ct FROM uber group by hour(uber.dt), cluster")

Display datetime and cluster counts for Uber trips:

%sql select cluster, dt, count(cluster) as count from uber group by dt, cluster order by dt, cluster

Software

Summary

In this blog post, you learned how to use a Spark machine learning model in a Spark Streaming application, and how to integrate Spark Streaming with MapR Streams to consume and produce messages using the Kafka API.

References and More Information:

Editor's note: This blog post was originally published in the MapR Converge Blog on January 05, 2017.

By Carol McDonald

 

A Formula 1 race is a high-speed example of the Internet of Things, where gathering, analyzing, and acting on tremendous amounts of data in real time is essential for staying competitive. The sport’s use of such information is so sophisticated that some teams are exporting their expertise to other industries, even for use on oil rigs. Within the industry, automobile companies such as Daimler and Audi are leveraging deep learning on the MapR Converged Data Platform to successfully analyze the continuous data generated from car sensors.

This blog discusses a proof of concept demo, which was developed as part of a pre-sales project to demonstrate an architecture to capture and distribute lots of data, really fast, for a Formula 1 team.

What’s the Point of Data in Motor Sports?

Formula 1 cars are some of the most heavily instrumented objects in the world.

Picture1

Read more about The Formula 1 Data Journey

Formula 1 data engineers analyze data from ~150 sensors per car, tracking vital stats such as tire pressure, fuel efficiency, wind force, GPS location, and brake temperature in real time. Each sensor communicates with the track, the crew in the pit, a broadcast crew on-site, and a second team of engineers back in the factory. They can transmit 2GB of data in one lap and 3TB in a full race.

Picture2

Watch WSJ's video "The F1 Big Data Explainer"

Data engineers can make sense of the car’s speed, stability, aerodynamics, and tire degradation around a race track. The graph below shows an example of what is being analyzed by race engineers: rpm, speed, acceleration, gear, throttle, brakes, by lap.

Picture3

More analysis is also completed at the team’s manufacturing base. Below is an example race strategy, analyzing whether it would be faster to do a pit stop tire change at lap 13 and lap 35 vs. lap 17 and lap 38.

Picture4

The Challenge: How Do You Capture, Analyze, and Store this Amount of Data in Real Time at Scale?

The 2014 U.S. Grand Prix collected more than 243 terabytes of data in a race weekend, and now there is even more data. Formula 1 teams are looking for newer technologies with faster ways to move and analyze their data in the cockpits and at the factory.

Below is a proposed proof of concept architecture for a Formula 1 team:

Picture5

MapR Edge in the cars provides an on-board storage system that buffers the most recent data and retries when transmission fails. MapR Edge addresses the need to capture, process, and provide backup for data transmission close to the source. A radio frequency link publishes the data from the edge to the referee “FIA” topic, and from there it is published to each team’s topic, where local analytics is done. The “team engineers” Stream replicates to the “trackside” and “factory” Streams, so that the data is pushed in real time for analytics. In a sport where seconds make a difference, it’s crucial that the trackside team can communicate quickly with the engineers at headquarters and team members around the world. MapR Streams replication provides an unusually powerful way to handle data across distant data centers at large scale and low latency.

The Demo Architecture

The demo architecture is considerably simplified because it needs to be able to run on a single node MapR sandbox. The demo does not use real cars; it uses the Open Racing Car Simulator (TORCS), a race car simulator often used for AI race games and as a research platform. 

Picture6

The demo architecture is shown below. Sensor data from the TORCS simulator is published to a MapR Streams topic using the Kafka API. Two consumers are subscribed to the topic. One Kafka API consumer, a web application, provides a real-time dashboard using websockets and HTML5. Another Kafka API consumer stores the data in MapR-DB JSON, where analytics with SQL are performed using Apache Drill. You can download the demo code here:  https://github.com/mapr-demos/racing-time-series.

Picture7

How Do You Capture this Amount of Data in Real Time at Scale?

Older messaging systems track message acknowledgements on a per-message, per-listener basis. A new approach is needed to handle the volume of data for the Internet of Things. 

Picture8

MapR Streams is a distributed messaging system that enables producers and consumers to exchange events in real time via the Apache Kafka 0.9 API. In MapR Streams, topics are logical collections of messages, which organize events into categories and decouple producers from consumers.

Picture9

Topics are partitioned for throughput and scalability. Producers are load balanced between partitions, and consumers can be grouped to read in parallel from multiple partitions within a topic for faster performance.

Picture10

You can think of a partition like a queue; new messages are appended to the end, and messages are delivered in the order they are received.

Picture11

Unlike a queue, events are not deleted when read; they remain on the partition, available to other consumers.

Picture12

A key to high performance at scale, in addition to partitioning, is minimizing the time spent on disk reads and writes. With Kafka and MapR Streams, messages are persisted sequentially as produced and read sequentially when consumed. These design decisions mean that non-sequential reading or writing is rare, allowing messages to be handled at very high speeds with scale. Not deleting messages when they are read also allows processing of the same messages by different consumers for different purposes. 

Picture13

Example Producer Using the Kafka API

Below is an example producer using the Kafka Java API. For more information, see the MapR Streams Java applications documentation.

Picture14
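Since the original listing was an image, here is a minimal Scala sketch of a producer using the Kafka Java API; the topic path and the JSON payload are assumptions:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SensorProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")   // ignored by MapR Streams, required by the API
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Hypothetical topic path and message: one JSON document per batch of sensor readings.
  val topic = "/apps/racing/stream:sensors"
  val json  = """{"car":"car1","timestamp":1458141858,"sensors":{"Speed":3.58,"Distance":2003.02,"RPM":1896.57}}"""

  producer.send(new ProducerRecord(topic, json))
  producer.flush()
  producer.close()
}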

How to Store the Data

One of the challenges, when you have 243 terabytes of data every race weekend, is where to store it. With a relational database and a normalized schema, related data is stored in different tables. Queries joining this data together can cause bottlenecks with lots of data. For this application, MapR-DB JSON was chosen for its scalability and flexible ease of use with JSON. MapR-DB and a denormalized schema scale well, because data that is read together is stored together.

Picture15

With MapR-DB (HBase API or JSON API), a table is automatically partitioned across a cluster by key range, providing for really fast reads and writes by row key.

Picture16

JSON Schema Flexibility

JSON facilitates the natural evolution of your data schema during the life of your application. For example, in version 1, we have the following schema, where each JSON message groups sensor values for Speed, Distance, and RPM:

 

{  "_id":"1.458141858E9/0.324",
   "car":"car1",
   "timestamp":1458141858,
   "racetime":0.324,
   "records":
      [
         {
            "sensors":{
               "Speed":3.588583,
               "Distance":2003.023071,
               "RPM":1896.575806
            }
         },
         {
            "sensors":{
               "Speed":6.755624,
               "Distance":2004.084717,
               "RPM":1673.264526
            }
         },

For version 2, you can easily capture more data values quickly without changing the architecture of your application, by adding attributes as shown below:

 

...
"records":
      [
         {
            "sensors":{
               "Speed":3.588583,
               "Distance":2003.023071,
               "RPM":1896.575806,
               "Throttle":37,
               "Gear":2
            }
         },
. . .

As discussed before, MapR Streams allows processing of the same messages for different purposes or views. With this type of architecture and flexible schema, you can easily create new services and new APIs–for example, by adding Apache Spark Streaming or an Apache Flink Kafka Consumer for in-stream analysis, such as aggregations, filtering, alerting, anomaly detection, and/or machine learning.

Picture17

Processing the Data with Apache Flink

Apache Flink is an open source distributed data stream processor. Flink provides efficient, fast, consistent, and robust handling of massive streams of events as well as batch processing, a special case of stream processing. Stream processing of events is useful for filtering, creating counters and aggregations, correlating values, joining streams together, machine learning, and publishing to a different topic for pipelines.

Picture18

The code snippet below uses the FlinkKafkaConsumer09 class to get a DataStream from the MapR Streams “sensors” topic. The DataStream is a potentially unbounded distributed collection of objects. The code then calculates the average speed with a time window of 10 seconds. 

Picture19

Picture20
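As the original snippet was an image, here is a hedged Scala sketch of the same idea: consume the sensors topic with FlinkKafkaConsumer09 and compute a per-car average speed over 10-second windows. The topic path and the CSV message layout are assumptions:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector

object AverageSpeedSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("group.id", "flink-speed")       // consumer group

    // Read the raw sensor messages from the "sensors" topic.
    val sensors: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer09[String]("/apps/racing/stream:sensors", new SimpleStringSchema(), props))

    // Assume "car,speed,..." CSV payloads; average the speed per car over 10-second windows.
    sensors
      .map { line => val f = line.split(","); (f(0), f(1).toDouble) }
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .apply { (car: String, window: TimeWindow, events: Iterable[(String, Double)], out: Collector[(String, Double)]) =>
        val speeds = events.map(_._2)
        out.collect((car, speeds.sum / speeds.size))
      }
      .print()

    env.execute("average-speed-sketch")
  }
}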

Querying the Data with Apache Drill

Apache Drill is an open source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine, built to perform distributed query processing across the various nodes in a cluster. 

Picture21

With Drill, you can use SQL to query and join data from files in JSON, Parquet, or CSV format, Hive, and NoSQL stores, including HBase, MapR-DB, and Mongo, without defining schemas.

Below is an example query to ‘show all’ from the race car’s datastore:

SELECT * 
FROM dfs.`/apps/racing/db/telemetry/all_cars`
LIMIT 10

And the results:

Picture22

Below is an example query to show the average speed by car and race:

SELECT race_id, car, 
ROUND(AVG( `t`.`records`.`sensors`.`Speed` ),2) as `mps`,
ROUND(AVG( `t`.`records`.`sensors`.`Speed` * 18 / 5 ), 2) as `kph`
FROM
(
SELECT race_id, car, flatten(records) as `records`
FROM dfs.`/apps/racing/db/telemetry/all_cars`
) AS `t`
GROUP BY race_id, car

 

And the results:

Picture23

All of the components of the use case architecture that we just discussed can run on the same cluster with the MapR Converged Data Platform.

Software

You can download the code and instructions to run this example from here: https://github.com/mapr-demos/racing-time-series.

Editor's note: This blog post was originally shared in the Converge Blog on May 11, 2017. Since the publication of this blog, the MapR Streams product name has been changed to MapR-ES.

Additional Resources

MapR Edge product information

 

MapR Streams Application Development

Getting Started with Apache Flink and MapR Streams

Big Data On The Road

NorCom selects MapR converged data platform for foundation of deep learning framework for Mercedes

Apache Drill

MapR-ES

MapR-DB

By Karen Whipple

 

Human trafficking is the third largest crime industry in the world today. India has an estimated 18.3 million slaves, according to the Global Slavery Index, and every year 200,000 Indian children are tricked, kidnapped, or coerced into slavery. Every three minutes, a girl is sold into slavery. The average age of these girls is 12 years old, with children as young as six trafficked for sexual exploitation, and less than one percent of them are ever rescued. Public awareness is the most effective barrier to human trafficking, and MapR is proud to be part of the big data analytics solution, developed by Quantium for My Choices Foundation, to help alleviate one of the world’s worst social problems.

While many organizations work to rescue girls and prosecute the traffickers, Operation Red Alert, a program of My Choices Foundation, is a prevention program designed to help parents, teachers, village leaders, and children understand how the traffickers work, so they can block their efforts. Poor village girls are typically targeted, with promises that the girls are being offered wonderful opportunities for an education, jobs, or marriage. But with over 600,000 villages in India, Operation Red Alert needed help to determine which areas were most at risk to prioritize their education efforts.

 

Watch the video 'Red Alert Saves Lives'

As a pro bono project, the Australian analytics firm Quantium developed a big data solution, which runs on Cisco UCS infrastructure and uses the MapR Converged Data Platform. The solution analyzes India’s census data, government education data, and other sources for factors such as drought, poverty level, proximity to transportation stations, educational opportunities, population, and distance to police stations, so as to identify the villages and towns that are most at risk of human trafficking.

Elca Grobler, the founder of My Choices Foundation, explains the significance of this work: “The general Indian public is still largely unaware that trafficking exists, and most parents have no idea that their children are actually being sold into slavery. That’s why grassroots awareness and education at the village level is so important to ending the human traffic trade.”

Operation Red Alert

With a vision to end sex trafficking in India by preventing girls from ever entering it, Operation Red Alert began its efforts in 2014 by conducting research on root causes and by building relationships with long-standing non-government organizations (NGOs) in the field. Working with Quantium, they analyzed data while constantly refining and iterating to define a model that could identify villages most at risk. Red Alert now has 40 NGO partners, who have helped conduct the Safe Village education program in 600 villages throughout four states in India, reaching over 600,000 villagers. Red Alert also created India’s first national anti-trafficking helpline and is conducting mass media awareness campaigns.

As Grobler explains, “We are helping to banish human trafficking, one village at a time, through a combination of highly sophisticated technology and grassroots Safe Village education implemented through our NGO partners. The national helpline gives villagers a link to continual support, and with 95 million mobile phones in India that gives us a very broad reach. We’re also adding data and refining our predictive analytics, and we’re expanding our education efforts to cover more states this year.”

MapR Technology Behind the Scenes

Quantium brings together proprietary data, technology, and innovative data scientists to enable the development of groundbreaking analytical applications and develops insights into consumer needs, behaviors, and media consumption by analyzing consumer transaction data. Quantium upgraded its legacy server platform with Cisco UCS to gain centralized management and the computing power needed to process complex algorithms in a dense, scalable form factor that also reduces power consumption. Cisco Nexus 9000 Switches provide a simplified network with the scalable bandwidth to meet their current and future requirements. The MapR Converged Data Platform makes this enterprise possible by enabling organizations to create intelligent applications that fully integrate analytics with operational processes in real time.

Rigorous testing by Quantium demonstrated that the MapR-Cisco platform decreased query processing time by 92%, a performance increase of 12.5 times the legacy platform. With the Cisco-MapR solution, Quantium’s data scientists can design complex queries that run against multi-terabyte data sets and get more accurate results in minutes, rather than hours or days. In addition, the more powerful platform drives innovation because scientists can shorten development time by testing alternative scenarios quickly and accurately.

“UCS gives us the agility that’s key to supporting our iterative approach to analytics,” explains Simon Reid, Group Executive for Technology at Quantium. “For example, with the analytics for Operation Red Alert, we’re fine-tuning the algorithm, adding more hypothesis and more granular data to improve our predictive capabilities. MapR adds performance security and the ability to segregate multiple data sets from multiple data partners for Operation Red Alert.” MapR is proud that the unique multi-tenancy and high-speed performance and scale of the MapR Platform serves as the underlying technology to power the Operation Red Alert data platform.

Editor's note: This blog post was originally posted in the Converge Blog on May 30, 2017

Additional Resources

I'll guess that many people reading this have spent time wrestling with configuration to get Python and Spark to play nicely. Having gone through the process myself, I've documented my steps and am sharing the knowledge, hoping it will save some time and frustration for some of you.

 

This article targets the latest releases of MapR 5.2.1 and the MEP 3.0 version of Spark 2.1.0. It should work equally well for earlier releases of MapR 5.0 and 5.1. In fact, I've tested this to work with MapR 5.0 with MEP 1.1.2 (Spark 1.6.1) for a customer.  

 

The version of Jupyter is 4.3. It seems to have changed quite a bit since earlier versions, so most of the information I found in blogs was pretty outdated, hence all the trouble I had getting everything working to my satisfaction.

 

My goals:

  • Running PySpark successfully from either a cluster node or an edge node
  • Running python code in YARN distributed mode
  • Have access to modules like numpy, scipy, pandas and others.
  • Do all this using Jupyter in server mode that I access from my own laptop

 

I'm leaving out Jupyter server mode security, which could potentially be the topic of a separate blog. I've implemented it before and found the Jupyter documentation on setting it up for encryption (HTTPS) and authentication to be pretty good.

Installing an updated version of Python

Verify your version of Python:

python --version

 

If it's Python 2.6.X, it's probably a good idea to use a recent build of Python 2.7. If it's Python 2.7.X, then you'll need to choose whether or not to use the system Python.

  • System Python is easier to make work; it's already there and shared everywhere.
  • An isolated, separate Python (Anaconda or a separate Python build) is harder to get working but provides a more consistent environment where each user can have their own (and only their own) modules installed.

 

I will use Miniconda for Python 2.7 (64-bit) throughout. It works very well. Using Python 3 would be just the same, with the only differences being in code and module compatibility. Either works fine with Spark.

 

Note: Python 3.6 doesn't work with Spark 1.6.1; see SPARK-19019.

Installing Anaconda

There is a choice between Anaconda and Miniconda, as well as between Python 2.7 and Python 3.6.

Miniconda is very nice because the download is small and you only install what you need. Anaconda is very nice because everything is installed from the start, so all the modules needed for most purposes will be there from the beginning.

 

Here, we show installing miniconda and Python 2.7 (64bits):

wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh
bash Miniconda2-latest-Linux-x86_64.sh -b -p /opt/miniconda2

 

To install it on all nodes at once, I recommend checking out Clustershell.

 

#copy the file to all nodes

clush -ac Miniconda2-latest-Linux-x86_64.sh



#install on all nodes at same time:

clush -aB bash Miniconda2-latest-Linux-x86_64.sh -b -p /opt/miniconda2

 

Important: Get all nodes into exactly the same state, with Python/Anaconda installed in exactly the same location and with exactly the same modules installed on every node. Missing this guarantees weird errors that will be hard to diagnose.

 

Update Spark environment to use Python 2.7

Add to /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh:

    export PYSPARK_PYTHON=/opt/miniconda2/bin/python
    export PYSPARK_DRIVER_PYTHON=/opt/miniconda2/bin/python

 

update file on all nodes:

# using clustershell to copy file ("c") to all nodes ("a")

clush -ac /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh

 

Note: this is known to work on previous MEP versions. I have also tested it with MEP 1.1.2 (Spark 1.6.1), and it worked very well. Just use the correct path to Spark and it will work just fine.

 

Testing

For testing, let's use the data from MapR Academy's Spark Essentials course, specifically the eBay auction data.

Copy the data into the folder /user/mapr/data.

 

Start pyspark and run the following code:

>>> auctionRDD = sc.textFile("/user/mapr/data/auctiondata.csv").map(lambda line: line.split(","))
>>> auctionRDD.first()
[u'8213034705', u'95', u'2.927373', u'jake7870', u'0', u'95', u'117.5', u'xbox', u'3']
>>> auctionRDD.count()
10654

OK, so now we have a working pyspark shell!

Note: don't do this as root or as user mapr on a production cluster. However, for doing tutorials, user mapr is convenient as it is a superuser and you don't need to worry about file permissions on MapR.

 

Errors:

  • pyspark java.io.IOException: Cannot run program "python2.7": error=2, No such file or directory
  • This error is because the driver and/or the executors can't find the python executable. It's fixed by setting the PYSPARK_PYTHON (and PYSPARK_DRIVER_PYTHON) variables in spark-env.sh (see above)

ipython Notebook

If you want to be able to use Spark when launching the ipython shell:

  1.         Ensure the SPARK_HOME env variable is defined:
export SPARK_HOME=/opt/mapr/spark/spark-2.1.0
  2.         Install ipython with Anaconda:
/opt/miniconda2/bin/conda install jupyter
  3.         Add an ipython profile named pyspark:
ipython profile create pyspark
  4.         Add ~/.ipython/profile_pyspark/startup/00-pyspark-setup.py:
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
  raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
  5.         Launch:
/opt/miniconda2/bin/ipython --profile=pyspark

 

Try the sample code above; it should also work without issue.

Jupyter Notebook

Now on to Jupyter. In this case, we're looking to have the notebook run on an edge node (less ideally, on a cluster node) in server mode and access it from our development laptop.

The following instructions assume the user mapr, but should work equally well for any other user. For production use, never use mapr user as it is a superuser with read-write access to all data in MapR.

With Anaconda:

  1.         Install Jupyter on all nodes:
clush -aB /opt/miniconda2/bin/conda install jupyter -y
  2.         Generate a profile:
/opt/miniconda2/bin/jupyter notebook --generate-config

 

We need to update the profile so you can log on to the notebook from your local computer, not just from localhost.

The --generate-config command creates the file $HOME/.jupyter/jupyter_notebook_config.py. In this file, we're going to update the NotebookApp.ip setting.

 

# here it's possible to use the server's IP address as well, e.g. '10.0.0.111'
# the important point is that leaving it at the default (i.e. 'localhost') prevents any remote connection
c.NotebookApp.ip = '*'

 

This is a good time for a reminder about security: it's pretty easy to configure Jupyter to use HTTPS and require a password. See the Jupyter documentation.

  3.        The startup script from the ipython step is helpful:

      [mapr@ip-10-0-0-180 ~]$ cat .ipython/profile_default/startup/00-default-setup.py

import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
  raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

       This step is essential, or else the kernel won't initialize properly. Alternatively, you can paste the code above into the first cell to initialize pyspark first. Another alternative is to use the findspark module, which probably does something similar to this, but with less code.

  4.         Add a PySpark kernel. Create the kernel.json file in the location shown below:
[mapr@ip-10-0-0-20 ~]$ cat .ipython/kernels/pyspark/kernel.json
{
"display_name": "pySpark (Spark 2.1.0)",
 "language": "python",
 "argv": [
  "/opt/miniconda2/bin/python",
  "-m",
  "ipykernel",
  "-f",
  "{connection_file}"
 ],
"env": {
        "CAPTURE_STANDARD_OUT": "true",
        "CAPTURE_STANDARD_ERR": "true",
        "SEND_EMPTY_OUTPUT": "false",
        "SPARK_HOME": "/opt/mapr/spark/spark-2.1.0"
    }
}
  5.         Start a notebook and have fun with Spark and Python!
 jupyter notebook --no-browser

 

Open your browser to the indicated link and... Success!

 

Launch Jupyter notebook instead of pyspark

  1.         Update $SPARK_HOME/conf/spark-env.sh:
[mapr@ip-10-0-0-20 ~]$ tail /opt/mapr/spark/spark-2.1.0/conf/spark-env.sh
    export PYSPARK_DRIVER_PYTHON=/opt/miniconda2/bin/jupyter

    # Setup env variable for Jupyter + Pyspark
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser"
  2.         Launch pyspark; it will start a Jupyter notebook:
$SPARK_HOME/bin/pyspark

 

Conclusion

Getting this working is harder than it ought to be. Jupyter has changed its usage a lot over the versions, especially in the move from ipython to Jupyter. The change is worth it, but the configuration is a big pain.

 

Getting all the information together into a repeatable, tested process was also quite a bit of work. Hopefully this will save some time for some people out there.

 

Thanks

Dong Meng's blog proved to be a life saver. Check him out: https://mengdong.github.io

By Mathieu Dumoulin

 

Goal of the System

The Kafka/Spark Streaming system aims to improve customer support by giving the operator's support staff always up-to-date call quality information for all of its mobile customers.

Mobile customers, while making calls and using data, connect to the operator’s infrastructure and generate logs in many different systems. Three specific logs were identified that, if correlated with each other, give visibility into the actual quality of service experienced by each individual customer. The three logs were selected because they can be correlated through a simple relational database-like join operation.

To improve customer support, the call quality information needs to be kept updated in near real time; otherwise, it has no value. This led, down the road, to building a streaming architecture rather than a batch job. The data volume at production load reaches several GB/s, generated by several million mobile customers, 24 hours a day, 365 days a year. Performance and stability at that scale is required for the system to reach production.

Project SLA Goals

The application has clear performance requirements based on the known worst-case throughput of the input data. This log data is generated by real-world use of the services of the company. If the application is to be useful at all, as a real-time streaming application, it must be able to handle this data without getting behind.

In terms of numbers, the goal is to handle up to 3GB/min of input data. For this large mobile operator, such throughput represents about 150,000-200,000 events/second. Ordinarily, the throughput is about half of that value: 1.5GB/min and 60,000-80,000 events/second.

Data Sources

The raw data sources are the logs of three remote systems, labeled A, B, and C here: the log from A comprises about 84-85% of the entries, the log from B about 1-2%, and the log from C about 14-15%. The fact that the data is unbalanced is one of the (many) sources of difficulty in this application.

Picture1

The raw data is ingested into Kafka, running on 6 servers, by a single producer. The producer reads the various logs and adds each log's records to its own topic. As there are three logs, there are three Kafka topics.

Picture2

The data is consumed by a Spark Streaming application, which picks up each topic, does a simple filter to cut out unnecessary fields, a map operation to transform the data, and then a foreachRDD operation (each micro-batch generates an RDD in Spark Streaming) that saves the data to Ignite and to HDFS as Hive tables for backup.
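As a rough sketch of that shape (not the production code), the streaming ETL job might look like the following in Scala with the Kafka 0.8 direct approach; the topic names, record delimiter, and selected fields are assumptions, and the Ignite and Hive writes are indicated only as comments:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CallLogEtlSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("call-log-etl")
    val ssc  = new StreamingContext(conf, Seconds(30))

    // Direct (receiver-less) stream over the three log topics.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics      = Set("logA", "logB", "logC")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream
      .map { case (_, line) => line.split('|') }            // parse the raw record (delimiter assumed)
      .filter(_.length > 5)                                 // drop malformed lines
      .map(fields => (fields(0), (fields(1), fields(3))))   // keep only the fields needed for the join
      .foreachRDD { rdd =>
        // Each micro-batch is persisted twice:
        //  - to Ignite's shared in-memory cache (via ignite-spark's IgniteContext/IgniteRDD)
        //  - to HDFS/Hive as an ORC-backed safety copy
        // Both calls are omitted here to keep the sketch self-contained.
      }

    ssc.start()
    ssc.awaitTermination()
  }
}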

A second batch Spark application runs once per hour on the data stored in-memory in Ignite to join the records from the three separate logs into a single table. The batch job has a maximum data size of about 100GB. The cluster CPU resources should be sufficient to process this amount of data in one hour or less.

Picture3

Ignite stores 3 hours’ worth of data at all times, to account for calls that begin in one hour and end in the hour being processed, as well as calls that begin in the target hour and end in the next one. The telecom operator judges that calls so long they aren’t captured in this scheme can be ignored, as they are very rare.

It’s worth noting that a better all-streaming architecture could have avoided the whole issue with the intermediate representation in the first place. This is an illustrative, real-world case where more time and thought up front could have made the entire project finish faster than rushing headlong into coding the first working solution that came to mind.

System Hardware and Software: At the Bleeding Edge of Open Source Big Data

The cluster has a lot of CPU and memory resources. It has 12 nodes of enterprise-grade servers, each equipped with two E5 Xeon CPUs (16 physical cores), 256GB of memory, and eight 6TB spinning HDDs (two for the OS in RAID 1). Each server has one 10GbE network interface.

Picture4

The technology stack selected for this project is centered around Kafka 0.8 for streaming the data into the system, Apache Spark 1.6 for the ETL operations (essentially a bit of filtering and transformation of the input, then a join), and Apache Ignite 1.6 as an in-memory shared cache to make it easy to connect the streaming input part of the application with joining the data. Backup is done to HDFS, where Hive ORC tables serve as a just-in-case backup for Ignite and as a basis for future analytics use cases (none at the time).

The Spark applications are both coded in Scala 2.10 and use Kafka’s direct approach (no receivers). Apache Ignite has a really nice Scala API with a magic IgniteRDD that allows applications to share in-memory data, a key feature for this system to reduce coding complexity.

The cluster is running Apache Hadoop's HDFS as a distributed storage layer, with resources managed by Mesos 0.28. Finally, HBase is used as the ultimate data store for the final joined data. It will be queried by other systems outside the scope of this project. The cluster design with all relevant services is shown in the table above.

Performance Issues

The original system had several issues:

  1. Performance
    • The first Spark Streaming job is not stable
    • The second Spark batch job can’t process 1 hour of data before the next hour of data arrives
  2. Stability: The application crashes under load

A Spark Streaming application is said to be stable if the processing time of each micro-batch is less than or equal to the batch interval. In this case, the application processes each 30 seconds of data in as much as 6 minutes. We need a 12x speedup.

Second, there is a batch process to join data one hour at a time that was targeted to run in 30 minutes but was taking over 2 hours to complete.

Third, the application was randomly crashing after running for a few hours. Stability of such a complex, fully open-source stack should never be assumed. Rather, it is the result of a constant effort by the team to better understand the system. We can expect that there will still be a lot of learning required to keep the system up and running once it is moved to production as well.

Performance Tuning

In my opinion, all performance and stability issues stem from the terrible idea of management to push a very good POC project developed on AWS into production on some on-premises hardware. It’s hard to believe, but they fully expected the POC code to run as-is on a production system it was never tested on.

Regardless, the task was set, and we had only a few short days to identify what could be done and get the system up to production speed. Final QA testing of the system was barely 1 week away, and management wasn’t in the mood to accept delays. We got to work...

First target: Improve Spark Streaming Performance

At maximum load, the Spark Streaming application is taking between 4.5 and 6 minutes for each micro-batch of 30 seconds. We need to find 9-12x worth of speedup.

Spark has a lot of moving parts, but it will always be true that fast algorithms beat tweaking the configuration. In this case, there is nothing to gain from the code; it’s all very parallelizable, with no obvious issues like doing two computations separately when they could be combined, or any O(n^2) loop-within-a-loop issues. The job is nothing more than a filter and a map.

What we need to determine, then, is whether the job is indeed being processed in parallel to make the most of all those CPU cores. In a Spark Streaming job, Kafka partitions map 1 to 1 with Spark partitions.

Increase Parallelism: Increase Number of Partitions in Kafka

A quick check of the Spark UI shows 36 partitions. As each server has 6 physical disks, I assume the partitioning was chosen by the formula nodes * physical disks = partition count per topic. Quickly checking online reveals that partitioning is quite a bit more complex than that, and this formula isn’t from any known Kafka best practices guide. (Ref: https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/)

The input data was unbalanced and most of the application processing time was spent processing Topic 1 (with 85% of the throughput). Kafka partitions are matched 1:1 with the number of partitions in the input RDD, leading to only 36 partitions, meaning we can only keep 36 cores busy on this task. To increase the parallelism, we need to increase the number of partitions. What we did was split topic 1 into 12 topics each with 6 partitions for a total of 72 partitions. The way it was done was a simple modification to the producer to evenly divide the data from the first log into 12 topics instead of just one. Zero code needed to be modified on the consumer side.

We also right-sized the number of partitions for the two other topics, in proportion to their relative importance in the input data, so we set topic 2 to two partitions and topic 3 to eight partitions.
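Here is a hedged sketch of the producer-side change, assuming a hash-based split and hypothetical topic names (the real code divided the log evenly, which a simple round-robin counter would also achieve):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object LogASplitterSketch extends App {
  // Spread records from log A over 12 topics (6 partitions each) so the direct
  // stream sees 72 input partitions instead of 36.
  val topicCount = 12
  def topicFor(line: String): String =
    s"logA-${((line.hashCode % topicCount) + topicCount) % topicCount}"

  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  // Read log A records from stdin and fan them out evenly across the 12 topics.
  scala.io.Source.stdin.getLines().foreach { line =>
    producer.send(new ProducerRecord(topicFor(line), line))
  }
  producer.close()
}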

Picture5

Running more tasks in parallel. Before tuning, each stage always had 36 partitions!

Fix RPC Timeout Exceptions

When looking at the application logs, we could see a lot of RPC timeout exceptions. A web search turned up what we believe is the relevant issue, SPARK-14140. The recommended fix is to increase spark.executor.heartbeatInterval from 10s (the default) to 20s.

I think this could be caused by nodes getting busy from disk or CPU spikes because of Kafka, Ignite, or garbage collector pauses. Since Spark runs on all nodes, the issue was random (see the cluster services layout table in the first section).

The configuration change fixed this issue completely. We haven’t seen it happen since. (Yay!)
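For reference, the property can be set when the application's Spark context is created, or passed with --conf on spark-submit. A minimal PySpark sketch (the production job itself is not shown in this post and is likely written in Scala):

from pyspark import SparkConf, SparkContext

# Raise the executor heartbeat interval from the 10s default to 20s
conf = (SparkConf()
        .setAppName("streaming-job")
        .set("spark.executor.heartbeatInterval", "20s"))
sc = SparkContext(conf=conf)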

Increase Driver and Executor Memory

Out-of-memory issues and random crashes of the application were solved by increasing the memory from 20g per executor to 40g per executor, as well as 40g for the driver. Happily, the machines in the production cluster were heavily provisioned with memory; this is good practice with a new application, since you don't know how much you will need at first.

The issue was difficult to debug with precision or reliable information, since the Spark UI reports very little memory consumption. In practice, as this setting is easy to change, we empirically settled on 40g as the smallest memory size for the application to run stably.

Right Size the Executors

The original application was running only 3 executors with 72 total cores. We configured the application to run with 80 cores, with a maximum of 10 cores per executor, for a total of 8 executors. Note that with 16 real cores per node on a 10-node cluster, we're leaving plenty of resources for the Kafka brokers, Ignite, and HDFS/NN to run on.
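Here is a minimal sketch of those resource settings expressed as Spark properties (shown in PySpark for illustration; in practice they would more likely be passed to spark-submit or set in the Scala job itself):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("streaming-job")
        .set("spark.executor.memory", "40g")   # was 20g per executor
        .set("spark.driver.memory", "40g")     # driver memory is best set on spark-submit,
                                               # before the driver JVM starts
        .set("spark.cores.max", "80")          # total cores for the application
        .set("spark.executor.cores", "10"))    # 80 / 10 = 8 executors
sc = SparkContext(conf=conf)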

Increase the Batch Window from 30s to 1m

The data is pushed into Kafka by the producer in batches every 30s, as it is gathered by FTP batches from the remote systems. Such an arrangement is common in telecom applications, due to the need to deal with equipment and systems from a bewildering range of manufacturers, technologies, and ages.

This made the input stream very spiky, as could be seen from the processing times in the Spark UI's streaming tab.

Increasing the window to 1m allowed us to smooth out the input and gave the system a chance to process the data in 1 minute or less and still be stable.

To verify this, the team used test data that simulated the known worst case, and with the new settings the Spark Streaming job was indeed stable. We also tried it on real production data, and everything looked good. Win!
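As an illustration of the batch window change, here is a minimal PySpark Streaming sketch with a 60-second batch interval and a direct Kafka stream; the broker address and topic names are placeholders, and the actual job (a filter and a map, as described earlier) is only hinted at in comments.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="cdr-streaming")
ssc = StreamingContext(sc, batchDuration=60)   # was 30 seconds before tuning

stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["topic1-00", "topic2", "topic3"],              # placeholder topic names
    kafkaParams={"metadata.broker.list": "broker1:9092"})  # placeholder broker list

# parsed = stream.map(parse_record).filter(is_valid)       # the actual filter + map
# parsed.foreachRDD(write_to_ignite)

ssc.start()
ssc.awaitTermination()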

Drop Requirement to Save Hive Tables to HDFS

Discussion with the project managers revealed that Hive was not actually part of the requirements for the streaming application! Mainly, this is because the other analytics, mostly SQL requests, could be serviced from the data in HBase.

Considering the goal of the system, the worst-case scenario for missing data is that a customer's call quality information cannot be found... which is already the case today. In other words, losing some data doesn't make things worse than the status quo, while every record that does get processed adds insight. If the great majority of the data is processed and stored, the business goals can be reached.

There wasn't much point in saving the data to Hive mid-flight for increased fault tolerance either: once the data is in Ignite, it's safe even if the Spark application crashes. This made Ignite an even more critical part of the application, despite its having some issues of its own. It was a difficult decision, made entirely because of the advanced stage of the project; as we'll explain in more detail in the conclusion, the architecture itself was problematic, and a week or two before production is not the time to play with architecture.

Spark Performance Tuning Results

The Spark Streaming application finally became stable, with an optimized runtime of 30-35s.

As it turns out, cutting out Hive also sped up the second Spark application that joins the data together, which now ran in 35 minutes, putting both applications well within the project requirements.

With the improvements from the next part, the final processing time of the Spark Streaming job came down to the low-20-second range, for a final speedup of a bit over 12x.

Second target: Improve System Stability

We had to work quite hard on stability. Several strategies were required, as we will explain below.

Make the Spark Streaming Application Stable

The work we did to fix the performance had a direct impact on system stability. If both applications are stable themselves and running on right-sized resources, then the system has the best chance to be stable overall.

Remove Mesos and Use Spark Standalone

The initial choice of Mesos to manage resources was forward-looking, but ultimately we decided to drop it from the final production system. At the outset, the plan was to have Mesos manage all the applications, but the team could never get Kafka and Ignite to play nicely with Mesos, so they were running in standalone mode, leaving only Spark to be managed by Mesos. With more time, there is little doubt all the applications could have been properly configured to work with Mesos.

Proposing to remove Mesos was a bit controversial, as Mesos is much more advanced and cool than Spark running in standalone mode.

But the issue with Mesos was twofold:

  1. Control over executor size and number was poor, a known issue (SPARK-5095) with Spark 1.6 and now fixed in Spark 2.X.
  2. Ignite and Kafka aren’t running on Mesos, only Spark is. Given the schedule pressure, the team had given up trying to get those two services running in Mesos.

Mesos can only allocate resources well if it actually controls those resources. In this system, Kafka and Ignite run outside of Mesos' knowledge, meaning it will assign resources to the Spark applications incorrectly.

In addition, it's a single-purpose cluster, so we can live with customizing the resource sizing for each application ourselves, with a global view of the system's resources. There is little need for dynamic resource allocation, scheduling queues, multi-tenancy, and other buzzwords.

Change the Ignite Memory Model

It is a known issue that when the heap controlled by the JVM gets very big (> 32GB), the cost of garbage collection is quite large. We could indeed see this when the join application ran: the stages with a 25GB shuffle had some tasks with GC time spiking from the 10-second range up to more than a minute.

The initial configuration of Ignite was ONHEAP_TIERED, with 48GB of data cached on heap and overflow to 12GB of off-heap memory. That setting was changed to the OFFHEAP_TIERED model. While slightly slower due to serialization costs, OFFHEAP_TIERED doesn't rely on the JVM's garbage collection; it still runs in memory, so we estimated it would be a net gain.

With this change, the run time for each batch dutifully came down by about five seconds, from 30 seconds to about 25 seconds. In addition, successive batches now had much more similar processing times, with a delta of 1-3 seconds, whereas previously they would vary by 5 to 10 seconds or more.

Update the Ignite JVM Settings

We followed the JVM options recommended in the performance tuning section of the Ignite documentation (http://apacheignite.gridgain.org/docs/jvm-and-system-tuning).

Improve the Spark Code

Some parts of the code assumed reliability, such as the queries to Ignite, when in fact those operations could fail. These problems can be fixed in the code, which now handles exceptions more gracefully, though there is probably work left to do to increase its robustness. We can only find those spots by letting the application run.
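The pattern is simple. Here is a minimal Python sketch of the kind of defensive wrapper added around calls that used to assume success; the names are purely illustrative, not the project's actual code.

import time

def with_retries(operation, attempts=3, backoff_seconds=2):
    # Run `operation`, retrying a few times, and degrade gracefully instead of
    # letting one failed Ignite query (or similar call) crash the whole job.
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as err:   # real code would catch the client's specific exception
            if attempt == attempts:
                print("operation failed after %d attempts: %s" % (attempts, err))
                return None
            time.sleep(backoff_seconds * attempt)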

Reassign ZooKeeper to Nodes 10-12

Given the cluster is of medium size, it’s worth spreading the services as much as possible. We moved the ZooKeeper services from nodes 1-3 to nodes 10-12.

Final System Architecture

Picture6

Conclusion

Tuning this application took about one week of full-time work. The main information we used came from the Spark UI and the Spark logs, which are easily accessible from the Spark UI. The Jobs and Stages views, as well as the streaming tab, are very useful.

Essential Takeaways

  • Migrating a streaming application from a prototype on AWS to an on-site cluster requires schedule time for testing
  • Not testing the AWS prototype with realistic data was a big mistake
  • Including many “bleeding-edge” OSS components (Apache Ignite and Mesos) and expecting very high reliability from them is unrealistic
  • A better architecture design could have simplified the system tremendously
  • Tuning a Kafka/Spark Streaming application requires a holistic understanding of the entire system; it’s not just about changing parameter values of Spark. It’s a combination of the data flow characteristics, the application goals and value to the customer, the hardware and services, the application code, and then playing with Spark parameters.
  • The MapR Converged Data Platform would have cut the development time, complexity, and cost for this project.

This project was a hell of a dive into the deep end of the pool for a telecom operator with very little experience in the open-source enterprise big data world. They should be applauded for their ambition and desire to take on such a challenge with the goal of benefiting their customers. But a better choice of platform and application architecture could have made their lives a lot easier.

A Converged Platform is the Correct Approach

In fact, the requirements for this project show the real-world business need for a state-of-the-art converged platform with a fast distributed file system, a high-performance key-value store for persistence and real-time access, and high-performance streaming capabilities.

A MapR-based solution would, without a doubt, have been a lot easier to build and maintain. Since MapR Streams is built in, there is one less cluster to manage (bye-bye, Kafka brokers). The Spark application could run with the same code, but without needing to rely on a speculative open-source project like Apache Ignite.

Saving to MapR-DB uses the same HBase API, so likely no code change there either, and you're saving to a DB that's built into the native C MapR-FS, so it's going to be very fast as well. Finally, sharing resources is simplified by running only Spark on YARN or in standalone mode, while the platform deals with the resource requirements of MapR Streams, MapR-FS, and MapR-DB with guaranteed reliability and performance, and with highly trained support engineers available 24/7 for every single part of this application.

Given this system is heading into production for a telecom operator with 24/7 reliability expectation, I’d argue that built-in simplicity, performance, and support are pretty compelling and hopefully will be adopted by this customer for the next iteration of the system. (Stay tuned!)

 

Editor's note: This blog post was originally published in the Converge Blog on May 31, 2017.

 

Related

MapR Guide to Big Data in Telecommunications | MapR 

Big Data Opportunities for Telecommunications | MapR 

Apache Apex on MapR Converged Platform | MapR 

How To Get Started With Spark Streaming And MapR Streams Using The Kafka API 

Apache Spark

Apache Hbase

Apache Apex

Products and Services 

By Justin Brandenburg

 

Time series analysis has significance in econometrics and financial analytics, but it can be utilized in any field where understanding trends is important to decision making and reacting to changes in behavioral patterns. For example, a MapR Converged Data Platform customer, a major oil and gas provider, places sensors on wells that send data to MapR Streams, which is then used to monitor trends in well conditions such as volume and temperature. In finance, time series analytics is used to forecast stock prices, assets, and commodities. Econometricians have long leveraged “autoregressive integrated moving average” (ARIMA) models to perform univariate forecasts.

ARIMA models have been used for decades and are well understood. However, with the rise of machine learning and, more recently, deep learning, other models are being explored and utilized, either to support ARIMA results or replace them.

Deep learning (DL) is a branch of machine learning based on a set of algorithms that attempts to model high-level abstractions in data by using artificial neural network (ANN) architectures composed of multiple non-linear transformations. One of the more popular DL architectures is the Recurrent Neural Network (RNN). RNNs are a class of neural networks that depend on the sequential nature of their input. Such inputs could be text, speech, time series, or anything else in which the occurrence of an element in the sequence depends on the elements that appeared before it. For example, if someone writes “the grocery…”, the next word is far more likely to be “store” than “school,” and given that sequence an RNN would likely predict “store.”

Artificial Neural Networks

Actually, it turns out that while neural networks are sometimes intimidating structures, the mechanism for making them work is surprisingly simple: stochastic gradient descent. For each of the parameters in our network (such as weights or biases), all we have to do is calculate the derivative of the parameter with respect to the loss, and nudge it a little bit in the opposite direction.

ANNs use a method known as backpropagation to tune and optimize the results. Backpropagation is a two-step process: the inputs are fed into the neural network via forward propagation and multiplied with (initially random) weights and biases before being transformed by an activation function. The depth of your neural network depends on how many transformations your inputs should go through. Once the forward propagation is complete, the backpropagation step measures the error between your final output and the expected output by calculating the partial derivatives of the weights generating the error, and adjusts them. Once the weights are adjusted, the model repeats the forward and backpropagation steps to minimize the error rate until convergence. If you notice how the inputs are aligned in Fig. 1, you will see that this is an ANN with only one hidden layer, so the backpropagation will not need to perform multiple gradient descent calculations.
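To make that “nudge each parameter against its gradient” idea concrete, here is a tiny, self-contained illustration of gradient descent on a single weight, using made-up data (it is not part of the article's model):

import numpy as np

x = np.array([1.0, 2.0, 3.0, 4.0])
y = 3.0 * x                      # the "true" relationship we want the weight to learn
w = 0.0                          # start from an arbitrary initial weight
learning_rate = 0.01

for step in range(200):
    y_hat = w * x                              # forward pass
    grad = np.mean(2.0 * (y_hat - y) * x)      # derivative of the MSE loss w.r.t. w
    w -= learning_rate * grad                  # nudge w in the opposite direction

print(round(w, 3))                             # converges toward 3.0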

Picture1Figure 1

Recurrent Neural Networks

Recurrent Neural Networks (RNNs) are called recurrent because they perform the same computation for every element in a sequence of inputs. RNNs are becoming very popular due to their wide utility. They can analyze time series data, such as stock prices, and provide forecasts. In autonomous driving systems, they can anticipate car trajectories and help avoid accidents. They can take sentences, documents, or audio samples as input, making them extremely useful for natural language processing (NLP) systems such as automatic translation, speech-to-text, or sentiment analysis. They can be applied in any situation where you have a sequence of “events,” with each event being a data point.

Picture2Figure 2

Fig. 2 shows an example of an RNN architecture, where xt is the input at time step t. For example, x1 could be the first price of a stock in time period one. st is the hidden state at time step t and is calculated from the previous hidden state and the input at the current step, using an activation function; the first hidden state is usually initialized to zero. ot is the output at step t. For example, if we wanted to predict the next value in a sequence, it would be a vector of probabilities across our time series.

RNN cells are developed on the notion that one input is dependent on the previous input by having a hidden state, or memory, that captures what has been seen so far. The value of the hidden state at any point in time is a function of the value of the hidden state at the previous time step and the value of the input at the current time step. RNNs have a different structure than ANNs and use backpropagation through time (BPTT) to compute the gradient descent after each iteration.

Example

This example was done with a small MapR cluster of 3 nodes. This example will use the following:

  • Python 3.5
  • TensorFlow 1.0.1
  • Red Hat 6.9

If you are using Anaconda, you should be able to install TensorFlow version 1.0.1 on your local machine along with Jupyter Notebook. This code will not work with versions of TensorFlow < 1.0. It can be run on your local machine and later moved to a cluster if the TensorFlow versions are the same or later. Other deep learning libraries to consider for RNNs are MXNet, Caffe2, Torch, and Theano. Keras is another library that provides a Python wrapper for TensorFlow or Theano.

Picture3

MapR provides the ability to integrate Jupyter Notebook (or Zeppelin) at the user's preference. What we are showing here would be the end of a data pipeline. The true value of running an RNN time series model in a distributed environment is the data pipelines you can construct to push your aggregated series data into a format that can be fed into the TensorFlow computational graph.

If I am aggregating network flows from multiple devices (IDS, syslogs, etc.), and I want to forecast future network traffic pattern behavior, I could set up a real-time data pipeline using MapR Streams that aggregates this data into a queue that can be fed into my TensorFlow model. For this example, I am using only a single node on my cluster, but I could have installed TensorFlow on the two other nodes and could have three TF models running with different hyper-parameters.

For this example, I generated some dummy data.

Picture4Picture5

We have 209 total observations in our data. I want to make sure I have the same number of observations for each of my batch inputs.

What we see is our training data set is made up of 10 batches, containing 20 observations. Each observation is a sequence of a single value.

Picture6
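Since the data-preparation code appears only in the screenshots, here is a hedged reconstruction of that step under the stated assumptions (209 observations, a training set of 10 batches of 20 single-value sequences). The exact dummy series used in the article isn't shown, so a noisy sine wave stands in for it:

import numpy as np

np.random.seed(1)
ts = np.sin(np.arange(209) * 0.1) + np.random.normal(0.0, 0.1, 209)   # 209 dummy points

num_periods = 20
x_data = ts[:200].reshape(-1, num_periods, 1)     # 10 batches of 20 inputs
y_data = ts[1:201].reshape(-1, num_periods, 1)    # targets: the series shifted one step ahead

print(x_data.shape, y_data.shape)                 # (10, 20, 1) (10, 20, 1)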

Now that we have our data, let’s create our TensorFlow graph that will do the computation. ^1^

Picture7

There is a lot going on there, so let's examine it one step at a time. We specify the number of periods we are using to predict; in this case, it is the number of sequence steps that we feed into the model as a single input. We specify our variable placeholders. We initialize the type of RNN cell to use (size 100) and the type of activation function we want. ReLU stands for “Rectified Linear Unit” and is the default activation function here, but it can be changed to Sigmoid, Hyperbolic Tangent (Tanh), or others, if desired.

We want our outputs to be in the same format as our inputs, so we can compare our results using the loss function. In this case, we use mean squared error (MSE), since this is a regression problem in which our goal is to minimize the difference between the actual and predicted values. If we were dealing with a classification outcome, we might use cross-entropy. Now that we have this loss function defined, we can define the training operation in TensorFlow that will optimize our network of inputs and outputs. To execute the optimization, we use the Adam optimizer, a great general-purpose optimizer that performs our gradient descent via backpropagation through time. This allows faster convergence at the cost of more computation.
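Because the graph code itself is only shown as a screenshot, here is a hedged sketch of a TensorFlow 1.x graph matching the description above (a size-100 basic RNN cell with ReLU activation, MSE loss, and the Adam optimizer); the variable names and the output-projection step are assumptions, following the common pattern from the book cited in the footnote.

import tensorflow as tf

num_periods = 20       # sequence length fed in as one input
inputs, output = 1, 1  # one value in, one value out, per time step
hidden = 100           # RNN cell size mentioned above

X = tf.placeholder(tf.float32, [None, num_periods, inputs])
y = tf.placeholder(tf.float32, [None, num_periods, output])

cell = tf.contrib.rnn.BasicRNNCell(num_units=hidden, activation=tf.nn.relu)
rnn_output, states = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)

# project the 100-unit hidden state at each step down to a single predicted value
stacked = tf.reshape(rnn_output, [-1, hidden])
stacked_outputs = tf.layers.dense(stacked, output)
outputs = tf.reshape(stacked_outputs, [-1, num_periods, output])

loss = tf.reduce_mean(tf.square(outputs - y))                    # mean squared error
training_op = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)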

Now it is time to implement this model on our training data.

Picture8

We specify the number of iterations/epochs that will cycle through our batches of training sequences. We create our session (tf.Session()), initialize the variables, and feed our data into the model as we cycle through the epochs. The abbreviated output shows the MSE after every 100 epochs. As our model feeds the data forward and backpropagation runs, it adjusts the weights applied to the inputs and runs another training epoch, and our MSE continues to improve (decrease). Finally, once the model is done, it takes the learned parameters and applies them to the test data to give us our predicted output for Y.
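A hedged sketch of that training loop, reusing the names from the previous snippets; the epoch count and reporting interval are assumptions, and x_test stands for the held-out last 20 periods:

epochs = 1000
init = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)
    for ep in range(epochs):
        sess.run(training_op, feed_dict={X: x_data, y: y_data})
        if ep % 100 == 0:
            mse = loss.eval(feed_dict={X: x_data, y: y_data})
            print(ep, "\tMSE:", mse)
    # apply the trained parameters to the held-out test window (the last 20 periods)
    x_test = ts[-num_periods:].reshape(-1, num_periods, 1)
    y_pred = sess.run(outputs, feed_dict={X: x_test})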

Let’s check our predicted versus actual. For our test data, we were focused on the last 20 periods of the entire 209 periods.

Picture9Picture10

It would appear there is some room for improvement ☺. However, this can be done by changing the number of hidden neurons and/or increasing the number of epochs. Optimizing our model is a process of trial and error, but we have a great start. This is random data, so we weren't expecting great results, but perhaps applying this model to a real time series would give the ARIMA models some quality competition.

RNNs (and deep learning in general) are expanding the options available to data scientists to solve interesting problems. One issue that many data scientists face is how to automate their analysis to run once it has been optimized. Having a platform like MapR allows for this, because you can construct, train, test, and optimize your model on a big data environment. In this example, we only used 10 training batches. What if my data allowed me to leverage hundreds of batches, not merely of 20 periods, but of 50 or 100 or 500? I think I could definitely improve this model's performance. Once I did, I could package it up into an automated script to run on an individual node, a GPU node, in a Docker container, or all of the above. That's the power of doing data science and deep learning on a converged data platform.

^1^ Portions of this model were taken from the fantastic book Hands-On Machine Learning with Scikit-Learn and TensorFlow, 1st Edition, by Aurélien Géron.

By Carol McDonald

 

Churn prediction is big business. It minimizes customer defection by predicting which customers are likely to cancel a subscription to a service. Though originally used within the telecommunications industry, it has become common practice across banks, ISPs, insurance firms, and other verticals.

The prediction process is heavily data-driven and often utilizes advanced machine learning techniques. In this post, we'll take a look at what types of customer data are typically used, do some preliminary analysis of the data, and generate churn prediction models–all with Spark and its machine learning frameworks.

Customer 360

Using data science in order to better understand and predict customer behavior is an iterative process, which involves:

  1. Data discovery and model creation:
    • Analysis of historical data
    • Identifying new data sources, which traditional analytics or databases are not using, due to the format, size, or structure
    • Collecting, correlating, and analyzing data across multiple data sources
    • Knowing and applying the right kind of machine learning algorithms to get value out of the data
  2. Using the model in production to make predictions
  3. Data discovery and updating the model with new data

Picture1

In order to understand the customer, a number of factors can be analyzed, such as:

  • Customer demographic data (age, marital status, etc.)
  • Sentiment analysis of social media
  • Customer usage patterns and geographical usage trends
  • Calling-circle data
  • Browsing behavior from clickstream logs
  • Support call center statistics
  • Historical data that show patterns of behavior that suggest churn

With this analysis, telecom companies can gain insights to predict and enhance the customer experience, prevent churn, and tailor marketing campaigns.

CLASSIFICATION

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (e.g., whether a transaction is fraud or not fraud), based on labeled examples of known items (e.g., transactions known to be fraud or not). Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask. The label is the answer to those questions. In the example below, if it walks, swims, and quacks like a duck, then the label is "duck."

Picture2

Let’s go through an example of telecom customer churn:

  • What are we trying to predict?
    • Whether a customer has a high probability of unsubscribing from the service or not
    • Churn is the Label: True or False
  • What are the “if questions” or properties that you can use to make predictions?
    • Call statistics, customer service calls, etc.
    • To build a classifier model, you extract the features of interest that most contribute to the classification.

DECISION TREES

Decision trees create a model that predicts the class or label, based on several input features. Decision trees work by evaluating an expression containing a feature at every node and selecting a branch to the next node, based on the answer. A possible decision tree for predicting credit risk is shown below. The feature questions are the nodes, and the answers “yes” or “no” are the branches in the tree to the child nodes.

  • Q1: Is checking account balance > 200DM?
    • No
    • Q2: Is length of current employment > 1 year?
      • No
      • Not Creditable

Picture3

EXAMPLE USE CASE DATA SET

For this tutorial, we'll be using the Orange Telecoms Churn Dataset. It consists of cleaned customer activity data (features), along with a churn label specifying whether the customer canceled the subscription or not. The data can be fetched from BigML's S3 bucket, churn-80 and churn-20. The two sets are from the same batch but have been split by an 80/20 ratio. We'll use the larger set for training and cross-validation purposes and the smaller set for final testing and model performance evaluation. The two data sets have been included with the complete code in this repository for convenience. The data set has the following schema:

1. State: string
2. Account length: integer
3. Area code: integer
4. International plan: string
5. Voice mail plan: string
6. Number vmail messages: integer
7. Total day minutes: double
8. Total day calls: integer
9. Total day charge: double
10. Total eve minutes: double
11. Total eve calls: integer
12. Total eve charge: double
13. Total night minutes: double
14. Total night calls: integer
15. Total night charge: double
16. Total intl minutes: double
17. Total intl calls: integer
18. Total intl charge: double
19. Customer service calls: integer

 

The CSV file has the following format:

LA,117,408,No,No,0,184.5,97,31.37,351.6,80,29.89,215.8,90,9.71,8.7,4,2.35,1,False
IN,65,415,No,No,0,129.1,137,21.95,228.5,83,19.42,208.8,111,9.4,12.7,6,3.43,4,True

The image below shows the first few rows of the data set:

Picture4

SOFTWARE

This tutorial will run on Spark 2.0.1 and above.

LOAD THE DATA FROM A CSV FILE

Picture5

First, we will import the SQL and machine learning packages.

import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.feature.VectorAssembler

 

We use a Scala case class and StructType to define the schema, corresponding to a line in the CSV data file.

// define the Churn Schema
case class Account(state: String, len: Integer, acode: String,
    intlplan: String, vplan: String, numvmail: Double,
    tdmins: Double, tdcalls: Double, tdcharge: Double,
    temins: Double, tecalls: Double, techarge: Double,
    tnmins: Double, tncalls: Double, tncharge: Double,
    timins: Double, ticalls: Double, ticharge: Double,
    numcs: Double, churn: String)

val schema = StructType(Array(
    StructField("state", StringType, true),
    StructField("len", IntegerType, true),
    StructField("acode", StringType, true),
    StructField("intlplan", StringType, true),
    StructField("vplan", StringType, true),
    StructField("numvmail", DoubleType, true),
    StructField("tdmins", DoubleType, true),
    StructField("tdcalls", DoubleType, true),
    StructField("tdcharge", DoubleType, true),
    StructField("temins", DoubleType, true),
    StructField("tecalls", DoubleType, true),
    StructField("techarge", DoubleType, true),
    StructField("tnmins", DoubleType, true),
    StructField("tncalls", DoubleType, true),
    StructField("tncharge", DoubleType, true),
    StructField("timins", DoubleType, true),
    StructField("ticalls", DoubleType, true),
    StructField("ticharge", DoubleType, true),
    StructField("numcs", DoubleType, true),
    StructField("churn", StringType, true)
  ))

 

Using Spark 2.0, we specify the data source and schema to load into a Dataset. Note that with Spark 2.0, specifying the schema when loading data into a DataFrame will give better performance than schema inference. We cache the Datasets for quick, repeated access. We also print the schema of the Datasets.

val train: Dataset[Account] = spark.read.option("inferSchema", "false")
      .schema(schema).csv("/user/user01/data/churn-bigml-80.csv").as[Account]
train.cache

val test: Dataset[Account] = spark.read.option("inferSchema", "false")
      .schema(schema).csv("/user/user01/data/churn-bigml-20.csv").as[Account]
test.cache
train.printSchema()

 

root
|-- state: string (nullable = true)
|-- len: integer (nullable = true)
|-- acode: string (nullable = true)
|-- intlplan: string (nullable = true)
|-- vplan: string (nullable = true)
|-- numvmail: double (nullable = true)
|-- tdmins: double (nullable = true)
|-- tdcalls: double (nullable = true)
|-- tdcharge: double (nullable = true)
|-- temins: double (nullable = true)
|-- tecalls: double (nullable = true)
|-- techarge: double (nullable = true)
|-- tnmins: double (nullable = true)
|-- tncalls: double (nullable = true)
|-- tncharge: double (nullable = true)
|-- timins: double (nullable = true)
|-- ticalls: double (nullable = true)
|-- ticharge: double (nullable = true)
|-- numcs: double (nullable = true)
|-- churn: string (nullable = true)

//display the first 20 rows:
train.show

Picture6

SUMMARY STATISTICS

Spark DataFrames include some built-in functions for statistical processing. The describe() function performs summary statistics calculations on all numeric columns and returns them as a DataFrame.

train.describe()

Out:

Picture7

Data Exploration

We can use Spark SQL to explore the dataset. Here are some example queries using the Scala DataFrame API:

train.groupBy("churn").sum("numcs").show
+-----+----------+
|churn|sum(numcs)|
+-----+----------+
|False|    3310.0|
| True|     856.0|
+-----+----------+

train.createOrReplaceTempView("account")
spark.catalog.cacheTable("account")

 

Picture8

Total day minutes and Total day charge are highly correlated fields. Such correlated data won't be very beneficial for our model training runs, so we're going to remove them. We'll do so by dropping one column of each pair of correlated fields, along with the State and Area code columns, which we also won’t use.

val dtrain = train.drop("state").drop("acode").drop("vplan")
  .drop("tdcharge").drop("techarge")

Picture9

Grouping the data by the Churn field and counting the number of instances in each group shows that there are roughly 6 times as many false churn samples as true churn samples.

dtrain.groupBy("churn").count.show

Out:

+-----+-----+
|churn|count|
+-----+-----+
|False| 2278|
| True|  388|
+-----+-----+

Business decisions will be made, and retention actions taken, for the customers most likely to leave, not those who are likely to stay. Thus, we need to ensure that our model is sensitive to the Churn=True samples.

STRATIFIED SAMPLING

We can put the two sample types on the same footing using stratified sampling. The DataFrames sampleBy() function does this when provided with fractions of each sample type to be returned. Here, we're keeping all instances of the Churn=True class, but downsampling the Churn=False class to a fraction of 388/2278.

val fractions = Map("False" -> .17, "True" -> 1.0) 
val strain = dtrain.stat.sampleBy("churn", fractions, 36L)
strain.groupBy("churn").count.show

Out:

+-----+-----+
|churn|count|
+-----+-----+
|False|  379|
| True|  388|
+-----+-----+

FEATURES ARRAY

To build a classifier model, you extract the features that most contribute to the classification. The features for each item consist of the fields shown below:

  • Label → churn: True or False
  • Features → {"len", "iplanIndex", "numvmail", "tdmins", "tdcalls", "temins", "tecalls", "tnmins", "tncalls", "timins", "ticalls", "numcs"}

In order for the features to be used by a machine learning algorithm, they are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature.

Picture10

Reference: Learning Spark

USING THE SPARK ML PACKAGE

The ML package is the newer library of machine learning routines. Spark ML provides a uniform set of high-level APIs built on top of DataFrames.

Picture11

We will use an ML Pipeline to pass the data through transformers in order to extract the features and an estimator to produce the model.

  • Transformer: A Transformer is an algorithm which transforms one DataFrame into another DataFrame. We will use a transformer to get a DataFrame with a features vector column.
  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer (e.g., training/tuning on a DataFrame and producing a model).
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify a ML workflow.

FEATURE EXTRACTION AND PIPELINING

The ML package needs data to be put in a (label: Double, features: Vector) DataFrame format with correspondingly named fields. We set up a pipeline to pass the data through three transformers in order to extract the features: two StringIndexers and a VectorAssembler. We use the StringIndexers to convert the categorical string feature intlplan and the label into number indices. Indexing categorical features allows decision trees to treat them appropriately, improving performance.

Picture12

// set up StringIndexer transformers for label and string feature 
val ipindexer = new StringIndexer()
       .setInputCol("intlplan")
       .setOutputCol("iplanIndex")
val labelindexer = new StringIndexer()
       .setInputCol("churn")
       .setOutputCol("label")

The VectorAssembler combines a given list of columns into a single feature vector column.

// set up a VectorAssembler transformer 
val featureCols = Array("len", "iplanIndex", "numvmail", "tdmins",
      "tdcalls", "temins", "tecalls", "tnmins", "tncalls", "timins",
      "ticalls", "numcs") 
val assembler = new VectorAssembler()
       .setInputCols(featureCols)
       .setOutputCol("features")

The final element in our pipeline is an estimator (a decision tree classifier), training on the vector of labels and features.

Picture13

// set up a DecisionTreeClassifier estimator 
val dTree = new DecisionTreeClassifier().setLabelCol("label")
       .setFeaturesCol("features") 
// Chain indexers and tree in a Pipeline
val pipeline = new Pipeline()
       .setStages(Array(ipindexer, labelindexer, assembler, dTree))

TRAIN THE MODEL

Picture14

We would like to determine which parameter values of the decision tree produce the best model. A common technique for model selection is k-fold cross validation, where the data is randomly split into k partitions. Each partition is used once as the testing data set, while the rest are used for training. Models are then generated using the training sets and evaluated with the testing sets, resulting in k model performance measurements. The average of the performance scores is often taken to be the overall score of the model, given its build parameters. For model selection we can search through the model parameters, comparing their cross validation performances. The model parameters leading to the highest performance metric produce the best model.

Spark ML supports k-fold cross validation with a transformation/estimation pipeline, trying out different combinations of parameters through a process called grid search: you set up the parameters to test and a cross-validation evaluator, and together they form a model selection workflow.

Below, we use a ParamGridBuilder to construct the parameter grid.

// Search through decision tree's maxDepth parameter for best model 
val paramGrid = new ParamGridBuilder().addGrid(dTree.maxDepth,
Array(2, 3, 4, 5, 6, 7)).build()

We define a BinaryClassificationEvaluator, which will evaluate the model by comparing the test label column with the test prediction column. Its default metric is the area under the ROC curve.

// Set up Evaluator (prediction, true label) 
val evaluator = new BinaryClassificationEvaluator()
       .setLabelCol("label")
       .setRawPredictionCol("prediction")

We use a CrossValidator for model selection. The CrossValidator uses the Estimator Pipeline, the Parameter Grid, and the Classification Evaluator. The CrossValidator uses the ParamGridBuilder to iterate through the maxDepth parameter of the decision tree and evaluate the models, repeating 3 times per parameter value for reliable results.

// Set up 3-fold cross validation  
val crossval = new CrossValidator().setEstimator(pipeline)
       .setEvaluator(evaluator)
       .setEstimatorParamMaps(paramGrid).setNumFolds(3) 
val cvModel = crossval.fit(strain)  // fit the cross validator on the stratified training set

We get the best decision tree model, in order to print out the decision tree and parameters.

// Fetch best model 
val bestModel = cvModel.bestModel
val treeModel = bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
.stages(3).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)

Out:

Picture15

//0-11 feature columns: len, iplanIndex, numvmail, tdmins, tdcalls, temins, tecalls, tnmins, tncalls, timins, ticalls, numcs 
println( "Feature 11:" +  featureCols(11))
println( "Feature 3:" +  featureCols(3)) 

Feature 11:numcs
Feature 3:tdmins

We find that the best tree model produced using the cross-validation process is one with a depth of 5. The toDebugString() function provides a print of the tree's decision nodes and final prediction outcomes at the end leaves. We can see that features 11 and 3 are used for decision making and should thus be considered as having high predictive power to determine a customer's likeliness to churn. It's not surprising that these feature numbers map to the fields Customer service calls and Total day minutes. Decision trees are often used for feature selection because they provide an automated mechanism for determining the most important features (those closest to the tree root).

PREDICTIONS AND MODEL EVALUATION

Picture16

The actual performance of the model can be determined using the test data set that has not been used for any training or cross-validation activities. We'll transform the test set with the model pipeline, which will map the features according to the same recipe.

val predictions = cvModel.transform(test)

Picture17

The evaluator will provide us with the score of the predictions, and then we'll print them along with their probabilities.

val accuracy = evaluator.evaluate(predictions)
evaluator.explainParams()
val result = predictions.select("label", "prediction", "probability")
result.show

Out:

accuracy: Double = 0.8484817813765183 
metric name in evaluation (default: areaUnderROC)

Picture18

In this case, the evaluation returns a score of about 84.8% (area under the ROC curve). The prediction probabilities can be very useful in ranking customers by their likelihood to defect. This way, the limited resources available to the business for retention can be focused on the appropriate customers.

Below, we calculate some more metrics. The number of false/true positive and negative predictions is also useful:

  • True positives are how often the model correctly predicted subscription canceling.
  • False positives are how often the model incorrectly predicted subscription canceling.
  • True negatives indicate how often the model correctly predicted no canceling.
  • False negatives indicate how often the model incorrectly predicted no canceling.

 

val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble
val truep = lp.filter($"prediction" === 0.0)
.filter($"label" === $"prediction").count() / counttotal.toDouble
val truen = lp.filter($"prediction" === 1.0)
.filter($"label" === $"prediction").count() / counttotal.toDouble
val falsep = lp.filter($"prediction" === 1.0)
.filter(not($"label" === $"prediction")).count() / counttotal.toDouble
val falsen = lp.filter($"prediction" === 0.0)
.filter(not($"label" === $"prediction")).count() / counttotal.toDouble

println("counttotal : " + counttotal)
println("correct : " + correct)
println("wrong: " + wrong)
println("ratio wrong: " + ratioWrong)
println("ratio correct: " + ratioCorrect)
println("ratio true positive : " + truep)
println("ratio false positive : " + falsep)
println("ratio true negative : " + truen)
println("ratio false negative : " + falsen)

counttotal : 667
correct : 574
wrong: 93
ratio wrong: 0.13943028485757122
ratio correct: 0.8605697151424287
ratio true positive : 0.1184407796101949
ratio false positive : 0.0239880059970015
ratio true negative : 0.7421289355322339
ratio false negative : 0.11544227886056972

CODE

WANT TO LEARN MORE?

In this blog post, we showed you how to get started using Apache Spark’s machine learning decision trees and ML Pipelines for classification. If you have any further questions about this tutorial, please ask them in the comments section below.

Editor's note: This blog post was originally posted in the Converge Blog on June 05, 2017

Related Content

machine learning

Apache Spark

Apache Zeppelin

 

BY Vikash Raja Samuel Selvin

 

Introduction

Elasticsearch (ES) is a search engine based on Lucene. It provides a distributed, multitenant-capable, full-text search engine with an HTTP web interface and schema-free JSON documents. 

Kibana is an open source data visualization plugin for Elasticsearch. It provides visualization capabilities on top of the content indexed on an Elasticsearch cluster. Users can create bar, line, and scatter plots, or pie charts and maps on top of large volumes of data.

These two products are widely used in the market today for data analytics. However, security is one aspect that was not initially built into the product. Since data is the lifeline of any organization today, it becomes essential that Elasticsearch and Kibana be “secured.” In this blog post, we will be looking at one of the ways in which authentication, authorization, and encryption can be implemented for them.

Assumptions

The tutorial assumes the following:

  1. MapR Sandbox is running
  2. Elasticsearch and Kibana have been installed and are running

Options Available for Securing Elasticsearch and Kibana

The most popular options for securing Elasticsearch and Kibana are compared in the table below.

Shield is a security plugin developed by the same company that developed Elasticsearch. It allows you to easily protect this data with a username and password while simplifying your architecture. Advanced security features like encryption, role-based access control, IP filtering, and auditing are also available when you need them.

NGINX is an open source web server. It can act as a proxy server and can do load balancing, among other things. In combination with LUA and external scripts, it can be used for securing Elasticsearch and Kibana. We will be using this approach in this tutorial.

Search Guard is an open source alternative to Shield. It provides almost all of the same functionality as Shield, except for some features like LDAP authentication; however, these features are available in the paid variant.

Shield
  • A security plugin for Elasticsearch and Kibana made by Elasticsearch.
  • Has native support for: 1. Session Management 2. Encrypted Communications 3. Role-based Access Control 4. Audit Logging
  • Pricing for an Elastic Cloud subscription starts at $45/month and includes Shield.

NGINX
  • NGINX (pronounced "engine x") is a web server. It can act as a reverse proxy server, load balancer, and an HTTP cache.
  • Role-based access control is implemented with the help of the LUA module. LDAP authentication can be implemented by the use of external programs.
  • Free. NGINX Plus is the paid version - $1900 / instance.

Search Guard
  • Search Guard is an Elasticsearch plugin that offers encryption, authentication, and authorization.
  • No support for LDAP-based authentication or audit logging is available in the free version.
  • Free. A commercial version is also available.

Installing NGINX

NGINX is an open source web server, focused on high performance, concurrency, and a low memory footprint.

NGINX has been designed with a proxy role in mind from the start, and supports many related configuration directives and options. We will be using NGINX to set up LDAP-based authentication and authorization.

OpenResty™ is a full-fledged web platform that integrates the standard NGINX core, LuaJIT, many carefully written Lua libraries, lots of high-quality third-party NGINX modules, and most of their external dependencies.

By taking advantage of various well-designed NGINX features, OpenResty effectively turns the NGINX server into a powerful web app server.

Steps

  1. Download the latest release of “OpenResty” from here:
    http://openresty.org/en/download.html
  2. Build and install the package using the following commands:

    tar xvf openresty-<version>.tar.gz
    cd openresty-<version>
    ./configure --prefix=/usr/local/openresty --with-luajit --with-http_auth_request_module
    gmake
    gmake install
    export PATH=/usr/local/openresty/bin:/usr/local/openresty/nginx/sbin:$PATH

Authentication

We will be looking at the following two methods of authentication in this tutorial:

  1. Basic HTTP Authentication
  2. LDAP Authentication

Basic HTTP Authentication

Step 1 — Installing Apache Tools

You'll need the htpasswd command to configure the password that will restrict access to Elasticsearch and Kibana. This command is part of the apache2-utils package, so the first step is to install that package.

sudo apt-get install apache2-utils

Step 2 — Setting up HTTP Basic Authentication Credentials

In this step, you'll create a password for the user who should be allowed access to Elasticsearch and Kibana. That password and the associated username will be stored in a file that you specify.

The password will be encrypted and the name of the file can be anything you like. Here, we use the file /opt/elk/.espasswd and the username vikash.

To create the password, run the following command. You'll need to authenticate, then specify and confirm a password.

sudo htpasswd -c /opt/elk/.espasswd vikash

You can check the contents of the newly created file to see the username and hashed password.

cat /opt/elk/.espasswd

Step 3 — Updating the NGINX Configuration

Now that you've created the HTTP basic authentication credential, the next step is to update the NGINX configuration for Elasticsearch and Kibana to use it.

HTTP basic authentication is made possible by the auth_basic and auth_basic_user_file directives.

The value of auth_basic is any string, and will be displayed at the authentication prompt. The value of auth_basic_user_file is the path to the password file that was created in Step 2. Both directives should be added under the location section.

Check if any NGINX processes are running and kill them:

cd /usr/local/openresty/nginx/
sbin/nginx -s stop

(Or)

ps -ef | grep nginx
kill -9 <pid1> <pid2> ... <pidn>

Start the NGINX server with this configuration file as given below:

cd /usr/local/openresty/nginx
sbin/nginx -p $PWD -c conf/nginx_basic_http_authentication.conf

The contents of the configuration file are given below:

worker_processes  1;

error_log /usr/local/openresty/nginx/logs/lua.log debug;

events {
  worker_connections 1024;
}

http {
  upstream elasticsearch {
    server 127.0.0.1:9201;
    keepalive 15;
  }

  upstream kibana {
    server 127.0.0.1:5701;
    keepalive 15;
  }

  server {
    listen 8881;

    location / {
      auth_basic           "Protected Elasticsearch";
      auth_basic_user_file /opt/elk/.espasswd;

      proxy_pass http://elasticsearch;
      proxy_redirect off;
      proxy_buffering off;

      proxy_http_version 1.1;
      proxy_set_header Connection "Keep-Alive";
      proxy_set_header Proxy-Connection "Keep-Alive";
    }
  }

  server {
    listen 8882;

    location / {
      auth_basic           "Protected Kibana";
      auth_basic_user_file /opt/elk/.espasswd;

      proxy_pass http://kibana;
      proxy_redirect off;
      proxy_buffering off;

      proxy_http_version 1.1;
      proxy_set_header Connection "Keep-Alive";
      proxy_set_header Proxy-Connection "Keep-Alive";
    }
  }
}

Provided below are the screenshots when the user tries to access Elasticsearch.
Note: NGINX is configured to listen to port 8881 for connections to Elasticsearch and port 8882 for connections to Kibana in this example.

Screenshots showing “evil_user” not having access to Elasticsearch

Since the user is not present in the password file, he/she is redirected to the login page again.

Screenshot showing that user ‘vikash’ has access to Elasticsearch

LDAP Authentication

Step 1: If an LDAP server is not already running, install and configure one. For the purposes of this example, follow the instructions at https://github.com/osixia/docker-openldap to set up an LDAP server.

As the LDAP server runs on a Docker container, the below commands will be useful for restarting and performing administrative tasks with it:

docker ps -a – lists all Docker containers

docker exec -it <Docker PID> bash – opens a bash shell inside the Docker container

Step 2: For the purpose of this example, we will be running a backend server written in Python, which serves a Login page, and an LDAP Authentication Daemon written in Python. The Python code files are available in this GitHub repository:

https://github.com/nginxinc/nginx-ldap-auth

Step 3: On the host where the ldap-auth daemon is to run, install the following additional software. We recommend using the versions that are distributed with the operating system, instead of downloading the software from an open source repository.

  • Python version 2. Version 3 is not supported.
  • The Python LDAP module, python-ldap (from the python-ldap.org open source project). A minimal sketch of the bind check the daemon performs is shown below.
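The following is not the nginx-ldap-auth daemon code itself, just a hedged python-ldap sketch of the essential check it performs: bind to the LDAP server as the user and treat a successful bind as proof of valid credentials. The server URL and DN template are placeholders matching the example configuration further below.

import ldap

def ldap_authenticate(username, password,
                      url="ldap://172.17.0.1:389",                # placeholder LDAP server
                      dn_template="cn=%s,dc=example,dc=org"):     # placeholder DN template
    conn = ldap.initialize(url)
    try:
        conn.simple_bind_s(dn_template % username, password)
        return True                      # bind succeeded, credentials are valid
    except ldap.INVALID_CREDENTIALS:
        return False
    finally:
        conn.unbind_s()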

Step 4: Provided below is the NGINX configuration file. The important directives are discussed herein.

Filename - /usr/local/openresty/nginx/conf/conf/nginx-ldap-auth.conf

error_log logs/error.log debug;

error_log /usr/local/openresty/nginx/logs/lua.log notice;

events { }

http {
    proxy_cache_path cache/  keys_zone=auth_cache:10m;

    # The back-end daemon listens on port 9000 as implemented
    # in backend-sample-app.py.
    # Change the IP address if the daemon is not running on the
    # same host as NGINX/NGINX Plus.
    upstream backend {
        server 127.0.0.1:9000;
    }

    upstream elasticsearch {
        server 127.0.0.1:9200;
    }

    upstream kibana4 {
        server 127.0.0.1:5601;
    }

    # NGINX/NGINX Plus listen on port 8081 for requests that require
    # authentication. Change the port number as appropriate.

    server {
        listen 8881;

        # Protected application
        location / {

            auth_request /auth-proxy;

            # redirect 401 and 403 to login form
            error_page 401 403 =200 /login;

               auth_request_set $user $upstream_http_LDAPUser;

        access_by_lua_file '/usr/local/openresty/nginx/authorize_es_ldap.lua';

        proxy_pass http://elasticsearch/;
        }

        location /login {
            proxy_pass http://backend/login;
        # Login service returns a redirect to the original URI
            # and sets the cookie for the ldap-auth daemon
            proxy_set_header X-Target $request_uri;
        }

        location = /auth-proxy {
            internal;

            # The ldap-auth daemon listens on port 8888, as set
            # in nginx-ldap-auth-daemon.py.
            # Change the IP address if the daemon is not running on
            # the same host as NGINX/NGINX Plus.
            proxy_pass http://127.0.0.1:8888;

            proxy_pass_request_body off;
            proxy_set_header X-Target 'http://localhost:9200/';
        #proxy_set_header Content-Length "";
            proxy_cache auth_cache;
            proxy_cache_valid 200 403 10m;

            # The following directive adds the cookie to the cache key
            proxy_cache_key "$http_authorization$cookie_nginxauth";

            # As implemented in nginx-ldap-auth-daemon.py, the ldap-auth daemon
            # communicates with an OpenLDAP server, passing in the following
            # parameters to specify which user account to authenticate. To
            # eliminate the need to modify the Python code, this file contains
            # 'proxy_set_header' directives that set the values of the
            # parameters. Set or change them as instructed in the comments.
            #
            #    Parameter      Proxy header
            #    -----------    ----------------
            #    basedn         X-Ldap-BaseDN
            #    binddn         X-Ldap-BindDN
            #    bindpasswd     X-Ldap-BindPass
            #    cookiename     X-CookieName
            #    realm          X-Ldap-Realm
            #    template       X-Ldap-Template
            #    url            X-Ldap-URL

            # (Required) Set the URL and port for connecting to the LDAP server,
            # by replacing 'example.com' and '636'.
            proxy_set_header X-Ldap-URL      "ldap://172.17.0.1:389";

            # (Required) Set the Base DN, by replacing the value enclosed in
            # double quotes.
            proxy_set_header X-Ldap-BaseDN   "dc=example,dc=org";

            # (Required) Set the Bind DN, by replacing the value enclosed in
            # double quotes.
            proxy_set_header X-Ldap-BindDN   "cn=admin,dc=example,dc=org";

            # (Required) Set the Bind password, by replacing 'secret'.
            proxy_set_header X-Ldap-BindPass "admin";

            # (Required) The following directives set the cookie name and pass
            # it, respectively. They are required for cookie-based
            # authentication. Comment them out if using HTTP basic
            # authentication.
            proxy_set_header X-CookieName "nginxauth";
            proxy_set_header Cookie nginxauth=$cookie_nginxauth;

            # (Required if using Microsoft Active Directory as the LDAP server)
            # Set the LDAP template by uncommenting the following directive.
            #proxy_set_header X-Ldap-Template "(SAMAccountName=%(username)s)";

            # (Optional if using OpenLDAP as the LDAP server) Set the LDAP
            # template by uncommenting the following directive and replacing
            # '(cn=%(username)s)' which is the default set in
            # nginx-ldap-auth-daemon.py.
            #proxy_set_header X-Ldap-Template "(cn=%(username)s)";

            # (Optional) Set the realm name, by uncommenting the following
            # directive and replacing 'Restricted' which is the default set
            # in nginx-ldap-auth-daemon.py.
            #proxy_set_header X-Ldap-Realm    "Restricted";

    }
    }

    server {
        listen 8882;

        # Protected application
        location / {

            auth_request /auth-proxy;

            # redirect 401 and 403 to login form
            error_page 401 403 =200 /login;

               auth_request_set $user $upstream_http_LDAPUser;

        access_by_lua_file '/usr/local/openresty/nginx/authorize_kibana4_ldap.lua';

        proxy_pass http://kibana4/;
        }

        location /login {
            proxy_pass http://backend/login;
        # Login service returns a redirect to the original URI
            # and sets the cookie for the ldap-auth daemon
            proxy_set_header X-Target $request_uri;
        }

        location = /auth-proxy {
            internal;

            # The ldap-auth daemon listens on port 8888, as set
            # in nginx-ldap-auth-daemon.py.
            # Change the IP address if the daemon is not running on
            # the same host as NGINX/NGINX Plus.
            proxy_pass http://127.0.0.1:8888;

            proxy_pass_request_body off;
            proxy_set_header X-Target 'http://localhost:5601/';
        #proxy_set_header Content-Length "";
            proxy_cache auth_cache;
            proxy_cache_valid 200 403 10m;

            # The following directive adds the cookie to the cache key
            proxy_cache_key "$http_authorization$cookie_nginxauth";

            # As implemented in nginx-ldap-auth-daemon.py, the ldap-auth daemon
            # communicates with an OpenLDAP server, passing in the following
            # parameters to specify which user account to authenticate. To
            # eliminate the need to modify the Python code, this file contains
            # 'proxy_set_header' directives that set the values of the
            # parameters. Set or change them as instructed in the comments.
            #
            #    Parameter      Proxy header
            #    -----------    ----------------
            #    basedn         X-Ldap-BaseDN
            #    binddn         X-Ldap-BindDN
            #    bindpasswd     X-Ldap-BindPass
            #    cookiename     X-CookieName
            #    realm          X-Ldap-Realm
            #    template       X-Ldap-Template
            #    url            X-Ldap-URL

            # (Required) Set the URL and port for connecting to the LDAP server,
            # by replacing 'example.com' and '636'.
            proxy_set_header X-Ldap-URL      "ldap://172.17.0.1:389";

            # (Required) Set the Base DN, by replacing the value enclosed in
            # double quotes.
            proxy_set_header X-Ldap-BaseDN   "dc=example,dc=org";

            # (Required) Set the Bind DN, by replacing the value enclosed in
            # double quotes.
            proxy_set_header X-Ldap-BindDN   "cn=admin,dc=example,dc=org";

            # (Required) Set the Bind password, by replacing 'secret'.
            proxy_set_header X-Ldap-BindPass "admin";

            # (Required) The following directives set the cookie name and pass
            # it, respectively. They are required for cookie-based
            # authentication. Comment them out if using HTTP basic
            # authentication.
            proxy_set_header X-CookieName "nginxauth";
            proxy_set_header Cookie nginxauth=$cookie_nginxauth;

            # (Required if using Microsoft Active Directory as the LDAP server)
            # Set the LDAP template by uncommenting the following directive.
            #proxy_set_header X-Ldap-Template "(SAMAccountName=%(username)s)";

            # (Optional if using OpenLDAP as the LDAP server) Set the LDAP
            # template by uncommenting the following directive and replacing
            # '(cn=%(username)s)' which is the default set in
            # nginx-ldap-auth-daemon.py.
            #proxy_set_header X-Ldap-Template "(cn=%(username)s)";

            # (Optional) Set the realm name, by uncommenting the following
            # directive and replacing 'Restricted' which is the default set
            # in nginx-ldap-auth-daemon.py.
            #proxy_set_header X-Ldap-Realm    "Restricted";

    }
    }

}

LDAP Settings

 

# URL and port for connecting to the LDAP server
# Use “ldaps://< IP Address of the LDAP Server >:636” if you are using secure LDAP
proxy_set_header X-Ldap-URL "ldap://< IP Address of the LDAP Server > ";

# Base DN
proxy_set_header X-Ldap-BaseDN "cn=admin,dc=example,dc=org";

# Bind DN
proxy_set_header X-Ldap-BindDN "cn=admin,dc=example,dc=org";

# Bind password
proxy_set_header X-Ldap-BindPass "admin";

IP Address for Backend Daemon

If the backend daemon is not running on the same host as NGINX Plus, change the IP address for it in the upstream configuration block:

upstream backend {
    server 127.0.0.1:9000;
}

IP Address for ldap-auth Daemon

If the ldap-auth daemon is not running on the same host as NGINX Plus, change the IP address in this proxy_pass directive:

location = /auth-proxy {
    proxy_pass http://127.0.0.1:8888;
    ...
}

IP Address and Port on Which NGINX Listens

If the client is not running on the same host as NGINX Plus, change the IP address in this listen directive (or remove the address completely to accept traffic from any client). You can also change the port on which NGINX listens from 8081 if you wish:

server {
    listen 127.0.0.1:8081;        
    …
}

Note: Elasticsearch is running on port 9200 and NGINX is listening for connections to Elasticsearch on port 8081.

Step 5: Start the NGINX server, backend and LDAP authentication daemon:

Check if any NGINX server process is already running and kill them:

cd /usr/local/openresty/nginx/
sbin/nginx -s stop

        (Or)

ps -ef | grep nginx
kill -9 <pid1> <pid2> ... <pidn>

Start the NGINX server with the corresponding configuration file:

cd /usr/local/openresty/nginx/
sbin/nginx -p $PWD -c conf/nginx-ldap-auth.conf

Start the backend server:

cd /usr/local/openresty/nginx-ldap-auth
python backend-sample-app.py

Start the LDAP authentication daemon:

cd /usr/local/openresty/nginx-ldap-auth
python nginx-ldap-auth-daemon.py

Below are screenshots showing the output from the backend, LDAP authentication daemon, and the browser trying to access Elasticsearch.

  1. Trying to access Elasticsearch with invalid credentials
  2. LDAP authentication daemon showing authentication failure
  3. Trying to access Elasticsearch by providing valid credentials
  4. Web browser being redirected to Elasticsearch upon successful authentication
  5. LDAP authentication for CURL requests

Authorization

We will be showing how to implement the following methods of authorization:

  1. Access Control based on LUA scripts
  2. Multi Level Security for different Elasticsearch / Kibana instances

Access Control with LUA Scripts

The "access_by_lua_file" directive in the NGINX configuration file is used to specify the path to the LUA file that controls access to a specific resource within Elasticsearch.

Below is a sample LUA script which shows how to allow only user “vikash” to access the index “traffic” and restrict user “swapnil”.

 

-- authorization rules

local restrictions = {
  all  = {
    ["^/$"]                             = { "HEAD" }
  },

  swapnil = {
    ["^/$"]                             = { "GET" },
    ["^/?[^/]*/?[^/]*/_search"]         = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/_msearch"]        = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/_validate/query"] = { "GET", "POST" },
    ["/_aliases"]                       = { "GET" },
    ["/_cluster.*"]                     = { "GET" }
  },
  vikash = {
    ["^/$"]                             = { "GET" },
    ["^/?[^/]*/?[^/]*/_search"]         = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/_msearch"]        = { "GET", "POST" },
    ["^/?[^/]*/traffic*"]               = { "GET", "POST", "PUT", "DELETE" },
    ["^/?[^/]*/?[^/]*/_validate/query"] = { "GET", "POST" },
    ["/_aliases"]                       = { "GET" },
    ["/_cluster.*"]                     = { "GET" }
  },

  admin = {
    ["^/?[^/]*/?[^/]*/_bulk"]          = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/_refresh"]       = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/?[^/]*/_create"] = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/?[^/]*/_update"] = { "GET", "POST" },
    ["^/?[^/]*/?[^/]*/?.*"]            = { "GET", "POST", "PUT", "DELETE" },
    ["^/?[^/]*/?[^/]*$"]               = { "GET", "POST", "PUT", "DELETE" },
    ["/_aliases"]                      = { "GET", "POST" }
  }
}

-- get authenticated user as role
local role = ngx.var.remote_user
ngx.log(ngx.DEBUG, role)

-- exit 403 when no matching role has been found
if restrictions[role] == nil then
  ngx.header.content_type = 'text/plain'
  ngx.log(ngx.WARN, "Unknown role ["..role.."]")
  ngx.status = 403
  ngx.say("403 Forbidden: You don\'t have access to this resource.")
  return ngx.exit(403)
end

-- get URL
local uri = ngx.var.uri
ngx.log(ngx.DEBUG, uri)

-- get method
local method = ngx.req.get_method()
ngx.log(ngx.DEBUG, method)

local allowed  = false

for path, methods in pairs(restrictions[role]) do

  -- path matched rules?
  local p = string.match(uri, path)

  local m = nil

  -- method matched rules?
  for _, _method in pairs(methods) do
    m = m and m or string.match(method, _method)
  end

  if p and m then
    allowed = true
    ngx.log(ngx.NOTICE, method.." "..uri.." matched: "..tostring(m).." "..tostring(path).." for "..role)
    break
  end
end

if not allowed then
  ngx.header.content_type = 'text/plain'
  ngx.log(ngx.WARN, "Role ["..role.."] not allowed to access the resource ["..method.." "..uri.."]")
  ngx.status = 403
  ngx.say("403 Forbidden: You don\'t have access to this resource.")
  return ngx.exit(403)
end
Check if any NGINX server processes are already running and kill them:

cd /usr/local/openresty/nginx/
sbin/nginx -s stop

        (Or)

ps -ef | grep nginx
kill -9 <pid1> <pid2> ... <pidn>

Start NGINX service with these configurations:

cd /usr/local/openresty/nginx/
sbin/nginx -p $PWD -c conf/nginx_authorize_by_lua.conf

Screenshots showing access being denied to the “traffic” index for user ‘swapnil’

URL: http://localhost:8881

Screenshots showing user ‘swapnil’ being able to access other resources other than “traffic”

Screenshot showing that user ‘vikash’ has access to “traffic” index

Multilevel Security (MLS)

The problem with the above approach is that we cannot replicate that process for Kibana. Kibana gets all its data from Elasticsearch and rewrites all the URLs internally, so we no longer know what the URLs are and hence cannot write rules for them.

In situations like these, it is better to go for an MLS approach. Accordingly, we will have three different instances of Elasticsearch and Kibana, each corresponding to a different level of clearance, and thus solve the authorization problem. Authentication for these instances can be either Basic HTTP Authentication or LDAP-based.

Step 1: Set up multiple instances of Elasticsearch (ES) and expose one common URL to the end users. In this case, it was set to http://localhost:8081.

For this experiment, three Elasticsearch instances were set up, each listening to a different port on the local machine.

ES Node 1 – http://localhost:9201
ES Node 2 – http://localhost:9202
ES Node 3 – http://localhost:9203

Create three different Elasticsearch configuration files, one for each of these instances, and run them using the following command:

ES_HOME=/opt/elk/elasticsearch-1.4.4/

nohup $ES_HOME/bin/elasticsearch \
-Des.config=$ES_HOME/config/elasticsearch_node1.yml >> /tmp/elasticsearch_node1.out 2>&1 &

nohup $ES_HOME/bin/elasticsearch \
-Des.config=$ES_HOME/config/elasticsearch_node2.yml >> /tmp/elasticsearch_node2.out 2>&1 &

nohup $ES_HOME/bin/elasticsearch \
-Des.config=$ES_HOME/config/elasticsearch_node3.yml >> /tmp/elasticsearch_node3.out 2>&1 &

 

A sample configuration file for Elasticsearch is given in the resources section. The name of the file is:

/opt/elk/elasticsearch-1.4.4/config/elasticsearch_node1.yml

Step 2: Set up multiple instances of Kibana, one for each level of clearance, and expose one URL to the end users; in this case, it was set to http://localhost:8082.

For this experiment, three Kibana instances were set up, each listening to a different port on the local machine corresponding to a different clearance level.

Kibana Node 1 – http://localhost:5701 - Top Secret – Connects to ES Node 1 
Kibana Node 2 – http://localhost:5702 - Secret – Connects to ES Node 2
Kibana Node 3 – http://localhost:5703 - Public – Connects to ES Node 3

Run them using the following command:

nohup /opt/elk/kibana_nodes/kibana_node1/bin/kibana > /tmp/kibana_node1.out 2>&1 &
nohup /opt/elk/kibana_nodes/kibana_node2/bin/kibana > /tmp/kibana_node2.out 2>&1 &
nohup /opt/elk/kibana_nodes/kibana_node3/bin/kibana > /tmp/kibana_node3.out 2>&1 &

A sample configuration file for Kibana is given in the resources section. The name of the file is:

/opt/elk/kibana_nodes/kibana_node1/config/kibana.yml

Step 3: Set up an NGINX proxy listening for connections to either Elasticsearch (ES) or Kibana (NGINX will listen on http://localhost:8081 for connections to ES and on http://localhost:8082 for connections to Kibana). See the file “Installing NGINX And LDAP Authentication.docx” in the zip for step-by-step instructions on how to set this up.

Authenticate the user against an LDAP database, check what level of authorization he or she has based on information read from a database (in this case, a local file containing a list of users and their authorization levels), and redirect the user to the appropriate Elasticsearch/Kibana server, as sketched below.
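To make the clearance lookup concrete, here is a minimal, hypothetical Python sketch of the idea. It assumes user_authorization_level.dat contains one "username level" pair per line and maps each level to one of the Kibana instances described in Step 2; the file format, level names, and function names are assumptions for illustration only, since the real routing is done by NGINX and the nginx-ldap-auth daemon.

# Hypothetical sketch: map an authenticated user to a clearance level and to
# the matching Kibana/Elasticsearch instance, based on a local user-roles file.
CLEARANCE_TO_UPSTREAM = {              # assumed level-to-instance mapping
    "topsecret": "http://localhost:5701",
    "secret": "http://localhost:5702",
    "public": "http://localhost:5703",
}

def load_user_levels(path="user_authorization_level.dat"):
    # Assumed file format: one "username level" pair per line.
    levels = {}
    with open(path) as f:
        for line in f:
            parts = line.split()
            if len(parts) == 2:
                user, level = parts
                levels[user] = level.lower()
    return levels

def upstream_for(user, levels):
    # Unknown users fall back to the least-privileged (public) instance.
    level = levels.get(user, "public")
    return CLEARANCE_TO_UPSTREAM.get(level, CLEARANCE_TO_UPSTREAM["public"])

if __name__ == "__main__":
    levels = load_user_levels()
    print(upstream_for("admin", levels))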

LDAP Server – Runs on a Docker container (https://github.com/osixia/docker-openldap)

User-Roles Database – The file is provided in the resources section (File Name - “user_authorization_level.dat”)

NGINX Config File – The file is provided in the resources section (File Name – “nginx-ldap-auth-clusters.conf”)

Python Daemon to check against LDAP Server (File Name “nginx-ldap-auth-daemon.py”)

Python Backend Login Form (File Name “backend-sample-app.py”)

Both the above Python files can be found at: https://github.com/nginxinc/nginx-ldap-auth

Step 4: Start the NGINX server, backend, and LDAP authentication daemon, as given below:

Check if any NGINX server processes are already running and kill them:

 cd /usr/local/openresty/nginx/
sbin/nginx -s stop

        (Or)

ps -ef | grep nginx
kill -9 <pid1> <pid2> ... <pidn>

 

Start the backend server:

cd  /usr/local/openresty/nginx-ldap-auth 
python backend-sample-app.py

Start the LDAP authentication daemon:

cd  /usr/local/openresty/nginx-ldap-auth 
python nginx-ldap-auth-daemon.py


 

Start the NGINX server with the corresponding configuration file:

cd /usr/local/openresty/nginx/ 
sbin/nginx -p $PWD -c conf/nginx-ldap-auth-clusters.conf

Screenshots:

The user “admin” is logged in and he has “Top Secret” clearance level. The Kibana server corresponding to this is http://localhost:5701 which is connected to the Elasticsearch server http://localhost:9201 which has only the index “shakespeare” in it.

Elasticsearch showing that only “shakespeare” index is available on it


Log file showing that user “admin” has “Top Secret” clearance level

The user “vikash” is logged in and he has “Secret” clearance level. The Kibana server corresponding to this is http://localhost:5702, which is connected to the Elasticsearch server http://localhost:9202, which has the indices “logstash*” in it.

Elasticsearch showing that only “logstash*” indices are available on it

Log file showing that user “vikash” has only “Secret” clearance level

The user “swapnil” is logged in and he has “Public” clearance level. The Kibana server corresponding to this is http://localhost:5703 which is connected to the Elasticsearch server http://localhost:9203 which has only the index “bank” in it.

Elasticsearch showing that only “bank” index is available on it

Log file showing that user “swapnil” has only “Public” clearance level

Note: All the index mappings and data were downloaded from https://www.elastic.co/guide/en/kibana/current/getting-started.html

ENCRYPTION

If you have been following carefully, you will have noticed that we have been using only “http” so far. In production environments, we often want to use “https,” as it encrypts all the data and prevents attackers from stealing information. The steps below walk you through what is needed to use the “https” protocol.

Step 1: Create a self-signed SSL certificate.

cd /usr/local/openresty/nginx

mkdir certs

cd certs

openssl genrsa 2048 > host.key

openssl req -new -x509 -nodes -sha1 -days 3650 -key host.key > host.cert

openssl x509 -noout -fingerprint -text < host.cert > host.info

cat host.cert host.key > host.pem

 

Step 2: Add the information related to the certificate to the NGINX configuration file.

 

  ssl on;
  ssl_certificate /usr/local/openresty/nginx/certs/host.cert;
  ssl_certificate_key /usr/local/openresty/nginx/certs/host.key;
  ssl_session_timeout 5m;
  ssl_protocols TLSv1.2 TLSv1.1 TLSv1;
  ssl_ciphers HIGH:!aNULL:!eNULL:!LOW:!MD5;
  ssl_prefer_server_ciphers on;

Note: The entire NGINX configuration file is provided below for your reference.

NGINX listens on port 8080 for connections to ES, and it implements Basic HTTP Authentication

NGINX Config File

 

worker_processes  1;

error_log /usr/local/openresty/nginx/logs/nginx_https.log debug;

events {
  worker_connections 1024;
}

http {

    upstream elasticsearch {
          server 127.0.0.1:9200;
          keepalive 15;
    }

    server {
          listen 8080;
          keepalive_timeout   60s;
          ssl on;
          ssl_certificate /usr/local/openresty/nginx/certs/host.cert;
          ssl_certificate_key /usr/local/openresty/nginx/certs/host.key;
          ssl_session_timeout 1m;
          ssl_protocols TLSv1.2 TLSv1.1 TLSv1;
          ssl_ciphers HIGH:!aNULL:!eNULL:!LOW:!MD5;
          ssl_prefer_server_ciphers on;

          auth_basic "ElasticSearch";
          auth_basic_user_file /opt/elk/.espasswd;

          location / {
                proxy_pass http://elasticsearch;
                proxy_http_version 1.1;
                proxy_set_header Connection "Keep-Alive";
                proxy_set_header Proxy-Connection "Keep-Alive";
          }
    }
}

Since this is a self-signed certificate, the browser will show a “red” HTTPS icon that is crossed out. If you use a certificate from a trusted third party like Verisign, it will turn into a “green” icon.
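As a quick way to confirm the HTTPS proxy is answering, the hypothetical Python snippet below sends a request through the port 8080 listener with HTTP basic auth. Certificate verification is skipped only because the certificate above is self-signed, and the username and password are placeholders for whatever you stored in /opt/elk/.espasswd.

# Illustrative check of the HTTPS proxy in front of Elasticsearch.
import base64
import ssl
import urllib.request

# The certificate is self-signed, so skip verification for this test only.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

credentials = base64.b64encode(b"user:password").decode("ascii")  # placeholder credentials

req = urllib.request.Request(
    "https://localhost:8080/",
    headers={"Authorization": "Basic " + credentials},
)
with urllib.request.urlopen(req, context=ctx) as resp:
    print(resp.status, resp.read().decode("utf-8"))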

In this blog post, you learned about one of the ways in which authentication, authorization, and encryption can be implemented for Elasticsearch and Kibana. If you have any questions, please ask them in the comments section below.

 

Editor's Note: This post was originally posted in the Converge Blog on September 12, 2016

By Nick Amato

 

One of the challenges of connecting to multiple data sources in Hadoop is managing the proliferation of modules that must be loaded into an application to access them -- each with their own implementations to learn, code libraries to manage, and caveats to consider. There is (very often) more than one way to connect to the same data, in the same database, within the same language.

A lot of this goes away if you use Apache Drill -- by presenting a unified, schema-free ODBC interface to all of the common data sources in Hadoop, like files in HDFS, Hive, NoSQL databases, Amazon S3 buckets and more. If you’re a Tableau user, Drill provides a quick and easy way to run SQL queries on Hadoop-scale data with a distributed query engine.

But wait… did you know you can programmatically do the same thing from within an application or script, without a BI tool? Yes you can! In this post we will look at three different ways to do it, and you can pick the example that suits your favorite language: with Python via pyodbc, with R via RODBC, and Perl via DBD::ODBC. From within applications built in these languages (and others, this is just a sampling), Drill enables you to use SQL on data that’s ordinarily too large to manipulate directly with a script, connect to multiple Hadoop sources under one interface, and easily navigate semi-structured data without building and maintaining a schema.

Let’s look at how to do this using the pre-loaded example data on the MapR Drill sandbox. The overall view of this data is described here. These examples will make use of a subset of what’s available in the sandbox to keep things concise:

  • A Hive table called ‘orders’, which contains the order ID, customer ID, products, date, and other details about a particular order.

  • A CSV file, ‘customers.all.csv’ which contains information about a specific customer, keyed by customer ID.

Prerequisite Steps: Setting up ODBC on Linux

In this example I will use ODBC and scripts running on an Ubuntu Linux 14.04.1 LTS desktop machine; the steps will be similar for other flavors. If you’re running scripts on a Mac, the steps are a little different; consult the Drill ODBC Mac page for up-to-date procedures on getting ODBC running. The good news is that once the configuration of unixODBC is complete, the connection to Drill is available to all of the languages we’ll use without any additional steps.

  1. A single node or cluster running Drill is assumed. Consult these pages to get your own sandbox running with a few minutes of setup, or here for the quick-start documentation on the Apache site for installing it on a Hadoop cluster.

  2. On the machine where you are running Python, R or Perl scripts, install the Drill ODBC driver following the instructions here. You can install the downloaded RPM file with ‘rpm -i ’ and you may have to add the --nodeps flag to the end if you get a warning about /bin/sh being a dependency.

  3. Copy the ODBC configuration files from the driver ‘Setup’ directory into your home directory. For example, assuming your driver was installed in /opt/mapr, this shell snippet will copy the files:

for f in $(ls /opt/mapr/drillodbc/Setup/); do cp /opt/mapr/drillodbc/Setup/$f ~/.$f; done

Follow the remaining instructions here for configuring the driver. At a minimum you will need to: edit the file ~/.odbc.ini to configure either a direct mode connection (directly to a Drillbit) or a cluster mode connection with your zookeeper quorum details.

I used the following ~/.odbc.ini file for the subsequent examples, which sets up a DSN (Data Source Name) called ‘drill64’. This is important because the DSN we pass in each script must match the name in this file.

In this case the source is configured to connect to host ‘maprdemo’ for zookeeper. For ZKClusterID you can usually use the name yourcluster-drillbits. If you’re using the sandbox, the value for ZKClusterID should be mapr_demo_com-drillbits as shown below.

It’s a good idea to add an entry to /etc/hosts for ‘maprdemo’ to make sure the name resolves.

[ODBC]
Trace=no

[ODBC Data Sources]

[drill64]
# This key is not necessary and is only to give a description of the data source.
Description=MapR Drill ODBC Driver (64-bit) DSN

# Driver: The location where the ODBC driver is installed to.
Driver=/opt/mapr/drillodbc/lib/64/libmaprdrillodbc64.so

ConnectionType=ZooKeeper
ZKQuorum=maprdemo:5181
ZKClusterID=mapr_demo_com-drillbits
AuthenticationType=No Authentication
Catalog=DRILL
Schema=

 

Install a Linux ODBC driver manager on the same machine (the one running the scripts), if one is not already present. On Ubuntu, you can do this with 'apt-get install unixodbc-dev'. On other systems, use your preferred package manager.

If you get an error saying ‘file not found’ from unixODBC, depending on your Linux configuration you may need to put the absolute path to your libodbcinst library by editing the file ~/.mapr.drillodbc.ini and setting:

ODBCInstLib=/usr/lib/x86_64-linux-gnu/libodbcinst.so.1.0.0

Finally, you may also need to add the following to your ~/.bashrc:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/mapr/drillodbc/lib/64:/usr/lib
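Before moving on to the full examples, it can be worth sanity-checking the DSN from Python. The short snippet below is an optional check (not part of the original walkthrough): it simply connects with pyodbc and queries Drill’s built-in sys.version table.

# Optional sanity check: confirm the 'drill64' DSN resolves and Drill answers.
import pyodbc

conn = pyodbc.connect("DSN=drill64", autocommit=True)
cursor = conn.cursor()
cursor.execute("SELECT * FROM sys.version")   # small built-in Drill system table
print(cursor.fetchall())
conn.close()

If this prints a version row, the DSN, driver, and ZooKeeper settings are all working.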

Connecting to Drill in Python and Querying Multiple Hadoop Sources

Using Drill from within Python scripts opens up a new world of data analysis capabilities by coupling the distributed query power of Drill with all of the Python open source modules and frameworks available like numpy and pandas. The following code snippet connects to Drill and runs a query on the Hive table ‘orders’, joined with a CSV-formatted customer database. Let’s say we want to see all customers with orders totaling at least $3000 to send them a special offer:

import pyodbc
from pandas import *

# initialize the connection
conn = pyodbc.connect("DSN=drill64", autocommit=True)
cursor = conn.cursor()

# setup the query and run it
s = "SELECT c.columns[1] as fullname, SUM(o.order_total) as ordertotal " \
    "FROM hive.orders o INNER JOIN " \
    "dfs.`root`.`customers.all.csv` c " \
    "ON CAST(o.cust_id as BIGINT) = CAST(c.columns[0] as BIGINT)" \
    "GROUP BY c.columns[1] ORDER BY ordertotal desc"

# fetch and display filtered output
cursor.execute(s)
b = Series(dict(cursor.fetchall()))
print b[b > 3000]

You can see in this example we created a pandas Series from the output of Drill, which was a result of a distributed query on the data sources. Both data sources were referenced as “tables” in our SQL syntax. We then did a quick filter on the output. Notice we didn’t have to add any modules to talk to Hadoop data sources -- Drill did it all for us and we even did a join across more than one source.

Connecting to Drill in R

Let’s see how to do the same thing in R. We’ll use the RODBC package and install it directly from within the R interpreter (trust me, this is easier than installing it by hand):

> install.packages("RODBC")

This should run a configure script and do a download and build of the package and install it system-wide (if run as root). Using the following script (run with ‘Rscript’) we run a similar query, importing the results into an R DataFrame.

library(RODBC)

# initialize the connection
ch <- odbcConnect("drill64")

# run the query
df = sqlQuery(ch, paste("SELECT columns[2] as state, COUNT(*) as ",
"countbystate from `dfs`.`root`.`customers.all.csv` ",
" GROUP BY columns[2] ORDER BY countbystate DESC"))
df$perctotal <- df$countbystate / sum(df$countbystate) * 100

print(paste0("Total: ", sum(df$countbystate)))
print(head(df, 10))

# close the connection so we don't get a warning at the end
odbcClose(ch)

Connecting to Drill in Perl

We can connect to the driver from Perl and fetch rows in a similar fashion, using DBD::ODBC. This query returns total sales by member level (basic, silver, gold) and gender.

#!/usr/bin/perl
use DBI;

$dbh = DBI->connect('dbi:ODBC:DSN=drill64');

# setup the query and run it
my $results = $dbh->prepare(
"SELECT c.columns[6] as level," .
"c.columns[3] as gender," .
"SUM(o.order_total) as ordertotal " .
"FROM hive.orders o INNER JOIN " .
"dfs.`root`.`customers.all.csv` c " .
"ON CAST(o.cust_id as BIGINT) = CAST(c.columns[0] as BIGINT)" .
"GROUP BY c.columns[6], c.columns[3] ORDER BY ordertotal desc");

$results->execute();

# fetch and display the value
while (my @row = $results->fetchrow_array) {
print "@row\n";
}

As in the other examples, this is a synchronous version where the script waits for the result, and other ways of doing this (including asynchronously) are documented in the CPAN page above, but this gives you the general idea for how to get connected and run queries.

Conclusion and Next Steps

You’ll notice that in all these examples I connected to a few different Hadoop data sources (a CSV file in HDFS and a Hive table), only loaded one package/library in each language, and wrote a single query across the sources as tables -- this shows why Drill simplifies and enhances connecting to data in Hadoop and is a time-saver when analyzing data in applications and scripts at scale.

For use as a reference, all three of these code examples are available on github here and can be run against the sandbox VM or your own cluster. These aren’t the only languages that have ODBC bindings and can connect to Drill (there are many more!) but hopefully these examples can help get you started accessing Drill from within an application or script.

Editor's note: Article originally posted in Converge Blog on June 29, 2015

 

RELATED

Apache Drill

Free Code Fridays: Discover Drill Custom Functions

Apache Drill Best Practices from the MapR Drill Team

r python

By Onkar Ambekar

This is the first installment in our blog series about deep learning. In this series, we will discuss the deep learning technology, available frameworks/tools, and how to scale deep learning using big data architecture.

Machine Learning: Algorithms that can learn

Machine learning is a branch of computer science that studies algorithms that learn from observational data without being explicitly programmed. In other words, the goal is to design algorithms that can learn automatically, without human intervention. In general, machine learning can be considered a subfield of AI.

Machine learning, in a broader sense, can be categorized as:

  1. Classification: When we classify, we wish to categorize/predict objects into fixed, predefined categories (e.g., fraudulent transaction vs. non-fraudulent transaction or predicting the objects seen by an autonomous car as pedestrians, traffic signs, etc.).
  2. Regression: We are trying to predict a real value (e.g., how much will the demand for a product be in the next quarter, or when will a machine have a failure, etc.).

In the data analytics field, both of these categories are heavily used, particularly with the advent of big data. As more and more data is generated and stored, predictive analytics or machine learning is used in autonomous vehicles, healthcare, banking, telecommunication, smart cities, smart factories, and across all the verticals to solve multiple use cases.

Neural Networks: Human-like learning

Neural Networks, or as they are more appropriately called, Artificial Neural Networks (ANN), date back to the 1940s and the work of McCulloch, Pitts, and Hebb. ANNs were largely inspired by the behavior of neurons and their axons in the human brain. ANNs are composed of a large number of highly interconnected neurons (processing units) that solve a specific problem by working in unison. ANNs learn to solve a specific problem by first learning through examples, as humans do. ANNs are problem specific, as their learning is restricted, but they can be applied to all sorts of classification or pattern recognition problems.
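To make the idea of interconnected processing units concrete, here is a toy forward pass through a network with a single hidden layer, written in plain NumPy. It is an illustration only, with random weights and a made-up input, and is not tied to any particular framework discussed below.

# Toy illustration: a forward pass through a tiny network with one hidden layer.
import numpy as np

rng = np.random.default_rng(42)

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

x = rng.normal(size=4)            # one input example with 4 features
W1 = rng.normal(size=(8, 4))      # weights connecting input -> 8 hidden units
W2 = rng.normal(size=(1, 8))      # weights connecting hidden -> 1 output unit

hidden = sigmoid(W1 @ x)          # each hidden unit combines all inputs
output = sigmoid(W2 @ hidden)     # the output unit combines all hidden units
print("network output:", float(output))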

ANNs were famous for their remarkable ability to derive meaning from complicated and imprecise data. ANNs have been widely studied since their inception and have also been used widely. Over a period of time, their efficacy was questioned due to limitations posed by ANNs, namely the utility and hardware requirements. Utility of ANNs was restricted to solving “toy” problems, and they were not able to solve real and more complex problems. Hardware issues also restricted their usage as a large amount of memory and computational power was required to effectively implement large ANNs. There were other theoretical objections that ANNs do not necessarily reflect the function of neurons that are seen in the human brain. The restrictions posed by ANNs paved the way to different algorithms, such as Support Vector Machines, Random Forests, Compressed Sensing, etc., but recently research in the field of Deep Learning has again brought attention back to this field.

Deep Learning: neural networks with multiple layers

Deep Learning is part of a broader family of Machine Learning methods, which uses a cascaded structure of what are known as hidden layers of neural networks. The difference between shallow and deep neural networks is the number of hidden layers: shallow neural networks have only a few, while deep neural networks have many. Although (shallow) artificial neural networks have been in existence since the 1940s, research on deep learning has taken off only recently. The reasons are threefold:

  1. Computational power: Until the advent of GPUs, computational power was restricted to CPUs, even more so when neural networks were invented. The computational power required for deep architectures is enormous. Even with today’s CPUs, training a deep architecture on a moderate amount of data may take a couple of days, while on GPUs it is reduced to a couple of hours.
  2. Size of the data: We are facing a data deluge, due to so many data sources (sensors, the internet, etc.). At the time of the inception of neural networks (i.e., the 1980s), data was sparse, and training a deep architecture on such sparse data wasn’t recommended. In contrast, today we speak of petabytes and zettabytes of data. Big data has changed the paradigm for the development of machine learning algorithms.
  3. Handling of parameter over/underflows: One of the inherent success factors behind deep learning is the handling of over/underflows of the parameters being optimized. The more hidden layers there are, the more parameters need optimization, and as the number of hidden layers grows, this optimization tends to overflow or underflow (i.e., run to –Inf or +Inf). Recently, algorithms have been developed to counter these under/overflows.

Deep learning has been made possible, thanks to the above factors. In recent years, Deep Learning has been used in nearly all possible applications: object recognition in images, automated machine translation, drug discovery, recommendation engines, biomedical informatics, NLP, etc.

Although deep learning inherently has properties of distributed computing, it has still largely been run on single machines with high-end GPUs. These multi-GPU single machines are very expensive, and as data gets larger, single machines aren’t capable of holding it. The big data world, and MapR in particular, offers the alternative of distributed computing (GPUs/CPUs) on commodity hardware. Petabytes and zettabytes of data can be easily stored, providing a platform where deep learning can run at scale.

Deep Learning Landscape

Multitudes of open-source libraries offer the deep learning functionality. The most prominent ones are listed below:

Open-source libraries

Among the available libraries, TensorFlow and Caffe provide a higher level of abstraction. This is helpful to people who lack an in-depth deep learning or machine learning skill set. A higher level of abstraction also lets developers avoid the intricacies of tuning the hidden layers or reinventing the wheel. TensorFlow is an open-source project from Google, while Caffe is from Berkeley’s BVLC lab.

Deeplearning4j is well known among developers; however, presumably due to its restricted language support beyond Java and Scala, its popularity hasn’t grown.

TensorFlow is becoming more popular among the developers and the industry alike, as it provides a higher level of abstraction, is stable, and is perceived as production ready.

Scaling deep learning: Using Distributed Computing Frameworks

Apache Spark is the de-facto choice for a distributed computational framework for the big data world. It is an open-source framework that has been adopted across all verticals. The current Spark stack has Spark SQL, Spark Streaming, GraphX, and native MLLib, which is a library for conventional Machine Learning algorithms. Spark doesn’t have any Deep Learning Library in its stack, though.

Spark’s lack of support for Deep Learning on its stack has been a challenge for the big data community that is willing to explore Deep Learning frameworks. On a positive note, in October 2016, HDFS support was introduced by TensorFlow. Yet it needed separate clusters for running the application. This inherently comes at a higher cost of cluster installation, and at scale it means significant latency.

There are some initiatives started by the open-source community to address these limitations by binding TensorFlow on top of the Spark framework. One such initiative is SparkNet, which launches TensorFlow networks on Spark executors. Recently, TensorFrames (i.e., TensorFlow + DataFrames) was proposed, a seemingly great workaround, but in its current state it is still in development, and migrating current TensorFlow projects to the TensorFrames framework demands significant effort. Another aspect is that Spark is a synchronous computing framework, while deep learning is asynchronous; therefore, TensorFrames imposes a significant change on computing with TensorFlow.

A recently published framework called TensorFlowOnSpark (TFoS) addresses the above mentioned problems. TFoS enables execution of TensorFlow in distributed fashion on Spark and Hadoop clusters. TFoS can read data directly from HDFS via TensorFlow’s file readers or using QueueRunners and then feed it to a TensorFlow graph. This flexibility helps users of TensorFlow migrate to the big data environment relatively easily.

TFoS supports all sorts of TensorFlow programs. It also enables synchronous and asynchronous ways of training and inferencing. TFoS supports model parallelism and data parallelism. It also supports TensorFlow ecosystem tools, such as TensorBoard on a Spark Cluster. TFoS is expected to work seamlessly with Spark stack (i.e., SparkSQL, MLlib, etc.) The biggest advantage of TFoS is that the programming paradigm hasn’t changed for TensorFlow, and migration from TensorFlow to TFoS is easy. Another advantage of TFoS in terms of scaling is that the architecture is designed such that process-to-process communication doesn’t involve Spark drivers, which enables the program to scale easily.

It seems that TFoS provides a solution which is scalable and provides the advantage of a big data compute framework with the best available deep learning library, TensorFlow.

Scaling Deep Learning: Using Containerization

Recent advancement in container technology has given rise to microservices architecture. The same technology has also been used for implementation of customized applications on top of big data frameworks, as they are application agnostic. Deep learning can benefit from containerization technology. Deep learning library-based programs can be containerized and given access to petabytes of data, which reside on big data file systems such as MapR-FS. To distribute the learning, multiple containers that are running on multiple machines can be orchestrated using technologies such as Kubernetes. Container-based scaling has a significant advantage over Spark-based scaling, because a Spark cluster beyond a certain point (>100 nodes) requires significant tuning, but tools such as Kubernetes use containers to address those limitations. Kubernetes via containers has the potential to exploit the heterogeneous hardware (GPUs and CPUs). We’ll be writing more about scaling deep learning using containers in the following blog.

Editor's note: This article was originally shared in the Converge Blog on April 26, 2017

Related

TensorFlow on MapR Tutorial 

Distributed Deep Learning on the MapR Converged Data Platform

Apache Spark Machine Learning Tutorial

Anomaly Detection Using Metrics and Exception Logs | Whiteboard Walkthrough

Some Important Streaming Algorithms You Should Know About  

 

 

containers tensorflow deep learning Apache Spark

By Dong Meng

 

Introduction

A time series is a collection of observations (x_t), where x is the event recorded at time t. Common motivations for time series analysis include forecasting, clustering, classification, point estimation, and detection (in the signal processing domain).

With the prevalence of sensor technologies, the popularity of the Internet of Things (IoT) is trending. In a highly-distributed IoT scenario (autonomous driving, oil drilling, healthcare wearables), data with timestamps will be streaming back to your data center and stored. Today, the value of data is higher than the value of the IoT technology. If you can leverage the data upon arrival into your data center, rather than wait for a certain period, and engage in exploratory analysis on that data, you will gain more value from that information and be able to make an impact faster.

MapR Time Series Quick Start Solution

The aim of MapR is to solve the time series data collection and forecasting problem at scale. The applications that form the technology stack are MapR Streams (streaming the event data into your data center), OpenTSDB (storing the data in a high-performance time series database), and Spark (data processing and forecasting). A high-level diagram of the workflow appears below.

MapR Streams is the integrated publish/subscribe messaging engine in the MapR Converged Data Platform. Producer applications can publish messages to topics (i.e., logical collections of messages) that are managed by MapR Streams. Consumer applications can then read those messages at their own pace. All messages published to MapR Streams are persisted, allowing future consumers to “catch-up” on processing and analytics applications to process historical data. In addition to reliably delivering messages to applications within a single data center, MapR Streams can continuously replicate data between multiple clusters, delivering messages globally. Like other MapR services, MapR Streams has a distributed, scale-out design, allowing it to scale to billions of messages per second, millions of topics, and millions of producer and consumer applications. Find more information on MapR Streams here.

OpenTSDB is an open source scalable time series database with HBase as the main back-end. Since MapR-DB implements HBase API, MapR-DB serves as the back-end in this quick start solution, instead. The high performance achieved by OpenTSDB is due to the following optimizations, specifically targeted at time series data:

  1. A separate look-up table is used to assign unique IDs to metric names and tag names in the time series;
  2. The number of rows is reduced by storing multiple consecutive data points in the same row, so it seeks faster when reading.

On MapR, the performance benchmark can be as high as 100 million data points ingested per second (link).
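For a sense of what writing to OpenTSDB looks like, here is a small, hypothetical Python example that posts one data point to OpenTSDB’s HTTP /api/put endpoint. It assumes OpenTSDB is reachable on its default port 4242 and uses a made-up metric name; the quick start solution itself writes points from Spark rather than from a standalone script.

# Sketch: write a single data point to OpenTSDB over its HTTP /api/put endpoint.
import json
import time
import urllib.request

point = {
    "metric": "sensor.reading",          # hypothetical metric name
    "timestamp": int(time.time()),
    "value": 42.0,
    "tags": {"sensor": "r15"},
}

req = urllib.request.Request(
    "http://localhost:4242/api/put",     # assumes OpenTSDB's default HTTP port
    data=json.dumps(point).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print("HTTP status:", resp.status)   # 204 No Content on success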

Apache Spark provides us with the capacity to harness MapR Streams and provide data processing/parsing functions while training machine learning models with multivariate time series regression algorithms. Our Spark streaming code will pick up the data from MapR Streams, briefly process them, and write them to OpenTSDB; meanwhile, the machine learning model is fit to the data and writes the prediction into OpenTSDB as well.

In our example, we used gas sensor data from the UCI machine learning repository (link). With this dataset, we try to predict the ethylene level based on 16 sensors that monitor the gas content. The exploratory plot below shows the time series for the 16 sensor readings.

We use basic linear regression to regress on some autoregressive (lag) features as well as some second-derivative features. It is also good practice to look into the seasonality and stationarity of the time series data and apply smoothing/differencing algorithms to prepare the data for processing. For a target with an obvious on/off status, we could also consider combining a regression model with a binary classification model to obtain a better RMSE.
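As a rough illustration of that modeling step, the sketch below fits a plain linear regression on lag (autoregressive) and second-derivative features using scikit-learn and synthetic data. It is not the Spark code used in the quick start solution, just a compact way to show the feature construction.

# Illustrative only: single-machine lag + second-derivative feature regression.
import numpy as np
from sklearn.linear_model import LinearRegression

def make_features(sensors, target, n_lags=3):
    # sensors: (T, 16) sensor readings; target: (T,) ethylene level to predict
    rows, ys = [], []
    for t in range(n_lags + 2, len(target)):
        lags = target[t - n_lags:t]                                       # autoregressive features
        second_diff = np.diff(sensors[t - 2:t + 1], n=2, axis=0).ravel()  # second-derivative features
        rows.append(np.concatenate([sensors[t], lags, second_diff]))
        ys.append(target[t])
    return np.array(rows), np.array(ys)

# Synthetic stand-in for the UCI gas sensor data (16 sensors, one target series).
rng = np.random.default_rng(0)
sensors = rng.normal(size=(1000, 16))
target = sensors @ rng.normal(size=16) + rng.normal(scale=0.1, size=1000)

X, y = make_features(sensors, target)
model = LinearRegression().fit(X[:800], y[:800])
print("R^2 on held-out data:", model.score(X[800:], y[800:]))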

The screenshot below gives an example from the OpenTSDB UI.

The metric names in the data are stored as tags in OpenTSDB. In this figure, the blue and purple lines are two feature metrics, r15 and r16. The green line is the target time series, and the red line is our prediction: notice how closely the red and green lines track. OpenTSDB provides options for automatically refreshing this dashboard.

Summary

The focus of this article is on the workflow; the algorithm applied can be customized given the distribution of the data and the requirements of the business. I have packaged the quick start solution to extend the MapR 5.2 Docker container for demo purposes. You can launch the demo from your laptop, if you have Docker installed, by following the steps at my Docker Hub link.

There is a recording of this demo, showing how the Docker image works. It requires some time to start, due to the MapR and OpenTSDB services plus the MapR Streams and Spark applications. The video can be viewed on YouTube.

Editor's Notes: Article originally posted in the Converge Blog on May 02 2017.

Additional resources

Persistence in the Age of Microservices: Introducing MapR Converged Data Platform for Docker

Getting Started with MapR Client Container

MapR-XD - PACC

containers 

Using an Embedded Metastore (Like MySQL or Oracle) to Connect Directly to a Metastore Database 

 

When your Hive Client (Hive CLI) connects directly to a metastore database, without using hive.metastore.uris, the Hive CLI uses an embedded metastore to run its Hive job. During the upgrade process, if you have missed upgrading the Hive CLI and are still using a lower version, but your metastore database has already been upgraded to the latest schema version using the metastore scripts (like upgrade-<OLDVERSION>-to-<NEWVERSION>.mysql.sql), it can still connect. The old version of the Hive Client will still be able to talk to the metastore database by modifying the schema back down to its own version, even though Hive provides a property (hive.metastore.schema.verification) that can restrict schema modification. However, this property’s default value changes in different versions of Hive.

 

Example 1:

Let’s say you are upgrading from Hive 1.2 to Hive 2.1.0. You have missed upgrading a few Clients, so some of the nodes are still on Hive 1.2, and you have used the schematool to upgrade the metastore database. After the schema upgrade, the Version Table in the new schema reflects the new Hive version, 2.1. And your hive-site.xml has the following properties:

  1. javax.jdo.option.ConnectionURL
  2. javax.jdo.option.ConnectionDriverName
  3. javax.jdo.option.ConnectionUserName
  4. javax.jdo.option.ConnectionPassword

When you accidentally invoke a Hive Client that is on a lower Hive version (like Hive 1.2), the Hive CLI will still connect to the metastore database by making schema changes, using hive-metastore-1.2.0-mapr-*.jar.

 

Below is the Hive information message:

ERROR [main]: metastore.ObjectStore (ObjectStore.java:checkSchema(6691)) - Version information found in metastore differs 2.1.0 from expected schema version 1.2.0. Schema verification is disabled hive.metastore.schema.verification so setting version.

 

When checked, the metastore MySQL Version Table now reflects Hive version 1.2. The screenshot below illustrates this:

 

When a user then runs the Hive Client using an upgraded Hive CLI (like Hive 2.1), the Hive CLI errors out with the message below:

Caused by: MetaException(message:Hive Schema version 2.1.0 does not match metastore's schema version 1.2.0 Metastore is not upgraded or corrupt)

This is because the metastore database version has already been downgraded by the lower-version Hive Client (Hive 1.2), and when the new version of the Hive CLI tries to connect to the metastore database, it fails to connect.

 

Example 2: 

A similar issue is observed when a Spark Client (like spark-submit or spark-sql) connects to the Hive metastore database using hive-metastore-1.2.0-mapr-*.jar while the Hive metastore has SCHEMA_VERSION=2.1.0 in its Version Table. This behavior is observed in Spark 2.1.0, because the Spark package is shipped with Hive 1.2 jars:
 

 

  

A user who is not using a remote Hive metastore service (the hive.metastore.uris property for a remote metastore in hive-site.xml under the Spark conf directory) and who has not enabled schema verification will connect directly to the Hive metastore database, and the Spark Client will be able to modify the schema of the Hive metastore database.

 

Remediation: 

1) The metastore version is controlled by a parameter (hive.metastore.schema.verification), which defaults to false in Hive 1.2 and lower versions. This allows the metastore Client to connect to a metastore database and modify its schema. In Hive 2.1, by contrast, hive.metastore.schema.verification is true by default, which prevents the Hive Client from changing the schema of the metastore database. Hence, when you try to connect to the Hive metastore database (which has already been modified by the Hive 1.2 Client) from the Hive 2.1.0 Client, it errors out, even if you have not explicitly set hive.metastore.schema.verification, as you would have to in Hive 1.2.

 

Best practice: 

i) Make sure hive.metastore.schema.verification is always set to true in hive-site.xml when you are on Hive 1.2 or lower.

ii) Make the metastore database tables read-only for Hive users, so that the database administrator can control upgrades of the tables during and after the upgrade process. Below are the grant privileges needed for the Hive metastore tables (tables may differ across Hive versions), so that you can prevent accidental changes to the schema of the Hive metastore database.

 

Grant privileges: SELECT, INSERT, UPDATE, DELETE

Metastore DB tables: BUCKETING_COLS, CDS, COLUMNS_V2, COMPACTION_QUEUE, COMPLETED_TXN_COMPONENTS,
DATABASE_PARAMS, DBS, DB_PRIVS, DELEGATION_TOKENS, FUNCS, FUNC_RU, GLOBAL_PRIVS,
HIVE_LOCKS, IDXS, INDEX_PARAMS, MASTER_KEYS, NEXT_COMPACTION_QUEUE_ID,
NEXT_LOCK_ID, NEXT_TXN_ID, NUCLEUS_TABLES, PARTITIONS, PARTITION_EVENTS, PARTITION_KEYS,
PARTITION_KEY_VALS, PARTITION_PARAMS, PART_COL_PRIVS, PART_COL_STATS, PART_PRIVS, ROLES,
ROLE_MAP, SDS, SD_PARAMS, SEQUENCE_TABLE, SERDES, SERDE_PARAMS, SKEWED_COL_NAMES,
SKEWED_COL_VALUE_LOC_MAP, SKEWED_STRING_LIST, SKEWED_STRING_LIST_VALUES,
SKEWED_VALUES, SORT_COLS, TABLE_PARAMS, TAB_COL_STATS, TBLS, TBL_COL_PRIVS,
TBL_PRIVS, TXNS, TXN_COMPONENTS, TYPES, TYPE_FIELDS, VERSION

Grant privileges: SELECT

Metastore DB tables: VERSION

 

2) To handle Spark 2.1.0, which currently ships with the Hive 1.2 jar, users need to use a remote Hive metastore service (hive.metastore.uris), where the metastore service is started with hive.metastore.schema.verification set to TRUE, for any Spark SQL context. This forces the Spark Client to talk to a higher version of the Hive metastore (like Hive 2.1.0) using lower Hive jars (like Hive 1.2), without modifying or altering the existing Hive schema of the metastore database.
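As a hedged illustration of option 2, the PySpark snippet below shows roughly how an application can be pointed at a remote metastore service instead of opening the metastore database directly; the thrift URI is an assumption and should be replaced with your own metastore host and port.

# Sketch: make Spark SQL talk to a remote Hive metastore service rather than
# connecting to the metastore database directly with its bundled Hive jars.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("remote-metastore-example")
         .config("hive.metastore.uris", "thrift://metastore-host:9083")  # assumed host/port
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SHOW DATABASES").show()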

By Hanumath Rao Maduri

Apache Spark is an open source big data processing framework, which is being widely used for analytics on streaming and batch workloads. Spark is fully supported on MapR, and it typically uses data in the form of large files. With the Spark/MapR-DB connectors, you can use MapR-DB as a data source and as a data destination for Spark jobs.

MapR-DB is a high performance NoSQL database, which supports two primary data models: JSON documents and wide column tables. A Spark connector is available for each data model. The Native Spark Connector for MapR-DB JSON provides APIs to access MapR-DB JSON documents from Apache Spark, using the Open JSON Application Interface (OJAI) API. To access the wide column data model, which is often referred to as “MapR-DB Binary,” the Spark HBase and MapR-DB Binary Connector should be used. This blog article will describe examples of the connector for MapR-DB JSON.

BACKGROUND

Big data applications are moving towards data sets with flexible (or no predefined) schemas. Hence, the OJAI API was introduced in the MapR-DB 5.1 release. The OJAI API is the set of interfaces that allows the application to manipulate structured, semi-structured, or unstructured data. Please refer to the following GitHub repository for more details on the OJAI API: https://github.com/ojai/ojai.

With the new release (MapR 5.2, MEP 3.0), a new connector was developed to integrate MapR-DB JSON tables with Spark. This connector uses OJAI API internally to access/mutate the tables. It is this connector API that will be further explored in this blog post.

THE SPARK/MAPR-DB JSON CONNECTOR API

In the MapR Ecosystem Pack (MEP) 3.0 release, the Native Spark Connector for MapR-DB JSON supports loading data from a MapR-DB table as a Spark Resilient Distributed Dataset (RDD) of OJAI documents and saving a Spark RDD into a MapR-DB JSON table. (An RDD is the base format for storing data for use by Spark.)

Here are the interfaces for loading the JSON table into an RDD:

loadFromMapRDB(<path-of-mapr-db-json-table>)

The above function also supports another variant wherein one can directly load the documents as an RDD of Scala objects:

loadFromMapRDB[<BeanClass>](<path-of-mapr-db-json-table>)

Below is the API for saving the objects into a MapR-DB table:

rdd.saveToMapRDB(<path-of-mapr-db-json-table>)

The above function (i.e., saveToMapRDB) also contains more self-explanatory parameters:

  • createTable – Create the table before saving the documents, and throw an exception if the table already exists. The default value is set to false.
  • bulkInsert – Save a group of rows of data at once into a MapR-DB table. The default value is set to false.
  • idFieldPath – Key to be used to identify the document. The default value is set to “id.”

Similar to loading the document into a Scala bean class, one can save an RDD of user-specified Scala class objects into the MapR-DB JSON table.

SAVING OBJECTS IN A MAPR-DB JSON TABLE

To access the connector API, it is required to import the Scala package “com.mapr.db.spark._.” All the required implicit definitions are included in the com.mapr.db.spark package.

Below is the code, which saves the RDD of Person objects into the MapR-DB JSON table:

import org.apache.spark.{SparkConf, SparkContext}
import com.mapr.db.spark._

val spark = new SparkConf().setAppName("json app")
  .setMaster("local[*]")
val sc = new SparkContext(spark)
val people = sc.parallelize(getUsers())
people.saveToMapRDB("/tmp/UserInfo", createTable = true)

Here, getUsers is a helper function that allocates the Person objects shown in the output below.

LOADING DATA FROM A MAPR-DB JSON TABLE

The code provided below will load the documents from the "/tmp/UserInfo" table into an RDD:

val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo").collect

 

Here is the result from the printing of usersInfo documents:

usersInfo.foreach(println(_))

{
  "address":
{"Pin":95035,"city":"milpitas","street":"350 holger way"},
  "dob":"1947-11-29",
  "first_name":"David",
  "interests":["football","books","movies"],
   "last_name":"Jones"
}

{
  "address":{"Pin":95985,"city":"sunnyvale","street":"35 town way"},
   "dob":"1987-05-04",
   "first_name":"Indiana",
   "interests":["squash","comics","movies"],
   "last_name":"Jones"
}



{
  "address":{"Pin":67765,"city":"phoenix","street":"358 pond way"},
   "dob":"1968-10-02",
   "first_name":"James",
   "interests":["tennis","painting","music"],
   "last_name":"junior"
}

{
  "address":{"Pin":95652,"city":"san jose","street":"305 city way"},
  "dob":"1976-01-09",
   "first_name":"Jimmy",
   "interests":["cricket","sketching"],
    "last_name":"gill"
}

{
   "address":{"Pin":89898,"city":"salt lake","street":"351 lake way"},
   "dob":"1974-01-29",
    "first_name":"Peter",
    "interests":["boxing","music","movies"],
    "last_name":"pan"
}

 

PROJECTION PUSHDOWN AND PREDICATE PUSHDOWN FOR THE LOAD API

The “load” API of the connector also supports “select” and “where” clauses. These can be used for projection pushdown of subsets of fields and/or can filter out documents by using a condition.

Here is an example on how to use the “where” clause to restrict the rows:

val usersLivingInMilpitas = sc.loadFromMapRDB("/tmp/UserInfo")
.where(field("address.city") === "milpitas")

 

Similarly, if one wants to project only first_name and last_name fields, the following code will generate the required output:

val namesOfUsers = sc.loadFromMapRDB("/tmp/UserInfo")
.select("first_name", "last_name")

 

SETTING UP THE PROJECT TO USE THE SPARK/MAPR-DB CONNECTOR

To access the loadFromMapRDB and saveToMapRDB API, the following Maven package and artifactId information is required in the project’s pom.xml file:

   <dependency>
            <groupId>com.mapr.db</groupId>
            <artifactId>maprdb-spark</artifactId>
            <version>5.2.1-mapr</version>
   </dependency>

 

To add the Spark core dependency into the pom.xml file:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0-mapr-1703</version>
        </dependency>

 

MapR specific jars are located in the mapr-releases repository. The following repository information should be included in the pom.xml file to enable Maven to download the dependencies:

        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
       </repository>

 

The code for the example application can be accessed here.

SUMMARY

Once the data is loaded as an RDD of either OJAI documents or an RDD of a Scala bean class, it can be processed further using Spark transformations. The data loaded from MapR-DB tables can be enriched using the data from other data sources.

The Spark connector will be further enhanced to support the DataFrame and DataSet APIs. It enables you to use Spark SQL and Spark Streaming to transform the data seamlessly from MapR-DB JSON tables.

This is just a quick note on how to get these samples (GitHub - mapr-demos/maprdb-ojai-101: Basic examples for OJAI & MapR-DB) running easily on a deployment server that may not have Maven integration.

 

The samples are built to be run via Maven, like this:

mvn exec:java -Dexec.mainClass="com.mapr.db.samples.basic.Ex01SimpleCRUD"

This works fine, as Maven already has all of the dependencies available to it. If you have to run outside of Maven, you need to carry all of the dependencies over. However, if you just use the following Shade plugin configuration in your Maven build plugins:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.mapr.db.samples.basic.Ex01SimpleCRUD</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

Then you can easily run the resulting jar with "java -jar jar-name.jar" on your MapR client server.

 

Also note that you should update the version of MapR-DB in the POM to be the same as whatever you actually have on your cluster.  We had 5.2 and this was for 5.1.