Babel Coder

Reactive Programming คืออะไร? รู้จัก RxJS และการสร้าง Observables

intermediate

Reactive Programming เป็นศาสตร์ที่จำเป็นต้องรู้ครับ ไม่ว่าคุณจะเขียนเว็บแอพพลิเคชันด้วย Angular 2+, นักพัฒนาแอนดรอยที่ใฝ่ฝันชีวิตที่ง่ายขึ้นในการทำงานกับ Asynchonous หรือกระทั่งอยากเจริญรอยตามทีมงานเท่ๆแบบ Netflix คุณยิ่งควรรู้จัก Reactive Programming และการใช้งาน RxJS เข้าไปใหญ่

บทความนี้เราจะไปรู้จักกันครับว่า Reactive Programming คืออะไร? การใช้งาน RxJS เพื่อการบรรลุจุดสุดยอดแห่งศาสตร์ Reactive Programming ต้องทำอย่างไร ผมสัญญาว่าชุดบทความนี้จะทำให้คุณได้นิพพานในชาตินี้แน่นอน…ฟันธง!

สารบัญ

ปัญหาของการเปลี่ยนแปลงค่าข้อมูล

ธรรมชาติของการประกาศตัวแปรของเรามักอ้างอิงจากค่าอื่น เช่น

let height = 2
let base = 5

let area = 0.5 * height * base // area = 5

area เป็นตัวแปรสำหรับเก็บค่าพื้นที่ของสามเหลี่ยมที่ขึ้นตรงต่อค่าของ height และ area เรากล่าวว่าตัวแปลทั้งสองทำให้เกิดผลลัพธ์คือ area เมื่อเป็นเช่นนี้คำถามคือถ้าเราเปลี่ยนค่าตัวแปรตัวใดตัวหนึ่ง ค่าของ area ควรจะเปลี่ยนตามไหม?

let height = 2
let base = 5

let area = 0.5 * height * base

// เปลี่ยนค่า base ซักนิดดีกว่า
base = 10

// แบบนี้ area จะเปลี่ยนไหม?

ด้วยสามัญสำนึกของการเขียนโปรแกรม เราทราบทันทีว่าแม้ค่า base จะเปลี่ยน แต่ area ยังเป็นค่า 5 เช่นเดิมนั่นเพราะ area ของเราอิงตามค่า base ตัวก่อนหน้าคือ 5 นั่นเอง

หากตอนนี้เราอยากคำนวณค่า area ตาม base ตัวใหม่จะทำอย่างไร? ง่ายมากก็เขียนสมการคำนวณอีกซักรอบเป็นไง นี่มันโจทย์อนุบาลหมีน้อยสุดๆ

let height = 2
let base = 5

let area = 0.5 * height * base

// เปลี่ยนค่า base ซักนิดดีกว่า
base = 10

// ตอนนี้ area ของเราก็จะคำนวณใหม่แล้ว
area = 0.5 * height * base

ปัดโถ่ เดี๋ยวก็ทุ้มด้วยโพเดี้ยมซะนิ! เราเป็น smart programmer นะ ไม่ควรจะมานั่งคำนวณซ้ำซ้อนแบบนี้อีกแล้ว เมื่อ height หรือ base เปลี่ยน area ของเราก็ควรคำนวณใหม่ตามการเปลี่ยนแปลงนั้น และนี่คือหลักการของคำว่า Reactive นั่นเอง

Reactive Programming คืออะไร

จากปัญหาดังกล่าว ถ้าเราพิจารณาใหม่ว่าค่าของตัวแปรต่างๆสามารถปล่อยออกมาในช่วงเวลาใดๆก็ได้ เราจะได้เส้นสายแสดงข้อมูลที่ปล่อยออกมาในช่วงเวลาหนึ่ง ดังนี้


height:  ----2-------------------

base:    ----5----10-------------

   area = 0.5 x height x base

area:    ----5----10-------------

เส้นสายของข้อมูลที่สัมพันธ์กับเวลานี่หละครับ เราเรียกว่า Stream

จากแผนภาพข้างต้นเราจะค้นพบว่า Stream ของตัวแปร area เกิดจากค่าล่าสุดของ Stream height และ Stream ของ base เมื่อ base เปลี่ยนเป็น 10 ค่าใหม่ของ area จึงนำค่าล่าสุดของ height คือ 2 มาคำนวณคู่กับค่าล่าสุดของ base คือ 10 ลักษณะการทำงานของ Stream ดังกล่าวคือลักษณะการทำงานของ Reactive Programming นั่นเอง

