前回のブログ記事 の最後に

まぁここまでやってみて冒頭にも書いたとおりFluentd+Kafka+Norikraで十分できそうな気がしましたが、
例えばAccessLogの集計からDDoSの判定をする際に静的ファイルにホワイトリストを書いておいて、
それに入っているものは除外的な使い方をしたい場合はまだコード書かないといけないのかなという印象を受けました



と書いたのですが、何か良い方法はないかなーと思いつつ、先日IPROSさんのオフィスで行われた Presto勉強会 に参加したし、そろそろPresto触ってみようとPrestoのドキュメントを眺めてたらConnectorにKafkaが用意されてたので、Prestoを使って上記の事をやってみようかと思います。
ちなみにホワイトリストリストは今回MySQLに作ってみました。

Javaの勉強がてらサンプルコードを参考にしながら作ってみました。
長いので結論から言うと Fluentd+Kafka+Norikra あるいは Fluentd+Azure EventHubs+Azure Stream Analytics の構成が一番楽でいい気が。。。

作成したコードはGithub上に置いてあります。
ちなみに以下の環境で動作確認しました。

  • Client
    • OS: Mac OSX 10.10 Yosemite
    • JDK: 1.8.0_45
  • Server (Kafka+Zookeeper)
    • OS: Ubuntu 14.10
    • Apache Kafka: 0.8.2.1
    • Scala: 2.11


ユースケース

Web Server(Nginx)のAccess Logを分散Pub-SubシステムのApache Kafkaに送信し、そこからストリーム処理的に時間単位のUpstreamのレスポンスタイムを集計することを想定してます。
Kafkaへのデータ送信はFluentd(fluent-plugin-kafka)を利用し、ログのフォーマットはLTSVでこんな感じ。

log_format  ltsv  'remote_ip:$remote_addr\tuser:$remote_user\tt:$time_local\treq:$request\t'
                   'status:$status\tbody_size:$body_bytes_sent\treferer:$http_referer\t'
                   'ua:$http_user_agent\tforwarded_for:$http_x_forwarded_for\t'
                   'upstream:$upstream_addr\tupstream_response:$upstream_response_time';


kafka2esper.properties

ZookeeperやKafkaの設定、EPLのクエリなどはプロパティファイルから読み込むようになってます。

  • conf/kafka2esper.properties
## For Zookeeper
zookeeper.connect=<zookeeperhost>
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
group.id=<group.id>

## For Kafka
kafka.topic=<Consume Kafka topic>
consumer.numThreads=4

## Esper
esper.query=<Esper Query(EPL)>


Qiitaに投稿した内容の転載です。

事前準備

Credentialsの作成

  • https://console.developers.google.com/project/ でプロジェクト作成
  • [APIs & auth] > [Credentials] を選択してOAuthの [Create new ClientID] をクリック
  • [Service account] を選択して [Create ClientID] をクリック
  • JSONファイルが自動的にダウンロードされるけど [Generate new P12 key] をクリックしてキーの作成

ここで必要なのは
- コンソール上に表示されているEmail address (xxxxxx@developer.gserviceaccount.comなど)
- 上でダウンロードしたP12 keyファイル

Pythonのモジュールのインストール

以下をpipインストール

  • gspread
  • oauth2client

※ oauth2cientをインストールする際にはpyopensslが必要

コード

import gspread
from oauth2client.client import SignedJwtAssertionCredentials

# 認証
f = file('/path/to/p12keyfile', 'rb')
key = f.read()
f.close()
credentials = SignedJwtAssertionCredentials(
                  'xxxxxx@developer.gserviceaccount.com', # Email address
                  key,
                  scope='https://spreadsheets.google.com/feeds https://docs.google.com/feeds',
                  token_uri='https://accounts.google.com/o/oauth2/token'
               )
gs = gspread.authorize(credentials)


doc = gs.open('Worksheet名')

# Sheet追加
sheet = doc.add_worksheet('Sheet名', row=100, col=20)

# Sheetの選択
sheet = doc.worksheet("Sheet名")

# 値の取得

val = sheet.acell('B1').value # ラベル指定の場合のメソッド名はacell
val = sheet.cell(1,2).value   # 座標指定の場合は (行,列)

# 値の設定・変更

sheet.update_acell('B1', 'hoge') # ラベル指定の場合のメソッド名はupdate_acell
sheet.update_cell(1, 2, 'hoge')  # 座標指定の場合は (行,列)

