Thursday, July 24, 2014

DelayQueue Usage Java Concurrency

DelayQueue is an unbounded BlockingQueue of objects that implement the Delayed interface. An object can only be taken from the queue when its delay has expired. The queue is sorted so that the object at the head has a delay that has expired for the longest time. If no delay has expired, then there is no head element and poll() will return null.

You can't place null elements in the queue.

Here's an example where the Delayed objects are themselves tasks, and the DelayedTaskConsumer takes the most "urgent" task (the one that has been expired for the longest time) off the queue and runs it. Note that DelayQueue is thus a variation of a priority queue.

import java.util.concurrent.*;
import java.util.*;

import static java.util.concurrent.TimeUnit.*;
 
class DelayedTask implements Runnable, Delayed {

    private static int counter = 0;
    private final int id = counter++;
    private final int delta;
    private final long trigger;
    protected static List<delayedtask> sequence
            = new ArrayList<delayedtask>();

    public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime()
                + NANOSECONDS.convert(delta, MILLISECONDS);
        sequence.add(this);
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(
                trigger - System.nanoTime(), NANOSECONDS);
    }

    public int compareTo(Delayed arg) {
        DelayedTask that = (DelayedTask) arg;
        if (trigger < that.trigger) {
            return -1;
        }
        if (trigger > that.trigger) {
            return 1;
        }
        return 0;
    }

    public void run() {
        System.out.println(this + " ");
    }

    public String toString() {
        return String.format("[%1$-4d]", delta) + " Task " + id;
    }

    public String summary() {
        return "(" + id + ":" + delta + ")";
    }

    public static class EndSentinel extends DelayedTask {

        private ExecutorService exec;

        public EndSentinel(int delay, ExecutorService e) {
            super(delay);
            exec = e;

        }

        public void run() {
            for (DelayedTask pt : sequence) {
                System.out.println(pt.summary());    //sequence is a static list
            }
            System.out.println();
            System.out.println(this + " calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class DelayedTaskConsumer implements Runnable {

    private DelayQueue<delayedtask> q;

    public DelayedTaskConsumer(DelayQueue<delayedtask> q) {
        this.q = q;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                q.take().run(); // Run task with the current thread
            }
        } catch (InterruptedException e) {
// Acceptable way to exit
        }
        System.out.println("Finished DelayedTaskConsumer");
    }
}

public class DelayQueueDemo {

    public static void main(String[] args) {
        Random rand = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<delayedtask> queue
                = new DelayQueue<>();
// Fill with tasks that have random delays:
        for (int i = 0; i < 20; i++) {
            queue.put(new DelayedTask(rand.nextInt(5000)));
        }
// Set the stopping point
        queue.add(new DelayedTask.EndSentinel(5000, exec));
        exec.execute(new DelayedTaskConsumer(queue));
    }
}

Program Output: 

[128 ] Task 11
[200 ] Task 7
[429 ] Task 5
[520 ] Task 18
[555 ] Task 1
[961 ] Task 4
[998 ] Task 16
[1207] Task 9
[1693] Task 2
[1809] Task 14
[1861] Task 3
[2278] Task 15
[3288] Task 10
[3551] Task 12
[4258] Task 0
[4258] Task 19
[4522] Task 8
[4589] Task 13
[4861] Task 17
[4868] Task 6
(0:4258)
(1:555)
(2:1693)
(3:1861)
(4:961)
(5:429)
(6:4868)
(7:200)
(8:4522)
(9:1207)
(10:3288)
(11:128)
(12:3551)
(13:4589)
(14:1809)
(15:2278)
(16:998)
(17:4861)
(18:520)
(19:4258)
(20:5000)

[5000] Task 20 calling shutdownNow()

Finished DelayedTaskConsumer

DelayedTask contains a List called sequence that preserves the order in which the tasks were created, so that we can see that sorting does in fact take place.
The Delayed interface has one method, getDelay( ), which tells how long it is until the delay time expires or how long ago the delay time has expired. This method forces us to use the TimeUnit class because that’s the argument type. This turns out to be a very convenient class because you can easily convert units without doing any calculations. For example, the value of delta is stored in milliseconds, but the Java SE5 method System.nanoTime( )

produces time in nanoseconds. You can convert the value of delta by saying what units it is in and what units you want it to be in, like this:

NANOSECONDS.convert(delta, MILLISECONDS);

In getDelay( ), the desired units are passed in as the unit argument, and you use this to convert the time difference from the trigger time to the units requested by the caller, without even knowing what those units are (this is a simple example of the Strategy design pattern, where part of the algorithm is passed in as an argument).
For sorting, the Delayed interface also inherits the Comparable interface, so compareTo( ) must be implemented so that it produces a reasonable comparison. toString( ) and summary( ) provide output formatting, and the nested EndSentinel class provides a way to shut everything down by placing it as the last element in the queue.
Note that because DelayedTaskConsumer is itself a task, it has its own Thread which it can use to run each task that comes out of the queue. Since the tasks are being performed in queue priority order, there’s no need in this example to start separate threads to run the DelayedTasks.
You can see from the output that the order in which the tasks are created has no effect on execution order—instead, the tasks are executed in delay order as expected.

Source: Thinking In Java 4th Ed (Concurrency)

No comments:

Post a Comment