いい加減このブログも読みづらので変えたいのですが。。。

Capy Inc.がMicrosoft Ventures Tel AvivのAcceleration Program Batch #5に採択され、その期間中にイスラエルに訪れることができたので何をしてきたのかを書き留めておこうかと。

何をしにいったのか?

自分の役割としては世界中から優秀なメンターが集まるということだったため、想定しているシステムのアーキテクチャの妥当性や意見を求めたりよりセキュアなサーバ構成について意見をもらおうという方針で行ってきました。

また、イスラエルへ行く直前に Azure Stream Analytics というサービスがβ版で発表されていたため、サービス向上に役立てないか?ということで急遽目的を1個追加しました。
今回はその際に開発した Fluentd pluginの話です。

Azure Stream Analyticsとは?

tldr;


まず Azure Stream Analytics はリアルタイムにイベントをSQLのようなクエリで簡単に処理できるサービスで、今後IOT(Internet of things)市場がもっと活性化した際のリアルタイムデータ処理系を意図したサービス可と思います。
個人的には Norikra as a ServiceやEsper as a Serviceのように考えてます。
サービス自体はEvent Drivenな構成になっているようで、入力は Azure Blob Storage, Azure Event Hubs(AMQP/HTTPS) へイベントデータを入力でき、リアルタイムにStream Analiticsへ渡されます。
イベント処理は以下の様なSQL Likeなわかりやすいクエリでデータを処理させることができます。





また、クエリには窓関数も設けられているため、定期的にどのようなデータ処理を行うかも指定できます。
用意されている窓関数は以下の3つです。

Tumbling Window

指定された時間毎に窓を区切り、その窓に入ったイベントを対象とします。

Hopping Window

Tumbling Windowに似ていますが、過去いくつの窓を対象とするかも指定できます。

Sliding Window

Sliding Windowは入力されたイベントから指定した時間を遡って対象を決定します。
そのため窓の区切りが上記2つとは異なり変動的になります。


また、出力は今のところ Azure Blob Storage / Azure Event Hubs / SQL Server の3つとなっています。

プラグインの話

Azure Stream AnalyticsにはAzure Event Hubsを経由でイベントデータ(Application LogやAccess Logなど)を入力させることにしました。そのためにまずはそれ用のプラグインの開発を始めました。

fluent-plugin-azureeventhubs

以下の様な感じでAzure Management Portalから取得できるConnection Stringをはりつける事で接続可能にしています。
※ Azure Event Hubsが対応しているAMQPのプロトコルバージョンが1.0であり今のところRubyモジュールで良いAMQP1.0対応モジュールが見つからなかったので、AMQP接続は今のところNot Implementedとしています。





現在Fluentd(td-agent)とfluent-plugin-azureblobstorageを使ってアーカイブしたApplicationログやアクセスログをAzure Blob Storageへ保存しているため、途中でAzure Event Hubsへの入力も行うように分岐させました。


これでEvent Hubs経由でStream Analyticsへの入力までは行えるようになったため以下の様なクエリでアクセスログから過去5分間のStatusコードの集計を行えるようになり、エラー発生傾向などが見える用になるかと思います。
(実際に使っているクエリとは異なります。)





ちなみにプラグインはまだGemsには登録してません。

課題

実はまだStream Analyticsで処理した結果をサービスへ反映させることができていません。
理由としては

  • Azure Blob Storageを出力先とした場合Prefixは指定できるのですがファイル名がランダムとなってしまい、かつローテートされるため集計結果をリアルタイムに拾うことが困難
  • Azure Event Hubsを出力先として利用してAMQPでSubscribeしたいが、AMQP1.0の良いモジュールがRubyに限らずまだ無い。(Qpid Protonなども試したのですがうまくSubscribeできてないです)
  • SQL Serverは既にMySQLをRDBとして利用しているアーキテクチャにSQL Serverを入れるのが嫌だったので却下。

ということで、これからも継続的にここらへんのサポートをしてもらうのですが何か良い方法が有る方は@hid_tgcなどに@mentionを飛ばしてもらえると助かります。

最後に