จากแผนภาพเราพบว่าข้อมูลของเราถูกปล่อยในช่วงเวลาต่างๆ การคำนวณของ area จึงไม่ใช่การคำนวณที่ต่อเนื่อง หากแต่เป็นการคำนวณที่ขึ้นอยู่กับช่วงเวลาที่ Stream อื่นมีการปล่อยค่าออกมา เราจึงกล่าวได้ว่าการทำงานนี้เป็นการทำงานแบบ Asynchronous

หมายเหตุ การทำงานของ Stream ไม่จำเป็นต้องเป็น Asynchronous เสมอ ดูเพิ่มเติมในเรื่องของ Observables

ทั้งหมดทั้งมวลจากสองย่อหน้าจึงนำไปสู่บทนิยามของ Reactive Programming นั่นคือ

Reactive Programming คือรูปแบบหนึ่งของการเขียนโปรแกรมที่สนใจข้อมูลบน Stream ที่มีการทำงานแบบ Asynchronous รวมถึงการเปลี่ยนแปลงของข้อมูลใน Stream ที่ส่งผลต่อการเปลี่ยนแปลงของ Stream อื่น

เพื่อที่จะให้การเขียนโปรแกรมแบบ Reactive เป็นผลเร็จ มีสองสิ่งที่โปรแกรมเมอร์อย่างเราพึงทำครับ นั่นคือ

  1. สร้าง Stream ขึ้นมาซะก่อนซิ อยากแก้ปัญหาอะไรก็สร้าง Stream นั้นเพื่อแทนข้อมูล
  2. มีแค่ Stream มันจืดชืด เราต้องมี Operator คือตัวดำเนินการระหว่าง Stream ด้วย ในกรณีของ area การแปลง height และ base เป็น area ด้วยการนำ 0.5 x height x base คือ Operator ที่ทำงานบน Stream ของ height และ base นั่นเอง

ข้อดีของ Reactive Programming

อธิบายความหมายของ Reactive Programming กันไปแล้ว แต่เพื่อนๆหลายๆคนคงยังไม่เห็นความดีงามพระราม 8 เป็นแน่ เพื่อจะอวยไส้แตกแหกไส้ฉีก เราจะมาดูข้อดีของการโปรแกรมแบบ Reactive ที่แม้แต่ทีมพี่ลูกเกตก็ให้ไม่ได้

Push ปะทะ Pull

ES2015 เรามีหนึ่งฟีเจอร์ทางภาษาที่เรียกว่า Generator ซึ่งเป็นฟังก์ชันพิเศษที่อนุญาตให้ปลายทางดูดค่าที่เตรียมไว้ได้ แหมฟังแล้วโค้ดปลายทางก็เหมือนแดร็กคูล่าดีๆนี่เอง สำหรับเหยื่อที่น่าสงสารอย่าง Generator ก็แค่เตรียมเลือดชั้นดีที่อยากให้โดนดูดจ๊วบๆออกไป

function *double() {
  let currentNum = 0
  
  while(true) {
    yield currentNum
    currentNum += 2
  }
}

const doubleGenerator = double()

console.log(doubleGenerator.next().value) // 0
console.log(doubleGenerator.next().value) // 2
console.log(doubleGenerator.next().value) // 4

สองสิ่งที่เป็นตัวบ่งชี้ว่าฟังก์ชันของเราเป็น Generator นั่นคือการประกาศฟังก์ชันด้วย function * และอีกสิ่งคือการใช้ yield

Generator ก็เหมือนเหยื่อครับ คงไม่มีมนุษย์ป้าอกอึ๋มที่ไหนเดินเข้าไปหาแดร็กคูล่าใช่ไหม ความจริงผู้ล่าของเราต่างหากที่ต้องไล่หาเลือด

การเรียกใช้ doubleGenerator คือแดร็กคิวล่าครับ เราต้องการเลือดของเหยื่อหยดถัดไป เราจึงออกคำสั่ง next() เมื่อเหยื่อถูกคมเขี้ยวอันแหลมคมกัดลงไปที่ต้นคอ เลือดที่เตรียมไว้ผ่าน yield จึงได้ไหลรินออกมา แน่นอนว่าการกัดเพื่อดูดแต่ละครั้งคงไม่ได้ออกมาแค่เลือด อย่างน้อยๆคงต้องมาพร้อมขี้ไคล คราบเหงื่อ ยี้~ แค่คิดก็เหม็นเปรี้ยวละ เมื่อเป็นเช่นนี้เราจึงต้องเรียก value เพื่อเป็นการกรองเอาเฉพาะค่าของมันซึ่งก็คือเลือดที่เราสนใจไร้กลิ่นเปรี้ยวเท่านั้น

