生产者和消费者5种实现方式Java
文章发布较早,内容可能过时,阅读注意甄别。
# 解答
Java 生产者和消费者 5种实现方式
- 1 使用 Objcet 的 wait() 和 notifyAll() 实现
- 2 使用 BlockingQueue 实现
- 3 使用 Lock 的 Condition 的 await/signal 实现
- 4 使用 Semaphore 实现
- 5 使用 Piped 实现
1 使用 Objcet 的 wait() 和 notifyAll() 实现
package com.xu.thread;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Administrator
*/
public class ProducerAndConsumer {
private static Integer MAX = 10;
private static AtomicInteger integer = new AtomicInteger(0);
public static void main(String[] args) {
List<String> list = new LinkedList<>();
new Thread(new Consumer(list)).start();
new Thread(new Producer(list)).start();
}
static class Consumer implements Runnable {
private List<String> list;
Consumer(List<String> list) {
this.list = list;
}
@Override
public void run() {
while (true) {
synchronized (list) {
try {
Thread.sleep(500);
if (list.size() <= 0) {
list.wait();
} else {
String value = list.get(0);
list.remove(0);
System.out.println("消费者 消费:" + value);
list.notifyAll();
}
} catch (InterruptedException e) {
}
}
}
}
}
static class Producer implements Runnable {
private List<String> list;
Producer(List<String> list) {
this.list = list;
}
@Override
public void run() {
while (true) {
synchronized (list) {
try {
Thread.sleep(200);
if (list.size() >= MAX) {
list.wait();
} else {
String value = "苹果" + integer.getAndIncrement();
System.out.println("生产者 生产:" + value);
list.add(value);
list.notifyAll();
}
} catch (Exception e) {
}
}
}
}
}
}
生产者 生产:苹果0
生产者 生产:苹果1
生产者 生产:苹果2
生产者 生产:苹果3
消费者 消费:苹果0
消费者 消费:苹果1
消费者 消费:苹果2
消费者 消费:苹果3
生产者 生产:苹果4
生产者 生产:苹果5
生产者 生产:苹果6
生产者 生产:苹果7
消费者 消费:苹果4
消费者 消费:苹果5
消费者 消费:苹果6
消费者 消费:苹果7
生产者 生产:苹果8
生产者 生产:苹果9
生产者 生产:苹果10
生产者 生产:苹果11
生产者 生产:苹果12
生产者 生产:苹果13
生产者 生产:苹果14
生产者 生产:苹果15
生产者 生产:苹果16
生产者 生产:苹果17
消费者 消费:苹果8
消费者 消费:苹果9
消费者 消费:苹果10
消费者 消费:苹果11
生产者 生产:苹果18
生产者 生产:苹果19
生产者 生产:苹果20
生产者 生产:苹果21
消费者 消费:苹果12
消费者 消费:苹果13
2 使用 BlockingQueue 实现
Queue 是 Java 中实现队列的接口,它总共只有6个方法。
boolean add(E)
E element()
boolean offer(E)
E peek()
E poll()
E remove()
Queue 的 6 个主要方法的异同:
压入元素(添加):add()、offer()
区别 详细
相同 未超出容量,从队尾压入元素,返回压入的那个元素。
区别 在超出容量时,add()方法会对抛出异常,offer()返回false
弹出元素(删除):remove()、poll()
区别 详细
相同 容量大于0的时候,删除并返回队头被删除的那个元素。
区别 在容量为0的时候,remove()会抛出异常,poll()返回false
获取队头元素(不删除):element()、peek()
区别 详细
相同 容量大于0的时候,都返回队头元素。但是不删除。
区别 容量为0的时候,element()会抛出异常,peek()返回null。
Java 队列主要分为阻塞和非阻塞,有界和无界、单向链表和双向链表。
阻塞和非阻塞:
区别 详细
阻塞 出列还是入列,会阻塞。
非阻塞 不管出列还是入列,都不会进行阻塞。
有界和无界
区别 详细
有界 有界限,大小长度受限制
无界 无限大小,像 ArrayList 一样,在内部动态扩容。
单向链表和双向链表
区别 详细
单向链表 每个元素中除了元素本身之外,还存储一个指针,这个指针指向下一个元素。
双向链表 除了元素本身之外,还有两个指针,一个指针指向前一个元素的地址,另一个指针指向后一个元素的地址。
实现
package com.xu.thread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Administrator
*/
public class ProducerAndConsumer {
private static Integer MAX = 10;
private static AtomicInteger integer = new AtomicInteger(0);
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) {
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (queue.size() > 0) {
String value = queue.poll();
System.out.println("消费者 消费:" + value);
}
}
}
}
static class Producer implements Runnable {
@Override
public void run() {
while (true) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (queue.size() < MAX) {
String value = "苹果" + integer.getAndIncrement();
queue.add(value);
System.out.println("生产者 生产:" + value);
}
}
}
}
}
生产者 生产:苹果1
生产者 生产:苹果0
消费者 消费:苹果1
消费者 消费:苹果0
生产者 生产:苹果2
生产者 生产:苹果3
消费者 消费:苹果2
消费者 消费:苹果3
生产者 生产:苹果4
生产者 生产:苹果5
消费者 消费:苹果4
消费者 消费:苹果5
生产者 生产:苹果6
生产者 生产:苹果7
消费者 消费:苹果6
消费者 消费:苹果7
生产者 生产:苹果8
生产者 生产:苹果9
消费者 消费:苹果8
消费者 消费:苹果9
生产者 生产:苹果10
生产者 生产:苹果11
消费者 消费:苹果10
消费者 消费:苹果11
3 使用 Lock 的 Condition 的 await/signal 实现
package com.xu.thread;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Administrator
*/
public class ProducerAndConsumer {
private static Integer MAX = 10;
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static AtomicInteger integer = new AtomicInteger(0);
public static void main(String[] args) {
List<String> list = new LinkedList<>();
new Thread(new Consumer(list)).start();
new Thread(new Producer(list)).start();
new Thread(new Producer(list)).start();
}
static class Consumer implements Runnable {
private List<String> list;
public Consumer(List<String> list) {
this.list = list;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
if (list.size() <= 0) {
condition.await();
} else {
String value = list.get(0);
list.remove(0);
System.out.println("消费者 消费:" + value);
condition.signalAll();
}
Thread.sleep(100);
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
}
static class Producer implements Runnable {
private List<String> list;
public Producer(List<String> list) {
this.list = list;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
if (list.size() <= MAX) {
String value = "苹果" + integer.getAndIncrement();
list.add(value);
System.out.println("生产者 生产:" + value);
condition.signalAll();
} else {
condition.await();
}
Thread.sleep(300);
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
}
}
结果
生产者 生产:苹果0
生产者 生产:苹果1
生产者 生产:苹果2
生产者 生产:苹果3
生产者 生产:苹果4
生产者 生产:苹果5
生产者 生产:苹果6
生产者 生产:苹果7
生产者 生产:苹果8
生产者 生产:苹果9
生产者 生产:苹果10
消费者 消费:苹果0
消费者 消费:苹果1
消费者 消费:苹果2
消费者 消费:苹果3
4 使用 Semaphore 实现
package com.xu.thread;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Administrator
*/
public class ProducerAndConsumer {
private static AtomicInteger integer = new AtomicInteger(0);
/**
* 互斥锁
*/
private static final Semaphore mutex1 = new Semaphore(1);
public static void main(String[] args) {
List<String> list = new LinkedList<>();
new Thread(new Consumer(mutex1, list)).start();
new Thread(new Producer(mutex1, list)).start();
}
static class Consumer implements Runnable {
private Semaphore mutex;
private List<String> list;
Consumer(Semaphore mutex, List<String> list) {
this.list = list;
this.mutex = mutex;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(500);
mutex.acquire();
if (list.size() > 0) {
String value = list.get(0);
list.remove(0);
System.out.println("消费者 消费:" + value + "\t" + list.size());
}
} catch (InterruptedException e) {
} finally {
mutex.release();
}
}
}
}
static class Producer implements Runnable {
private Semaphore mutex;
private List<String> list;
Producer(Semaphore mutex, List<String> list) {
this.list = list;
this.mutex = mutex;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(200);
mutex.acquire();
String value = "苹果" + integer.getAndIncrement();
list.add(value);
System.out.println("生产者 生产:" + value + "\t" + list.size());
} catch (Exception e) {
} finally {
mutex.release();
}
}
}
}
}
结果
生产者 生产:苹果0 1
生产者 生产:苹果1 2
消费者 消费:苹果0 1
生产者 生产:苹果2 2
生产者 生产:苹果3 3
消费者 消费:苹果1 2
生产者 生产:苹果4 3
生产者 生产:苹果5 4
生产者 生产:苹果6 5
消费者 消费:苹果2 4
生产者 生产:苹果7 5
生产者 生产:苹果8 6
消费者 消费:苹果3 5
生产者 生产:苹果9 6
生产者 生产:苹果10 7
生产者 生产:苹果11 8
5 使用 Piped 实现
模式 实现对象
字节输入输出流 PipedInputStream / PipedOutputStream
字符输入输出流 PipedReader / PipedWriter
package com.xu.thread;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Administrator
*/
public class ProducerAndConsumer {
private static AtomicInteger integer = new AtomicInteger(0);
public static void main(String[] args) throws IOException {
PipedInputStream input = new PipedInputStream();
PipedOutputStream output = new PipedOutputStream();
Consumer consumer = new Consumer(input);
Producer producer = new Producer(output);
new Thread(consumer).start();
new Thread(producer).start();
producer.getPipedOutputStream().connect(consumer.getPipedInputStream());
}
static class Consumer implements Runnable {
private PipedInputStream input;
public Consumer(PipedInputStream input) {
this.input = input;
}
public PipedInputStream getPipedInputStream() {
return input;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(500);
int len;
byte[] buffer = new byte[1024];
while ((len = input.read(buffer)) != -1) {
String value = new String(buffer, 0, len);
System.out.println("消费者 消费:" + value);
}
} catch (Exception e) {
}
}
}
}
static class Producer implements Runnable {
private PipedOutputStream output;
public Producer(PipedOutputStream output) {
this.output = output;
}
public PipedOutputStream getPipedOutputStream() {
return output;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(200);
String value = "苹果" + integer.getAndIncrement();
output.write(value.getBytes());
output.flush();
System.out.println("生产者 生产:" + value);
} catch (Exception e) {
}
}
}
}
}
结果
生产者 生产:苹果0
生产者 生产:苹果1
消费者 消费:苹果0苹果1
生产者 生产:苹果2
消费者 消费:苹果2
生产者 生产:苹果3
消费者 消费:苹果3
生产者 生产:苹果4
消费者 消费:苹果4
消费者 消费:苹果5
生产者 生产:苹果5
消费者 消费:苹果6
生产者 生产:苹果6
生产者 生产:苹果7
消费者 消费:苹果7
生产者 生产:苹果8
消费者 消费:苹果8
生产者 生产:苹果9
消费者 消费:苹果9
生产者 生产:苹果10
消费者 消费:苹果10
生产者 生产:苹果11
消费者 消费:苹果11
生产者 生产:苹果12
消费者 消费:苹果12
生产者 生产:苹果13
消费者 消费:苹果13
生产者 生产:苹果14
消费者 消费:苹果14
生产者 生产:苹果15
消费者 消费:苹果15
生产者 生产:苹果16
消费者 消费:苹果16
生产者 生产:苹果17
消费者 消费:苹果17
生产者 生产:苹果18
消费者 消费:苹果18
生产者 生产:苹果19