Назад | Перейти на главную страницу

Rsyslog. Очередь с дисковой поддержкой для омкафки

У меня следующий конвейер:

nginx -> unix_socket -> rsyslog -> omkafka module -> kafka

Для омкафки я использую следующую конфигурацию:

module(
  load="impstats"
  interval="10"             # how often to generate stats
  resetCounters="on"        # to get deltas (e.g. # of messages submitted in the last 10 seconds)
  log.file="/var/log/impstats"     # file to write those stats to
  log.syslog="off"          # don't send stats through the normal processing pipeline. More on that in a bit
)

#### LOAD MODULES ####
module(load="omkafka")

#### DEFINE GLOBALS ####
$MaxMessageSize 64k
$EscapeControlCharactersOnReceive off

#### TEMPLATES ####
$template ngFormat, "%msg:4:$%"

input(type="imuxsock" Socket="/spool/syslog" Ruleset="outwriter")

ruleset(name="outwriter"){
    action(
      type="omkafka"
      broker=["kafka666:9092"]
      topic="nginx_logs"
      partitions.auto="on"
      template="cerberFormat"
      queue.type="linkedlist"
      queue.dequeueBatchSize="10000"   # numbers of messages to be parsed from queue
      queue.highWatermark="450000"    # max no. of events to hold in memory
      queue.lowWatermark="250000"     # use memory queue again, when it's back to this level
      queue.spoolDirectory="/spool/logs"  # where to write on disk
      queue.fileName="rsyslog_queue"
      queue.maxDiskSpace="100g"        # it will stop at this much disk space
      queue.size="500000"           # or this many messages
      queue.saveOnShutdown="on"      # save memory queue contents to disk when rsyslog is exiting
    )
}

main_queue(
  queue.type="linkedlist"
  queue.dequeueBatchSize="10000"   # numbers of messages to be parsed from queue
  queue.highWatermark="450000"    # max no. of events to hold in memory
  queue.lowWatermark="250000"     # use memory queue again, when it's back to this level
  queue.spoolDirectory="/spool/logs"  # where to write on disk
  queue.fileName="rsyslog_main_queue"
  queue.maxDiskSpace="100g"        # it will stop at this much disk space
  queue.size="500000"           # or this many messages
  queue.saveOnShutdown="on"      # save memory queue contents to disk when rsyslog is exiting
)

Я полагаю, что если брокер kafka недоступен, все сообщения omkafka должны быть помещены в указанную DA-очередь. Но когда я наблюдаю за счетчиками с помощью impstats, очередь DA всегда пуста, а omkafka использует собственную очередь вывода.

Это выглядит так:

Tue Oct  4 13:02:09 2016: global: origin=dynstats 
Tue Oct  4 13:02:09 2016: imuxsock: origin=imuxsock submitted=13060 ratelimit.discarded=0 ratelimit.numratelimiters=0 
Tue Oct  4 13:02:09 2016: **omkafka**: submitted=0 **maxoutqsize=100000** failures=0 topicdynacache.skipped=0 topicdynacache.miss=0 topicdynacache.evicted=0 
Tue Oct  4 13:02:09 2016: action 0: origin=core.action processed=13060 failed=13060 suspended=0 suspended.duration=300 resumed=0 
Tue Oct  4 13:02:09 2016: action 1: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 3: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 4: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 5: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 6: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 7: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 8: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 9: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 10: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 11: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: resource-usage: origin=impstats utime=24242276 stime=15882703 maxrss=125316 minflt=95642 majflt=0 inblock=0 oublock=632 nvcsw=1067580 nivcsw=513 
Tue Oct  4 13:02:09 2016: **main Q[DA]:** origin=core.queue size=0 enqueued=0 full=0 discarded.full=0 discarded.nf=0 **maxqsize=0** 
Tue Oct  4 13:02:09 2016: main Q: origin=core.queue size=0 enqueued=13060 full=0 discarded.full=0 discarded.nf=0 maxqsize=18 

Что-то не так с моими конфигами или у омкафки нет надежной очереди?

Спасибо!

Вы должны добавить этот конфиг в свой action блок:

name="kafkaoutput"
action.resumeretrycount="-1"

И он будет писать в очередь DA, когда размер очереди> 450000

Я случайно нашел этот:

queue.buffering.max.messages = 100000

Похоже, сообщения уже должны были быть израсходованы.