あいつの日誌β

働きながら旅しています。

fluent-plugin-slack

あらすじ

Slack API に不慣れた小生が fluent で集めているログを slack に POST してくださいと言われたとある年末のお話です。 調査を年末、実装は来年するので忘れないようにメモしておきます。

現状

とりあえず受信側の fluentd が以下のようになっていて、ここにjson形式のデータを投げているっぽい(実は fluentd も不慣れ)

<source>
type forward
port 24225
</source>

<match hoge.fuga.piyo**>
type file_alternative
path /hoge/fuga/piyo/logs/%Y/%m/%Y-%m-%d.log
time_slice_wait 1m
utc
</match>

作戦

さて、もうそのまんまな plugin があったのでこれを使いたいと思います。

https://github.com/sowawa/fluent-plugin-slack

そして slack の api ですが Incoming Webhooks と Slack API を使う方法があるようです。 ただし Incoming Webhooks で発行される トークンというか api のエンドポイントが変更になって fluentd-plugin-slack がその変更に追従していないように見えました。

というわけで slack API を使う事にします。

https://api.slack.com/methods/chat.postMessage

とりあえずログ出力を分ける

以下のように type copy とするとやりたいことができるらしい

<match hoge.fuga.piyo**>
 type copy

  <store>
    type file_alternative
    path /hoge/fuga/piyo/logs/%Y/%m/%Y-%m-%d.log
    time_slice_wait 1m
    utc
  </store>
  <store>
    # fluent-plugin-slack の設定
  </store>

問題1(バグ)

何回か実行してみてもいまいち動いてくれないのでログを仕込んだらバグだった。 年明けにマージされてればいいなあ

https://github.com/sowawa/fluent-plugin-slack/pull/3

とりえあず手元のコードを修正して先に進む

問題2(フォーマットが違う)

fluent-plugin-slack は message というプロパティを含む データ形式を期待しているのですが 現状のシステムではそのプロパティは存在していませんでした。

なので以下のように exec_filter を使ってみます。

<match hoge.fuga.piyo**>
 type copy

  <store>
    type file_alternative
    path /hoge/fuga/piyo/logs/%Y/%m/%Y-%m-%d.log
    time_slice_wait 1m
    utc
  </store>
  <store>
  <store>
    # fluent-plugin-slack の設定を移動させて代わりに以下を追加
    type exec_filter
    command ruby myparser.rb
    in_format json
    out_format msgpack
    tag slack
    flush_interval 1m   
  </store>
</match>
 
<match slack>
  # fluent-plugin-slack の設定をここに移動させる
</match>

myparser.rb を次のように用意する。やっていることは message というプロパティを追加しているだけです。

require 'json'
require 'msgpack'
 
while str = STDIN.gets
    s = JSON.parse(str)
    print MessagePack.pack({message:s})
    STDOUT.flush
end

問題3(リクエストURLが長すぎる)

これでうまく行ったと思ったら今度は連続してログを書き出すとそれもまとめて slack にリクエストしてしまい、 URL長すぎるからだめよだめだめ問題が発生します。

よって buffer_chunk_limitchunk size を絞る必要があります。 とりあえず 1K でいいかな。

まとめ

以下のような設定 + 自前の parser を準備するといける

<source>
type forward
port 24225
</source>

<match hoge.fuga.piyo**>
  type copy
  <store>
  type file_alternative
    path /hoge/fuga/piyo/logs/%Y/%m/%Y-%m-%d.log
    time_slice_wait 1m
    utc
  </store>
  <store>
    type exec_filter
    command ruby myparser.rb
    in_format json
    out_format msgpack
    tag slack
    flush_interval 1m   
  </store>
</match>
<match slack>
  type buffered_slack
  rtm true
  token xoxp-xxxxxxxxxxxxx
  channel my-private-test-room
  username okamuuu-bot
  color good
  icon_emoji :ghost:
  buffer_path /tmp/td_slack_buffer
  buffer_chunk_limit 1k
  flush_interval 5s
</match>

というわけで来年の最初の仕事はこれをあれする。

あとPOSTに変更したい