背景
工作中,我负责的系统是一个数据流处理服务 - 以流水线(pipeline)的形式分多级异步处理:
其中的 队列 实际使用的是 Disruptor,多生产者单消费者模式:
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(name).setDaemon(true).build();
Disruptor<Event<T>> disruptor = new Disruptor<>(Event<T>::new, bufferSize, factory, ProducerType.MULTI, new SleepingWaitStrategy());
disruptor.handleEventsWith((Event<T> event, long sequence, boolean endOfBatch) -> {
consumer.accept(event.value, endOfBatch);
event …