
Kafka Connect HDFS sink issue

Question asked by server on Jul 21, 2017
Latest reply on Dec 4, 2017 by cathy

Hi,

I have recently been trying to get Kafka Connect working on MapR, so far without success.


Environment: MapR-Sandbox-For-Hadoop-5.2.1

 

I installed all the required components (http://maprdocs.mapr.com/home/AdvancedInstallation/InstallingKafka.html):

mapr-kafka-0.9.0
mapr-kafka-rest
mapr-client
mapr-kafka-connect-jdbc
mapr-kafka-connect-hdfs


Created a stream and a topic:
maprcli stream create -path /streams/test -produceperm p -consumeperm p -topicperm p

maprcli stream topic create -path /streams/test -topic topic01
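
For reference, the stream and topic can be listed back as a quick sanity check (standard maprcli commands, included here for completeness):

# Confirm the stream exists and topic01 was created
maprcli stream info -path /streams/test
maprcli stream topic list -path /streams/test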


Modified the standalone worker configuration to accept schemaless JSON as input data:

config/connect-standalone.properties

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
rest.port=8083
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=1000
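
For contrast: with schemas.enable=false the converter accepts plain JSON records like the one produced further below, whereas with schemas.enable=true the JsonConverter expects a schema/payload envelope. An illustrative sketch of what the same record would look like in enveloped form (field types assumed):

{"schema": {"type": "struct", "fields": [
    {"field": "temp", "type": "int32", "optional": false},
    {"field": "speed", "type": "int32", "optional": false},
    {"field": "direction", "type": "string", "optional": false}]},
 "payload": {"temp": 55555, "speed": 40, "direction": "NWt2"}}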

 

Configured the HDFS sink properties file as suggested at http://maprdocs.mapr.com/home/Kafka/Connect-hdfs-example-fromKafka.html:
/opt/mapr/kafka-connect-hdfs/kafka-connect-hdfs-2.0.1/etc/kafka-connect-hdfs/quickstart-hdfs.properties

 

name=maprfs-sink-connector-topic02
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=/streams/test:topic01
hdfs.url=maprfs:///test_connect
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=3
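
The same connector can also be submitted through the Connect worker's REST interface on rest.port=8083 instead of passing the properties file on the command line (a sketch using the standard Kafka Connect REST API, with the same settings as above):

curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "maprfs-sink-connector-topic02", "config": {"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "1", "topics": "/streams/test:topic01", "hdfs.url": "maprfs:///test_connect", "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", "flush.size": "3"}}' \
  http://localhost:8083/connectors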

 

Started the standalone Connect worker:
./bin/connect-standalone.sh ./config/connect-standalone.properties /opt/mapr/kafka-connect-hdfs/kafka-connect-hdfs-2.0.1/etc/kafka-connect-hdfs/quickstart-hdfs.properties
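
Once the worker is up, the connector can be verified through the REST interface (rest.port=8083 as configured above):

# Should return ["maprfs-sink-connector-topic02"]
curl http://localhost:8083/connectors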

 

Produced a message into the newly created topic via the Kafka REST proxy on port 8082:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"value": {"temp" : 55555 , "speed" : 40 , "direction" : "NWt2"} }]}' http://localhost:8082/topics/%2Fstreams%2Ftest%3Atopic01
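
Note that flush.size=3 means the sink only commits a file after accumulating three records for a partition, so a single message would sit in the temporary directory. A sketch of posting three records in one request through the same REST proxy endpoint (the values after the first record are made up; %2F and %3A are the URL-encoded '/' and ':' of the topic path):

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value":{"temp":55555,"speed":40,"direction":"NWt2"}},{"value":{"temp":55556,"speed":41,"direction":"NWt2"}},{"value":{"temp":55557,"speed":42,"direction":"NWt2"}}]}' \
  http://localhost:8082/topics/%2Fstreams%2Ftest%3Atopic01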

 

Verified the message ended up in the topic:

/opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --topic /streams/test:topic01 --new-consumer --from-beginning --bootstrap-server this.will.be.ignored:9092

 

{"temp":55555,"speed":40,"direction":"NWt2"}

 

Checked the output folder for new Parquet files, but it appears to be empty:
cd /mapr/demo.mapr.com/test_connect/topics/+tmp
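
Since +tmp only holds in-progress files and committed output should land under topics/&lt;topic directory&gt;/partition=&lt;n&gt;, a broader search over the NFS mount may be more telling (a sketch; demo.mapr.com is the sandbox cluster name used above):

# Look for any files the connector has written anywhere under the output directory
find /mapr/demo.mapr.com/test_connect -type f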

 

I can't find any clue as to what I've done wrong; any assistance would be greatly appreciated.

Log output:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/mapr/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/mapr/kafka-connect-hdfs/kafka-connect-hdfs-2.0.1/share/java/kafka-connect-hdfs/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/mapr/kafka-connect-hdfs/kafka-connect-hdfs-2.0.1/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/mapr/kafka-connect-jdbc/kafka-connect-jdbc-2.0.1/share/java/kafka-connect-jdbc/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-07-19 03:54:15,071] INFO StandaloneConfig values:
cluster = connect
rest.advertised.host.name = null
task.shutdown.graceful.timeout.ms = 5000
rest.host.name = null
rest.advertised.port = null
bootstrap.servers = [localhost:9092]
offset.flush.timeout.ms = 5000
offset.flush.interval.ms = 1000
rest.port = 8083
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:165)
[2017-07-19 03:54:15,316] INFO Logging initialized @798ms (org.eclipse.jetty.util.log:186)
[2017-07-19 03:54:15,354] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:53)
[2017-07-19 03:54:15,355] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:89)
[2017-07-19 03:54:15,369] INFO ProducerConfig values:
fs.mapr.hardmount = true
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
streams.buffer.max.time.ms = 3000
fs.mapr.rpc.timeout = 300
max.block.ms = 9223372036854775807
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 2147483647
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 2147483647
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
streams.producer.default.stream =
streams.partitioner.class = class org.apache.kafka.clients.producer.DefaultStreamsPartitioner
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 1
metrics.num.samples = 2
ssl.protocol = TLS
streams.parallel.flushers.per.partition = true
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0
(org.apache.kafka.clients.producer.ProducerConfig:165)
[2017-07-19 03:54:15,370] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:53)
[2017-07-19 03:54:15,444] INFO Worker started (org.apache.kafka.connect.runtime.Worker:111)
[2017-07-19 03:54:15,444] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:57)
[2017-07-19 03:54:15,444] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:58)
[2017-07-19 03:54:15,445] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:91)
[2017-07-19 03:54:15,617] INFO jetty-9.2.12.v20150709 (org.eclipse.jetty.server.Server:327)
Jul 19, 2017 3:54:16 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2017-07-19 03:54:16,449] INFO Started o.e.j.s.ServletContextHandler@21baa903{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-07-19 03:54:16,466] INFO Started ServerConnector@583840f6{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-07-19 03:54:16,467] INFO Started @1952ms (org.eclipse.jetty.server.Server:379)
[2017-07-19 03:54:16,470] INFO REST server listening at http://10.0.2.15:8083/, advertising URL http://10.0.2.15:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:132)
[2017-07-19 03:54:16,470] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:60)
[2017-07-19 03:54:16,475] INFO ConnectorConfig values:
connector.class = class io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max = 1
topics = [/streams/test:topic01]
name = maprfs-sink-connector-topic02
(org.apache.kafka.connect.runtime.ConnectorConfig:165)
[2017-07-19 03:54:16,477] INFO Creating connector maprfs-sink-connector-topic02 of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:170)
[2017-07-19 03:54:16,480] INFO Instantiated connector maprfs-sink-connector-topic02 with version 2.0.1-mapr-1703 of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:183)
[2017-07-19 03:54:16,490] INFO HdfsSinkConnectorConfig values:
filename.offset.zero.pad.width = 10
topics.dir = topics
flush.size = 3
timezone =
connect.hdfs.principal =
hive.home =
hive.database = default
rotate.interval.ms = -1
retry.backoff.ms = 5000
locale =
hadoop.home =
logs.dir = logs
schema.cache.size = 1000
format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
hive.integration = false
hdfs.namenode.principal =
hive.conf.dir =
partition.duration.ms = -1
hadoop.conf.dir =
schema.compatibility = NONE
connect.hdfs.keytab =
hdfs.url = maprfs:///test_connect
hdfs.authentication.kerberos = false
hive.metastore.uris =
partition.field.name =
kerberos.ticket.renew.period.ms = 3600000
shutdown.timeout.ms = 3000
partitioner.class = io.confluent.connect.hdfs.partitioner.DefaultPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format =
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2017-07-19 03:54:16,490] INFO Finished creating connector maprfs-sink-connector-topic02 (org.apache.kafka.connect.runtime.Worker:193)
[2017-07-19 03:54:16,495] INFO TaskConfig values:
task.class = class io.confluent.connect.hdfs.HdfsSinkTask
(org.apache.kafka.connect.runtime.TaskConfig:165)
[2017-07-19 03:54:16,496] INFO Creating task maprfs-sink-connector-topic02-0 (org.apache.kafka.connect.runtime.Worker:256)
[2017-07-19 03:54:16,496] INFO Instantiated task maprfs-sink-connector-topic02-0 with version 2.0.1-mapr-1703 of type io.confluent.connect.hdfs.HdfsSinkTask (org.apache.kafka.connect.runtime.Worker:267)
[2017-07-19 03:54:16,511] INFO ConsumerConfig values:
fs.mapr.hardmount = true
streams.record.strip.streampath = false
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
fs.mapr.rpc.timeout = 300
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
streams.consumer.default.stream =
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = connect-maprfs-sink-connector-topic02
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
streams.consumer.buffer.memory = 67108864
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
streams.zerooffset.record.on.eof = false
auto.offset.reset = earliest
(org.apache.kafka.clients.consumer.ConsumerConfig:165)
[2017-07-19 03:54:16,516] INFO Created connector maprfs-sink-connector-topic02 (org.apache.kafka.connect.cli.ConnectStandalone:82)
[2017-07-19 03:54:16,618] INFO HdfsSinkConnectorConfig values:
filename.offset.zero.pad.width = 10
topics.dir = topics
flush.size = 3
timezone =
connect.hdfs.principal =
hive.home =
hive.database = default
rotate.interval.ms = -1
retry.backoff.ms = 5000
locale =
hadoop.home =
logs.dir = logs
schema.cache.size = 1000
format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
hive.integration = false
hdfs.namenode.principal =
hive.conf.dir =
partition.duration.ms = -1
hadoop.conf.dir =
schema.compatibility = NONE
connect.hdfs.keytab =
hdfs.url = maprfs:///test_connect
hdfs.authentication.kerberos = false
hive.metastore.uris =
partition.field.name =
kerberos.ticket.renew.period.ms = 3600000
shutdown.timeout.ms = 3000
partitioner.class = io.confluent.connect.hdfs.partitioner.DefaultPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format =
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2017-07-19 03:54:16,897] INFO Hadoop configuration directory (io.confluent.connect.hdfs.DataWriter:93)
[2017-07-19 03:54:17,869] INFO Started recovery for topic partition /streams/test:topic01-0 (io.confluent.connect.hdfs.TopicPartitionWriter:192)
[2017-07-19 03:54:17,880] INFO Finished recovery for topic partition /streams/test:topic01-0 (io.confluent.connect.hdfs.TopicPartitionWriter:207)
[2017-07-19 03:54:17,880] INFO Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@64dfcfc9 finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:155)
[2017-07-19 03:54:17,880] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@64dfcfc9 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)

 

Thanks,

Ian
