持久化队列
默认情况下,Logstash在管道阶段(输入→管道工作者)使用内存中有界队列来缓冲事件。这些内存中队列的大小是固定的,不可配置。如果Logstash遇到临时计算机故障,则内存中队列的内容将丢失。临时机器故障是Logstash或其主机异常终止,但能够重新启动的情况。
为了防止异常终止期间的数据丢失,Logstash具有持久队列功能,该功能将消息队列存储在磁盘上。持久队列提供Logstash中数据的持久性。
持久队列对于需要大缓冲区的Logstash部署也很有用。您可以启用持久队列来缓冲磁盘上的事件,并删除消息代理,而不是部署和管理消息代理(如Redis,RabbitMQ或Apache Kafka)以促进缓冲的 发布-订阅者 模型。
总之,启用持久队列的好处如下:
- 无需像Redis或Kafka这样的外部缓冲机制,即可缓冲事件爆发。
- 在正常关闭期间以及Logstash异常终止时,提供至少一次传递保证,以防止消息丢失。如果在事件发生时重新启动Logstash,Logstash将尝试传递存储在持久队列中的消息,直到传递成功至少一次。
注意: 您必须显式设置
queue.checkpoint.writes: 1以保证所有输入事件的最大持久性。请参阅 控制持久性。
持久化队列的局限性
以下是持久队列功能无法解决的问题:
- 不使用 请求-响应 协议的输入插件无法防止数据丢失。例如:tcp,udp,zeromq push+pull,以及许多其他没有确认收件人机制的输入。只有具备确认功能的插件(如beats和http)受此队列保护。
- 它不处理永久性机器故障,例如磁盘损坏,磁盘故障和机器丢失。持久存储到磁盘的数据没有其他副本。
持久化队列的工作原理
队列位于同一进程中的输入和过滤器阶段之间:
输入→队列→过滤器+输出
当输入准备好处理事件后,会将它们写入队列。当对队列的写入成功时,输入可以向其数据源发送确认。
过滤器和输出完处理队列中的事件后,Logstash才会在队列中确认已完成的事件。队列记录管道已处理的事件。当且仅当事件已由Logstash管道完全处理时,事件被记录为已处理(在本文档中,称为"acknowledged"或"ACKed")。
"acknowledged"意味着事件已由所有已配置的过滤器和输出处理。例如,如只有一个Elasticsearch输出,则在Elasticsearch输出成功地将此事件发送到Elasticsearch时,会确认事件。
在正常关闭期间(CTRL+C或SIGTERM),Logstash将停止从队列中读取,并将过滤器和输出处理的正在进行的事件继续处理完成。重启后,Logstash将继续处理持久队列中的事件,以及接受来自输入的新事件。
如果Logstash异常终止,则任何正在进行的事件都不会被确认,并且在重新启动Logstash时,将由过滤器和输出进行重新处理。 Logstash批量处理事件,因此对于某些批次来说,可能已经有一部分事件被成功处理,但在发生异常终止时,未被记录为"acknowledged"。
有关队列写入和确认的特定行为的更多详细信息,请参阅 控制持久性。
配置持久化队列
要配置持久化队列,可以在Logstash设置文件中指定以下选项:
queue.type:指定persisted以启用持久队列。默认情况下,禁用持久队列(默认值:queue.type: memory)。path.queue:存储数据文件的目录路径。默认情况下,文件存储在path.data/queue中。queue.page_capacity:队列页面的最大大小(以字节为单位)。队列数据由称为"pages"的 append-only 文件组成。默认大小为64mb。更改此值不太可能带来性能优势。queue.drain:如果希望Logstash在关闭之前等待持久队列耗尽,则指定true。耗尽队列所需的时间取决于队列中累积的事件数。因此,您应该避免使用此设置,除非队列(即使已满)相对较小,并且可以快速耗尽。queue.max_events:队列中允许的最大事件数。默认值为0(无限制)。queue.max_bytes:队列的总容量,以字节数表示。默认值为1024mb(1gb)。确保磁盘驱动器的容量大于此处指定的值。
提醒: 如果使用持久队列来防止数据丢失,但不需要太多缓冲,则可以将
queue.max_bytes设置为较小的值(例如10mb),以生成较小的队列并提高队列性能。
如果同时指定了 queue.max_events 和 queue.max_bytes,则Logstash将使用先达到的条件。请参阅 处理背压 ,以了解达到这些队列限制时的行为。
您还可以通过设置 queue.checkpoint.writes 来控制何时更新检查点文件。请参阅 控制持久性。
配置示例:
queue.type: persisted
queue.max_bytes: 4gb
处理背压
当队列已满时,Logstash会对输入施加压力,以阻止流入Logstash的数据。这种机制有助于Logstash控制输入阶段的数据流速率,而不会像Elasticsearch那样压倒性的输出。
使用 queue.max_bytes 设置磁盘上队列的总容量。以下示例将队列的总容量设置为8gb:
queue.type: persisted
queue.max_bytes: 8gb
通过指定这些设置,Logstash将缓冲磁盘上的事件,直到队列大小达到8gb。当队列中充满未记录事件且已达到大小限制时,Logstash将不再接受新事件。
每个输入独立处理背压。例如,当 beats 输入遇到背压时,它不再接受新连接,并等待持久队列有空间接受更多事件。在过滤器和输出阶段处理完成队列中的现有事件,并确认它们之后,Logstash会自动开始接受新事件。
控制持久性
持久性是存储写入的属性,可确保数据在写入后可用。
启用持久队列功能后,Logstash会将事件存储在磁盘上。 Logstash在名为检查点的机制中提交到磁盘。
为了讨论持久性,我们需要介绍一些有关如何实现持久队列的细节。
首先,队列本身是一组页面。有两种页面:头页和尾页。头页是写入新事件的地方。只有一个头页。当头页具有特定大小(请参阅 queue.page_capacity)时,它将成为尾页,并创建新的头页。尾页是不可变的,头页是 append-only 的。其次,队列在称为检查点文件的单独文件中,记录有关其自身的详细信息(页面,确认状态等)。
记录检查点时,Logstash将:
- 在头页上调用fsync。
- 原子性地将磁盘写入队列的当前状态。
检查点的过程是原子性的,这意味着如果成功,将保存对文件的任何更新。
如果Logstash终止,或者存在硬件级别故障,则在永久队列中缓冲,但尚未检查点的任何数据都将丢失。
您可以通过设置 queue.checkpoint.writes 来强制要求更频繁地Logstash检查点。此设置指定在强制检查点之前可能写入磁盘的最大事件数,默认值为1024。要确保最大持久性并避免丢失持久队列中的数据,可以设置 queue.checkpoint.writes: 1,从而在写入每个事件后强制执行检查点。请记住,磁盘写入会产生资源成本。将此值设置为1会严重影响性能。
磁盘资源回收
在磁盘上,队列存储为一组页面,其中每个页面是一个文件。 每个页面的大小最多为 queue.page_capacity。 在确认该页面中的所有事件之后删除页面(磁盘资源回收)。 如果旧页面至少有一个尚未确认的事件,则整个页面将保留在磁盘上,直到该页面中的所有事件都成功处理完毕。 包含未处理事件的每个页面将根据 queue.max_bytes 字节大小进行计数。