
Flume with MapR-FS as the sink: files are not written

Question asked by manirangasamy on May 26, 2015
My source type is spooldir and my sink type is hdfs. There is no error, but no files are copied.

By the way, I am fully aware of the NFS mount feature for copying data. I am learning Flume and want to try this feature. Once this works, I would like to try writing data using log4j, with Avro as the source and HDFS as the sink.

Any help is greatly appreciated

Regards Mani

    # Name the components of this agent
    maprfs-agent.sources = spool-collect
    maprfs-agent.sinks = maprfs-write
    maprfs-agent.channels = memory-channel
    
    # Describe/ Configure the sources
    maprfs-agent.sources.spool-collect.type = spooldir
    maprfs-agent.sources.spool-collect.spoolDir = /home/appdata/mani
    maprfs-agent.sources.spool-collect.fileHeader = true
    maprfs-agent.sources.spool-collect.bufferMaxLineLength = 500
    maprfs-agent.sources.spool-collect.bufferMaxLines = 10000
    maprfs-agent.sources.spool-collect.batchSize = 100000
    
    # Describe/ Configure sink
    maprfs-agent.sinks.maprfs-write.type = hdfs
    maprfs-agent.sinks.maprfs-write.hdfs.fileType  = DataStream
    maprfs-agent.sinks.maprfs-write.hdfs.path = maprfs:///sample.node.com/user/hive/test
    maprfs-agent.sinks.maprfs-write.writeFormat = Text
    maprfs-agent.sinks.maprfs-write.hdfs.proxyUser = root
    maprfs-agent.sinks.maprfs-write.hdfs.kerberosPrincipal = mapr
    maprfs-agent.sinks.maprfs-write.hdfs.kerberosKeytab = /opt/mapr/conf/flume.keytab
    maprfs-agent.sinks.maprfs-write.hdfs.filePrefix = %{file}
    maprfs-agent.sinks.maprfs-write.hdfs.fileSuffix = .csv
    maprfs-agent.sinks.maprfs-write.hdfs.rollInterval = 0
    maprfs-agent.sinks.maprfs-write.hdfs.rollCount = 0
    maprfs-agent.sinks.maprfs-write.hdfs.rollSize = 0
    maprfs-agent.sinks.maprfs-write.hdfs.batchSize = 100
    maprfs-agent.sinks.maprfs-write.hdfs.idleTimeout = 0
    maprfs-agent.sinks.maprfs-write.hdfs.maxOpenFiles = 5
    
    
    # Configure channel buffer
    maprfs-agent.channels.memory-channel.type = memory
    maprfs-agent.channels.memory-channel.capacity = 1000
    
    # Bind the source and the sink to the channel
    maprfs-agent.sources.spool-collect.channels = memory-channel
    maprfs-agent.sinks.maprfs-write.channel = memory-channel


I get the console output below. There are no errors, but no files appear when I run the command below:

hadoop mfs -ls /user/hive/test
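For reference, the agent is launched with a command along these lines (the `--conf` directory is a placeholder for my Flume conf dir; the config file name `mapr-spool.conf` and the agent name `maprfs-agent` match the config and log):

```shell
# Start the Flume agent in the foreground with console logging,
# using the agent name defined in mapr-spool.conf
flume-ng agent \
  --conf conf \
  --conf-file mapr-spool.conf \
  --name maprfs-agent \
  -Dflume.root.logger=INFO,console
```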

    15/05/26 13:55:45 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
    15/05/26 13:55:45 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:mapr-spool.conf
    15/05/26 13:55:45 INFO conf.FlumeConfiguration: Added sinks: maprfs-write Agent: maprfs-agent
    15/05/26 13:55:45 INFO conf.FlumeConfiguration: Processing:maprfs-write
    15/05/26 13:55:45 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [maprfs-agent]
    15/05/26 13:55:45 INFO node.AbstractConfigurationProvider: Creating channels
    15/05/26 13:55:45 INFO channel.DefaultChannelFactory: Creating instance of channel memory-channel type memory
    15/05/26 13:55:45 INFO node.AbstractConfigurationProvider: Created channel memory-channel
    15/05/26 13:55:45 INFO source.DefaultSourceFactory: Creating instance of source spool-collect, type spooldir
    15/05/26 13:55:45 INFO sink.DefaultSinkFactory: Creating instance of sink: maprfs-write, type: hdfs
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink: Auth method: PROXY
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink:  User name: root
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink:  Using keytab: false
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink:  Superuser auth: SIMPLE
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink:  Superuser name: root
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink:  Superuser using keytab: false
    15/05/26 13:55:47 INFO hdfs.HDFSEventSink: Logged in as user root
    15/05/26 13:55:47 INFO node.AbstractConfigurationProvider: Channel memory-channel connected to [spool-collect, maprfs-write]
    15/05/26 13:55:47 INFO node.Application: Starting new configuration:{ sourceRunners:{spool-collect=EventDrivenSourceRunner: { source:Spool Directory source spool-collect: { spoolDir: /home/appdata/mani } }} sinkRunners:{maprfs-write=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@7fc7efa0 counterGroup:{ name:null counters:{} } }} channels:{memory-channel=org.apache.flume.channel.MemoryChannel{name: memory-channel}} }
    15/05/26 13:55:47 INFO node.Application: Starting Channel memory-channel
    15/05/26 13:55:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory-channel: Successfully registered new MBean.
    15/05/26 13:55:47 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory-channel started
    15/05/26 13:55:47 INFO node.Application: Starting Sink maprfs-write
    15/05/26 13:55:47 INFO node.Application: Starting Source spool-collect
    15/05/26 13:55:47 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/appdata/mani
    15/05/26 13:55:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: maprfs-write: Successfully registered new MBean.
    15/05/26 13:55:47 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: maprfs-write started
    15/05/26 13:55:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: spool-collect: Successfully registered new MBean.
    15/05/26 13:55:47 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: spool-collect started
    15/05/26 13:55:47 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/appdata/mani/cron-s3.log to /home/appdata/mani/cron-s3.log.COMPLETED
    15/05/26 13:55:47 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    15/05/26 13:55:48 INFO hdfs.BucketWriter: Creating maprfs:///sample.node.com/user/hive/test/.1432644947885.csv.tmp
    15/05/26 13:57:08 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/appdata/mani/network-usage.log to /home/appdata/mani/network-usage.log.COMPLETED
    15/05/26 13:57:08 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/appdata/mani/processor-usage-2014-10-17.log to /home/appdata/mani/processor-usage-2014-10-17.log.COMPLETED
    15/05/26 13:57:25 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/appdata/mani/total-processor-usage.log to /home/appdata/mani/total-processor-usage.log.COMPLETED
    15/05/26 13:57:25 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
    15/05/26 13:57:26 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
    15/05/26 13:57:26 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
    15/05/26 13:57:27 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
    15/05/26 13:57:27 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
    15/05/26 13:57:28 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.