fluent-plugin-elasticsearchでCannot get new connection from poolが出た

題名の通りで動作環境は以下の通りです。
Elasticsearch側はcerebroで見る限りgreenで正常に見えました。

  • fluentd 0.12.27
  • fluent-plugin-elasticsearch 1.9.0
  • Elasticsearch 5.0.1

fluentdの設定はこんな感じ

<match ...>
  type forest
  subtype elasticsearch
  <template>
    hosts ...
    id ...
    buffer_type "file"
    buffer_path "..."
    flush_at_shutdown true
    buffer_chunk_limit 4m
    buffer_queue_limit 512
    flush_interval 1s
    num_threads 8
    logstash_format true
    logstash_prefix ${tag}
    utc_index false
    type_name fluentd
    include_tag_key true
    tag_key tag_key 
    request_timeout 30s
  </template>
</match>

エラーログはこんな感じ

error_class="Elasticsearch::Transport::Transport::Error" error="Cannot get new connection from pool." plugin_id="..."
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/elasticsearch-transport-1.0.18/lib/elasticsearch/transport/transport/base.rb:249:in `perform_request'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/elasticsearch-transport-1.0.18/lib/elasticsearch/transport/transport/http/faraday.rb:20:in `perform_request'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/elasticsearch-transport-1.0.18/lib/elasticsearch/transport/client.rb:128:in `perform_request'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/elasticsearch-api-1.0.18/lib/elasticsearch/api/actions/bulk.rb:90:in `bulk'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluent-plugin-elasticsearch-1.9.0/lib/fluent/plugin/out_elasticsearch.rb:353:in `send_bulk'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluent-plugin-elasticsearch-1.9.0/lib/fluent/plugin/out_elasticsearch.rb:339:in `write_objects'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluentd-0.12.27/lib/fluent/output.rb:480:in `write'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluentd-0.12.27/lib/fluent/buffer.rb:354:in `write_chunk'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluentd-0.12.27/lib/fluent/buffer.rb:333:in `pop'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluentd-0.12.27/lib/fluent/output.rb:338:in `try_flush'
/home/.../local/ruby-2.1/lib/ruby/gems/2.1.0/gems/fluentd-0.12.27/lib/fluent/output.rb:149:in `run'

時系列でいうと以下のような感じ

11/17(木) Elasticsearch 5にアップグレード
12/3(土) 「Cannot get new connection from pool.」発生。fluentd再起動して一旦回避
12/5(月) FluentdからAmazon Elasticsearch Serviceへログ転送する時の注意点 | 本日も乙およびElasitcsearch-ruby raises "Cannot get new connection from pool" error - Elasticsearch - Discuss the Elastic Stack を見てresurrect_afterを0に設定
12/9(金) 「Cannot get new connection from pool.」再度発生。
試しにreload_on_failure trueにして再起動するもすぐ再発

Amazon Elasticsearch使ってないけどreload_connections falseにして再起動するもすぐ再発

fluent-plugin-elasticsearchが依存しているelasticsearch-rubyのソース見てもなぜget_connectionでnilかfalseが返ってくるのか理解できず、排他制御周りでなんかバグってるのかなと思い、num_threads 1にして再起動するもすぐ再発

https://github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb

        def perform_request(method, path, params={}, body=nil, &block)
          raise NoMethodError, "Implement this method in your transport class" unless block_given?
          start = Time.now if logger || tracer
          tries = 0

          begin
            tries     += 1
            connection = get_connection or raise Error.new("Cannot get new connection from pool.")
        def get_connection(options={})
          resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

          @counter_mtx.synchronize { @counter += 1 }
          reload_connections!         if reload_connections && counter % reload_after == 0
          connections.get_connection(options)
        end

        # Reloads and replaces the connection collection based on cluster information
        #
        # @see Sniffer#hosts
        #
        def reload_connections!
          hosts = sniffer.hosts
          __rebuild_connections :hosts => hosts, :options => options
          self
        rescue SnifferTimeoutError
          logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
          self
        end

        # Tries to "resurrect" all eligible dead connections
        #
        # @see Connections::Connection#resurrect!
        #
        def resurrect_dead_connections!
          connections.dead.each { |c| c.resurrect! }
        end

https://github.com/elastic/elasticsearch-ruby/blob/master/elasticsearch-transport/lib/elasticsearch/transport/transport/connections/connection.rb

          def resurrect!
            alive! if resurrectable?
          end

          # Returns true if the connection is eligible to be resurrected as alive, false otherwise.
          #
          # @return [Boolean]
          #
          def resurrectable?
            @state_mutex.synchronize {
              Time.now > @dead_since + ( @options[:resurrect_timeout] * 2 ** (@failures-1) )
            }
          end

resurrect_afterが0ならすぐresurrect_dead_connections!が呼ばれてコネクション使えそうだけど、resurrect_timeout(defaultが60s)をexponential backoffした分は待ちそう。
resurrect_timeoutいじろうかと思ったけどfluent-plugin-elasticsearch側からresurrect_timeoutを設定できない。

この時点でだいぶ困ったけど、fluent-plugin-elasticsearchが使っているelasticsearch-rubyが1.0.18なのに気づく。
僕が使っているのはElasticsearch 5なので5系を使うべきなのではと思い始める。
だって、
https://github.com/elastic/elasticsearch-ruby
にも1系は1系しか互換性なさそうに見えるし。

そんなわけでgemspecをいじってdeployしたらこの日はエラーが収まって今のところ大丈夫という状況です。
https://github.com/wyukawa/fluent-plugin-elasticsearch/commit/b73cc0e21d8558d536c7819adcc3eb3b89ec5368

Elasticsearch側は問題なさそうに見えた。Elasticsearch 2.4を使っているときは発生してなかった。
この辺から想像するにelasitcsearch-rubyに何か関係してそうな気はしてますが、正直わかりません。
なんかわかったら教えてもらえると嬉しいです。