Reactive Programming là gì? Ứng dụng nó trong Java thế nào?

Nam Vu
10 min readDec 20, 2020

Các ứng dụng hiện đại bây giờ đều phải đối mặt với một nỗi lo lớn đó là làm thế nào để ứng dụng có thể hoạt động tốt với số lượng lớn người dùng truy cập và sử dụng đồng thời (high concurrencies issues), mặc dù khả năng đáp ứng của phần cứng ngày càng cải thiện, nhưng tốc độ và khả năng chịu tải (performance) của phần mềm (software) vẫn là yếu tố chính.

Và để tăng performance của ứng dụng ta có hai cách chính đó là:

  • Xử lý song song (parallelize): Sử dụng nhiều Thread và nhiều tài nguyên phần cứng hơn.
  • Tận dụng nguồn lực phần cứng hạn chế một cách hiệu quả hơn.

I. Parallelize

Theo cách lập trình thông thường chúng ta sẽ viết những blocking code, và khi gặp những vấn đề performance chúng ta sẽ giải quyết bằng cách cho những blocking code đó chạy xong xong (parallelize) để tăng tốc độ xử lý của chương trình. Điều đó đồng nghĩa với việc hệ thống sẽ đòi hỏi thêm tài nguyên (threads) , và đồng thời ta phải đối mặt với concurrency problems như Race Condition, Critical Sections, Dead lock…

Bất kỳ một bottleneck nào dẫn tới hệ thống có độ trễ (latency) như liên quan tới I/O events (request DB hay network call) , những resources như Threads sẽ không được giải phóng và bị blocking cho tới khi tác vụ đó được hoàn thành (trả về data). Dẫn tới hệ thống sẽ nhanh chóng bị cạn kiệt tài nguyên và một thảm hoạ thác sẽ xảy ra (cascading failures).

Do vậy kỹ thuật xử lý song song (parallelize) trong phần mềm không phải là một viên đạn bạc (silver bullet), nó đòi hỏi sức mạnh của phần cứng đi kèm và cũng kèm theo đó là những vấn đề phức tạp cần phải giải quyết đi theo.

II. Tận dụng phần cứng hiệu quả hơn

Tận dụng tài nguyên phần cứng hiệu quả hơn có nghĩa là hệ thống sẽ đòi hỏi ít tài nguyên hơn mà vẫn đáp ứng được yếu tố về performance của hệ thống. Để đáp ứng được điều này, một khái niệm được sinh ra đó là Reactive Programming.

Reactive Programming là gì?

Có thể giới thiệu ngắn ngọn Reactive = Asynchronous + Non-Blocking I/O (NIO), có nghĩa là một chương trình được gọi là Reactive nó sẽ đảm bảo được 2 yếu tố là Asynchronous (xử lý bất đồng bộ) và Non-Blocking I/O.

Bằng cách viết những đoạn mã asynchronous và non-blocking, chương trình sẽ cho phép switch qua các tách vụ khác mà đang sử dụng cùng một I/O resource, và có thể quay lại sử lý tiếp khi tác vụ đó hoàn thành. Do đó với reactive programing chương trình có thể sử lý nhiều request hơn trên cùng một tài nguyên hệ thống.

Reactive và non-blocking nhìn chung thì không làm cho ứng dụng chạy nhanh hơn. Lợi ích mà nó được kỳ vọng là ứng dụng chịu tải được tốt hơn mà chỉ yêu cầu ít tài nguyên hơn.

Event Loop

Để đặt được điều này một model được tạo ra được gọi là Event Loop. Event Loop thực tế nó là một vòng lặp vô tận, để lắng nghe và xử lý những event từ event queue một cách tuần tự, và sẽ trả về ngay lập tức sau khi nó đăng ký một call back tương ứng với event đó. Call back sẽ được kích hoạt (trigger) khi dữ liệu được trả về từ một resource (DB, file hay network call) hay từ một error đã xảy ra.

Sơ đồ hoạt động của Event Loop

Event loop thông thường sẽ được chạy trên duy nhất 1 thread, tuy nhiên để sử dụng hiệu quả số lượng CPU Core những Servlet containers hỗ trợ Reactive như Netty sẽ quản lý Event Loop thông qua một Event Loop Group, mỗi Event Loop sẽ chạy trên 1 thread trên từng CPU Core riêng biệt, và số lượng Event Loop sẽ không được nhiều hơn số CPU Cores hiện có của nền tảng phần cứng nó đang chạy.

Netty’s event loop model

Data Streams

