CSP (та core.async)

Абревіатура "CSP" розшифровується як Communicating Sequential Processes, що означає "Взаємодія послідовних процесів". Це формалізм для опису рівночасних систем, вперше описаний Ч. Е. Р. Гоаром у 1978 році. Це модель паралельних процесів, що грунтується на передачі повідомлень та синхронізації через канали. Глибоке вивчення теоретичної моделі, що стоїть за CSP, не входить до цієї книги. Натомість ми сфокусуємо увагу на розгляді примітивів паралельного виконання, що їх пропонує бібліотека core.async.

Бібліотека core.async не входить до стандартної бібліотеки ClojureScript - вона є окремою бібліотекою. Тим не менше, це дуже популярна бібліотека. Значну кількість бібліотек побудовано на основі примітивів, що містяться у core.async, тому ми вирішили присвятити core.async розділ у книзі. Також це гарний приклад створення синтаксичної абстракції за допомогою макросів ClojureScript. Для виконання прикладів з цього розділу слід встановити core.async.

Канали

Канали подібні до стрічок конвейєра: до каналу можна покласти одне значення, або дістати з нього одне значення за раз. Допускається існування багатьох сутностей, що читають з каналу та розміщують значення у ному. Це основний механізм передачі повідомлень core.async. Створимо канал та проведемо з ним кілька операцій, щоб поглянути на роботу цього інструмента.

