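1. Can consumers (event handlers) be registered dynamically?
No, not through the Disruptor DSL: handleEventsWith must be called before start(). The following attempt registers a handler after start():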
private static void disruptor(ExecutorService executorService) {
    Disruptor<FilePack> disruptor = new Disruptor<>(
            new EventFactory<FilePack>() {
                @Override
                public FilePack newInstance() {
                    return new FilePack();
                }
            },
            1024,
            executorService,
            ProducerType.SINGLE,
            new SleepingWaitStrategy()
    );
    RingBuffer<FilePack> ringBuffer = disruptor.getRingBuffer();
    disruptor.start();
    // Register the handler after start(): this call is what throws
    disruptor.handleEventsWith(new EventHandler<FilePack>() {
        @Override
        public void onEvent(FilePack event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println(event.getPath());
        }
    });
    // Publish ten events (never reached, because the call above throws)
    for (int i = 0; i < 10; i++) {
        long sequence = ringBuffer.next();
        FilePack filePack = ringBuffer.get(sequence);
        filePack.setPath(String.valueOf(i));
        filePack.setData(new byte[0]);
        ringBuffer.publish(sequence);
    }
    disruptor.shutdown();
}
It fails with:
java.lang.IllegalStateException: All event handlers must be added before calling starts.
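For contrast, here is a minimal sketch of the same method with the handler registered before start(). It assumes Disruptor 3.x, where the Executor-based constructor used above is available (it is deprecated in later 3.x releases in favour of the ThreadFactory variants). The FilePack class is a hypothetical stand-in with only a path field, since the original class is not shown.

```java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RegisterBeforeStart {

    // Hypothetical stand-in for the FilePack event (assumed shape, path only).
    static class FilePack {
        private String path;
        String getPath() { return path; }
        void setPath(String path) { this.path = path; }
    }

    static void disruptor(ExecutorService executorService) {
        Disruptor<FilePack> disruptor = new Disruptor<>(
                FilePack::new,
                1024,
                executorService,
                ProducerType.SINGLE,
                new SleepingWaitStrategy());

        // 1) Register every handler while the Disruptor is still unstarted.
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println(event.getPath()));

        // 2) Only then start it; start() returns the ring buffer.
        RingBuffer<FilePack> ringBuffer = disruptor.start();

        for (int i = 0; i < 10; i++) {
            long sequence = ringBuffer.next();
            try {
                ringBuffer.get(sequence).setPath(String.valueOf(i));
            } finally {
                // Always publish a claimed sequence, even if filling the event fails.
                ringBuffer.publish(sequence);
            }
        }

        // Waits for the published events to be handled, then halts the processors.
        disruptor.shutdown();
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            disruptor(executorService);
        } finally {
            executorService.shutdown();
        }
    }
}
```

The only behavioural change from the failing version is the order of handleEventsWith and start(). The try/finally around publish is an extra precaution and is not needed to avoid the exception.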
2. Can threads be reused?
Yes.
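@Test
public void testReuse() {
    ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("", true));
    // Each call builds a Disruptor on the shared executor and shuts it down internally
    // (with handleEventsWith moved before start(), so the method no longer throws)
    disruptor(executorService);
    disruptor(executorService);
    System.out.println();
}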
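The second call still consumes events normally. Disruptor.shutdown() halts the event processors but does not shut down the executor, so when a thread pool is passed in, its threads are reused by the next Disruptor.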
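The reuse itself comes from the executor rather than from Disruptor, and can be checked with the JDK alone. The sketch below is an illustration under that assumption: runOnce is a hypothetical stand-in for one disruptor(executorService) call, and it simply reports which worker thread ran the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorReuseSketch {

    // Hypothetical stand-in for one disruptor(executorService) call:
    // run a task on the shared executor, wait for it, return the worker thread's name.
    static String runOnce(ExecutorService executor) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        return executor.submit(task).get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String first = runOnce(executor);
            String second = runOnce(executor);
            // A single-thread executor keeps one worker alive between tasks,
            // so both names refer to the same thread.
            System.out.println("same thread: " + first.equals(second));
        } finally {
            executor.shutdown();
        }
    }
}
```

With a larger pool the second task may land on a different worker, but it is still a pooled thread rather than one created for that Disruptor.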