IOTや株式市場情報分析の為のストリーミングデータ処理のClojureでの実装

機械学習技術 確率的生成モデル 時系列データ解析技術 ストリームデータ制御技術 人工知能技術 IOT&センサー技術  セマンティックウェブ技術  デジタルトランスフォーメーション技術  Clojure

いくつかのビジネスアプリケーションは、ネットワークトラフィックのような外部刺激に非同期で反応する必要になる場合がある。例えば、IOTアプリケーションのようなstreaming dataを受け取って処理するものや株式市場における企業の株価や情報をを追跡するためのアプリケーションなどとなる。

通常のコンピューターのシーケンシャル/同期的な処理でこれらを実行しようとすると、入力するデータの量が増えてきた場合、非同期なデータ間の同期をとるオーバーヘッドがボトルネックとなり実効的な速度でのアプリケーション困難になる。

同様な問題はWebサービスのフロントエンド部分でも、人の動作速度(msec〜sec)とコンピューターの動作速度(nsecオーダー)との間で桁違いな速度ギャップが生じて、非同期処理にしないと動作が担保できない問題が生じ、これに対応するためにjavascriptは言語仕様の中に非同期処理を持っている。

今回はサーバーサイドのバックエンド処理の言語で非同期処理を行い、それらを実際のアプリケーションを想定した実装として組み上げることを行う。非同期処理を行うしくみを持つバックエンドの言語として著名なのはJavaのRunnable、Thread、Rxecuder等が標準APIとして用意されており、様々な非同期処理アプリケーションが構築できる。また、新興のプログラミング言語ではGoやRustなどがそれらに対応した言語として著名であり、機械学習によく用いられるPyhtonもTornadoTwistedGeventなどの非同期フレームワークを持っているが、非同期処理のしくみは複雑であり、本来のpythonの持つプログラミングの容易さと相反するため、それほど多くは活用されていない。

ClojureはJVMの上で動くLISPであるため、Javaの持つ非同期処理の仕組みをネイティブに持つとともに、マクロ等を用いたフレキシブルな言語体型を持つため、GoやRustで用いられている簡素な非同期処理の記述も容易に実現できるという特徴を持つ。

Clojureのcore.asyncについては以前も”Clojureでの状態管理と関数の疎結合“にて述べたが、Go言語を模したgoマクロを用いてg-blockを作り、その中が非同期に動作するようにしてプロセス全体への影響を抑え軽量化する。このgo-blockの中のやりとりをする口として、チャネル(channel)を用い、core.asyncを使ったプログラムでは、チャネルへの出入り口を介して非同期計量プロセスにデータを処理させることで、全体のシステムを作り上げる。

goマクロはその中にステートマシンを持ち、チャネルへの入力がある旅にマシンが一回転する。この一回転時に、チャネルを待ち受けていたgoブロックにスレッドが割り当てられ、次のチャネル入出力までCPUを使って処理が動き、チャネルの入出力でまた別のgoブロックに処理が移り、という形で限られたCPU上で、スレッドを山ほど起動することもなく、効率よく動作することができる。

このような仕組み(OSの協調型マルチプロセスと同じような原理)なので、goブロックは実際にはプロセスでもスレッドではなく、ステートマシンによって管理されたプログラム単位にすぎなくなる。よってcore.asyncはスレッドが一つであってもちゃんと動く。これはcore.asyncがその開発当時から以前”ClojureとJavascript、node.js、webフレームワークとの連携“で述べたClojurescriptでも動くことを想定して作っていたからである。

core.asyncの仕組み

core.asyncの動作に関しては公式ドキュメントに掲載されているが、簡単に概要を書くと、実行単位をgoで囲み、そのなかでchannelを読んだり書いたりすると、goで書かれた実行単位が次々と切り替わって動作されるものとなる。

以下にClojureでのcore.asyncのコード例を示す。Goのコードに近く非常にシンプルな記述となる。

