kafka-fluentd-consumerとfluencyとfluent-plugin-elasticsearchのメモリに関する話

うちの環境ではkafkaに入ってるログをkafka-fluentd-consumer 0.3.0でconsumeしたのちにfluent-plugin-elasticsearch 1.9.0経由でElasticsearchになげるということをしています。

fluent-plugin-elasticsearchは8プロセス動いていて各プロセスがメモリを5〜8GB使っている状態でした。マシンのメモリは64GBだったので割とメモリがかつかつな状態だったせいか、以下のようなログをはいてkafka-fluentd-consumerが落ちるという状況が発生していました。

failed; error='Cannot allocate memory' (errno=12)
#
# Native memory allocation (mmap) failed to map 555745280 bytes for committing reserved memory.

hs_err_pid[pid].logは下記のような感じ。

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 555745280 bytes for committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2627), pid=2605, tid=140344240674560
#
# JRE version: Java(TM) SE Runtime Environment (8.0_66-b17) (build 1.8.0_66-b17)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode linux-amd64 compressed oops)
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#

kafka-fluentd-consumerはfluencyを使ってfluentdにログを投げています。

fluencyに関してはこちらを参照

で、fluencyは自身のバッファにoff heapを使っているのですが、そこでメモリを確保できなくて落ちたっぽいです。この辺スタックトレースもないのでわかりづらい。。。

fluencyのバッファサイズを減らす手もありますが、一方でBufferFullExceptionが出たからバッファサイズを増やした経緯もあります。

一旦fluent-plugin-elasticsearchのバッファをメモリからファイルに変えて様子を見ているところです。要はマシンの空きメモリ容量が増えれば良いと思ったため。

fluentdのメモリの使用状況は以下のような感じ。2回再起動しているので2つの谷がありますが、最初と最後に注目すると、5〜8GBから4〜5GBに下がっています。もっとも徐々に増えているのでまだ安心はできませんが。

fluencyは内部のバッファが一杯になるとBufferFullExceptionをthrowします。これをcatchしてretryするかどうかはアプリケーション側に判断を委ねています。まあ集約ノードでも無い限りそんなに大量のログをなげないと思うので末端ノードだったらBufferFullExceptionが起きることは無いような気はします。

kafka-fluentd-consumerではBufferFullExceptionをcatchしてretryしています。

それは良いんですが、kafka-fluentd-consumerはちょいちょい例外を無視してます。これはおそらくログファイルが大きくなってディスクを圧迫するのを避けるためだとは思いますが、少し困る場合もあります。

うちの環境だとkafka-fluentd-consumerからログ送信した先のfluentdで下記のようなwarnがでていました。

[warn]: no patterns matched tag="failed"

これはkafka-fluentd-consumerがログ送信に失敗した場合はfailedタグをつけてエラーデータを送信するためです。受信側でfailledタグを受け取っていなかったので上記のエラーが出ていました。で、エラーの詳細を知りたい場合は受信側でfailedタグを受け取ってファイルに出力して確認するなどが必要になります。