イスラエルに行くことになるまでは、恐らく他の人も思っている通り「危険極まりない場所」かと思っていたのですが、行ってみると都会だったりリゾート地だったり印象は全く違いました。
イスラエルはStartup nationを言われるほどStartupが盛んで、若い人に限らず子持ちのお父さんなども結構どのようなStartupを興そうかと考えていたりしていました。
ちなみに治安的にはサンフランシスコとは比較にならないくらい治安がいい気がします。(あくまで個人的な感想)
いろいろな宗教の発祥の地であったり、歴史的にいざこざが絶えない場所がら街には機関銃を持った徴兵された若い兵士(女性も!)の方が歩いていたりはするのですが、興味のある方は是非一度行ってみるもの良いと思います!

AWS S3Google Cloud StorageがあるのにAzureはなかったのでfluent-plugin-s3をAzure Storage用にポーティングしたfluent-plugin-azurestorageを書いてリリースしました。


パラメータ名がAzure用に変わっていますが使い方などはほぼ同じです。


まだ実装していないのですが、Azure Storageは通常の非構造化のテキストデータやバイナリデータを扱えるBlob Storageのみでなく、構造化データをテーブルのように扱えるTable Storageやキューとして使えるQueue Storageなどがあるので今後利用したくなったら機能追加するかもしれません。
またAzure Storage用に変更した箇所で作りが甘い箇所も多々あると思いますので、もし使って頂けて思う所があればIssue登録して頂けるとありがたいです。


AzureにもHortonworksのHDPがベースになっているHDInsightがあったり、最近プレビュー版が出てきたAzure MLがあったりするのでこのプラグインと組み合わせて使ってもらえるといいなと思っています。

Fluentd plugin2つ目。
今回はMicrosoft SQL ServerへレコードをInsertしていくfluent-plugin-mssqlをリリースしました。(ニッチ!)


github
RubyGems


本プラグインを利用するにあたり、unixODBCとFreeTDSのインストールが必須となりますがインストール手順や設定手順については以前ブログに記載しているのでそちらを参考に。(もう1年も前の記事だった!)

CentOSからSQL Serverへ接続する

http://makeitsmartjp.com/2013/02/centos-sqlserver.html

RubyからSQL Serverへ接続する(CentOS)

http://makeitsmartjp.com/2013/02/ruby-sqlserver-centos.html


本プラグインのベースが @tagomoris さんの
fluent-plugin-mysql となっているため、configファイルの表記方法も以下の差分以外は同じです。

  • host, port, databaseの代わりに、odbc_labelには odbc.ini で定義した接続したいDBへのラベルを指定します。


Microsoft SQL Server環境をお持ちでしたら試してみて下さい。
あと、テスト書いてくれる方いたら大歓迎です。。

最近気になっていた分散Pub/Subメッセージングシステムの Apache Kafka を触ってFluentdのバックエンドとして置いてみたいなーと思ったのですが、既存の https://github.com/kiyoto/fluent-plugin-kafka で試したらKafka-0.8では動作しなかったのでドライバを poseidon にアップデートした fluent-plugin-kafka-poseidon fluent-plugin-kafkaをリリースしました。

https://github.com/htgc/fluent-plugin-kafka-poseidon
http://rubygems.org/gems/fluent-plugin-kafka-poseidon

 ↓↓
https://github.com/htgc/fluent-plugin-kafka
http://rubygems.org/gems/fluent-plugin-kafka

Inputプラグイン、Outputプラグイン、BufferedOutputプラグインの3つを同梱してます。

Inputプラグイン

SubscribeするTopicはconfig内のtopicsで指定します。
複数のTopicをSubscribeする場合はカンマ( , ) で区切ってください。
tagはtopic名で付与しますがappend_prefixとかappend_suffixとか有ったほうが使い勝手がいいのかな。

Outputプラグイン

既存の fluent-plugin-kafka ではBufferedOutputのみでしたが、Kafkaに併せてStormなどでリアルタイム処理をしたい場合にBufferedにしちゃうとリアルタイム性が悪くなるかもということでBufferedOutputとは別にOutputプラグインを追加しました。
もちろんBufferしないのでKafkaとの接続に問題が生じたりFluentd自体が予期せず落ちたりしてしまった場合にはデータをロストしてしまうので注意。
Brokerの指定は'hostname:port'でBrokerが複数ある場合にはカンマ( , )区切りで複数指定も可能です。
PublishするTopicはレコードに 'topic' というキーがあればその値を使うし、なければconfigで指定するdefault_topicを使います。まぁこの辺は既存となんら変わりません。