กล่าวโดยสรุป เราจะเห็นว่าการใช้งาน Generator คือการเขียนโปรแกรมเพื่อดึงข้อมูลที่เราสนใจออกมานั่นเอง เราจึงเรียกวิธีการของ Generator เช่นนี้ว่า Pull Style

กรณีของ Reactive Programming ด้วย Stream เราพบว่าโค้ดปลายทางของเราไม่ใช่แดร็กคิวล่าครับ เราไม่ต้องออกล่าเหยื่อให้เสียแรง เราแค่นอนอ้าปากพะงาบๆอยู่ในอ่างจากุชชี่ เมื่อไหร่ที่ข้อมูลพร้อม เลือดเหล่านั้นก็จะไหลรินหลั่งลงสู่คอหอยเอง

// ยังไม่ต้องสนใจในรายละเอียดนะครับ
// ทราบเพียงว่า Stream นี้มีการปล่อยค่าเลขคู่ทุกๆ 200 ms เป็นพอ
const doubleStream = Rx.Observable.interval(200).filter(x => x % 2 === 0)

// เราไม่ต้องเหนื่อยหาเหยื่อ เหยื่อวิ่งเอาเลือดมาให้เรากินเอง
// เมื่อไหร่ที่ doubleStream มีการปล่อยค่าออกมา
// มันจะไปเรียก console.log ให้เราอัตโนมัติ
// ด้วยวิธีการนี้จึงกล่าวได้ว่า console.log ของเราอยู่กับที่
// แต่ข้อมูลต่างหากที่วิ่งเข้ามาหา console.log เอง
doubleStream.subscribe(console.log)

ด้วยกลเม็ดของ Reactive Programming นี่หละครับที่ทำให้เหยื่อต้องยัดเยียดข้อมูลมาให้กับเรา เราจึงกล่าวว่าวิธีการดังกล่าวเป็นแบบ Push Style นั่นเอง

นิยามพฤติกรรมตั้งแต่เริ่มสร้าง Stream

ในกระบวนการเขียนโปรแกรมแบบ Imperative เราสร้างพฤติกรรมเมื่อเราต้องการให้เกิดการทำงานเช่นนั้น หากเราต้องการให้ทุกๆ 1 วินาทีมีการพิมพ์ตัวเลขออกมาตั้งแต่ 0 เป็นต้นไป เราอาจเขียนโปรแกรมเช่นนี้

let num = 0

// นิยามพฤติกรรมคือการพิมพ์ค่าออกมาตั้งแต่ 0 และเพิ่มค่าเรื่อยๆในการพิมพ์ครั้งถัดไป
// พฤติกรรมนี้เรานิยามเมื่อเราต้องการใช้งานมัน
setInterval(
  () => console.log(num++), 
  1000
)

สำหรับวิธีการของ Reactive Programming เราจะนิยามพฤติกรรมตั้งแต่ตอนสร้าง Stream เลยครับ

// สร้าง Stream ปุ๊บ บอกทันทีเลยว่าเจ้าจงปล่อยข้อมูลทุกๆ 1 วินาทีซะนะ
// ด้วยวิธีการนี้ จึงเป็นการประกาศพฤติกรรมตั้งแต่ต้น
const doubleStream = Rx.Observable.interval(1000)

doubleStream.subscribe(console.log)

ตัวอย่างข้างบนอาจดูง่ายไปใช่ไหมครับ งั้นเรามาพิจารณาตัวอย่างในชีวิตจริงกันดีกว่า

ถ้าเราต้องการสร้างช่องค้นหา โดยมีเงื่อนไขดังนี้

  • เมื่อพิมพ์ค่าอะไรลงไป เราจะนำค่าดังกล่าวส่งไปค้นหาข้อมูลจากเซิฟเวอร์
  • แต่ถ้าเราพิมพ์ปุ๊บแล้วส่งทันที มันก็คือการระดมยิงเซิฟเวอร์ดีๆนี่เอง ดังนั้นให้ผู้ใช้งานหยุดพิมพ์ครบ 500 ms ก่อนจึงค่อยส่งข้อมูลไปหาเซิฟเวอร์
  • ถ้าค่าข้อมูลที่เราพิมพ์ก่อนหน้าตรงกับค่าที่เราพึ่งพิมพ์เสร็จ อย่าส่งข้อมูลไปค้นหาจากเซิฟเวอร์
  • ถ้าเราพิมพ์รอบแรกเสร็จ มีการส่งข้อมูลไปเซิฟเวอร์แล้ว แต่ข้อมูลยังไม่ตอบกลับมาจากเซิฟเวอร์ ทันใดนั้นเราเปลี่ยนใจค้นหาข้อมูลด้วยคำใหม่ คำค้นหลังส่งไปหาเซิฟเวอร์ในขณะที่ผลลัพธ์การค้นหาแรกส่งกลับมาพอดี เป็นเหตุให้เราแสดงผลลัพธ์จากการค้นหาแรกแทนที่จะเป็นผลลัพธ์หลัง ด้วยเหตุนี้เราจำเป็นต้องยกเลิกผลลัพธ์ก่อนหน้าทิ้งหากเราเริ่มค้นหาครั้งใหม่

