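This post uses the Producer/Consumer pattern to measure multithreaded throughput.
The same Producer and Consumer are used throughout; only the Store that holds the messages is swapped.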
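The Store contract itself is not listed in this post. A minimal sketch of what it might look like, inferred from the calls that Producer and Consumer make: the two method signatures follow that usage, while `DemoStore`, `IStoreDemo`, and the body of `createAddMessage` are assumptions made only so the sketch runs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Assumed shape of the store contract; the method names match the calls
// made by Producer and Consumer, everything else is a guess.
interface IStore {
    void produce(String producer, String message);

    String consume(String consumer);
}

// Hypothetical stand-in store, only here to make the sketch runnable.
class DemoStore implements IStore {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Hypothetical version of the createAddMessage helper the stores call.
    String createAddMessage(String producer, String message) {
        return producer + ":" + message;
    }

    @Override
    public void produce(String producer, String message) {
        queue.add(createAddMessage(producer, message));
    }

    @Override
    public String consume(String consumer) {
        return queue.poll(); // null when nothing has been produced yet
    }
}

class IStoreDemo {
    public static void main(String[] args) {
        IStore store = new DemoStore();
        store.produce("prod1", "0");
        System.out.println(store.consume("cons1")); // prints "prod1:0"
    }
}
```

Because Producer and Consumer only depend on this interface, switching implementations is a one-line change in the App.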
App / Producer / Consumer
Producer
public class Producer implements Runnable {
private String name;
private IStore store;
public static AtomicInteger count = new AtomicInteger();
public Producer(String name, IStore store) {
this.name = name;
this.store = store;
}
@Override
public void run() {
while (true) {
this.store.produce(this.name , String.valueOf(count.getAndIncrement()));
}
}
}
Consumer
public class Consumer implements Runnable {
private String name;
private IStore store;
public Consumer(String name, IStore store) {
this.name = name;
this.store = store;
}
@Override
public void run() {
while (true) {
store.consume(this.name);
}
}
}
App
Start two Producers and two Consumers at the same time, so that they write to and read from the same store concurrently.
public static void main( String[] args ) {
int maxSize = 32;
ExecutorService executorService = Executors.newFixedThreadPool(4);
// IStore store = new SynchronizedStore(maxSize);
// IStore store = new ArrayBlockingQueueStore(maxSize);
IStore store = new ReentrantLockStore(maxSize);
// IStore store = new DisruptorStore();
Producer producer1 = new Producer("prod1", store);
Producer producer2 = new Producer("prod2", store);
Consumer consumer1 = new Consumer("cons1", store);
Consumer consumer2 = new Consumer("cons2", store);
executorService.submit(producer1);
executorService.submit(producer2);
executorService.submit(consumer1);
executorService.submit(consumer2);
}
Thread Safe Pattern
1. Synchronized
The most basic approach and, in practice, close to the slowest.
private LinkedList<String> list;
public synchronized void produce(String producer, String message) {
while (list.size() >= limit) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
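// Store the message before waking the waiting threads.
list.addLast(this.createAddMessage(producer, message));
notifyAll();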
}
public synchronized String consume(String consumer) {
while (list.isEmpty()) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String result = list.removeFirst();
notifyAll();
return result;
}
2. ReentrantLock
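Use a Lock to mark the critical section that synchronized would otherwise cover. Two Conditions (notFull and notEmpty) let producers and consumers wait and be signalled separately, which makes handing items in and out more efficient.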
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
private LinkedList<String> list;
public void produce(String producer, String message) {
lock.lock();
try {
while (list.size() >= limit) {
notFull.await();
}
list.addLast(this.createAddMessage(producer, message));
notEmpty.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public String consume(String consumer) {
lock.lock();
String result = null;
try {
while (this.list.isEmpty()) {
notEmpty.await();
}
result = list.removeFirst();
notFull.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return result;
}
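The two-condition idea can be tried in isolation with a self-contained sketch. This is a variant under stated assumptions, not the ReentrantLockStore from this post: the class name `TwoConditionBuffer` is hypothetical, it uses an `ArrayDeque`, it calls `signal()` instead of `signalAll()` because each operation frees or fills exactly one slot, and it restores the interrupt flag instead of printing the stack trace.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class; capacity plays the role of "limit" in the listing above.
class TwoConditionBuffer {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<String> items = new ArrayDeque<>();
    private final int capacity;

    TwoConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Returns false if the thread was interrupted while waiting for space.
    boolean produce(String message) {
        lock.lock();
        try {
            while (items.size() >= capacity) {
                notFull.await(); // only producers wait on this condition
            }
            items.addLast(message);
            notEmpty.signal(); // wake one waiting consumer
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Returns null if the thread was interrupted while waiting for an item.
    String consume() {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await(); // only consumers wait on this condition
            }
            String result = items.removeFirst();
            notFull.signal(); // wake one waiting producer
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}

class TwoConditionDemo {
    public static void main(String[] args) {
        TwoConditionBuffer buffer = new TwoConditionBuffer(2);
        buffer.produce("a");
        buffer.produce("b");
        System.out.println(buffer.consume()); // prints "a"
    }
}
```

With a single monitor, `notifyAll()` wakes producers and consumers alike; with separate Conditions, a producer only ever wakes consumers and the other way round.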
3. ArrayBlockingQueue
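It works on the same principle as the ReentrantLock version (internally it guards its array with a lock and two conditions), but you do not have to lock by hand. Throughput is actually not bad and the code is clean, so it is well worth recommending.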
private ArrayBlockingQueue<String> list;
public void produce(String producer, String message) {
try {
list.put(this.createAddMessage(producer, message));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String consume(String consumer) {
String result = null;
try {
result = list.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
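To see the bounded behaviour without starting any threads, here is a small sketch using the non-blocking `offer` and `poll` methods of the same class. `BlockingQueueDemo` and its helper methods are hypothetical names for illustration; the store above uses the blocking `put` and `take` instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Fills a queue of the given capacity, then reports whether one more
    // non-blocking offer is accepted.
    static boolean offerWhenFull(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("msg" + i);
        }
        // The queue is full: offer() gives up, where put() would block.
        return queue.offer("overflow");
    }

    // Shows that elements leave the queue in insertion (FIFO) order.
    static String firstOut() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.offer("first");
        queue.offer("second");
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(offerWhenFull(2)); // prints "false"
        System.out.println(firstOut());       // prints "first"
    }
}
```

The capacity passed to the constructor is fixed for the lifetime of the queue, which is what gives producers back-pressure once consumers fall behind.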
4. Disruptor
Uses the Disruptor and works directly on its RingBuffer.
private Disruptor<StringEvent> disruptor;
private RingBuffer<StringEvent> ringBuffer;
private StringEventTranslator translator = new StringEventTranslator();
public DisruptorStore() {
StringEventFactory eventFactory = new StringEventFactory();
this.disruptor = new Disruptor<>(eventFactory,32, DaemonThreadFactory.INSTANCE);
this.disruptor.start();
this.ringBuffer = this.disruptor.getRingBuffer();
}
public void produce(String producer, String message) {
String msg = this.createAddMessage(producer, message);
ringBuffer.publishEvent(this.translator, msg);
}
public String consume(String consumer) {
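// Caution: next() claims the next slot for publishing, so this reads a slot
// the way a producer would and never publishes it. Disruptor consumers are
// normally registered as event handlers (handleEventsWith) before start().
long seq = ringBuffer.next();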
StringEvent stringEvent = ringBuffer.get(seq);
String result = stringEvent.getValue();
return result;
}
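The idea behind a ring buffer (a fixed array, ever-growing sequence numbers, and a bit mask to map a sequence to a slot) can be sketched with the standard library alone. This is explicitly not the Disruptor's implementation: `TinyRingBuffer` is a hypothetical toy that is only safe for one producer thread and one consumer thread, and it returns immediately instead of waiting.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy single-producer, single-consumer ring buffer (not the Disruptor).
class TinyRingBuffer {
    private final String[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(-1); // last written sequence
    private final AtomicLong consumed = new AtomicLong(-1);  // last read sequence

    TinyRingBuffer(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        this.slots = new String[size];
        this.mask = size - 1; // sequence & mask == sequence % size
    }

    // Returns false when the ring is full (the consumer is a full lap behind).
    boolean tryPublish(String value) {
        long next = published.get() + 1;
        if (next - consumed.get() > slots.length) {
            return false;
        }
        slots[(int) (next & mask)] = value;
        published.set(next); // volatile write makes the slot visible to the reader
        return true;
    }

    // Returns null when nothing new has been published.
    String tryConsume() {
        long next = consumed.get() + 1;
        if (next > published.get()) {
            return null;
        }
        String value = slots[(int) (next & mask)];
        consumed.set(next); // frees the slot for the producer
        return value;
    }
}

class TinyRingBufferDemo {
    public static void main(String[] args) {
        TinyRingBuffer ring = new TinyRingBuffer(2);
        System.out.println(ring.tryPublish("a")); // prints "true"
        System.out.println(ring.tryPublish("b")); // prints "true"
        System.out.println(ring.tryPublish("c")); // prints "false"
        System.out.println(ring.tryConsume());    // prints "a"
    }
}
```

The producer and the consumer each advance their own sequence and only read the other side's, so no lock is taken on the hot path. That is the general reason a ring-buffer design can outrun a lock-based queue.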
Test Results
Type | Elapsed (ms) | Items produced | Avg per ms |
---|---|---|---|
SynchronizedStore | 5002 | 9356448 | 1870.5414 |
ArrayBlockingQueueStore | 5009 | 10889300 | 2173.9468 |
ReentrantLockStore | 5004 | 11138058 | 2225.831 |
DisruptorStore | 5007 | 21551348 | 4304.2437 |
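The measuring code is not shown in this post. One way such figures could be collected is sketched below, assuming a fixed run time followed by reading a shared counter. `ThroughputSketch`, `perMs`, and `runFor` are hypothetical names, and an `ArrayBlockingQueue` stands in for the store.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class ThroughputSketch {
    // Average number of items per millisecond.
    static double perMs(long produced, long elapsedMs) {
        return (double) produced / elapsedMs;
    }

    // Runs two producers and two consumers for roughly runMs milliseconds
    // and returns how many messages the producers generated in that time.
    static long runFor(long runMs) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        AtomicLong produced = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Runnable producer = () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.put(String.valueOf(produced.getAndIncrement()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treated as the stop signal
            }
        };
        Runnable consumer = () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treated as the stop signal
            }
        };
        pool.submit(producer);
        pool.submit(producer);
        pool.submit(consumer);
        pool.submit(consumer);
        try {
            Thread.sleep(runMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long count = produced.get();
        pool.shutdownNow(); // interrupts all four workers
        return count;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        long produced = runFor(5000);
        long elapsed = System.currentTimeMillis() - start;
        System.out.printf("Used time [%d]ms produce [%d] numbers, Avg is [%s] per ms.%n",
                elapsed, produced, perMs(produced, elapsed));
    }
}
```

As a check on the arithmetic, the first row of the table works out to 9356448 / 5002 ≈ 1870.54 items per millisecond.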