filebeat采集日志

Filebeat 采集日志

Filebeat

Filebeat是用于转发和集中日志数据的轻量级传送工具。 Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到ElasticsearchLogstash进行索引。

Filebeat由两个主要组件组成:inputsharvesters (直译:收割机,采集器)。 这些组件一起工作以跟踪文件,并将事件数据发送到你指定的输出

harvester 逐行读取每个文件(一行一行地读取每个文件),并把这些内容发送到输出。
每个文件启动一个harvester

一个input 负责管理harvesters,并找到所有要读取的源。
如果input类型是log,则input查找驱动器上与已定义的glob路径匹配的所有文件,并为每个文件启动一个harvester
  • Input组件

    input组件负责监控日志文件(目录)自身的变化情况,如某文件被移动、删除了或创建了新文件,然后将这些信息提供给Harvester使用。
  • Harvester组件

        Harvester用于监控一个日志文件内容的变化情况。如,是否新加了一行数据,是否读到了EOF等。
    
  • Spooler组件

      Spooler用于将Harvester"收割"到的新事件(日志行)发送到指定目的地,如ES, Kafka, Logstash, File等
    

filebeat日志采集延时的原因:

首先Input会以一定间隔扫描日志文件的变化情况,如果在间隔之间删除、创建了文件,那就不会被探测到; 其次Harvester检测日志文件内容时也会有一定频率,而且如果读到EOF还会触发新的策略; 最后Spooler对于需要发送的事件也有一个缓存,如果设置不当可能会导致Spooler积攒一堆事件后才发送从而进一步加重日志延迟

log input 配置


filebeat.inputs:


- type: log

  paths:
    - \path\to\logs\*.log
  multiline:
    # 时间格式开头的日志
    pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    negate: true
    match: after


output.kafka:
  # Array of hosts to connect to.
  hosts: ["localhost:9092"]
  enabled: true
  topic: topic

启动

# 日志采集到kafka
filebeat -e -c filebeat.yml

    -c:配置文件位置
    -path.logs:日志位置
    -path.data:数据位置
    -path.home:家位置
    -e:关闭日志输出
    -d 选择器:启用对指定选择器的调试。 对于选择器,可以指定逗号分隔的组件列表,也可以使用-d“*”为所有组件启用调试.例如,-d“publish”显示所有“publish”相关的消息。


# consumer 获取日志
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic --from-beginning

参数设置

如果读到 EOF,则 filebeat 将会等待一段时间再去读文件, 结合 close_inactive 选项,如果等待时间超过了默认值 5 分钟,则 Harvester 结束

backoff

Filebeat检测到某个文件到了EOF之后,每次等待多久再去检测文件是否有更新,默认为1s。

max_backoff

Filebeat检测到某个文件到了EOF之后,等待检测文件更新的最大时间,默认是10秒。

backoff_factor

定义到达max_backoff的速度,默认因子是2,到达max_backoff后,
变成每次等待max_backoff那么长的时间才backoff一次,直到文件有更新才会重置为backoff
func (f *Log) wait() {
	// Wait before trying to read file again. File reached EOF.
	select {
	case <-f.done:
		return
	case <-time.After(f.backoff):
	}

	// Increment backoff up to maxBackoff
	if f.backoff < f.config.MaxBackoff {
		f.backoff = f.backoff * time.Duration(f.config.BackoffFactor)
		if f.backoff > f.config.MaxBackoff {
			f.backoff = f.config.MaxBackoff
		}
	}
}

close_inactive

打开了文件需要 close,不能占用文件不释放,不然即使 rm 了文件,磁盘空间也不会释放(早期 Filebeat 有这个bug)。

这个配置就是说明多久关闭文件, 比如一个日志文件,10 分钟都没有读到新的内容就把文件句柄关闭。

这里的时间不是取决于文件的最后更新时间,而是 Filebeat 内部记录的时间,上次读到文件和这次尝试读文件的时间差。

官方建议设置的时间是比文件产生数据频率高一个数量级(默认5m),比如每秒都有日志产生,这个值就可以设置为 1m。

ignore_older

多久前的旧文件就不管

路径下的历史文件可能很多,比如配置了按天分割,显然旧文件我们一般是不需要的

比如设置为 1h,表示文件时间在 1h 之前的日志都不会被 input 模块搜集,直到有新日志产生。

scan_frequency 10s(默认)

如果设置为0s,则Filebeat会尽可能快地感知更新(占用的CPU会变高)。默认是10s

扫描频率如何控制, 多久扫描一次是否有新文件产生

通配设置复杂的话频繁扫文件也是很大的开销。

input 模块只是负责发现新文件,新文件是相对已经被 harvester 获取的文件,第一次发现之后就已经在被 harvester 一行行实时读取了,
所以这里基本上只影响日志切分时的实时性(这种场景下的短暂延迟是可以接受的)

clean_inactive

