Я пытаюсь развернуть брокера kafka в контейнере докеров в кластере mesos.
В частности, у меня есть кластер mesos, где я развертываю различные контейнеры докеров, используя marathon в качестве системы инициализации. Все контейнеры имеют служебные порты и доступны через прокси (HAproxy).
Проблема
Когда я развертываю контейнер kafka с помощью marathon, я могу создать тему, перечислить все темы, но не могу запустить команду произвести / потреблять. Команда производства дает мне следующую ошибку
[2016-01-18 11:10:09,926] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Образ докера, который я использую, - это spotify / kafka, в нем предустановлены zookeeper и kafka. Этот образ работает нормально, когда я запускаю его с помощью команды docker run.
Я использую следующий json-файл marathon для развертывания контейнера:
{
"id": "spotify-kafka.marathon",
"cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; env; supervisord -n",
"container": {
"type": "DOCKER",
"docker": {
"image": "spotify/kafka",
"network": "BRIDGE",
"portMappings": [
{"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
{"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
]
}
},
"cpus": 0.5,
"mem": 1024.0,
"instances": 1
}
Команда cmd экспортирует несколько переменных env, которые задают IP и порт внутреннего хоста. Внешние порты являются случайными и перехватываются HAproxy, который направляет их на статические.
Команды, которые я использую для общения с kafka, взяты из документации:
https://kafka.apache.org/documentation.html#quickstart
Я также использовал другие изображения, такие как ches / kafka, wurstmeister / kafka и один, который создал сам. Я также нашел https://github.com/mesos/kafka который после сборки вы можете отправлять команды на порт 7000 и развертывать брокеров в кластере, что для меня не удалось. В идеале я хотел бы изображение, в котором уже есть zookeeper и kafka, например изображение spotify.
Обновление 1
Итак, я изменил файл JSON марафона и экспортировал еще несколько переменных, которые кажутся необходимыми. Окончательный JSON показан ниже.
{
"id": "spotify-kafka.marathon",
"cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; export PORT_9092=9092; export PORT=2181; export PORT0=2181; export PORT1=9092; export PORT_2181=2181 ; env; supervisord -n",
"container": {
"type": "DOCKER",
"docker": {
"image": "192.168.1.235:5000/spotify-kafka",
"network": "BRIDGE",
"portMappings": [
{"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
{"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
]
}
},
"cpus": 0.5,
"mem": 1024.0,
"instances": 1
}
Это изменение дало мне другой результат, когда я попытался создать сообщение.
[2016-01-19 11:02:09,297] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,309] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,310] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,416] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,528] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,639] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,750] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,750] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,751] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Обновление 2 - Решение
Покопавшись в сети, я нашел этот репозиторий https://github.com/tobilg/docker-kafka-marathon/ Этот парень создал сценарий оболочки, который автоматически создает для вас файл свойств. Также вы можете масштабировать этот контейнер и иметь несколько экземпляров брокера kafka. Единственным недостатком для меня является то, что он полагается на внешний сервер zookeeper, но я не думаю, что это будет проблемой, которую можно решить, установив его в образ.
Я отмечаю это как решенное.