# 変更量が多い時
cell_list = sheet.range('A1:C4')
'''
cell_list[0] : A1
cell_list[1] : B1
cell_list[2] : C1
cell_list[3] : A2
cell_list[4] : B2
 :
'''
for cell in cell_list:
    cell.value = 'fuga'
sheet.update_cells(cell_list)

OAuthを使わない方法

パスワード書くのを躊躇わない方は。

import gspread
doc = spread.login('xxxx@gmail.com', 'password')

sheet = doc.add_worksheet('Sheet名', row=100, col=10)

先日行われたマイクロソフトとJAZUGの共催イベント GoAzure 2015 に呼んで頂き、IoTとAzure Stream Analyticsについてお話させていただきました。

普段あまり人前には出てないし、そもそもそんなに話が上手くない私にこんな大きな機会をくれた元上司の大和屋さんには感謝していますし、何より聴きに来て頂いた方々には大変感謝してます:)
イベント的に大成功の中、個人的にも大きな事故もなく過ごせたのでホッとしています。(Matzとも一緒に写真撮ったし!)

で、セッションの際に紹介したFluentd pluginのfluent-plugin-azureeventhubsですが、せっかくなのでRubyGemsで 公開 しました。

$ gem install fluent-plugin-azureeventhubs

でインストールできますので、もし今Fluentdを使っていてAzure Event HubsやStream Analyticsを試してみたい方が居らっしゃいましたら使ってみて貰えると嬉しいです。

ちなみに一緒に紹介したApache Kafka用のプラグイン(fluent-plugin-kafka)もプルリクの取り込みやIssue対応をしてアップデートしました。

JAZUGは初めてでしたが、リハーサルから打ち上げまでみんな和気あいあいとしていていい感じなコミュニティですね。
いろいろとサポートして頂いた青木さん、亀渕さん、得上さんありがとうございました!

またイマイチだったスライドのデザインをサクッとカッコよく仕上げてくれたしょうた君、本当に助かりました!
当日発表したものはアニメーションが多用されていたので、ちょっと修正したスライドを上げておきます。
※カテゴライズや間違っている点などあるかもしれませんが、もしあればTwitterなどで教えてもらえると助かります!

あと、こんな私で良ければ話しますので機会を頂ける方が居らっしゃいましたらご連絡ください。
JAZUGの皆様もこれからも宜しくお願いしますm(_ _)m

いい加減このブログも読みづらので変えたいのですが。。。

Capy Inc.がMicrosoft Ventures Tel AvivのAcceleration Program Batch #5に採択され、その期間中にイスラエルに訪れることができたので何をしてきたのかを書き留めておこうかと。

何をしにいったのか?

自分の役割としては世界中から優秀なメンターが集まるということだったため、想定しているシステムのアーキテクチャの妥当性や意見を求めたりよりセキュアなサーバ構成について意見をもらおうという方針で行ってきました。

また、イスラエルへ行く直前に Azure Stream Analytics というサービスがβ版で発表されていたため、サービス向上に役立てないか?ということで急遽目的を1個追加しました。
今回はその際に開発した Fluentd pluginの話です。

Azure Stream Analyticsとは?

tldr;


まず Azure Stream Analytics はリアルタイムにイベントをSQLのようなクエリで簡単に処理できるサービスで、今後IOT(Internet of things)市場がもっと活性化した際のリアルタイムデータ処理系を意図したサービス可と思います。
個人的には Norikra as a ServiceやEsper as a Serviceのように考えてます。
サービス自体はEvent Drivenな構成になっているようで、入力は Azure Blob Storage, Azure Event Hubs(AMQP/HTTPS) へイベントデータを入力でき、リアルタイムにStream Analiticsへ渡されます。
イベント処理は以下の様なSQL Likeなわかりやすいクエリでデータを処理させることができます。





また、クエリには窓関数も設けられているため、定期的にどのようなデータ処理を行うかも指定できます。
用意されている窓関数は以下の3つです。

Tumbling Window

指定された時間毎に窓を区切り、その窓に入ったイベントを対象とします。

Hopping Window

Tumbling Windowに似ていますが、過去いくつの窓を対象とするかも指定できます。

Sliding Window

Sliding Windowは入力されたイベントから指定した時間を遡って対象を決定します。
そのため窓の区切りが上記2つとは異なり変動的になります。


また、出力は今のところ Azure Blob Storage / Azure Event Hubs / SQL Server の3つとなっています。

プラグインの話

Azure Stream AnalyticsにはAzure Event Hubsを経由でイベントデータ(Application LogやAccess Logなど)を入力させることにしました。そのためにまずはそれ用のプラグインの開発を始めました。