(import '[java.util Date])
(require '[clojure.core.async :refer [chan go-loop >! <! timeout] :as async]) 
(def ch (chan)) ; make channel 

;; write asynchronous block 
(go-loop [] 
 (when (>! ch (Date.)) ; write channel
   (<! (timeout 2000)) ; wait 2000msec
   (recur)))

;; Read asynchronous block
(go-loop []
 (when-let [date (<! ch)] ; read channel
   (println "now:" date)
   (recur)))  

go-loopは以下のような形の省略形で、多用されるので用意されている。

(go 
 (loop []
   ;; processing
   ))

単純なgoブロックは、非同期処理が終わると2度と実行されないが、loopすることで「ずっと動き続ける非同期ブロック」を作ることができる。

上記のプログラムは、片方のgoブロックが現在日時をチャネルに書き込んで2秒待つ、もう一つのgoブロックは同じチャンネルを読み込み、読めたらそれを画面に出す。いずれのgoループもチャネルが閉じられるまでloopし続ける。これは現在時刻を生成するプロセスと、受け取った時刻を出力するプロセスの2つの軽量プロセスが動いているといえる。

このようにcore.ayncでは、プログラム処理単位ごとにgoブロックで囲って非同期処理にし、そのgoブロック間でのデータのやり取りにはチャネルを用いる。goブロックは、チャネルからデータを読み書きしようとして、もしチャネルにまだデータがなかったり、チャネルにまだ書けない状態であれば、park(待機)状態になりスレッドを解放する。そして準備ができればまたスレッドに割り当てられて動きだす。このような仕組みのために非常に少ないスレッド数で、たくさんの非同期ブックを実行することができる。

複雑なスレッド制御を書かなくとも、シンプルにチャネルを読み書きするところをgoで囲むことで、簡単に効率良い並列プログラムをかけるのがcore.asyncの強みであり、プログラムを書く側は単純にわりたいことを上から下に書くだけで良い。javascriptのcallback hellと呼ばれるような、非同期コールバック関数が何段にも重なることはない。

またcore.asyncには、データの変換操作を抽象化するtransducerやそれらをつなげるpipelineという処理(非同期処理の中につなげたり、分岐を加えたりする)が加えられより複雑な処理を実行できるようにもなっている。

core.asyncを用いたアプリケーション例

以下に、core.asyncで作成した株式市場のデータをリアルタイムで入力でスライド平均を作る実装例を示す。

(ns async-test.core
  (:require [clojure.core.async
             :refer [go chan <! >! timeout go-loop map>] :as async])
  (:require [clojure.core.async.lab :refer [broadcast]])
  (:use [seesaw.core]))

;;UI using seesaw
(native!)

(def main-frame (frame :title "stock price monitor"
                       :width 200 :height 100
                       :on-close :exit))

(def price-label (label "Price: -"))
(def running-avg-label (label "Running average: -"))

(config! main-frame :content
         (border-panel
           :north price-label
           :center running-avg-label
           :border 5))

;;Generation of pseudo input data
(defn share-price [company-code]
  (Thread/sleep 200)
  (rand-int 1000))

;;Calculation of moving averages
(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn roll-buffer [buffer val buffer-size]
     (let [buffer (conj buffer val)]
       (if (> (count buffer) buffer-size)
         (pop buffer)
         buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

(def sliding-buffer (make-sliding-buffer 5))

;;Output data to channel
(defn broadcast-at-interval [msecs task & ports]
     (go-loop [out (apply broadcast ports)]
       (<! (timeout msecs)) (>! out (task))
       (recur out)))

;;Asynchronous processing using core.async
(defn -main [& args]
  (show! main-frame)
  (let [prices-ch (chan)
        sliding-buffer-ch (map> sliding-buffer (chan))]
       (broadcast-at-interval 500 #(share-price "XYZ") prices-ch sliding-buffer-ch)
       (go-loop []
         (when-let [price (<! prices-ch)]
           (text! price-label (str "Price: " price))
           (recur)))
       (go-loop []
         (when-let [buffer (<! sliding-buffer-ch)]
           (text! running-avg-label (str "Running average: " (avg
   buffer)))
           (recur)))))  

上記のコードでは、入出力のUIとしてseesawを用いている。これはJavaのdesktop UIライブラリのseesawのネィティブラッパーで、簡易なデスクトップデモを作りたい時に、容易に構築可能なライブラリとなる。(詳しくはtutorialか、gitでseesawのコードをダウンロードし、examplesのフォルダをトップにして”lein run”してexampleを動作させる)。上記のコードの動作させるにはターミナルで以下の入力を行う。

>lein run -m async-test.core 

エラーが生じなければ、以下のような画面が立ち上がり、疑似的な入力値に対する移動平均の計算が出力され続ける。

ランダムでデータを生成していた部分を外部データにHTTPでアクセスしたものに変えることで、より具体的なアプリケーションを組むことができる。

尚ストリームデータ処理は以前”redisについて“や”clojureとredis“で述べたようなkey-valueデータベースのpub-sub機能を用いて実現していく事も可能となる。

コメント

  1. […] IOTや株式市場情報分析の為のストリーミングデータ処理のClojureでの実装 […]

  2. […] IOTや株式市場情報分析の為のストリーミングデータ処理のClojureでの実装 […]

タイトルとURLをコピーしました