Назад | Перейти на главную страницу

Поток Graylog получает события, но он пуст

Я начал отправлять журналы Пало-Альто в Graylog, и правило потока выбирает их, сопоставляя «Пало-Альто» в поле «теги» (таковы все мои правила потока; внешний экземпляр Logstash выполняет теги до доставка в Graylog).

Я знаю, что узлы Graylog получают эти события на сетевых интерфейсах:

И поток показывает, что он получает события (обратите внимание на «22 сообщения в секунду»):

Тем не менее, когда я нажимаю на поток (или выполняю поиск -> теги: «Пало-Альто»), событий не обнаруживается.

Единственная распространенная проблема, которую я видел в Интернете, - это настройки часового пояса, которые переносят эти события в будущее, но время в нашем отправителе Palo Alto Panorama правильное (PST), и попытка поиска по абсолютному времени на день в будущем ничего не показывает.

Информация о версии:

Graylog 2.2.2 + 691b4b7, кодовое имя Stiegl

Elasticsearch 2.4.4

Lucene 5.5.2

У меня также есть этот вопрос без ответа о том, что функция поиска не работает должным образом, чтобы найти события, которые действительно прибывают нормально. Я сомневаюсь, что это имеет какое-то отношение, но для полноты я включу это здесь.

В файле журнала /var/log/graylog-server/server.log узлов сервера Graylog я заметил множество ошибок, например:

[54]: индекс [graylog_2], тип [сообщение], идентификатор [edb8ec50-1320-11e7-92de-005056b541f6], сообщение [MapperParsingException [не удалось проанализировать [ReceiveTime]]; вложено: IllegalArgumentException [Недопустимый формат: «2017/03/27 12:09:40» неверно сформирован в «/ 03/27 12:09:40»];]

Итак, проблема в том, что эти сообщения нормально поступали в Graylog, но не могли быть проиндексированы Elasticsearch. В конце концов я отбросил и изменил проблемные поля, пока они не понравились Graylog.

if "Palo Alto" in [tags] {
    grok {
        match => ["message", "<\d*>(?<patimestamp>\w* \d* \d*:\d*:\d*) (?<PanoramaHost>[^ ]*) (?<FutureUse0>[^,]*),(?<ReceiveTime>[^,]*),(?<SerialNumber>[^,]*),(?<PAType>[^,]*),%{GREEDYDATA:pamessage}"]
    }
    if [PAType] == "SYSTEM" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","vsys","paEventID","Object","FutureUse2","FutureUse3","Module","Severity","Description","SeqNum","ActionFlags"]}
        mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "TRAFFIC" {
        csv {source => "[pamessage]" columns => ["Threat-ContentType","ConfigVersion","GenerateTime","SrcAddress","DstAddress","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","VSys","SrcZone","DstZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","SeqNum","ActionFlags","SrcCountry","DstCountry","cpadding","pkts_sent","pkts_received"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "THREAT" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcIP","DstIP","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","vsys","SrcZone","DstZone","IngressInterface","EgressInterface","LogFwdProfile","FutureUse2","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Misc","ThreatID","Category","Severity","Direction","SeqNum","ActionFlags","SrcLocation","DstLocation","FutureUse3","ContentType","pcapID","Filedigest","Cloud","FutureUse4","UserAgent","FileType","XForwardedFor","Referer","Sender","Subject","Recipient","ReportID"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "CONFIG" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","Host","vsys","Command","Admin","Client","Result","ConfigPath","SeqNum","ActionFlags","BeforeChangeDetail","AfterChangeDetail"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "HIP-MATCH" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcUser","vsys","MachineName","OS","SrcAddress","HIPType","FutureUse2","FutureUse3","SeqNum","ActionFlags"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else {
        mutate {add_tag => "Uncategorized"}
    }
}