Reactive Programming คืออะไร? รู้จัก RxJS และการสร้าง Observables
Reactive Programming เป็นศาสตร์ที่จำเป็นต้องรู้ครับ ไม่ว่าคุณจะเขียนเว็บแอพพลิเคชันด้วย Angular 2+, นักพัฒนาแอนดรอยที่ใฝ่ฝันชีวิตที่ง่ายขึ้นในการทำงานกับ Asynchonous หรือกระทั่งอยากเจริญรอยตามทีมงานเท่ๆแบบ Netflix คุณยิ่งควรรู้จัก Reactive Programming และการใช้งาน RxJS เข้าไปใหญ่
บทความนี้เราจะไปรู้จักกันครับว่า Reactive Programming คืออะไร? การใช้งาน RxJS เพื่อการบรรลุจุดสุดยอดแห่งศาสตร์ Reactive Programming ต้องทำอย่างไร ผมสัญญาว่าชุดบทความนี้จะทำให้คุณได้นิพพานในชาตินี้แน่นอน...ฟันธง!
ปัญหาของการเปลี่ยนแปลงค่าข้อมูล
ธรรมชาติของการประกาศตัวแปรของเรามักอ้างอิงจากค่าอื่น เช่น
1let height = 22let base = 534let area = 0.5 * height * base // area = 5
area
เป็นตัวแปรสำหรับเก็บค่าพื้นที่ของสามเหลี่ยมที่ขึ้นตรงต่อค่าของ height
และ area
เรากล่าวว่าตัวแปลทั้งสองทำให้เกิดผลลัพธ์คือ area เมื่อเป็นเช่นนี้คำถามคือถ้าเราเปลี่ยนค่าตัวแปรตัวใดตัวหนึ่ง ค่าของ area ควรจะเปลี่ยนตามไหม?
1let height = 22let base = 534let area = 0.5 * height * base56// เปลี่ยนค่า base ซักนิดดีกว่า7base = 1089// แบบนี้ area จะเปลี่ยนไหม?
ด้วยสามัญสำนึกของการเขียนโปรแกรม เราทราบทันทีว่าแม้ค่า base จะเปลี่ยน แต่ area
ยังเป็นค่า 5 เช่นเดิมนั่นเพราะ area ของเราอิงตามค่า base ตัวก่อนหน้าคือ 5 นั่นเอง
หากตอนนี้เราอยากคำนวณค่า area ตาม base ตัวใหม่จะทำอย่างไร? ง่ายมากก็เขียนสมการคำนวณอีกซักรอบเป็นไง นี่มันโจทย์อนุบาลหมีน้อยสุดๆ
1let height = 22let base = 534let area = 0.5 * height * base56// เปลี่ยนค่า base ซักนิดดีกว่า7base = 1089// ตอนนี้ area ของเราก็จะคำนวณใหม่แล้ว10area = 0.5 * height * base
ปัดโถ่ เดี๋ยวก็ทุ้มด้วยโพเดี้ยมซะนิ! เราเป็น smart programmer นะ ไม่ควรจะมานั่งคำนวณซ้ำซ้อนแบบนี้อีกแล้ว เมื่อ height หรือ base เปลี่ยน area ของเราก็ควรคำนวณใหม่ตามการเปลี่ยนแปลงนั้น และนี่คือหลักการของคำว่า Reactive นั่นเอง
Reactive Programming คืออะไร
จากปัญหาดังกล่าว ถ้าเราพิจารณาใหม่ว่าค่าของตัวแปรต่างๆสามารถปล่อยออกมาในช่วงเวลาใดๆก็ได้ เราจะได้เส้นสายแสดงข้อมูลที่ปล่อยออกมาในช่วงเวลาหนึ่ง ดังนี้
1height: ----2-------------------23base: ----5----10-------------45 area = 0.5 x height x base67area: ----5----10-------------
เส้นสายของข้อมูลที่สัมพันธ์กับเวลานี่หละครับ เราเรียกว่า Stream
จากแผนภาพข้างต้นเราจะค้นพบว่า Stream ของตัวแปร area เกิดจากค่าล่าสุดของ Stream height และ Stream ของ base เมื่อ base เปลี่ยนเป็น 10 ค่าใหม่ของ area จึงนำค่าล่าสุดของ height คือ 2 มาคำนวณคู่กับค่าล่าสุดของ base คือ 10 ลักษณะการทำงานของ Stream ดังกล่าวคือลักษณะการทำงานของ Reactive Programming นั่นเอง
จากแผนภาพเราพบว่าข้อมูลของเราถูกปล่อยในช่วงเวลาต่างๆ การคำนวณของ area จึงไม่ใช่การคำนวณที่ต่อเนื่อง หากแต่เป็นการคำนวณที่ขึ้นอยู่กับช่วงเวลาที่ Stream อื่นมีการปล่อยค่าออกมา เราจึงกล่าวได้ว่าการทำงานนี้เป็นการทำงานแบบ Asynchronous
หมายเหตุ การทำงานของ Stream ไม่จำเป็นต้องเป็น Asynchronous เสมอ ดูเพิ่มเติมในเรื่องของ Observables
ทั้งหมดทั้งมวลจากสองย่อหน้าจึงนำไปสู่บทนิยามของ Reactive Programming นั่นคือ
Reactive Programming คือรูปแบบหนึ่งของการเขียนโปรแกรมที่สนใจข้อมูลบน Stream ที่มีการทำงานแบบ Asynchronous รวมถึงการเปลี่ยนแปลงของข้อมูลใน Stream ที่ส่งผลต่อการเปลี่ยนแปลงของ Stream อื่น
เพื่อที่จะให้การเขียนโปรแกรมแบบ Reactive เป็นผลเร็จ มีสองสิ่งที่โปรแกรมเมอร์อย่างเราพึงทำครับ นั่นคือ
- สร้าง Stream ขึ้นมาซะก่อนซิ อยากแก้ปัญหาอะไรก็สร้าง Stream นั้นเพื่อแทนข้อมูล
- มีแค่ Stream มันจืดชืด เราต้องมี Operator คือตัวดำเนินการระหว่าง Stream ด้วย ในกรณีของ area การแปลง height และ base เป็น area ด้วยการนำ
0.5 x height x base
คือ Operator ที่ทำงานบน Stream ของ height และ base นั่นเอง
ข้อดีของ Reactive Programming
อธิบายความหมายของ Reactive Programming กันไปแล้ว แต่เพื่อนๆหลายๆคนคงยังไม่เห็นความดีงามพระราม 8 เป็นแน่ เพื่อจะอวยไส้แตกแหกไส้ฉีก เราจะมาดูข้อดีของการโปรแกรมแบบ Reactive ที่แม้แต่ทีมพี่ลูกเกตก็ให้ไม่ได้
Push ปะทะ Pull
ES2015 เรามีหนึ่งฟีเจอร์ทางภาษาที่เรียกว่า Generator ซึ่งเป็นฟังก์ชันพิเศษที่อนุญาตให้ปลายทางดูดค่าที่เตรียมไว้ได้ แหมฟังแล้วโค้ดปลายทางก็เหมือนแดร็กคูล่าดีๆนี่เอง สำหรับเหยื่อที่น่าสงสารอย่าง Generator ก็แค่เตรียมเลือดชั้นดีที่อยากให้โดนดูดจ๊วบๆออกไป
1function* double() {2 let currentNum = 034 while (true) {5 yield currentNum6 currentNum += 27 }8}910const doubleGenerator = double()1112console.log(doubleGenerator.next().value) // 013console.log(doubleGenerator.next().value) // 214console.log(doubleGenerator.next().value) // 4
สองสิ่งที่เป็นตัวบ่งชี้ว่าฟังก์ชันของเราเป็น Generator นั่นคือการประกาศฟังก์ชันด้วย function *
และอีกสิ่งคือการใช้ yield
Generator ก็เหมือนเหยื่อครับ คงไม่มีมนุษย์ป้าอกอึ๋มที่ไหนเดินเข้าไปหาแดร็กคูล่าใช่ไหม ความจริงผู้ล่าของเราต่างหากที่ต้องไล่หาเลือด
การเรียกใช้ doubleGenerator
คือแดร็กคิวล่าครับ เราต้องการเลือดของเหยื่อหยดถัดไป เราจึงออกคำสั่ง next()
เมื่อเหยื่อถูกคมเขี้ยวอันแหลมคมกัดลงไปที่ต้นคอ เลือดที่เตรียมไว้ผ่าน yield
จึงได้ไหลรินออกมา แน่นอนว่าการกัดเพื่อดูดแต่ละครั้งคงไม่ได้ออกมาแค่เลือด อย่างน้อยๆคงต้องมาพร้อมขี้ไคล คราบเหงื่อ ยี้~ แค่คิดก็เหม็นเปรี้ยวละ เมื่อเป็นเช่นนี้เราจึงต้องเรียก value
เพื่อเป็นการกรองเอาเฉพาะค่าของมันซึ่งก็คือเลือดที่เราสนใจไร้กลิ่นเปรี้ยวเท่านั้น
กล่าวโดยสรุป เราจะเห็นว่าการใช้งาน Generator คือการเขียนโปรแกรมเพื่อดึงข้อมูลที่เราสนใจออกมานั่นเอง เราจึงเรียกวิธีการของ Generator เช่นนี้ว่า Pull Style
กรณีของ Reactive Programming ด้วย Stream เราพบว่าโค้ดปลายทางของเราไม่ใช่แดร็กคิวล่าครับ เราไม่ต้องออกล่าเหยื่อให้เสียแรง เราแค่นอนอ้าปากพะงาบๆอยู่ในอ่างจากุชชี่ เมื่อไหร่ที่ข้อมูลพร้อม เลือดเหล่านั้นก็จะไหลรินหลั่งลงสู่คอหอยเอง
1// ยังไม่ต้องสนใจในรายละเอียดนะครับ2// ทราบเพียงว่า Stream นี้มีการปล่อยค่าเลขคู่ทุกๆ 200 ms เป็นพอ3const doubleStream = Rx.Observable.interval(200).filter((x) => x % 2 === 0)45// เราไม่ต้องเหนื่อยหาเหยื่อ เหยื่อวิ่งเอาเลือดมาให้เรากินเอง6// เมื่อไหร่ที่ doubleStream มีการปล่อยค่าออกมา7// มันจะไปเรียก console.log ให้เราอัตโนมัติ8// ด้วยวิธีการนี้จึงกล่าวได้ว่า console.log ของเราอยู่กับที่9// แต่ข้อมูลต่างหากที่วิ่งเข้ามาหา console.log เอง10doubleStream.subscribe(console.log)
ด้วยกลเม็ดของ Reactive Programming นี่หละครับที่ทำให้เหยื่อต้องยัดเยียดข้อมูลมาให้กับเรา เราจึงกล่าวว่าวิธีการดังกล่าวเป็นแบบ Push Style นั่นเอง
นิยามพฤติกรรมตั้งแต่เริ่มสร้าง Stream
ในกระบวนการเขียนโปรแกรมแบบ Imperative เราสร้างพฤติกรรมเมื่อเราต้องการให้เกิดการทำงานเช่นนั้น หากเราต้องการให้ทุกๆ 1 วินาทีมีการพิมพ์ตัวเลขออกมาตั้งแต่ 0 เป็นต้นไป เราอาจเขียนโปรแกรมเช่นนี้
1let num = 023// นิยามพฤติกรรมคือการพิมพ์ค่าออกมาตั้งแต่ 0 และเพิ่มค่าเรื่อยๆในการพิมพ์ครั้งถัดไป4// พฤติกรรมนี้เรานิยามเมื่อเราต้องการใช้งานมัน5setInterval(() => console.log(num++), 1000)
สำหรับวิธีการของ Reactive Programming เราจะนิยามพฤติกรรมตั้งแต่ตอนสร้าง Stream เลยครับ
1// สร้าง Stream ปุ๊บ บอกทันทีเลยว่าเจ้าจงปล่อยข้อมูลทุกๆ 1 วินาทีซะนะ2// ด้วยวิธีการนี้ จึงเป็นการประกาศพฤติกรรมตั้งแต่ต้น3const doubleStream = Rx.Observable.interval(1000)45doubleStream.subscribe(console.log)
ตัวอย่างข้างบนอาจดูง่ายไปใช่ไหมครับ งั้นเรามาพิจารณาตัวอย่างในชีวิตจริงกันดีกว่า
ถ้าเราต้องการสร้างช่องค้นหา โดยมีเงื่อนไขดังนี้
- เมื่อพิมพ์ค่าอะไรลงไป เราจะนำค่าดังกล่าวส่งไปค้นหาข้อมูลจากเซิฟเวอร์
- แต่ถ้าเราพิมพ์ปุ๊บแล้วส่งทันที มันก็คือการระดมยิงเซิฟเวอร์ดีๆนี่เอง ดังนั้นให้ผู้ใช้งานหยุดพิมพ์ครบ 500 ms ก่อนจึงค่อยส่งข้อมูลไปหาเซิฟเวอร์
- ถ้าค่าข้อมูลที่เราพิมพ์ก่อนหน้าตรงกับค่าที่เราพึ่งพิมพ์เสร็จ อย่าส่งข้อมูลไปค้นหาจากเซิฟเวอร์
- ถ้าเราพิมพ์รอบแรกเสร็จ มีการส่งข้อมูลไปเซิฟเวอร์แล้ว แต่ข้อมูลยังไม่ตอบกลับมาจากเซิฟเวอร์ ทันใดนั้นเราเปลี่ยนใจค้นหาข้อมูลด้วยคำใหม่ คำค้นหลังส่งไปหาเซิฟเวอร์ในขณะที่ผลลัพธ์การค้นหาแรกส่งกลับมาพอดี เป็นเหตุให้เราแสดงผลลัพธ์จากการค้นหาแรกแทนที่จะเป็นผลลัพธ์หลัง ด้วยเหตุนี้เราจำเป็นต้องยกเลิกผลลัพธ์ก่อนหน้าทิ้งหากเราเริ่มค้นหาครั้งใหม่
โจทย์ฟังเหมือนจะง่ายใช่ไหมครับ ไหนเพื่อนๆลองคิดวิธีเขียนโปรแกรมตามโจทย์นี้ด้วยวิธีการของ Imperative กันดู เริ่มยากแล้วใช่ไหมละ...
ด้วยวิธีการของ Reactive Programming ด้วย Stream จึงได้โปรแกรมหน้าตาเช่นนี้
1const inputStream = Rx.Observable2 .fromEvent(document.getElementById('input'), 'keyup') // ดักจับการพิมพ์บน input3 .debounceTime(500) // รอให้ผู้ใช้งานพิมพ์เสร็จก่อน 500 ms4 .distinctUntilChanged() // ถ้าข้อความค้นหาตรงกับของเดิม ไม่ส่งข้อมูลหาเซิฟเวอร์5 .map(ev => ev.target.value) // เราสนใจเฉพาะค่าที่เราพิมพ์ในช่อง input6 .switchMap(7 // การใช้ switchMap ช่วยให้ยกเลิกการค้นหาก่อนหน้าได้8 value => fetch(<API_ENDPOINT>)9 )
เราแค่นิยามพฤติกรรมเมื่อเราสร้าง Stream เมื่อเป็นเช่นนี้เราจึงทำความเข้าใจโปรแกรมเราได้ง่ายขึ้น
Operator คือสิ่งสำคัญ
ตามที่กล่าวข้างต้น Reactive Programming อาศัยส่วนประสานของสองสิ่งประหนึ่งเป็นหยินหยางนั่นคือ Stream และ Operator ในไลบรารี่ต่างๆที่มีคุณสมบัติของการโปรแกรมแบบ Reactive เช่น RxJS จึงอุดมไปด้วย Operator ที่ใช้ในการดำเนินการต่างๆกับเหล่า Stream มากมาย
มาตรฐาน Promise ปัจจุบันยังไม่บรรจุการยกเลิกการทำงานของ Promise (Cancellable Promise) จากโจทย์ที่แล้ว หากเราต้องการยกเลิกการค้นหาข้อมูลก่อนหน้า เราต้องเขียนเองหลายอย่างมาก หรืออีกทางเลือกคือหาไลบรารี่หรืออะไรก็ได้ที่ทำให้ Promise ของเราสามารถยกเลิกได้ เช่นใช้ Bluebird ที่เป็น JavaScript Promises Library ตัวนึง
แต่สำหรับ RxJS เรามีวิธีการแยบยลในการยกเลิกสิ่งที่เราไม่ต้องการ เช่นการใช้ Operator ชื่อ switchMap ดังที่เห็นในโจทย์ก่อนหน้า
RxJS: A reactive programming library for JavaScript
RxJS ไม่ใช่พ่อทุกสถาบัน จึงไม่แปลกที่เพื่อนๆบางคนจะไม่รู้จักครับ แต่คงไม่มีใครไม่รู้จักไมโครซอฟต์ใช่ไหมครับ ยังไงก็ต้องขอแสดงความเสียใจกับผู้ไม่ปลื้มไมโครซอฟต์ด้วยนะครับ เพราะ RxJS เนี่ยคลอดออกมาจากครรภ์ของบิลเกตเลยละ #ผู้ชายก็ท้องได้ (พัฒนาโดยไมโครซอฟต์ แต่บิลเกตไม่รู้อิโหน่อิเหน่ด้วย เอาชื่อเฮียแกมาใช้เฉยๆ)
RxJS เป็นหนึ่งในไลบรารี่จากตระกูล ReactiveX ที่มีสโแกนอันกิ๊บเก๋ว่า A reactive programming library for JavaScript โดยปัจจุบัน RxJS ได้ก้าวมาถึงเวอร์ชัน 5.x.x แล้วครับ
ไลบรารี่สำหรับการโปรแกรมแบบ Reactive แม้จะมีหลายเจ้า แต่สำหรับในชุดบทความนี้เราจะใช้ RxJS เพื่อสร้างการทำงานแบบ Reactive เป็นหลักครับ
แม้เราจะอิงมาตรฐานตาม RxJS แต่เพื่อนๆอย่าได้กังวลว่าการใช้งานจะผูกติดแค่กับ RxJS เท่านั้น เพราะในเวอร์ชัน 5 RxJS ของเราได้พยายามทำตัวให้เข้ากันได้กับมาตรฐาน Observable ของ ECMAScript แล้ว ส่วนมาตรฐานของ ECMAScript หนะรึ... รอลูกบวชอีกสิบปีก็คงยังไม่คลอดง่ายๆ
รู้จัก Observables
คุณเคยแอบชอบดาราคนไหนจนต้องไป Follow เขาใน Instagram ไหมครับ?
เมื่อคุณตัดสินใจ Follow ใครซักคนใน IG เมื่อนั้นส่วนนึงของชีวิตเขาจะเข้ามาเป็นส่วนหนึ่งในชีวิตคุณ เมื่อใดที่คนดังเหล่านั้นโพสต์รูปคิกขุ คุณก็จะเห็นรูปนั้นเช่นกัน แม้ดาราคนนั้นจะโพสต์ภาพหน้าสดสุดโทรม ถึงคุณจะไม่อยากเห็นแต่คุณก็เลี่ยงไม่ได้ นั่นเพราะคุณ Follow เขาแล้วนั่นเอง
ในฐานะที่คุณคือผู้ติดตาม คุณจึงเป็น Observer (แปลว่าผู้สังเกต) ส่วนดาราที่ถูกคุณติดตาม จะเป็น Observable (แปลว่าสิ่งที่ถูกสังเกตได้) นั่นเอง
Observable เปรียบได้กับ Stream ที่สามารถปล่อยค่าได้ในช่วงเวลาหนึ่งๆครับ ตัวอย่างเช่น
1const timerStream = Rx.Observable.interval(1000)23timerStream.subscribe(console.log)
จากตัวอย่างข้างต้น timerStream คือ Observable ที่จะปล่อยค่าออกมาทุกๆ 1 วินาที หากเราสนใจที่จะส่อง ค่าเหล่านี้เราต้อง subscribe
ที่เป็นการบอกว่าเราต้องการติดตามความเคลื่อนไหวนั่นเอง สำหรับ subscribe เราสามารถส่ง Callback Function ไปให้กับมันได้ ฟังก์ชันดังกล่าวจะได้รับการเรียกเมื่อ Observable ของเรามีการปล่อยค่าออกมา พูดอีกนัยยะก็คือเราต้องส่ง Observer ไปให้กับเมธอด subscribe ด้วยเหตุนี้เราจึงกล่าวได้ว่า console.log
ของเราจึงเป็น Observer นั่นเอง
Observable จอมขี้เกียจ
เพื่อนๆคิดว่าถ้าดาราซักคนไม่มีคน Follow เขาใน IG เลย เขายังจะอยากถ่ายรูปแล้วโพสต์ลง IG ไหม?
สถานการณ์เดียวกันนี้เกิดขึ้นกับ Observable ครับ หากไม่มี Observer ไหนไป subscribe
มันเลย Observable ตัวดังกล่าวก็จะไม่ทำงาน อารมณ์เหมือนดาราที่จะไม่โพสต์รูป ถ้าไม่มีคนติดตามนั่นละ
1const timerStream = Rx.Observable.interval(1000)
จากโปรแกรมข้างต้น เราพบว่าโค้ดของเราไม่ได้รับการทำงาน นั่นเพราะ timerStream ของเราสร้างขึ้นมาแล้วแต่ขาดซึ่งผู้ติดตาม มันจึงหมดกำลังใจที่จะปล่อยค่าออกมานั่นเอง น่าสงสารเนอะ!
การสร้างและใช้งาน Observables
หัวข้อ Push ปะทะ Pull เราได้แสดงให้เห็นวิธีสร้างและใช้งาน Generator เพื่อเตรียมข้อมูลให้ปลายทางเรียกใช้กันไปแล้ว
1function* double() {2 let currentNum = 034 while (true) {5 yield currentNum6 currentNum += 27 }8}910const doubleGenerator = double()1112console.log(doubleGenerator.next().value) // 013console.log(doubleGenerator.next().value) // 214console.log(doubleGenerator.next().value) // 4
อาศัย yield
ทำให้เราเลือกได้ว่าค่าข้อมูลใดควรปล่อยให้ภายนอกเข้าถึงได้ในรอบถัดไปผ่าน next
วิธีการของ Generator ต้องเรียก next เพื่อดึงค่าที่เราสนใจออกมาซึ่งแตกต่างจากการโปรแกรมแบบ Reactive ที่เราจะปล่อยค่าออกมาให้ผู้สนใจเห็นแทน
1const timerStream = Rx.Observable.create((observer) => {2 let time = 034 // ส่งค่าออกไปให้ observer เห็นทุกๆ 1 วินาที5 // พร้อมทั้งอัพเดท time ขึ้นไปอีก 16 setInterval(() => observer.next(time++), 1000)7})89timerStream.subscribe(console.log)
การสร้าง Observable แบบพื้นฐานเราสามารถทำผ่านการเรียกเมธอด create
ได้ครับ เมธอดดังกล่าวจะรับพารามิเตอร์เข้ามาเป็นฟังก์ชัน ฟังก์ชันนี้จะมีพารามิเตอร์เป็น Observer อีกทอดนึง เมื่อใดก็ตามที่เราต้องการปล่อยข้อมูลออกมา เราจึงเรียก observer.next
พร้อมทั้งส่งค่าออกไป อย่าลืมนะครับเราต้อง push ค่าไปให้ผู้สนใจ เพราะนี่คือวิถีแห่งการโปรแกรมแบบ Reactive นั่นเอง
การจัดการข้อผิดพลาดและการจบการทำงาน
Observable ก็เหมือนดาราบน IG ครับ บางบทก็หน่อมแน้ม บางทีก็ขายของ บางช่วงก็ขี้วีน คำถามคือเมื่อ Observable ของเรามีนิสัยดั่งดารา เมื่อเธอโพสต์เรื่องปรี๊ดแตก เราต้องทำยังไง? จะด่าเธอไหม? หากซักวันนึงที่เธอปิดบัญชีทิ้งเสีย เราจะจัดการกับเรื่องนี้ยังไง? จะชักดิ้นชักงออยู่หน้าคอมดี หรือจะเปลี่ยนใจไป Follow คนอื่นแทน?
Observable ที่ขี้วีนก็เหมือนการปะทุของข้อผิดพลาดที่เราต้องจัดการ
Observable สามารถโยนข้อผิดพลาดออกไปได้ผ่านเมธอด error
ของ Observer เมื่อผู้ติดตามได้รับข้อผิดพลาด เขาย่อมต้องรู้ถึงวิธีจัดการข้อผิดพลาดนั้น เราจึงต้องมี Callback Function ตัวที่สองเพื่อรองรับการจัดการกับข้อผิดพลาดนั้น
1const timerStream = Rx.Observable.create((observer) => {2 let time = 034 setInterval(() => observer.next(time++), 1000)56 // เมื่อครบ 3 วินาทีจะปล่อย Error ออกไปให้ observer เห็น7 setTimeout(() => observer.error(new Error('Oops!')), 3000)8})910// ผลลัพธ์จากการทำงานเป็น11// 012// 113// Oops!14timerStream.subscribe(15 (value) => console.log(value),16 // เราจึงต้องการ callback สำหรับจัดการข้อผิดพลาดนั้น17 (error) => console.error(error.message)18)
Observable ก็คือสายน้ำครับ เราหยุดสายน้ำได้ไหม? คำตอบก็คือมันต้องได้ซิ แค่ปิดก็อกน้ำไง นี่ยิงปืนนัดเดียวได้นกสองตัวเลยนะ หยุดการทำงานของ Observable ด้วย แถมยังช่วยชาติประหยัดน้ำแล้วค่อยไปผลาญกันอีกทีตอนสงกรานต์!
Observable สามารถเสร็จสิ้นการปล่อยข้อมูลได้ครับด้วยการบอกว่า complete
อ้า~ เสร็จแล้ว
1const numStream = Rx.Observable.create((observer) => {2 observer.next(1)3 observer.next(2)4 observer.next(3)56 // หลังจากปล่อยค่าเรียบร้อยก็จบการทำงาน7 observer.complete()8})910// ผลลัพธ์ของการทำงานคือ11// 112// 213// 314// Completed!15numStream.subscribe(16 (value) => console.log(value),17 (error) => console.error(error.message),18 // เราจึงต้องการ callback เพื่อจัดการกับเหตุการณ์ที่ observable ยุติการปล่อยค่าแล้ว19 () => console.log('Completed!')20)
ทั้งหมดทั้งมวลทำให้เราเห็นวิธีการส่ง Callback Function สามตัวเข้าไปในเมธอด subscribe เพื่อใช้จัดการกับเหตุการณ์รับค่า, จัดการข้อผิดพลาด และ ดักจับการสิ้นสุดการทำงาน ตามลำดับ แต่การส่ง Arrow Function เข้าไปในลักษณะนี้มันช่างทำความเข้าใจยาก ถ้าคนไม่เคยใช้ RxJS มาก่อน ต้องงงแน่ๆว่าแต่ละตัวที่ส่งเข้าไปคืออะไร
เพื่อป้องกันความสับสน เราจึงนิยมส่งอ็อบเจ็กต์ที่มี property ชี้ชัดว่าแต่ละฟังก์ชันทำงานอะไรเข้าไปแทน ดังนี้
1timerStream.subscribe({2 next(value) {3 console.log(value)4 },5 error(error) {6 console.error(error.message)7 },8 complete() {9 console.log('Completed!')10 },11})
Observables และการทำงานแบบ Asynchronous
เราได้เห็นกันไปแล้วว่า Observable สามารถปล่อยค่าออกมาภายหลังได้
1const timerStream = Rx.Observable.create((observer) => {2 let time = 034 // ปล่อยค่าออกมาในภายหลัง ทุกๆ 1 วินาที5 // นั่นคือไม่ได้ปล่อยค่าแต่แรกที่โค้ดนี้ทำงาน6 setInterval(() => observer.next(time++), 1000)7})
เมื่อการทำงานเกิดขึ้นทีหลังได้เช่นนี้ เราจึงกล่าวว่า Observable สนับสนุนการทำงานแบบ Asynchronous แต่นั่นไม่ได้หมายความว่า Observable ไม่สามารถทำงานแบบ Synchronous ได้นะครับ
1const timerStream = Rx.Observable.create((observer) => {2 observer.next(1)3 observer.next(2)4 observer.next(3)56 observer.complete()7})
จากตัวอย่างข้างต้น โปรแกรมเรามีการปล่อยค่าสามค่าพร้อมกันผ่าน next
จะสังเกตได้ว่าการทำงานเช่นที่ว่าไม่ได้อิงกับช่วงเวลา แต่เป็นการปล่อยค่าในทันทีทันใด เหตุนี้เรากล่าวได้ว่า Observable สนับสนุนการทำงานแบบ Synchronous เช่นกัน เมื่อการทำงานนี้เป็นแบบ Synchronous การปล่อยข้อมูลนี้จึงเกิดใน Event Loop เดียวกัน
รูปแบบย่อสำหรับการสร้าง Observables
การสร้าง Observable ผ่านเมธอด create
มันชั่งไม่ทันใจวัยรุ่นซะเลย RxJS ทราบถึงปัญหานี้ พี่แกเลยจัดกลุ่มของเมธอดสำหรับสร้าง Observable อย่างง่ายดายมาให้กับเรา
of
ในกรณีที่เราต้องส่งค่าชุดหนึ่งโดยไม่อิงกับเวลาให้กับ Observer เราสามารถทำได้ดังนี้
1const numStream = Rx.Observable.create((observer) => {2 observer.next(1)3 observer.next(2)4 observer.next(3)5})67numStream.subscribe(console.log)
แค่จะส่งออกข้อมูลสามค่า ทำไมต้องเขียนอะไรให้วุ่นวายขนาดนี้ เราสามารถใช้ of
แทนกรณีเช่นนี้ได้ครับ
1const numStream = Rx.Observable.of(1, 2, 3)23numStream.subscribe(console.log)
from
แล้วถ้าข้อมูลที่เราถือครองอยู่เป็นอาร์เรย์หละ เราจะสร้าง Observable เพื่อปล่อยแต่ละค่าของอาร์เรย์ออกไปเช่นไร?
RxJS ได้เตรียมเมธอด from
ไว้ให้กับเราครับ เราสามารถใช้เมธอดนี้เพื่อส่งค่าแต่ละตัวในอาร์เรย์ของเราได้
1const numStream = Rx.Observable.from([1, 2, 3])23numStream.subscribe(console.log)45// ผลลัพธ์เป็น6// 17// 28// 3
กรณีของการใช้ from เราไม่จำเป็นต้องส่งอาร์เรย์เท่านั้น from ยังรองรับการใช้งานกับสิ่งอื่นอีก เช่น Promise และ Generator
1const requestStream = Rx.Observable.from(2 fetch('https://www.babelcoder.com/api/v1/articles')3)45// ผลลัพธ์6// 2007// Completed!8requestStream.subscribe({9 next(response) {10 console.log(response.status)11 },12 error(error) {13 console.log(error.message)14 },15 complete() {16 console.log('Completed!')17 },18})
ในตัวอย่างของเรามีการใช้ fetch API ที่มีการคืนค่ากลับมาเป็น Promise เหตุนี้เราจึงใช้ from เพื่อแปลง Promise ของเราให้เป็น Observable
ด้วยพลานุภาพแห่ง fetch API ทำให้เราส่งการร้องขอไปที่ API Server ได้ เมื่อมีการตอบกลับจากเซิฟเวอร์แล้ว requestStream จึงส่งการตอบกลับนั้นไปให้ Observer หากเกิดข้อผิดพลาดขึ้นมาระหว่างการทำงาน Observable ก็จะโยนข้อผิดพลาดนั้นออกมา เมื่อทุกอย่างเสร็จสิ้นจึงทำการเรียก complete
เพื่อสิ้นสุดการทำงาน
fromEvent
แน่นอนว่าเพื่อนๆหลายคนคงคิดจะใช้ RxJS กับการทำงานบนเว็บใช่ไหมละ ถ้า RxJS ไม่สามารถสร้าง Observable จากเหตุการณ์ต่างๆที่เกิดขึ้นกับอีลีเมนต์เช่นการคลิกปุ่มได้มันคงกากน่าดู
RxJS ได้ให้เมธอดชื่อ fromEvent
เพื่อสร้าง Stream ของเหตุการณ์ที่เกิดบน HTML element ที่เราระบุได้ครับ
1const clickStream = Rx.Observable.fromEvent(2 document.getElementById('button'),3 'click'4)
จากตัวอย่างนี้ เมื่อใดก็ตามที่เราคลิกปุ่ม clickStream ก็จะปล่อยค่าคือเหตุการณ์คลิกไปให้กับ Observer เราจะได้เห็นตัวอย่างการใช้งานจริงในบทความของ RxJS กับ Operator ครับ
interval และ timer
กรณีของการสร้าง Observable ผ่าน of เราถือว่าเป็นการทำงานแบบ Synchronous เพราะทำทันที หากสิ่งที่เราต้องการคือการปล่อยข้อมูลโดยขึ้นตรงกับเวลา เราจึงต้องใช้ interval เพื่อส่งข้อมูลตามเงื่อนเวลาที่เรากำหนด
1// การใช้งาน interval เราต้องส่งตัวเลขแทนมิลลิวินาทีเข้าไป2// ตัวเลขนี้จะเป็นตัวบอกว่าจะให้มีการปล่อยข้อมูลทุกกี่มิลลิวินาที3// โดยตัวเลขแรกที่ทำการส่งคือค่า 04// ในเวลาถัดไปจะส่งค่า 1, 2, 3, ...5const timerStream = Rx.Observable.interval(1000)
แต่ถ้าสิ่งที่เราต้องการคือการหน่วงเวลาออกไปก่อน จากนั้นจึงค่อยปล่อยข้อมูล เราต้องใช้ timer
แทน ดังนี้
1// หน่วงเวลาออกไป 3000 มิลลิวินาที หรือ 3 วินาที2// จากนั้นจึงเริ่มปล่อยค่า 0 ออกมา3// ในทุกๆ 1000 มิลลิวินาที หรือ 1 วินาที4// จะเริ่มปล่อยข้อมูลลำดับถัดไป คือ 1, 2, 3, ...5const timerStream = Rx.Observable.timer(3000, 1000)
สรุป
หากเพื่อนๆได้อ่านบทความจนมาถึงจุดนี้แล้ว ผมเชื่อว่าเพื่อนๆต้องเริ่มเห็นความสำคัญของ Reactive Programming บ้างแล้วหละ อีกหลายๆคนอาจยังไม่เห็นประโยชน์อย่างแท้จริงของการใช้ RxJS นี่ไม่ใช่สิ่งที่ต้องแปลกใจครับ นั่นเพราะถ้ายังจำกันได้ผมบอกว่า Stream (Observable) ของเรามันช่างไร้ประโยชน์ยิ่งนักหากขาดซึ่ง Operator ด้วยเหตุนี้ในบทความถัดไปของเราผมจะแนะนำวิธีการใช้งาน Operator ต่างๆเพื่อการจัดการ Observable ของเราอย่างมีประสิทธิภาพครับ Stay Tuned!
สารบัญ
- ปัญหาของการเปลี่ยนแปลงค่าข้อมูล
- Reactive Programming คืออะไร
- ข้อดีของ Reactive Programming
- RxJS: A reactive programming library for JavaScript
- รู้จัก Observables
- Observable จอมขี้เกียจ
- การสร้างและใช้งาน Observables
- การจัดการข้อผิดพลาดและการจบการทำงาน
- Observables และการทำงานแบบ Asynchronous
- รูปแบบย่อสำหรับการสร้าง Observables
- สรุป