Tiếp nối bài trước về Mechanical Sympathy và giới thiệu qua về LMAX Disruptor, ở bài viết này người viết sẽ phân tích sâu hơn và hướng dẫn cách sử dụng về bộ thư viện Disruptor.
LMAX-Disruptor là một bộ thư viện giúp cho việc phát triển các ứng dụng với độ tải lớn (high-performance) cho phép xử lý đồng thời (concurrency) một số lượng rất lớn message mà không cần Lock (lock-free). Nếu bạn làm việc với Java thì thực tế đây là bộ thư viện về Concurrency tốt nhất và nhanh nhất hiện nay.
Quay lại ví dụ ở bài trước khi ta khởi tạo một Disruptor đơn giản với 1 Producer và 1 Comsumer.
Ở bài trước tui đã giải thích về ý nghĩa các tham số của Disruptor constructor, nhưng chưa bàn tới các "Waiting strategy" được đại diện bởi interface WaitStrategy. Disruptor có 4 chiến thuật chờ đợi chính là BlockingWaitStrategy
, SleepingWaitStrategy
, YieldingWaitStrategy
và BusySpinWaitStrategy
chúng ta sẽ tìm hiểu bên dưới đây.
Wait Strategies
BlockingWaitStrategy:
Mặc định Disruptor sẽ sử dụng Wait Strategy là BlockingWaitStrategy
, trong nội bộ (code) của BlockingWaitStrategy
nó sử dụng ReentrantLock
để đồng bộ hóa (synchronized) và dùng Condition
để wait và notify các Producer và Consumer để giải quyết các vấn đề về “Producer–consumer problem”.
Đây là cách sử lý chậm nhất trong các wait strategies (vì sử dụng cơ chế Looking), nhưng đây là cách sử dụng an toàn và thân thiện nhất với tài nguyên của CPU, có nghĩa là Disruptor sẽ không gây áp lực lên CPU nhưng đánh đổi lại là tốc độ sử lý của Disruptor. Và đây là cách người viết khuyên dùng khi hệ thống của bạn lượng tài nguyên CPU hạn chế hoặc CPU hay bị tình trạng quá tải (high load).
SleepingWaitStrategy:
Giống như BlockingWaitStrategy
đây cũng là một cách sử dụng khá thân thiện với CPU, bằng cách sử dụng kỹ thuật "busy wait loop" bằng cách sử dụng LockSupport.parkNanos(1)
để dừng (pause) thread hiện tại của Producer hoặc Consumer để wait hoặc notify khi có điều kiện thỏa mãn (Ring buffer đầy hoặc rỗng hoặc hết đầy hoặc có dữ liệu tùy từng trường hợp), giá trị mặc định mỗi lần pause thread (DEFAULT_SLEEP) là 100 nanoseconds và sẽ lặp lại 200 lần cho tới khi có điều kiện thoả mãn trên xuất hiện.
Tuy không sử dụng cơ chế Looking (look-free) nhưng các thread liên tục phải waiting do đó độ trễ sẽ cao (high latency) cho nên tốc độ thực thi của Disruptor sẽ không cao, nhưng bù lại các thread/process sẽ không bị các trường hợp về context switching, do đó số lượng thread/process tạo ra sẽ không nhiều và CPU sẽ không bị stress.
Với trường hợp ứng dụng cần độ trễ thấp (low latency) thì đây là cách không nên sử dụng, nhưng lại rất phù hợp với các chức năng không quan tâm tới độ trễ, ví dụ như tính năng ghi log hệ thống chẳng hạn (asynchronous logging).
YieldingWaitStrategy:
YieldingWaitStrategy
cũng là một kiểu Wait Strategy giống như SleepingWaitStrategy
nhưng có độ trễ thấp hơn (low latency) bằng cách sử dụng chiến thuật "busy spin waiting". Bên trong YieldingWaitStrategy
sẽ sử dụng method Thread.yield()
để khi Ring Buffer đang rảnh rỗi thì sẽ nhường và ưu tiên các Thread khác được phép chạy trước.
Đây là chiến thuật được khuyến khích khi ứng dụng của bạn cần hiệu suất cao (high performance) và số lượng luồng (thread) của Consumer (Event Handler) nhỏ hơn số lượng logical cores của CPU. Có nghĩa là CPU của bạn có hỗ trợ công nghệ và bật chế độ Hyper Threading, và số lõi logical nhiều hơn số lượng của Multi-cast Consumer.
BusySpinWaitStrategy:
Tốc độ thực thi của BusySpinWaitStrategy
là nhanh nhất trong các Wait Strategy nhưng nó lại gây áp lực (stress) lên tài nguyên hệ thống ở đây là CPU nhiều nhất, nó chỉ nên được sử dụng trong trường hợp số lượng luồng của Consumer (Event Handler) nhỏ hơn số lượng lõi vật lý (physical cores) của CPU. Ví dụ CPU server của bạn có 8 lõi thì chỉ nên sử dụng dưới 7 luồng của Consumer (Event Handler) của Disruptor đang chạy.
Core Concepts
Về cơ bản ta đã sử dụng được Disruptor để gửi nhận Producer và Consumer đơn và hiểu được các Wait Strategy trong Disruptor, nhưng để sử dụng Multicast Events hoặc Consumer Dependency Graph, có nghĩa là ta có thể sử dụng nhiều Producer lẫn Consumer kết hợp với nhau, để làm được điều này ta phải hiểu và biết cách sử dụng Sequence
, Sequencer
, SequenceBarrier
và EventProcessor
.
Sequence:
Lớp này được thiết kế để thao tác với các sequence (số thứ tự) của Ring-Buffer nhằm việc đảm bảo việc hoạt động tốt với môi trường Concurrency. Sequence không sử dụng cơ chế Locking để giải quyết vấn đề về Mutual Exclusion như race-condition mà sử dụng cơ chế CAS (Lock-free) để giải quyết bài toán đó, chính vì thế sequence luôn được get()
and set()
vừa chính xác lại rất nhanh.
Sequencer:
Từ phiên bản 3.0 trở đi thì đây chính là cốt lõi (core) của Disruptor, nó được thiết kế để quản lý việc gửi và nhận dữ liệu đến Ring-Buffer giữa Producer và Consumer nhanh nhất và chính xác nhất . Sequencer là một interface và ta có hai class implement nó là SingleProducerSequencer
(cho case Disruptor sử dụng một producer) và MultiProducerSequencer
(cho case Disruptor sử dụng nhiều hơn một producer).
EventProcessor:
Interface này được sử dụng để quản lý nhiều Consumer khi ta sử dụng Multicast Events, nó giúp cho việc quản lý thứ tự sử lý của từng Consumer trong mô hình Consumer Dependency Graph. Nó chỉ có duy nhất một Implementation là lớp BatchEventProcessor
giúp việc lấy event từ Ring-Buffer và điều phối hợp lý event cho từng Consumer theo chiến thuật người dùng đề ra. Thực tế thì EventProcessor là một Runnable và nó có thể được executed bởi một Thread.
Sequence Barrier:
Là một rào chắn dữ liệu (memory barrier) được tạo ra từ Sequencer để đảm bảo thứ tự nhận event giữa Ring-Buffer tới các Consumer chính xác nhất. Nó luôn luôn nằm giữa và điều phối dữ liệu giữa Ring-Buffer và Consumer.
Để dễ hình dung ta có thể tham khảo mô hình hoạt động của Disruptor bên dưới.
Ở bên dưới ta có một mô hình với 2 Producer gửi dữ liệu tới Ring-Buffer, và ta có các Consumers là JournalConsumer
, ReplicationConsumer
và ApplicationConsumer
. Ví dụ này rất giống với các logic trong thực tế, đầu tiên khi một events tới đầu tiên ta cần ghi log thông tin event trước (JournalConsumer
) sau đó backup nó lại ở đâu đó (ReplicationConsumer
) và cuối cùng là thực hiện business logic của chương trình (ApplicationConsumer
). Tiến trình (task) ghi log và backup được thực hiện đồng thời và song song (parallel) với nhau, nhưng business logic chỉ được thực hiện sau khi hai task trên được thực hiện xong.
Quan hệ giữa 3 consumers bên trên ta gọi nó là Consumer Dependency Graph, chúng thực hiện được nhờ vai trò của SequenceBarrier
được nhắc ở bên trên.
- JournalConsumer (C1) tiến trình thực hiện phụ thuộc vào sự điều khiển RingBuffer’s sequence.
- ReplicationConsumer (C2) tiến trình thực hiện phụ thuộc vào sự điều khiển RingBuffer’s sequence.
- ApplicationConsumer (C3) tiến trình thực hiện không chỉ phụ thuộc vào sự điều khiển RingBuffer’s sequence mà còn phụ thuộc vào JournalConsumer’s sequence vào ReplicationConsumer’s sequence.
Lý thuyết vậy là đủ, giờ ta hãy bắt tay vào thực hiện implement mô hình Consumer Dependency Graph bên trên với chỉ một Producer nhé.
Mô hình tổ chức code được minh họa như bên dưới:
+-----+
+----->| EP1 |------+
| +-----+ |
| v
+----+ +-----+
| P1 | | EP3 |
+----+ +-----+
| ^
| +-----+ |
+----->| EP2 |------+
+-----+Disruptor:
track to prevent wrap
+-------------------------------+
| |
| v
+----+ +====+ +=====+ +-----+
| P1 |--->| RB |<--------------| SB2 |<---| AC |
+----+ +====+ +=====+ +-----+
claim ^ get | waitFor
| |
+=====+ +-----+ |
| SB1 |<---| JC |<-----+
+=====+ +-----+ |
^ |
| +-----+ |
+-------| RC |<-----+
waitFor +-----+ P1 - Producer 1
RB - RingBuffer
SB1 - SequenceBarrier 1
JC - JournalConsumer
RC - ReplicationConsumer
SB2 - Sequence Barrier Conclusion
AC - ApplicationConsumer
Đầu tiên ta phải tạo một Event Object, đây chính là thông tin dữ liệu được lưu chuyển tới các Consumer để sử lý business logic. Event Object dưới chỉ chứa một giá trị có kiểu Long.
Tiếp đó là 3 Consumer là JournalConsumer
, ReplicationConsumer
và ApplicationConsumer
kế thừa với EventHandler
và cài đặt hàm onEvent()
để lấy hứng Event Object về và xử lý logic. Logic xử lý của Consumer chỉ đơn giản là in ra giá trị của Event Object và sequence của nó từ Ring-Buffer gửi về.
Journal Consumer:
Replication Consumer:
Application Consumer:
Bởi vì ApplicationConsumer
là Consumer được sử lý sau cùng trong tiến trình cho nên ta thêm một biến CountDownLatch
để tracking việc ApplicationConsumer
thực sự đã hoàn thành xong chưa.
Để tiện cho việc chạy test ta sẽ tạo một Junit Test như sau, và chúng ta sẽ đi sâu vào phân tích nó tiếp sau:
Giờ hãy bắt đầu phân tích đoạn code demo bên trên. Hãy nhìn vào cách chúng ta khởi tạo RingBuffer, ta sẽ sử dụng static method được cung cấp sẵn trong class RingBuffer là createSingleProducer()
(Disruptor chỉ có một Producer) với các đối số truyền vào là Event Factory Object (ValueEvent), buffer size (kích thước của ring buffer) và wait strategy (ở đây ta dùng YieldingWaitStrategy
), nếu ta không truyền wait strategy thì RingBuffer sẽ mặc định sử dụng BlockingWaitStrategy
. Ngoài ra RingBuffer cũng cung cấp cho ta một static method khác là createMultiProducer()
để sử dụng với nhiều Producer.
Bản thân Ring-Buffer đã có một Sequence Barrier (SB1 như trên hình minh họa trên) bằng cách gọi tới hàm newBarrier()
, "rào chắn" này có tác dụng đảm bảo thứ tự gửi nhận tới chung cho tất các Consumers với RingBuffer, chính vì thế hàm newBarrier() không cần có tham số. Và các EventProcessor
của JournalConsumer
và ReplicationConsumer
phải đăng ký tới barrier rằng "chúng tôi muốn nhận event từ Ring-Buffer", bằng cách khởi tạo các EventProcessor
và truyền tham số như ringBuffer
, sequenceBarrier
và các Consumers.
SequenceBarrier ngoài việc đảm bảo thứ tự cũng như quyết định gửi tới những Consumers nào, thì nó còn có tác dụng khi Consumer muốn lấy dữ liệu tiếp theo nhưng event đó chưa có (Producer chưa kịp gửi) thì nó sẽ sử dụng các wait strategy để chờ đợi các event tiếp theo tới. Nó đảm bảo được rằng việc lấy dữ liệu từ Ring-Buffer sẽ đảm bảo được sự chính xác và tốc độ mà không cần phải "đồng bộ" hóa tiến trình này (synchronization).
OK vậy tiếp theo ta phải làm thế nào để ApplicationConsumer
sẽ thực hiện chỉ khi hai Consumers trên thực hiện xong. Tương tự ta cũng tạo ra một SequenceBarrier
(SB2 như trên hình minh họa) được đặt giữa Ring-Buffer, JournalConsumer
và ReplicationConsumer
. Bằng cách gọi tới hàm newBarrier()
truyền vào hai tham số là sequence của hai EventProcessor
mới khởi tạo bên trên, việc này đảm bảo ApplicationConsumer
chỉ thực hiện việc lấy event từ Ring-Buffer sau khi hai consumers Journal và Replication
Hãy để ý tới static code ringBuffer.addGatingSequences()
, hàm này có ý nghĩa rằng Ring-Buffer sẽ đảm bảo event-sequence của ApplicationConsumer
nhỏ hơn hoặc bằng với event-sequence của hai consumers còn lại, điều này sẽ chắc chắn rằng ApplicationConsumer
chỉ được lấy event sau khi hai consumers kia hoàn thành.
Về bản chất các EventProcessor
là một Runnable
có thể chạy với các Thread để cho các consumers bắt đầu "lắng nghe" và lấy event từ Ring-Buffer ta sẽ phải tạo một ExecutorService
để khởi tạo các Thread Pool.
Và start các thread pool để wake up các consumers
Và cuối cùng để gửi dữ liệu tới Ring-Buffer ta tạo một vòng lặp sau đó lấy ra sequence tiếp theo bằng cách sử dụng ringBuffer.next()
, sau đó get và set event data bằng cách sử dụng các hàm ringBuffer.get()
, và gửi event đi bằng hàm ringBuffer.publish()
Và cuối cùng ta có thể sử dụng hàm halt()
của EventProcessor để stop các Consumer khi đảm bảo rằng các consumers đã hoàn tác các tác vụ.
Ta có thể thấy việc khởi tạo các Consumer hoàn toàn khác với các truyền thống là khởi tạo thông qua Disruptor, mà ta sử dụng các BatchEventProcessor
, nhờ đó mà các Consumers có thể phụ thuộc (depend) vào nhau, do đó nghiệp vụ của ứng dụng khi sử dụng Disruptor có thể đa dạng và uyển chuyển hơn rất nhiều.
HẾT!
Link tham khảo: