Apache Kafka ConsumerにEsperを組み込んでEPLクエリを実行してみる

このエントリーをはてなブックマークに追加
| コメント(0) | トラックバック(0)

Javaの勉強がてらサンプルコードを参考にしながら作ってみました。
長いので結論から言うと Fluentd+Kafka+Norikra あるいは Fluentd+Azure EventHubs+Azure Stream Analytics の構成が一番楽でいい気が。。。

作成したコードはGithub上に置いてあります。
ちなみに以下の環境で動作確認しました。

  • Client
    • OS: Mac OSX 10.10 Yosemite
    • JDK: 1.8.0_45
  • Server (Kafka+Zookeeper)
    • OS: Ubuntu 14.10
    • Apache Kafka: 0.8.2.1
    • Scala: 2.11


ユースケース

Web Server(Nginx)のAccess Logを分散Pub-SubシステムのApache Kafkaに送信し、そこからストリーム処理的に時間単位のUpstreamのレスポンスタイムを集計することを想定してます。
Kafkaへのデータ送信はFluentd(fluent-plugin-kafka)を利用し、ログのフォーマットはLTSVでこんな感じ。

log_format  ltsv  'remote_ip:$remote_addr\tuser:$remote_user\tt:$time_local\treq:$request\t'
                   'status:$status\tbody_size:$body_bytes_sent\treferer:$http_referer\t'
                   'ua:$http_user_agent\tforwarded_for:$http_x_forwarded_for\t'
                   'upstream:$upstream_addr\tupstream_response:$upstream_response_time';


kafka2esper.properties

ZookeeperやKafkaの設定、EPLのクエリなどはプロパティファイルから読み込むようになってます。

  • conf/kafka2esper.properties
## For Zookeeper
zookeeper.connect=<zookeeperhost>
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
group.id=<group.id>

## For Kafka
kafka.topic=<Consume Kafka topic>
consumer.numThreads=4

## Esper
esper.query=<Esper Query(EPL)>


tl;dr
以降は簡単なコードの説明です。

  • App.java

Applicationのメインクラスです。
Propertiesファイルを読み込んでEsperのListenerの設定・Kafka Consumerの設定・起動を行っています。

  • NginxAccessLog.java

アクセスログをMappingさせるデータのクラスです。
今回はJSONを入力としてJacksonのObjectMapperでMappingさせていますが、必要最低限のパラメータのみMappingさせるように

@JsonIgnoreProperties(ignoreUnknown=true)

で定義のないメンバは無視するようにしています。

  • EsperEventListener.java

com.espertech.esper.client.UpdateListenerインタフェースのListenerの実装です。
イベントが発行されるとevent()メソッドがコールされるためそこに処理を書いていきます。
今回はEPLで指定した最大値=max(upstream_response), 平均値=avg(upstream_response), 集計数=count(*)を標準出力に表示するのみを行っています。
もし他のシステムでこの値を使う場合にはKVSなどに保存しておいたりKafkaの別Topicへの入力などにしてもいいかもしれないです。

  • KafkaConsumer.java

KafkaのTopicをSubscribeしてEsperにイベントを発行するクラスです。
実際にはKafkaConsumerはEsperSubmitterクラスの管理してる感じになってます。

EserSubmitterクラスはKafkaから受信したJSON StringをNginxAccessLogクラスにMappingしてEsperに対してsendEventしている形です。

まぁここまでやってみて冒頭にも書いたとおりFluentd+Kafka+Norikraで十分できそうな気がしましたが、例えばAccessLogの集計からDDoSの判定をする際に静的ファイルにホワイトリストを書いておいて、それに入っているものは除外的な使い方をしたい場合はまだコード書かないといけないのかなという印象を受けました。
で、そこらへんはAzure EventHubs + Azure Stream Analytics + Azure Blob Storageで出来てしまうので、やっぱりStream AnalyticsはStream Analyticsで差別化できてるなーと思った次第です。