
SparkR is an R package that provides a lightweight front end for using Spark on MapR from R. In MapR-Spark 2.1.0, SparkR provides a distributed DataFrame implementation that supports operations like selection, filtering, and aggregation. The entry point into SparkR is the SparkSession, which connects your R program to a MapR Spark cluster.

 

SparkContext, SQLContext, and SparkSession -  In Spark 1.x, SparkContext and SQLContext were how you accessed Spark. In Spark 2.x, SparkSession becomes the primary entry point. We can create a SparkSession using sparkR.session and pass in options, such as the application name and any Spark packages the application depends on.

 

The following examples cover different use cases for connecting an R program to a MapR cluster from the R shell. In each case, set SPARK_HOME, load the SparkR package, create a SparkR session with the required arguments, and execute the program.

 

The following Spark driver properties can be set in sparkConfig with sparkR.session from R:

 

Property Name                    Property group           spark-submit equivalent
spark.master                     Application Properties   --master
spark.yarn.keytab                Application Properties   --keytab
spark.yarn.principal             Application Properties   --principal
spark.driver.memory              Application Properties   --driver-memory
spark.driver.extraClassPath      Runtime Environment      --driver-class-path
spark.driver.extraJavaOptions    Runtime Environment      --driver-java-options
spark.driver.extraLibraryPath    Runtime Environment      --driver-library-path

 

Use Case 1: Local DataFrames -  Convert a local R data frame into a SparkDataFrame. The following creates a SparkDataFrame from the faithful dataset that ships with R.

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
# df is now a SparkDataFrame
df <- as.DataFrame(faithful)
head(df)

 

Use Case 2: From Data Sources - The method for creating SparkDataFrames from data sources is read.df. This method takes the path of the file to load and the type of data source; the currently active SparkSession is used automatically. SparkR supports reading JSON, CSV, and Parquet files natively.

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
# people is now a SparkDataFrame
people <- read.df("file:///opt/mapr/spark/spark-2.1.0/examples/src/main/resources/people.json", "json")
head(people)

 

Use Case 3: From Data Sources with YARN as Master -  SparkR supports reading JSON, CSV, and Parquet files natively, and through packages available from sources like Third Party Projects, you can find data source connectors for popular file formats like Avro. These packages can be added either by specifying --packages with the spark-submit or sparkR commands, or by initializing the SparkSession with the sparkPackages parameter in an interactive R shell.

 

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "yarn", sparkConfig = list(spark.driver.memory = "2g"))
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
# results is now a SparkDataFrame
people <- read.df("file:///opt/mapr/spark/spark-2.1.0/examples/src/main/resources/people.json", "json")
head(people)

 

Use Case 4: Create SparkDataFrames from Hive tables - By default, SparkR attempts to create a SparkSession with Hive support enabled (enableHiveSupport = TRUE).

 

Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session()
# Queries can be expressed in HiveQL.
sql("CREATE TABLE IF NOT EXISTS mapr(id INT, name STRING)")
sql("LOAD DATA LOCAL INPATH '/opt/mapr/spark/spark-2.1.0/examples/src/main/resources/kv1.txt' INTO TABLE src")
employeeCount <- sql("SELECT count(*) from mapr")
# results is now a SparkDataFrame
head(employeeCount)

 

Prerequisites:
1) yum install R
2) yum install mapr-spark-2.1.0.201703271134-1.noarch
3) yum install mapr-spark-master-2.1.0.201703271134-1.noarch

 

The MapR-Spark-2.1.0 packages can be downloaded from http://package.mapr.com/releases/MEP/MEP-3.0.0/redhat/.

 

Program Execution Snippet: Running the R Program from the R Shell

 

[mapr@sn2 bin]$ R

R version 3.3.3 (2017-03-06) -- "Another Canoe"
Copyright (C) 2017 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

> Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union

> sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
Spark package found in SPARK_HOME: /opt/mapr/spark/spark-2.1.0
Launching java with spark-submit command /opt/mapr/spark/spark-2.1.0/bin/spark-submit   --driver-memory "2g" sparkr-shell /tmp/RtmpzKbIS6/backend_port310323a10d39
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Java ref type org.apache.spark.sql.SparkSession id 1
> df <- as.DataFrame(faithful)
> head(df)
  eruptions waiting
1     3.600      79
2     1.800      54
3     3.333      74
4     2.283      62
5     4.533      85
6     2.883      55 

 

Conclusion

In this blog, you have learned how R can be used with the MapR Spark packages through a few examples. Thanks for reading my post; if you have any questions, please leave a comment in the comments section below.

 

 

Related

Apache Spark

sparkr

by Mathieu Dumoulin

MapR Streams is a “Kafka-esque” message streaming system which, similarly to Apache Kafka, provides very high throughput performance combined with low message latency and high reliability. Unique to MapR Streams, however, is a broker-less design that vastly simplifies configuration and increases reliability, in addition to providing replication capabilities that enable some pretty cool use cases (see MapR Streams in Action demo).

With MEP 2.0, the MapR Converged Platform adds a Kafka REST Proxy server. This upgrade opens MapR Streams to use any language that supports REST API calls over HTTP, which is to say, virtually all modern languages. For example, Python and the requests module work really well.

But is the Kafka REST Proxy able to access the tremendous performance potential of MapR Streams at the same level as its primary Java API?

In this post, I’d like to go over a few performance objectives and provide some guidance to help data engineers get the most out of this very useful technology.

THE DEFAULT CASE

We should start with some good news. MapR Streams is very fast and is shipped by default with settings that should provide enough performance for most applications.

FIX VERY HIGH LATENCY FOR SINGLE API CALL (WITH CURL)

You have a shiny new MapR 5.2 cluster installed with all the bells and whistles. Everything works great, and you get around to wanting to give MapR Streams a try. With the REST Proxy, this is a piece of cake.

curl -X POST -H "Content-Type:application/vnd.kafka.json.v1+json" --data '{"records":[{"value":{"foo":"bar"}}]}' "http://demo1:8082/topics/%2Fstreams%2Ftest%3Atopic1"

 

And the response takes about 3 seconds to come back. This very high latency is because of the default streams buffer time value of 3000ms.

To fix, add the following to the kafka-rest.properties file (in /opt/mapr/kafka-rest/kafka-rest-/config):

 

consumer.request.timeout.ms=125
streams.buffer.max.time.ms=125

Reference: http://maprdocs.mapr.com/home/Kafka/REST-config-parameters.html

Beware of high CPU if the timeout is very low

Lowering the value of this property seems to correlate to much higher CPU utilization. When the value is 0, one or two of my cores get pegged to 100%. Above about 125ms, the impact to CPU utilization isn’t noticeable, at least to something like top.

Thanks to Vince Gonzalez

ABOUT THE URL FOR THE TOPIC

The "%2Fstreams%2Ftest%3Atopic1" in the URL is needed because a MapR Streams topic name includes a path and a topic (i.e., /path/to/stream:topic), and that name has to be URL-encoded or the request won't work.
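If you are scripting against the proxy, you can build the encoded topic URL programmatically rather than by hand. A minimal sketch in Python 3, using the stream path and host from the example above:

import urllib.parse

topic = urllib.parse.quote("/streams/test:topic1", safe="")   # -> "%2Fstreams%2Ftest%3Atopic1"
url = "http://demo1:8082/topics/" + topic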

It’s possible to avoid this by setting a default stream, adding the following property to kafka-rest.properties:

streams.default.streams=/streams/test

 

In that case, the above example URL would simplify to “http://demo1:8082/topics/topic1.”

Reference: http://maprdocs.mapr.com/home/Kafka/REST-get-topic-metadata.html
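With that default in place, the earlier curl example can be reproduced with a plain POST to the short topic URL. A rough Python equivalent, using the requests module and the same host, topic, and payload as the example above:

import requests

payload = {"records": [{"value": {"foo": "bar"}}]}
headers = {"Content-Type": "application/vnd.kafka.json.v1+json"}
r = requests.post("http://demo1:8082/topics/topic1", headers=headers, json=payload)
print(r.status_code, r.text)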

INCREASE THROUGHPUT PERFORMANCE

Number of topics and Partitions

MapR Streams is fast by default and handles a lot, albeit not everything, automatically. Some performance tuning comes down to design considerations and isn't up to the streams messaging system at all.

Partitions > topics

Pros

  • Throughput should be good, and data is spread out evenly across the cluster
  • Easier to create and use, with fewer moving parts

Cons

  • Finding data specific to a particular object/event type/location will require scanning through more data, which will be slower.

Topics >> partitions

Pros

  • It's very efficient to get data from a specific object/event type/location if they are all stored in their own stream.
  • A very high number of streams (hundreds of thousands or even millions) will naturally spread across the cluster and will spread out well on all nodes of the cluster.

Cons

  • The consumer needs to specify a regex pattern to pick all (or a group of) data. This may come at a performance penalty compared to a single topic with many partitions.
  • Stream split is a relatively heavy operation, and it could trigger high load as new topics are created after the initial creation of topics is done.

Of course, one could also decide to use an intermediate solution, in which there are lots of topics and each topic has some number of partitions. The way to decide is to consider how the application is going to be used and where flexibility is needed. In any case, the default number of partitions for new topics is one, so that’s something to change for sure.

How to create streams with a custom number of partitions:

stream create
     -path Stream Path
    [ -ttl Time to live in seconds. default:604800 ]
    [ -autocreate Auto create topics. default:true ]
    [ -defaultpartitions Default partitions per topic. default:1 ]

$> maprcli stream create -path /streams/test -defaultpartitions 10

 

As a rule of thumb, try to keep about 10 partitions per node per topic.

Thanks to Akihiko Kusanagi

Session keep-alive and record arrays

To get the highest throughput, it’s going to be important to reduce overhead to maximize the CPU/network resources that do useful work moving your bits around. Here are some findings from recent engagements with customers using MapR Streams in pilot and production projects:

Use an array of records as payload

Instead of producing a single record on each API call, push an array of records.

Bad:

{"value":{"foo":"bar"}}

 

Good:

 

{"records":[ {"value":{"foo1":"bar1"}},{"value":{"foo2":"bar2"}} ,… ]}

Getting the best performance will require some experimentation to find the balance between how frequently to make calls vs. how many records to pack into each call.
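As a rough sketch of what that batching can look like from Python (the batch size, host, and topic here are illustrative assumptions, not tuned values, and event_source() stands in for whatever produces your data), the snippet below accumulates records and sends one POST per batch; it also reuses a session, which is covered in the next section:

import requests

session = requests.Session()
headers = {"Content-Type": "application/vnd.kafka.json.v1+json"}
BATCH_SIZE = 500  # find the right value experimentally, as noted above

def produce_batch(records):
    # wrap each record as {"value": ...} and send the whole array in a single call
    payload = {"records": [{"value": r} for r in records]}
    return session.post("http://demo1:8082/topics/topic1", headers=headers, json=payload)

buffer = []
for event in event_source():        # placeholder for your actual data feed
    buffer.append(event)
    if len(buffer) >= BATCH_SIZE:
        produce_batch(buffer)
        buffer = []
if buffer:
    produce_batch(buffer)           # flush whatever is left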

Our own experience shows that the Proxy can handle as much as 280MB/s on very large (100-200KB) message sizes. Internal tests demonstrate modest 5 node AWS clusters that are able to handle millions of small (1-200B) messages per second.

There is no substitute for experimentation, given variability of data set, throughput, and cluster hardware resources as well as the business requirements of a specific use case.

Reuse a session to push data into the REST Proxy

We’ve found significant gains from switching from single, isolated POST calls to multiple calls within the same session.

Here is an example with Python and the excellent requests module:

Bad:

import requests

def produce(payload):
    headers = {'Content-Type':'application/vnd.kafka.binary.v1+json'}
    r = requests.post('http://gw1:8082/topics/test', headers=headers, json=payload)

 

Good:

{"records":[ {"value":{"foo1":"bar1"}},{"value":{"foo2":"bar2"}} ,… ]}

 

Thanks to Ted Dunning

Tuning the embedded Jetty server

CPU is one of the resources that limits the throughput of the Kafka REST Proxy. Well, it turns out that the Proxy runs the Jetty 9 server in embedded mode, and it is possible to do some tuning at that level.

There is a good article about tuning the operating system (of both the load generator and the server), the load generators, and Jetty itself for high load. We cannot tune Jetty directly, since it's embedded, but have a look at the link below; you can certainly tune the following settings for high load:

  • TCP buffer sizes
  • Queue sizes for connection listening queue
  • Port range at the load generator side, so it won't starve on ports during high load

Reference: http://wiki.eclipse.org/Jetty/Howto/High_Load

Thanks to Muthu Lalapet

How to increase the memory buffer

It is possible to tune the "buffer.memory" parameter. Its default value is 32m. However, this setting cannot exceed the total memory that the producer is going to use; at the end of the day, kafka-rest is a JVM process.

Without changing any parameters, the Kafka REST API uses at most 256m of memory. Therefore, the "buffer.memory" parameter cannot exceed this value. Why 256m? See the kafka-rest-run-class script (in /opt/mapr/kafka-rest/kafka-rest-/bin). It says the following:

# Memory options
if [ -z "$KAFKAREST_HEAP_OPTS" ]; then
  KAFKAREST_HEAP_OPTS="-Xmx256M"
fi

 

So, if you want to increase "buffer.memory" beyond 256m, raise the KAFKAREST_HEAP_OPTS value accordingly (for example, export KAFKAREST_HEAP_OPTS="-Xmx1g" before starting the proxy).

Waste-of-time parameters

The producer throughput of a single Kafka REST Proxy doesn't scale by increasing the “producer.threads” parameter. We tried to set it to 20, 50, 500, and even 10,000, but there were no visible performance differences.

According to https://github.com/confluentinc/kafka-rest/issues/181, it is not used in Kafka REST code, and the Kafka REST Proxy that runs on MapR is largely identical to the Confluent implementation, only with the libraries changed to MapR libraries. Our implementation shares this known issue for now.

CLUSTER ARCHITECTURE

Run the Proxy on dedicated server(s)

A great way to ensure optimal performance for performance-critical use cases is to use one or more dedicated servers for the Kafka REST Proxy. Instead of installing it on a shared cluster node, you can install the MapR Client on a separate server and install the REST Proxy there.

To boost performance further, add additional servers and put them behind a load balancer. From the Client to the cluster, ensure that the network connectivity is as fast as can be afforded, since MapR will take advantage of all the network interfaces on the node automatically.

Kafka REST Proxy

Run two or more Proxy processes on a dedicated node

This can be done by running the second proxy on a different port (e.g. 8083 instead of the default 8082). Given a server with enough physical cores, such as a two-socket design, this strategy can further increase the throughput.

Note that running two proxy processes on a single server will not scale throughput linearly. In one instance, our testing showed throughput increasing from 1,580 msg/s to 2,660 msg/s, good for close to a 70% increase.

ABOUT MESSAGE SIZE

The performance characteristics of MapR Streams and the Kafka REST Proxy change, depending on the message size. Very small messages will be handled faster than very large messages. Your design should take this difference into consideration and favor smaller messages.

Keep in mind that the largest message size that can be handled very efficiently is about 100KB. Larger messages will come at some cost in peak performance, with a maximum best practice size of 2MB. Smaller messages are super-efficiently handled, so those are always fine.

Given the large sweet spot, we’d advise favoring development simplicity and not worrying about it too much until individual messages get over about 100KB in size.

DO'S AND DON'TS

  • DO choose your performance targets based on the business needs and the use case.
  • DO monitor the CPU, memory, and network load of the server running the Kafka REST Proxy.
  • DO consider your design (cluster architecture, topics vs. partitions) before changing parameters.
  • DO use a session if throughput is important.
  • DO favor lots of smaller messages.
  • DON'T change default parameters without a clear performance goal (latency, throughput, lower CPU usage, etc.).
  • DON'T create overly large messages (2MB+).

SOME ADDITIONAL RESOURCES

Script to measure throughput in MapR Streams

#!/bin/bash

STREAM="/streams/stream1"
TOPIC="test"

function sum_of_offset {
  maprcli stream topic info -path $STREAM -topic $TOPIC -json | awk -F':|,' '/maxoffset/ {n+=$2} END {print n}' 2> /dev/null
}

function epoch_ms {
  date +%s%3N
}

date +%T,%3N

o=$(sum_of_offset); t=$(epoch_ms)

while true
do
  prev_o=$o; prev_t=$t
  o=$(sum_of_offset); t=$(epoch_ms)
  echo "$(date +%T,%3N) $((($o - $prev_o)*1000/($t - $prev_t))) msg/s"
done

 

Editor's note: Blog post originally shared in Converge Blog on Jan 04, 2017

 

Related:

MapR-Streams

 

 

 

By Dong Meng

This is the third installment in our blog series about deep learning. In this series, we will discuss the deep learning technology, available frameworks/tools, and how to scale deep learning using big data architecture. Read Part 1 and Part 2.

INTRODUCTION

Deep learning is a class of machine learning algorithms that learns multiple levels of representation of the data, through message passing and derivation, cascading many layers of nonlinear processing units. Recently, there has been a lot of traction in the deep learning field, thanks to the research breakthroughs made by commercial entities in the tech field and the advancement of parallel computing performance in general. Quite a few deep learning applications have surpassed human performance: famous use cases in the field include AlphaGo, Image Recognition, and Autonomous Driving.

In most practices, development of deep learning applications is done on a single DevBox with multiple GPU cards installed. In some larger organizations, dedicated High Performance Computing (HPC) clusters are used to develop and train deep learning applications. While these practices are more likely to achieve better computation performance, they lack fault tolerance and create issues with moving data across different DevBoxes or clusters.

DISTRIBUTED DEEP LEARNING QUICK START SOLUTION

The MapR Converged Data Platform provides the only state-of-the-art distributed file system in the world. With the MapR File System (MapR-FS), our customers gain a unique opportunity to put their deep learning development, training, and deployment closer to their data. MapR leverages open source container technology, such as Docker, and orchestration technology, such as Kubernetes, to deploy deep learning tools, like TensorFlow, in a distributed fashion. Meanwhile, since MapR-DB and MapR Streams are also tied closely to our file system, if you are developing a deep learning application on MapR, it is convenient to extend our MapR Persistent Application Client Container (PACC) when deploying your model, to harness the distributed key-value storage of MapR-DB and the cutting-edge streaming technology of MapR Streams for different use cases. Click here if you want to learn more.

The distributed deep learning Quick Start Solution we propose has three layers (see Figure 1 above). The bottom layer is the data layer, which is managed by the MapR File System (MapR-FS) service. You can create dedicated volumes for your training data. We also support many enterprise features like security, snapshots, and mirroring to keep your data secure and highly manageable in an enterprise setting.

The middle layer is the orchestration layer. In this example, we propose to use Kubernetes to manage the GPU/CPU resources and launch parameter servers and training workers for deep learning tasks in units of pods. Starting with Kubernetes 1.6, you can manage cluster nodes with multiple GPU cards; you can also manage a heterogeneous cluster, where you use CPU nodes to serve the model while using GPU nodes to train the model. You can even take it a step further and label nodes with different GPU cards, so that lower-priority tasks run on older GPU cards and higher-priority tasks run on newer cards.

The top layer is the application layer, where we use TensorFlow as the deep learning tool. With the high performance NFS features of MapR-FS, it is easy to use TensorFlow to checkpoint the deep learning variables and models so they persist in the MapR file system. This makes it easy for you to look into the TensorFlow training process and harness the models, then put them into deployment. The advantage of using container technology in the application layer for deep learning applications is that we can control the versions of the deep learning model by controlling the metadata of the container images. We can harness the trained model into a Docker image with metadata as image tags to keep the version information; all the dependencies/libraries are already installed in the container image. When deploying the deep learning models, we just have to specify which version we want to deploy, and there is no need to worry about dependencies.
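As a rough sketch of that checkpointing pattern in TensorFlow 1.x (the cluster addresses, toy model, and MapR-FS NFS mount path below are illustrative assumptions, not part of the Quick Start Solution itself), a chief worker can simply point its checkpoint directory at a MapR-FS path exposed over NFS:

import tensorflow as tf

# Illustrative single-ps, single-worker cluster; in practice these come from the Kubernetes YAML.
cluster = tf.train.ClusterSpec({"ps": ["tf-ps0:2222"], "worker": ["tf-worker0:2222"]})
server = tf.train.Server(cluster, job_name="worker", task_index=0)

# Place variables on the parameter server, ops on this worker (between-graph replication).
with tf.device(tf.train.replica_device_setter(cluster=cluster)):
    # A toy model, just to have something to train and checkpoint.
    x = tf.placeholder(tf.float32, shape=[None, 1])
    w = tf.Variable(tf.zeros([1, 1]))
    loss = tf.reduce_mean(tf.square(tf.matmul(x, w) - 1.0))
    global_step = tf.train.get_or_create_global_step()
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss, global_step=global_step)

# checkpoint_dir points at an NFS-mounted MapR-FS path, so checkpoints persist in the cluster
# and can be picked up by any other pod (for example, a CPU-only serving job).
with tf.train.MonitoredTrainingSession(
        master=server.target,
        is_chief=True,
        checkpoint_dir="/mapr/my.cluster.com/user/dl/checkpoints") as sess:
    for _ in range(100):
        sess.run(train_op, feed_dict={x: [[1.0], [2.0]]})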

There are typically 5 steps to get your deep learning application running on our proposed Quick Start Solution.

  1. Modify the TensorFlow application to add the distributed server. There are a number of ways to enable data parallelism in TensorFlow; synchronous training with between-graph replication is the more practical approach overall (click here for more information). We can, for example, add a code snippet like:

    cluster = tf.train.ClusterSpec({"ps": ["tf-ps0:2222", "tf-ps1:2222"],
                                    "worker": ["tf-worker0:2222", "tf-worker2:2222"]})
    server = tf.train.Server(cluster, job_name="ps", task_index=0)

    In this example, the ps/worker hostnames, job_name, and task_index could be passed in through the YAML file used to launch the Kubernetes pods. You can also put the code on MapR-FS and mount it to multiple pods when launching the Kubernetes job.

  2. Prepare the training data and also load it onto MapR-FS. We recommend creating dedicated MapR volumes for your deep learning applications, so it can be better managed. Meanwhile, the persistent volume design in Kubernetes makes it possible to share the MapR volume between a few applications.

  3. Choose the container image to use: for example, we use the latest TensorFlow GPU images, but to fully leverage MapR-FS, we recommend extending your deep learning image from our MapR client container to utilize MapR-DB and MapR Streams.

  4. Write a YAML file to create a Kubernetes job. We want to mount the required NVIDIA library, the TensorFlow application, the destination folder for checkpoints, and the training data location. Here, we can easily create a persistent volume mounted to a MapR-FS volume and grant multiple pods access through the attached persistent volume claim.

  5. Check that the results persisted to MapR-FS, and further deploy the model if the results look satisfactory.

SUMMARY

Summing up, we want to note that both Kubernetes and TensorFlow are young projects; there are definitely ongoing issues in different deployment scenarios. But our experiences show that with the MapR Converged Data Platform, we make running distributed deep learning tasks easier and more suitable in enterprise environments with our advanced file system features. With the lightweight container technology in place, we believe this is the right approach/tool for various deep learning R&D tasks. There is a lot of potential going forward. And it is truly the thriving open source communities that make such technology available to be used. We want to thank the Kubernetes and TensorFlow community and encourage more users to contribute.

With the distributed deep learning Quick Start Solution MapR offers, we provide the flexibility for users to choose their own deep learning tool, such as MXNet, Caffe, or PyTorch. Utilizing a parameter server, we can launch the training task in a truly distributed fashion. Looking at this Quick Start Solution from a model-serving perspective, since the deep learning models are all built into containers, we can easily move a model from the dev environment to the production environment. We can further manage model versions/dependencies by creating meta tags on the container images.

Editor's note: Article originally posted in the Converge Blog on May 23, 2017

Being a hacker of sorts, I wanted to throw together some tools that I could reuse over and over again, run at "decent" scale (handling different workloads), and have them be somewhat easy to reproduce and for others to implement/improve on.

 

To that end, I wanted a single tool that could read JSON off a Kafka or MapR Streams topic and then, given instructions for the topic, push it into a location that could be read easily by Apache Drill or other tools. This includes output to MapR-DB. Thus, PYETL was born.

 

Now, Kafka Connect is one of those tools that should be able to do what I want, and MapR does have a Kafka Connect package, but I haven't reverse engineered it yet, although I plan to. I am guessing that Kafka Connect gives me some additional features (like having a Kafka Connect cluster handle multiple topics to multiple locations). My setup has all independent consumers; more on that in a bit.

 

Another thing Kafka Connect doesn't have is a MapR-DB example. There is an HBase sink, so maybe I can hack around on that, but I am not a Java expert; my scripting talents lie in Python, and thus I wanted to demonstrate what I am trying to do in Python. For performance reasons, Kafka Connect may well be the better choice, but I need to see examples of it doing what my tool does, and it needs to be easy to manage. The complexity of what Kafka Connect offers and what I am doing are vastly different, and I pay a penalty for that in two areas.

 

1. Size (I use a Docker image that ends up 2 GB in size! Luckily, that is cached, so if I have 5 instances running on a node, it only takes up 2 GB).

2. Performance. Using Python, even with wrapped C-based libraries (fastparquet, libhbase, and librdkafka), I am sure Java performance is going to be better... that said, I haven't had issues... yet.

 

So with Python, did I use messy threading? Nope. I run a Mesos cluster based on Jim Scott's talks on the Zeta Architecture. Ted Dunning got me hooked on Mesos a few years ago, and I've been playing around with merging MapR and Mesos for a while. Since I am running Mesos and Marathon, I run multiple instances of my script for each topic I ETL. So let's say I have a topic, web logs, with 6 partitions in it. I have each instance run the same config file (with the same group name) so it joins the consumer group. If I run 2 instances and they can handle all the data, Kafka/MapR Streams distributes the partitions across my two instances. I can run up to 6 individual instances to handle the partitions for my topic. Marathon just lets me scale that task up or down because the config is the same (I use the HOSTNAME of the Docker container as a unique identifier when I am writing JSON or Parquet files). Thus I can have multiple instances of my ETL running, managed by Marathon. This gives me some sense of scale, even with Python.
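For reference, joining such a consumer group from Python with a librdkafka-based client looks roughly like the sketch below (this is not the actual PYETL code; the stream path, topic, and group name are made up, and handle() stands in for the JSON/Parquet/MapR-DB writers):

from confluent_kafka import Consumer, KafkaError  # the MapR Streams Python client exposes the same API

c = Consumer({
    'group.id': 'weblogs-etl'   # every instance uses the same group name, so they share the partitions
    # plain Kafka would also need 'bootstrap.servers'; MapR Streams resolves the cluster from the stream path
})
c.subscribe(['/streams/web:weblogs'])  # MapR Streams topic naming: /path/to/stream:topic

while True:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() != KafkaError._PARTITION_EOF:
            raise SystemExit(str(msg.error()))
        continue
    handle(msg.value())   # write to JSON, Parquet, or MapR-DB per the topic's config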

 

So: JSON? Parquet? MapR-DB? Yes, I have multiple outputs. I wanted some basic config files or ENV variables for configuration, and then to be able to easily output to directory-partitioned JSON or Parquet files for Apache Drill or Spark, while still being able to output with the same script to MapR-DB. I allow users to specify the partition fields, and even column family mappings in MapR-DB. The row key can be built from other fields, or you can add "RANDOMVALUE". (Fun fact: I was ETLing some web logs and ran two instances, one to JSON and one to MapR-DB, and for some reason the MapR-DB table was steadily losing the race when I ran select count(*). Apparently, my row key definition didn't have enough entropy, and collisions were effectively overwriting records...)
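On the row key point, the lesson is simply to make sure the key carries enough entropy. A tiny sketch of the idea (a hypothetical helper, not the PYETL implementation): build the key from the configured fields and append a random suffix so that two records with the same field values don't collide:

import uuid

def make_row_key(record, key_fields):
    # join the configured fields, then append a random suffix for entropy
    base = "_".join(str(record.get(f, "")) for f in key_fields)
    return "{}_{}".format(base, uuid.uuid4().hex[:8])

# e.g. make_row_key({"host": "web01", "status": 200}, ["host", "status"]) -> "web01_200_3f9c2a1b"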

 

The multiple outputs are sure interesting when I do performance checking, especially as data sets grow!

 

I wouldn't consider this "production" ready yet, and would happily take improvements from the community. I had some basic goals in mind here:

 

1. Make this dockerizable so others could repeat exactly what I am doing to make it work

2. Make it fairly simple/easy to understand for Python folks

3. Make it have an acceptable level of performance. 

4. Make it something I would WANT to use to quickly get data on streams into sane formats from JSON on a topic. 

 

It's something to play with, 

 

John

 

GitHub - JohnOmernik/pyetl: Python based ETL of JSON records from into multiple destinations 

As of today, May 22nd, 2017, there are over 70 Kafka Connect connectors for streaming data into and out of Apache Kafka! 

 

 

The connectors themselves for different applications or data systems are not maintained in the main Apache Kafka code base. An easy way to discover Kafka Connect resources, including connectors, is to search GitHub for 'kafka-connect' or directly open this URL: https://github.com/search?q=kafka-connect.

 

Kafka Connect is included in MapR Streams; please see Kafka Connect for MapR Streams. Now, through simple configuration and with no code necessary, we can leverage these Kafka Connect connectors for large-scale streaming of data into and out of Kafka/MapR Streams for a variety of data systems! 

I categorized the available Kafka Connect connectors into several categories, specifying their type as either source, for getting data from another data system into Apache Kafka, or sink, for getting data from Kafka into another data system:

  • Change Data Capture: Attunity Replicate (Source) , Dbvisit Replicate Connector for Oracle (Source), Oracle Golden Gate (Source) , IBM Data Replication (Source), Debezium [MySQL, PostgreSQL, MongoDB]
  • Databases: JDBC (Source, Sink), MySQL, Blockchain, Edge Intelligence   InfluxDB (Sink), KineticaDB (Sink), KLP-PostgreSQL (Sink) from InfoBright, SAP HANA (Source, Sink), Vertica (Source, Sink) , VoltDB (Sink) , ReThinkDB (Sink), OpenTSDB (Sink)
  • NoSQL: Azure DocumentDb (Sink), Aerospike (Sink), Cassandra (Source, Sink), Couchbase (Source), Druid (Sink), Dynamo DB (Source, Sink), HBase (Source, Sink), MongoDB (Source, Sink), Redis (Sink), MarkLogic (Sink)  
  • File Systems: FTP (Source) , HTTP (Source) , File (Source, Sink), FileSystem (Source), HDFS (Sink), Apache Kudu (Sink), spooldir (Source)
  • Log: Splunk (Sink, Source) , Syslog (Source)
  • Search: Elasticsearch (Sink), Solr (Sink, Source)  
  • Object Stores: Amazon S3, Google Cloud Storage, Azure Blob Store ( on the roadmap) 
  • Mainframe: Syncsort DMX (Source, Sink)
  • IoT: Azure IoT Hub (Source), CoAP [Constrained Application Protocol] (Source, Sink) , MQTT( Source), Flogo (Source)
  • Data Warehouse: BigQuery (Sink), Hive (Sink)
  • IMDB: Apache Ignite (Source, Sink), Hazelcast (Sink)  
  • Messaging: AMQP, Google PubSub (Source, Sink), JMS (Source, Sink), Amazon SQS (source) , MQTT( Source), Slack via webhooks (Sink), RabbitMQ, AWS Kinesis   
  • Application Feed: Bloomberg Feeds (Source), Jenkins (Source), Salesforce (Source), IRC (Internet Relay Chat) Source, PubNub, Mobile Apps , Twitter (Source, Sink), Yahoo Finance ( Source), GitHub (Source) 
  • Analytics: Mixpanel (Source)  
  • JMX: JMX (Source)
  • Content Extraction: DocumentSource

 

A few examples of use cases of Kafka Connect connectors would be: 

  • Publishing SQL Tables (or an entire SQL database) into Apache Kafka
  • Consuming streams from Apache Kafka into HDFS for batch processing
  • Consuming streams from Apache Kafka into Elasticsearch for secondary indexing
  • Integrating legacy systems such as mainframe ones with Apache Kafka
  • … 

 

Please share your experience, in the comments section, using Kafka Connect connectors with MapR Streams!

 

Thanks 

Slim Baltagi

Advanced Analytics LLC

by Jimmy Bates

 

Over the last few releases, the options for how you store data in Hive have advanced in many ways. In this post, let's take a look at how to go about determining which Hive table storage format would be best for the data you are using. Starting with a basic table, we'll look at creating duplicate tables for each of the storage format options, and then comparing queries and data compression. Just keep in mind that the goal of this post is to talk about ways of comparing table formats and compression options, not to define the fastest Hive setup for all things data. After all, the fun is in figuring out the Hive table storage format for your own Hive project, and not just reading about mine.

THE HADOOP CLUSTER LAYOUT

For our discussion today, I used a MapR Hadoop cluster consisting of 5 nodes in an Amazon EC2 environment. The MapR Version is 4.0.1 (Hadoop 2.4.1) running Hive 0.13. I have MRv1 and YARN running on all nodes, so I can run comparisons between legacy MRv1 jobs and YARN-controlled jobs.

CLUSTER DIAGRAM

In this cluster, I spun up 5 nodes in an Amazon EC2 multi-tenant environment. The systems are running CentOS 6.5. I’ll focus on the Hive aspects for the purpose of this blog post, and save the specifics of supporting separate projects with separate SLAs for a later post.

1. OUR STARTING DATA

We have two tables that we will start with. One table consists of information that details bike rental stations. The second table contains information on trips where bikes were rented and where they were turned in. The starting dataset is stored as a standard text table with delimited columns.

 **1.1      Table 1: Bike Stations**  

This table has information on the rental stations. Here is an example of some of the fields and data:

 **1.2      Table 2: Bike Trips**  

This table has information on bike rental activity. Here is an example of some of the fields and data:

2. OUR TABLE STORAGE OPTIONS

Now that we have an idea of what is in our original data set, we can take a look at the storage options available in Hive. You can add or subtract to the list of storage formats, but for this example, we will look at our starting text tables, RC and ORC. This section covers what some of these formats mean and “why” tables are laid out this way in my MapR cluster as opposed to the “how.” Don’t worry, we’ll cover the “how” later.

 **2.1      Different Storage Formats**  

    **2.1.1      Text File**  

Text is where Hive started, and it has evolved into handling just about any text file with any separation you may be looking for. It's one of the things that gives Hive the ability to get your data from files into SQL-fed tools.

This is our text file setup:

ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

    **2.1.2      RC: Record Columnar File**  

The RC format was designed for clusters with MapReduce in mind. It is a huge step up over standard text files. It’s a mature format with ways to ingest into the cluster without ETL. It is supported in several Hadoop system components. For our comparison, we will ETL the data from text into the RC table using Hive.

The full table creation and load process is covered later, but this is what our table format looks like:

ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'

    **2.1.3      ORC: Optimized Row Columnar File**  

The ORC format showed up in Hive 0.11. As the name implies, it is more optimized than the RC format. If you want to hold onto speed and compress the data as much as possible, then ORC is for you. We won't be digging into how or why the "O" in ORC works—we're just taking it for granted that it does, and we will be using it in our comparison.

Our ORC settings in our ORC table:

ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'

 **2.2      Table Locations and Partitions**  

Do I use an external table? Do I use partitions? Do file system settings make a difference? All good questions, but in this section we’ll lay out some of those options to give a scope of reference on deciding what you want to include in your comparison for your project.

    **2.2.1      External or Internal**  

The main difference between internal and external tables is a matter of perspective. What tool do I expect will control the data—Hive, or something else? To oversimplify this, external tables are normally used when alterations to the data could happen with some other tool, or the data already exists and you want to keep it in its original form and use it in Hive.

For our example, you could use either, or do comparisons around both. In the table creation process later we will do both, but specify the storage location for the tables on the file system.

    **2.2.2      File System Volumes**  

One of the great things about using MapR is the power of logical file system volumes in my Hadoop cluster. On a small cluster with one use case this is not a big deal, but when you get to 300 nodes and 20 projects, all with specific SLAs, then it makes a huge difference. It’s one of the reasons you can support all those SLAs and projects in one cluster. But in this case, we are using separate volumes to help get a solid assessment on the statistics of how the data is stored for each table in our comparison.

3. DATA COMPRESSION OPTIONS

For those of you who can remember Schoolhouse Rock: “Compression, Compression…what’s my compression?” There are several options for compression. For those of you who don’t want to worry about compression, you can just pick an option for the MapR Hadoop file system compression and not worry about it. For those of you who have a drive to tweak all things, then you can run comparisons on the file system, Hive, and mixes of both till smoke comes out your ears. In this post, we are sticking to one set of values for each table format. Different compression settings can affect data in projects differently but the combinations picked, while not the final say in all things data, will hopefully result in some interesting comparisons.

 **3.1      Hive Compression Options**  

Here’s a list of some of the Hive compression options looked at in our example:

 **3.2      MapR File System Options**  

The file system itself can also be set for specific compression formats. The tradeoff is always compression vs. speed. Below is a list of the file system compression options in MapR:

 **3.3      Our Comparison Matrix**  

4. SETTING UP HADOOP FILE SYSTEM LOCATIONS IN MAPR

The ability to set up logical file system volumes inside Hadoop is a powerful capability not found anywhere else in Hadoop world. It allows you to isolate the access and locality of data, which is handy when your Hadoop system graduates to production or past one use case. Here we are using it to help isolate the statistics of our Hive tables for the purposes of comparison, but if you’re interested in this capability, you can read more about it on our "Get Real with Hadoop: True Multi-tenancy" blog post.

This step is optional, but this capability opens up a world of possibilities when you gain such granular control of your data.

Let’s quickly step through setting up our volumes in our Hadoop cluster where I created our tables.

 **4.1      Using MapR Control System UI to Establish File System Volumes**  

The most common way to create volumes is using the MapR Control System (MCS). All aspects of block replication, snapshot scheduling, data mirroring, access, data locality and data quotas can be set through the MCS.

 **4.2      Using MapR REST API to Establish File System Volumes**  

Anything that is done through the MCS can be done through the REST API. The full documentation for this can be found on our MapR Volume Creation documentation.

To create the same volume pictured above, the following curl command to the REST API would get the job done:

curl -X GET -k -H "Authorization: Basic bWFwcjpyb290NG1hcHI=" -H 
"Cache-Control: no-cache"
'https://jbates1:8443/rest/volume/create?name=hive_txt&path=/data/hive/text&quota=500M&replication=3&schedule=2&topology=/data&type=0&advisoryquota=100M'

 **4.3      Using MapR CLI to Establish File System Volumes**  

The last method to create a volume is using the MapR CLI. This is the method I used for my volume creation process.

maprcli volume create -name hive_txt -path /data/hive/text -advisoryquota 100M -quota 500M -replication 3 -schedule 2 -topology "/data" -type 0

 **4.4      Volumes Used for This Comparison**  

Here are all the volumes created from the methods above:

 **4.5      Setting the File System Compression Options**  

As mentioned above, you can use either Hive or the file system to set the compression. File system compression settings are managed at the directory level. Since MapR is a Hadoop system, I can use a Hadoop command to set the compression settings for my directories.

Set compression with something like this: hadoop mfs -setcompression zlib /data/hive/orc1

Verifying the compression setting can be done with this command: hadoop mfs -ls /data/hive

That’s all there is to adjusting file system compression. All new data will be compressed with the provisioned setting. More details on compression can be found at MapR Compression Documentation.

5. CREATING THE TABLES

The text data, in CSV format, is loaded into the file system at /data/hive/text. We have external tables created in Hive, partitioned by year and month.

 **5.1      Original Text Tables**  

Here are the commands used to create our original Hive tables:

    **5.1.1      Bike stations table**  
CREATE EXTERNAL TABLE `STATIONS`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/text/bikestations';
    **5.1.2      Bike trips table**
CREATE EXTERNAL TABLE `TRIPS`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'maprfs:/mapr/my_cluster/data/hive/text/trips';

 

 **5.2      Hive RC Tables**  

The RC tables in Hive will have a significant performance increase over our original text files. Table creation is almost identical. In this case, the table location was specified, but it was not built as an external table. Dropping an external table will not drop the data, but with this one, dropping it discards the dataset.

    **5.2.2      trips_rc table**
CREATE TABLE `TRIPS_RC`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/rc/trips';

 **5.3      Hive ORC Tables**  

With the ORC tables here, we added the wrinkle of setting a table property in the table creation process that will set the compression settings for our table.

    **5.3.1      stations_orc1 table**
CREATE EXTERNAL TABLE `STATIONS_ORC1`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc1/bikestations'
TBLPROPERTIES ( "orc.compress"="NONE" );

    **5.3.2      trips_orc1 table**
CREATE EXTERNAL TABLE `TRIPS_ORC1`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc1/trips'
TBLPROPERTIES ( "orc.compress"="NONE" );

    **5.3.3      stations_orc2 table**
CREATE EXTERNAL TABLE `STATIONS_ORC2`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc2/bikestations'
TBLPROPERTIES ( "orc.compress"="LZ4" );

    **5.3.4      trips_orc2 table**
CREATE EXTERNAL TABLE `TRIPS_ORC2`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc2/trips'
TBLPROPERTIES ( "orc.compress"="LZ4" );

6. LOADING OUR DATA INTO THE TABLES

Now that our tables are all created, we can load the RC and ORC tables from the original text dataset. There are all kinds of examples on creating a new table from an old table, but those are for simple tables. When you add in some partitions, things get more complicated. All the tables we have created are partitioned. In general, partitions improve performance on larger datasets. When writing data into your table, the partitions must be named. Below are examples of copying data into the new tables.

 **6.1      Writing to the RC Tables**  

The RC tables in Hive will have a significant performance increase over our original tables. In order to have the correct compression, we need to set that before we load the data. The commands here will write the data into the rc tables with the correct compression values.

    **6.1.1      Loading stations_rc**
set mapred.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.default.rcfile.serde=org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
set zlib.compress.level=BEST_SPEED;
set hive.exec.compress.output=true;
set mapred.output.compress=true;
INSERT INTO TABLE bikes.stations_rc partition(year=2014,month="nov")
SELECT `id`, `installdate`, `name`, `longitude`, `latitude`
FROM bikes.stations WHERE year=2014 AND month="nov";

    **6.1.2      Loading trips_rc**
set mapred.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.default.rcfile.serde=org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
set zlib.compress.level=BEST_SPEED;
set hive.exec.compress.output=true;
set mapred.output.compress=true;
INSERT INTO TABLE bikes.trips_rc partition(year=2014,month="dec")
SELECT `bike_nr`, `duration`, `start_date`, `start_station`, `end_station`
FROM bikes.trips WHERE year=2014 AND month="dec";

 **6.2      Writing to the ORC Tables**  

The process of putting data into the ORC tables is a little simpler, since the compression is set in the table properties. Since we are using partitioned tables, that part remains unchanged.

    **6.2.1      Loading stations_orc1**
INSERT INTO TABLE bikes.stations_orc1 partition(year=2014,month="nov") 
SELECT `id`, `installdate`, `name`, `longitude`, `latitude`
FROM bikes.stations WHERE year=2014 AND month="nov";

    **6.2.2      Loading trips_orc1**
INSERT INTO TABLE bikes.trips_orc1 partition(year=2014,month="dec") 
SELECT `bike_nr`, `duration`, `start_date`, `start_station`, `end_station`
FROM bikes.trips WHERE year=2014 AND month="dec";

    **6.2.3      Loading stations_orc2**
INSERT INTO TABLE bikes.stations_orc2 partition(year=2014,month="nov") 
SELECT `id`, `installdate`, `name`, `longitude`, `latitude`
FROM bikes.stations WHERE year=2014 AND month="nov";

    **6.2.4      Loading trips_orc2**
INSERT INTO TABLE bikes.trips_orc2 partition(year=2014,month="dec") 
SELECT `bike_nr`, `duration`, `start_date`, `start_station`, `end_station`
FROM bikes.trips WHERE year=2014 AND month="dec";

 **6.3      Hive Table Partitions**  

The examples above use partitions. Partitions are transparent for the most part, except for two areas: specify the partition when you're writing data into your tables, and make sure the partition is registered in Hive when reading from the tables. Some quick scripting (for example, ALTER TABLE ... ADD PARTITION or MSCK REPAIR TABLE) can get you past this without much effort.

7. COMPARING DATA COMPRESSION

OK…the tables are made… compression values set… table format specified…the data is loaded…what does it look like on disk?

Now that we have data in our tables, we can look at the effect of the compression settings on disk. Since I’m using a MapR Hadoop cluster, I have a read/write POSIX file system with native NFS on every node. I can use standard Linux commands to see how large my data set is on disk.

I also put each table in its own logical file system volume, so I can pull stats for each one of those over REST API, CLI or from the MapR Control System UI.

 **7.1      Pulling Data Size from the File System**  

Nothing complicated here. I ran a du command on each of the table directories.

du -h /mapr/my_cluster/data/hive/text

From that I pulled the values below:

From the data above, it looks like the table using ORC SNAPPY compression is the winner, but that may not be the case. If you remember, earlier we set up different file system compression options as well. The command above will not reflect that.

 **7.2      Pulling Data Size from the MapR Volumes**  

It looks like the SNAPPY compression in the ORC table worked best, but let's look at the information from the MapR file system volumes. I used a simple CLI command to pull my values. I'm pulling the logical space used, which should be close to the values above, as well as the actual space used.

maprcli volume info -name hive_txt -columns logicalUsed,used

The results are…

It looks like the MapR file system zlib settings compressed well in the first ORC table group, with the snappy compression in the second ORC table group landing right next to it, but there is always the question of performance.

8. EXECUTING QUERY COMPARISONS

So far, we have created and loaded our Hive tables, and we have looked at the storage and compression options. Now we need to execute some queries and see what we see. This post is not the end all for everything Hive; the goal is just to get you started looking at your own data in your own cluster. With that in mind, I quickly picked a few queries that I thought were interesting, and used them to evaluate the settings I had configured in my tables.

 **8.1      The Queries**  

My query goals: I wanted some counting, some joins, some conversions, and some MapReduce jobs to run. With that in mind, this is what I ended up with:

    **8.1.1      Query 1: Joining My Trip and Station Info**  

This query joins my trip data with my station data, so I can get full information on where the trip started and where it ended.

SELECT a.start_date, a.bike_nr, a.duration, b.start_station_id, b.start_latitude, b.start_longitude, b.stat_station, c.end_station_id, c.end_latitude, c.end_longitude, c.end_station
FROM (SELECT duration, bike_nr, start_date, cast(split(trips_rc.start_station, '/')[4] as int) as start_station, cast(split(trips_rc.end_station, '/')[4] as int) as end_station FROM trips_rc) a
JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations_rc) b ON a.start_station = b.start_station_id
JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations_rc) c ON a.end_station = c.end_station_id;

    **8.1.2      Query 2: Bike Utilization and Movement**  

This query just added up the time the bikes were in service, and added in the joins to get more info on the starting and ending stations.

SELECT a.bike_nr, sum(a.duration)
FROM (SELECT duration, gender, subscription_type, status, bike_nr, start_date, cast(split(trips_rc.start_station, '/')[4] as int) as start_station, cast(split(trips_rc.end_station, '/')[4] as int) as end_station FROM trips_rc) a
JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations_rc) b ON a.start_station = b.start_station_id
JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations_rc) c ON a.end_station = c.end_station_id
WHERE a.duration > 0
GROUP BY a.bike_nr;

    **8.1.3      Query 3: Station A**  

This query takes a look at the amount of times a bike leaves a station compared to the amount of times one is returned to the station.

SELECT s.station_id as station_id, e.end_count as end_count, s.start_count as start_count FROM
(SELECT a.end_station as station_id, count(a.end_station) as end_count FROM (SELECT cast(split(trips_rc.end_station, '/')[4] as int) as end_station FROM trips_rc) a group by a.end_station) e
FULL JOIN
(SELECT b.start_station as station_id, count(b.start_station) as start_count FROM (SELECT cast(split(trips_rc.start_station, '/')[4] as int) as start_station FROM trips_rc ) b group by b.start_station) s
WHERE e.station_id = s.station_id;

 **8.2      Comparison Scripts**  

Now that I have my queries, it's time to run them in a script. I wanted to run each query against each table multiple times and record the results. Using some of the methods from my earlier posts, I put together the script below to execute each query a number of times against each table and log the results.

<font color="green">#!/bin/bash
# run_compare.sh

# this will print the usage statements and exit</font>
usage() {
    <font color="pink">case</font> $<font color="blue">1</font> <font color="pink">in</font>
        <font color="red">""</font>)
            echo <font color="red">""</font>
            echo <font color="red">"Usage: run_compare.sh [-l /my/log/file.txt] [-c run_count] [-h|--help]"</font>
            echo <font color="red">""</font>
            echo <font color="red">"  This is a quick example of comparing some hive queries to different tables  with bash"</font>
            echo <font color="red">"     The queries and tables are hard coded in the script"</font>
            echo <font color="red">""</font>
            echo <font color="red">"Params:"</font>
            echo <font color="red">"      -c|--count run_count: default is 10"</font>
            echo <font color="red">"      -h|--help: print this help info and exit"</font>
            echo <font color="red">"      -l|--logfile: default is run_compare.csv in execution dir"</font>
            echo <font color="red">"Examples:"</font>
            echo <font color="red">""</font>
            echo <font color="red">"        ./run_compare.sh -c 100 -l myfile.csv"</font>
            echo <font color="red">""</font>
            ;;

    <font color="pink">esac
    exit</font>
}

<font color="green"># this will process command line arguments enough
# to get you to a specific function</font>
args() {
    run_compare $@
}

run_compare() {
    <font color="green"># init params</font>
    database=<font color="red">"bikes"</font>
    table_entensions=<font color="red">"null rc orc1 orc2"</font>
    count=<font color="blue">10</font>
    logfile=<font color="red">"run_compare.csv"</font>
    row_count=<font color="red">""</font>
    start_time=<font color="red">""</font>
    end_time=<font color="red">""</font>
    my_value=<font color="red">""</font>
    my_query=<font color="red">""</font>
    name=<font color="red">""</font>

    <font color="green"># process args for this block</font>
    <font color="pink">while</font> test $# -gt <font color="blue">0</font>
    <font color="pink">do
        case</font> $<font color="blue">1</font> <font color="pink">in</font>
            -l|--logfile)
                <font color="pink">shift</font>
                logfile=$<font color="blue">1</font>
                ;;
            -c|--count)
                <font color="pink">shift</font>
                count=$<font color="blue">1</font>
                ;;
            -h|--help)
                usage pull_with_one_liner
                ;;
            *)
                echo >&<font color="blue">2</font> <font color="red">"Invalid argument: $1"</font>
                usage <font color="red">""</font>
                ;;
        <font color="pink">esac
        shift
    done</font>

    <font color="green"># clean out data from existing log file</font>
    cat /dev/null > $logfile

    <font color="green"># execute comparison for specified count</font>
    c=<font color="blue">0</font>
    <font color="pink">while</font> test $c -lt $count
    <font color="pink">do</font>
        let c++
        echo <font color="red">"running round $c"</font>
        <font color="pink">for</font> ext <font color="pink">in</font> $table_entensions; <font color="pink">do</font>
            <font color="pink">if</font> [ <font color="red">"$ext"</font> = <font color="red">"null"</font> ]; <font color="pink">then</font>
                ext=<font color="red">""</font>
                name=<font color="red">"text"</font>
            <font color="pink">else</font>
                name=$ext
                ext=<font color="red">"_$ext"</font>
            <font color="pink">fi</font>

            echo <font color="red">"round $c: table group $name"</font>

            <font color="green"># execute Query1</font>
            my_query_name=<font color="red">"Query1"</font>
            my_query=<font color="red">"set mapred.reduce.tasks=30;"</font>
            my_query=<font color="red">"$my_query use $database;"</font>
            my_query=<font color="red">"$my_query SELECT a.start_date, a.bike_nr, a.duration, b.start_station_id, b.start_latitude, b.start_longitude, b.stat_station, c.end_station_id, c.end_latitude, c.end_longitude, c.end_station"</font>
            my_query=<font color="red">"$my_query FROM (SELECT duration, bike_nr, start_date, cast(split(trips$ext.start_station, '/')[4] as int) as start_station, cast(split(trips$ext.end_station, '/')[4] as int) as end_station FROM trips$ext) a"</font>
            my_query=<font color="red">"$my_query JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations$ext) b ON a.start_station = b.start_station_id"</font>
            my_query=<font color="red">"$my_query JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations$ext) c ON a.end_station = c.end_station_id"</font>

            start_time=<font color="red">`date "+%s"`</font>
            my_value=$(hive -S -e <font color="red">"$my_query"</font>)
            end_time=<font color="red">`date "+%s"`</font>
            r_count=<font color="red">`echo "$my_value"| wc -l`</font>
            log_it $logfile $name $start_time $end_time $my_query_name <font color="red">"$r_count" "$my_query"</font>

            <font color="green"># execute Query 2</font>
            my_query_name=<font color="red">"Query2"</font>
            my_query=<font color="red">"set mapred.reduce.tasks=30;"</font>
            my_query=<font color="red">"$my_query use $database;"</font>
            my_query=<font color="red">"$my_query SELECT a.bike_nr, sum(a.duration)"</font>
            my_query=<font color="red">"$my_query FROM (SELECT duration, gender, subscription_type, status, bike_nr, start_date, cast(split(trips$ext.start_station, '/')[4] as int) as start_station, cast(split(trips$ext.end_station, '/')[4] as int) as end_station FROM trips$ext) a"</font>
            my_query=<font color="red">"$my_query JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations$ext) b ON a.start_station = b.start_station_id"</font>
            my_query=<font color="red">"$my_query JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations$ext) c ON a.end_station = c.end_station_id"</font>
            my_query=<font color="red">"$my_query WHERE a.duration > 0"</font>
            my_query=<font color="red">"$my_query GROUP BY a.bike_nr"</font>

            start_time=<font color="red">`date "+%s"`</font>
            my_value=$(hive -S -e <font color="red">"$my_query"</font>)
            <font color="green">#my_value="MyValue"</font>
            end_time=<font color="red">`date "+%s"`</font>
            r_count=<font color="red">`echo "$my_value"| wc -l`</font>
            log_it $logfile $name $start_time $end_time $my_query_name <font color="red">"$r_count" "$my_query"</font>

            <font color="green"># execute Query 3</font>
            my_query_name=<font color="red">"Query3"</font>
            my_query=<font color="red">"set mapred.reduce.tasks=30;"</font>
            my_query=<font color="red">"$my_query use $database;"</font>
            my_query=<font color="red">"$my_query SELECT s.station_id as station_id, e.end_count as end_count, s.start_count as start_count FROM"</font>
            my_query=<font color="red">"$my_query (SELECT a.end_station as station_id, count(a.end_station) as end_count FROM (SELECT cast(split(trips$ext.end_station, '/')[4] as int) as end_station FROM trips$ext) a group by a.end_station) e"</font>
            my_query=<font color="red">"$my_query FULL JOIN"</font>
            my_query=<font color="red">"$my_query (SELECT b.start_station as station_id, count(b.start_station) as start_count FROM (SELECT cast(split(trips$ext.start_station, '/')[4] as int) as start_station FROM trips$ext ) b group by b.start_station) s"</font>
            my_query=<font color="red">"$my_query WHERE e.station_id = s.station_id"</font>

            start_time=<font color="red">`date "+%s"`</font>
            my_value=$(hive -S -e <font color="red">"$my_query"</font>)
            end_time=<font color="red">`date "+%s"`</font>
            r_count=<font color="red">`echo "$my_value"| wc -l`</font>
            log_it $logfile $name $start_time $end_time $my_query_name <font color="red">"$r_count" "$my_query"</font>

        <font color="pink">done
    done
    exit</font>
}

<font color="green"># pass in logfile, start, end, query_name, result_count, query
# count result set, and log the data to csv file</font>
log_it() {
    log_file=$<font color="blue">1</font>
    n=$<font color="blue">2</font>
    start_t=$<font color="blue">3</font>
    end_t=$<font color="blue">4</font>
    q_name=$<font color="blue">5</font>
    result_count=$<font color="blue">6</font>
    q=$<font color="blue">7</font>

    let duration=$end_t-$start_t
    time_run=<font color="red">`date -d @$start_t`</font>
    echo <font color="red">"$n,$q_name,\"$time_run\",$duration,$result_count,\"$q\""</font> >> $log_file
}

<font color="green"># -------------------------------------------------------------------------------------
# Beginning of script execution
#</font>

args $@

9. WHAT PERFORMANCE DID WE GET?

My goal here is to help anyone new to the game of SQL on Hadoop step through what can be done with their data. I even tweaked some of the queries and ran them against Impala and Apache Drill, since they were all running in my cluster. I ran iterations in YARN as well as MRv1. Below are the average performance times for the small setup I ran. Hopefully this will help kick-start your thoughts on running this with your own data.

After running 20 iterations on this data set, the results are…

In the compression results, you can see that the ORC format saved space just by the nature of its architecture. Adding in MapR file compression took it the rest of the way, but the snappy compression that ORC uses also works well. When you look at the RC tables compressed with gzip, you can see a performance increase and significant space savings. For this data set, the combination that performed the best was the ORC table with compression handled by the MapR file system, but the difference was close enough that you could go with either option.

When comparing the response times from Hive to Impala or Apache Drill, you can start to see where things are going with SQL on Hadoop. Even the query that took the longest in Hive showed significant improvements in response time when moved to services that do in-memory querying without MapReduce. Still, these gains are not free, and for queries where you don’t mind the wait, Hive works well.

10. CLOSING THOUGHTS

The journey into Hive and Hadoop has been an eye opener for me, and has helped me re-think how I can deal with data. Using MapR has helped make the process enjoyable. From a data science perspective, I like having the options of running the Hadoop tools and even pulling in legacy scripts that have no knowledge of Hadoop. From a data engineering perspective, I like the read/write file system with standard Linux capabilities. Just being able to edit the data where it sits has saved me a large amount of time. From an IT perspective, I like the ability to run all my Hadoop projects in one cluster. As things grow, the level of effort needed to monitor and maintain the gear stays fairly constant.

As SQL on Hadoop and Hive continues to evolve, it is beneficial to re-look at what you are doing or what you may have done in the past. Being able to look at all the capabilities of Hive as well as all the other SQL possibilities on Hadoop in one cluster just makes life so much simpler as you go from project to production. The point here is not to go with the results that I found with this data set, but to jump into your own data and see where the tables take you.

Good luck,
Jimmy Bates

 

Editor's note: Article originally posted in the Converge Blog on December 22, 2014

 

Related

Quick Tips For Using The Hive Shell Inside Scripts

Apache Hive

by Sandra Wagner

 

Pentaho Data Integration (PDI) provides the ETL capabilities that facilitate the process of capturing, cleansing, and storing data in a uniform and consistent format, making it accessible and relevant to end users and IoT technologies.

Apache Drill is a schema-free SQL-on-Hadoop engine that lets you run SQL queries against data sets in various formats, such as JSON, CSV, Parquet, and HBase. By integrating it with PDI, you have the flexibility to do serious data integration work through Pentaho’s powerful PDI product. The Drill Tutorials pages in MapR’s documentation can help you get familiar with Apache Drill.

STEP 1

You’ll need administrator permissions in order to do these steps. Make sure that you meet the following software requirements:

  • MapR Converged Data Platform (version 4.x or 5.x)
  • Apache Drill (version 1.6 or later) along with the latest Drill JDBC driver
  • Apache ZooKeeper (running in replicated mode)
  • Pentaho Data Integration (version 5.4 or later)

You should also make sure that the PDI client system can resolve the hostnames on your Drill cluster before you get started.

STEP 2

The first thing you’ll have to do is get the Drill cluster ID and construct a custom URL string. This will be something that we’ll be using a bit later to make the JDBC connection through PDI.

  1. Getting the Drill cluster ID isn’t too bad: all you have to do is go to the query page in the Drill interface and run an SQL query like this:
select string_val from sys.boot where name ='drill.exec.cluster-id';
  2. Apache Drill will return your cluster ID. Once you have that, you can make a custom URL that will end up looking something like this:
jdbc:drill:zk=mapr1:5181,mapr2:5181,mapr3:5181/drill/**WhateverYourClusterIDIs**

STEP 3

Once you have your custom URL string, follow these steps to make the connection to PDI:

  1. Open PDI, start a new Transformation, then click on the View tab in the far left.
  2. Expand Transformation 1, then right-click on Database connections and select New.

  3. Do the following in the Database Connection window:
    1. Name the connection. We are using Drill as the Connection Name in our example below.
    2. Select Generic Database for your database type and Native JDBC for your access type.
    3. Under Settings on the right, copy and paste your Custom Connection URL.
    4. Enter the Custom Driver Class Name.
    5. Leave the username and password fields empty for now.

  4. Click Test to verify the connection. You should see a connection success window pop up.

TROUBLESHOOTING TIPS

On the off-chance that your connection test doesn’t work, try verifying that your Custom URL string is correct, and make sure your hosts file for the PDI client can resolve the private hostnames of the cluster.

SUMMARY

By the time you get to the end of this process, you should have successfully connected your Pentaho Data Integration client to your MapR cluster using Apache Drill. Have fun with your data!

 

Editor's note: Blog post originally shared in Converge Blog on Jan 04, 2017

RELATED

Apache Drill

BY Nicolas A Perez

This blog post is the result of my efforts to show to a coworker how to get the insights he needed by using the streaming capabilities and concise API of Apache Spark. In this blog post, you'll learn how to do some simple, yet very interesting analytics that will help you solve real problems by analyzing specific areas of a social network.

Using a subset of a Twitter stream was the perfect choice to use in this demonstration, since it had everything we needed: an endless and continuous data source that was ready to be explored.

SPARK STREAMING, MINIMIZED

Spark Streaming is very well explained here and in chapter 6 of the ebook "Getting Started with Apache Spark," so we are going to skip some of the details about the Streaming API and move on to setting up our app.

SETTING UP OUR APP

Let’s see how to prepare our app before doing anything else.

val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")

val ssc = new StreamingContext(sc, Seconds(5))

System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

val stream = TwitterUtils.createStream(ssc, None)

Here we have created the Spark Context sc, and set the log level to WARN to eliminate the noisy log Spark generates. We also created a Streaming Context ssc using sc. Then we set up our Twitter credentials (before doing this we needed to follow these steps) that we got from the Twitter website. Now the real fun starts.

WHAT IS TRENDING RIGHT NOW ON TWITTER?

It is easy to find out what is trending on Twitter at any given moment; it is just a matter of counting the appearances of each tag on the stream. Let’s see how Spark allows us to do this operation.

 

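A minimal sketch of that counting step, assuming the twitter4j Status objects delivered by TwitterUtils.createStream and an output path chosen only for illustration, might look like the following:

// pull the hashtag text out of each incoming status (assumes the twitter4j Status API)
val tags = stream.flatMap(status => status.getHashtagEntities.map(_.getText.toLowerCase))

// count each tag within the batch, sort by the count, and persist the result so an
// external tool (Splunk, for instance) can pick it up; the path is illustrative only
tags.countByValue()
  .foreachRDD { rdd =>
    rdd.sortBy(_._2, ascending = false)
      .saveAsTextFile(s"/tmp/twitter-trending-tags/${System.currentTimeMillis}")
  }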

First, we get the tags from the Tweets, count how many times each tag appears, and sort them by the count. After that, we persist the result in order to point Splunk (or any other tool for that matter) to it. We could build some interesting dashboards using this information in order to track the most trending hashtags. Based on this information, my coworker could create campaigns and use these popular tags to attract a bigger audience.

ANALYZING TWEETS

Now we want to add functionality to get an overall opinion of what people think about a set of topics. For the sake of this example, let’s say that we want to know the sentiment of Tweets about Big Data and Food, two very unrelated topics.

There are several APIs for analyzing sentiments from Tweets, but we are going to use an interesting library from The Stanford Natural Language Processing Group in order to extract the corresponding sentiments.

In our build.sbt file we need to add the corresponding dependencies.

libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"classifier "models"

 

Now, we need to select only those Tweets we really care about by filtering the stream using certain hashtag (#). This filtering is quite easy, thanks to a unified Spark API.

Let’s see how.

val tweets = stream.filter { t =>
  val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
  tags.contains("#bigdata") && tags.contains("#food")
}

 

Here, we get all the tags in each Tweet, checking that it has been tagged with #bigdata and #food.

Once we have our Tweets, extracting the corresponding sentiment is quite easy. Let’s define a function that extracts the sentiment from the Tweet’s content so we can plug it in in our pipeline.

 

def detectSentiment(message: String): SENTIMENT_TYPE

We are going to use this function, assuming it does what it should, and we will put its implementation at the end, since it's not the focus of this post. In order to get an idea of how it works, let's build some tests around it.

it("should detect not understood sentiment") { detectSentiment("")should equal (NOT_UNDERSTOOD)  } it("should detect a negative sentiment") { detectSentiment("I am feeling very sad and frustrated.")should equal (NEGATIVE)  } it("should detect a neutral sentiment") { detectSentiment("I'm watching a movie")should equal (NEUTRAL)  } it("should detect a positive sentiment") { detectSentiment("It was a nice experience.")should equal (POSITIVE)  } it("should detect a very positive sentiment") { detectSentiment("It was a very nice experience.")should equal (VERY_POSITIVE) }

 

These tests should be enough to show how detectSentiment works.

Let’s see an example.

val data = tweets.map { status =>
  val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
  val tags = status.getHashtagEntities.map(_.getText.toLowerCase)

  (status.getText, sentiment.toString, tags)
}

 

data represents a DStream of Tweets we want, the associated sentiment, and the hashtags within the Tweet (here we should find the tags we used to filter).

SQL INTEROPERABILITY

Now we want to cross reference the sentiment data with an external dataset that we can query using SQL. For my coworker, it makes a lot of sense to be able to join the Twitter stream with his other dataset.

Let’s take a look at how we could achieve this.

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

data.foreachRDD { rdd =>
  rdd.toDF().registerTempTable("sentiments")
}

We have transformed our stream into a different representation (a DataFrame), which is backed by all the Spark concepts (resilient, distributed, very fast), and exposed it as a table so my coworker can use his beloved SQL to query different sources.

The table sentiments (that we defined from our DataFrame) can be queried like any other table in his system. Another possibility is to query other data sources (Cassandra, XML, or our own binary formatted files) using Spark SQL and cross them with the stream.

You can find out more information about this topic here and here.

An example of querying a DataFrame is shown next.

sqlContext.sql("select * from sentiments").show()

 

WINDOWED OPERATIONS

Spark Streaming has the ability to look back in the stream, a functionality most streaming engines lack (if they do have this functionality, it's very hard to implement).

In order to implement a windowed operation, you'll need to checkpoint the stream, but this is an easy task. You'll find more information about this here.

Here's a small example of this kind of operation:

tags
  .window(Minutes(1))
  . (...)
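A fuller sketch, assuming the tags DStream defined earlier and a checkpoint directory chosen only for illustration, could look like this:

// windowed, stateful operations need a checkpoint directory; this path is only an example
ssc.checkpoint("/tmp/twitter-checkpoint")

// count hashtag occurrences over a sliding one-minute window,
// re-evaluated every batch interval (five seconds in our setup)
val windowedTagCounts = tags.countByValueAndWindow(Minutes(1), Seconds(5))

windowedTagCounts.print()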

 

CONCLUSION

Even though our examples are quite simple, we were able to solve a real life problem using Spark. We now have the ability to identify trending topics on Twitter, which helps us both target and increase our audience. At the same time, we are able to access different data sets using a single set of tools such as SQL.

Very interesting results came back from #bigdata and #food at the same time. Perhaps people Tweet about big data at lunch time—who knows?

Editor's note: Content originally posted at Converge Blog and Medium

RELATED

Apache Spark

Twitter Analytics with Apache Drill and MicroStrategy

[Book Discussion] - Getting Started with Apache Spark

Real-Time Streaming Data Pipelines With Apache Apis: Kafka, Spark Streaming, And Hbase

MapR-Streams

streaming

     by Nicolas A Perez

 

An important part of any application is the underlying log system we incorporate into it. Logs are not only for debugging and traceability, but also for business intelligence. Building a robust logging system within our apps can give us great insight into the business problems we are solving.

Log4j in Apache Spark

Spark uses log4j as the standard library for its own logging. Everything that happens inside Spark gets logged to the shell console and to the configured underlying storage. Spark also provides a template for app writers, so we can use the same log4j libraries to add whatever messages we want to Spark's existing, in-place logging implementation.

Configuring Log4j

Under the SPARK_HOME/conf folder, there is a log4j.properties.template file which serves as a starting point for our own logging system.

Based on this file, we created the log4j.properties file and put it under the same directory.

log4j.properties looks as follows:

log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.File=/var/log/spark.log
log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n

log4j.appender.RollingAppenderU=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppenderU.File=/var/log/sparkU.log
log4j.appender.RollingAppenderU.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppenderU.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppenderU.layout.ConversionPattern=[%p] %d %c %M - %m%n


# By default, everything goes to console and file
log4j.rootLogger=INFO, RollingAppender, myConsoleAppender

# My custom logging goes to another file
log4j.logger.myLogger=INFO, RollingAppenderU

# The noisier spark logs go to file only
log4j.logger.spark.storage=INFO, RollingAppender
log4j.additivity.spark.storage=false
log4j.logger.spark.scheduler=INFO, RollingAppender
log4j.additivity.spark.scheduler=false
log4j.logger.spark.CacheTracker=INFO, RollingAppender
log4j.additivity.spark.CacheTracker=false
log4j.logger.spark.CacheTrackerActor=INFO, RollingAppender
log4j.additivity.spark.CacheTrackerActor=false
log4j.logger.spark.MapOutputTrackerActor=INFO, RollingAppender
log4j.additivity.spark.MapOutputTrackerActor=false
log4j.logger.spark.MapOutputTracker=INFO, RollingAppender
log4j.additivity.spark.MapOutputTracker=false

Basically, we want to hide all the logs Spark generates, so we don’t have to deal with them in the shell. We redirect them to be logged in the file system. On the other hand, we want our own logs to be logged in the shell and in a separate file so they don’t get mixed up with the ones from Spark. From here, we will point Splunk to the files where our own logs are, which in this particular case is /var/log/sparkU.log.

This (log4j.properties) file is picked up by Spark when the application starts, so we don’t have to do anything aside from placing it in the location mentioned.

Writing Our Own Logs

Now that we have configured the components that Spark requires in order to manage our logs, we just need to start writing logs within our apps.

In order to show how this is done, let’s write a small app that helps us in the demonstration.

Our App:

object app {
  def main(args: Array[String]) {

    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("demo-app")
    val sc = new SparkContext(conf)

    log.warn("Hello demo")

    val data = sc.parallelize(1 to 100000)

    log.warn("I am done")
  }
}

Running this Spark app will demonstrate that our log system works. We will be able to see the Hello demo and I am done messages logged in the shell and in the file system, while the Spark logs will only go to the file system.

So far, everything seems easy, yet there is a problem we haven’t mentioned.

The class org.apache.log4j.Logger is not serializable, which implies we cannot use it inside a closure while doing operations on some parts of the Spark API.

For example, if we do in our app:

val log = LogManager.getRootLogger
val data = sc.parallelize(1 to 100000)

data.map { value =>
    log.info(value)
    value.toString
}

this will fail when running on Spark. Spark complains that the log object is not serializable, so it cannot be sent over the network to the Spark workers.

This problem is actually easy to solve. Let’s create a class that does something to our data set while doing a lot of logging.

 

class Mapper(n: Int) extends Serializable{
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

  def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
    rdd.map{ i =>
      log.warn("mapping: " + i)
      (i + n).toString
    }
}

Mapper receives an RDD[Int] and returns an RDD[String], and it also logs each value being mapped. Note how the log object has been marked @transient, which allows the serialization system to ignore it. Mapper is serialized and sent to each worker, but the log object is resolved only when it is needed on the worker, solving our problem.

Another solution is to wrap the log object in an object construct and use it everywhere. We would rather keep log within the class that uses it, but the alternative is also valid.
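A rough sketch of that wrapper approach, with the names LogHolder, LogHolderDemo, and mapAndLog chosen purely for illustration, might look like this:

import org.apache.log4j.LogManager
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// a singleton holder: the logger is resolved lazily on whichever JVM touches it
// (driver or worker), so nothing non-serializable is captured by the closure
object LogHolder {
  @transient lazy val log = LogManager.getLogger("myLogger")
}

object LogHolderDemo {
  // the same mapping as before, but logging through the shared holder
  def mapAndLog(sc: SparkContext): RDD[String] = {
    val data = sc.parallelize(1 to 100000)
    data.map { value =>
      LogHolder.log.warn("mapping: " + value)
      value.toString
    }
  }
}

Because the object is initialized independently on each JVM, no logger instance ever has to be shipped with the task.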

At this point, our entire app looks as follows:

import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
import org.apache.spark._
import org.apache.spark.rdd.RDD

class Mapper(n: Int) extends Serializable{
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

  def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
    rdd.map{ i =>
      log.warn("mapping: " + i)
      (i + n).toString
    }
}

object Mapper {
  def apply(n: Int): Mapper = new Mapper(n)
}

object app {
  def main(args: Array[String]) {
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("demo-app")
    val sc = new SparkContext(conf)

    log.warn("Hello demo")

    val data = sc.parallelize(1 to 100000)

    val mapper = Mapper(1)

    val other = mapper.doSomeMappingOnDataSetAndLogIt(data)

    other.collect()

    log.warn("I am done")
  }
}

Conclusions

Our logs are now being shown in the shell and also stored in their own files. Spark logs are being hidden from the shell and being logged into their own file. We also solved the serialization problem that appears when trying to log in different workers.

We can now build more robust BI systems based on our own Spark logs, as we do with the other non-distributed systems and applications we have today. Business Intelligence is a very big deal for us, and having the right insights is always valuable.


Editor's note: This post was originally published on hackernoon.com

 

 

RELATED

Apache Spark

Live Demo: Apache Spark on MapR with MLlib

Real-Time User Profiles with Spark, Drill and MapR-DB

by Vince Gonzalez


MapR-FS provides some very useful capabilities for data management and access control. These features can and should be applied to user home directories.

A user in a MapR cluster has a lot of capability at their fingertips. They can create files, two styles of NoSQL tables, and pub/sub messaging streams with many thousands of topics. They can also run MapReduce or Spark jobs, or run just about any program against the file system using the POSIX capability of the cluster. With all the stuff they can do, there's bound to be a lot of data getting stored, and it's a good idea to keep tabs on that so it doesn't get out of control.

Let's look at how we can apply MapR's data management features to user home directories hosted in a MapR cluster.

FOLLOWING ALONG


If you want to follow along, all you need is a MapR cluster or single-node sandbox running MapR 5.1.0 or later.

All of what I'll demonstrate here can be done on any license level of MapR, from Community Edition to Enterprise.

WHAT WE'LL DO


We'll use MapR-FS volumes for our user home directories. Volumes are a unit of data management; for user home directories, they let you enforce hard and advisory quotas, restrict access with volume ACEs, and account for usage per user through accountable entities.

LET'S DO IT


Jumping right in, we'll run a few maprcli commands. I'll explain these in a minute.

 

# The "type" argument 0 means "user"
maprcli entity modify \
  -name vince \
  -type 0 \
  -email vgonzalez@maprtech.com \
  -quota 2T \
  -advisoryquota 1T

maprcli volume create \
  -path /user/vince \
  -name home.vince \
  -quota 300M \
  -advisoryquota 200M \
  -ae vince \
  -readAce u:vince \
  -writeAce u:vince

hadoop fs -chown vince:vince /user/vince

A few things happened here.

First, we set a quota for the "accountable entity" named vince. The accountable entity quota gives us a way to constrain the space used by a user across volumes, and also a way to account for the number of volumes and the total space consumed across them.

Consider a scenario in which a user has multiple volumes, such as a basic home directory and a workspace for an application that he's developing. The accountable entity gives the cluster admins a convenient way to sum up all the usage of the volumes provisioned to that entity.

Next I create a volume for user vince. The volume has a quota of 300MB, which means the volume will stop accepting writes once it has 300MB of data. This is a hard quota.

The volume also has an advisory quota of 200MB. More about quotas in a bit.

By convention, we mount the volume at the path /user/<username> and name it home.<username>. This makes it easy to filter when dealing with a large number of volumes.

The readAce and writeAce options create access control expressions (ACEs) on the volume.

Volume ACEs are very useful as a way to limit access to data in the volume. Regardless of what permissions a user sets on files within the volume, users who do not match the ACE are denied access. Since only a user with administrative privileges can modify volumes, this is a good way to prevent inadvertent data sharing.

We used the -ae option to set the "accountable entity" so that this volume is counted toward user vince's entity quota.

Finally, we set the owner of the mount point of the newly created volume. This illustrates a subtle point. Volume ACEs don't have anything to do with the POSIX permissions of the data in the volume; they only govern access to the volume and the data contained in it. So we need to make sure that ownership information is set correctly on the top level directory so that the users can actually use the volume.

MORE ABOUT QUOTAS


We set quotas on the volume, and these will trigger an alarm if exceeded. For instance, if I write more data than is allowed by my hard quota, I'll see an alarm like the following:

$ maprcli alarm list -entity home.vince -json
{
    "timestamp":1465937565435,
    "timeofday":"2016-06-14 01:52:45.435 GMT-0700",
    "status":"OK",
    "total":2,
    "data":[
        {
            "entity":"home.vince",
            "alarm name":"VOLUME_ALARM_QUOTA_EXCEEDED",
            "alarm state":1,
            "alarm statechange time":1465937455432,
            "description":"Volume usage exceeded quota. Used: 371 MB Quota : 300 MB"
        },
        {
            "entity":"home.vince",
            "alarm name":"VOLUME_ALARM_ADVISORY_QUOTA_EXCEEDED",
            "alarm state":1,
            "alarm statechange time":1465937449424,
            "description":"Volume usage exceeded advisory quota. Used: 202 MB Advisory Quota : 200 MB"
        }
    ]
}

These alarms will also be surfaced in the MCS, both in the main alarms panel of the dashboard page and in the volume list, where the actual usage will be highlighted in bold red text.

GETTING USAGE FOR YOUR ENTITY (USER)


Having applied quotas to the user home volume and to the accountable entity, we can get the usage information.

First, let's get the entity info for vince:

 

maprcli entity info -name vince -json
{
    "timestamp":1465938582023,
    "timeofday":"2016-06-14 02:09:42.023 GMT-0700",
    "status":"OK",
    "total":1,
    "data":[
        {
            "EntityType":0,
            "EntityName":"vince",
            "VolumeCount":2,
            "EntityQuota":2097152,
            "EntityAdvisoryquota":1048576,
            "DiskUsage":444,
            "EntityEmail":"vgonzalez@maprtech.com",
            "EntityId":2005
        }
    ]
}

We can see here that user vince has two volumes ("VolumeCount":2) and these volumes are consuming 444MB of storage. In the example above, we can see that this exceeds the volume quota, but does not exceed the entity quota, which is much higher. This gives you a lot of flexibility to manage space usage.

WAIT, WHAT?


Did you notice that the disk usage exceeded the quota by a large amount? How did that happen if I had a hard quota of 300MB set?

It's because for the purposes of the example, I wrote some data, then set the quota to a value much lower to trigger the alarm, so I could take the screenshot. This illustrates that the quotas we apply to a volume can be adjusted over time to allow more space usage, or less.

Nice catch, by the way!

A NOTE ON VOLUME ACES


Since volumes can be created fairly freely (MapR-FS supports many thousands of them in a single cluster), it's a great idea to use volumes liberally to organize and account for your data.

If you're organizing your data into volumes you can use ACEs to govern access to the data in the volumes. So if we're organizing things well, we can probably use volume ACEs as the primary access control mechanism for a dataset, which will allow us to use file and directory level ACEs only when absolutely necessary.

While file and directory ACEs are a great tool, you should consider using volume ACEs first, then only applying directory and file ACEs as needed.

SETTING DEFAULT QUOTAS

You can also set default quotas. On the command line, you can issue the following command to set a 1TB user quota and a 10TB group quota:

    maprcli config save -values '{"mapr.quota.user.default":"1T","mapr.quota.group.default":"10T"}'

Now, when you create a volume for a user and specify an accountable entity, they'll automatically be subject to the default entity quota, unless you change it.

As an example, if we create user fred's home and immediately show the entity information, we see that Fred has an entity quota of 1048576 MB, or 1TB.

maprcli volume create -path /user/fred -name home.fred -ae fred

maprcli entity info -name fred
EntityType  EntityId  EntityName  EntityAdvisoryquota  DiskUsage  VolumeCount  EntityQuota
0           2007      fred        0                    0          1            1048576

CONCLUSION


So a few points in summary.

  1. You should be using volumes to organize your data. Creating them for each user, unless you have many tens of thousands of users, is fully within your cluster's capability.
  2. Apply quotas (advisory and/or hard) to your entities, like users. This will fire alarms when the quotas are exceeded, helping you avoid capacity problems due to runaway jobs or "overenthusiastic" users.
  3. Apply ACEs judiciously, starting from the volume level. If data is being organized by volume, you can control access to data through volume-level ACEs. You can then apply file/directory level ACEs sparingly as required.
  4. Finally, it's a good idea to automate this sort of thing. Since it's only a few steps, a simple script should suffice.

I hope this helps you manage space in your cluster more effectively!

Editor's note: Content Originally posted in MapR Converge Blog post on June 27, 2016

 

RELATED

MapR-FS

MapR Security

MapR

Perform the following initial configuration steps for Drill deployments on MapR to optimize the initial installation. This configuration can be adjusted and modified over time as needed, but these steps are meant to provide a good starting point.

 

Note: This list is not for Drill on YARN.

 

 

1 - Drill Query Profile and Log Locations

1.1 Drill Query Profile Files

The MapR Converged Data Platform (MCDP) provides a reliable and scalable POSIX-compliant distributed file system (MapR-FS) that can handle large volumes of files efficiently, making it a great choice to store the Drill query profiles. By placing the profile files on MapR-FS, the profiles will be available to all Drill nodes in the cluster and thus viewable in the WebGUI from any Drill node. In addition, the profiles will be protected from a node failure.

 

As an additional bonus, the JSON profile files can then be queried by Drill, which is a good way to view top users, top queries, what types of queries are being executed, and which users were executing them. This makes the profiles useful for system administration and auditing purposes.

 

We recommend that the query profiles be stored in the /user/mapr/drill directory on MapR-FS. To do so, follow these steps:

  • Edit the drill-override.conf file in the /opt/mapr/drill/<drill-version>/conf directory on all Drill nodes. Note: you can edit the file on one node and use a cluster tool like clush to copy it to all the other nodes.
  • Add the following line to the configuration file:
    • sys.store.provider.zk.blobroot: "maprfs:///user/mapr/drill"
  • Restart all Drill nodes (Drillbits) in the cluster.

 

Below is an example configuration file:

 

drill.exec: {
 cluster-id: "drilldev-drillbits",
 zk.connect: "drilldev:5181",
 sys.store.provider.zk.blobroot: "maprfs:///user/mapr/drill",
 impersonation: {
    enabled: true,
    max_chained_user_hops: 3
 },
  security.user.auth {
        enabled: true,
        packages += "org.apache.drill.exec.rpc.user.security",
        impl: "pam",
        pam_profiles: [ "login" ]
  }
}

1.2 Drill Log Files

Similar to the profile files, the log files can also be stored on MapR-FS for resiliency and convenience, since all Drillbit log files will be together in one location. To be able to do this, MapR Enterprise Edition is required with loopback NFS enabled on all the Drill nodes in the cluster.

 

Using loopback NFS on MapR-FS also makes it much easier to read and work with log files using standard Linux tools on the distributed file system. We recommend adding the Drill node hostname to the filename to make it easier to identify which node generated the log files. Follow these steps:

 

  • Create the drill logs directory in MapR-FS.
    • hadoop fs -mkdir -p /user/mapr/drill/logs
  • Edit the logback.xml file in the /opt/mapr/drill/<drill-version>/conf directory on all Drill nodes. Note: you can edit the file on one node and use a cluster tool like clush to copy it to all the other nodes.
  • Edit the following appender section lines in the logback file. Note that "insert cluster name here" means you should actually enter the cluster name.
    • <file>/mapr/<insert cluster name here>/user/mapr/drill/logs/drillbit_${HOSTNAME}.log</file>
    • <fileNamePattern>/mapr/<insert cluster name here>/user/mapr/drill/logs/drillbit_${HOSTNAME}.log.%i</fileNamePattern>
  • Restart all Drill nodes (Drillbits) in the cluster.

 

Example logback.xml section:

 

  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
     <file>/mapr/drilldev/user/mapr/drill/logs/drillbit_${HOSTNAME}.log</file>
     <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
       <fileNamePattern>/mapr/drilldev/user/mapr/drill/logs/drillbit_${HOSTNAME}.log.%i</fileNamePattern>
       <minIndex>1</minIndex>
       <maxIndex>10</maxIndex>
     </rollingPolicy>

     <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
       <maxFileSize>100MB</maxFileSize>
     </triggeringPolicy>
     <encoder>
       <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
     </encoder>
   </appender>

 

1.3 Manage Archival and Retention of Profile and Log Files

After configuring the profile and log files to be stored centrally on MapR-FS, it is important to consider the retention and archival policies.

 

Log Files

The log files by default follow a rolling file policy of up to 10 log files, each up to 100MB, for each Drillbit in the cluster. This can be adjusted by setting the file size larger or smaller, as well as by increasing or decreasing the number of log files, in the logback.xml file mentioned in section 1.2. Simply alter the maxIndex and maxFileSize values in the logback.xml file on all Drillbits and restart the Drillbits.

 

Query Profile Files

The query profiles are best archived for future analysis and auditing purposes. Depending on the activity of the cluster, it may be best to archive the profiles on a daily basis. Keep in mind that when the files are moved from the profile location, they will no longer be visible in the Drill WebUI profile page, but they will still be available to the system administrator in MapR-FS. This step will speed up the WebUI view of the latest queries that were run, but archived queries will require MapR-FS access.

 

It is recommended that the profiles be stored in a date-based subdirectory structure, as it will allow analysis with Drill (or other tools) of the JSON file and the ability to prune directories based on date.

 

Below is a simple Linux script that utilizes the MapR NFS loopback to move files from the default profile location to an archived subdirectory location in the profiles directory on MapR-FS.

 

#!/bin/bash
# create new sub directory with structure /yyyy/mm/dd for files
   year=$(date +"%Y")
   month=$(date +"%m")
   day=$(date +"%d")
   newdir=${year}/${month}/${day}/
   mkdir -p ./profiles/${newdir}
# move files from base directory to structured archival sub directory
for file in $(find ./profiles -maxdepth 1 -name "*.drill")
do
   newfile=${file:11}
   mv ${file} ./profiles/${newdir}/${newfile}
done

 

2 - Drill Spill Locations

Drill can spill data to disk when operators exceed the Drill memory available on nodes. By default, Drill will use the local host filesystem /tmp as the spill location, which is limited in space and often in performance.

 

By creating and using a MapR-FS local volume on each node (one that is not replicated to other nodes), the Drill spill-to-disk operation has more storage space available and better performance, since a MapR cluster node will typically have more storage devices backing a local volume than the local OS /tmp space. To utilize these benefits for Drill spill data on MapR-FS, follow these steps.

 

2.1 Create Local MapR-FS Volumes for Drill Spill Data

Either manually create a MapR-FS local volume with replication 1 on each node, or use the script below, which checks whether a Drill spill volume exists on each node and creates it if not. Please modify the script as needed for the cluster environment.

#!/bin/bash

# for every node in the cluster, make sure a local Drill spill volume exists
for node in $(maprcli node list -columns hn | awk '{print $1}' | grep -v hostname); do

  volumetest="$(maprcli volume list -filter [p=="/var/mapr/local/${node}/drillspill"] -output terse | awk '/a/{++cnt} END {print cnt}')"

  if [ "${volumetest:-0}" -gt 0 ]; then
    echo " volume exists: /var/mapr/local/${node}/drillspill"
  else
    echo " volume doesn't exist: /var/mapr/local/${node}/drillspill"
    echo " creating volume: /var/mapr/local/${node}/drillspill"

    maprcli volume create \
      -name mapr.${node}.local.drillspill \
      -path /var/mapr/local/${node}/drillspill \
      -replication 1 \
      -localvolumehost ${node}
  fi
done

 

2.2 Configure Drill on the Nodes to Utilize the Local Volumes for Spill

Use the same steps as in section one to update the Drill configuration files in a cluster.

Edit the drill-env.sh, and add the following lines to the bottom.

 

node=$(maprcli node list -columns hn | awk '{print $1}' | grep -v hostname)
spilloc="/var/mapr/local/${node}/drillspill"
export drill_spilloc="$spilloc"

Edit the drill-override.conf file and add these lines inside the drill.exec block.
sort.external.spill.directories: [ ${drill_spilloc} ],
sort.external.spill.fs: "maprfs:///",

 

Restart all the Drill nodes (Drillbits) in the cluster.

3 - Drill Resource Configuration on a MapR Cluster

The MapR Converged Data Platform is designed to support multiple workloads, of which Drill is one, on the same cluster.

 

3.1 MapR Topology and Drill Nodes in the Cluster

MapR provides the ability to configure a cluster in topologies for nodes and volumes. This means that Drill can be configured on either all the nodes in a cluster or only certain node topologies. MapR similarly supports Volume Topologies for Data. In most cases, it is recommended to deploy Drill on all the nodes where the Data Volumes are located that Drill will need to access.

 

For more information on MapR Topology configuration, see:

http://maprdocs.mapr.com/home/AdministratorGuide/Setting-Up-Topology.html

 

3.2 Drillbit Resource Configuration

In many cases, Drill will be deployed with other applications on the same MapR nodes. In these cases, it is important to clearly understand how much of the node resources will be available to Drill. Keep in mind the nodes will require resources for MapR core components, other EcoSystem components, and additional applications that may run on the nodes and the OS.

 

Once there is a clear picture of which nodes in the cluster will be running Drill and how much of the resources on these nodes can be allocated to Drill, the configuration can be done. In general, it is best to deploy Drillbits with a homogenous resource configuration on all nodes.

 

3.2.1 Drill CPU Resource Configuration

Drill CPU consumption is mostly controlled by two configuration settings.

 

planner.width.max_per_node: This setting is used to control the maximum number of parallel threads (minor fragments) per Drill operator (major fragment) on a node. Keep in mind that Drill can be executing multiple major fragments at the same time per query. Consider setting this parameter to 75% of available cores for Drill clusters with low query concurrency or to 25% for Drill clusters with higher concurrency. This can be used as a starting point and adjusted as needed.

 

Example: Drill is deployed on nodes with 32 cores, but only 50% of CPU resources are allocated to Drill and the rest needs to be available for other applications. The Drill cluster will be used with data exploration with low user/query concurrency.

 

Total cores available to Drill = 32 x 50% = 16 cores

planner.width.max_per_node = 16 x 75% = 12

 

planner.width.max_per_query: This setting is to limit the total number of threads for the overall Drill cluster. It can be used in very large Drill clusters to limit overall resource usage of a single query on the overall cluster. Consider changing the default setting in very large clusters with higher concurrency to prevent a single query from dominating the resource consumption. Keep in mind this change may impact query times on large queries. Keep the default value and only adjust if needed.

 

3.2.2 Drill Memory Resource Configuration

The following three configuration options are the most important for Drill memory configuration.

 

The total Drillbit memory allocation per node: The total memory allocated to a Drillbit on a node is the sum of the Direct Memory and the Heap Memory. Again, it is important to clearly define how much of the node memory is available to Drill. First, make sure that warden.conf is configured to allow enough free memory for Drill by managing the memory allocation of other Ecosystem components, MapR core components, and OS. For more information on warden.conf, see: http://maprdocs.mapr.com/home/AdministratorGuide/MemoryAllocation-OS-MFS-Hadoop.html

 

Once the total memory available to Drillbits per node is known, the configuration can be done.

 

DRILL_HEAP: The heap memory is used for Java objects (files, columns, data types) and used by the planner. It is recommended to set this parameter to 20% of the available memory for Drill initially and adjust as needed. This parameter is set in the $DRILL_CONF_DIR/drill-env.sh file by uncommenting the line in the file and setting the appropriate value.

 

DRILL_MAX_DIRECT_MEMORY: This memory is used for data operations in Drill. It is recommended to set this parameter to 80% of the available memory for Drill initially and adjust as needed. This parameter is set in the $DRILL_CONF_DIR/drill-env.sh file by uncommenting the line in the file and setting the appropriate value.

 

planner.memory.max_query_memory_per_node: This is a system and session configuration option and can be altered for certain sessions. It is used to limit the maximum memory per node for sort operators per query. As a system option, it is recommended to set it to whichever is higher: the default of 2GB, or 20% of DRILL_MAX_DIRECT_MEMORY. For highly concurrent query workloads, the value may need to be lowered, but for low concurrency and very large data sets, the value may need to be increased if Out-Of-Memory (OOM) conditions are encountered. If OOM conditions are encountered frequently, see other best practices to limit these issues.

 

Example: Drill is deployed on nodes with 256GB of memory, but only 50% of memory resources are allocated to Drill, and the rest needs to be available for other applications. The Drill cluster will be used with data exploration with low user/query concurrency.

 

Total memory available for Drill = 256GB x 50% = 128GB

DRILL_HEAP = 128GB x 20% ~ 26GB

DRILL_MAX_DIRECT_MEMORY = 128GB x 80% ~ 102GB

planner.memory.max_query_memory_per_node = 102GB x 20% ~ 20GB

 

4 - Drill Security Configuration

It is recommended to configure security for Drill immediately when installing on the MapR Platform, as it provides SQL access to the data. User Authentication and Impersonation are two key elements that need to be configured.

 

For more information on securing Drill on MapR, see:

http://maprdocs.mapr.com/home/Drill/securing_drill.html

 

4.1 User Authentication

First, configure User Authentication for Drill. This consists of configuring the Drill Node or Server:

http://maprdocs.mapr.com/home/Drill/configure_server_auth.html

 

And then the clients that connect to Drill:

http://maprdocs.mapr.com/home/Drill/drill_connectors.html

 

4.2 User Impersonation

User Impersonation is needed for various Drill storage plugins on MapR to be configured securely and utilized properly. For more information on how to configure User Impersonation and Chaining, see:

https://drill.apache.org/docs/configuring-user-impersonation/#configuring-impersonation-and-chaining

 

To configure Drill impersonation on the MapR cluster, see:

http://maprdocs.mapr.com/home/Drill/configure_user_impersonation.html

 

Drill supports Inbound Impersonation for applications that manage sessions and initial connections but provide service to alternative end users connected through those applications. For more information, see:

https://drill.apache.org/docs/configuring-inbound-impersonation/

 

 

5 - MapR-FS Chunk Size

For optimal performance, it is recommended to match the file size to the MapR-FS chunk size for data being used by Drill.

 

5.1 Check MapR-FS Chunk Size

The default chunk size for MapR-FS is 256MB. The chunk size on MapR-FS can be set by directory for flexibility. To check the chunk size of a directory on a MapR-FS Volume, use the following command:

hadoop mfs -ls <path to directory>

 

Example:

[root@drilldev data]# hadoop mfs -ls /data
Found 3 items
drwxrwxr-x  Z U U   - mapr mapr          0 2017-02-28 17:56  536870912 /data/chunk
              p 2049.620.1182378  drilldev:5660
drwxrwxr-x  Z U U   - mapr mapr          7 2017-02-24 17:14  268435456 /data/flat
              p 2049.170.262788  drilldev:5660
drwxrwxr-x  Z U U   - mapr mapr          4 2017-03-17 15:02  268435456 /data/nested
              p 2049.287.263024  drilldev:5660

 

Note the chunk size for /data/chunk is 512MB, whereas the others are 256MB.

 

5.2 Set MapR-FS Chunk Size

To change the chunk size of a directory, use the hadoop mfs -setchunksize command. Note that all existing subdirectories (where the chunk size has not been set), new subdirectories, and new files will then use the new chunk size. However, existing files will continue to use the original chunk size.

 

Example:

hadoop mfs -setchunksize 536870912 /data/flat

 

For more information on MapR-FS chunk size, see:

http://maprdocs.mapr.com/home/AdministratorGuide/Chunk-Size.ht

5.3 Drill Block Size

When parquet data is created with Drill, the block size for the parquet files can be set. For more information, see:

https://drill.apache.org/docs/parquet-format/

 

To find the optimal Drill block size and MapR-FS chunk size for a data set, it is good to consider the total number of files that will be created for the data; however, it is recommended that the Drill block size and MapR-FS chunk size match. See this part of the Drill Best Practices for more information: https://community.mapr.com/thread/18747-in-the-case-of-parquet-does-drill-prefer-a-larger-number-of-small-files-or-a-smaller-number-of-large-files-how-do-i-get-the-best-mileage-of-drill-parallelism-by-controlling-the-layout

by Carol McDonald

Healthcare has entered an era of major data transformation spurred by the use of advanced analytics and Big Data technologies. The catalyst for this transformation includes both the move toward evidence-based medicine and value-based payments.

These new approaches raise significant challenges. Implementing evidence-based medicine demands access to the most recent research and all available clinical data from a multitude of sources while factoring in advanced analytics to improve patient care and outcomes. The shift toward value-based payments requires significant improvements in reporting, claims processing, data management, and process automation.

An estimated 75 percent of healthcare data generated today is from unstructured sources such as digital devices, emails, clinical notes, laboratory tests, imaging, telematics, and third party sources. Many organizations are finding traditional, relational database technologies can handle neither this volume of data nor its unstructured nature. New technologies and thinking are needed to fully realize the healthcare data revolution.

This is where Big Data and advanced analytics come in.

BIG DATA TRENDS IN HEALTHCARE

Reducing Fraud, Waste, And Abuse: Fraud, waste, and abuse contribute to spiraling healthcare costs in the U.S., but Big Data analytics is changing this. Using predictive analytics, the Centers for Medicare and Medicaid Services (CMS) prevented more than $210.7 million in healthcare fraud in one year, which many believe is just a fraction of the possible savings.

To identify fraud and abuse, insurers need the ability to analyze large unstructured datasets of historical claims using machine-learning algorithms to detect anomalies and patterns. By analyzing patient records and billing, healthcare organizations can detect anomalies such as the submission of duplicate claims, treatments that are not medically necessary, or providers who administer tests at an unusually high rate.

One major healthcare provider leveraged a data lake approach to aggregate massive volumes of data as a data hub for various departments, including fraud prevention. As a result, the provider is able to capture an incremental 20 percent of fraud, waste, and abuse in its claims department.

CMS uses predictive analytics to assign risk scores to specific claims and providers, and to identify billing patterns and claim aberrancies that are difficult to detect with older methods. Rules-based models flag certain charges automatically, such as those made with an ID card that has been marked as lost or stolen. Anomaly models raise suspicion based on factors that seem improbable, such as a doctor billing for the treatment of 50 or more patients in a day. Predictive models compare charges against a fraud profile and raise suspicion. Graph models raise suspicion based on the relations of a provider, as fraudulent billers are often organized in tight networks.

Predictive Analytics To Improve Outcomes: The Health Information Technology for Economic and Clinical Health (HITECH) Act of 2009 accelerated the adoption of EHRs through a $30 billion federal grant. HITECH also provided incentives for the “meaningful use” of EHRs. Meaningful use includes the sharing of patient information between healthcare providers, patients, and insurance companies to improve patient outcomes and lower costs. As a result, the volume and detail of patient data is exploding.

With access to massive amounts of structured and unstructured patient data across a wide range of data sources, predictive analytics can aid in diagnosing patient conditions, matching treatments with the best outcomes, and predicting patients at risk for disease or hospital readmission.

Predictive modeling is helping with the early detection of problems such as sepsis. Sepsis, an extreme and life-threatening immune response to infection that can lead to tissue damage, organ failure, and even death, is often difficult to predict and diagnose because it mimics other conditions. The Agency for Healthcare Research and Quality identifies sepsis as the most expensive condition treated in U.S. hospitals, costing more than $20 billion in 2011. According to the CDC, the average hospital stay for sepsis costs twice that of other diagnoses, and the mortality rate for patients with septic shock, the most severe stage of sepsis, is nearly 50 percent. Predictive analytics using real-time EHR data such as heart rate, respiratory rate, temperature, and white blood cell count can identify sepsis earlier, which results in faster treatment, significantly reduced mortality rates, and a lower overall cost of care.

The availability of millions of patient records means predictive analytics can find not only similar symptoms but also patients who are the same age, gender, ethnicity, and even have a similar response to a specific medication. By analyzing vast data sets across different systems, Big Data and predictive analytics are informing healthcare decisions and providing real value for companies.

The Healthcare Internet Of Things: The Internet of Things (IoT) refers to the rapidly increasing number of smart, interconnected devices and sensors that share data across the internet. In healthcare, these devices monitor almost every type of patient behavior from blood pressure and electrocardiogram monitors that capture heart function to Band-Aid like biosensors that detect the early stages of sepsis.

Though both practitioners and patients benefit from these interconnected devices, the tidal wave of data can be overwhelming. Many of these measurements require a follow-up visit with a physician. Smarter monitoring devices that communicate with other patient devices could greatly refine this process, lessening the need for direct physician intervention and replacing it with a phone call from a nurse. Smart medication dispensers already in place can detect whether medicines are being taken regularly at home; if they are not, the device can initiate contact from healthcare providers to ensure patients are compliant with their medication. This real-time monitoring can have a significant impact on the management of chronic diseases such as asthma and diabetes.

Spending on healthcare IoT could top $120 billion in just four years. The possibilities to lower costs and improve patient care are almost limitless. Big Data analytics will play a major role in this as most of the data created by the healthcare IoT is unstructured.

It isn’t just the unstructured nature of most healthcare data that matches up well with Big Data analytics. It is also the sheer enormity of the data volumes from these and other sources, which would easily overwhelm traditional analytics platforms.

LOWERING COSTS AND IMPROVING PATIENT OUTCOMES

Improving patient outcomes at the same or even lower cost is an extraordinarily tall order for any healthcare provider. Full-scale digital transformation is the key to reaching this goal. Next generation technologies will achieve this through data convergence, stream processing, and application agility.

Data convergence is the ability to handle massive amounts of data from disparate sources, from structured historical data in data silos to unstructured data from internet connected devices. Convergence combines the immediacy of operational applications with the insights of analytical workloads, which historically have been separate.

Stream processing is the ability to analyze streaming data off the wire. Data that in the past would be aggregated and queried later is now being analyzed immediately in real time. This ability to make care decisions based on real time data has numerous life-saving applications.

Finally, next generation applications need to be agile to meet the demands of developers, data scientists, business analysts, and executives so that the healthcare industry can deliver improved patient outcomes at a lower cost.


Download your copy of the MapR Guide to Big Data in Healthcare for a more comprehensive view behind the scenes of big data technology in the healthcare industry.

 

Editor's Note: This article was originally posted in the Converge Blog on April 20, 2017 

by Rutger De Graaf

INTRODUCTION

Every now and then there’s a challenge at hand. I recently came across one, luckily. Someone made the decision to hand out a Windows-based laptop to a promising data scientist. All the data scientists I have met so far want to run their stuff on Linux or Mac, or at least something that gives them a native ‘Unix’ prompt. The laptop is hardware encrypted with a locked BIOS, so the chances of getting a dual boot running were slim. Having only 4GB of memory did not give us a feasible virtual machine option either. So, Windows it is. Funny thing was, I have always wanted to get a ‘guest laptop’ to be able to run jobs on a remote cluster, without having to log in to the cluster itself.

Of course, there are a couple of prerequisites to get this up and running. For instance, the cluster must be ‘open’ to the laptop; the latter must be able to connect to a variety of ports. For the sake of this setup I assume that the cluster is not locked down from ‘impersonating’ the running Hadoop user. Your cluster may require a different setup, so your mileage may vary.

At the customer’s site is a MapR 5.2.1 development cluster with MEP 3.0 that we are allowed to use to build models and transform large amounts of data. That MapR cluster will be our target for (eventually) running a pyspark session. Please bear in mind that this is a locally installed cluster. There are a lot of problems achieving the setup below when running on (for instance) AWS, due to the reverse-NAT-like network setup of that service provider. Believe me, I have tried different tunneling and SOCKS options, but all to no avail. If someone can enlighten me, please do.

So to summarize this is what you’ll need:

  • An installed MapR 5.2.1 cluster with MEP 3.0 (Spark 2.1.0)
  • The shiny new Windows laptop that ‘was not to be’. I assume a 64-bit install.
  • Some patience if it does not run at first startup

LET’S GO

As a first step, you are required to download a couple of files you’ll need to set up your client:

  • The MapR 5.2.1 Windows client package at http://archive.mapr.com/releases/v5.2.1/. Installation of the MapR cluster itself is out of scope of this post.
  • An installed Python environment to run pyspark. Download it for instance at https://www.python.org/downloads/release/python-2713/. You’ll know how to install this one, but make SURE the version you install matches the version on the cluster. 2.7 on your Windows box with 2.6 on the cluster will fail with ‘Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions‘. 2.6 is highly deprecated but sadly still in use on older CentOS 6 versions. You will have some trouble getting the pip modules mentioned below installed on 2.6.
  • The Spark 2.1.0 tarball without Hadoop located at (for instance) http://spark.apache.org/downloads.html
  • The latest Java 1.8 JDK from http://www.oracle.com/technetwork/java/javase/downloads/

To prevent Java serialVersionUID or NoClassDefFound errors, you’ll have to copy a few files from the cluster; they are:

  • /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-api-jdo-4.2.1.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-core-4.1.6.jar
  • /opt/mapr/spark/spark-2.1.0/jars/datanucleus-rdbms-4.1.7.jar
  • All spark-x-mapr-1703.jar files in /opt/mapr/spark/spark-2.1.0/jars (Just to be safe, copy them all)

Note that since Spark 2.0 there is no longer an assembly that you can copy and use. You may be tempted to create a zip holding all the jars and use Spark’s spark.yarn.archive parameter. I found that not to be working in a mixed (Windows/Linux) environment. To access the Hive metastore on the cluster, download /opt/mapr/hive/hive-1.2/conf/hive-site.xml as well.

Have those ready! Before continuing any further, you’ll have to set some environment variables to get things up and running. You may set them using the horrible Windows GUI method; I prefer to create a command script that does just that. So create a directory c:\opt\mapr and a file called c:\opt\mapr\mapr-env.bat and paste the following contents in there.

@echo off
set JAVA_HOME=C:\Progra~1\Java\jdk1.8.0_121
set MAPR_HOME=C:\opt\mapr
set SPARK_HOME=%MAPR_HOME%\spark\spark-2.1.0
set HADOOP_HOME=%MAPR_HOME%\hadoop\hadoop-2.7.0
set HADOOP_CONF_DIR=%HADOOP_HOME%\etc\hadoop
set YARN_CONF_DIR=%HADOOP_CONF_DIR%
set PATH=%JAVA_HOME%\bin;c:\Python27\;%SPARK_HOME%\bin;%HADOOP_HOME%\bin;%MAPR_HOME%\server;%PATH%
set HADOOP_USER_NAME=mapr
cmd /C hadoop classpath > tmpFile
set /p SPARK_DIST_CLASSPATH= < tmpFile
del tmpFile
set YARN_APPLICATION_CLASSPATH=%SPARK_DIST_CLASSPATH:\=/%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:;=:%
set YARN_APPLICATION_CLASSPATH=%YARN_APPLICATION_CLASSPATH:c:/=/%

 

Adjust the JDK path in JAVA_HOME if necessary, but make sure you use the 8.3 notation instead of the one with spaces (or install to a space-less location like c:\opt\jdk-1.8). I will not explain the above contents in detail; they are needed to get both the Scala and the Python shells running. One remark on the YARN_APPLICATION_CLASSPATH variable: it is used on the server, not on your Windows machine like the other ones.

START INSTALLATION

Install MapR Hadoop Client

  • Unzip the contents of mapr-client-5.2.1.42646GA-1.amd64.zip to c:\opt\mapr
  • Move the copied hadoop-yarn-server-web-proxy-2.7.0-mapr-1703.jar to C:\opt\mapr\hadoop\hadoop-2.7.0\share\hadoop\yarn
  • Run the mapr-env.bat script

Now configure the MapR Hadoop client by invoking

c:\opt\mapr\server\configure.bat ^
  -N my.cluster.com -c ^
  -C CLDB-HOST:7222 ^
  -HS HISTORYSERVER-HOST

For instance

c:\opt\mapr\server\configure.bat ^
  -N mapr-521-230.whizzkit.nl -c ^
  -C mapr.whizzkit.nl:7222 ^
  -HS mapr.whizzkit.nl

The cluster name (-N), CLDB-HOST (-C) and HISTORYSERVER-HOST (-HS) are specific to your cluster setup! Note that the Windows configuration does not allow you to enter Zookeeper quorum information (-Z parameter). If all goes well, no output will be given from the script.

You'll have to edit two files before you are ready to submit your first Hadoop-based YARN job. First, you'll have to tell MapReduce that you will be submitting cross-platform, so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\mapred-site.xml and add:

<property>
  <name>mapreduce.app-submission.cross-platform</name>
  <value>true</value>
</property>

Secondly, you'll have to tell the job that you'll be spoofing another user, so edit C:\opt\mapr\hadoop\hadoop-2.7.0\etc\hadoop\core-site.xml and add:

<property>
  <name>hadoop.spoofed.user.uid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.gid</name>
  <value>5000</value>
</property>
<property>
  <name>hadoop.spoofed.user.username</name>
  <value>mapr</value>
</property>

If your cluster has different uid, gid or username, edit to your liking. Note that you are not restricted to the use of the mapr user. If there is another named user present, configure that one. If there is a user on the cluster that matches your Windows login name, you don't have to edit core-site.xml. Please note that for a YARN job to successfully run, the user needs to be present on ALL nodes of the cluster with the same uid and gid. The Resource Manager will not accept your job if there are mismatches or you are trying to use an unknown (from Linux perspective) user.

After this installation and configuration you should be able to submit the teragen job bundled with the MapR Hadoop client:

hadoop jar ^
  %HADOOP_HOME%\share\hadoop\mapreduce\hadoop-mapreduce-examples-2.7.0-mapr-1703.jar ^
  teragen 10000 /tmp/teragen

Delete the directory first if it already exists. You should be able to use your just installed Windows client for that.  

hadoop fs -ls /tmp
hadoop fs -rm -r -skipTrash /tmp/teragen

Do not continue until you have successfully run the teragen Hadoop job!

Install Apache Spark

We will use the Spark distribution without Hadoop from the Apache downloads site, but replace some of the jars with the ones from the cluster. The main reason for that is that the MapR distribution has its own implementation of HDFS called MapR-FS, and you'll need the jars provided by MapR to access that file system. The same goes for the Hadoop client installed above; that's the reason you need the Spark without Hadoop tarball. So, do the following:

  • Create a directory called c:\opt\mapr\spark\
  • Uncompress spark-2.1.0-bin-without-hadoop.tgz to c:\opt\mapr\spark
  • Rename the directory spark-2.1.0-bin-without-hadoop to spark-2.1.0 so the Spark install will be in c:\opt\mapr\spark\spark-2.1.0
  • From the jars folder in spark-2.1.0 remove all spark-x_2.11-2.1.0.jar files
  • Move or copy the previously fetched spark- and datanucleus jar files to the spark-2.1.0\jars folder
  • If you wish to access the Hive metastore on the cluster from Spark, copy the previously downloaded hive-site.xml file to c:\opt\mapr\spark\spark-2.1.0\conf.

The first Spark test will be running the spark-shell in YARN client mode. You should be greeted by the familiar ASCII-art:

spark-shell --master yarn --deploy-mode client ^
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

You may opt to perform an extra optimization when starting Spark jobs. You may have noticed the ‘Neither spark.yarn.jars nor spark.yarn.archive is set’ message when starting the shell. This causes your Spark driver to copy all jars to the cluster. To prevent this, you may do the following:

# Create a directory to hold the Spark jars
hadoop fs -mkdir -p /user/mapr/apps/spark/spark-2.1.0
# Copy all jars from the Windows machine to MapR-FS
hadoop fs -copyFromLocal %SPARK_HOME%\jars\*jar /user/mapr/apps/spark/spark-2.1.0
# Check if it succeeded, all jars (around 105) should be listed
hadoop fs -ls /user/mapr/apps/spark/spark-2.1.0

Now you are able to start the spark-shell in a slightly different, but quicker, way. Note that a small zip file called spark_conf.zip will still be created and uploaded; it contains a snapshot of your Hadoop and Spark config, as well as jars that the spark-shell sees fit to upload. They are located in /user/mapr/.sparkStaging/application__.

spark-shell --master yarn --deploy-mode client ^
  --conf spark.yarn.jars=maprfs:///user/mapr/apps/spark/spark-2.1.0/* ^
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH%

scala> val count = sc.parallelize(1 to 100).filter { _ =>
     |   val x = math.random
     |   val y = math.random
     |   x*x + y*y < 1
     | }.count()
count: Long = 76

scala> println(s"Pi is roughly ${4.0 * count / 100}")
Pi is roughly 3.04

 

RUN PYSPARK

Actions to perform on the cluster only

To run the pyspark example below, you will have to make sure numpy is installed on the cluster. You'll get an error soon enough if it's missing, so have your admin install it for you using:

sudo yum -y install numpy

Actions to perform on your Windows client

On your Windows client you will need a couple of jars to get the databricks csv jar working. I have found out that passing --packages does not work with the approach in this post, but using the --jars option does. So these are the jars you need:

  • spark-csv_2.10-1.3.0.jar
  • univocity-parsers-1.5.1.jar
  • commons-csv-1.1.jar

Download them to c:\opt\libext (or something like that). Next to that, you'll have to install a couple of Python modules as well:

python -m pip install -U pip setuptools
python -m pip install matplotlib
python -m pip install pandas
python -m pip install numpy

I have copied and modified a small part (the actual training of a model, not the evaluation) of the pyspark ML example blog post from MapR to reflect the changes needed for Spark 2.1.0. But first, download a file you need and put it on MapR-FS:

churn-bigml-80.csv

Put it on MapR-FS in maprfs:///tmp/

hadoop fs -copyFromLocal -f churn-bigml-80.csv /tmp/

You are now ready to start a Python Spark shell by using the command below. You may notice the similarities between the conf parameters used for spark-shell and pyspark.

pyspark --master yarn --deploy-mode client ^
  --conf spark.yarn.jars=maprfs:///user/mapr/apps/spark/spark-2.1.0/* ^
  --conf spark.hadoop.yarn.application.classpath=%YARN_APPLICATION_CLASSPATH% ^
  --jars C:\opt\libext\spark-csv_2.10-1.3.0.jar,C:\opt\libext\univocity-parsers-1.5.1.jar,C:\opt\libext\commons-csv-1.1.jar

You may copy and paste the code below to check if you have succeeded in following along.

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

binary_map = {'Yes':1.0, 'No':0.0, True:1.0, False:0.0, 'True':1.0, 'False':0.0}
toBinary = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

def labelData(data):
    return data.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))

churn_data = sqlContext \
    .read \
    .load('maprfs:///tmp/churn-bigml-80.csv',format='com.databricks.spark.csv',header='true',inferSchema='true')

churn_data = churn_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toBinary(churn_data['Churn'])) \
    .withColumn('International plan', toBinary(churn_data['International plan'])) \
    .withColumn('Voice mail plan', toBinary(churn_data['Voice mail plan']))

training_data, testing_data = labelData(churn_data).randomSplit([0.8, 0.2])

decisiontree_model = DecisionTree.trainClassifier(
    training_data,
    numClasses=2,
    maxDepth=2,
    categoricalFeaturesInfo={1:2, 2:2},
    impurity='gini',
    maxBins=32
)

print decisiontree_model.toDebugString()

It will finally print out something like:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
      /_/

Using Python version 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016 20:53:40)
SparkSession available as 'spark'.
...
DecisionTreeModel classifier of depth 2 with 7 nodes
  If (feature 12 <= 3.0)
   If (feature 4 <= 262.8)
    Predict: 0.0
   Else (feature 4 > 262.8)
    Predict: 1.0
  Else (feature 12 > 3.0)
   If (feature 4 <= 153.4)
    Predict: 1.0
   Else (feature 4 > 153.4)
    Predict: 0.0

 

CONCLUSION AND WRAP-UP

So, although it's a bit of a hassle, you can get Spark 2.1.0 up and running using a Windows client. What we achieved is a horizontally scalable environment for the data scientists to work with, without being tied to the vertical limitations of their own laptop: running your job on a 64-core, 48GB, 3-node cluster with SAS drives seems better to me! (Remember this is a development cluster!)

As a last note: you may have noticed that I am not using one of the most distinctive features of MapR; being able to just copy files from and to MapR-FS by using MapR's NFS server. That's not possible in this setup, as we are (and that was the purpose) not running the spark-shell or pyspark on a cluster node. Have fun, feel free to comment and ask questions!

Editor's note: This post was originally featured in the Converge Blog on April 17, 2017

Related

Real-Time Kafka / MapR Streams Data Ingestion into HBase / MapR-DB via PySpark

Real-Time User Profiles with Spark, Drill and MapR-DB

How To Use Jupyter & PySpark on MapR

How to Use Spark & PySpark with Zeppelin on MapR CDP

Apache Spark

MapR

Real-Time Streaming Data Pipelines With Apache Apis: Kafka, Spark Streaming, And Hbase

by Carol McDonald

 

Many of the systems we want to monitor produce their data as a stream of events. Examples include event data from web or mobile applications, sensors, or medical devices.

Real-time analysis examples include:

  • Website monitoring, network monitoring
  • Fraud detection
  • Web clicks
  • Advertising
  • Internet of Things: sensors

Batch processing can give great insights into things that happened in the past, but it lacks the ability to answer the question of “what is happening right now?”

It is becoming important to process events as they arrive for real-time insights, but high performance at scale is necessary to do this. In this blog post, I'll show you how to integrate Apache Spark Streaming, MapR-DB, and MapR Streams for fast, event-driven applications.

Example Use Case

Let's go over an example which generates lots of data and needs real-time preventive alerts. Remember what happened with BP in the Gulf coast?

The example use case we will look at here is an application that monitors oil wells. Sensors in oil rigs generate streaming data, which is processed by Spark and stored in HBase for use by various analytical and reporting tools. We want to store every single event in HBase as it streams in. We also want to filter for, and store, alarms. Daily Spark processing will store aggregated summary statistics.

What do we need to do? And how do we do this with high performance at scale?

We need to collect the data, process the data, store the data, and finally serve the data for analysis, machine learning, and dashboards.

Streaming Data Ingestion

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. In our example, we will use MapR Streams, a new distributed messaging system for streaming event data at scale. MapR Streams enables producers and consumers to exchange events in real time via the Apache Kafka 0.9 API. MapR Streams integrates with Spark Streaming via the Kafka direct approach.

MapR Streams (or Kafka) topics are logical collections of messages. Topics organize events into categories. Topics decouple producers, which are the sources of data, from consumers, which are the applications that process, analyze, and share data.

Topics are partitioned for throughput and scalability. Partitions make topics scalable by spreading the load for a topic across multiple servers. Producers are load balanced between partitions and consumers can be grouped to read in parallel from multiple partitions within a topic for faster performance. Partitioned parallel messaging is a key to high performance at scale.

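As an illustration of this decoupling, here is a minimal consumer sketch using the Kafka 0.9 consumer API that MapR Streams supports; the broker address, group id, and topic path are placeholder values. Consumers that share the same group.id split the topic's partitions among themselves, which is what enables parallel reads.

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Consumers sharing a group.id divide the topic's partitions between them
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder; required by plain Kafka
props.put("group.id", "sensor-readers")           // placeholder consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("/streams/pump:sensor"))  // placeholder topic path

while (true) {
  // poll() returns whatever records have arrived on the partitions assigned to this consumer
  for (record <- consumer.poll(100).asScala) {
    println(s"partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
  }
}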

Another key to high performance at scale is minimizing time spent on disk reads and writes. Compared with older messaging systems, Kafka and MapR Streams eliminate the need to track message acknowledgements on a per-message, per-listener basis. Messages are persisted sequentially as produced, and read sequentially when consumed. These design decisions mean that nonsequential reading or writing is rare, and allow messages to be handled at very high speeds. MapR Streams performance scales linearly as servers are added within a cluster, with each server handling more than 1 million messages per second.

Real-time Data Processing Using Spark Streaming

Spark Streaming brings Spark's APIs to stream processing, letting you use the same APIs for streaming and batch processing. Data streams can be processed with Spark’s core APIs, DataFrames, GraphX, or machine learning APIs, and can be persisted to a file system, HDFS, MapR-FS, MapR-DB, HBase, or any data source offering a Hadoop OutputFormat or Spark connector.

Spark Streaming divides the data stream into batches of X seconds called DStreams. A DStream is internally a sequence of RDDs, one for each batch interval, and each RDD contains the records received during that interval.

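Here is a tiny sketch of how the batch interval is declared (the application name is a placeholder); every DStream created from this context is internally a sequence of RDDs, one per 2-second batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A 2-second batch interval: each DStream built on this context yields one RDD every 2 seconds
val sparkConf = new SparkConf().setAppName("SensorStream")
val ssc = new StreamingContext(sparkConf, Seconds(2))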

Resilient distributed datasets, or RDDs, are the primary abstraction in Spark. An RDD is a distributed collection of elements, like a Java Collection, except that it’s spread out across multiple nodes in the cluster. The data contained in RDDs is partitioned and operations are performed in parallel on the data cached in memory. Spark caches RDDs in memory, whereas MapReduce involves more reading and writing from disk. Here again the key to high performance at scale is partitioning and minimizing disk I/O.

There are two types of operations on DStreams: transformations and output operations.

Your Spark application processes the DStream RDDs using Spark transformations like map, reduce, and join, which create new RDDs. Any operation applied on a DStream translates to operations on the underlying RDDs, which in turn, applies the transformation to the elements of the RDD.

Output operations write data to an external system, producing output in batches.

Examples of output operations are saveAsHadoopFiles, which saves to a Hadoop-compatible file system, and saveAsHadoopDataset, which saves to any Hadoop-supported storage system.

Storing Streaming Data Using HBase

For storing lots of streaming data, we need a data store that supports fast writes and scales.

With MapR-DB (HBase API), a table is automatically partitioned across a cluster by key range, and each server is the source for a subset of a table. Grouping the data by key range provides for really fast reads and writes by row key.

Also, with MapR-DB, each partitioned subset or region of a table has a write and read cache. Writes are sorted in the cache and appended to a WAL; writes and reads to disk are always sequential; recently read or written data and cached column families are available in memory. All of this provides for really fast reads and writes.

With a relational database and a normalized schema, query joins cause bottlenecks with lots of data. MapR-DB and a de-normalized schema scales because data that is read together is stored together.

So how do we collect, process, and store real-time events with high performance at scale? The key is partitioning, caching, and minimizing time spent on disk reads and writes for:

  • Messaging with MapR Streams
  • Processing with Spark Streaming
  • Storage with MapR-DB

Serving the Data

End applications like dashboards, business intelligence tools, and other applications use the processed event data. The processing output can also be stored back in MapR-DB, in another Column Family or Table, for further processing later.

Example Use Case Code

Now we will step through the code for a MapR Streams producer sending messages, and for Spark Streaming processing the events and storing data in MapR-DB.

MapR Streams Producer Code

The steps for a producer sending messages are:

  1. Set producer properties

    • The first step is to set the KafkaProducer configuration properties, which will be used later to instantiate a KafkaProducer for publishing messages to topics.
  2. Create a KafkaProducer

    • You instantiate a KafkaProducer by providing the set of key-value pair configuration properties which you set up in the first step. Note that KafkaProducer<K,V> is a Java generic class; you need to specify the type parameters as the types of the key and value of the messages that the producer will send.
  3. Build the ProducerRecord message

    • The ProducerRecord is a key-value pair to be sent to Kafka. It consists of a topic name to which the record is being sent, an optional partition number, and an optional key and a message value. The ProducerRecord is also a Java generic class, whose type parameters should match the serialization properties set before. In this example, we instantiate the ProducerRecord with a topic name and message text as the value, which will create a record with no key.
  4. Send the message

    • Call the send method on the KafkaProducer passing the ProducerRecord, which will asynchronously send a record to the specified topic. This method returns a Java Future object, which will eventually contain the response information. The asynchronous send() method adds the record to a buffer of pending records to send, and immediately returns. This allows sending records in parallel without waiting for the responses, and allows the records to be batched for efficiency.
  5. Finally, call the close method on the producer to release resources. This method blocks until all requests are complete.

The code follows these steps closely; a sketch is shown below.
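
Here is a minimal Scala sketch of the five steps, using the Kafka 0.9 producer API; the topic path, broker address, and message text are placeholder values for illustration, and the full working code is in the repository linked at the end of this post.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// 1. Set producer properties
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder; required by plain Kafka
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// 2. Create a KafkaProducer typed on the key and value of the messages it will send
val producer = new KafkaProducer[String, String](props)

// 3. Build the ProducerRecord with a topic name and a message value (no key)
val record = new ProducerRecord[String, String]("/streams/pump:sensor", "sample sensor reading")

// 4. send() is asynchronous: it buffers the record, returns a Future, and batches for efficiency
producer.send(record)

// 5. close() blocks until all pending requests are complete, then releases resources
producer.close()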

Spark Streaming Code

These are the basic steps for Spark Streaming code:

  1. Initialize a Spark StreamingContext object. Using this context, create a DStream. (A condensed sketch covering all of these steps follows the list.)

    • We use the KafkaUtils createDirectStream method to create an input stream from a Kafka or MapR Streams topic. This creates a DStream that represents the stream of incoming data, where each record is a line of text.

  2. Apply transformations (which create new DStreams)

    • We parse the message values into Sensor objects, with the map operation on the dStream. The map operation applies the Sensor.parseSensor function on the RDDs in the dStream, resulting in RDDs of Sensor objects.

      Any operation applied on a DStream translates to operations on the underlying RDDs. The map operation is applied on each RDD in the dStream to generate the sensorDStream RDDs.

      The oil pump sensor data comes in as strings of comma separated values. We use a Scala case class to define the Sensor schema corresponding to the sensor data, and a parseSensor function to parse the comma separated values into the sensor case class.

      Next, we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We convert the RDD of Sensor objects to a DataFrame and register it as a table, which allows us to use it in subsequent SQL statements. We use a SQL query to find the max, min, and average for the sensor attributes.

      The query output shows the max, min, and average values from our sensors.

  3. And/or Apply output operations

    • The sensorRDD objects are filtered for low psi, the sensor and alert data are converted to Put objects, and then written to HBase using the saveAsHadoopDataset method. This outputs the RDD to any Hadoop-supported storage system, using a Hadoop Configuration object for that storage system.

  4. Start receiving data and processing it. Wait for the processing to be stopped.

    • To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

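Below is a condensed sketch of steps 1 through 4, under a few assumptions: a simplified Sensor schema with hypothetical fields, hypothetical column family names, placeholder topic and table paths, an illustrative low-psi threshold, and the stock Spark 1.6 Kafka direct API (the MapR Streams integration ships its own variant of KafkaUtils, so the exact createDirectStream call differs). The complete working code is in the GitHub repository linked below.

import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Simplified sensor schema and helpers (hypothetical field list)
case class Sensor(resid: String, date: String, time: String, hz: Double, psi: Double)

object SensorHelpers {
  def parseSensor(line: String): Sensor = {
    val p = line.split(",")
    Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble)
  }
  // Convert a Sensor to an HBase/MapR-DB Put keyed on pumpName_date_time
  def convertToPut(s: Sensor): (ImmutableBytesWritable, Put) = {
    val put = new Put(Bytes.toBytes(s.resid + "_" + s.date + "_" + s.time))
    put.add(Bytes.toBytes("data"), Bytes.toBytes("psi"), Bytes.toBytes(s.psi.toString))
    (new ImmutableBytesWritable, put)
  }
  // Convert an alert to a Put in a separate (hypothetical) "alert" column family
  def convertToPutAlert(s: Sensor): (ImmutableBytesWritable, Put) = {
    val put = new Put(Bytes.toBytes(s.resid + "_" + s.date + "_" + s.time))
    put.add(Bytes.toBytes("alert"), Bytes.toBytes("psi"), Bytes.toBytes(s.psi.toString))
    (new ImmutableBytesWritable, put)
  }
}

val sparkConf = new SparkConf().setAppName("SensorStream")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Output configuration: write Puts to an HBase / MapR-DB binary table (placeholder path)
val jobConfig = new JobConf(HBaseConfiguration.create())
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "/user/mapr/sensor")

// 1. Create the input DStream with the Kafka direct approach (placeholder topic path)
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("/streams/pump:sensor"))

// 2. Transformations: parse each line of text into a Sensor object
val sensorDStream = messages.map(_._2).map(SensorHelpers.parseSensor)

// 3. Output operations, applied to each RDD in the DStream
sensorDStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Register the RDD as a DataFrame table and query min, max, avg per pump and day
  rdd.toDF().registerTempTable("sensor")
  sqlContext.sql("SELECT resid, date, MIN(psi) AS minPsi, MAX(psi) AS maxPsi, AVG(psi) AS avgPsi " +
    "FROM sensor GROUP BY resid, date").show()

  // Store every event, and store low-psi alerts in the "alert" column family
  rdd.map(SensorHelpers.convertToPut).saveAsHadoopDataset(jobConfig)
  rdd.filter(_.psi < 5.0).map(SensorHelpers.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

// 4. Start receiving data and wait for the streaming computation to be stopped
ssc.start()
ssc.awaitTermination()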

HBase Table schema

The HBase Table Schema for the streaming data is as follows:

  • Composite row key of the pump name, date, and time stamp

The Schema for the daily statistics summary rollups is as follows:

  • Composite row key of the pump name and date
  • Column Family stats
  • Columns for min, max, avg.
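
As a small illustration of this rollup schema, here is a sketch of building one daily summary Put; the column family and column names come from the list above, and writing the Put would use the same saveAsHadoopDataset pattern as in the sketch above.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// One daily summary row: key = pumpName_date, column family "stats", columns min/max/avg
def statsPut(pump: String, date: String, min: Double, max: Double, avg: Double): Put = {
  val put = new Put(Bytes.toBytes(pump + "_" + date))
  put.add(Bytes.toBytes("stats"), Bytes.toBytes("min"), Bytes.toBytes(min.toString))
  put.add(Bytes.toBytes("stats"), Bytes.toBytes("max"), Bytes.toBytes(max.toString))
  put.add(Bytes.toBytes("stats"), Bytes.toBytes("avg"), Bytes.toBytes(avg.toString))
  put
}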

All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Converged Data Platform. There are several advantages of having MapR Streams on the same cluster as all the other components. For example, maintaining only one cluster means less infrastructure to provision, manage, and monitor. Likewise, having producers and consumers on the same cluster means fewer delays related to copying and moving data between clusters, and between applications.

Software

This tutorial will run on the MapR v5.1 Sandbox, which includes MapR Streams, Spark, and HBase (MapR-DB).

You can download the code, data, and instructions to run this example from here:

Code: https://github.com/caroljmcdonald/mapr-streams-sparkstreaming-hbase

Summary

In this blog post, you learned how the MapR Converged Data Platform integrates Hadoop and Spark with real-time database capabilities, global event streaming, and scalable enterprise storage.

Content Originally posted in MapR Converge Blog post on April 22, 2016, visit here 

Subscribe to Converge Blog

Related Resources

MapR-Streams

Apache Spark

MapR

kafka apis

Best Practices for Yarn Resource Management by Hao Zhu

 

In this blog post, I will discuss best practices for YARN resource management. The fundamental idea of MRv2 (YARN) is to split up the two major functionalities, resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM).

The ResourceManager (RM) and the per-node slave, the NodeManager (NM), form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system.

Please read the Hadoop documentation on YARN concepts and architecture before reading this article.

This blog post covers the following topics regarding YARN resource management, and also provides best practices for each topic:

  1. How does warden calculate and allocate resources to YARN?
  2. Minimum and maximum allocation unit in YARN
  3. Virtual/physical memory checker
  4. Mapper, Reducer and AM’s resource request
  5. Bottleneck resource

1. HOW DOES WARDEN CALCULATE AND ALLOCATE RESOURCES TO YARN?

In a MapR Hadoop cluster, warden sets the default resource allocation for the operating system, MapR-FS, MapR Hadoop services, and MapReduce v1 and YARN applications. Details are described in the MapR documentation: Resource Allocation for Jobs and Applications.

YARN can manage three system resources—memory, CPU, and disks. Once warden finishes its calculations, it sets the environment variable YARN_NODEMANAGER_OPTS for starting the NM.

For example, if you “vi /proc//environ” you can find the settings below:

YARN_NODEMANAGER_OPTS= -Dnodemanager.resource.memory-mb=10817
-Dnodemanager.resource.cpu-vcores=4
-Dnodemanager.resource.io-spindles=2.0

 

They can be overridden by setting the three configurations below in yarn-site.xml on NM nodes and restarting NM.

  • yarn.nodemanager.resource.memory-mb
  • yarn.nodemanager.resource.cpu-vcores
  • yarn.nodemanager.resource.io-spindles

To view the available resources on each node, you can go to the RM UI (http://:8088/cluster/nodes) and check “Mem Avail”, “Vcores Avail”, and “Disk Avail” for each node.

In this step, make sure warden fully considers all services for resource allocation, because some services do not have dedicated parameters in warden.conf, e.g., Drill and Impala. If you plan to allocate 10% of total memory for Drill and 5% for Impala on this node, please carve out that 15% of memory via the parameters service.command.os.heapsize.percent/max/min.

If memory is over-allocated to YARN, heavy swapping may occur and the kernel OOM killer may be triggered to kill container processes.

The error below is a sign of OS-level OOM, and probably means that memory is over-allocated to YARN.

os::commit_memory(0x0000000000000000, xxxxxxxxx, 0) failed;
error=’Cannot allocate memory’ (errno=12)

 

If you see that, double-check whether warden takes into account all memory-consuming services on that node, and reduce the memory allocated by warden if needed.

2. MINIMUM AND MAXIMUM ALLOCATION UNIT IN YARN

As of Hadoop 2.5.1, two resources, memory and CPU, have minimum and maximum allocation units in YARN, as set by the configurations below in yarn-site.xml.

Basically, it means RM can only allocate memory to containers in increments of "yarn.scheduler.minimum-allocation-mb" and not exceed "yarn.scheduler.maximum-allocation-mb";

And it can only allocate CPU vcores to containers in increments of "yarn.scheduler.minimum-allocation-vcores" and not exceed "yarn.scheduler.maximum-allocation-vcores".

If changes are required, set the above configurations in yarn-site.xml on the RM nodes and restart the RM.

For example, if one job asks for 1025 MB of memory per map container (mapreduce.map.memory.mb=1025), the RM will give it one 2048 MB (2 * yarn.scheduler.minimum-allocation-mb) container.

If you have a huge MR job which asks for a 9999 MB map container, the job will be killed with the error message below in the AM log:

MAP capability required is more than the supported max container capability in the cluster.
Killing the Job. mapResourceRequest: 9999 maxContainerCapability:8192

 

If a Spark on YARN job asks for a huge container with size larger than "yarn.scheduler.maximum-allocation-mb", the error below will show up:

Exception in thread "main" java.lang.IllegalArgumentException:
Required executor memory (99999+6886 MB) is above the max threshold (8192 MB) of this cluster!

 

In the above two cases, you can increase “yarn.scheduler.maximum-allocation-mb” in yarn-site.xml and restart RM.

So in this step, you need to be familiar with the lower and upper bound of resource requirements for each mapper and reducer of the jobs and set the minimum and maximum allocation unit according to that.

3. VIRTUAL/PHYSICAL MEMORY CHECKER

The NodeManager can monitor the memory usage (virtual and physical) of each container. If a container's virtual memory exceeds “yarn.nodemanager.vmem-pmem-ratio” times "mapreduce.reduce.memory.mb" or "mapreduce.map.memory.mb", the container will be killed if “yarn.nodemanager.vmem-check-enabled” is true.

If its physical memory exceeds "mapreduce.reduce.memory.mb" or "mapreduce.map.memory.mb", the container will be killed if “yarn.nodemanager.pmem-check-enabled” is true.

The parameters yarn.nodemanager.vmem-check-enabled, yarn.nodemanager.pmem-check-enabled, and yarn.nodemanager.vmem-pmem-ratio can be set in yarn-site.xml on each NM node to override the default behavior.

This is a sample error for a container killed by virtual memory checker:

Current usage: 347.3 MB of 1 GB physical memory used;
2.2 GB of 2.1 GB virtual memory used. Killing container.

 

And this is a sample error for physical memory checker:

Current usage: 2.1gb of 2.0gb physical memory used;
1.1gb of 3.15gb virtual memory used. Killing container.

 

As of Hadoop 2.5.1 on MapR 4.1.0, the virtual memory checker is disabled while the physical memory checker is enabled by default.

Since CentOS/RHEL 6 allocates virtual memory aggressively due to OS behavior, you should disable the virtual memory checker or increase yarn.nodemanager.vmem-pmem-ratio to a relatively large value.

If the above errors occur, it is also possible that the MapReduce job has a memory leak, or that the memory for each container is simply not enough. Try to check the application logic and also tune the container memory request—"mapreduce.reduce.memory.mb" or "mapreduce.map.memory.mb".

4. MAPPER, REDUCER AND AM’S RESOURCE REQUESTS

A MapReduce v2 job has three different container types—Mapper, Reducer, and AM.

Mapper and Reducer can ask for resources—memory, CPU and disk, while AM can only ask for memory and CPU.

Below is a summary of the configurations of resource requests for the three container types.

The default values are from Hadoop 2.5.1 of MapR 4.1, and they can be overridden in mapred-site.xml on the client node or set in applications such as MapReduce Java code, Pig, and the Hive CLI.

  • Mapper: mapreduce.map.memory.mb, mapreduce.map.java.opts, mapreduce.map.cpu.vcores (and the MapR-specific disk request, mapreduce.map.disk)

  • Reducer: mapreduce.reduce.memory.mb, mapreduce.reduce.java.opts, mapreduce.reduce.cpu.vcores (and the MapR-specific disk request, mapreduce.reduce.disk)

  • AM: yarn.app.mapreduce.am.resource.mb, yarn.app.mapreduce.am.command-opts, yarn.app.mapreduce.am.resource.cpu-vcores

Each container is actually a JVM process, and the “-Xmx” value in the java-opts above should fit within the allocated memory size. One best practice is to set it to 0.8 * (container memory allocation). For example, if the requested mapper container has mapreduce.map.memory.mb=4096, we can set mapreduce.map.java.opts=-Xmx3277m.

There are many factors which can affect the memory requirement for each container, such as the number of Mappers/Reducers, the file type (plain text, Parquet, ORC), the data compression algorithm, the type of operations (sort, group-by, aggregation, join), data skew, etc. You should be familiar with the nature of the MapReduce job and figure out the minimum requirements for the Mapper, Reducer, and AM. Any type of container can run out of memory and be killed by the physical/virtual memory checker if it doesn't meet the minimum memory requirement. If so, you need to check the AM log and the failed container log to find out the cause.

For example, if the MapReduce job sorts Parquet files, the Mapper needs to cache a whole Parquet row group in memory. I have done tests showing that the larger the row group size of the Parquet files, the more Mapper memory is needed. In this case, make sure the Mapper memory is large enough without triggering OOM.

Another example is the AM running out of memory. Normally, the AM's 1 GB java heap size is enough for many jobs. However, if the job writes lots of Parquet files, then during the commit phase of the job, the AM will call ParquetOutputCommitter.commitJob(). It will first read the footers of all output Parquet files, and then write a metadata file named "_metadata" in the output directory.

This step may cause the AM to run out of memory, with the stack trace below in the AM log:

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.StringCoding$StringEncoder.encode(StringCoding.java:300)
at java.lang.StringCoding.encode(StringCoding.java:344)
at java.lang.String.getBytes(String.java:916)
at parquet.org.apache.thrift.protocol.TCompactProtocol.writeString(TCompactProtocol.java:298)
at parquet.format.ColumnChunk.write(ColumnChunk.java:512)
at parquet.format.RowGroup.write(RowGroup.java:521)
at parquet.format.FileMetaData.write(FileMetaData.java:923)
at parquet.format.Util.write(Util.java:56)
at parquet.format.Util.writeFileMetaData(Util.java:30)
at parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:322)
at parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:342)
at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:51)
... 10 more

 

The solution is to increase the memory requirement for AM and disable this parquet feature by “set parquet.enable.summary-metadata false”.

Besides figuring out the minimum memory requirement for each container, sometimes we need to balance the job performance and resource capacity. For example, jobs doing sorting may need a relatively larger “mapreduce.task.io.sort.mb” to avoid or reduce the number of spilling files. If the whole system has enough memory capacity, we can increase both “mapreduce.task.io.sort.mb” and container memory to get better job performance.

In this step, we need to make sure each type of container meets the proper resource requirements. If OOM happens, always check the AM logs first to figure out which container failed and what the cause was, based on the stack trace.

5. BOTTLENECK RESOURCE

Since there are three types of resources, different containers from different jobs may ask for different amounts of each resource. This can result in one of the resources becoming the bottleneck. Suppose we have a cluster with capacity (1000 GB RAM, 16 cores, 16 disks) and each Mapper container needs (10 GB RAM, 1 core, 0.5 disks): at most 16 Mappers can run in parallel, because CPU cores become the bottleneck here.

As a result, (840 GB RAM, 8 disks) worth of resources are not used by anyone. If you run into this situation, just check the RM UI (http://:8088/cluster/nodes) to figure out which resource is the bottleneck. You can probably allocate the leftover resources to jobs which can improve performance with such a resource. For example, you can allocate more memory to sorting jobs which used to spill to disk.

KEY TAKEAWAYS:

  1. Make sure warden considers all services when allocating system resources.
  2. Be familiar with lower and upper bound of resource requirements for mapper and reducer.
  3. Be aware of the virtual and physical memory checker.
  4. Set -Xmx of java-opts of each container to 0.8 * (container memory allocation).
  5. Make sure each type of container meets proper resource requirement.
  6. Fully utilize bottleneck resource.

In this blog post, you’ve learned best practices for YARN resource management. If you have any further questions, please ask them in the comments section below.

Content Originally posted in MapR Converge Blog post on July 24, 2015, visit here 

Subscribe to Converge Blog