Presto+Kafka+MySQLでDDoS判定してみる

このエントリーをはてなブックマークに追加
| コメント(0) | トラックバック(0)

前回のブログ記事 の最後に

まぁここまでやってみて冒頭にも書いたとおりFluentd+Kafka+Norikraで十分できそうな気がしましたが、
例えばAccessLogの集計からDDoSの判定をする際に静的ファイルにホワイトリストを書いておいて、
それに入っているものは除外的な使い方をしたい場合はまだコード書かないといけないのかなという印象を受けました



と書いたのですが、何か良い方法はないかなーと思いつつ、先日IPROSさんのオフィスで行われた Presto勉強会 に参加したし、そろそろPresto触ってみようとPrestoのドキュメントを眺めてたらConnectorにKafkaが用意されてたので、Prestoを使って上記の事をやってみようかと思います。
ちなみにホワイトリストリストは今回MySQLに作ってみました。

Prestoとは

Facebook社が公開した分散SQL処理基盤で、特徴としては複数のConnectorを使うことで複数のデータソースを束ねて処理することが可能になります。もちろん分散基盤なのでWorkerの数を増やすことで大規模データをより高速に処理させることができます。

今回はConnectorにkafka, mysqlを用いて両方のデータを結合するような処理を行いました。

環境

ローカルのPCにVMを2台起動し、それぞれ
Presto+Kafka+ZookeeperはUbuntu 14.04にシングル構成で構築。(IP=192.168.56.103)
WebサーバとしてCentOS6.6でNginx, Djangoアプリケーション, MySQLが動いており、Fluentdのfluent-plugin-kafkaプラグインを使ってKafkaにデータを送信しています。(IP=192.168.56.104)
Prestoのバージョンは0.100を使いました。

インストール

Prestoのインストールはドキュメントの通りインストールしました。

Kafka, Zookeeperについては既にローカルで稼働していましたがPrestoのドキュメントと同じです。

事前準備

MySQL側にホワイトリストを設定するようなDatabaseを作成します。

CREATE DATABASE prestotest;
USE prestotest;
CREATE TABLE ip_whitelist ( id INT NOT NULL AUTO_INCREMENT, ip VARCHAR(15) NOT NULL );
GRANT ALL ON presto_test.* TO 'presto'@'%' IDENTIFIED BY 'prestopassword';
FLUSH PRIVILEGES;

-- 除外するIPとしてアプリケーションサーバのIP=192.168.56.104を追加
INSERT INTO ip_whitelist(ip) VALUES ('192.168.56.104');


Log format

Nginxのログのフォーマットは以下のようにLTSVで設定しました。
前回の記事から時刻の出力フォーマットを $time_local から $time_iso8601 に変更しています。

log_format  ltsv  'remote_ip:$remote_addr\tuser:$remote_user\tt:$time_iso8601\treq:$request\t'
                  'status:$status\tbody_size:$body_bytes_sent\treferer:$http_referer\t'
                  'ua:$http_user_agent\tforwarded_for:$http_x_forwarded_for\t'
                  'upstream:$upstream_addr\tupstream_response:$upstream_response_time';


Fluentd config

NginxのアクセスログをTailしてKafkaに対して送信しています。
Kafkaのdefault_topicは後述する etc/kafka/fluentd.nginx.json に合わせて fluentd.nginx にします。

<source>
  type      tail
  format    ltsv
  path      /path/to/nginx/access.log
  pos_file  /path/to/nginx/access.log.pos
  tag       kafka.access.log
</source>
<match kafka.**>
  type             kafka_buffered
  zookeeper        zookeeper_host:zookeeper_port
  default_topic    fluentd.nginx
  output_data_type json
</match>


Properties

conf/config.properties

今回はCoordinatorとWorkerを同居させているためnode-scheduler.include-coordinatorをtrueに設定しています。

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
task.max-memory=1GB
discovery-server.enabled=true
discovery.uri=http://localhost:8080


etc/catalog/mysql.properties

MySQLのコネクタの設定です。

connector.name=mysql
connection-url=jdbc:mysql://192.168.56.104:3306
connection-user=presto
connection-password=prestopassword