โจทย์ฟังเหมือนจะง่ายใช่ไหมครับ ไหนเพื่อนๆลองคิดวิธีเขียนโปรแกรมตามโจทย์นี้ด้วยวิธีการของ Imperative กันดู เริ่มยากแล้วใช่ไหมละ…

ด้วยวิธีการของ Reactive Programming ด้วย Stream จึงได้โปรแกรมหน้าตาเช่นนี้

const inputStream = Rx.Observable
  .fromEvent(document.getElementById('input'), 'keyup') // ดักจับการพิมพ์บน input
  .debounceTime(500) // รอให้ผู้ใช้งานพิมพ์เสร็จก่อน 500 ms
  .distinctUntilChanged() // ถ้าข้อความค้นหาตรงกับของเดิม ไม่ส่งข้อมูลหาเซิฟเวอร์
  .map(ev => ev.target.value) // เราสนใจเฉพาะค่าที่เราพิมพ์ในช่อง input
  .switchMap(
    // การใช้ switchMap ช่วยให้ยกเลิกการค้นหาก่อนหน้าได้
    value => fetch(<API_ENDPOINT>)
  )

เราแค่นิยามพฤติกรรมเมื่อเราสร้าง Stream เมื่อเป็นเช่นนี้เราจึงทำความเข้าใจโปรแกรมเราได้ง่ายขึ้น

Operator คือสิ่งสำคัญ

ตามที่กล่าวข้างต้น Reactive Programming อาศัยส่วนประสานของสองสิ่งประหนึ่งเป็นหยินหยางนั่นคือ Stream และ Operator ในไลบรารี่ต่างๆที่มีคุณสมบัติของการโปรแกรมแบบ Reactive เช่น RxJS จึงอุดมไปด้วย Operator ที่ใช้ในการดำเนินการต่างๆกับเหล่า Stream มากมาย

มาตรฐาน Promise ปัจจุบันยังไม่บรรจุการยกเลิกการทำงานของ Promise (Cancellable Promise) จากโจทย์ที่แล้ว หากเราต้องการยกเลิกการค้นหาข้อมูลก่อนหน้า เราต้องเขียนเองหลายอย่างมาก หรืออีกทางเลือกคือหาไลบรารี่หรืออะไรก็ได้ที่ทำให้ Promise ของเราสามารถยกเลิกได้ เช่นใช้ Bluebird ที่เป็น JavaScript Promises Library ตัวนึง

แต่สำหรับ RxJS เรามีวิธีการแยบยลในการยกเลิกสิ่งที่เราไม่ต้องการ เช่นการใช้ Operator ชื่อ switchMap ดังที่เห็นในโจทย์ก่อนหน้า

RxJS: A reactive programming library for JavaScript

RxJS ไม่ใช่พ่อทุกสถาบัน จึงไม่แปลกที่เพื่อนๆบางคนจะไม่รู้จักครับ แต่คงไม่มีใครไม่รู้จักไมโครซอฟต์ใช่ไหมครับ ยังไงก็ต้องขอแสดงความเสียใจกับผู้ไม่ปลื้มไมโครซอฟต์ด้วยนะครับ เพราะ RxJS เนี่ยคลอดออกมาจากครรภ์ของบิลเกตเลยละ #ผู้ชายก็ท้องได้ (พัฒนาโดยไมโครซอฟต์ แต่บิลเกตไม่รู้อิโหน่อิเหน่ด้วย เอาชื่อเฮียแกมาใช้เฉยๆ)

RxJS เป็นหนึ่งในไลบรารี่จากตระกูล ReactiveX ที่มีสโแกนอันกิ๊บเก๋ว่า A reactive programming library for JavaScript โดยปัจจุบัน RxJS ได้ก้าวมาถึงเวอร์ชัน 5.x.x แล้วครับ

ไลบรารี่สำหรับการโปรแกรมแบบ Reactive แม้จะมีหลายเจ้า แต่สำหรับในชุดบทความนี้เราจะใช้ RxJS เพื่อสร้างการทำงานแบบ Reactive เป็นหลักครับ

