Java并发框架Disruptor教程(三)、单一生产者

时间:2019-03-06作者:klpeng分类:IT综合浏览:6712评论:0

介绍

在disruptor中单一生产者将数据推送给消费者的过程,可以有多种情况(P:生产者,C:消费者)

  • 1P => 1C
  • 1P => NC
    在这里演示还是使用订单生成的例子:
    TradeEvent:
/**
 * Mutable order event used by the examples in this tutorial.
 *
 * <p>All fields start as {@code null} and are filled in through the setters.
 */
public class TradeEvent {
    private Integer userId;
    // Order number. Kept as text because the handlers in this tutorial assign
    // UUID.randomUUID().toString(), which cannot be stored in a Long.
    private String id;
    private String name;
    private Integer status;

    /** @return the user id, or {@code null} if it has not been set */
    public Integer getUserId() {
        return userId;
    }

    /** @param userId the user id to store */
    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    /** @return the order number, or {@code null} if it has not been set */
    public String getId() {
        return id;
    }

    /** @param id the order number to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the order name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the order name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the order status, or {@code null} if it has not been set */
    public Integer getStatus() {
        return status;
    }

    /** @param status the order status to store */
    public void setStatus(Integer status) {
        this.status = status;
    }

    /**
     * Renders all four fields in the form
     * {@code TradeEvent{userId=..., id=..., name='...', status=...}}.
     */
    @Override
    public String toString() {
        return "TradeEvent{" +
                "userId=" + userId +
                ", id=" + id +
                ", name='" + name + '\'' +
                ", status=" + status +
                '}';
    }
}

单一消费

Java并发框架Disruptor教程(三)、单一生产者
需求:生成一个订单,打印订单详情
这里生产者产生一个订单,随后打印订单详情
代码实现

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

/**
 * Single producer feeding a single consumer (1P =&gt; 1C).
 *
 * <p>{@link #main} publishes one event per second; the only registered
 * handler prints each event.
 */
public class LongEventMain {

    /**
     * Consumer callback: prints the event to standard output.
     *
     * @param event      the event read from the ring buffer
     * @param sequence   ring buffer sequence of the event (unused)
     * @param endOfBatch batch marker supplied by the Disruptor (unused)
     */
    public static void handleEvent(TradeEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    /**
     * Translator callback: copies the int stored at offset 0 of {@code buffer}
     * into the event's user id.
     *
     * @param event    the ring buffer slot being filled
     * @param sequence ring buffer sequence of the slot (unused)
     * @param buffer   source buffer; read with an absolute get, so its position is not moved
     */
    public static void translate(TradeEvent event, long sequence, ByteBuffer buffer) {
        int userId = buffer.getInt(0);
        event.setUserId(userId);
    }

    /**
     * Builds the Disruptor, attaches the printing handler, starts it and then
     * publishes an increasing counter forever, one value per second.
     */
    public static void main(String[] args) throws Exception {
        // Ring buffer capacity; must be a power of 2.
        final int ringSize = 1024;

        Disruptor<TradeEvent> disruptor =
                new Disruptor<>(TradeEvent::new, ringSize, DaemonThreadFactory.INSTANCE);

        // Register the consumer before starting; start() launches the handler threads.
        disruptor.handleEventsWith(LongEventMain::handleEvent);
        disruptor.start();

        // Publishing goes through the ring buffer owned by the Disruptor.
        RingBuffer<TradeEvent> ring = disruptor.getRingBuffer();

        // Four bytes are enough: only a single int is ever written at offset 0.
        ByteBuffer payload = ByteBuffer.allocate(4);

        int counter = 0;
        while (true) {
            payload.putInt(0, counter);
            ring.publishEvent(LongEventMain::translate, payload);
            Thread.sleep(1000);
            counter++;
        }
    }
}

多个消费者

  • 串行
    消费者之间必须等待上一个执行完成才能继续
    Java并发框架Disruptor教程(三)、单一生产者
    需求:C1、添加订单号;C2、记录订单(入库);C3、打印订单详情
    代码实现:
/**
 * Single producer with three consumers chained one after another:
 * assign an order number, report it as stored, then print the event.
 */
public class LongEventMain1 {

