Назад | Перейти на главную страницу

Контейнер докеров Kafka на кластере mesos с помощью marathon

Я пытаюсь развернуть брокера kafka в контейнере докеров в кластере mesos.

В частности, у меня есть кластер mesos, где я развертываю различные контейнеры докеров, используя marathon в качестве системы инициализации. Все контейнеры имеют служебные порты и доступны через прокси (HAproxy).

Проблема

Когда я развертываю контейнер kafka с помощью marathon, я могу создать тему, перечислить все темы, но не могу запустить команду произвести / потреблять. Команда производства дает мне следующую ошибку

[2016-01-18 11:10:09,926] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Образ докера, который я использую, - это spotify / kafka, в нем предустановлены zookeeper и kafka. Этот образ работает нормально, когда я запускаю его с помощью команды docker run.

Я использую следующий json-файл marathon для развертывания контейнера:

{
    "id": "spotify-kafka.marathon", 
    "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; env; supervisord -n",
    "container": {
    "type": "DOCKER",
    "docker": {
            "image": "spotify/kafka",
            "network": "BRIDGE",
            "portMappings": [
                    {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
                    {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
          ]
    }
},
    "cpus": 0.5,
    "mem": 1024.0,
    "instances": 1
}

Команда cmd экспортирует несколько переменных env, которые задают IP и порт внутреннего хоста. Внешние порты являются случайными и перехватываются HAproxy, который направляет их на статические.

Команды, которые я использую для общения с kafka, взяты из документации:

https://kafka.apache.org/documentation.html#quickstart

Я также использовал другие изображения, такие как ches / kafka, wurstmeister / kafka и один, который создал сам. Я также нашел https://github.com/mesos/kafka который после сборки вы можете отправлять команды на порт 7000 и развертывать брокеров в кластере, что для меня не удалось. В идеале я хотел бы изображение, в котором уже есть zookeeper и kafka, например изображение spotify.

Обновление 1
Итак, я изменил файл JSON марафона и экспортировал еще несколько переменных, которые кажутся необходимыми. Окончательный JSON показан ниже.

{
    "id": "spotify-kafka.marathon", 
    "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; export PORT_9092=9092; export PORT=2181; export PORT0=2181; export PORT1=9092; export PORT_2181=2181 ; env; supervisord -n",
    "container": {
    "type": "DOCKER",
    "docker": {
            "image": "192.168.1.235:5000/spotify-kafka",
            "network": "BRIDGE",
            "portMappings": [
                    {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
                    {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
          ]
    }
},
    "cpus": 0.5,
    "mem": 1024.0,
    "instances": 1
}

Это изменение дало мне другой результат, когда я попытался создать сообщение.

[2016-01-19 11:02:09,297] WARN Error while fetching metadata     [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,309] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,310] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,416] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,528] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,639] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,750] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,750] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,751] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Обновление 2 - Решение
Покопавшись в сети, я нашел этот репозиторий https://github.com/tobilg/docker-kafka-marathon/ Этот парень создал сценарий оболочки, который автоматически создает для вас файл свойств. Также вы можете масштабировать этот контейнер и иметь несколько экземпляров брокера kafka. Единственным недостатком для меня является то, что он полагается на внешний сервер zookeeper, но я не думаю, что это будет проблемой, которую можно решить, установив его в образ.

Я отмечаю это как решенное.