Để quản lý được mỗi Event trong Event Loop thì mọi events sẽ được tạo thành từ những Data Stream, có nghĩa là trong Reactive mọi thứ đều được tạo thành từ những Stream. Nếu bạn đã thân thuộc với Stream API trong Java 8 với filter, map, reduce thì chính nó đấy. Everything is a stream” hãy nhớ nhé … Reactive chính là Stream và Stream chính là Reactive…

Reactive Mantra !!!

Mỗi Stream sẽ emit ra ba thứ là: giá trị trả về (return data), lỗi (error) hoặc một tín hiệu hoàn thành (completed signal) nếu trong trường hợp ta không quan tâm tới giá trị trả về. Và cũng giống như Stream API trong Java 8, reactive stream sẽ không làm gì (không hoạt động) cho tới khi ta subscribe (lắng nghe) chúng. Hãy luôn ghi nhớ rằng: Không có gì xảy ra cho đến khi subscribe.

Với mỗi loại emit, Stream sẽ định nghĩa từng function để xử lý, một function để hứng return data , một function khác xử lý error , và một function nữa để nhận completed signal. Việc lắng nghe (listen) Stream được gọi là Subscribe, các function được gọi là Observers (quan sát viên), và chủ đề (subject) quan sát (Observable) ở đây là một Stream. Mẫu thiết kế (Design pattern) này được gọi là Observer Design Pattern.

--a---b-c---d---X---|->  

a, b, c, d are emitted values
X is an error
| is the 'completed' signal
---> is the timeline

Đặc điểm của mỗi Stream đó là Immutability (bất biến), muốn xử lý hoặc thay đổi dữ liệu trong Steam ta luôn phải tạo một Stream mới từ Stream gốc bằng các function như filter, map, reduce .

origin Stream: ---c----c--c----c------c-->
vvvvv map(c becomes 1) vvvv
map Stream: ---1----1--1----1------1-->
vvvvvvvvv reduce(+) vvvvvvvvv
respone Stream: ---1----2--3----4------5-->

Ngoài ra với Reactive chúng ta cũng có thể gộp nhiều Stream thành một bằng các function như merge, concat hay zip

Merge hay Concat nói chung là khá giống nhau, đều gộp 2 hay nhiều Stream với cùng dữ liệu trả về về thành một Stream mới, nhưng khác nhau là Merge thì không đảm bảo thứ tự (sequence) của dữ liệu của các Steam nhưng Concat thì ngược lại dữ liệu sau khi gộp sẽ đảm bảo thứ tự theo dữ liệu của các Steam vừa gộp.

Merge Steam
Concat Stream

Merge hay Concat chỉ hoạt động khi giá trị trả về (data type) của các Stream được gộp là giống nhau. Còn Zip thì có thể gộp các Stream với những data type khác nhau và dữ liệu trả về sẽ là một danh sách (list) cặp (Pair) hoặc tập hợp (Tuple) các kiểu dữ liệu.

Project Reactor

Trong hệ sinh thái của JVM để đạt được reactive programming, một dự án (project) đã được ra đời, đó chính là Project Reactor và hạt nhân (core) của project chính là reactor-core. Nó cung cấp cho chúng ta những bộ thư viện để giúp lập trình viên dễ dàng thao tác và xử lý Data Stream trong Reactive.

Đặc điểm chính của Reactor đó là cung cấp hai lại kiểu dữ liệu (data type) của luồng dữ liệu (Publisher) đó là FluxMono

1. Flux: là một Stream có thể phát ra 0..n phần tử, có thể hình dung nó là một List dữ liệu. Ví dụ tạo đơn giản:

Flux<Integer> just = Flux.just(1,2,3,4);

Và cũng giống như khái niệm về Reactive, có 3 tín hiệu mà Flux emit ra để Subscribe có thể nhận được đó là onNext() để hứng return data, onComplete() để nhận tín hiệu Stream hoàn thành và onError() để nhận giá trị lỗi trả về.

Sơ đồ hoạt động của Flux

2. Mono: là một Stream có thể phát ra 0..1 phần tử. Nó hoạt động gần giống như Flux, chỉ là bị giới hạn không quá một phần tử hoặc không có phần tử nào (rỗng) . Ví dụ:

Mono<String> just = Mono.just("ABC"); // Mono với 1 phần tửMono<Void> just = Mono.empty(); // Mono với 0 phần tử (rỗng)

Cũng giống như Flux Mono cung cấp 3 function onNext(), onComplet() và onError() để Subscribe thao tác với dữ liệu được trả về.

