Java并发框架Disruptor教程(二)、快速入门
添加Maven依赖
将以下内容添加到pom.xml中
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
入门示例
为了开始使用Disruptor,我们将考虑一个非常简单和人为的例子,一个将生产者传递给消费者的单个长值,消费者只需打印出该值。
-
1、创建事件
首先,我们将定义将携带数据的事件。
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
-
2、创建EventFactory
为了让Disruptor为我们预先分配这些事件,我们需要一个将执行构造的EventFactory,
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
-
3、创建事件的消费者
一旦我们定义了事件,我们需要创建一个处理这些事件的消费者。在我们的例子中,我们要做的就是从控制台中打印出值。
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
我们需要这些事件的来源,为了举个例子,我假设数据来自某种I / O设备,例如网络或ByteBuffer形式的文件。
-
4、创建事件生产者
这里将介绍两种生产者的创建方式
1). 使用Translator创建
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
2). 使用传统API创建
其中只要有
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
显而易见的是,事件发布变得比使用简单队列更复杂。这是由于对事件预分配的需求。它需要(在最低级别)消息发布的两阶段方法,即声明环形缓冲区中的插槽然后发布可用数据。还必须将发布包装在try / finally块中。如果我们在Ring Buffer中声明一个插槽(调用RingBuffer.next()),那么我们必须发布这个序列。如果不这样做可能会导致干扰者状态的腐败。具体而言,在多生产者的情况下,这将导致消费者停滞并且在没有重启的情况下无法恢复。因此,建议使用EventTranslator API。
- 5、启动任务
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}
文章版权声明:除非注明,否则均为彭超的博客原创文章,转载或复制请以超链接形式并注明出处。
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。