Disruptor Demo

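This post uses the Producer/Consumer pattern to compare the throughput of several thread-safe approaches.
The same Producer and Consumer are used throughout; only the Store that holds the messages is swapped.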
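
The IStore interface itself is never shown in this post. Judging from how Producer and Consumer call it, it presumably looks something like the sketch below. The method signatures are inferred from usage, and QueueStore with its "producer:message" format is a hypothetical implementation added only to show the contract.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Assumed shape of IStore, inferred from the calls made by Producer and Consumer.
interface IStore {
    void produce(String producer, String message);
    String consume(String consumer);
}

// Hypothetical unbounded, non-blocking implementation, for illustration only.
class QueueStore implements IStore {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void produce(String producer, String message) {
        // The message format is an assumption; the original helper is not shown.
        queue.add(producer + ":" + message);
    }

    @Override
    public String consume(String consumer) {
        // Returns null when nothing is available instead of blocking.
        return queue.poll();
    }
}
```

Unlike the stores in the rest of the post, this one never blocks, so it only illustrates the interface and is not a candidate for the benchmark.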

App / Producer / Consumer

Producer

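import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

    private final String name;
    private final IStore store;

    // Shared by all producers, so every message carries a unique, increasing number.
    public static final AtomicInteger count = new AtomicInteger();

    public Producer(String name, IStore store) {
        this.name = name;
        this.store = store;
    }

    @Override
    public void run() {
        // Produce forever; the store decides whether and how long to block.
        while (true) {
            this.store.produce(this.name, String.valueOf(count.getAndIncrement()));
        }
    }
}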

Consumer

public class Consumer implements Runnable {
    private String name;
    private IStore store;

    public Consumer(String name, IStore store) {
        this.name = name;
        this.store = store;
    }

    @Override
    public void run() {
        while (true) {
            store.consume(this.name);
        }
    }
}

App

Start two Producers and two Consumers at the same time, all working against the same store.

public static void main( String[] args ) {
        int maxSize = 32;
        ExecutorService executorService = Executors.newFixedThreadPool(4);
//        IStore store = new SynchronizedStore(maxSize);
//        IStore store = new ArrayBlockingQueueStore(maxSize);
        IStore store = new ReentrantLockStore(maxSize);
//        IStore store = new DisruptorStore();
        Producer producer1 = new Producer("prod1", store);
        Producer producer2 = new Producer("prod2", store);

        Consumer consumer1 = new Consumer("cons1", store);
        Consumer consumer2 = new Consumer("cons2", store);


        executorService.submit(producer1);
        executorService.submit(producer2);
        executorService.submit(consumer1);
        executorService.submit(consumer2);
}
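
The main method above runs forever, while the results further down come from runs of about five seconds. The timing code is not shown, so here is a minimal sketch of how a fixed-duration run could be done. TimedRun, the "msg" payload, and the use of ArrayBlockingQueue directly instead of an IStore are assumptions made to keep the example self-contained.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TimedRun {

    // Runs two producers and two consumers for about durationMs milliseconds
    // and returns the number of messages successfully put into the queue.
    static long run(long durationMs) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);
        AtomicLong produced = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(4);

        Runnable producer = () -> {
            try {
                while (true) {
                    queue.put("msg");           // throws once the thread is interrupted
                    produced.incrementAndGet(); // count only successful puts
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumer = () -> {
            try {
                while (true) {
                    queue.take();               // throws once the thread is interrupted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        pool.submit(producer);
        pool.submit(producer);
        pool.submit(consumer);
        pool.submit(consumer);

        try {
            Thread.sleep(durationMs);
            pool.shutdownNow();                 // interrupts all four workers
            pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return produced.get();
    }

    public static void main(String[] args) {
        long durationMs = 1000;
        long produced = run(durationMs);
        System.out.println("produced " + produced + " in about " + durationMs + " ms");
    }
}
```

Stopping relies on the workers letting InterruptedException end their loops. Stores that catch the exception inside the wait loop and carry on would keep running after shutdownNow.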

Thread Safe Pattern

1. Synchronized

The most basic approach and, in practice, very nearly the slowest.

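    private LinkedList<String> list;

    public synchronized void produce(String producer, String message) {
        // Block while the store is full.
        while (list.size() >= limit) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Store the message before waking the consumers; without this line nothing is ever added.
        list.addLast(this.createAddMessage(producer, message));
        notifyAll();
    }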

    public synchronized String consume(String consumer) {
        while (list.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        String result = list.removeFirst();
        notifyAll();

        return result;
    }
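
The snippet above leaves out the constructor, the limit field, and createAddMessage. A self-contained version might look like the sketch below. The class name SynchronizedStoreSketch and the "producer:message" format are assumptions, and the interrupt handling (restore the flag and give up) is one reasonable choice rather than what the original code does.

```java
import java.util.LinkedList;

class SynchronizedStoreSketch {
    private final LinkedList<String> list = new LinkedList<>();
    private final int limit;

    SynchronizedStoreSketch(int limit) {
        this.limit = limit;
    }

    // Hypothetical message format; the original helper is not shown.
    private String createAddMessage(String producer, String message) {
        return producer + ":" + message;
    }

    public synchronized void produce(String producer, String message) {
        while (list.size() >= limit) {
            try {
                wait(); // releases the monitor until notifyAll() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible
                return;                             // give up instead of waiting again
            }
        }
        list.addLast(createAddMessage(producer, message));
        notifyAll(); // wakes consumers and producers alike
    }

    public synchronized String consume(String consumer) {
        while (list.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        String result = list.removeFirst();
        notifyAll();
        return result;
    }
}
```

Because both methods wait on the same monitor, every notifyAll() wakes producers and consumers together, which is part of why this variant tends to be the slowest.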

2. ReentrantLock

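A Lock marks the critical section explicitly, and two Conditions make putting and taking more efficient: producers wait on notFull and consumers wait on notEmpty, so a signal only wakes the side that can actually make progress.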

    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    private LinkedList<String> list;

    public void produce(String producer, String message) {

        lock.lock();
        try {
            while (list.size() >= limit) {
                notFull.await();
            }
            list.addLast(this.createAddMessage(producer, message));
            notEmpty.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public String consume(String consumer) {
        lock.lock();
        String result = null;
        try {
            while (this.list.isEmpty()) {
                notEmpty.await();
            }
            result = list.removeFirst();
            notFull.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        return result;
    }
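
With separate conditions, waking a single waiter is usually enough, since one added item can satisfy only one consumer and one removed item frees room for only one producer. The sketch below shows that variant with signal() instead of signalAll(). TwoConditionBuffer and its demo method are hypothetical names, not part of the original project.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoConditionBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<String> items = new ArrayDeque<>();
    private final int limit;

    TwoConditionBuffer(int limit) {
        this.limit = limit;
    }

    void put(String item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() >= limit) {
                notFull.await();
            }
            items.addLast(item);
            notEmpty.signal(); // one new item, so wake one consumer
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            String item = items.removeFirst();
            notFull.signal(); // one free slot, so wake one producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Demo: a consumer thread blocks in take() until the caller puts an item.
    static String demo() {
        TwoConditionBuffer buffer = new TwoConditionBuffer(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = buffer.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            buffer.put("hello");
            consumer.join(); // join() makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }
}
```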

3. ArrayBlockingQueue

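The principle is the same as the ReentrantLock version (ArrayBlockingQueue uses a ReentrantLock with two Conditions internally), but you do not have to lock by hand. Throughput is decent and the code stays clean, so this is the one I would recommend.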

    private ArrayBlockingQueue<String> list;

    public  void produce(String producer, String message) {

        try {
            list.put(this.createAddMessage(producer, message));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public String consume(String consumer) {

        String result = null;
        try {
            result = list.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return result;
    }
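
put() and take() block indefinitely. If the loops should be able to give up, ArrayBlockingQueue also offers timed variants, offer(e, timeout, unit) and poll(timeout, unit). A small sketch, with TimedQueueDemo and the 50 ms timeout chosen purely for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedQueueDemo {

    // Tries to add to a queue that is already full; gives up after 50 ms.
    static boolean offerWhenFull() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first"); // fills the single slot
        try {
            return queue.offer("second", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Tries to take from an empty queue; gives up after 50 ms.
    static String pollWhenEmpty() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The timed offer returns false when no space appears in time, and the timed poll returns null when no element arrives in time.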

4. Disruptor

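Use Disruptor and work directly against its RingBuffer. As far as the code below shows, no EventHandler is registered, so nothing gates the producers and produce never blocks on a full buffer the way the three stores above do.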

    private Disruptor<StringEvent> disruptor;
    private RingBuffer<StringEvent> ringBuffer;

    private StringEventTranslator translator = new StringEventTranslator();
    public DisruptorStore() {
        StringEventFactory eventFactory = new StringEventFactory();

        this.disruptor = new Disruptor<>(eventFactory,32, DaemonThreadFactory.INSTANCE);
        this.disruptor.start();

        this.ringBuffer = this.disruptor.getRingBuffer();
    }


    public void produce(String producer, String message) {

        String msg = this.createAddMessage(producer, message);
        ringBuffer.publishEvent(this.translator, msg);
    }

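    public String consume(String consumer) {

        // Caution: RingBuffer.next() claims the next slot for publishing. It is a
        // producer-side call and does not wait for a published event, so this method
        // reads whatever the claimed slot currently holds (possibly stale or null)
        // and never publishes the sequence it claimed.
        // Disruptor normally pushes events to an EventHandler registered with
        // handleEventsWith(...) before start(), instead of being polled like this.
        long seq  = ringBuffer.next();
        StringEvent stringEvent = ringBuffer.get(seq);
        String result = stringEvent.getValue();

        return result;
    }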
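
To make the ring buffer idea more concrete, here is a tiny single-producer, single-consumer ring written with the JDK only. It is not the LMAX Disruptor API and MiniRing is a hypothetical name. It only sketches the core idea of preallocated slots addressed by ever-increasing sequence numbers, where the producer may not overtake the consumer by more than the buffer size.

```java
import java.util.concurrent.atomic.AtomicLong;

class MiniRing {
    private final String[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(-1); // last sequence written
    private final AtomicLong consumed = new AtomicLong(-1);  // last sequence read

    MiniRing(int sizePowerOfTwo) {
        if (Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        this.slots = new String[sizePowerOfTwo];
        this.mask = sizePowerOfTwo - 1; // lets us use & instead of %
    }

    // Call from a single producer thread only. Returns false when the ring is full.
    boolean tryPublish(String value) {
        long next = published.get() + 1;
        if (next - consumed.get() > slots.length) {
            return false; // would overwrite a slot the consumer has not read yet
        }
        slots[(int) (next & mask)] = value;
        published.set(next); // volatile write: makes the slot visible to the consumer
        return true;
    }

    // Call from a single consumer thread only. Returns null when the ring is empty.
    String tryConsume() {
        long next = consumed.get() + 1;
        if (next > published.get()) {
            return null; // nothing published beyond what was already read
        }
        String value = slots[(int) (next & mask)];
        consumed.set(next); // frees the slot for the producer
        return value;
    }
}
```

The check against consumed plays the role that gating sequences play in Disruptor: it is what keeps a fast producer from lapping a slow consumer.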

Test Results

Type                      Result
SynchronizedStore         Used time [5002]ms produce [9356448] numbers, Avg is [1870.5414] per ms.
ArrayBlockingQueueStore   Used time [5009]ms produce [10889300] numbers, Avg is [2173.9468] per ms.
ReentrantLockStore        Used time [5004]ms produce [11138058] numbers, Avg is [2225.831] per ms.
DisruptorStore            Used time [5007]ms produce [21551348] numbers, Avg is [4304.2437] per ms.
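
The averages are simply the produced count divided by the elapsed milliseconds, which also makes it easy to compare stores. A small sketch of that arithmetic, with Throughput as a hypothetical helper name:

```java
class Throughput {

    // Messages produced per millisecond.
    static double perMs(long produced, long elapsedMs) {
        return (double) produced / elapsedMs;
    }

    // How many times faster one store is than a baseline store.
    static double speedup(double ratePerMs, double baselinePerMs) {
        return ratePerMs / baselinePerMs;
    }

    public static void main(String[] args) {
        double sync = perMs(9356448L, 5002L);
        double disruptor = perMs(21551348L, 5007L);
        System.out.println("SynchronizedStore: " + sync + " per ms");
        System.out.println("DisruptorStore:    " + disruptor + " per ms");
        System.out.println("speedup:           " + speedup(disruptor, sync));
    }
}
```

With the figures above, DisruptorStore comes out at roughly 2.3 times the throughput of SynchronizedStore, while ReentrantLockStore and ArrayBlockingQueueStore are within a few percent of each other.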
