Status: Assumed Answered

Spark Structured Streaming on MapR Does Not Work

Question asked by PETER.EDIKE on Jun 14, 2018
Latest reply on Jun 14, 2018 by vmeghraj

Hello Everyone,

 

I am trying to implement a simple job that reads messages from MapR Streams topics and prints them, using Spark Structured Streaming, with the following code in spark-shell:

import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.kafka010.KafkaSourceProvider
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.Trigger

// The goal of this script is to query topics of interest in Quickteller and display their aggregate numbers in real time.
// This does not mean the job will be run via Zeppelin in production; this is just a POC.

// MapR Streams topics are addressed as "<stream path>:<topic name>".
val topic1 = "/iswdata/streams/qu:fundstransfer"
val topic2 = "/iswdata/streams/qu:billpayment"
val topic3 = "/iswdata/streams/qu:transactions"
// All three topics as one comma-separated list (the form accepted by the "subscribe" option).
// NOTE(review): not referenced by the per-topic streams built below.
val topics = Seq(topic3, topic1, topic2).mkString(",")

// Spark session used to listen to these topics. In spark-shell a session already exists,
// in which case getOrCreate() returns that existing session rather than building a new one.
val spark = SparkSession.builder.appName("QuicktellerTransactionTypeAggregator").getOrCreate()

// Broker address defined once instead of being repeated for every stream.
// NOTE(review): MapR Streams locates topics by their stream path, but Spark's Kafka source
// still requires this option to be present. Confirm whether the MapR connector uses its value.
val bootstrapServers = "172.35.15.151"

/** Creates a streaming Dataset holding the value of every record published to `topic`.
  *
  * @param topic           topic to subscribe to
  * @param startingOffsets where a query without a checkpoint begins reading. "latest" (Spark's
  *                        default for streaming queries, kept here) skips records produced
  *                        before the query starts; pass "earliest" to also read existing ones.
  * @return the record values decoded as strings
  */
def readTopic(topic: String, startingOffsets: String = "latest"): Dataset[String] = {
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topic)
    .option("startingOffsets", startingOffsets)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
}

val lines = readTopic(topic1)
val lines2 = readTopic(topic2)
val lines3 = readTopic(topic3)


// Streaming query over topic1: for every record received it prints a fixed marker line.
// ForeachWriter callbacks execute on the executors, so with a non-local master the
// output lands in executor stdout logs rather than in this console.
val query1 = {
  val markerWriter = new ForeachWriter[String] {
    // Returning true tells Spark to process this partition's rows; there is nothing to set up.
    override def open(partitionId: Long, version: Long): Boolean = true

    // Only the marker is printed, not the record value itself.
    override def process(row: String): Unit = println(s"Processing 1")

    // No resources to release.
    override def close(errorOrNull: Throwable): Unit = ()
  }
  lines.writeStream.foreach(markerWriter).start()
}

// Streaming query over topic2 that prints the value of every record it receives.
// ForeachWriter callbacks execute on the executors, so with a non-local master the
// output lands in executor stdout logs rather than in this console.
val query2 = lines2.writeStream
  .foreach(new ForeachWriter[String] {
    // Returning true tells Spark to process this partition's rows; there is nothing to set up.
    override def open(partitionId: Long, version: Long): Boolean = true

    // `$row` interpolates the record value (without the `$`, the literal text "{row}" is printed).
    override def process(row: String): Unit = println(s"Processing $row")

    // No resources to release.
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .start()

// Streaming query over topic3: for every record received it prints a marker identifying
// this query. ForeachWriter callbacks execute on the executors, so with a non-local master
// the output lands in executor stdout logs rather than in this console.
val query3 = lines3.writeStream
  .foreach(new ForeachWriter[String] {
    // Returning true tells Spark to process this partition's rows; there is nothing to set up.
    override def open(partitionId: Long, version: Long): Boolean = true

    // Marker is "3" so output is distinguishable from query1's "Processing 1".
    override def process(row: String): Unit = println("Processing 3")

    // No resources to release.
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .start()

// Block the shell for up to 10 s per query, one after another (so up to ~30 s in total).
// awaitTermination(timeoutMs) returns false if the query is still running when the timeout
// expires; the queries keep running in the background after these calls return.
// The first query is named `query1` (no value called `query` exists in this script).
query1.awaitTermination(10000)
query2.awaitTermination(10000)
query3.awaitTermination(10000)

 

The problem is that nothing ever happens or gets printed to the screen. I checked the `exception` field on the query objects and they all return `None`. Am I doing something wrong?

Outcomes