etc/catalog/kafka.properties

カタログの定義を以下のように記載します。
table名は <schema名>.<table名> にします。

connector.name=kafka
kafka.table-names=fluentd.nginx
kafka.nodes=192.168.56.103:9092
kafka.hide-internal-columns=false


etc/kafka/fluentd.nginx.json

Topicの定義を以下のように記載します。

{
  "tableName": "nginx",
  "schemaName": "fluentd",
  "topicName": "fluentd.nginx",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "remote_ip",
        "mapping": "remote_ip",
        "type": "VARCHAR"
      },
      {
        "name": "user",
        "mapping": "user",
        "type": "VARCHAR"
      },
      {
        "name": "t",
        "mapping": "t",
        "type": "TIMESTAMP",
        "dataFormat": "iso8601"
      },
      {
        "name": "req",
        "mapping": "req",
        "type": "VARCHAR"
      },
      {
        "name": "status",
        "mapping": "status",
        "type": "VARCHAR"
      },
      {
        "name": "body_size",
        "mapping": "body_size",
        "type": "BIGINT"
      },
      {
        "name": "referer",
        "mapping": "referer",
        "type": "VARCHAR"
      },
      {
        "name": "ua",
        "mapping": "ua",
        "type": "VARCHAR"
      },
      {
        "name": "forwarded_for",
        "mapping": "forwarded_for",
        "type": "VARCHAR"
      },
      {
        "name": "upstream",
        "mapping": "upstream",
        "type": "VARCHAR"
      },
      {
        "name": "upstream_response",
        "mapping": "upstream_response",
        "type": "DOUBLE"
      }
    ]
  }
}


Prestoの起動

Zookeeper, Kafkaを起動しPrestoも起動します。

$ bin/launcher start


クエリの実行

Presto CLIをダウンロードして、
ローカルのアプリケーションに何度かアクセスした後にデータが入ってるか確認してみます。

$ presto-cli-0.100-executable.jar --server localhost:8080
presto:default> SELECT COUNT(*) FROM kafka.fluentd.nginx;
 _col0
-------
   313
(1 row)


次にremote_ipでgroup byしてみます。

presto:default> SELECT remote_ip, COUNT(*) FROM kafka.fluentd.nginx GROUP BY remote_ip;
   remote_ip    | _col1
----------------+-------
 192.168.56.1   |   283
 192.168.56.104 |    31
(2 rows)


MySQLにも接続できるか確認します。

presto:default> SELECT * FROM mysql.prestotest.ip_whitelist;
 id |       ip
----+----------------
  1 | 192.168.56.104
(1 row)


Kafka結果からMySQLに入っているIPを除外してみます。

presto:default> SELECT remote_ip, COUNT(*) FROM kafka.fluentd.nginx
             -> WHERE remote_ip NOT IN (
             ->   SELECT ip FROM mysql.prestotest.ip_whitelist
             -> ) GROUP BY remote_ip;
  remote_ip   | _col1
--------------+-------
 192.168.56.1 |   283
(1 row)

remote_ip=192.168.56.104が結果から除外されました。

対象を直近5分で10回以上あるものに絞ってみます。

presto:default> SELECT remote_ip, COUNT(*) FROM kafka.fluentd.nginx
             -> WHERE remote_ip NOT IN (
             ->   SELECT ip FROM mysql.prestotest.ip_whitelist
             -> ) AND t > now() - INTERVAL '5' MINUTE
             -> GROUP BY remote_ip HAVING COUNT(*) > 10;
  remote_ip   | _col1
--------------+-------
 192.168.56.1 |    45
(1 row)


もちろんEsperのようにEvent Stream的な処理ではなく、都度クエリを打ったり定期的なバッチ処理として動かさないといけないかとは思いますが、手軽さを考えてもメリット大きい気がします。


ちなみにTreasure DataにPrestogres経由でアクセスできると思うので、オンプレにPrestoサーバを立てることで、Treasure Dataにある膨大なログデータの集計結果とオンプレにあるDBのデータをJOINできたりする気がしますが試してないので実際のところは不明です。
もうちょっとPresto触ってみようと思います。