2025年09月05日/ 浏览 78
在操作系统的多线程编程中,生产者消费者问题是并发编程的经典案例。该模型描述了两个角色:
这个模型的精髓在于解决了生产者和消费者速度不匹配时的协调问题。想象一下快餐店的场景:厨师(生产者)不断制作汉堡,顾客(消费者)购买汉堡,而收银台就是他们的共享缓冲区。
c
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
互斥锁就像卫生间的门锁,保证同一时间只有一个线程能访问临界区。在生产者消费者模型中,我们用它保护共享队列的访问。
c
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量相当于线程间的信号灯,当缓冲区空时让消费者等待,缓冲区满时让生产者等待。
我们采用环形队列作为缓冲区,这种数据结构能高效利用内存:
c
typedef struct {
int data[QUEUE_SIZE];
int front; // 消费位置
int rear; // 生产位置
int count; // 当前元素数量
} CircularQueue;
队列操作的核心逻辑:
c
void enqueue(CircularQueue *q, int item) {
q->data[q->rear] = item;
q->rear = (q->rear + 1) % QUEUE_SIZE;
q->count++;
}
int dequeue(CircularQueue *q) {
int item = q->data[q->front];
q->front = (q->front + 1) % QUEUE_SIZE;
q->count–;
return item;
}
c
typedef struct {
int data[QUEUE_SIZE];
int front, rear, count;
} CircularQueue;
CircularQueue queue;
pthreadmutext lock = PTHREADMUTEXINITIALIZER;
pthreadcondt notfull = PTHREADCONDINITIALIZER;
pthreadcondt notempty = PTHREADCONDINITIALIZER;
void init_queue(CircularQueue *q) {
q->front = q->rear = q->count = 0;
}
void enqueue(CircularQueue *q, int item) {
q->data[q->rear] = item;
q->rear = (q->rear + 1) % QUEUE_SIZE;
q->count++;
}
int dequeue(CircularQueue *q) {
int item = q->data[q->front];
q->front = (q->front + 1) % QUEUE_SIZE;
q->count–;
return item;
}
void* producer(void arg) {
int id = *(int)arg;
for (int i = 0; i < 10; i++) {
pthreadmutexlock(&lock);
while (queue.count == QUEUE_SIZE) {
printf("Producer %d: queue full\n", id);
pthread_cond_wait(¬_full, &lock);
}
int item = rand() % 100;
enqueue(&queue, item);
printf("Producer %d: produced %d\n", id, item);
pthread_cond_signal(¬_empty);
pthread_mutex_unlock(&lock);
usleep(rand() % 100000);
}
return NULL;
}
void* consumer(void arg) {
int id = *(int)arg;
for (int i = 0; i < 10; i++) {
pthreadmutexlock(&lock);
while (queue.count == 0) {
printf("Consumer %d: queue empty\n", id);
pthread_cond_wait(¬_empty, &lock);
}
int item = dequeue(&queue);
printf("Consumer %d: consumed %d\n", id, item);
pthread_cond_signal(¬_full);
pthread_mutex_unlock(&lock);
usleep(rand() % 100000);
}
return NULL;
}
int main() {
pthreadt producers[PRODUCERS], consumers[CONSUMERS];
int producerids[PRODUCERS], consumer_ids[CONSUMERS];
init_queue(&queue);
srand(time(NULL));
for (int i = 0; i < PRODUCERS; i++) {
producer_ids[i] = i + 1;
pthread_create(&producers[i], NULL, producer, &producer_ids[i]);
}
for (int i = 0; i < CONSUMERS; i++) {
consumer_ids[i] = i + 1;
pthread_create(&consumers[i], NULL, consumer, &consumer_ids[i]);
}
for (int i = 0; i < PRODUCERS; i++) {
pthread_join(producers[i], NULL);
}
for (int i = 0; i < CONSUMERS; i++) {
pthread_join(consumers[i], NULL);
}
return 0;
}
等待条件判断:必须使用while循环而不是if判断,因为可能存在虚假唤醒(spurious wakeup)
信号发送时机:
性能优化:
错误处理:
这个经典模型在现代系统中随处可见:消息队列系统(如Kafka)、线程池任务调度、GUI事件处理等底层都是这种思想的延伸。掌握好这个基础模型,对理解更复杂的并发系统大有裨益。