JavaInterview JavaInterview
首页
指南
分类
标签
归档
  • CSDN (opens new window)
  • 文档集合 (opens new window)
  • 系统架构 (opens new window)
  • 微信号 (opens new window)
  • 公众号 (opens new window)

『Java面试+Java学习』
首页
指南
分类
标签
归档
  • CSDN (opens new window)
  • 文档集合 (opens new window)
  • 系统架构 (opens new window)
  • 微信号 (opens new window)
  • 公众号 (opens new window)
  • 指南
  • 简历

  • Java

  • 面试

    • 所有

    • 核心

    • 源码

    • 算法

    • 高频

      • 笔试基础题
      • MySQL各个版本区别
      • MySQL自身如何处理死锁
      • 为什么是先添加队列而不是先创建最大线程
      • 数据库常用的锁有哪些
      • 生产者和消费者5种实现方式
        • 解答
      • Redis内存管理的基石zmalloc
      • 如何动态修改线程池参数
      • 流量高峰时的性能瓶颈有哪些以及如何来解决
      • Java SPI与Dubbo SPI有什么区别
      • JVM运行时数据区(堆、栈、方法区)指向关系
      • Java new对象过程
      • 池化技术(线程池、连接池、内存池)
      • Spring的三级缓存解决循环依赖
      • K8S面试题大全
      • 服务治理、微服务与Service Mesh
      • SpringBoot自定义starter
      • 限流原理解析
      • Redis部署的三种模式(主从复制、哨兵、集群)
      • 敏感词过滤(DFA算法_Trie前缀树)
      • Java线程间通信方式
      • CPU占用过高排查
      • 线程死锁及解决方案
      • 分布式事务
      • 分布式锁
      • JVM调优
      • Synchronized锁升级(锁膨胀)
      • Paxos、Raft、ZAB协议
  • 算法

  • interview
  • hf
JavaInterview.cn
2022-09-13
目录

生产者和消费者5种实现方式Java

文章发布较早,内容可能过时,阅读注意甄别。

# 解答

Java 生产者和消费者 5种实现方式

  • 1 使用 Objcet 的 wait() 和 notifyAll() 实现
  • 2 使用 BlockingQueue 实现
  • 3 使用 Lock 的 Condition 的 await/signal 实现
  • 4 使用 Semaphore 实现
  • 5 使用 Piped 实现

1 使用 Objcet 的 wait() 和 notifyAll() 实现

package com.xu.thread;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Administrator
 */
public class ProducerAndConsumer {

    private static Integer MAX = 10;

    private static AtomicInteger integer = new AtomicInteger(0);

    public static void main(String[] args) {
        List<String> list = new LinkedList<>();
        new Thread(new Consumer(list)).start();
        new Thread(new Producer(list)).start();
    }

    static class Consumer implements Runnable {
    
        private List<String> list;

        Consumer(List<String> list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        Thread.sleep(500);
                        if (list.size() <= 0) {
                            list.wait();
                        } else {
                            String value = list.get(0);
                            list.remove(0);
                            System.out.println("消费者 消费:" + value);
                            list.notifyAll();
                        }
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }

    static class Producer implements Runnable {
    
        private List<String> list;

        Producer(List<String> list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        Thread.sleep(200);
                        if (list.size() >= MAX) {
                            list.wait();
                        } else {
                            String value = "苹果" + integer.getAndIncrement();
                            System.out.println("生产者 生产:" + value);
                            list.add(value);
                            list.notifyAll();
                        }
                    } catch (Exception e) {

                    }
                }
            }
        }
    }

}


生产者 生产:苹果0
生产者 生产:苹果1
生产者 生产:苹果2
生产者 生产:苹果3
消费者 消费:苹果0
消费者 消费:苹果1
消费者 消费:苹果2
消费者 消费:苹果3
生产者 生产:苹果4
生产者 生产:苹果5
生产者 生产:苹果6
生产者 生产:苹果7
消费者 消费:苹果4
消费者 消费:苹果5
消费者 消费:苹果6
消费者 消费:苹果7
生产者 生产:苹果8
生产者 生产:苹果9
生产者 生产:苹果10
生产者 生产:苹果11
生产者 生产:苹果12
生产者 生产:苹果13
生产者 生产:苹果14
生产者 生产:苹果15
生产者 生产:苹果16
生产者 生产:苹果17
消费者 消费:苹果8
消费者 消费:苹果9
消费者 消费:苹果10
消费者 消费:苹果11
生产者 生产:苹果18
生产者 生产:苹果19
生产者 生产:苹果20
生产者 生产:苹果21
消费者 消费:苹果12
消费者 消费:苹果13

2 使用 BlockingQueue 实现

Queue 是 Java 中实现队列的接口,它总共只有6个方法。

boolean add(E)
E element()
boolean offer(E)
E peek()
E poll()
E remove()

Queue 的 6 个主要方法的异同:

