
Spark Scala write DF to MapR-DB JSON

Question asked by terryhealy on Jun 12, 2018
Latest reply on Jun 15, 2018 by cmcdonald

Running MapR 6.0.1 / MEP 5.0.0, using IntelliJ on a system with the MapR Client and Spark installed.


I'm trying to write a DataFrame to a MapR-DB JSON table. I'm reading a .csv file, filtering some fields, and adding an _id field. In the code below I can't find a reference to saveToMapRDB() for the DataFrame, despite trying all sorts of recommended inclusions in pom.xml (part of which is included below as well). Three days of searching has me going in circles.
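To show the _id step in isolation: the column is added with a small UDF that generates a random UUID per row. A minimal stand-alone sketch (no MapR pieces involved, local master for illustration only):

```scala
import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Stand-alone illustration of the _id generation:
// a UDF wrapping java.util.UUID, applied via withColumn.
val spark = SparkSession.builder().appName("idDemo").master("local[*]").getOrCreate()
val makeId = udf(() => UUID.randomUUID().toString)
val withId = spark.range(3).toDF("n").withColumn("_id", makeId())
// _id values are 36-character strings like "550e8400-e29b-41d4-a716-446655440000"
```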

From Saving an Apache Spark DataFrame to a MapR-DB JSON Table, it's not clear to me whether or how 'def saveToMapRDB...' is intended to be used in a complete example, or if it should be used at all. In another related thread here, the ScalaDocs entry for saveToMapRDB is clearly visible.


The error the compiler throws is:


Error:(80, 11) value saveToMapRDB is not a member of org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
flows.saveToMapRDB("/tables/flow2", createTable = true, idFieldPath = "_id")

Error:(80, 41) not found: value createTable
Error:(80, 61) not found: value idFieldPath
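As I understand it, the DataFrame variant of saveToMapRDB is added by an implicit conversion, so the compiler only finds it when the right implicits are in scope. One pattern I've seen suggested (which I haven't confirmed against my MEP version) is importing the sql sub-package of the connector rather than com.mapr.db.spark._ alone, roughly:

```scala
import org.apache.spark.sql.DataFrame

// com.mapr.db.spark._ provides the RDD implicits; the DataFrame/Dataset
// variant of saveToMapRDB may come from the sql sub-package (assumption
// based on the MapR-DB OJAI connector docs -- verify against your MEP):
import com.mapr.db.spark.sql._

def save(flows: DataFrame): Unit =
  flows.saveToMapRDB("/tables/flow2", createTable = true, idFieldPath = "_id")
```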


It seems to be an issue in the dependencies, but I can't pin down which one. Here is the source:

import java.util.UUID
import com.mapr.db.spark._
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.spark.sql.{Encoders, SparkSession}
import org.slf4j.LoggerFactory

object NetflowStream {

  val flowCsvFilename = "/tmp/nfcapd.201806111325.csv"
  val logger = Logger(LoggerFactory.getLogger("NetflowStream"))

  // --------------------------------------------------------------------------

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("NetflowStream").getOrCreate()

    val sc = spark.sparkContext

    // --------------------------------------------------------------------------

    // Input file is netflow as .csv via nfdump
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val makeId = udf(() => UUID.randomUUID().toString())

    val flowSchema = Encoders.product[NetflowRecord].schema
    val flows = spark.read
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .schema(flowSchema)
      .csv(flowCsvFilename)
      .withColumn("_id", makeId())
    flows.saveToMapRDB("/tables/flow2", createTable = true, idFieldPath = "_id")
  }

  // --------------------------------------------------------------------------

  case class NetflowRecord(
                            var treceived: String = null,
                            var tryear: Int = 0,
                            var trmonth: Int = 0,
                            var trday: Int = 0,
                            var trhour: Int = 0,
                            var trminute: Int = 0,
                            var trsec: Int = 0,
                            var trdur: Double = 0.0,
                            var sip: String = null,
                            var dip: String = null,
                            var sport: Int = 0,
                            var dport: Int = 0,
                            var proto: String = null,
                            var flag: String = null,
                            var fwd: Int = 0,
                            var stos: Int = 0,
                            var ipkt: Long = 0,
                            var ibyt: Long = 0,
                            var opkt: Long = 0,
                            var obyt: Long = 0,
                            var input: Int = 0,
                            var output: Int = 0,
                            var sas: Int = 0,
                            var das: Int = 0,
                            var dtos: Int = 0, // Always 0
                            var dir: Int = 0, // Always 0
                            var rip: String = null // Always
                          )
}


From pom.xml:

    <!--spark.version>2.2.1</spark.version -->

        <name>Scala-Tools Maven2 Repository</name>

        <!-- version>2.1.0-mapr-1703</version -->
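For completeness, the dependency I believe is needed is the MapR-DB OJAI connector for Spark. The coordinates below are my best guess (groupId, artifactId, and the version property are assumptions; the exact values should be checked against the MEP 5.0.0 repository):

```xml
<!-- MapR-DB OJAI connector for Spark; groupId/artifactId/version are
     assumptions -- confirm against the MapR MEP repository. -->
<dependency>
    <groupId>com.mapr.db</groupId>
    <artifactId>maprdb-spark</artifactId>
    <version>${mapr.db.spark.version}</version>
</dependency>
```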