    /**
     * First stage: assigns a random UUID string as the order number.
     * Requires {@code TradeEvent.setId} to accept a {@code String}.
     */
    public static void handleEvent1(TradeEvent event, long sequence, boolean endOfBatch) {
        String orderNo = UUID.randomUUID().toString();
        event.setId(orderNo);
    }

    /** Second stage: prints a line reporting that the order number was stored. */
    public static void handleEvent2(TradeEvent event, long sequence, boolean endOfBatch) {
        String message = String.format("订单号:%s,入库成功!!", event.getId());
        System.out.println(message);
    }

    /** Final stage: prints the event. */
    public static void handleEvent(TradeEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    /** Translator: copies the int at offset 0 of {@code buffer} into the event's user id. */
    public static void translate(TradeEvent event, long sequence, ByteBuffer buffer) {
        int userId = buffer.getInt(0);
        event.setUserId(userId);
    }

    /**
     * Wires the three handlers in series, starts the Disruptor and publishes
     * an increasing counter forever, one value per second.
     */
    public static void main(String[] args) throws Exception {
        // Ring buffer capacity; must be a power of 2.
        final int ringSize = 1024;

        Disruptor<TradeEvent> disruptor =
                new Disruptor<>(TradeEvent::new, ringSize, DaemonThreadFactory.INSTANCE);

        // Each handleEventsWith on the returned group gates the new handler
        // behind the previous one, giving handleEvent1 -> handleEvent2 -> handleEvent.
        disruptor
                .handleEventsWith(LongEventMain1::handleEvent1)
                .handleEventsWith(LongEventMain1::handleEvent2)
                .handleEventsWith(LongEventMain1::handleEvent);

        // start() launches the handler threads.
        disruptor.start();

        RingBuffer<TradeEvent> ring = disruptor.getRingBuffer();
        ByteBuffer payload = ByteBuffer.allocate(8);

        int counter = 0;
        while (true) {
            payload.putInt(0, counter);
            ring.publishEvent(LongEventMain1::translate, payload);
            Thread.sleep(1000);
            counter++;
        }
    }
}
  • 并行
    Java并发框架Disruptor教程(三)、单一生产者
    需求:
    C1、添加订单号
    C2、添加订单名称
    C3、打印订单详情
    代码实现:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.UUID;

/**
 * Single producer with two consumers running in parallel, followed by a
 * third consumer that prints the event once both have finished with it.
 */
public class LongEventMain2 {
    private static final Random random = new Random();

    /**
     * Parallel stage: assigns a random UUID string as the order number,
     * pauses briefly, then prints a marker line.
     * Requires {@code TradeEvent.setId} to accept a {@code String}.
     */
    public static void handleEvent1(TradeEvent event, long sequence, boolean endOfBatch) {
        event.setId(UUID.randomUUID().toString());
        pauseBriefly();
        System.out.println("====ID====");
    }

    /**
     * Parallel stage: derives the order name from the user id, pauses
     * briefly, then prints a marker line.
     */
    public static void handleEvent2(TradeEvent event, long sequence, boolean endOfBatch) {
        event.setName("order:"+event.getUserId());
        pauseBriefly();
        System.out.println("====Name====");
    }

