Я использую Flume flume-ng-1.5.0 (с CDH 5.4) для сбора журналов со многих серверов и перехода в HDFS Вот моя конфигурация:
#Define Source , Sinks, Channel
collector.sources = avro
collector.sinks = HadoopOut
collector.channels = fileChannel
# Define Scribe Interface
collector.sources.avro.type = avro
collector.sources.avro.bind = 0.0.0.0
collector.sources.avro.port = 1463
collector.sources.avro.threads = 5
collector.sources.avro.channels = fileChannel
collector.channels.fileChannel.type = file
collector.channels.fileChannel.checkpointDir = /channel/flume/collector/checkpoint
collector.channels.fileChannel.dataDirs = /channel/flume/collector/data
#collector.channels.fileChannel.transactionCapacity = 100000
#collector.channels.fileChannel.capacity = 1000000000
#Describe Haoop Out
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = fileChannel
collector.sinks.HadoopOut.hdfs.path = /logfarm/%{game_studio}/%{product_code}/%Y-%m-%d/%{category}
collector.sinks.HadoopOut.hdfs.filePrefix = %{category}-%Y-%m-%d
collector.sinks.HadoopOut.hdfs.inUseSuffix = _current
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
#Max File size = 10 MB
collector.sinks.HadoopOut.hdfs.round = true
collector.sinks.HadoopOut.hdfs.roundValue = 10
collector.sinks.HadoopOut.hdfs.roundUnit = minute
collector.sinks.HadoopOut.hdfs.rollSize = 10000000
collector.sinks.HadoopOut.hdfs.rollCount = 0
collector.sinks.HadoopOut.hdfs.rollInterval = 600
collector.sinks.HadoopOut.hdfs.maxOpenFiles = 4096
collector.sinks.HadoopOut.hdfs.timeZone = Asia/Saigon
collector.sinks.HadoopOut.hdfs.useLocalTimeStamp = true
collector.sinks.HadoopOut.hdfs.threadsPoolSize = 50
collector.sinks.HadoopOut.hdfs.batchSize = 10000
Каталоги: / channel / flume / collector / checkpoint, / channel / flume / collector / data пустые и принадлежат пользователю flume.
Но у меня есть странное исключение:
2015-05-08 18:31:34,290 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=fileChannel]. Due to java.io.EOFException: null
at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:340)
at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.RandomAccessFile.readInt(RandomAccessFile.java:827)
at java.io.RandomAccessFile.readLong(RandomAccessFile.java:860)
at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:80)
at org.apache.flume.channel.file.Log.replay(Log.java:426)
at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:290)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
Я хочу, чтобы любой специалист помог мне это исправить. большое спасибо
У меня возникла аналогичная ошибка, связанная с каналом Flume. Я исправил это, когда удалил / переместил каталоги данных и контрольных точек.
В твоем случае:
/ канал / лоток / коллектор / контрольная точка, / канал / лоток / коллектор / данные
Убедитесь, что каталог «/ channel / flume / collector /» чист и пуст. повторный запуск задания потока должен создать каталоги «контрольных точек» и «данных».
Всегда безопасно переместить каталоги и сохранить их в любом удобном месте в качестве справочника журналов в будущем. Мне удалось сделать это как на CDH 5.4 (лоток 1.5), так и на CDh 5.5 (лоток 1.6). Этим должно быть исправлено большинство исключений, касающихся закрытия канала.
Я считаю, что apache.org все еще работает над этой проблемой. Проверьте это: https://issues.apache.org/jira/browse/FLUME-2282
Надеюсь, поможет!!