kafka-fluentd-consumerのConsumerLagが大きくなっていたので調べてみた

うちの環境ではKafka -> kafka-fluentd-consumer -> Fluentd -> Elatsticsearchという経路でログをElatsticsearchに送ってKibanaで可視化しています。

ところがKibanaで直近のログが見れないという問い合わせがあり調査していたら、kafka-fluentd-consumerのConsumerLagが大きい状態で、要はconsumerで詰まっていた。

jmx, prometheus経由でConsumerLagはモニタリングしていたのですが、あんまりちゃんと見てなくて気付けば定期的にlagが大きくなっている状態でした。

まあほっといても解消してた気もしますが、調査しました。

hasNextのConsumerTimeoutExceptionの部分の詳細を見たほうがいいのではという話になりログを仕込んで再起動しました。他にもログを仕込みました。
https://github.com/treasure-data/kafka-fluentd-consumer/blob/fbc4066c8631fe0e890ee42971bf0ade7b1401d8/src/main/java/org/fluentd/kafka/FluentdHandler.java#L130

結果を見るとやっぱりタイムアウトしてました。

kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.fluentd.kafka.FluentdHandler.hasNext(FluentdHandler.java:162)
        at org.fluentd.kafka.FluentdHandler.run(FluentdHandler.java:64)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

ConsumerTimeoutExceptionでググってたらconsumer.timeout.msを-1にすれば例外が出ないという話があり、
しかもkafkanのデフォルトは-1なので-1に設定しました。

kafka-fluentd-consumerではconsumer.timeout.msを設定していない場合は10になるようです。
https://github.com/treasure-data/kafka-fluentd-consumer/blob/fbc4066c8631fe0e890ee42971bf0ade7b1401d8/src/main/java/org/fluentd/kafka/PropertyConfig.java#L44

さらにCPUに空きがあったのでfluentd.consumer.threadsを1から2にしたらlagが小さくなりました。

1日様子見たところ、前日40Mだったのが20Mになってたので、スレッド数を増やした効果はあったと思います。

今はさらに4にまで増やして様子を見ています。

さらに送信に失敗した場合はfailedタグで送信しているのでそこも別fluentdで拾うようにしてみてます。
https://github.com/treasure-data/kafka-fluentd-consumer/blob/fbc4066c8631fe0e890ee42971bf0ade7b1401d8/src/main/java/org/fluentd/kafka/FluentdHandler.java#L84