filebeat 配置

实时监控目录采集日志到 kafka

#=========================== Filebeat prospectors =============================
#processors:
# 排除字段, 但是排除不了 @timestamp 跟 @metadata 两个字段
#  - drop_fields:
#      fields: ["prospector", "beat" ]

# 包含字段, 跟上面的 drop_fileds 功能一样, 也排除不了 @timestamp 跟 @metadata
#  - include_fields:
#      fields: ["pid", "message", "offset", "source"]

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - data/empty.log
    - /home/f/doc/logs/*/*.log
    #- c:\programdata\elasticsearch\logs\*

   # ignore logs 72 houre  ,忽略 72小时前的文件
   ignore_older: 72h
  # 关闭 2 分钟不活动的文件
   close_inactive: 2m
  #位置文件清除 168h 前的文件
   clean_inactive: 168h

   # 100K,限制单条log大小
   max_bytes: 100000

  fields:
    pid: 431
  fields_under_root: true #添加字段, 放在根目录

filebeat:
  registry_file: registry0423_05 # registry 文件名
#  shutdown_timeout: 5s
# 设置内存队列
  queue.mem:
    events: 4096
    flush.min_events: 126
    flush.timeout: 5s

#output.console:
#  pretty: true
#  codec:
#    format.string: '{%{[message]}==%{[pid]}---%{+yyMMdd}}==%{[offset]}'

#output.file:
#    path: "/home/f/tmp"
#    filename: "ab.log"
#    codec.format:  # 可以用 codec 的方法来排除, @timestamp, 跟 @metadata
#      string: '{"source":"%{[source]}","message":"%{[message]}","offset":"%{[offset]}"}'

# kafka output
output.kafka:
  codec.format:
    string: '{"pid":"%{[pid]}","source":"%{[source]}","message":"%{[message]}","offset":"%{[offset]}"}'
  # initial brokers for reading cluster metadata
  hosts: ["192.168.1.11:9092"]

  # message topic selection + partitioning
  topic: "topic_%{[pid]}_%{+yyMM}_a2" # topic 可以配置动态参数
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  keep_alive: 180

使用 %{+yyMMddww} 来标注时间, 可以根据 yyww 来动态产生 topic, 每周一个 topic 1819 18 年, 第 19 周

启动的时候 -e 参数是生产 syslog 日志, 当 console 测试时可以使用, 当使用 logs 文件时, 不要该参数