c4se記:さっちゃんですよ☆

.。oO(さっちゃんですよヾ(〃l _ l)ノ゙☆)

.。oO(此のblogは、主に音樂考察Programming に分類されますよ。ヾ(〃l _ l)ノ゙♬♪♡)

音樂は SoundCloud に公開中です。

考察は現在は主に Cosense で公表中です。

Programming は GitHub で開發中です。

非同期に行はれる二種類の處理を繋ぐには四種類の channel が要る

非同期に行はれる二種類の處理を繋ぎたい。一つ目の處理から來る data は一つづつ二つ目の處理へ渡される。一つ目の處理は、二つ目の處理よりも速く data を作り出すとする。分散處理の出番である。CPU の core が複數有る場合や IO 待ちが支配的であるならば、一台の電算機の中で分散處理を行ふのも便利である。これを Elixir で行ふと次に成る。

stream
|> Task.async_stream(fn item -> some_process(item) end)
|> Stream.run

Task.async_stream/3は stream から渡される data 毎に process を起動し、process は函數を實行する。同時に處理する process の個數は最大で、Erlang VM が管理する scheduler と同じ個數 (普通は CPU の假想 core 數に等しい) に抑へられる。これを呼ぶ側は stream が尽きるのを待つから同期的に呼んで好い (それも非同期にしたければ GenServer を起こしてそこで呼ぶのである)。

さて Clojure には Go に似た、core.async と云ふ分散處理の仕組みが有る。二つの process は(async/chan)で作れる channel を通して通信する。これで先のものとほぼ同じものを作らう。

(defn task-parallel
  "Do tasks in parallel by processers number.

  (task-parallel
    (fn [chan] (doseq [item stream] (async/>!! chan item)))
    (fn [item] (do-something item)))"
  [producer consumer]
  (let [nproc (.availableProcessors (Runtime/getRuntime))
        chan (async/chan nproc)
        timeout-ms 60000
        threads (mapv (fn [_]
                        (let [on-exit (async/chan)
                              ;; thread (async/go-loop []
                              ;;          (async/alt!!
                              ;;            chan ([data _]
                              ;;                  (if data
                              ;;                    (try
                              ;;                      (consumer data)
                              ;;                      (recur)
                              ;;                      (catch Exception ex
                              ;;                        (println ex)
                              ;;                        (recur)))
                              ;;                    (async/close! on-exit)))
                              ;;            (async/timeout timeout-ms) (async/close! on-exit)))
                              ;; Using alts!! instead of alts! because of Babashka compatibility.
                              thread (async/go-loop [[data _] (async/alts!! [chan (async/timeout timeout-ms)])]
                                       (if data
                                         (try
                                           (consumer data)
                                           (recur (async/alts!! [chan (async/timeout timeout-ms)]))
                                           (catch Exception ex
                                             (println ex)
                                             (recur (async/alts!! [chan (async/timeout timeout-ms)]))))
                                         (async/close! on-exit)))]
                          {:on-exit on-exit :thread thread}))
                      (range nproc))]
    (try
      (producer chan)
      (finally (async/close! chan)))
    (doseq [{:keys [on-exit thread]} threads]
      (async/<!! on-exit)
      (async/close! thread))))

CPU の假想 core と同じ個數の thread を起動し、それを thread pool とする。各 thread は producer から data を受け取り consumer を實行する。

使ひ方は簡單に成る。

(task-parallel (fn [chan] (doseq [item stream] (>!! chan item)))
               (fn [item] (some-process item)))

task-parallelには三種類の channel が出て來る。chanon-exitthreadである。

  • chan : producer から thread pool へ data を渡す。producer は data が尽きたらこの channel を閉じて好い。閉じなくとも、thread 側は producer から三秒(初囘のみ六秒)data が送られなければ data が尽きた、或いは producer が crush したと見做し終了する。
  • on-exit : 各 thread は自らが終はる前にこの channel を閉じる。channel が閉じられると<!!nilを受け取る。上ではやってゐないが error を送るのにも使ふ。
  • thread : これを閉じると thread を終了させられる。thread を終了する唯一の方法だ。上ではやってゐないが cancel するのにも使ふ。

chanは producer -> consumer、on-exitは consumer -> 外界、threadは外界 -> consumer である。consumer から結果を受け取りたければ (producer-consumer)、もう一種類 channel が要る。これをresultと呼ぼう。resultは consumer -> 外界である。

Elixir のTask.async_stream/3と比べてみる。chanresultは見えづらいが作られてゐる。外界 (呼び出し元) が stream から data を受け取る。Task.async_stream/3 がその data を引數とし、process を起動する(spawn)。process は處理結果を外界へ送り(send)終了する。on-exitthreadErlang VM に隠れてゐる。これらはProcess.monitor/1で取り出して使へる。起こされた process は處理するべき命令が無くなれば Erlang VM に依り終了される (終了したくないならば再歸等をして何かを處理し續ける事が要る)。すると Erlang VM は monitor 元 (この場合は外界) に{:DOWN, ~}と云ふ message を送る。外界はProcess.exit/2で process を終了出來る。Process.monitor/1ではなくProcess.link/1を使ってゐれば、呼び出し元 (外界) が終了すると Erlang VM は起こされた process (consumer) も終了する。Process.link/1にはもう一つ違ひが有る。呼び出された process が終了すると呼び出した process をも Erlang VM は終了する。:trap_exit flag を自らに立て且つ起動された process が正常終了すれば、Erlang VM は呼び出し元を終了するのではなく{:EXIT, ~} message を送る樣に成る。詰まり Elixir の code にもこの四種類の channel が有る訣である。

  • producer -> consumer : 生成した data を渡す
  • consumer -> 外界 : 處理結果を渡す
  • consumer -> 外界 : 終了を報せる
  • 外界 -> consumer : 終了する

これを見ると actor model は、VM (や分散處理 library) が便利な仕組みを作り易い、しかし channel を明示して扱ふやり方に比べると 處理を行ふ事處理を行ふ場所 とを complect してゐる事が解る。これは Erlang が採擇し Clojure が避けたものだ。

producer が別途管理されてゐる事は無論前提としてゐた。