压入元素(添加):add()、offer()

区别	详细
相同	未超出容量,从队尾压入元素,返回压入的那个元素。
区别	在超出容量时,add()方法会对抛出异常,offer()返回false
弹出元素(删除):remove()、poll()

区别	详细
相同	容量大于0的时候,删除并返回队头被删除的那个元素。
区别	在容量为0的时候,remove()会抛出异常,poll()返回false
获取队头元素(不删除):element()、peek()

区别	详细
相同	容量大于0的时候,都返回队头元素。但是不删除。
区别	容量为0的时候,element()会抛出异常,peek()返回null。
Java 队列主要分为阻塞和非阻塞,有界和无界、单向链表和双向链表。

阻塞和非阻塞:

区别	详细
阻塞	出列还是入列,会阻塞。
非阻塞	不管出列还是入列,都不会进行阻塞。

有界和无界

区别	详细
有界	有界限,大小长度受限制
无界	无限大小,像 ArrayList 一样,在内部动态扩容。

单向链表和双向链表

区别	详细
单向链表	每个元素中除了元素本身之外,还存储一个指针,这个指针指向下一个元素。
双向链表	除了元素本身之外,还有两个指针,一个指针指向前一个元素的地址,另一个指针指向后一个元素的地址。

实现

package com.xu.thread;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Administrator
 */
public class ProducerAndConsumer {

    private static Integer MAX = 10;

    private static AtomicInteger integer = new AtomicInteger(0);
    
    private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        new Thread(new Consumer()).start();
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new Producer()).start();
    }

    static class Consumer implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                if (queue.size() > 0) {
                    String value = queue.poll();
                    System.out.println("消费者 消费:" + value);
                }
            }
        }
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                if (queue.size() < MAX) {
                    String value = "苹果" + integer.getAndIncrement();
                    queue.add(value);
                    System.out.println("生产者 生产:" + value);
                }
            }
        }
    }

}



生产者 生产:苹果1
生产者 生产:苹果0
消费者 消费:苹果1
消费者 消费:苹果0
生产者 生产:苹果2
生产者 生产:苹果3
消费者 消费:苹果2
消费者 消费:苹果3
生产者 生产:苹果4
生产者 生产:苹果5
消费者 消费:苹果4
消费者 消费:苹果5
生产者 生产:苹果6
生产者 生产:苹果7
消费者 消费:苹果6
消费者 消费:苹果7
生产者 生产:苹果8
生产者 生产:苹果9
消费者 消费:苹果8
消费者 消费:苹果9
生产者 生产:苹果10
生产者 生产:苹果11
消费者 消费:苹果10
消费者 消费:苹果11

3 使用 Lock 的 Condition 的 await/signal 实现

package com.xu.thread;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Administrator
 */
public class ProducerAndConsumer {

    private static Integer MAX = 10;

    private static Lock lock = new ReentrantLock();

    private static Condition condition = lock.newCondition();

    private static AtomicInteger integer = new AtomicInteger(0);

    public static void main(String[] args) {
        List<String> list = new LinkedList<>();
        new Thread(new Consumer(list)).start();
        new Thread(new Producer(list)).start();
        new Thread(new Producer(list)).start();
    }

    static class Consumer implements Runnable {
    
        private List<String> list;