fluent-plugin-azureeventhubs

以下の様な感じでAzure Management Portalから取得できるConnection Stringをはりつける事で接続可能にしています。
※ Azure Event Hubsが対応しているAMQPのプロトコルバージョンが1.0であり今のところRubyモジュールで良いAMQP1.0対応モジュールが見つからなかったので、AMQP接続は今のところNot Implementedとしています。





現在Fluentd(td-agent)とfluent-plugin-azureblobstorageを使ってアーカイブしたApplicationログやアクセスログをAzure Blob Storageへ保存しているため、途中でAzure Event Hubsへの入力も行うように分岐させました。


これでEvent Hubs経由でStream Analyticsへの入力までは行えるようになったため以下の様なクエリでアクセスログから過去5分間のStatusコードの集計を行えるようになり、エラー発生傾向などが見える用になるかと思います。
(実際に使っているクエリとは異なります。)





ちなみにプラグインはまだGemsには登録してません。

課題

実はまだStream Analyticsで処理した結果をサービスへ反映させることができていません。
理由としては

  • Azure Blob Storageを出力先とした場合Prefixは指定できるのですがファイル名がランダムとなってしまい、かつローテートされるため集計結果をリアルタイムに拾うことが困難
  • Azure Event Hubsを出力先として利用してAMQPでSubscribeしたいが、AMQP1.0の良いモジュールがRubyに限らずまだ無い。(Qpid Protonなども試したのですがうまくSubscribeできてないです)
  • SQL Serverは既にMySQLをRDBとして利用しているアーキテクチャにSQL Serverを入れるのが嫌だったので却下。

ということで、これからも継続的にここらへんのサポートをしてもらうのですが何か良い方法が有る方は@hid_tgcなどに@mentionを飛ばしてもらえると助かります。

最後に

イスラエルに行くことになるまでは、恐らく他の人も思っている通り「危険極まりない場所」かと思っていたのですが、行ってみると都会だったりリゾート地だったり印象は全く違いました。
イスラエルはStartup nationを言われるほどStartupが盛んで、若い人に限らず子持ちのお父さんなども結構どのようなStartupを興そうかと考えていたりしていました。
ちなみに治安的にはサンフランシスコとは比較にならないくらい治安がいい気がします。(あくまで個人的な感想)
いろいろな宗教の発祥の地であったり、歴史的にいざこざが絶えない場所がら街には機関銃を持った徴兵された若い兵士(女性も!)の方が歩いていたりはするのですが、興味のある方は是非一度行ってみるもの良いと思います!

AWS S3Google Cloud StorageがあるのにAzureはなかったのでfluent-plugin-s3をAzure Storage用にポーティングしたfluent-plugin-azurestorageを書いてリリースしました。


パラメータ名がAzure用に変わっていますが使い方などはほぼ同じです。


まだ実装していないのですが、Azure Storageは通常の非構造化のテキストデータやバイナリデータを扱えるBlob Storageのみでなく、構造化データをテーブルのように扱えるTable Storageやキューとして使えるQueue Storageなどがあるので今後利用したくなったら機能追加するかもしれません。
またAzure Storage用に変更した箇所で作りが甘い箇所も多々あると思いますので、もし使って頂けて思う所があればIssue登録して頂けるとありがたいです。


AzureにもHortonworksのHDPがベースになっているHDInsightがあったり、最近プレビュー版が出てきたAzure MLがあったりするのでこのプラグインと組み合わせて使ってもらえるといいなと思っています。

Fluentd plugin2つ目。
今回はMicrosoft SQL ServerへレコードをInsertしていくfluent-plugin-mssqlをリリースしました。(ニッチ!)


github
RubyGems


本プラグインを利用するにあたり、unixODBCとFreeTDSのインストールが必須となりますがインストール手順や設定手順については以前ブログに記載しているのでそちらを参考に。(もう1年も前の記事だった!)

CentOSからSQL Serverへ接続する

http://makeitsmartjp.com/2013/02/centos-sqlserver.html

RubyからSQL Serverへ接続する(CentOS)

http://makeitsmartjp.com/2013/02/ruby-sqlserver-centos.html


本プラグインのベースが @tagomoris さんの
fluent-plugin-mysql となっているため、configファイルの表記方法も以下の差分以外は同じです。

  • host, port, databaseの代わりに、odbc_labelには odbc.ini で定義した接続したいDBへのラベルを指定します。


