
Spark Scala write DF to MapR-DB JSON

Question asked by terryhealy on Jun 12, 2018
Latest reply on Jun 15, 2018 by cmcdonald

Running MapR V6.0.1 / MEP 5.0.0, using IntelliJ on a system with the MapR Client and Spark installed.

I'm trying to write a DataFrame to a MapR-DB JSON table. I'm reading a .csv file, filtering some fields, and adding an _id field. In the code below the compiler can't resolve saveToMapRDB() on the DataFrame, despite my trying all sorts of recommended inclusions in pom.xml (the relevant part is included below as well). Three days of searching have me going in circles.

From Saving an Apache Spark DataFrame to a MapR-DB JSON Table, it's not clear to me whether or how 'def saveToMapRDB...' is intended to be used in a complete example, or if it should be used at all. In another related thread here, the ScalaDocs entry for saveToMapRDB is clearly visible.
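
For reference, my reading of that article is that the DataFrame flavor of saveToMapRDB is supposed to arrive as an implicit on the DataFrame itself, roughly like this (my own paraphrase, untested; the table path is made up):

import com.mapr.db.spark.sql._   // DataFrame implicits, vs. com.mapr.db.spark._ for RDDs

df.saveToMapRDB("/tables/example", createTable = true, idFieldPath = "_id")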

 

The error the compiler throws is:

Error:(80, 11) value saveToMapRDB is not a member of org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
flows.saveToMapRDB("/tables/flow2", createTable = true, idFieldPath = "_id")

Error:(80, 41) not found: value createTable
Error:(80, 61) not found: value idFieldPath

 

It seems to be an issue in the dependencies, but I can't pin down what's missing. Here is the full code:


import java.util.UUID
import com.mapr.db.spark._
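// Does com.mapr.db.spark._ only provide the RDD half of the connector, with the
// DataFrame saveToMapRDB needing com.mapr.db.spark.sql._ instead? Not clear to me.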
import com.typesafe.scalalogging.slf4j.Logger
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory


object NetflowStream {

  val flowCsvFilename = "/tmp/nfcapd.201806111325.csv"
  val logger = Logger(LoggerFactory.getLogger("NetflowStream"))

  // --------------------------------------------------------------------------

  def main(args: Array[String]) {

    val spark = SparkSession.builder()
      .appName("NetflowFile")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // --------------------------------------------------------------------------

    // Input file is netflow as .csv via nfdump
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val makeId = udf(() => UUID.randomUUID().toString())
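    // Spark assumes UDFs are deterministic, so the persist() further down matters:
    // without it the random UUID could be re-evaluated and give a row a
    // different _id on each action.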

    val flowSchema = Encoders.product[NetflowRecord].schema
    val flows = spark.read.schema(flowSchema)
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .csv(flowCsvFilename)
      .as[NetflowRecord]
      .drop("opkt")
      .drop("obyt")
      .drop("dtos")
      .drop("dir")
      .drop("rip")
      .withColumn("_id", makeId())
      .toDF()
      .persist(StorageLevel.MEMORY_AND_DISK)

    flows.show(5)
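    // The call below is the one the compiler rejects (line 80 in the error output above).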
    flows.saveToMapRDB("/tables/flow2", createTable = true, idFieldPath = "_id")

  }

  // --------------------------------------------------------------------------

  case class NetflowRecord(
                            var treceived: String = null,
                            var tryear: Int = 0,
                            var trmonth: Int = 0,
                            var trday: Int = 0,
                            var trhour: Int = 0,
                            var trminute: Int = 0,
                            var trsec: Int = 0,
                            var trdur: Double = 0.0,
                            var sip: String = null,
                            var dip: String = null,
                            var sport: Int = 0,
                            var dport: Int = 0,
                            var proto: String = null,
                            var flag: String = null,
                            var fwd: Int = 0,
                            var stos: Int = 0,
                            var ipkt: Long = 0,
                            var ibyt: Long = 0,
                            var opkt: Long = 0,
                            var obyt: Long = 0,
                            var input: Int = 0,
                            var output: Int = 0,
                            var sas: Int = 0,
                            var das: Int = 0,
                            var dtos: Int = 0, // Always 0
                            var dir: Int = 0, // Always 0
                            var rip: String = null // Always 192.168.4.86
                          )

}

From pom.xml:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.6</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.1.0-mapr-1703</spark.version>
    <!--spark.version>2.2.1</spark.version -->
</properties>

<repositories>
    <repository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
        <id>mapr-releases</id>
        <url>http://repository.mapr.com/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
</pluginRepositories>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <!-- version>2.1.0-mapr-1703</version -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.mapr.db/maprdb -->
    <dependency>
        <groupId>com.mapr.db</groupId>
        <artifactId>maprdb-spark</artifactId>
        <version>5.2.1-mapr</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging-slf4j -->
    <dependency>
        <groupId>com.typesafe.scala-logging</groupId>
        <artifactId>scala-logging-slf4j_2.11</artifactId>
        <version>2.1.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.8.0-beta2</version>
    </dependency>

</dependencies>
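
One more data point: MEP 5.0.0 ships Spark 2.2.1, so I suspect both the 2.1.0-mapr-1703 Spark pin and the 5.2.1-mapr maprdb-spark artifact above are mismatched with my cluster. If I'm reading the MapR repository right, the MEP 5.0.0 coordinates would be something like this (version strings unverified on my end):

<properties>
    <spark.version>2.2.1-mapr-1803</spark.version>
</properties>

<dependency>
    <groupId>com.mapr.db</groupId>
    <artifactId>maprdb-spark</artifactId>
    <version>2.2.1-mapr-1803</version>
</dependency>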
