Я начал отправлять журналы Пало-Альто в Graylog, и правило потока выбирает их, сопоставляя «Пало-Альто» в поле «теги» (таковы все мои правила потока; внешний экземпляр Logstash выполняет теги до доставка в Graylog).
Я знаю, что узлы Graylog получают эти события на сетевых интерфейсах:
И поток показывает, что он получает события (обратите внимание на «22 сообщения в секунду»):
Тем не менее, когда я нажимаю на поток (или выполняю поиск -> теги: «Пало-Альто»), событий не обнаруживается.
Единственная распространенная проблема, которую я видел в Интернете, - это настройки часового пояса, которые переносят эти события в будущее, но время в нашем отправителе Palo Alto Panorama правильное (PST), и попытка поиска по абсолютному времени на день в будущем ничего не показывает.
Информация о версии:
Graylog 2.2.2 + 691b4b7, кодовое имя Stiegl
Elasticsearch 2.4.4
Lucene 5.5.2
У меня также есть этот вопрос без ответа о том, что функция поиска не работает должным образом, чтобы найти события, которые действительно прибывают нормально. Я сомневаюсь, что это имеет какое-то отношение, но для полноты я включу это здесь.
В файле журнала /var/log/graylog-server/server.log узлов сервера Graylog я заметил множество ошибок, например:
[54]: индекс [graylog_2], тип [сообщение], идентификатор [edb8ec50-1320-11e7-92de-005056b541f6], сообщение [MapperParsingException [не удалось проанализировать [ReceiveTime]]; вложено: IllegalArgumentException [Недопустимый формат: «2017/03/27 12:09:40» неверно сформирован в «/ 03/27 12:09:40»];]
Итак, проблема в том, что эти сообщения нормально поступали в Graylog, но не могли быть проиндексированы Elasticsearch. В конце концов я отбросил и изменил проблемные поля, пока они не понравились Graylog.
if "Palo Alto" in [tags] {
grok {
match => ["message", "<\d*>(?<patimestamp>\w* \d* \d*:\d*:\d*) (?<PanoramaHost>[^ ]*) (?<FutureUse0>[^,]*),(?<ReceiveTime>[^,]*),(?<SerialNumber>[^,]*),(?<PAType>[^,]*),%{GREEDYDATA:pamessage}"]
}
if [PAType] == "SYSTEM" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","vsys","paEventID","Object","FutureUse2","FutureUse3","Module","Severity","Description","SeqNum","ActionFlags"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "TRAFFIC" {
csv {source => "[pamessage]" columns => ["Threat-ContentType","ConfigVersion","GenerateTime","SrcAddress","DstAddress","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","VSys","SrcZone","DstZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","SeqNum","ActionFlags","SrcCountry","DstCountry","cpadding","pkts_sent","pkts_received"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "THREAT" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcIP","DstIP","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","vsys","SrcZone","DstZone","IngressInterface","EgressInterface","LogFwdProfile","FutureUse2","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Misc","ThreatID","Category","Severity","Direction","SeqNum","ActionFlags","SrcLocation","DstLocation","FutureUse3","ContentType","pcapID","Filedigest","Cloud","FutureUse4","UserAgent","FileType","XForwardedFor","Referer","Sender","Subject","Recipient","ReportID"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "CONFIG" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","Host","vsys","Command","Admin","Client","Result","ConfigPath","SeqNum","ActionFlags","BeforeChangeDetail","AfterChangeDetail"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "HIP-MATCH" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcUser","vsys","MachineName","OS","SrcAddress","HIPType","FutureUse2","FutureUse3","SeqNum","ActionFlags"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else {
mutate {add_tag => "Uncategorized"}
}
}