แม้เราจะอิงมาตรฐานตาม RxJS แต่เพื่อนๆอย่าได้กังวลว่าการใช้งานจะผูกติดแค่กับ RxJS เท่านั้น เพราะในเวอร์ชัน 5 RxJS ของเราได้พยายามทำตัวให้เข้ากันได้กับมาตรฐาน Observable ของ ECMAScript แล้ว ส่วนมาตรฐานของ ECMAScript หนะรึ… รอลูกบวชอีกสิบปีก็คงยังไม่คลอดง่ายๆ

รู้จัก Observables

คุณเคยแอบชอบดาราคนไหนจนต้องไป Follow เขาใน Instagram ไหมครับ?

เมื่อคุณตัดสินใจ Follow ใครซักคนใน IG เมื่อนั้นส่วนนึงของชีวิตเขาจะเข้ามาเป็นส่วนหนึ่งในชีวิตคุณ เมื่อใดที่คนดังเหล่านั้นโพสต์รูปคิกขุ คุณก็จะเห็นรูปนั้นเช่นกัน แม้ดาราคนนั้นจะโพสต์ภาพหน้าสดสุดโทรม ถึงคุณจะไม่อยากเห็นแต่คุณก็เลี่ยงไม่ได้ นั่นเพราะคุณ Follow เขาแล้วนั่นเอง

ในฐานะที่คุณคือผู้ติดตาม คุณจึงเป็น Observer (แปลว่าผู้สังเกต) ส่วนดาราที่ถูกคุณติดตาม จะเป็น Observable (แปลว่าสิ่งที่ถูกสังเกตได้) นั่นเอง

Observable เปรียบได้กับ Stream ที่สามารถปล่อยค่าได้ในช่วงเวลาหนึ่งๆครับ ตัวอย่างเช่น

const timerStream = Rx.Observable.interval(1000)

timerStream.subscribe(console.log)

จากตัวอย่างข้างต้น timerStream คือ Observable ที่จะปล่อยค่าออกมาทุกๆ 1 วินาที หากเราสนใจที่จะส่อง ค่าเหล่านี้เราต้อง subscribe ที่เป็นการบอกว่าเราต้องการติดตามความเคลื่อนไหวนั่นเอง สำหรับ subscribe เราสามารถส่ง Callback Function ไปให้กับมันได้ ฟังก์ชันดังกล่าวจะได้รับการเรียกเมื่อ Observable ของเรามีการปล่อยค่าออกมา พูดอีกนัยยะก็คือเราต้องส่ง Observer ไปให้กับเมธอด subscribe ด้วยเหตุนี้เราจึงกล่าวได้ว่า console.log ของเราจึงเป็น Observer นั่นเอง

Observable จอมขี้เกียจ

เพื่อนๆคิดว่าถ้าดาราซักคนไม่มีคน Follow เขาใน IG เลย เขายังจะอยากถ่ายรูปแล้วโพสต์ลง IG ไหม?

สถานการณ์เดียวกันนี้เกิดขึ้นกับ Observable ครับ หากไม่มี Observer ไหนไป subscribe มันเลย Observable ตัวดังกล่าวก็จะไม่ทำงาน อารมณ์เหมือนดาราที่จะไม่โพสต์รูป ถ้าไม่มีคนติดตามนั่นละ

const timerStream = Rx.Observable.interval(1000)

จากโปรแกรมข้างต้น เราพบว่าโค้ดของเราไม่ได้รับการทำงาน นั่นเพราะ timerStream ของเราสร้างขึ้นมาแล้วแต่ขาดซึ่งผู้ติดตาม มันจึงหมดกำลังใจที่จะปล่อยค่าออกมานั่นเอง น่าสงสารเนอะ!

การสร้างและใช้งาน Observables

หัวข้อ Push ปะทะ Pull เราได้แสดงให้เห็นวิธีสร้างและใช้งาน Generator เพื่อเตรียมข้อมูลให้ปลายทางเรียกใช้กันไปแล้ว

function *double() {
  let currentNum = 0
  
  while(true) {
    yield currentNum
    currentNum += 2
  }
}

const doubleGenerator = double()

console.log(doubleGenerator.next().value) // 0
console.log(doubleGenerator.next().value) // 2
console.log(doubleGenerator.next().value) // 4

อาศัย yield ทำให้เราเลือกได้ว่าค่าข้อมูลใดควรปล่อยให้ภายนอกเข้าถึงได้ในรอบถัดไปผ่าน next วิธีการของ Generator ต้องเรียก next เพื่อดึงค่าที่เราสนใจออกมาซึ่งแตกต่างจากการโปรแกรมแบบ Reactive ที่เราจะปล่อยค่าออกมาให้ผู้สนใจเห็นแทน

