Disruptor - High-performance inter-thread messaging framework

Foreword

Disruptor is an open source, high-performance concurrency framework from LMAX, a UK company, for passing messages between threads. It fills much the same role as the JDK's BlockingQueue, but it is considerably faster. The official test report below makes the performance gap between the two easy to see:


Disruptor project address: https://github.com/LMAX-Exchange/disruptor

Core concepts

A framework this fast is worth trying out. Before doing so, let's go over Disruptor's main concepts and then put them into practice in the author's weblog project, which previously used BlockingQueue.

RingBuffer: The ring buffer, which carries the message events. RingBuffer used to be the main object in Disruptor, but since version 3.0 its responsibility has been reduced to storing and updating the data (events) that pass through the Disruptor. In some advanced scenarios, the RingBuffer can be replaced entirely by a user-defined implementation.

Event: Defines the type of data exchanged between producers and consumers.

EventFactory: The factory interface for creating events. It is implemented by the user and supplies the concrete event instances.

EventHandler: The event-handling interface. It is implemented by the user and processes the events.

That is all we need to understand for now. For more details, see the project wiki: https://github.com/LMAX-Exchange/disruptor
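To make the RingBuffer and EventFactory ideas above more concrete, here is a minimal single-threaded sketch of a pre-allocated ring. It is an illustration only, not Disruptor's implementation: the class name RingSketch and its methods are hypothetical, and it leaves out all of the sequencing and concurrency control that the real library provides. It shows two things: every slot is created once up front by a factory, and an ever-increasing sequence number is mapped to a slot with a bit mask, which is why the size has to be a power of 2.

```java
import java.util.function.Supplier;

// Simplified, single-threaded illustration of a pre-allocated ring buffer.
// NOT Disruptor's implementation; all names here are hypothetical.
final class RingSketch<T> {
    private final Object[] slots;
    private final int indexMask;

    RingSketch(int size, Supplier<T> factory) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new Object[size];
        indexMask = size - 1;
        // Pre-allocate every slot once, playing the role of an event factory.
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
    }

    // Map an ever-increasing sequence number onto a slot index.
    int indexOf(long sequence) {
        return (int) (sequence & indexMask);
    }

    // Return the reusable event object that lives in the slot for this sequence.
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[indexOf(sequence)];
    }
}
```

With a size of 8, sequences 1 and 9 land in the same slot, so the same event object is reused once the ring wraps around instead of a new one being allocated.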

Core Architecture Diagram:

Disruptor in practice

We will rework the boot-websocket-log project, a typical example of the producer-consumer pattern, replacing its BlockingQueue with Disruptor while keeping the same functionality. If you are interested, you can compare the two versions.
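For reference, the kind of BlockingQueue hand-off being replaced can be sketched as follows. This is a hypothetical baseline and not the project's actual code: the class name, the String payload, the queue capacity and the stop marker are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical baseline: a producer thread hands log lines to a consumer
// thread through a JDK BlockingQueue.
final class BlockingQueueBaseline {
    private static final String POISON = "__STOP__"; // assumed stop marker

    static List<String> run(List<String> logs) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
        List<String> received = new ArrayList<>();

        // Consumer: block on take() until the stop marker arrives.
        Thread consumer = new Thread(() -> {
            try {
                for (String log = queue.take(); !log.equals(POISON); log = queue.take()) {
                    received.add(log);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer: put() blocks when the queue is full.
            for (String log : logs) {
                queue.put(log);
            }
            queue.put(POISON);
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        }
        return received;
    }
}
```

The steps that follow keep this producer and consumer split, but exchange the queue for a ring buffer and the consumer loop for an EventHandler.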

The first step is to define the event type

/**
 * Created by kl on 2018/8/24.
 * Content : Process log event content carrier
 */
public class LoggerEvent {

    private LoggerMessage log;

    public LoggerMessage getLog() {
        return log;
    }

    public void setLog(LoggerMessage log) {
        this.log = log;
    }
}

The second step is to define the event factory

import com.lmax.disruptor.EventFactory;

/**
 * Created by kl on 2018/8/24.
 * Content : Process log event factory class
 */
public class LoggerEventFactory implements EventFactory<LoggerEvent> {
    @Override
    public LoggerEvent newInstance() {
        return new LoggerEvent();
    }
}

The third step is to define the data processor

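import com.lmax.disruptor.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by kl on 2018/8/24.
 * Content : Process log event handler
 */
@Component
public class LoggerEventHandler implements EventHandler<LoggerEvent> {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // Called by the Disruptor for each published event; forwards the log to the websocket topic.
    @Override
    public void onEvent(LoggerEvent event, long sequence, boolean endOfBatch) {
        messagingTemplate.convertAndSend("/topic/pullLogger", event.getLog());
    }
}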
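The handler above ignores the last two arguments of onEvent. In Disruptor, sequence is the event's position in the ring, and endOfBatch is true when the event is the last one currently available to the handler. A handler can use that flag to group work. The sketch below assumes a locally declared stand-in interface with the same parameter list, so that it compiles without the library (the real onEvent is also declared to throw Exception); BatchingHandler and its sink are hypothetical names and are not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in with the same parameter list as Disruptor's EventHandler.onEvent,
// declared locally so this sketch compiles without the library.
interface HandlerSketch<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Hypothetical handler that collects events and flushes once per batch.
final class BatchingHandler implements HandlerSketch<String> {
    private final List<String> pending = new ArrayList<>();
    private final Consumer<List<String>> sink;

    BatchingHandler(Consumer<List<String>> sink) {
        this.sink = sink;
    }

    @Override
    public void onEvent(String event, long sequence, boolean endOfBatch) {
        pending.add(event);
        if (endOfBatch) {
            // Hand over a copy so the sink never sees later mutations.
            sink.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

For a websocket push like the one above, such batching could reduce the number of messages sent when logs arrive in bursts, at the cost of a slightly more complex handler.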

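The fourth step is to create the Disruptor utility class, define the event publishing methods, and publish events

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* Created by kl on 2018/8/24.
* Content : Disruptor circular queue
*/
@Component
public class LoggerDisruptorQueue {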

   private Executor executor = Executors.newCachedThreadPool();

   // The factory for the event
   private LoggerEventFactory factory = new LoggerEventFactory();

   private FileLoggerEventFactory fileLoggerEventFactory = new FileLoggerEventFactory();

   // Specify the size of the ring buffer, must be power of 2.
   private int bufferSize = 2 * 1024;

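   // Construct the Disruptors.
   // Note: later Disruptor releases prefer a ThreadFactory over an Executor here.
   private Disruptor<LoggerEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

   private Disruptor<FileLoggerEvent> fileLoggerEventDisruptor = new Disruptor<>(fileLoggerEventFactory, bufferSize, executor);

   // Static so that the static publishEvent methods can reach them
   private static RingBuffer<LoggerEvent> ringBuffer;

   private static RingBuffer<FileLoggerEvent> fileLoggerEventRingBuffer;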

   @Autowired
   LoggerDisruptorQueue(LoggerEventHandler eventHandler,FileLoggerEventHandler fileLoggerEventHandler) {
       disruptor.handleEventsWith(eventHandler);
       fileLoggerEventDisruptor.handleEventsWith(fileLoggerEventHandler);
       // The ring buffer fields are static, so assign them without `this`
       ringBuffer = disruptor.getRingBuffer();
       fileLoggerEventRingBuffer = fileLoggerEventDisruptor.getRingBuffer();
       disruptor.start();
       fileLoggerEventDisruptor.start();
   }

   public static void publishEvent(LoggerMessage log) {
       if (ringBuffer == null) return; // Bean not constructed yet, nothing to publish to
       long sequence = ringBuffer.next(); // Grab the next sequence
       try {
           LoggerEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
           // for the sequence
           event.setLog(log); // Fill with data
       } finally {
           ringBuffer.publish(sequence);
       }
   }

   public static void publishEvent(String log) {
       if(fileLoggerEventRingBuffer == null) return;
       long sequence = fileLoggerEventRingBuffer.next(); // Grab the next sequence
       try {
           FileLoggerEvent event = fileLoggerEventRingBuffer.get(sequence); // Get the entry in the Disruptor
           // for the sequence
           event.setLog(log); // Fill with data
       } finally {
           fileLoggerEventRingBuffer.publish(sequence);
       }
   }

}
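The comment on bufferSize in the class above says the ring size must be a power of 2. If the size comes from configuration rather than a constant, it may help to validate it or round it up before constructing the Disruptor. The helper below is a hypothetical sketch and is not part of Disruptor or of the project.

```java
// Hypothetical helper for validating ring buffer sizes; not part of Disruptor.
final class BufferSizes {
    private BufferSizes() {
    }

    // A positive int is a power of two when exactly one bit is set.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Round n up to the nearest power of two; n must be in 1..2^30.
    static int ceilPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n must be between 1 and 2^30");
        }
        return n == 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }
}
```

For example, the 2 * 1024 used above passes isPowerOfTwo, while a configured value of 1000 would be rounded up to 1024 by ceilPowerOfTwo.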

Conclusion

The four steps above are all it takes to use Disruptor. Once the project starts, log events are published continuously, and the handler pushes each event's content to the front-end page over websocket for display.

boot-websocket-log project address: https://gitee.com/kailing/boot-websocket-log

Disruptor is a high-performance framework for exchanging data between threads within a single process, and it is especially well suited to log processing. I first learned about Disruptor from https://github.com/alipay/sofa-tracer, an open source distributed tracing project from the Ant Financial team whose log processing is built on Disruptor.


