es_rejected_execution_exceptionが出たのでthread_pool.bulk.queue_sizeを増やした

題記の通りです。

fluent-plugin-elasticsearch 1.9.3からElasticsearch 5.3に書き込んでから下記のようなエラーがたまに出てるのに気付きました。

Could not push log to Elasticsearch: {...{"index"=>{"_index"=>"hoge-2017.03.31", "_type"=>"fluentd", "_id"=>"...", "status"=>429, "error"=>{"type"=>"es_rejected_execution_exception", "reason"=>"rejected execution of org.elasticsearch.transport.TransportService$7@1aa442ab on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6d72211b[Running, pool size = 32, active threads = 32, queued tasks = 210, completed tasks = 4224628]]"}}},...

なおElasticsearch 5系にfluent-plugin-elasticsearchから書き込む場合は下記がmergeされた1.9.3を使えばfluent-plugin-elasticsearchでCannot get new connection from poolが出た - wyukawa’s blogは出ないと思います。
https://github.com/uken/fluent-plugin-elasticsearch/pull/240


es_rejected_execution_exceptionはElasticsearchが持っているスレッドプールのキューサイズを超えるbulk requestがきたので、それをrejectした、つまりログがElasticsearchに一部入らなかったということです。

ソースをチラ見したところ内部的にはThreadPoolExecutorで実装されています。

該当packageはこの辺になるとおもいます。
https://github.com/elastic/elasticsearch/tree/master/core/src/main/java/org/elasticsearch/threadpool
https://github.com/elastic/elasticsearch/tree/master/core/src/main/java/org/elasticsearch/common/util/concurrent

なお僕は下記本の第四章 タスクと並行処理 を読んでThreadPoolExecutor周りが分かりました。

ElasticsearchではThreadPoolExecutorを継承したEsThreadPoolExecutorを使っています。
https://github.com/elastic/elasticsearch/blob/v5.3.0/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java#L77

ThreadPoolExecutorのキューがいっぱいになってタスクが実行対象として受け入れ不可能な場合はRejectedExecutionExceptionをthrowしますが、リジェクションポリシーとしてEsAbortPolicyを実装しています。
https://github.com/elastic/elasticsearch/blob/v5.3.0/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java#L50

ログでqueue capacity = 200となっている部分は
https://github.com/elastic/elasticsearch/blob/v5.3.0/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java#L148
から取っていて、それ以外の[Running, pool size = 32, active threads = 32, queued tasks = 210, completed tasks = 4224628]]はThreadPoolExecutor#toStringからきています。

なお公式ドキュメント
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html
によればbulkのqueue_sizeのdefaultは50とありますが、上記ログや下記ソースから判断してもdefaultは200だとおもいます。

https://github.com/elastic/elasticsearch/blob/v5.3.0/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java#L172

なのでpull requestなげときました。
https://github.com/elastic/elasticsearch/pull/23870

そんなこんなでelasticsearch.ymlに下記を追加してElasticsearchを再起動して様子見中です。

thread_pool.bulk.queue_size: 1000

bulkのqueue_sizeは下記コマンドで確認できます。

curl 'localhost:9200/_cat/thread_pool/bulk?v&h=id,name,queue_size'


またうちの環境ではX-Pack Monitoringとelasticsearch_exporterを使ってモニタリングしています。
で、thread poolのメトリクス周りでelasticsearch_exporterのバグがあったのでpull requestしています。
https://github.com/justwatchcom/elasticsearch_exporter/pull/38

X-Pack MonitoringでもIndexing Threadsでthread pool周りのモニタリングができるのですが、node単位でしかみれないので、
最大値とかみたいなあという話をdiscussになげときました。
https://discuss.elastic.co/t/more-thread-pool-and-circuit-breaker-metrics/80943

elasticsearch_exporterでは下記のようなクエリを使ってキューサイズの最大値をチェックするようにしました。

max(elasticsearch_thread_pool_queue_count{cluster="..."}) by (type)

こちらからは以上です。