Чтобы сделать наш кластер Kafka доступным как из Интернета, так и из нашей частной сети, мы решили настроить Kafka следующим образом:
Private VIP:9000 => All brokers:9092 (topology query only)
Private VIP:9001 => Broker #1:9092
Private VIP:9002 => Broker #2:9092
...
Public VIP:9000 => All brokers:9092 (topology query only)
Public VIP:9001 => Broker #1:9092
Public VIP:9002 => Broker #2:9092
...
Мы настроили наш балансировщик нагрузки таким образом, а затем наши брокеры:
listeners=PLAINTEXT://<server_priv_ip>:9092
advertised.listeners=INTERNAL://<private_VIP>:9001,EXTERNAL://<public_vip>:9001
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
Теперь, как и следовало ожидать, брокеры Kafka не запускаются:
ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different port, listeners: INTERNAL://<private_VIP>:9001,EXTERNAL://<public_vip>:9001
С точки зрения моего администратора, мой подход был полностью логичным, хотя я ожидал, что возникнет проблема, при которой производитель / потребители получат оба адреса, откуда бы они ни связались с брокером. Использование одного и того же порта на 2 разных адресах кажется логичным и способствует ясности ...
Первый вопрос : Почему это неправильно?
Второй вопрос: Как я могу достичь своей цели, если не так? (большинство вариантов есть в таблице)
Мы поговорили с экспертом Kafka, и вот что из этого вышло.
Проще говоря, в то время как kafka знает имена слушателей, чтобы различать 2 слушателя, ему необходимо знать, какой из них клиент, который подключается, хочет достичь, и может сделать это только с помощью входящего порта.
Также слушатели и рекламируемые слушатели сопоставляются друг с другом, если у них один и тот же порт, поэтому вам нужно, чтобы они совпадали.
В итоге вот что мы сделали:
listeners=INTERNAL://hostname:900N,EXTERNAL://hostname:910N,REPLICATION:hostname:9092
advertised.listeners==INTERNAL://vip:900N,EXTERNAL://vip:910N,REPLICATION:hostname:9092
listeners.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT
inter.listener.protocol=REPLICATION
Где "N" - идентификатор брокера (считая с 1 форварда)
На балансировщике нагрузки мы сопоставляем каждый порт VIP-адреса с IP-адресом брокера плюс виртуальный IP-адрес на портах 9000 (внутренняя сеть) и 9100 (внешняя сеть), которые сопоставляются с соответствующим прослушивателем каждого брокера.
Это немного излишне, но работает так, как ожидалось (если темы внутренних метаданных kafka, такие как __consumer_offset
воспроизводятся)