現在のログ解析基盤
1月のPresto Meetupでログ解析基盤について少し話してから3ヶ月弱経ったんだけどその時から若干変わったのでメモっておく。
以前はこんな感じでした。
Prestoは今は0.100を使っていて特に問題は発生してないです。
Cognos周りはだいぶ良くなって
Cognos 10.2.1 + Prestogres 0.4.8 + Prestogres ODBC Driver
から
Cognos 10.2.2 + Prestogres 0.6.7 + protocolVersion=2だけ使用するようにpatchしたPostgreSQL JDBC Driver
に変えました。
patchって言っても1行コメントしただけ。ここ。
https://github.com/pgjdbc/pgjdbc/blob/REL9_3_1102/org/postgresql/core/ConnectionFactory.java#L32
この変更によって以前はwhereが飛ばないとかあったんだけどそれもなくなって快適。Cognosは基本的にODBCよりJDBCの方が良いみたい。
MySQLは破棄してPrestoに移行するつもりです。まだできてないけど。
移行する理由はPrestoなら分析関数が使えるしスケールアウトが容易だから。あと管理するストレージの種類を減らしたいからね。
PentahoからもPrestogresつないでPrestoにアクセスするということを一部やってます。データ量が少ないOLAPならいけそう。データ量多くなると厳しそう。
InfiniDBはそのうち破棄予定です。
Azkabanは使っているけど、開発がオワコンな感じなのでRundeckに移行することを考えた方がいいのかなあ。Rundeckだとcron書式も書けるので1日に2回実行とかも簡単に設定できそう。Azkabanだとflowを2つ作らないといけないので面倒なんですよね。これ以外ではAzkabanで困ってないので、移行コスト考えるとどうするか微妙。困ってないのは便利なものを知らないだけという可能性もあるんですけどね。。。
HDPは2.1のまま、Pythonも2.7.7のまま。前者は上げたいんだけどね。
それ以外のPresto周りの話でいうと最近Prestoのクエリ履歴を取り始めました。Treasure Dataの人はこの辺かなりちゃんとやっているようですが、僕のほうではそんなに必要性を感じてなかったこともありやってませんでした。ところが最近諸事情がありやった方が良いなと思ったのでやり始めました。
やり方はいろいろあると思うんですが、Prestoの/v1/queryをfluentdのin_execで叩いて取得した結果をHDFSに入れてHive, Prestoで見るようにしました。
fluentdの設定はこんな感じ。1分ごとに/v1/queryを叩きます。データが重複する可能性がありますが、それはあとで別途対応します。
またPrestoはデフォルトだと100件までしか履歴を持たないようなので1分以内に100以上のクエリが実行されたら抽出できない履歴が出てきますが、そこは許容してます。厳密なものはまだ必要じゃないと思うので。
<source> type exec format json tag presto command ruby /path/to/aaa.rb run_interval 1m </source> <match presto> type webhdfs namenode master1.your.cluster.local:50070 standby_namenode master2.your.cluster.local:50070 path /path/on/hdfs/%Y%m%d/%Y%m%d-%H-${hostname}.json username hoge flush_interval 1m output_include_time false output_include_tag false </match>
外部コマンドで動かすプログラムはこんな感じ。JSONの中に改行が入っているとHiveで見たときに別レコードになっちゃうのでエスケープしてます。
require 'net/http' require 'uri' require 'json' uri = URI.parse('http://presto.server:8080/v1/query') http = Net::HTTP.new(uri.host, uri.port) res = http.get(uri.request_uri) if res.code == '200' result = JSON.parse(res.body) result.each {|record| record['query'].gsub!(/(\r\n|\r|\n)/, '\\\\n') puts record.to_json } else raise "#{res.code} #{res.message}" end
webhdfs経由でputしたデータをHiveの外部テーブル(jsonを保持した1カラムのみ)でアタッチしてhive viewをこんな感じに作りました。
create view ... ( totalDrivers, completedDrivers, queuedDrivers, runningDrivers, createTime, queryId, catalogProperties, systemProperties, startTime, user, source, catalog, schema, timeZoneKey, locale, remoteUserAddress, userAgent, state, errorType, name, code, scheduled, self, query, elapsedTime, endTime, yyyymmdd, ) partitioned on (yyyymmdd) as select totalDrivers, completedDrivers, queuedDrivers, runningDrivers, createTime, queryId, catalogProperties, systemProperties, startTime, user, source, catalog, schema, timeZoneKey, locale, remoteUserAddress, userAgent, state, errorType, name, code, scheduled, self, query, elapsedTime, endTime, yyyymmdd from ... lateral view json_tuple(json, 'totalDrivers', 'completedDrivers', 'queuedDrivers', 'runningDrivers', 'createTime', 'queryId', 'session', 'state', 'errorType', 'errorCode', 'scheduled', 'self', 'query', 'elapsedTime', 'endTime')t as totalDrivers, completedDrivers, queuedDrivers, runningDrivers, createTime, queryId, session, state, errorType, errorCode, scheduled, self, query, elapsedTime, endTime lateral view json_tuple(session, 'catalogProperties', 'systemProperties', 'startTime', 'user', 'source', 'catalog', 'schema', 'timeZoneKey', 'locale', 'remoteUserAddress', 'userAgent')s as catalogProperties, systemProperties, startTime, user, source, catalog, schema, timeZoneKey, locale, remoteUserAddress, userAgent lateral view json_tuple(errorCode, 'name', 'code')e as name, code
あとは以下のようにして重複を削除しました。
INSERT OVERWRITE TABLE ... PARTITION(...) SELECT DISTINCT totaldrivers, completeddrivers, queueddrivers, runningdrivers, createtime, queryid, catalogproperties, systemproperties, starttime, user, source, catalog, schema, timezonekey, locale, remoteuseraddress, useragent, state, errorType, name, code, scheduled, self, query, elapsedtime, endtime FROM ... WHERE endtime is not null ...
あとはこのテーブルを必要に応じてみる感じですね。
例えば以下のようなクエリで1時間ごとのクエリ実行件数を見ることが出来ます。
select date_format(from_unixtime(cast(starttime as bigint)/1000), '%Y%m%d-%H00') as starttime, count(*) as query_count from ... group by date_format(from_unixtime(cast(starttime as bigint)/1000), '%Y%m%d-%H00')
こういうふうに外部からpollingする形式だとPresto本体を全くいじることなくモニタリング出来るので良いのですが、その代わり重複削除とか気にすることが出てきます。Prestoから直接QueryCompletionEventをどこかにpushするやり方もあるようなのですが、一旦は上記の通りにやりました。