const timerStream = Rx.Observable.create(observer => {
  let time = 0
  
  // ส่งค่าออกไปให้ observer เห็นทุกๆ 1 วินาที
  // พร้อมทั้งอัพเดท time ขึ้นไปอีก 1
  setInterval(() => observer.next(time++), 1000)
})

timerStream.subscribe(console.log)

การสร้าง Observable แบบพื้นฐานเราสามารถทำผ่านการเรียกเมธอด create ได้ครับ เมธอดดังกล่าวจะรับพารามิเตอร์เข้ามาเป็นฟังก์ชัน ฟังก์ชันนี้จะมีพารามิเตอร์เป็น Observer อีกทอดนึง เมื่อใดก็ตามที่เราต้องการปล่อยข้อมูลออกมา เราจึงเรียก observer.next พร้อมทั้งส่งค่าออกไป อย่าลืมนะครับเราต้อง push ค่าไปให้ผู้สนใจ เพราะนี่คือวิถีแห่งการโปรแกรมแบบ Reactive นั่นเอง

การจัดการข้อผิดพลาดและการจบการทำงาน

Observable ก็เหมือนดาราบน IG ครับ บางบทก็หน่อมแน้ม บางทีก็ขายของ บางช่วงก็ขี้วีน คำถามคือเมื่อ Observable ของเรามีนิสัยดั่งดารา เมื่อเธอโพสต์เรื่องปรี๊ดแตก เราต้องทำยังไง? จะด่าเธอไหม? หากซักวันนึงที่เธอปิดบัญชีทิ้งเสีย เราจะจัดการกับเรื่องนี้ยังไง? จะชักดิ้นชักงออยู่หน้าคอมดี หรือจะเปลี่ยนใจไป Follow คนอื่นแทน?

Observable ที่ขี้วีนก็เหมือนการปะทุของข้อผิดพลาดที่เราต้องจัดการ

Observable สามารถโยนข้อผิดพลาดออกไปได้ผ่านเมธอด error ของ Observer เมื่อผู้ติดตามได้รับข้อผิดพลาด เขาย่อมต้องรู้ถึงวิธีจัดการข้อผิดพลาดนั้น เราจึงต้องมี Callback Function ตัวที่สองเพื่อรองรับการจัดการกับข้อผิดพลาดนั้น

const timerStream = Rx.Observable.create(observer => {
  let time = 0
  
  setInterval(() => observer.next(time++), 1000)
  
  // เมื่อครบ 3 วินาทีจะปล่อย Error ออกไปให้ observer เห็น
  setTimeout(() => observer.error(new Error('Oops!')), 3000)
})

// ผลลัพธ์จากการทำงานเป็น
// 0
// 1
// Oops!
timerStream.subscribe(
  (value) => console.log(value),
  // เราจึงต้องการ callback สำหรับจัดการข้อผิดพลาดนั้น
  (error) => console.error(error.message)
)

Observable ก็คือสายน้ำครับ เราหยุดสายน้ำได้ไหม? คำตอบก็คือมันต้องได้ซิ แค่ปิดก็อกน้ำไง นี่ยิงปืนนัดเดียวได้นกสองตัวเลยนะ หยุดการทำงานของ Observable ด้วย แถมยังช่วยชาติประหยัดน้ำแล้วค่อยไปผลาญกันอีกทีตอนสงกรานต์!

Observable สามารถเสร็จสิ้นการปล่อยข้อมูลได้ครับด้วยการบอกว่า complete อ้า~ เสร็จแล้ว

const numStream = Rx.Observable.create(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  
  // หลังจากปล่อยค่าเรียบร้อยก็จบการทำงาน
  observer.complete()
})

// ผลลัพธ์ของการทำงานคือ
// 1
// 2
// 3
// Completed!
numStream.subscribe(
  (value) => console.log(value),
  (error) => console.error(error.message),
  // เราจึงต้องการ callback เพื่อจัดการกับเหตุการณ์ที่ observable ยุติการปล่อยค่าแล้ว
  () => console.log('Completed!')
)

ทั้งหมดทั้งมวลทำให้เราเห็นวิธีการส่ง Callback Function สามตัวเข้าไปในเมธอด subscribe เพื่อใช้จัดการกับเหตุการณ์รับค่า, จัดการข้อผิดพลาด และ ดักจับการสิ้นสุดการทำงาน ตามลำดับ แต่การส่ง Arrow Function เข้าไปในลักษณะนี้มันช่างทำความเข้าใจยาก ถ้าคนไม่เคยใช้ RxJS มาก่อน ต้องงงแน่ๆว่าแต่ละตัวที่ส่งเข้าไปคืออะไร

