生产者消费者模式

单生产者单消费者模式

生产者消费者模型是一个经典的问题,其主要有3部分构成:生产者、阻塞队列和消费者。如下图所示:

生产者消费者模式,可以在一定程度上面做到程序之间的模块解耦。但是还有一个主要的特性,就是它可以通过阻塞队列来协调生产者和消费者的处理速率不同的问题。下面给出一个简单的Demo:

PCData:用于传输的数据

public final class PCData {
	
	private final int intData;

	public PCData(int intData) {
		this.intData = intData;
	}

	public PCData(String intData) {
		this.intData = Integer.parseInt(intData);
	}

	public int getIntData() {
		return intData;
	}

	@Override
	public String toString() {
		return "PCData [intData=" + intData + "]";
	}

}

Producer:生产者

public class Producer implements Runnable {
	private volatile boolean isRunning = true;
	private BlockingQueue<PCData> queue;	//内存缓冲区
	private static AtomicInteger count = new AtomicInteger();	// 总数
	private static final int SLEEPTIME = 1000;

	public Producer(BlockingQueue<PCData> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		PCData data = null;
		System.out.println("start producer id = " + Thread.currentThread().getId());
		try {
			while(isRunning) {
				Thread.sleep(SLEEPTIME);
				data = new PCData(count.getAndIncrement());// 构造任务数据
				System.out.println(data + " is put into queue");
				if(!queue.offer(data, 2, TimeUnit.SECONDS)) {// 提交任务到缓冲区
					System.err.println("failed to offer data: "+ data);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		}
	}
	
	public void stop(){
		this.isRunning = false;
	}

}

Customer:消费者

public class Customer implements Runnable {
	private BlockingQueue<PCData> queue; // 内存缓冲区
	private static final int SLEEPTIME = 1000;

	public Customer(BlockingQueue<PCData> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		System.out.println("start Consumer id = " + Thread.currentThread().getId());
		Random r = new Random();

		try {
			while (true) {
				PCData data = queue.take();// 提取任务
				if (null != data) {
					int re = data.getIntData() * data.getIntData();
					System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(), data.getIntData(), re));
					Thread.sleep(r.nextInt(SLEEPTIME));
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		}
	}

}

然后调用测试类如下:

public class MainTest {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<PCData> queue = new LinkedBlockingQueue<>(10);// 缓冲区
		Producer producer1 = new Producer(queue);// 生产者
		Producer producer2 = new Producer(queue);
		Producer producer3 = new Producer(queue);
		Customer customer1 = new Customer(queue);// 消费者
		Customer customer2 = new Customer(queue);
		Customer customer3 = new Customer(queue);
		ExecutorService pool = Executors.newCachedThreadPool();
		pool.execute(producer1);
		pool.execute(producer2);
		pool.execute(producer3);
		pool.execute(customer1);
		pool.execute(customer2);
		pool.execute(customer3);
		Thread.sleep(10 * 1000);
		producer1.stop();
		producer2.stop();
		producer3.stop();
		Thread.sleep(3000);
		pool.shutdown();
	}

}

多生产者多消费者模式

在我们日常的使用过程中,有可能在基础的使用场景之上有些调整。以上面单生产者单消费者为例,生产者和消费者线程使用的是同一个线程池。此时,如果想要更大程度上的隔离生产与制造之间的关系,可以将其分别在不同的线程池中处理。

通常情况下,如果有多个生产者的时候,我们有可能更多的考虑的是如何更快速的去消费数据。当有多个消费者的时候,可以参考下面结构:

对于消费者而言,直接对接阻塞队列的是一个分发任务的消费者。这个前端的消费通过Hash,将数据包分发到不同的子消费者当中去。在这些子消费者中,如果有消费不了的任务,则可以将数据回推到父级,然后由父级重新分配任务。