Mono cũng có thể truyển đổi thành một Flux, ví dụ 2 hoặc nhiều Mono có thể gộp thành (combine) một Flux bằng cách sử dụng function concatWith(), ví dụ Mono#concatWith(Publisher) sẽ trả về một Flux. Hay sử dụng Mono#then(Mono) để trả về một Mono khác với mục đích kết thúc một Stream mà không quan tâm tới dữ liệu của Mono gốc. Điểm khác nhau giữa Mono#thenMono#map đó là then hoạt động dựa trên tính hiệu onComplete mặc dù Mono gốc có thể empty, trong khi map hay flatmap chỉ hoạt động dựa trên tín hiệu onNext , có nghĩa là chỉ hoạt động khi Mono gốc có dữ liệu trả về (not empty).

Sơ đồ hoạt động của Mono

3. Subscribe: Như đã nói bên trên rằng “không có gì xảy ra cho đến khi subscribe”, các Stream như Mono hay Flux sẽ không hành động gì cả cho tới khi nó được Observer hay Subcriber (lắng nghe). Do vậy trong Reactor có cung cấp một function subscribe() để thực hiện lắng nghe Stream.

Ví dụ để subcribe một Flux với basic method không có đối số (arguments)

Flux<Integer> ints = Flux.range(1, 3); //Tạo một Flux với 3 phần tử từ 1->3
ints.subscribe(); // Thực hiện lắng nghe trên Flux vừa tạo

Với ví dụ trên thì sẽ không có out-put nào tạo ra, để có thể bắt (catch) được các out-put thì ta sẽ truyền một đối số làConsumer vào subscribe() ví dụ:

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe(i -> System.out.println(i)); // subcribe Flux và in ra dữ liệu trả về của nó

Output:

1
2
3

Error Event: Một lỗi có thể được sử lý ngay trong subcribe ((error handler) như ví dụ sau:

Flux<Integer> ints = Flux.range(1, 6) //(1)
.map(i -> { // (2)
if (i <= 3) {
return i;
}
throw new RuntimeException("Got to 4");
});
ints.subscribe(i -> System.out.println(i), //(3)
error -> System.err.println("Error: " + error)); //(4)

(1) Tạo một Stream Flux có 4 phần tử từ 1-> 6
(2) Map lại Stream hiện tại ra một Steam mới mà chỉ được phép có 3 phần tử từ 1->3 nếu lớn hơn sẽ throw ra một Exception
(3) Print ra dữ liệu output của Stream mới được tạo
(4) sử dụng consumer là error để kết thúc Stream và out-put ra lỗi nếu có

Output:

1
2
3
Error: java.lang.RuntimeException: Got to 4

Completed Event: Nếu có một error được throw ra thì Stream sẽ dừng lại (completed) ngay lập tức. Nếu không có lỗi xảy ra thì ta có thể tạo một event completed khi Stream kết thúc như ví dụ:

Flux<Integer> ints = Flux.range(1, 4); //(1)
ints.subscribe(i -> System.out.println(i), //(2)
error -> System.err.println("Error " + error), (3)
() -> System.out.println("Done")); (4)

(1) Tạo một Stream Flux có 4 phần tử từ 1-> 4
(2) Print ra dữ liệu output của Stream
(3) Sử dụng consumer là error để completed Stream và out-put ra lỗi nếu có
(4) Sử dụng consumer là () để completed Stream và out-put ra event complete

Output:

1
2
3
4
Done

Subscribe có thể yêu cầu một hành động nào đó, ví dụ như yêu cầu số lượng dữ liệu được emit ra trước khi Steam được complete bằng cách sử dụng sub , ví dụ

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done"),
sub -> sub.request(10));

Stream trên sẽ bị treo (hangs) vĩnh viễn (không bao giờ completed) trừ khi Stream được cancel, bởi vì Subscribe đã yêu cầu chỉ completed cho tới khi nhận được đủ 10 phần tử.

Output: Event “Done” sẽ không bao giờ được output ra.

1
2
3
4

Cancel Event: function Subscribe trả về một kiểu dữ liệu là Disposable và Disposable Interface có cung cấp một method là dispose() để giúp một Stream có thể bị hủy bỏ (cancel) ví dụ:

Flux.just(1,2,3).subscribe().dispose();

Stream sẽ ngay lập tức bị cancel ngay sau khi nó được Subscribe

Ta có khá nhiều các project Reactive khác được base trên Project Reactor ví dụ như Spring Webflux, một phiên bản Spring Web hỗ trợ non-blocking reactive streams, và nó sử dụng Server Netty để run hệ thống reactive.
Hay để đạt được reactive ở đầu Database thì đa phần các NoSQL đều đã hỗ trợ reactive driver (ví dụ Reactive MongoDB), hay project R2DBC cung cấp reactive driver cho các RDBMS DB.

Trên đây mới chỉ là những khái hiệm cơ bản về Reactive Programing và Project Reactor, còn nhiều điều thú vị khác chờ bạn khám phá và chia sẻ.

Tài liệu tham khảo thêm:

--

--