filebeat 常用配置
filebeat 配置
实时监控目录采集日志到
kafka
#=========================== Filebeat prospectors ============================= #processors: # 排除字段, 但是排除不了 @timestamp 跟 @metadata 两个字段 # - drop_fields: # fields: ["prospector", "beat" ] # 包含字段, 跟上面的 drop_fileds 功能一样, 也排除不了 @timestamp 跟 @metadata # - include_fields: # fields: ["pid", "message", "offset", "source"] filebeat.prospectors: - type: log enabled: true paths: - data/empty.log - /home/f/doc/logs/*/*.log #- c:\programdata\elasticsearch\logs\* # ignore logs 72 houre ,忽略 72小时前的文件 ignore_older: 72h # 关闭 2 分钟不活动的文件 close_inactive: 2m #位置文件清除 168h 前的文件 clean_inactive: 168h # 100K,限制单条log大小 max_bytes: 100000 fields: pid: 431 fields_under_root: true #添加字段, 放在根目录 filebeat: registry_file: registry0423_05 # registry 文件名 # shutdown_timeout: 5s # 设置内存队列 queue.mem: events: 4096 flush.min_events: 126 flush.timeout: 5s #output.console: # pretty: true # codec: # format.string: '{%{[message]}==%{[pid]}---%{+yyMMdd}}==%{[offset]}' #output.file: # path: "/home/f/tmp" # filename: "ab.log" # codec.format: # 可以用 codec 的方法来排除, @timestamp, 跟 @metadata # string: '{"source":"%{[source]}","message":"%{[message]}","offset":"%{[offset]}"}' # kafka output output.kafka: codec.format: string: '{"pid":"%{[pid]}","source":"%{[source]}","message":"%{[message]}","offset":"%{[offset]}"}' # initial brokers for reading cluster metadata hosts: ["192.168.1.11:9092"] # message topic selection + partitioning topic: "topic_%{[pid]}_%{+yyMM}_a2" # topic 可以配置动态参数 partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000 keep_alive: 180
使用
%{+yyMMddww}
来标注时间, 可以根据 yyww 来动态产生 topic, 每周一个 topic1819
18 年, 第 19 周
启动的时候
-e
参数是生产 syslog 日志, 当 console 测试时可以使用, 当使用 logs 文件时, 不要该参数