• 技术文章 >java >java基础

    java PriorityBlockingQueue的使用

    小妮浅浅小妮浅浅2021-02-08 19:21:05原创2315

    本教程操作环境:windows7系统、java10版,DELL G3电脑。

    1.概念

    使用平衡二叉树堆,实现的具有优先级的无界阻塞队列。是一个BlockingQueue,所以它是线程安全的。

    2.特点

    (1)无边界设计,但容量实际是依靠系统资源影响

    (2)添加元素,如果超过1,则进入优先级排序

    3.应用实例

    有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

    例子中定义了一个将要放入“优先阻塞队列”的任务类,并且定义了一个任务工场类和一个任务执行类,在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。

    package com.niuh.queue.priority;
     
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;
     
    /**
     * <p>
     * PriorityBlockingQueue使用示例
     * </p>
     */
    public class PriorityBlockingQueueDemo {
     
        public static void main(String[] args) throws Exception {
            Random random = new Random(47);
            ExecutorService exec = Executors.newCachedThreadPool();
            PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
            exec.execute(new PrioritizedTaskProducer(queue, exec)); // 这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的
            exec.execute(new PrioritizedTaskConsumer(queue)); // 步骤是同时进行的,因而输出结果并不一定是有序的
        }
    }
     
    class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        private Random random = new Random(47);
        private static int counter = 0;
        private final int id = counter++;
        private final int priority;
     
        protected static List<PrioritizedTask> sequence = new ArrayList<>();
     
        public PrioritizedTask(int priority) {
            this.priority = priority;
            sequence.add(this);
        }
     
        @Override
        public int compareTo(PrioritizedTask o) {
            return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0);  // 定义优先级计算方式
        }
     
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
            } catch (InterruptedException e) {
            }
            System.out.println(this);
        }
     
        @Override
        public String toString() {
            return String.format("[%1$-3d]", priority) + " Task " + id;
        }
     
        public String summary() {
            return "(" + id + ": " + priority + ")";
        }
     
        public static class EndSentinel extends PrioritizedTask {
            private ExecutorService exec;
     
            public EndSentinel(ExecutorService exec) {
                super(-1);
                this.exec = exec;
            }
     
            @Override
            public void run() {
                int count = 0;
                for (PrioritizedTask pt : sequence) {
                    System.out.print(pt.summary());
                    if (++count % 5 == 0) {
                        System.out.println();
                    }
                }
                System.out.println();
                System.out.println(this + " Calling shutdownNow()");
                exec.shutdownNow();
            }
        }
    }
     
    class PrioritizedTaskProducer implements Runnable {
        private Random random = new Random(47);
        private Queue<Runnable> queue;
        private ExecutorService exec;
     
        public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
            this.queue = queue;
            this.exec = exec;
        }
     
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加随机优先级的任务
                Thread.yield();
            }
            try {
                for (int i = 0; i < 10; i++) {
                    TimeUnit.MILLISECONDS.sleep(250);
                    queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加优先级为10的任务
                }
                for (int i = 0; i < 10; i++) {
                    queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加优先级为1-10的任务
                }
                queue.add(new PrioritizedTask.EndSentinel(exec));
            } catch (InterruptedException e) {
            }
            System.out.println("Finished PrioritizedTaskProducer");
        }
    }
     
    class PrioritizedTaskConsumer implements Runnable {
        private PriorityBlockingQueue<Runnable> queue;
     
        public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
            this.queue = queue;
        }
     
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    queue.take().run(); // 任务的消费者,从PriorityBlockingQueue中取出任务执行
                }
            } catch (InterruptedException e) {
            }
            System.out.println("Finished PrioritizedTaskConsumer");
        }
    }

    以上就是javaPriorityBlockingQueue的使用,当我们需要有重要任务想提前处理时,可以选择PriorityBlockingQueue这种阻塞队列来优先任务的处理。学会基础内容后,可以就代码部分进行试验了。

    专题推荐:java priorityblockingqueue
    上一篇:java中SynchronousQueue的核心方法 下一篇:PriorityBlockingQueue在java中的原理

    相关文章推荐

    • java中SynchronousQueue是什么意思• java中SynchronousQueue的原理• SynchronousQueue在java中的元素增减• java中SynchronousQueue的核心方法

    全部评论我要评论

    © 2021 Python学习网 苏ICP备2021003149号-1

  • 取消发布评论
  • 

    Python学习网