เพื่อป้องกันความสับสน เราจึงนิยมส่งอ็อบเจ็กต์ที่มี property ชี้ชัดว่าแต่ละฟังก์ชันทำงานอะไรเข้าไปแทน ดังนี้

timerStream.subscribe({
  next(value) { console.log(value) },
  error(error) { console.error(error.message) },
  complete() { console.log('Completed!') }
})

Observables และการทำงานแบบ Asynchronous

เราได้เห็นกันไปแล้วว่า Observable สามารถปล่อยค่าออกมาภายหลังได้

const timerStream = Rx.Observable.create(observer => {
  let time = 0
  
  // ปล่อยค่าออกมาในภายหลัง ทุกๆ 1 วินาที
  // นั่นคือไม่ได้ปล่อยค่าแต่แรกที่โค้ดนี้ทำงาน
  setInterval(() => observer.next(time++), 1000)
})

เมื่อการทำงานเกิดขึ้นทีหลังได้เช่นนี้ เราจึงกล่าวว่า Observable สนับสนุนการทำงานแบบ Asynchronous แต่นั่นไม่ได้หมายความว่า Observable ไม่สามารถทำงานแบบ Synchronous ได้นะครับ

const timerStream = Rx.Observable.create(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  
  observer.complete()
})

จากตัวอย่างข้างต้น โปรแกรมเรามีการปล่อยค่าสามค่าพร้อมกันผ่าน next จะสังเกตได้ว่าการทำงานเช่นที่ว่าไม่ได้อิงกับช่วงเวลา แต่เป็นการปล่อยค่าในทันทีทันใด เหตุนี้เรากล่าวได้ว่า Observable สนับสนุนการทำงานแบบ Synchronous เช่นกัน เมื่อการทำงานนี้เป็นแบบ Synchronous การปล่อยข้อมูลนี้จึงเกิดใน Event Loop เดียวกัน

รูปแบบย่อสำหรับการสร้าง Observables

การสร้าง Observable ผ่านเมธอด create มันชั่งไม่ทันใจวัยรุ่นซะเลย RxJS ทราบถึงปัญหานี้ พี่แกเลยจัดกลุ่มของเมธอดสำหรับสร้าง Observable อย่างง่ายดายมาให้กับเรา

of

ในกรณีที่เราต้องส่งค่าชุดหนึ่งโดยไม่อิงกับเวลาให้กับ Observer เราสามารถทำได้ดังนี้

const numStream = Rx.Observable.create(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
})

numStream.subscribe(console.log)

แค่จะส่งออกข้อมูลสามค่า ทำไมต้องเขียนอะไรให้วุ่นวายขนาดนี้ เราสามารถใช้ of แทนกรณีเช่นนี้ได้ครับ

const numStream = Rx.Observable.of(1, 2, 3)

numStream.subscribe(console.log)

from

แล้วถ้าข้อมูลที่เราถือครองอยู่เป็นอาร์เรย์หละ เราจะสร้าง Observable เพื่อปล่อยแต่ละค่าของอาร์เรย์ออกไปเช่นไร?

RxJS ได้เตรียมเมธอด from ไว้ให้กับเราครับ เราสามารถใช้เมธอดนี้เพื่อส่งค่าแต่ละตัวในอาร์เรย์ของเราได้

const numStream = Rx.Observable.from([1, 2, 3])

numStream.subscribe(console.log)

// ผลลัพธ์เป็น
// 1
// 2
// 3

กรณีของการใช้ from เราไม่จำเป็นต้องส่งอาร์เรย์เท่านั้น from ยังรองรับการใช้งานกับสิ่งอื่นอีก เช่น Promise และ Generator

const requestStream = Rx.Observable.from(fetch('https://www.babelcoder.com/api/v1/articles'))

// ผลลัพธ์
// 200
// Completed!
requestStream.subscribe({
  next(response) { console.log(response.status) },
  error(error) { console.log(error.message) },
  complete() { console.log('Completed!') }
})

ในตัวอย่างของเรามีการใช้ fetch API ที่มีการคืนค่ากลับมาเป็น Promise เหตุนี้เราจึงใช้ from เพื่อแปลง Promise ของเราให้เป็น Observable

ด้วยพลานุภาพแห่ง fetch API ทำให้เราส่งการร้องขอไปที่ API Server ได้ เมื่อมีการตอบกลับจากเซิฟเวอร์แล้ว requestStream จึงส่งการตอบกลับนั้นไปให้ Observer หากเกิดข้อผิดพลาดขึ้นมาระหว่างการทำงาน Observable ก็จะโยนข้อผิดพลาดนั้นออกมา เมื่อทุกอย่างเสร็จสิ้นจึงทำการเรียก complete เพื่อสิ้นสุดการทำงาน