(require '[cljs.core.async :refer [chan put! take!]])

(enable-console-print!)

(def ch (chan))

(take! ch #(println "Got a value:" %))
;; => nil

;; there is a now a pending take operation, let's put something on the channel

(put! ch 42)
;; Got a value: 42
;; => 42

У наведеному прикладі ми створили канал ch за допомогою конструктора chan. Після цього ми виконали операцію take та передали функцію-колбек, що її буде викликано після успішного завершення операції. Після виклику методу put! для розміщення значення у каналі операція take завершує роботу, а на екран виводиться повідомлення "Got a value: 42". Зауважимо, що результатом виклику функції put! є значення, яке було щойно розміщене у каналі.

Подібно до take!, функція put! може приймати колбек, але ми не передавали жодного колбека у попередньому прикладі. Для операцій з розміщення значення колбеки будуть викликані після прийняття значення. Порядок розміщення та читання значень не визначений заздалегідь. Щоб переконатися у цьому, виконаємо кілька операцій put, а потім - take:

(require '[cljs.core.async :refer [chan put! take!]])

(def ch (chan))

(put! ch 42 #(println "Just put 42"))
;; => true
(put! ch 43 #(println "Just put 43"))
;; => true

(take! ch #(println "Got" %))
;; Got 42
;; Just put 42
;; => nil

(take! ch #(println "Got" %))
;; Got 43
;; Just put 43
;; => nil

Можливо, у вас зʼявилося питання про те, чому операція put! повертає результат true. Це ознака того, що операція put можлива, хоча значення ще не було прочитане. Канали можна закривати. Це призведе до того, що операції з додавання значень будуть призводити до помилки:

(require '[cljs.core.async :refer [chan put! close!]])

(def ch (chan))

(close! ch)
;; => nil

(put! ch 42)
;; => false

Цей приклад демонструє найпростішу ситуацію, але що буде з незавершеною операцію, якщо канал закритий? Проведемо кілька операцій з розміщення та читання значень, закриємо канал та подивимося на ефект:

(require '[cljs.core.async :refer [chan put! take! close!]])

(def ch (chan))

(take! ch #(println "Got value:" %))
;; => nil
(take! ch #(println "Got value:" %))
;; => nil

(close! ch)
;; Got value: nil
;; Got value: nil
;; => nil

Очевидно, що, якщо канал закритий, усі операції take! отримують значення nil. Значення nil у каналі має значення сторожа, що трактується отримувачами повідомлень як закриття каналу. З цієї причини не дозволяється відправляти у канал значення nil:

(require '[cljs.core.async :refer [chan put!]])

(def ch (chan))

(put! ch nil)
;; Error: Assert failed: Can't put nil in on a channel
Буфери

Ми побачили, що незавершені операції take та put потрапляють у чергу каналу. Що станеться, якщо таких незавершених операцій багато? Перевіримо це шляхом відправки численних операцій take та put:

(require '[cljs.core.async :refer [chan put! take!]])

(def ch (chan))

(dotimes [n 1025]
  (put! ch n))
;; Error: Assert failed: No more than 1024 pending puts are allowed on a single channel.

(def ch (chan))

(dotimes [n 1025]
  (take! ch #(println "Got" %)))
;; Error: Assert failed: No more than 1024 pending takes are allowed on a single channel.

Як ви бачите, існує обмеження на кількість незавершених операцій розміщення та читання з каналу. На сьогодні це обмеження складає 1024 одиниць, але це деталь реалізації, що може бути змінена у подальшому. Зауважимо, що в одному каналі не можуть одночасно існувати незавершені операції з розміщення та читання, бо за умови наявності незавершених операцій читання операції з розміщення будуть одразу успішно завершуватися, і навпаки.

Канали підтримують можливість буферизації операцій розміщення. Якщо ми створимо канал з буфером, операції put будуть одразу успішно завершуватися, якщо у буфері достатньо місця; в іншому випадку, такі операції будуть ставити у чергу. Продемонструємо цю поведінку за допомогою буферу на один елемент. Конструктори chan очікують число першим аргументом, тому буфер буде заданого розміру:

(require '[cljs.core.async :refer [chan put! take!]])

(def ch (chan 1))

(put! ch 42 #(println "Put succeeded!"))
;; Put succeeded!
;; => true

(dotimes [n 1024]
  (put! ch n))
;; => nil

(put! ch 42)
;; Error: Assert failed: No more than 1024 pending puts are allowed on a single channel.

Що сталося у попередньому прикладі? Ми створили канал з буфером на один елемент та виконали операцію розміщення значення у каналі. Операція одразу успішно завершилася, бо значення потрапило до буфера. Після цього ми виконали наступні 1024 операцій розміщення, щоб заповнити чергу операцій, а при спробі розмістити ще одне значення канал ініціював помилку через те, що розміщення ще однієї операції у чергу неможливе.

Тепер ми знаємо, як працюють канали та навіщо потрібні буфери. Перейдемо до інших видів буферів, реалізованих у стандартній бібліотеці. Різні типи буферів мають різні правила використання. Цікаво розглянути усі можливі варіанти, щоб мати змогу вибрати оптимальний. Канали за замовчування не мають буферів.

Буфер фіксованого розміру

Для створення буфера фіксованого розміру слід передати конструктору chan розмір буфера. Буфери фіксованого розміру є найпростішим видом буферів - коли такий буфер повний, операції розміщення потрапляють у чергу.

Конструктор chan приймає число або буфер першим аргументом. У наступному прикладі створено два канали, що мають фіксований розмір 32:

(require '[cljs.core.async :refer [chan buffer]])

(def a-ch (chan 32))

(def another-ch (chan (buffer 32)))
Буфери, що відкидають значення

Буфери фіксованого розміру дозволяють додавати операції до черги. Але раніше ми вже бачили, що якщо буфер повний, операції все одно будуть ставати у чергу. Якщо необхідно відкидати операції, які сталися після заповнення буфера, можна скористатися буфером, що відкидає значення (dropping buffer).

Буфери, що відкидають значення, мають фіксований розмір. Коли такий буфер повний, операції з розміщення нових значень будуть успішно завершуватися, але значення таких операцій будуть відкинуті. Наведемо приклад:

(require '[cljs.core.async :refer [chan dropping-buffer put! take!]])

(def ch (chan (dropping-buffer 2)))

(put! ch 40)
;; => true
(put! ch 41)
;; => true
(put! ch 42)
;; => true

(take! ch #(println "Got" %))
;; Got 40
;; => nil
(take! ch #(println "Got" %))
;; Got 41
;; => nil
(take! ch #(println "Got" %))
;; => nil

Ми виконали три операції з розміщення значення, всі три успішно завершилися, але буфер вміщує лише два значення, тому підписані клієнти отримають лише два значення. Третя операція з читання значення додаються у чергу, бо не отримує значення. Значення третьої операції з розміщення значення 42 відкидається.

Sliding

Буфер динамічного розміру демонструє протилежну поведінку. Коли буфер наповнюється, операції з розміщення значень будуть завершені, а найстарші значення будуть відкинуті, на їхнє місце будуть записані нові. Буфер динамічного розміру стане в нагоді, коли необхідно виконати нові операції, а старі значення дозволено відкинути.

(require '[cljs.core.async :refer [chan sliding-buffer put! take!]])

(def ch (chan (sliding-buffer 2)))

(put! ch 40)
;; => true
(put! ch 41)
;; => true
(put! ch 42)
;; => true

(take! ch #(println "Got" %))
;; Got 41
;; => nil
(take! ch #(println "Got" %))
;; Got 42
;; => nil
(take! ch #(println "Got" %))
;; => nil

Ми виконали три операції розміщення значень, і усі три успішно завершилися, але буфер динамічного розміру каналу має розмір 2, тому лише два значення було передано до читачів. Третя операція з читання потрапляє у чергу, бо відповідне значення першої операції з розміщення було відкинуто.

Трансдʼюсери

Як стало відомо з розділу про трансдʼюсери, розміщення значень у каналі можна розглядати як процес, що можна перетворити на трансдʼюсер. Це означає, що можна створити канал, передати йому трансдʼюсер та трансформувати вхідні значення перед тим, як вони потраплять до каналу.

Якщо ми хочемо використати трансдʼюсер з каналом, слід надати буфер, бо функція-перетворювач, яку буде трансформувати трансдʼюсер, буде використана у буфері як функція трансформації, бо вона отримує буфер та вхідні значення та повертає буфер із вбудованим значенням.

(require '[cljs.core.async :refer [chan put! take!]])

(def ch (chan 1 (map inc)))

(put! ch 41)
;; => true

(take! ch #(println "Got" %))
;; Got 42
;; => nil

Можливо, ви задаєте собі питання, що станеться з каналом, коли функція-редʼюсер поверне значення. Виявляється, що з точки зору каналу, завершення операції - це закриття каналу, тому при появі такого значення канал буде закрито:

(require '[cljs.core.async :refer [chan put! take!]])

(def ch (chan 1 (take 2)))

(take! ch #(println "Got" %))
;; => nil
(take! ch #(println "Got" %))
;; => nil
(take! ch #(println "Got" %))
;; => nil

(put! ch 41)
;; => true
(put! ch 42)
;; Got 41
;; => true
(put! ch 43)
;; Got 42
;; Got nil
;; => false

Ми взяли трансдʼюсер take, що зберігає стан, що дозволяє розмістити у каналі не більше 2 значень. Після цього ми виконали три операції з читання значення та очікуємо отримати лише два значення. З прикладу видно, що третя операція з читання отримала спеціальне значення nil, що вказує на закриття каналу. Також третя операція з розміщення даних отримала значення false, що вказує на те, що операція не відбулася.

Обробка виключних ситуацій

Виключні ситуації, що виникають під час додавання нових значень до буфера, призводять до завершення операції з помилкою, а виключна ситуація буде виведена у консоль. Зауважимо, що конструктор каналу приймає третій аргумент - функцію обробки виключних ситуацій.

Ми створюємо канал з обробником помилок, що буде викликаний при появі виключної ситуації. Якщо обробник має значення nil, операція буде тихо завершуватися помилкою, та якщо вона має інше значення, операція додавання буде повторно виконана з цим значенням.

(require '[cljs.core.async :refer [chan put! take!]])

(enable-console-print!)

(defn exception-xform
  [rfn]
  (fn [acc input]
    (throw (js/Error. "I fail!"))))

(defn handle-exception
  [ex]
  (println "Exception message:" (.-message ex))
  42)

(def ch (chan 1 exception-xform handle-exception))

(put! ch 0)
;; Exception message: I fail!
;; => true

(take! ch #(println "Got:" %))
;; Got: 42
;; => nil
Offer та Poll

Ми познайомилися з двома базовими операціями у каналах: put! та take!. такі операції розміщують або читають значення та можуть потрапити у чергу, якщо не можуть бути одразу виконані. Обидві функції асинхронні за своєю природою: вони можуть успішно завершитися, але пізніше.

core.async пропонує дві синхронні операції для розміщення та читання значень offer! та poll!. Наведемо приклад використання цих операцій.

offer! розміщує значення у каналі, якщо це можливо зробити одразу. Значенням offer! є true, якщо канал отримує значення, інакше false. Зауважимо, що на відміну від put!, offer! не розрізняє відкриті та закриті канали.

(require '[cljs.core.async :refer [chan offer!]])

(def ch (chan 1))

(offer! ch 42)
;; => true

(offer! ch 43)
;; => false

poll! отримує значення з каналу, якщо це можливо зробити одразу. Операція повертає значення у випадку вдачі, або nil. На відміну від take!, poll! не розрізняє відкриті та закриті канали.

(require '[cljs.core.async :refer [chan offer! poll!]])

(def ch (chan 1))

(poll! ch)
;; => nil

(offer! ch 42)
;; => true

(poll! ch)
;; => 42

Процеси

Ми вже знайомі з каналами, але нам бракує важливої деталі: процесів. Процеси - це елементи логіки, що виконуються незалежно та використовують канали для комунікації та координації. Операції з розміщення та читання значень всередині процесів зупинять процес до моменту завершення операції. Завершення процесу не блокує єдиний потік у середовищі виконання ClojureScript. Натомість процес продовжиться пізніше після виконання операції, на яку він очікує.

Процеси запускаються за допомогою макросів go, а операції з розміщення та читання значень використовують синтаксичні заповнювачі <! та >!. Макрос go переписує код для використання колбеків, але всередині go код виглядає як синхронний, що спрощує розуміння такого коду:

(require '[cljs.core.async :refer [chan <! >!]])
(require-macros '[cljs.core.async.macros :refer [go]])

(enable-console-print!)

(def ch (chan))

(go
  (println [:a] "Gonna take from channel")
  (println [:a] "Got" (<! ch)))

(go
  (println [:b] "Gonna put on channel")
  (>! ch 42)
  (println [:b] "Just put 42"))

;; [:a] Gonna take from channel
;; [:b] Gonna put on channel
;; [:b] Just put 42
;; [:a] Got 42

У попередньому прикладі ми запустили процес за допомогою функції go, якій передали значення від ch, що її було виведено у консоль. Значення не буде доступний одразу, тому процес зупиниться. Після цього ми запустимо інший процес, що розміщує значення у канал.

Через незавершену операцію з читання операція з розміщення одразу успішно завершиться, а значення буде доставлене до першого процесу, після чого обидва процеси завершаться.

Два блоки go виконуються незалежно один від одного та асинхронно, але виглядають як синхронний код. Блоки у наведеному прикладі досить прості, але можливість писати код у вигляді процесів, що виконуються паралельно та координуються через канали - це дуже сильний інструмент для реалізації складних асинхронних процесів. Канали також пропонують чудові можливості для ослаблення звʼязків між сутностями, що створюють значення, та сутностями, що їх споживають.

Також процеси можуть чекати визначений проміжок часу. Існує функція timeout, значення якої є канал, що буде закритий через визначену кількість мілісекунд. Поєднання каналів, що казриються через визначений час, та операцій з читання даних всередині блоку запуску процесу дає можливість призупинення:

(require '[cljs.core.async :refer [<! timeout]])
(require-macros '[cljs.core.async.macros :refer [go]])

(enable-console-print!)

(defn seconds
[](.getSeconds 'js/Date.'))

(println "Launching go block")

(go
(println [:a] "Gonna take a nap" (seconds))
(<! (timeout 1000))
(println [:a] "I slept one second, bye!" (seconds)))

(println "Block launched")

;; Launching go block
;; Block launched
;; [:a] Gonna take a nap 9
;; [:a] I slept one second, bye! 10

У виведених повідомленнях видно, що процеси нічого не робить одну секунду, коли блокує операцію з читання у каналі. Програма продовжує виконання, а за секунду процес відновлюється та завершується.

Вибір

Окрім розміщення та читання одного значення за раз всередині блоку можливо робити не детермінований вибір одного з багатьох операцій у каналах за допомогою alts!. Функція alts! очікує серію операцій з розміщення та читання значень у каналах (ми також можемо спробувати розмістити значення та прочитати значення одночасно) та виконує операцію за готовністю. Якщо при виклику alts! можуть бути виконано більш ніж одну операцію, за замовчування alts! робить вибір за псевдо-випадковою процедурою.

Досить просто спробувати виконати операцію у каналі та скасувати її через певних проміжок часу за допомогою функцій timeout та alts!:

(require '[cljs.core.async :refer [chan <! timeout alts!]])
(require-macros '[cljs.core.async.macros :refer [go]])

(enable-console-print!)

(def ch (chan))

(go
(println [:a] "Gonna take a nap")
(<! (timeout 1000))
(println [:a] "I slept one second, trying to put a value on channel")
(>! ch 42)
(println [:a] "I'm done!"))

(go
(println [:b] "Gonna try taking from channel")
(let [cancel (timeout 300)
[value ch] (alts! [ch cancel])]
(if (= ch cancel)
(println [:b] "Too slow, take from channel cancelled")
(println [:b] "Got" value))))

;; [:a] Gonna take a nap
;; [:b] Gonna try taking from channel
;; [:b] Too slow, take from channel cancelled
;; [:a] I slept one second, trying to put a value on channel

У попередньому прикладі ми запустили блок процесу, який чекає одну секунду та розміщує значення у каналі ch. Інший блок створює канал cancel, який буде закрито за 300 мілісекунд. Після цього буде спроба прочитати значення з обох каналів за допомогою alts!, що відбудеться як тільки стане можливим прочитати значення з будь-якого каналу. Через те, що канал cancel буде закрито за 300 мілісекунд, alts! буде успішно завершена, бо операції читання з закритого каналу повертають спеціальне значення nil. Зауважимо, що значення функції alts! - це вектор з двох елементів, що містить значення операції та канал, з якого це значення було отримане.

Таким чином ми маємо змогу визначити, з якого саме каналу було отримане значення - cancel або ch. Пропоную вам скопіювати цей приклад та змінити таймаут першого процесу на 100 мілісекунд, щоб побачити успішне завершення операції читання з каналу ch.

Тепер ми знаємо, як вибрати між двох операцій читання, тому розглянемо, як виразити операцію запису за певної умови у alts!. Необхідно навести канал та значення, для спроби розміщення. Скористаємося вектором з двох елементів, що містить канал та значення, що представляє операцію запису.

Розглянемо приклад:

(require '[cljs.core.async :refer [chan <! alts!]])
(require-macros '[cljs.core.async.macros :refer [go]])

(enable-console-print!)

(def a-ch (chan))
(def another-ch (chan))

(go
(println [:a] "Take a value from `a-ch`")
(println [:a] "Got" (<! a-ch))
(println [:a] "I'm done!"))

(go
(println [:b] "Take a value from `another-ch`")
(println [:a] "Got" (<! another-ch))
(println [:b] "I'm done!"))

(go
(println [:c] "Gonna try putting in both channels simultaneously")
(let [[value ch] (alts! [[a-ch 42][another-ch 99]])]
(if (= ch a-ch)
(println [:c] "Put a value in `a-ch`")
(println [:c] "Put a value in `another-ch`"))))

;; [:a] Take a value from `a-ch`
;; [:b] Take a value from `another-ch`
;; [:c] Gonna try putting in both channels simultaneously
;; [:c] Put a value in `a-ch`
;; [:a] Got 42
;; [:a] I'm done!

При виконанні цього прикладу успішно виконується лише операція розміщення значення до каналу a-ch. Обидва канали готові прийняти значення, коли виконується alts!, ви можете отримати інший результат виконання цього коду.

Пріоритет

alts! за замовчуванням виконує недетермінований вибір , коли кілька операцій готові до виконання. Натомість можна вказати пріоритети операцій за допомогою опції :priority до функції alts!. Якщо :priority має значення true та більш ніж одна операція готова до виконання, такі операції будуть виконані у порядку.

(require '[cljs.core.async :refer [chan >! alts!]])
(require-macros '[cljs.core.async.macros :refer [go]])

(enable-console-print!)

(def a-ch (chan))
(def another-ch (chan))

(go
(println [:a] "Put a value on `a-ch`")
(>! a-ch 42)
(println [:a] "I'm done!"))

(go
(println [:b] "Put a value on `another-ch`")
(>! another-ch 99)
(println [:b] "I'm done!"))

(go
(println [:c] "Gonna try taking from both channels with priority")
(let [[value ch] (alts! [a-ch another-ch] :priority true)]
(if (= ch a-ch)
(println [:c] "Got" value "from `a-ch`")
(println [:c] "Got" value "from `another-ch`"))))

;; [:a] Put a value on `a-ch`
;; [:a] I'm done!
;; [:b] Put a value on `another-ch`
;; [:b] I'm done!
;; [:c] Gonna try taking from both channels with priority
;; [:c] Got 42 from `a-ch`

a-ch та another-ch мають значення для читання, коли було виконано alts! та ми вказуємо, що значенням :priority істинне, тому a-ch має преференції. можна видалити опцію :priority та виконати приклад кілька разів, щоб побачити, як бех цієї опції alts! робить недетермінований вибір.

Значення за замовчуванням

Іншим цікавим аспектом alts! є те, що вона може одразу завершити виконання, якщо жодна операція не готова та ми передали значення за замовчуванням. Ми можемо зробити вибір між операціями тільки у тому випадку, якщо хоча б одна з них готова, або повернути значення за замовчуванням.

(require '[cljs.core.async :refer [chan alts!]])
(require-macros '[cljs.core.async.macros :refer [go]])

(def a-ch (chan))
(def another-ch (chan))

(go
(println [:a] "Gonna try taking from any of the channels without blocking")
(let [[value ch] (alts! [a-ch another-ch] :default :not-ready)]
(if (and (= value :not-ready)
(= ch :default))
(println [:a] "No operation is ready, aborting")
(println [:a] "Got" value))))

;; [:a] Gonna try taking from any of the channels without blocking
;; [:a] No operation is ready, aborting

У цьому прикладі видно, що, якщо жодна операція не готова до виконання, alts! повертає те значення, яке ми передали після ключа :default у момент виклику, а канал - це сам :default.

Комбінатори

Після знайомства з каналами та процесами можна переходити до знайомства з певними цікавими комбінаторами для роботи з каналами у core.async. Це розділ включає короткий опис таких можливостей із простими прикладами використання.

pipe

pipe тримає вхідний та вихідний канал та передає усі значення, розміщені у вхідному каналі, до вихідного каналу. Вихідний канал буде закрито, як тільки закритий вхідних канал, якщо третім аргументом до pipe не було передано значення false:

(require '[cljs.core.async :refer [chan pipe put! <! close!]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

(def in (chan))
(def out (chan))

(pipe in out)

(go-loop [value (<! out)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Got" value)
(println [:a] "Waiting for a value")
(recur (<! out)))))

(put! in 0)
;; => true
(put! in 1)
;; => true
(close! in)

;; [:a] Got 0
;; [:a] Waiting for a value
;; [:a] Got 1
;; [:a] Waiting for a value
;; [:a] I'm done!

У цьому прикладі ми використали go-loop для читання рекурсивного значень до моменту, коли канал out буде закрито. Зауважимо, що при закритті каналу in канал out також закриється, завдяки чому go-loop буде завершено.

pipeline-async

pipeline-async отримує число для управління паралелізмом, вихідний канал, асинхронну функцію та вхідний канал. Асинхронна функція має два аргументи: значення, що було розміщено у вхідному каналі, та канал, до якого слід розмістити результат асинхронної операції, після чого канал буде закрито. Число вказує на кількість блоків, що будуть одночасно використані для виклику асинхронної функції із вхідними значеннями.

Вихідний канал отримає результати у порядку, відносному до вхідних каналів, незалежно від часу, необхідного для завершення кожної з асинхронних операцій. Останній опціональний параметр вказує на те, чи буде канал закритий після закриття вихідного каналу. За замовчуванням значення цього параметру true.

(require '[cljs.core.async :refer [chan pipeline-async put! <! close!]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

(def in (chan))
(def out (chan))
(def parallelism 3)

(defn wait-and-put [value ch]
(let [wait (rand-int 1000)]
(js/setTimeout (fn []
(println "Waiting" wait "miliseconds for value" value)
(put! ch wait)
(close! ch))
wait)))

(pipeline-async parallelism out wait-and-put in)

(go-loop [value (<! out)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Got" value)
(println [:a] "Waiting for a value")
(recur (<! out)))))

(put! in 1)
(put! in 2)
(put! in 3)
(close! in)

;; Waiting 164 miliseconds for value 3
;; Waiting 304 miliseconds for value 2
;; Waiting 908 miliseconds for value 1
;; [:a] Got 908
;; [:a] Waiting for a value
;; [:a] Got 304
;; [:a] Waiting for a value
;; [:a] Got 164
;; [:a] Waiting for a value
;; [:a] I'm done!
pipeline

pipeline та pipeline-async схожі, але замість асинхронної функції pipeline отримує трансдʼюсер. Трансдʼюсер буде застосовано до кожного вхідного значення окремо.

(require '[cljs.core.async :refer [chan pipeline put! <! close!]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

(def in (chan))
(def out (chan))
(def parallelism 3)

(pipeline parallelism out (map inc) in)

(go-loop [value (<! out)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Got" value)
(println [:a] "Waiting for a value")
(recur (<! out)))))

(put! in 1)
(put! in 2)
(put! in 3)
(close! in)

;; [:a] Got 2
;; [:a] Waiting for a value
;; [:a] Got 3
;; [:a] Waiting for a value
;; [:a] Got 4
;; [:a] Waiting for a value
;; [:a] I'm done!
split

split отримує функцію-предикат та канал, а його значенням є вектор, що містить два канали. Перший канал отримує значення, для якого предикат є істинним, другий - хибним. Ми також можемо передати буфер або число для каналів із третім або четвертим аргументом.

(require '[cljs.core.async :refer [chan split put! <! close!]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

(def in (chan))
(def chans (split even? in))
(def even-ch (first chans))
(def odd-ch (second chans))

(go-loop [value (<! even-ch)]
(if (nil? value)
(println [:evens] "I'm done!")
(do
(println [:evens] "Got" value)
(println [:evens] "Waiting for a value")
(recur (<! even-ch)))))

(go-loop [value (<! odd-ch)]
(if (nil? value)
(println [:odds] "I'm done!")
(do
(println [:odds] "Got" value)
(println [:odds] "Waiting for a value")
(recur (<! odd-ch)))))

(put! in 0)
(put! in 1)
(put! in 2)
(put! in 3)
(close! in)

;; [:evens] Got 0
;; [:evens] Waiting for a value
;; [:odds] Got 1
;; [:odds] Waiting for a value
;; [:odds] Got 3
;; [:odds] Waiting for a value
;; [:evens] Got 2
;; [:evens] Waiting for a value
;; [:evens] I'm done!
;; [:odds] I'm done!
reduce

reduce отримує функцію-редʼюсер, вихідне значення та канал. Значенням reduce є канал, що містить результат обробки усіх значень, розміщених у вхідному каналі до закриття. Обробка починається з переданого вихідного значення.

(require '[cljs.core.async :as async :refer [chan put! <! close!]])
(require-macros '[cljs.core.async.macros :refer [go]])

(def in (chan))

(go
(println "Result" (<! (async/reduce + (+) in))))

(put! in 0)
(put! in 1)
(put! in 2)
(put! in 3)
(close! in)

## ;; Result: 6
onto-chan

onto-chan отримує канал та колекцію та розміщує зміст колекції у канал. Після цього канал буде закрито, але onto-chan приймає третім аргументом значення, що вказує на необхідність закриття каналу. Перепишемо попередній приклад за допомогою onto-chan:

(require '[cljs.core.async :as async :refer [chan put! <! close! onto-chan]])
(require-macros '[cljs.core.async.macros :refer [go]])

(def in (chan))

(go
(println "Result" (<! (async/reduce + (+) in))))

(onto-chan in [0 1 2 3])

## ;; Result: 6
to-chan

to-chan отримує колекцію та повертає канал, що якого буде розміщено кожне значення з колекції, після чого канал буде закрито.

(require '[cljs.core.async :refer [chan put! <! close! to-chan]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

(def ch (to-chan (range 3)))

(go-loop [value (<! ch)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Got" value)
(println [:a] "Waiting for a value")
(recur (<! ch)))))

;; [:a] Got 0
;; [:a] Waiting for a value
;; [:a] Got 1
;; [:a] Waiting for a value
;; [:a] Got 2
;; [:a] Waiting for a value
;; [:a] I'm done!
merge

merge отримує колекцію вхідних значень та повертає канал, куди він розмістить кожне зі значень, що потрапляють до вхідних каналів. Цей канал буде закрито, коли усі вхідні канали буде закрито. За замовчування канал не буде буферизований, але останнім аргументом можна передати буфер або число.

(require '[cljs.core.async :refer [chan put! <! close! merge]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

(def in1 (chan))
(def in2 (chan))
(def in3 (chan))

(def out (merge [in1 in2 in3]))

(go-loop [value (<! out)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Got" value)
(println [:a] "Waiting for a value")
(recur (<! out)))))

(put! in1 1)
(close! in1)
(put! in2 2)
(close! in2)
(put! in3 3)
(close! in3)

;; [:a] Got 3
;; [:a] Waiting for a value
;; [:a] Got 2
;; [:a] Waiting for a value
;; [:a] Got 1
;; [:a] Waiting for a value
;; [:a] I'm done!

Абстракції вищого рівня

Ми познайомилися з примітивами низького рівня, представленими у core.async, та з комбінаторами, доступними для каналів. Також core.async пропонує деякі корисні абстракції вищого рівня понад каналами. Такі абстракції можуть виступати як матеріал для побудови більш складних функцій програми..

Mult

Коли значення з каналу слід передати у численні інші канали, можна використати функцію mult для створення мультиплікатора наданого каналу. Після створення мультиплікатора ми можемо приєднувати канали до нього за допомогою tap та відʼєднувати за допомогою untap. Мультиплікатори також дозволяють відʼєднати усі підʼєднані канали одним кроком за допомогою untap-all.

Кожне значення, що потрапляє до каналу, на базі якого створений мультиплікатор, розповсюджується по усім приєднаним каналам, і усі ці канали мають прийняти значення перед тим, як буде розповсюджене наступне значення. Для попередження блокування каналу через повільне отримання повідомлень окремими отримувачами слід обережено використовувати буферизації на каналах, що отримують значення.

Закриті підʼєднані канали автоматично видаляються з мультиплікатора. Якщо підʼєднаних каналів немає, розміщені у вихідному каналі значення відкидаються.

(require '[cljs.core.async :refer [chan put! <! close! timeout mult tap]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

;; Source channel and mult
(def in (chan))
(def m-in (mult in))

;; Sink channels
(def a-ch (chan))
(def another-ch (chan))

;; Taker for `a-ch`
(go-loop [value (<! a-ch)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Got" value)
(recur (<! a-ch)))))

;; Taker for `another-ch`, which sleeps for 3 seconds between takes
(go-loop [value (<! another-ch)]
(if (nil? value)
(println [:b] "I'm done!")
(do
(println [:b] "Got" value)
(println [:b] "Resting 3 seconds")
(<! (timeout 3000))
(recur (<! another-ch)))))

;; Tap the two channels to the mult
(tap m-in a-ch)
(tap m-in another-ch)

;; See how the values are delivered to `a-ch` and `another-ch`
(put! in 1)
(put! in 2)

;; [:a] Got 1
;; [:b] Got 1
;; [:b] Resting for 3 seconds
;; [:a] Got 2
;; [:b] Got 2
;; [:b] Resting for 3 seconds
Публікація-підписка (pub-sub)

Після знайомства з мультиплікаторами ви вже можете уявити можливі способи реалізації абстракції публікації-підписки на базі функцій mult, tap та untap, але цей механізм комунікацій широко розповсюджений, тому він вже реалізований у core.async.

Замість створення мультиплікатора з вихідного каналу ми створимо публікацію за допомогою функції pub, яка отримує як аргументи канал та функцію, що буде використовуватися для отримання теми повідомлення.

Підписатися на публікацію можна за допомогою sub, яка очікує публікацію, на яку слід підписатися, тему та канал, до якого слід розмістити повідомлення, що мають визначену тему. Зауважимо, що можна підписати канал на кілька тем.

Функція unsub може отримати публікацію, тему та канал і відписати вказаний канал від теми. Також існує unsub-all, що отримує публікацію та тему та відписує усі канали від вказаної теми.

(require '[cljs.core.async :refer [chan put! <! close! pub sub]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

;; Source channel and publication
(def in (chan))
(def publication (pub in :action))

;; Sink channels
(def a-ch (chan))
(def another-ch (chan))

;; Channel with `:increment` action
(sub publication :increment a-ch)

(go-loop [value (<! a-ch)]
(if (nil? value)
(println [:a] "I'm done!")
(do
(println [:a] "Increment:" (inc (:value value)))
(recur (<! a-ch)))))

;; Channel with `:double` action
(sub publication :double another-ch)

(go-loop [value (<! another-ch)]
(if (nil? value)
(println [:b] "I'm done!")
(do
(println [:b] "Double:" (\* 2 (:value value)))
(recur (<! another-ch)))))

;; See how values are delivered to `a-ch` and `another-ch` depending on their action
(put! in {:action :increment :value 98})
(put! in {:action :double :value 21})

;; [:a] Increment: 99
;; [:b] Double: 42
Змішувачі

З розділу про комбінатори у стандартній бібліотеці core.async нам відомо, що для поєднання кількох каналів в один ми можемо скористатися функцією merge. При поєднання (злитті) каналів усі значення, розміщені вхідному каналі, потраплять по обʼєднаного каналу. Але у певних ситуаціях нам потрібен більш гнучкий контроль над значеннями, що потраплять до вихідного каналу. У подібних ситуаціях зручно використовувати змішувачі.

Стандартна бібіліотека core.async пропонує абстракцію змішувачів, за допомогою якої можна поєднати кілька вхідних каналів в один вихідний. Також за допомогою змішувачів можна блокувати, ставити канали на паузу або слухати лише обрані канали.

Для створення зімувача потрібні вихідний канал на функція mix. Маючи змішувач, ми можемо додати вхідні канали за допомогою admix, видалити канали за допомогою unmix, або видалити усі вхідні канали за допомогою unmix-all.

Для управління станом вхідних каналів ми використовуємо функцію toggle, що очікує змішувач та мапу (відображення) каналів на стани цих каналів. Зауважимо, що можна додавати канали для суміші за допомогою toggle, бо відображення буде поєднане з поточним станом суміші. Стан каналу є відображенням, що може мати ключі :mute, :pause та :solo з відповідними булевими значеннями.

Розглянемо що означають ці операції з каналами - блокування, призупинення та виділення одного каналу:

  • Заблокований канал вводу означає, що значення з каналу можна приймати, але такі значення не будуть передаватися до вихідного каналу. Таким чином, поки канал заблокований, усі розміщені у ньому значення будуть відкинуті.
  • Призупинити вхідний канал означає, що з такого каналу неможливо прочитати значення. Значення, розміщені у такому каналі, не будуть ані передані до вихідного каналу, ані відкинуті.
  • Виділення одного чи більше каналів означає, що вихідні канали отримають лише значення, розміщені у виділених каналах. За замовчення не виділені канали блокуються, але можна використати solo-mode та приймати рішення щодо блокування чи призупинення не виділених каналів.

Ви отримали багато нової інформації про канали, тому для кращого засвоєння розглянемо приклад. Перш за все, налаштуємо змішувач з вихідним каналом out та трьома вхідними каналами. Після цього ми будемо виводити усі значення, отримані вихідним каналом, для демонстрації управління роботою вхідних каналів:

(require '[cljs.core.async :refer [chan put! <! close! mix admix
unmix toggle solo-mode]])
(require-macros '[cljs.core.async.macros :refer [go-loop]])

;; Output channel and mixer
(def out (chan))
(def mixer (mix out))

;; Input channels
(def in-1 (chan))
(def in-2 (chan))
(def in-3 (chan))

(admix mixer in-1)
(admix mixer in-2)
(admix mixer in-3)

;; Let's listen to the `out` channel and print what we get from it
(go-loop [value (<! out)]
(if (nil? value)
(println [:a] "I'm done")
(do
(println [:a] "Got" value)
(recur (<! out)))))

За замовченням кожне значення у каналі вводу буде розміщене у канал out:

(do
(put! in-1 1)
(put! in-2 2)
(put! in-3 3))

;; [:a] Got 1
;; [:a] Got 2
;; [:a] Got 3

поставимо канал in-2 на паузу, розмістимо значення у кожному з каналів та знову запустимо in-2:

(toggle mixer {in-2 {:pause true}})
;; => true

(do
(put! in-1 1)
(put! in-2 2)
(put! in-3 3))

;; [:a] Got 1
;; [:a] Got 3

(toggle mixer {in-2 {:pause false}})

## ;; [:a] Got 2

Як видно з попереднього прикладу, значення, розміщені у каналах, поставлених на паузу, не буде відкинуто. Для відхилення значень, розміщених у каналі, слід заблокувати канал. Розглянемо приклад:

(toggle mixer {in-2 {:mute true}})
;; => true

(do
(put! in-1 1)
(put! in-2 2) ;; `out` will never get this value since it's discarded
(put! in-3 3))

;; [:a] Got 1
;; [:a] Got 3

## (toggle mixer {in-2 {:mute false}})

Ми розмістили значення 2 у каналі in-2. Цей канал був заблокований, тому значення було відкинуто та ніколи не потрапило до каналу out. Розглянемо треті стан каналу у змішувачі - виділений канал.

Ми вже згадували про те, що виділення одного каналу у змішувачі передбачає те, що за замовчуванням усі інші канали будуть заблоковані:

(toggle mixer {in-1 {:solo true}
in-2 {:solo true}})
;; => true

(do
(put! in-1 1)
(put! in-2 2)
(put! in-3 3)) ;; `out` will never get this value since it's discarded

;; [:a] Got 1
;; [:a] Got 2

(toggle mixer {in-1 {:solo false}
in-2 {:solo false}})

Але ми можемо визначати режим для невиділених каналів на час існування виділених. Визначимо, що за замовчування невиділені канали будуть призупинятися:

(solo-mode mixer :pause)
;; => true
(toggle mixer {in-1 {:solo true}
in-2 {:solo true}})
;; => true

(do
(put! in-1 1)
(put! in-2 2)
(put! in-3 3))

;; [:a] Got 1
;; [:a] Got 2

(toggle mixer {in-1 {:solo false}
in-2 {:solo false}})

## ;; [:a] Got 3

results matching ""

    No results matching ""