
Running a mapreduce job on cloudera demo cdh3u4 (airline data example)

Question asked by alinghi90 on Nov 3, 2012
Latest reply on Nov 5, 2012 by mandoskippy
Hi all

I'm working through Jeffrey Breen's R-Hadoop tutorial (October 2012). At the moment I'm trying to populate HDFS and then run the commands from the tutorial in RStudio. Unfortunately, I've run into some trouble:

I moved the data folder to: /home/cloudera/data/hadoop/airline
Now when I run populate.hdfs.sh I get the following output:

    [cloudera@localhost ~]$ /home/cloudera/TutorialBreen/bin/populate.hdfs.sh
    mkdir: cannot create directory /user/cloudera: File exists
    mkdir: cannot create directory /user/cloudera/wordcount: File exists
    mkdir: cannot create directory /user/cloudera/wordcount/data: File exists
    mkdir: cannot create directory /user/cloudera/airline: File exists
    mkdir: cannot create directory /user/cloudera/airline/data: File exists
    put: Target /user/cloudera/airline/data/20040325.csv already exists
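
The "File exists" and "already exists" messages suggest the directories and the CSV file were already in HDFS from an earlier run, so they are probably harmless. As a minimal sketch of how such a populate step could be guarded, assuming `hadoop fs -test -e` behaves on this CDH3 image as it does in stock Hadoop (exit status 0 when the path exists), something like the following might work. The function names and the `HADOOP` variable are hypothetical and are not part of the tutorial's script.

```shell
#!/bin/sh
# Sketch only: guard each HDFS step so a second run does not error out.
# HADOOP is a hypothetical override hook; it defaults to the real `hadoop` CLI.
HADOOP="${HADOOP:-hadoop}"

# Create an HDFS directory unless the path already exists.
hdfs_mkdir_if_missing() {
    if "$HADOOP" fs -test -e "$1"; then
        echo "skip mkdir, already exists: $1"
    else
        "$HADOOP" fs -mkdir "$1"
    fi
}

# Upload a local file unless the HDFS target already exists.
hdfs_put_if_missing() {
    if "$HADOOP" fs -test -e "$2"; then
        echo "skip put, already exists: $2"
    else
        "$HADOOP" fs -put "$1" "$2"
    fi
}

# Example calls (the HDFS paths come from the output above; the local
# source path is an assumption about where the CSV file lives):
#   hdfs_mkdir_if_missing /user/cloudera/airline/data
#   hdfs_put_if_missing /home/cloudera/data/hadoop/airline/20040325.csv \
#       /user/cloudera/airline/data/20040325.csv
```

With guards like these, a repeated run prints a "skip" line for each existing target instead of an error.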

Then I tried the commands in RStudio as shown in the tutorial, but I get errors at the end. Can someone show me what I did wrong?

    > if (LOCAL)
    + {
    +   rmr.options.set(backend = 'local')
    +   hdfs.data.root = 'data/local/airline'
    +   hdfs.data = file.path(hdfs.data.root, '20040325-jfk-lax.csv')
    +   hdfs.out.root = 'out/airline'
    +   hdfs.out = file.path(hdfs.out.root, 'out')
    +   if (!file.exists(hdfs.out))
    +     dir.create(hdfs.out.root, recursive=T)
    + } else {
    +   rmr.options.set(backend = 'hadoop')
    +   hdfs.data.root = 'airline'
    +   hdfs.data = file.path(hdfs.data.root, 'data')
    +   hdfs.out.root = hdfs.data.root
    +   hdfs.out = file.path(hdfs.out.root, 'out')
    + }
    > asa.csvtextinputformat = make.input.format( format = function(con, nrecs) {
    +   line = readLines(con, nrecs)
    +   values = unlist( strsplit(line, "\\,") )
    +   if (!is.null(values)) {
    +     names(values) = c('Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime',
    +                       'ArrTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum',
    +                       'ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay',
    +                       'DepDelay','Origin','Dest','Distance','TaxiIn','TaxiOut',
    +                       'Cancelled','CancellationCode','Diverted','CarrierDelay',
    +                       'WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay')
    +     return( keyval(NULL, values) )
    +   }
    + }, mode='text' )
    > mapper.year.market.enroute_time = function(key, val) {
    +   if ( !identical(as.character(val['Year']), 'Year')
    +        & identical(as.numeric(val['Cancelled']), 0)
    +        & identical(as.numeric(val['Diverted']), 0) ) { 
    +     if (val['Origin'] < val['Dest'])
    +       market = paste(val['Origin'], val['Dest'], sep='-')
    +     else
    +       market = paste(val['Dest'], val['Origin'], sep='-')
    +     output.key = c(val['Year'], market)
    +     output.val = c(val['CRSElapsedTime'], val['ActualElapsedTime'], val['AirTime'])
    +     return( keyval(output.key, output.val) )
    +   }
    + }
    > reducer.year.market.enroute_time = function(key, val.list) {
    +   if ( require(plyr) ) 
    +     val.df = ldply(val.list, as.numeric)
    +   else { # this is as close as my deficient *apply skills can come w/o plyr
    +     val.list = lapply(val.list, as.numeric)
    +     val.df = data.frame( do.call(rbind, val.list) )
    +   }  
    +   colnames(val.df) = c('crs', 'actual','air')
    +   output.key = key
    +   output.val = c( nrow(val.df), mean(val.df$crs, na.rm=T),
    +                   mean(val.df$actual, na.rm=T),
    +                   mean(val.df$air, na.rm=T) )
    +   return( keyval(output.key, output.val) )
    + }
    > mr.year.market.enroute_time = function (input, output) {
    +   mapreduce(input = input,
    +             output = output,
    +             input.format = asa.csvtextinputformat,
    +             output.format='csv', # note to self: 'csv' for data, 'text' for bug
    +             map = mapper.year.market.enroute_time,
    +             reduce = reducer.year.market.enroute_time,
    +             backend.parameters = list(
    +               hadoop = list(D = "mapred.reduce.tasks=2")
    +             ),
    +             verbose=T)
    + }
    > out = mr.year.market.enroute_time(hdfs.data, hdfs.out)
    Error in file(f, if (format$mode == "text") "r" else "rb") :
      cannot open the connection
    In addition: Warning message:
    In file(f, if (format$mode == "text") "r" else "rb") :
      cannot open file 'data/local/airline/20040325-jfk-lax.csv': No such file or directory
    > if (LOCAL)
    + {
    +   results.df = as.data.frame( from.dfs(out, structured=T) )
    +   colnames(results.df) = c('year', 'market', 'flights', 'scheduled', 'actual', 'in.air')
    +   print(head(results.df))
    + }
    Error in to.dfs.path(input) : object 'out' not found
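
The warning shows R trying to open the relative path `data/local/airline/20040325-jfk-lax.csv`, which is the value assigned in the `LOCAL` branch. So `LOCAL` was presumably TRUE, and the path is resolved against R's working directory (the one `getwd()` reports). A rough way to check from a terminal whether that file is reachable from a given working directory is sketched below. The helper name `check_local_input` is hypothetical, and the working directory to pass in is whatever RStudio is actually using.

```shell
#!/bin/sh
# Sketch only: report whether a relative input path resolves from a given
# working directory. Both arguments are supplied by the caller.
check_local_input() {
    workdir="$1"
    relpath="$2"
    if [ -f "$workdir/$relpath" ]; then
        echo "found: $workdir/$relpath"
    else
        echo "missing: $workdir/$relpath"
        return 1
    fi
}

# Example call (the first argument is an assumption; substitute the
# directory that getwd() prints in RStudio):
#   check_local_input /home/cloudera data/local/airline/20040325-jfk-lax.csv
```

If this reports "missing", either the working directory or `hdfs.data.root` would need to point at the place the data was actually moved to. The second error (`object 'out' not found`) follows from the first, since `out` is never assigned when the job fails.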


I hope someone can help me!
Thank you so much!