多久清理一次注册信息
需要保证这个文件已经不活跃

所以这个值需要大于 ignore_older + scan_frequency
不然的话清理后这个文件又被发现,则会重头开始读取,这样就重了。默认值是0(不开启clean_*相关功能)

clean_inactive > ignore_older + scan_frequency > close_inactive

按小时分割的日志配置
scan_frequency: 10s
ignore_older: 60m
close_inactive: 10m
close_renamed: true
close_removed: true
clean_inactive: 70m
clean_removed: true

filebeat.inputs:



- type: log

  # Change to true to enable this input configuration.
  enabled: true
  fields:
    kafka_topic: tp1
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /tmp/a/*.log
  multiline:
    pattern: '^{'
    negate: true
    match: after

  scan_frequency: 100ms  # scan 文件状态速度
  backoff: 50ms         # 监测文件变化速度
  max_backoff: 50ms
  backoff_factor: 1
  flush.timeout: 1s
  ignore_older: 24h
  close_inactive: 200ms # 重置文件句柄速度
  clean_inactive: 25h

- type: log

  # Change to true to enable this input configuration.
  enabled: true
  fields:
    kafka_topic: tp2

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /tmp/asd*.log

  multiline:
    pattern: '^{'
    negate: true
    match: after

  backoff: 50ms
  scan_frequency: 100ms
  max_backoff: 50ms
  backoff_factor: 1
  flush.timeout: 1s
  close_inactive: 200ms
  ignore_older: 24h
  clean_inactive: 25h



配置多个kafka topic

  • filebeat实例,每个filebeat实例设置一个topic

  • filebeat实例多topic


 filebeat.inputs:


- type: log

  # Change to true to enable this input configuration.
  enabled: true
  fields:
    kafka_topic: ber_click
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/to/logs/click.log*


- type: log

  # Change to true to enable this input configuration.
  enabled: true
  fields:
    kafka_topic: ber_show

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/to/logs/show.log*


output.kafka:
  # Array of hosts to connect to.
  hosts: ["xxx:9093", "xxx:9093", "xxx:9093"]
  enabled: true
  topic: '%{[fields.kafka_topic]}'

使用 filestream input替代 log with ` Elastic 7.14+`

kafka

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统)

常见可以用于web/nginx日志、访问日志,消息服务等等

Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目

kafka下载

windows 下 

    # 启动 zookeeper
    zookeeper-server-start.bat  ..\..\config\zookeeper.properties
    
    # 启动 Kafka
    kafka-server-start.bat  ..\..\config\server.properties
    
    # 创建一个主题
    kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test-topic
    
    # 查看创建的主题列表
    kafka-topics.bat --list --zookeeper localhost:2181

kafka-python API

heartbeat_interval_ms(3s)

表示多长时间向broker报告一次,这个默认值3000ms
这个值官方推荐不要高于session.timeout.ms 的1/3(这个值默认没问题)

session.timeout.ms(10s) 用于心跳线程(heartbeat thread)

如果发送心跳时间超过这个时间,broker就会认为消费者死亡

假设您设置 session.timeout.ms=30000,因此,消费者心跳线程必须在此时间到期之前向代理发送心跳
如果整个消费者死亡(并且死亡的处理线程很可能使包括心跳线程在内的整个消费者崩溃),则只需要 session.timeout.ms 来检测它

max.poll.interval.ms(5m) 用于处理线程(processing thread)

如果处理单个消息需要 1 分钟,则可以将 max.poll.interval.ms 设置为大于一分钟,以便处理线程有更多时间处理消息
如果处理线程死亡,则需要 max.poll.interval.ms 才能检测到这一点

auto.commit.interval.ms(5s)

自动提交间隔

max_poll_interval_ms(5m)

参数设置大一点可以增加两次 poll 之间处理消息的时间。
当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,
会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。
  • 多个进程重复消费数据

因此假设进程A正在消费分区1的信息,并提交了偏移量, 之后又消费了10条数据,还没来得及提交偏移量的时候, reblance机制让进程B来继续消费分区1的信息, 此时进程B会从上次进程A提交偏移量的地方开始消费, 因此这10条数据就是重复消费的

解决方式:

将消费者进程与分区静态绑定

self.tp = TopicPartition(topic=self.topic, partition=settings.KAFKA_CONSUMER_PARTITION)
self.consumer.assign([self.tp])
  • 消费组实现容错性机制

一个有2个partition的topic,和2个consumer,这2个consumer共同消费同一个topic中的数据 两个consumer同时运行的情况下,它们分别消费不同partition中的数据。

consumer1消费partition 0中的数据,consumer2消费parition 1中的数据。 刚开始consumer2只消费partition1中的数据,当consumer1退出后, consumer2中也开始消费partition 0中的数据了

  • 同一group_id的consumer

消费者将继续为特定group自动使用最后一个偏移量的数据

Buy me a 肥仔水!