logstash 配置文件学习
配置文件举例
- 默认位置是
/usr/local/logstash/config/logstash-sample.conf
匹配需求描述
- 先需要统计后台服务的日志, 主要有下面 4 类
日志类型 | 格式 | 举例 |
---|---|---|
服务器日志 | [localtime][log_level][module_name] {event_info} |
[2019-07-25 09:26:57.315][debug ][EasyRedis] {Connect Redis success} |
设备认证日志 | [localtime][log_level][module_name][dev_name][dev_id][dev_ip] {event_info} |
[2019-07-25 09:26:07.317][warn ][GbsipServerService][cam1][34020000001320000001][192.168.10.199] {ON} |
用户认证日志 | [localtime][log_level][module_name][user_id][user_ip] {event_info} |
[2019-07-25 09:26:14.049][warn ][GbsipClientService][34020000001110000001][192.168.1.160] {online} |
用户操作日志 | [localtime][log_level][module_name][user_id][user_ip][dev_id] {event_info} |
[2019-07-25 09:38:31.630][trace ][GbsipClientService][34020000001110000001][192.168.1.160][34020000001320000001] {Bye} |
- 现在的需求包括
- 1 过滤掉不符合上述 4 类格式的日志
- 2 可以一键搜索上述 4 种日志, 即查看所有满足要求的日志
- 3 可以一键搜索上述任意一种日志
- 实现方法
- 统一日志格式
[type][localtime][log_level][module_name][user_id][user_ip][dev_name][dev_id][dev_ip] {event_info}
- 缺少的字段为空
- filter 的正则匹配表达式为
\[%{DATA:localtime}\]\[%{DATA:log_level}\]\[%{DATA:module_name}\]\[%{NUMBER:type}\]\[%{DATA:user_id}\]\[%{DATA:user_ip}\]\[%{DATA:dev_name}\]\[%{DATA:dev_id}\]\[%{DATA:dev_ip}\]\ \{%{DATA:event_info}\}
, 可用于分割日志
- 统一日志格式
统一后日志举例
[2019-07-25 09:26:57.315][debug ][EasyRedis][1][][][][][] {Connect Redis success} [2019-07-25 09:26:07.317][warn ][GbsipServerService][2][][][cam1][34020000001320000001][192.168.10.199] {ON} [2019-07-25 09:26:14.049][warn ][GbsipClientService][3][34020000001110000001][192.168.1.160][][][] {online} [2019-07-25 09:38:31.630][trace ][GbsipClientService][4][34020000001110000001][192.168.1.160][][34020000001320000001][] {Bye}
日志收集架构
- filebeat
- 轻量级日志收集工具, 用于搜集文件数据, 比 logstash 占用资源更少
- 在
filebeat.yml
中会配置指定的监听文件
- logstash
- 用于日志收集, 具有 filter 功能, 可以过滤分析日志, 然后发送到消息队列
- kibana
- 供前端的页面再进行搜索和图表可视化, 调用Elasticsearch的接口返回的数据进行可视化
- 用 Kibana 来搜索、查看, 并和存储在 ElasticSearch 索引中的数据进行交互
- ElasticSearch
- 搜索服务器, 提供了一个分布式多用户能力的全文搜索引擎, 基于 RESTful web 接口
完整配置文件
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
# 从日志文件中获取信息, 使用 filebeat 推送日志到 logstash
beats {
port => 5044
}
}
filter {
# 如果搜索不到 "] {" 关键字, 丢弃该消息
# 其他日志内容均保证不含 "] {" 关键字
if "] {" not in [message] {
# drop 可以跳过某些不想统计的日志信息, 当某条日志信息符合 if 规则时
# 该条信息则不会在 out 中出现, logstash 将直接进行下一条日志的解析
drop { }
} else {
grok {
# 自定义正则匹配
match => {"message" => "\[%{DATA:localtime}\]\[%{DATA:log_level}\]%{DATA:c1}\[%{DATA:c2}\[%{DATA:module_name}\]\[%{NUMBER:type}\]\[%{DATA:user_id}\]\[%{DATA:user_ip}\]\[%{DATA:dev_name}\]\[%{DATA:dev_id}\]\[%{DATA:dev_ip}\]\ \{%{DATA:event_info}\}"}
}
# 匹配服务器日志成功
if "[1]" in [message] {
# mutate 是做转换用的
mutate {
# 添加字段 _log_info_, 便于搜索所有日志
add_field => {
"_log_info_" => "%{event_info}"
}
# 添加字段 _serv_log_, 便于搜索服务器日志
add_field => {
"_serv_log_" => "%{event_info}"
}
}
}
# 匹配设备认证日志成功
if "[2]" in [message] {
mutate {
add_field => {
"_log_info_" => "[%{dev_name}] [%{dev_id}] [%{dev_ip}] %{event_info}"
}
# 添加字段 _dev_log_, 便于搜索服务器日志
add_field => {
"_dev_log_" => "[%{dev_name}] [%{dev_id}] [%{dev_ip}] %{event_info}"
}
}
}
# 匹配用户认证日志成功
if "[3]" in [message] {
mutate {
add_field => {
"_log_info_" => "[%{user_id}] [%{user_ip}] %{event_info}"
}
# 添加字段 _user_log_, 便于搜索服务器日志
add_field => {
"_user_log_" => "[%{user_id}] [%{user_ip}] %{event_info}"
}
}
}
# 匹配用户操作日志成功
if "[4]" in [message] {
mutate {
add_field => {
"_log_info_" => "[%{user_id}] [%{user_ip}] [%{dev_id}] %{event_info}"
}
# 添加字段 _user_dev_log_, 便于搜索服务器日志
add_field => {
"_user_dev_log_" => "[%{user_id}] [%{user_ip}] [%{dev_id}] %{event_info}"
}
}
}
}
}
output {
# 将输出保存到 elasticsearch
elasticsearch {
hosts => ["http://192.168.1.240:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}