現在のログ解析基盤

1月のPresto Meetupでログ解析基盤について少し話してから3ヶ月弱経ったんだけどその時から若干変わったのでメモっておく。

以前はこんな感じでした。

Prestoは今は0.100を使っていて特に問題は発生してないです。
Cognos周りはだいぶ良くなって
Cognos 10.2.1 + Prestogres 0.4.8 + Prestogres ODBC Driver
から
Cognos 10.2.2 + Prestogres 0.6.7 + protocolVersion=2だけ使用するようにpatchしたPostgreSQL JDBC Driver
に変えました。

patchって言っても1行コメントしただけ。ここ。
https://github.com/pgjdbc/pgjdbc/blob/REL9_3_1102/org/postgresql/core/ConnectionFactory.java#L32

この変更によって以前はwhereが飛ばないとかあったんだけどそれもなくなって快適。Cognosは基本的にODBCよりJDBCの方が良いみたい。

MySQLは破棄してPrestoに移行するつもりです。まだできてないけど。

移行する理由はPrestoなら分析関数が使えるしスケールアウトが容易だから。あと管理するストレージの種類を減らしたいからね。

PentahoからもPrestogresつないでPrestoにアクセスするということを一部やってます。データ量が少ないOLAPならいけそう。データ量多くなると厳しそう。

InfiniDBはそのうち破棄予定です。

Azkabanは使っているけど、開発がオワコンな感じなのでRundeckに移行することを考えた方がいいのかなあ。Rundeckだとcron書式も書けるので1日に2回実行とかも簡単に設定できそう。Azkabanだとflowを2つ作らないといけないので面倒なんですよね。これ以外ではAzkabanで困ってないので、移行コスト考えるとどうするか微妙。困ってないのは便利なものを知らないだけという可能性もあるんですけどね。。。

HDPは2.1のまま、Pythonも2.7.7のまま。前者は上げたいんだけどね。

それ以外のPresto周りの話でいうと最近Prestoのクエリ履歴を取り始めました。Treasure Dataの人はこの辺かなりちゃんとやっているようですが、僕のほうではそんなに必要性を感じてなかったこともありやってませんでした。ところが最近諸事情がありやった方が良いなと思ったのでやり始めました。

やり方はいろいろあると思うんですが、Prestoの/v1/queryをfluentdのin_execで叩いて取得した結果をHDFSに入れてHive, Prestoで見るようにしました。

fluentdの設定はこんな感じ。1分ごとに/v1/queryを叩きます。データが重複する可能性がありますが、それはあとで別途対応します。
またPrestoはデフォルトだと100件までしか履歴を持たないようなので1分以内に100以上のクエリが実行されたら抽出できない履歴が出てきますが、そこは許容してます。厳密なものはまだ必要じゃないと思うので。

<source>
  type                exec
  format              json
  tag                 presto
  command ruby        /path/to/aaa.rb
  run_interval        1m
</source>
<match presto>
  type                webhdfs
  namenode         master1.your.cluster.local:50070
  standby_namenode master2.your.cluster.local:50070
  path                /path/on/hdfs/%Y%m%d/%Y%m%d-%H-${hostname}.json
  username            hoge
  flush_interval      1m
  output_include_time false
  output_include_tag  false
</match>

外部コマンドで動かすプログラムはこんな感じ。JSONの中に改行が入っているとHiveで見たときに別レコードになっちゃうのでエスケープしてます。

require 'net/http'
require 'uri'
require 'json'

uri = URI.parse('http://presto.server:8080/v1/query')

http = Net::HTTP.new(uri.host, uri.port)
res = http.get(uri.request_uri)

if res.code == '200'
  result = JSON.parse(res.body)
  result.each {|record|
                  record['query'].gsub!(/(\r\n|\r|\n)/, '\\\\n') 
                  puts record.to_json
              }
else
  raise "#{res.code} #{res.message}"
end

webhdfs経由でputしたデータをHiveの外部テーブル(jsonを保持した1カラムのみ)でアタッチしてhive viewをこんな感じに作りました。

create view ... (
  totalDrivers,
  completedDrivers,
  queuedDrivers,
  runningDrivers,
  createTime,
  queryId,
  catalogProperties,
  systemProperties,
  startTime,
  user,
  source,
  catalog,
  schema,
  timeZoneKey,
  locale,
  remoteUserAddress,
  userAgent,
  state,
  errorType,
  name,
  code,
  scheduled,
  self,
  query,
  elapsedTime,
  endTime,
  yyyymmdd,
)
partitioned on (yyyymmdd)

as

select
  totalDrivers,
  completedDrivers,
  queuedDrivers,
  runningDrivers,
  createTime,
  queryId,
  catalogProperties,
  systemProperties,
  startTime,
  user,
  source,
  catalog,
  schema,
  timeZoneKey,
  locale,
  remoteUserAddress,
  userAgent,
  state,
  errorType,
  name,
  code,
  scheduled,
  self,
  query,
  elapsedTime,
  endTime,
  yyyymmdd
from
  ...
lateral view json_tuple(json, 'totalDrivers', 'completedDrivers', 'queuedDrivers', 'runningDrivers', 'createTime', 'queryId',
'session', 'state', 'errorType', 'errorCode', 'scheduled', 'self', 'query', 'elapsedTime', 'endTime')t
as
totalDrivers, completedDrivers, queuedDrivers, runningDrivers, createTime, queryId,
session, state, errorType, errorCode, scheduled, self, query, elapsedTime, endTime
lateral view json_tuple(session, 'catalogProperties', 'systemProperties', 'startTime', 'user', 'source', 'catalog', 'schema', 'timeZoneKey', 'locale', 'remoteUserAddress', 'userAgent')s
as
catalogProperties, systemProperties, startTime, user, source, catalog, schema, timeZoneKey, locale, remoteUserAddress, userAgent
lateral view json_tuple(errorCode, 'name', 'code')e
as
name, code

あとは以下のようにして重複を削除しました。

 INSERT OVERWRITE TABLE ... PARTITION(...)
 SELECT
   DISTINCT
   totaldrivers,
   completeddrivers,
   queueddrivers,
   runningdrivers,
   createtime,
   queryid,
   catalogproperties,
   systemproperties,
   starttime,
   user,
   source,
   catalog,
   schema,
   timezonekey,
   locale,
   remoteuseraddress,
   useragent,
   state,
   errorType,
   name,
   code,
   scheduled,
   self,
   query,
   elapsedtime,
   endtime
 FROM
   ...
 WHERE
   endtime is not null ...

あとはこのテーブルを必要に応じてみる感じですね。

例えば以下のようなクエリで1時間ごとのクエリ実行件数を見ることが出来ます。

select
  date_format(from_unixtime(cast(starttime as bigint)/1000), '%Y%m%d-%H00') as starttime,
  count(*) as query_count
from
  ...
group by
  date_format(from_unixtime(cast(starttime as bigint)/1000), '%Y%m%d-%H00')

こういうふうに外部からpollingする形式だとPresto本体を全くいじることなくモニタリング出来るので良いのですが、その代わり重複削除とか気にすることが出てきます。Prestoから直接QueryCompletionEventをどこかにpushするやり方もあるようなのですが、一旦は上記の通りにやりました。