fromEvent

แน่นอนว่าเพื่อนๆหลายคนคงคิดจะใช้ RxJS กับการทำงานบนเว็บใช่ไหมละ ถ้า RxJS ไม่สามารถสร้าง Observable จากเหตุการณ์ต่างๆที่เกิดขึ้นกับอีลีเมนต์เช่นการคลิกปุ่มได้มันคงกากน่าดู

RxJS ได้ให้เมธอดชื่อ fromEvent เพื่อสร้าง Stream ของเหตุการณ์ที่เกิดบน HTML element ที่เราระบุได้ครับ

const clickStream = Rx.Observable.fromEvent(
  document.getElementById('button'), 'click'
)

จากตัวอย่างนี้ เมื่อใดก็ตามที่เราคลิกปุ่ม clickStream ก็จะปล่อยค่าคือเหตุการณ์คลิกไปให้กับ Observer เราจะได้เห็นตัวอย่างการใช้งานจริงในบทความของ RxJS กับ Operator ครับ

interval และ timer

กรณีของการสร้าง Observable ผ่าน of เราถือว่าเป็นการทำงานแบบ Synchronous เพราะทำทันที หากสิ่งที่เราต้องการคือการปล่อยข้อมูลโดยขึ้นตรงกับเวลา เราจึงต้องใช้ interval เพื่อส่งข้อมูลตามเงื่อนเวลาที่เรากำหนด

// การใช้งาน interval เราต้องส่งตัวเลขแทนมิลลิวินาทีเข้าไป
// ตัวเลขนี้จะเป็นตัวบอกว่าจะให้มีการปล่อยข้อมูลทุกกี่มิลลิวินาที
// โดยตัวเลขแรกที่ทำการส่งคือค่า 0
// ในเวลาถัดไปจะส่งค่า 1, 2, 3, ...
const timerStream = Rx.Observable.interval(1000)

แต่ถ้าสิ่งที่เราต้องการคือการหน่วงเวลาออกไปก่อน จากนั้นจึงค่อยปล่อยข้อมูล เราต้องใช้ timer แทน ดังนี้

// หน่วงเวลาออกไป 3000 มิลลิวินาที หรือ 3 วินาที
// จากนั้นจึงเริ่มปล่อยค่า 0 ออกมา
// ในทุกๆ 1000 มิลลิวินาที หรือ 1 วินาที
// จะเริ่มปล่อยข้อมูลลำดับถัดไป คือ 1, 2, 3, ...
const timerStream = Rx.Observable.timer(3000, 1000)

สรุป

หากเพื่อนๆได้อ่านบทความจนมาถึงจุดนี้แล้ว ผมเชื่อว่าเพื่อนๆต้องเริ่มเห็นความสำคัญของ Reactive Programming บ้างแล้วหละ อีกหลายๆคนอาจยังไม่เห็นประโยชน์อย่างแท้จริงของการใช้ RxJS นี่ไม่ใช่สิ่งที่ต้องแปลกใจครับ นั่นเพราะถ้ายังจำกันได้ผมบอกว่า Stream (Observable) ของเรามันช่างไร้ประโยชน์ยิ่งนักหากขาดซึ่ง Operator ด้วยเหตุนี้ในบทความถัดไปของเราผมจะแนะนำวิธีการใช้งาน Operator ต่างๆเพื่อการจัดการ Observable ของเราอย่างมีประสิทธิภาพครับ Stay Tuned!


แสดงความคิดเห็นของคุณ


ไม่ระบุตัวตน19 วันที่ผ่านมา

หน่วงเวลาออกไป 3000 มิลลิวินาที หรือ 3 นาที ในทุกๆ 1000 มิลลิวินาที หรือ 1 นาที สองบรรทัดด้านบนน่าจะตกคำว่า วิ ไปนะครับคิดว่าน่าจะต้องเป็น วินาที

ข้อความตอบกลับ
ไม่ระบุตัวตน19 วันที่ผ่านมา

แก้ไขเรียบร้อยแล้วครับ ขอบคุณมากครับ 😃


BinaryKung21 วันที่ผ่านมา

โค้ดตรง Method From ของ Observable น่าจะผิดนะครับ จาก of น่าจะเปลี่ยนเป็น from

ข้อความตอบกลับ
ไม่ระบุตัวตน21 วันที่ผ่านมา

ขอบคุณครับ แก้ไขเรียบร้อยฮะ 😃