        public Consumer(List<String> list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    if (list.size() <= 0) {
                        condition.await();
                    } else {
                        String value = list.get(0);
                        list.remove(0);
                        System.out.println("消费者 消费:" + value);
                        condition.signalAll();
                    }
                    Thread.sleep(100);
                } catch (Exception e) {

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    static class Producer implements Runnable {
    
        private List<String> list;

        public Producer(List<String> list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    if (list.size() <= MAX) {
                        String value = "苹果" + integer.getAndIncrement();
                        list.add(value);
                        System.out.println("生产者 生产:" + value);
                        condition.signalAll();
                    } else {
                        condition.await();
                    }
                    Thread.sleep(300);
                } catch (Exception e) {

                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

结果

生产者 生产:苹果0
生产者 生产:苹果1
生产者 生产:苹果2
生产者 生产:苹果3
生产者 生产:苹果4
生产者 生产:苹果5
生产者 生产:苹果6
生产者 生产:苹果7
生产者 生产:苹果8
生产者 生产:苹果9
生产者 生产:苹果10
消费者 消费:苹果0
消费者 消费:苹果1
消费者 消费:苹果2
消费者 消费:苹果3

4 使用 Semaphore 实现

package com.xu.thread;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Administrator
 */
public class ProducerAndConsumer {

    private static AtomicInteger integer = new AtomicInteger(0);

    /**
     * 互斥锁
     */
    private static final Semaphore mutex1 = new Semaphore(1);

    public static void main(String[] args) {
        List<String> list = new LinkedList<>();
        new Thread(new Consumer(mutex1, list)).start();
        new Thread(new Producer(mutex1, list)).start();
    }

    static class Consumer implements Runnable {
    
        private Semaphore mutex;
        private List<String> list;

        Consumer(Semaphore mutex, List<String> list) {
            this.list = list;
            this.mutex = mutex;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                    mutex.acquire();
                    if (list.size() > 0) {
                        String value = list.get(0);
                        list.remove(0);
                        System.out.println("消费者 消费:" + value + "\t" + list.size());
                    }
                } catch (InterruptedException e) {

                } finally {
                    mutex.release();
                }
            }
        }
    }

    static class Producer implements Runnable {
    
        private Semaphore mutex;
        private List<String> list;

        Producer(Semaphore mutex, List<String> list) {
            this.list = list;
            this.mutex = mutex;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                    mutex.acquire();
                    String value = "苹果" + integer.getAndIncrement();
                    list.add(value);
                    System.out.println("生产者 生产:" + value + "\t" + list.size());
                } catch (Exception e) {

                } finally {
                    mutex.release();
                }
            }
        }
    }

}

结果

生产者 生产:苹果0	1
生产者 生产:苹果1	2
消费者 消费:苹果0	1
生产者 生产:苹果2	2
生产者 生产:苹果3	3
消费者 消费:苹果1	2
生产者 生产:苹果4	3
生产者 生产:苹果5	4
生产者 生产:苹果6	5
消费者 消费:苹果2	4
生产者 生产:苹果7	5
生产者 生产:苹果8	6
消费者 消费:苹果3	5
生产者 生产:苹果9	6
生产者 生产:苹果10	7
生产者 生产:苹果11	8

5 使用 Piped 实现

模式	实现对象
字节输入输出流	PipedInputStream / PipedOutputStream
字符输入输出流	PipedReader / PipedWriter


package com.xu.thread;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Administrator
 */
public class ProducerAndConsumer {

    private static AtomicInteger integer = new AtomicInteger(0);

    public static void main(String[] args) throws IOException {
        PipedInputStream input = new PipedInputStream();
        PipedOutputStream output = new PipedOutputStream();
        Consumer consumer = new Consumer(input);
        Producer producer = new Producer(output);
        new Thread(consumer).start();
        new Thread(producer).start();
        producer.getPipedOutputStream().connect(consumer.getPipedInputStream());
    }

    static class Consumer implements Runnable {
        private PipedInputStream input;

        public Consumer(PipedInputStream input) {
            this.input = input;
        }

        public PipedInputStream getPipedInputStream() {
            return input;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                    int len;
                    byte[] buffer = new byte[1024];
                    while ((len = input.read(buffer)) != -1) {
                        String value = new String(buffer, 0, len);
                        System.out.println("消费者 消费:" + value);
                    }
                } catch (Exception e) {

                }
            }
        }
    }

    static class Producer implements Runnable {
        private PipedOutputStream output;

        public Producer(PipedOutputStream output) {
            this.output = output;
        }

        public PipedOutputStream getPipedOutputStream() {
            return output;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                    String value = "苹果" + integer.getAndIncrement();
                    output.write(value.getBytes());
                    output.flush();
                    System.out.println("生产者 生产:" + value);
                } catch (Exception e) {

                }
            }
        }
    }

}

结果

生产者 生产:苹果0
生产者 生产:苹果1
消费者 消费:苹果0苹果1
生产者 生产:苹果2
消费者 消费:苹果2
生产者 生产:苹果3
消费者 消费:苹果3
生产者 生产:苹果4
消费者 消费:苹果4
消费者 消费:苹果5
生产者 生产:苹果5
消费者 消费:苹果6
生产者 生产:苹果6
生产者 生产:苹果7
消费者 消费:苹果7
生产者 生产:苹果8
消费者 消费:苹果8
生产者 生产:苹果9
消费者 消费:苹果9
生产者 生产:苹果10
消费者 消费:苹果10
生产者 生产:苹果11
消费者 消费:苹果11
生产者 生产:苹果12
消费者 消费:苹果12
生产者 生产:苹果13
消费者 消费:苹果13
生产者 生产:苹果14
消费者 消费:苹果14
生产者 生产:苹果15
消费者 消费:苹果15
生产者 生产:苹果16
消费者 消费:苹果16
生产者 生产:苹果17
消费者 消费:苹果17
生产者 生产:苹果18
消费者 消费:苹果18
生产者 生产:苹果19
微信 支付宝
数据库常用的锁有哪些
Redis内存管理的基石zmalloc

← 数据库常用的锁有哪些 Redis内存管理的基石zmalloc→

最近更新
01
1637. 两点之间不包含任何点的最宽垂直区域 Java
06-26
02
1636. 按照频率将数组升序排序 Java
06-26
03
1638. 统计只差一个字符的子串数目 Java
06-26
更多文章>
Theme by Vdoing | Copyright © 2019-2025 JavaInterview.cn
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式