Implementing streaming data processing in Clojure for IoT and stock data analysis


Some business applications need to respond asynchronously to external stimuli such as network traffic. Examples include IoT applications that receive and process streaming data, and applications that track company stock prices and related information in the stock market.

If these applications are run with the sequential, synchronous processing of an ordinary program, the overhead of synchronizing asynchronous data becomes a bottleneck as the amount of input data grows, making it difficult to run the application at a practical speed.

A similar problem occurs in the front end of web services, where there is a gap of several orders of magnitude between human operating speed (milliseconds to seconds) and computer operating speed (on the order of nanoseconds), so responsive operation cannot be guaranteed without asynchronous processing. To address this problem, JavaScript has asynchronous processing built into its language specification.

This time, we will perform asynchronous processing in a server-side back-end language and build it into an implementation that assumes an actual application. Among back-end languages, Java is well known for its asynchronous processing mechanisms: Runnable, Thread, and Executor are available as standard APIs, enabling the construction of a variety of asynchronous applications. Python, which is often used for machine learning, has asynchronous frameworks such as Tornado, Twisted, and Gevent, but their asynchronous mechanisms are complex and conflict with the ease of programming that Python is known for, so they are not widely used.

Clojure is a Lisp that runs on top of the JVM, so it has native access to Java’s asynchronous processing mechanisms, and its flexible, macro-based language core makes it easy to write the simple style of asynchronous processing used in Go and Rust.
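As a small illustration of the Java interop mentioned above, the following sketch submits a Clojure function to a standard java.util.concurrent thread pool. The function name sum-on-pool and the pool size of 2 are assumptions made for this example only.

```clojure
(import '[java.util.concurrent Executors ExecutorService Callable])

(defn sum-on-pool
  "Runs (reduce + numbers) on a worker thread and blocks for the result.
  Hypothetical helper for illustration."
  [numbers]
  (let [^ExecutorService pool (Executors/newFixedThreadPool 2)
        ;; Clojure functions implement both Runnable and Callable; the type
        ;; hint selects the Callable overload of submit so the result is kept.
        ^Callable task (fn [] (reduce + numbers))]
    (try
      (.get (.submit pool task)) ; .get blocks until the task has finished
      (finally
        (.shutdown pool)))))

(println "sum:" (sum-on-pool (range 1 11)))
```

This works, but the caller has to manage the pool and block on the result by hand, which is the kind of bookkeeping core.async is meant to hide.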

As described in “State Management and Loose Coupling of Functions in Clojure,” core.async uses the go macro, modeled on the Go language, to create a go block. Asynchronous operations run inside the block, which limits their impact on the rest of the process and keeps them lightweight. Channels are the entry and exit points for data exchanged between go blocks, and a program built on core.async forms a whole system out of asynchronous processes that pass data to one another through channels.

The go macro rewrites its body into a state machine, which advances one step each time a channel operation it is waiting on completes. At that point a thread is allocated to the waiting go block, and processing runs on the CPU until the next channel input or output, where control can pass to another go block, and so on. This makes it possible to run efficiently on a limited number of CPUs without launching many threads.

Because of this mechanism (similar in principle to cooperative multitasking in operating systems), a go block is not actually a process or a thread, but merely a program unit managed by a state machine. core.async therefore works properly even when there is only one thread. This matters because core.async was designed to also work with ClojureScript, as described in the previous article “Interaction of Clojure with Javascript, node.js, and web frameworks”.

How core.async works

The operation of core.async is described in the official documentation. In brief, each unit of execution is enclosed in go, and whenever a channel is read or written inside a go block, execution switches from one go block to another in turn.

The following is an example of core.async code in Clojure, which is very simple and similar to Go code.

(import '[java.util Date])
(require '[clojure.core.async :refer [chan go-loop >! <! timeout] :as async]) 
(def ch (chan)) ; make channel 

;; write asynchronous block 
(go-loop [] 
 (when (>! ch (Date.)) ; write channel
   (<! (timeout 2000)) ; wait 2000msec
   (recur)))

;; Read asynchronous block
(go-loop []
 (when-let [date (<! ch)] ; read channel
   (println "now:" date)
   (recur)))  

go-loop is shorthand for the following form, and is provided because the pattern is used so frequently.

(go 
 (loop []
   ;; processing
   ))

A simple go block never runs again once its asynchronous process has finished, but by looping it is possible to create an “asynchronous block that keeps running”.

In the above program, one go block writes the current date and time to the channel and then waits 2 seconds, while the other reads from the same channel and prints each value as it arrives. Both go loops keep looping until the channel is closed. In effect, two lightweight processes are running: one that generates the current time and one that outputs the time it receives.
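The loops above stop on close because of the return values of channel operations: a put returns false on a closed channel, and a take returns nil once a closed channel is empty. The following minimal sketch shows this with the blocking variants >!! and <!!, which can be used outside a go block. The channel name demo-ch and the keywords are assumptions for illustration.

```clojure
(require '[clojure.core.async :refer [chan >!! <!! close!]])

;; Buffer of 1 so the first put completes without a waiting reader.
(def demo-ch (chan 1))

(def put-open (>!! demo-ch :tick))    ; true: the channel is open
(close! demo-ch)
(def take-buffered (<!! demo-ch))     ; :tick: buffered values survive close!
(def take-closed (<!! demo-ch))       ; nil: closed and empty
(def put-closed (>!! demo-ch :tock))  ; false: put on a closed channel

(println put-open take-buffered take-closed put-closed)
```

In the earlier listing, this is why `(when (>! ch (Date.)) ...)` ends the writer and `(when-let [date (<! ch)] ...)` ends the reader after `(close! ch)` is called.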

Thus, in core.async, each processing unit of the program is enclosed in a go block for asynchronous processing, and channels are used to exchange data between go blocks. When a go block tries to read from or write to a channel and there is no data in the channel yet, or the channel is not yet ready to accept a write, the go block enters a parked (waiting) state and releases its thread. When the channel is ready, the go block is assigned to a thread and resumes. Because of this mechanism, a large number of asynchronous blocks can be executed with a very small number of threads.
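The parking behaviour can be sketched by starting far more go blocks than there are threads in the go dispatch pool. In this minimal example, the function name run-parked-blocks, the 100 ms delay, and the count of 1000 are assumptions chosen for illustration.

```clojure
(require '[clojure.core.async :as async :refer [chan go <! >! <!! timeout]])

(defn run-parked-blocks
  "Starts n go blocks that each park for 100 ms and then report their index.
  Returns the sum of all reported indices. Hypothetical helper."
  [n]
  (let [results (chan n)] ; buffer of n so no writer has to wait for a reader
    (dotimes [i n]
      (go
        (<! (timeout 100)) ; parks here: the thread is released while waiting
        (>! results i)))
    ;; Take exactly n results and sum them, blocking the calling thread.
    (<!! (async/reduce + 0 (async/take n results)))))

(println (run-parked-blocks 1000))
```

All the blocks wait concurrently, so the whole run should take roughly one delay rather than 1000 of them, without creating a thread per block.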

The strength of core.async is that efficient parallel programs can be written without complicated thread control, simply by wrapping channel reads and writes in “go”. It also avoids callback hell, since asynchronous callback functions do not have to be nested inside one another.

core.async also supports transducers, which abstract data transformation operations, and pipelines, which connect them (chaining stages of asynchronous processing or adding branches), allowing more complex processing to be executed.
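A minimal sketch of both features follows. The first part attaches a transducer to a buffered channel, and the second uses pipeline to apply a transducer between two channels with several workers. The channel names, buffer sizes, and the parallelism of 4 are assumptions for illustration.

```clojure
(require '[clojure.core.async :as async :refer [chan >!! <!! close! pipeline]])

;; 1. A transducer attached to a channel transforms values as they are put.
(def xf (comp (filter even?) (map #(* % 10))))
(def in-ch (chan 10 xf)) ; a transducer requires a buffered channel
(doseq [n (range 1 7)] (>!! in-ch n))
(close! in-ch)
(def transformed (<!! (async/into [] in-ch)))
(println transformed)

;; 2. pipeline moves values from source to sink through a transducer,
;;    using up to 4 parallel workers while preserving input order.
(def source (chan 10))
(def sink (chan 10))
(pipeline 4 sink (map inc) source)
(doseq [n (range 5)] (>!! source n))
(close! source) ; closing the source also closes the sink once it has drained
(def piped (<!! (async/into [] sink)))
(println piped)
```

Because the transformation is an ordinary transducer, the same xf could also be used with into or sequence on a plain collection.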

Example application using core.async

Below is an example implementation that uses core.async to compute a moving average over stock market data arriving in real time.

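(ns async-test.core
  (:require [clojure.core.async
             ;; map> is deprecated in core.async but still available
             :refer [go chan <! >! timeout go-loop map>] :as async]
            ;; clojure.core.async.lab is an experimental namespace
            [clojure.core.async.lab :refer [broadcast]]
            [seesaw.core :refer :all]))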

;;UI using seesaw
(native!)

(def main-frame (frame :title "stock price monitor"
                       :width 200 :height 100
                       :on-close :exit))

(def price-label (label "Price: -"))
(def running-avg-label (label "Running average: -"))

(config! main-frame :content
         (border-panel
           :north price-label
           :center running-avg-label
           :border 5))

;;Generation of pseudo input data
(defn share-price [company-code]
  (Thread/sleep 200)
  (rand-int 1000))

;;Calculation of moving averages
(defn avg [numbers]
  (float (/ (reduce + numbers)
            (count numbers))))

(defn roll-buffer [buffer val buffer-size]
  (let [buffer (conj buffer val)]
    (if (> (count buffer) buffer-size)
      (pop buffer)
      buffer)))

(defn make-sliding-buffer [buffer-size]
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)]
    (fn [n]
      (swap! buffer roll-buffer n buffer-size))))

(def sliding-buffer (make-sliding-buffer 5))

;;Output data to channel
(defn broadcast-at-interval [msecs task & ports]
  (go-loop [out (apply broadcast ports)]
    (<! (timeout msecs))
    (>! out (task))
    (recur out)))

;;Asynchronous processing using core.async
(defn -main [& args]
  (show! main-frame)
  (let [prices-ch (chan)
        sliding-buffer-ch (map> sliding-buffer (chan))]
       (broadcast-at-interval 500 #(share-price "XYZ") prices-ch sliding-buffer-ch)
       (go-loop []
         (when-let [price (<! prices-ch)]
           (text! price-label (str "Price: " price))
           (recur)))
       (go-loop []
         (when-let [buffer (<! sliding-buffer-ch)]
           (text! running-avg-label (str "Running average: " (avg buffer)))
           (recur)))))  

The above code uses seesaw for its input/output UI. seesaw is a Clojure wrapper for Swing, Java’s desktop UI library, and makes it easy to build a simple desktop demo. (For more details, see the seesaw tutorial, or download the seesaw code from git, go to the examples folder, and run an example with “lein run”.) To run the above code, type the following in the terminal:

>lein run -m async-test.core 

If no errors occur, a window will open and the moving average of the pseudo input values will be displayed and updated continuously.
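The same fan-out can also be sketched without the UI and without map> (deprecated) or clojure.core.async.lab (experimental), using mult and tap together with a stateful transducer. This is an alternative under stated assumptions, not the listing above: the names sliding-averages and run-demo, the window size of 3, and the buffer size of 16 are hypothetical.

```clojure
(require '[clojure.core.async :as async :refer [chan >!! <!! close! mult tap]])

(defn sliding-averages
  "Stateful transducer: emits the average of the last `size` inputs."
  [size]
  (fn [rf]
    (let [window (volatile! clojure.lang.PersistentQueue/EMPTY)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result x]
         (let [w (vswap! window #(let [q (conj % x)]
                                   (if (> (count q) size) (pop q) q)))]
           (rf result (double (/ (reduce + w) (count w))))))))))

(defn run-demo
  "Feeds prices through a mult into a raw tap and a moving-average tap.
  Assumes at most 16 prices, the buffer size of each tap."
  [prices]
  (let [source (chan)
        m (mult source)
        price-ch (tap m (chan 16))
        avg-ch (tap m (chan 16 (sliding-averages 3)))]
    (doseq [p prices] (>!! source p))
    (close! source) ; closing the source closes both taps
    {:prices (<!! (async/into [] price-ch))
     :averages (<!! (async/into [] avg-ch))}))

(println (run-demo [10 20 30 40]))
```

In a UI version, the two result channels would be consumed by go-loops that update the labels, as in the listing above.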

By replacing the randomly generated data with external data fetched over HTTP, a more practical application can be assembled.

Stream data processing can also be realized using the pub-sub function of a key-value database, as described in “About redis” and “clojure and redis”.