Microsoft SQL Server環境をお持ちでしたら試してみて下さい。
あと、テスト書いてくれる方いたら大歓迎です。。

最近気になっていた分散Pub/Subメッセージングシステムの Apache Kafka を触ってFluentdのバックエンドとして置いてみたいなーと思ったのですが、既存の https://github.com/kiyoto/fluent-plugin-kafka で試したらKafka-0.8では動作しなかったのでドライバを poseidon にアップデートした fluent-plugin-kafka-poseidon fluent-plugin-kafkaをリリースしました。

https://github.com/htgc/fluent-plugin-kafka-poseidon
http://rubygems.org/gems/fluent-plugin-kafka-poseidon

 ↓↓
https://github.com/htgc/fluent-plugin-kafka
http://rubygems.org/gems/fluent-plugin-kafka

Inputプラグイン、Outputプラグイン、BufferedOutputプラグインの3つを同梱してます。

Inputプラグイン

SubscribeするTopicはconfig内のtopicsで指定します。
複数のTopicをSubscribeする場合はカンマ( , ) で区切ってください。
tagはtopic名で付与しますがappend_prefixとかappend_suffixとか有ったほうが使い勝手がいいのかな。

Outputプラグイン

既存の fluent-plugin-kafka ではBufferedOutputのみでしたが、Kafkaに併せてStormなどでリアルタイム処理をしたい場合にBufferedにしちゃうとリアルタイム性が悪くなるかもということでBufferedOutputとは別にOutputプラグインを追加しました。
もちろんBufferしないのでKafkaとの接続に問題が生じたりFluentd自体が予期せず落ちたりしてしまった場合にはデータをロストしてしまうので注意。
Brokerの指定は'hostname:port'でBrokerが複数ある場合にはカンマ( , )区切りで複数指定も可能です。
PublishするTopicはレコードに 'topic' というキーがあればその値を使うし、なければconfigで指定するdefault_topicを使います。まぁこの辺は既存となんら変わりません。

BufferedOutputプラグイン

OutputプラグインのBuffered版です。Kafkaに渡す頻度を適度にflush_intervalで調整してください。

簡単にPublish/Subscribeができること程度しか確認できてないので、何かありましたらご指摘ください。
あと、今回はじめてRubygemsにリリースしたので記念プラグインです!

[2014-02-06 UPDATED]
既存のfluent-plugin-kafkaの作者である @kiyototamura さんよりfluent-plugin-kafkaの名前で引き継がせていただくこととなり、プラグイン名を変更しました。
@kiyototamura さん、ご連絡とサポートありがとうございました!!

Rubyで お!っとなった2つの事

最近ようやくRubyのコーディングに慣れてきた感があるのですが、Rubyを書いていて「お!」っとなった事があったので書いておきます。
ちなみにRubyのバージョンはruby-2.0.0p247。


1. リストのソート

よくみるサンプルは

程度なのですがオブジェクトのリストだった場合にも同じように書けます。



2. リストのuniq (重複なしのデータ)

ソートと同じようにユニークも簡単に書けます。

普通の配列のuniq



オブジェクトのuniq



しかもユニーク判定はオブジェクトでも可能なので以下のように複数条件の指定も可能です。



普段Ruby書いてる人にとっては極普通のことな気もするしもっと他にも便利なものがあるとは思うのですが、他言語から入ってきた自分にはsortもuniqも完結な書き方でかゆいところに手が届いた感じでちょっとRubyが好きになった瞬間でした。

http://makeitsmartjp.com/2011/04/gy-radiation-api.htmlで公開を始めた放射線量取得のAPIなのですが、参照先のサイトクローズの影響により2013年5月17日 10:31を最後に更新が行えていませんでした。


継続方法も考えてみたのですが、なかなか自分の浅知恵ではうまい解決方法も見つけることができなかったため一旦停止することとしました。


今までAPIを利用していただいた方、ブログパーツを利用していただいたり、APIを使って自作していただいた方には大変感謝しています。また、今回の更新停止によりお手数をお掛けしてしまい申し訳ないですがご理解頂けると助かります。
特にいつもフィードバックを頂けた@fannypainterさんには大変感謝しています。ありがとうございました!


また、本APIは2011年4月2日から公開しているのですが、データ自体2011/5/17からSQLiteのDatabaseとして保存していたため、このタイミングでGithubに公開しました。
ほとんどのデータがテキスト型で保存されていたり重複データが含まれているため、使うにはいろいろ難があると思いますがもし良ければ参照してみてください。