BufferedOutputプラグイン

OutputプラグインのBuffered版です。Kafkaに渡す頻度を適度にflush_intervalで調整してください。

簡単にPublish/Subscribeができること程度しか確認できてないので、何かありましたらご指摘ください。
あと、今回はじめてRubygemsにリリースしたので記念プラグインです!

[2014-02-06 UPDATED]
既存のfluent-plugin-kafkaの作者である @kiyototamura さんよりfluent-plugin-kafkaの名前で引き継がせていただくこととなり、プラグイン名を変更しました。
@kiyototamura さん、ご連絡とサポートありがとうございました!!

Rubyで お!っとなった2つの事

最近ようやくRubyのコーディングに慣れてきた感があるのですが、Rubyを書いていて「お!」っとなった事があったので書いておきます。
ちなみにRubyのバージョンはruby-2.0.0p247。


1. リストのソート

よくみるサンプルは

程度なのですがオブジェクトのリストだった場合にも同じように書けます。



2. リストのuniq (重複なしのデータ)

ソートと同じようにユニークも簡単に書けます。

普通の配列のuniq



オブジェクトのuniq



しかもユニーク判定はオブジェクトでも可能なので以下のように複数条件の指定も可能です。



普段Ruby書いてる人にとっては極普通のことな気もするしもっと他にも便利なものがあるとは思うのですが、他言語から入ってきた自分にはsortもuniqも完結な書き方でかゆいところに手が届いた感じでちょっとRubyが好きになった瞬間でした。

http://makeitsmartjp.com/2011/04/gy-radiation-api.htmlで公開を始めた放射線量取得のAPIなのですが、参照先のサイトクローズの影響により2013年5月17日 10:31を最後に更新が行えていませんでした。


継続方法も考えてみたのですが、なかなか自分の浅知恵ではうまい解決方法も見つけることができなかったため一旦停止することとしました。


今までAPIを利用していただいた方、ブログパーツを利用していただいたり、APIを使って自作していただいた方には大変感謝しています。また、今回の更新停止によりお手数をお掛けしてしまい申し訳ないですがご理解頂けると助かります。
特にいつもフィードバックを頂けた@fannypainterさんには大変感謝しています。ありがとうございました!


また、本APIは2011年4月2日から公開しているのですが、データ自体2011/5/17からSQLiteのDatabaseとして保存していたため、このタイミングでGithubに公開しました。
ほとんどのデータがテキスト型で保存されていたり重複データが含まれているため、使うにはいろいろ難があると思いますがもし良ければ参照してみてください。

Fluentdを使うようになってから個人的に

  • インターネットを挟んだ複数拠点間でfluentdでログ収集したいなー
  • でも受け手側でポートを全開放するのは怖いしIPフィルタかユーザ認証できたらいいなー
  • Treasure Dataのようにアカウント認証の仕組みを作るのも面倒だなー



と思っていたところ、先日のFluentd Casual Talks #2でtagomorisさんがfluent-plugin-secure-forwardを発表されました。

tagomorisさんのスライドはこちら



Twitterでも



とつぶやいたので試してみます。

サーバはEC2 (Amazon Linux AMI release 2012.03)
クライアントはローカル上のCentOS (CentOS release 5.9 (Final))

td-agentのバージョンは共に0.10.30で試しています。

前回の続き。

unixODBC, FreeTDSの設定が完了後、rubyからSQL Serverへ接続してみます。
Rubyのバージョンは1.9.3で確認しています。

ちょっと機会があったので備忘録的にメモしておきます。

環境はこんなかんじ

  • SQL Server 2008 R2 on Windows Server 2008 R2
  • CentOS 5.8



S3話題続きです。

現在仕事でもS3(とCloudFront)使ったりしてますがs3cmdの使い方を簡単に紹介します。