Я хочу использовать logstash для сбора файла журнала, и формат файла был таким:
type=USER_START msg=audit(1404170401.294:157): user pid=29228 uid=0 auid=0 ses=7972 subj=system_u:system_r:crond_t:s0-s0:c0.c1023 msg='op=PAM:session_open acct="root" exe="/usr/sbin/crond" hostname=? addr=? terminal=cron res=success'
Какой фильтр мне использовать для сопоставления строки? или есть другой способ справиться с этим.
Любая помощь будет оценена.
Использовал приведенный ниже шаблон, чтобы сопоставить строку с отладчик Grok , но все же получил No matches
сообщение.
type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): user pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message}
Но когда я удалил subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message}
, он завершился успешно и получил такой объект JSON.
{
"audit_type": [
[
"USER_END"
]
],
"audit_epoch": [
[
"1404175981.491"
]
],
"BASE10NUM": [
[
"1404175981.491",
"524",
"1465",
"0",
"0"
]
],
"audit_counter": [
[
"524"
]
],
"audit_pid": [
[
"1465"
]
],
"audit_uid": [
[
"0"
]
],
"audit_audid": [
[
"0"
]
]
}
Не знаю почему subj
и msg
не могу работать дальше.
Журналы аудита записываются в виде серии пар ключ = значение, которые легко извлекаются с помощью фильтра kv. Однако я заметил, что ключ msg
иногда используется дважды и также представляет собой серию пар ключ = значение.
Первый грок используется для получения полей audit_type
, audit_epoch
, audit_counter
и sub_msg
(2-е поле сообщения)
grok {
pattern => [ "type=%{DATA:audit_type}\smsg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):.*?( msg=\'(?<sub_msg>.*?)\')?$" ]
named_captures_only => true
}
kv используется для извлечения всех пар ключ = значение, кроме msg и type, поскольку мы уже получили эти данные с помощью grok:
kv {
exclude_keys => [ "msg", "type" ]
}
kv снова используется для анализа пар ключ = значение в sub_msg (если он существует):
kv {
source => "sub_msg"
}
date используется для установки даты на значение в audit_epoch, используя формат даты UNIX
проанализирует временные метки с плавающей запятой или целые числа:
date {
match => [ "audit_epoch", "UNIX" ]
}
Наконец, mutate используется для удаления лишних полей:
mutate {
remove_field => ['sub_msg', 'audit_epoch']
}
Вы также можете переименовать поля, например, sysadmin1138:
mutate {
rename => [
"auid", "uid_audit",
"fsuid", "uid_fs",
"suid", "uid_set",
"ses", "session_id"
]
remove_field => ['sub_msg', 'audit_epoch']
}
Все вместе взятые фильтры выглядят так:
filter {
grok {
pattern => [ "type=%{DATA:audit_type}\smsg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):.*?( msg=\'(?<sub_msg>.*?)\')?$" ]
named_captures_only => true
}
kv {
exclude_keys => [ "msg", "type" ]
}
kv {
source => "sub_msg"
}
date {
match => [ "audit_epoch", "UNIX" ]
}
mutate {
rename => [
"auid", "uid_audit",
"fsuid", "uid_fs",
"suid", "uid_set",
"ses", "session_id"
]
remove_field => ['sub_msg', 'audit_epoch']
}
}
Быстрый поиск находит этот на github
AUDIT type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): user pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message}
AUDITLOGIN type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): login pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} old auid=%{NUMBER:old_auid} new auid=%{NUMBER:new_auid} old ses=%{NUMBER:old_ses} new ses=%{NUMBER:new_ses}
Беглый обзор предполагает, что это, вероятно, то, что вы ищете.
Лучшим решением, чем Grok, может быть использование кв фильтр. Это анализирует поля, сконфигурированные в формате «ключ = значение», каковым является большинство входов журнала аудита. В отличие от Grok, он обрабатывает строки с полями «иногда - иногда - нет». Однако имена полей представлены в их менее полезных сокращенных формах, поэтому вам может потребоваться некоторое переименование полей.
filter {
kv { }
}
Это даст вам большую часть информации, а поля будут соответствовать тому, что отображается в журналах. Все типы данных будут string
. Чтобы сделать все возможное, чтобы очеловечить поля:
filter {
kv { }
mutate {
rename => {
"type" => "audit_type"
"auid" => "uid_audit"
"fsuid => "uid_fs"
"suid" => "uid_set"
"ses" => "session_id"
}
}
}
В msg
Тем не менее, поле, которое содержит метку времени и идентификатор события, все равно нужно будет найти. Другие ответы показывают, как это сделать.
filter {
kv { }
grok {
match => { "msg" => "audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):"
}
mutate {
rename => {
"type" => "audit_type"
"auid" => "uid_audit"
"fsuid => "uid_fs"
"suid" => "uid_set"
"ses" => "session_id"
}
}
}
формат для Grok изменился, поэтому взгляните на это:
filter {
grok {
# example: type=CRED_DISP msg=audit(1431084081.914:298): pid=1807 uid=0 auid=1000 ses=7 msg='op=PAM:setcred acct="user1" exe="/usr/sbin/sshd" hostname=host1 addr=192.168.160.1 terminal=ssh res=success'
match => { "message" => "type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} ses=%{NUMBER:ses} msg=\'op=%{WORD:operation}:%{WORD:detail_operation} acct=\"%{WORD:acct_user}\" exe=\"%{GREEDYDATA:exec}\" hostname=%{GREEDYDATA:hostname} addr=%{GREEDYDATA:ipaddr} terminal=%{WORD:terminal} res=%{WORD:result}\'" }
}
date {
match => [ "audit_epoch", "UNIX_MS" ]
}
}
Здесь используется дата из audit_epoch как @datetime.