    /** Final stage: prints the event. */
    public static void handleEvent(TradeEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    /** Translator: copies the int at offset 0 of {@code buffer} into the event's user id. */
    public static void translate(TradeEvent event, long sequence, ByteBuffer buffer) {
        event.setUserId(buffer.getInt(0));
    }

    /**
     * Wires the handlers, starts the Disruptor and publishes an increasing
     * counter forever, one value per second.
     */
    public static void main(String[] args) throws Exception {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<TradeEvent> disruptor = new Disruptor<>(TradeEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // handleEvent1 and handleEvent2 are registered together as one group;
        // handleEvent is gated behind that whole group via then().
        disruptor.handleEventsWith(LongEventMain2::handleEvent1, LongEventMain2::handleEvent2)
                .then(LongEventMain2::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<TradeEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (int l = 0; true; l++) {
            bb.putInt(0, l);
            ringBuffer.publishEvent(LongEventMain2::translate, bb);
            Thread.sleep(1000);
        }
    }

    /**
     * Sleeps the calling thread for a random 0-9 ms (presumably so the two
     * parallel handlers finish in varying order).
     *
     * <p>If the sleep is interrupted, the stack trace is printed as before and
     * the interrupt flag is set again, because catching
     * {@link InterruptedException} clears it and the code running this handler
     * would otherwise never see the interruption.
     */
    private static void pauseBriefly() {
        try {
            Thread.sleep(random.nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
  • 并行加串行
    Java并发框架Disruptor教程(三)、单一生产者
    需求:
    C1、生成订单号
    C2、生成订单名称
    C3、记录订单号的状态
    C4、打印订单详情
    代码实现:
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.UUID;

/**
 * Single producer with a mixed parallel/serial consumer graph:
 * h1 (order number) and h2 (order name) start in parallel, h3 (status)
 * follows h1, and the printing handler waits for both h2 and h3.
 */
public class LongEventMain3 {
    private static final Random random = new Random();

    /**
     * Assigns a random UUID string as the order number, pauses briefly,
     * then prints a marker line.
     * Requires {@code TradeEvent.setId} to accept a {@code String}.
     */
    public static void handleEvent1(TradeEvent event, long sequence, boolean endOfBatch) {
        event.setId(UUID.randomUUID().toString());
        pauseBriefly();
        System.out.println("====ID====");
    }

    /**
     * Derives the status from the order number's hash code and prints a
     * marker line. It is wired after handleEvent1 in {@link #main}, which is
     * what sets the id this method reads.
     */
    public static void handleEvent3(TradeEvent event, long sequence, boolean endOfBatch) {
        // floorMod keeps the result in {0, 1}; the % operator returns -1 for
        // negative odd hash codes, which would produce a third status value.
        event.setStatus(Math.floorMod(event.getId().hashCode(), 2));
        System.out.println("====Status====");
    }

    /**
     * Derives the order name from the user id, pauses briefly, then prints
     * a marker line.
     */
    public static void handleEvent2(TradeEvent event, long sequence, boolean endOfBatch) {
        event.setName("order:"+event.getUserId());
        pauseBriefly();
        System.out.println("====Name====");
    }

    /** Final stage: prints the event. */
    public static void handleEvent(TradeEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    /** Translator: copies the int at offset 0 of {@code buffer} into the event's user id. */
    public static void translate(TradeEvent event, long sequence, ByteBuffer buffer) {
        event.setUserId(buffer.getInt(0));
    }

    /**
     * Wires the handler graph, starts the Disruptor and publishes an
     * increasing counter forever, one value per second.
     */
    public static void main(String[] args) throws Exception {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<TradeEvent> disruptor = new Disruptor<>(TradeEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Handlers are held in variables so the same instances can be named
        // again in after(...) when declaring dependencies.
        EventHandler<TradeEvent> h1 = LongEventMain3::handleEvent1;
        EventHandler<TradeEvent> h2 = LongEventMain3::handleEvent2;
        EventHandler<TradeEvent> h3 = LongEventMain3::handleEvent3;
        EventHandler<TradeEvent> h = LongEventMain3::handleEvent;
        disruptor.handleEventsWith(h1, h2);           // h1 and h2 registered together
        disruptor.after(h1).handleEventsWith(h3);     // h3 follows h1 only
        disruptor.after(h2, h3).handleEventsWith(h);  // printing follows h2 and h3

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<TradeEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (int l = 0; true; l++) {
            bb.putInt(0, l);
            ringBuffer.publishEvent(LongEventMain3::translate, bb);
            Thread.sleep(1000);
        }
    }

    /**
     * Sleeps the calling thread for a random 0-9 ms (presumably so the
     * parallel handlers finish in varying order).
     *
     * <p>If the sleep is interrupted, the stack trace is printed as before and
     * the interrupt flag is set again, because catching
     * {@link InterruptedException} clears it and the code running this handler
     * would otherwise never see the interruption.
     */
    private static void pauseBriefly() {
        try {
            Thread.sleep(random.nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
打赏
文章版权声明:除非注明,否则均为彭超的博客原创文章,转载或复制请以超链接形式并注明出处。
相关推荐

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

猜你喜欢