
Producer–consumer problem

In computing, the producer-consumer problem is a family of problems described by Edsger W. Dijkstra, beginning in 1965.

Dijkstra's bounded buffer solution
The original semaphore bounded buffer solution was written in ALGOL style. The buffer can store N portions or elements. The "number of queueing portions" semaphore counts the filled locations in the buffer, the "number of empty positions" semaphore counts the empty locations, and the "buffer manipulation" semaphore works as a mutex for the buffer put and get operations. If the buffer is full, that is, the number of empty positions is zero, the producer thread waits in the P(number of empty positions) operation. If the buffer is empty, that is, the number of queueing portions is zero, the consumer thread waits in the P(number of queueing portions) operation. The P() operation decrements the semaphore value; if the value is already zero, the calling thread waits until another thread increments it. The V() operation increments the semaphore value and thereby releases the semaphore. As a side effect, V() can move a waiting thread from the wait queue to the ready queue.

```
begin integer number of queueing portions, number of empty positions,
        buffer manipulation;
    number of queueing portions:= 0;
    number of empty positions:= N;
    buffer manipulation:= 1;
    parbegin
    producer: begin
        again 1: produce next portion;
            P(number of empty positions);
            P(buffer manipulation);
            add portion to buffer;
            V(buffer manipulation);
            V(number of queueing portions); goto again 1 end;
    consumer: begin
        again 2: P(number of queueing portions);
            P(buffer manipulation);
            take portion from buffer;
            V(buffer manipulation);
            V(number of empty positions);
            process portion taken; goto again 2 end
    parend
end
```

Since C++20, semaphores are part of the standard library (header <semaphore>), so Dijkstra's solution can be written directly in modern C++. The variable buffer_manipulation becomes a std::mutex: it is always released by the same thread that acquired it, so the semaphore ability to acquire in one thread and release in another is not needed. Using std::lock_guard instead of an explicit lock() and unlock() pair is an instance of C++ RAII: the lock_guard destructor releases the lock even if an exception is thrown. This solution can handle multiple consumer threads and/or multiple producer threads.
```cpp
#include <mutex>
#include <semaphore>
#include <thread>

// N, Portion and the helpers produce_next_portion(), add_portion_to_buffer(),
// take_portion_from_buffer() and process_portion_taken() are assumed to be
// defined elsewhere.
std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;

void producer() {
    for (;;) {
        Portion portion = produce_next_portion();
        number_of_empty_positions.acquire();     // P(number of empty positions)
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            add_portion_to_buffer(portion);
        }
        number_of_queueing_portions.release();   // V(number of queueing portions)
    }
}

void consumer() {
    for (;;) {
        number_of_queueing_portions.acquire();   // P(number of queueing portions)
        Portion portion;
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            portion = take_portion_from_buffer();
        }
        number_of_empty_positions.release();     // V(number of empty positions)
        process_portion_taken(portion);
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
}
```
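The P and V semantics of Dijkstra's solution can be made concrete with a small sketch. It assumes only C++11, so instead of std::counting_semaphore it builds a hypothetical Semaphore class from a mutex and a condition variable: P waits while the value is zero and then decrements it, V increments it and wakes one waiter. The function run_dijkstra_buffer, the buffer size of 4, and the integer portions 1..count are illustrative assumptions; the loops are bounded so that the demo terminates.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical counting semaphore offering Dijkstra's P and V operations,
// built from a mutex and a condition variable so it needs only C++11.
class Semaphore {
public:
    explicit Semaphore(int initial) : value_(initial) { assert(initial >= 0); }

    // P: wait until the value is positive, then decrement it.
    void P() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return value_ > 0; });
        --value_;
    }

    // V: increment the value and wake one waiting thread, if any.
    void V() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++value_;
        }
        cv_.notify_one();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int value_;
};

// Dijkstra's bounded buffer with N = 4 slots (assumed size). The producer
// sends the portions 1..count; the consumer returns their sum.
long long run_dijkstra_buffer(int count) {
    constexpr int N = 4;
    int buffer[N];
    int head = 0, tail = 0;
    Semaphore number_of_queueing_portions(0);
    Semaphore number_of_empty_positions(N);
    std::mutex buffer_manipulation;
    long long total = 0;  // written only by the consumer thread

    std::thread producer([&] {
        for (int portion = 1; portion <= count; ++portion) {
            number_of_empty_positions.P();
            {
                std::lock_guard<std::mutex> g(buffer_manipulation);
                buffer[tail] = portion;
                tail = (tail + 1) % N;
            }
            number_of_queueing_portions.V();
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) {
            number_of_queueing_portions.P();
            int portion;
            {
                std::lock_guard<std::mutex> g(buffer_manipulation);
                portion = buffer[head];
                head = (head + 1) % N;
            }
            number_of_empty_positions.V();
            total += portion;  // stands in for "process portion taken"
        }
    });
    producer.join();
    consumer.join();
    return total;
}
```

For example, run_dijkstra_buffer(1000) should return 500500, the sum of 1 through 1000, regardless of how the two threads interleave.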
Using monitors
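Per Brinch Hansen defined the monitor: "I will use the term monitor to denote a shared variable and the set of meaningful operations on it. The purpose of a monitor is to control the scheduling of resources among individual processes according to a certain policy." Tony Hoare laid a theoretical foundation for the monitor.

```
bounded buffer: monitor
  begin buffer:array 0..N-1 of portion;
    head, tail: 0..N-1;
    count: 0..N;
    nonempty, nonfull: condition;
  procedure append(x: portion);
    begin if count = N then nonfull.wait;
      note 0 <= count < N;
      buffer[tail] := x;
      tail := tail (+) 1;
      count := count + 1;
      nonempty.signal
    end append;
  procedure remove(result x: portion);
    begin if count = 0 then nonempty.wait;
      note 0 < count <= N;
      x := buffer[head];
      head := head (+) 1;
      count := count - 1;
      nonfull.signal
    end remove;
  head := 0; tail := 0; count := 0;
end bounded buffer;
```

The monitor is an object that contains the variables buffer, head, tail and count to realize a circular buffer, the condition variables nonempty and nonfull for synchronization, and the methods append and remove to access the bounded buffer. The monitor operation wait corresponds to the semaphore operation P or acquire; signal corresponds to V or release. The circled operation (+) is taken modulo N. The Pascal-style pseudocode shows a Hoare monitor. A Mesa monitor uses while instead of if in front of each wait, because a signaled thread does not resume immediately and must re-check its condition when it runs again; the predicate form of condition_variable::wait used below performs this loop internally. A C++ version is:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

template <class Portion, std::size_t N>
class Bounded_buffer {
    Portion buffer[N];               // 0..N-1
    std::size_t head = 0, tail = 0;  // 0..N-1
    std::size_t size = 0;            // 0..N
    std::condition_variable non_empty, non_full;
    std::mutex mtx;
public:
    void append(Portion portion) {
        std::unique_lock<std::mutex> lck(mtx);
        non_full.wait(lck, [&]{ return size != N; });
        assert(size < N);
        buffer[tail++] = portion;
        tail %= N;
        ++size;
        non_empty.notify_one();
    }
    Portion remove() {
        std::unique_lock<std::mutex> lck(mtx);
        non_empty.wait(lck, [&]{ return size != 0; });
        assert(0 < size && size <= N);
        Portion portion = buffer[head++];
        head %= N;
        --size;
        non_full.notify_one();
        return portion;
    }
};
```

The C++ version needs an additional mutex because C++ has no built-in monitors: the mutex provides the mutual exclusion that a monitor guarantees implicitly, and std::condition_variable::wait requires a std::unique_lock on it. The code uses assert to enforce the preconditions of the append and remove operations.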
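The difference between Hoare's if and Mesa's while can be shown with a sketch that writes the Mesa loops out explicitly instead of relying on the predicate overload of wait. The class MesaBuffer and the driver run_mesa_buffer are hypothetical; the driver runs two producers and two consumers on a three-slot buffer, a situation in which a woken thread can find its condition false again because another thread got there first.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical Mesa-style bounded buffer: every wait sits inside a while loop,
// so a woken thread re-checks its condition before touching the buffer.
template <class T, std::size_t N>
class MesaBuffer {
public:
    void append(T x) {
        std::unique_lock<std::mutex> lock(m_);
        while (count_ == N)          // Mesa: while, not if
            nonfull_.wait(lock);
        assert(count_ < N);
        buf_[tail_] = x;
        tail_ = (tail_ + 1) % N;
        ++count_;
        nonempty_.notify_one();
    }
    T remove() {
        std::unique_lock<std::mutex> lock(m_);
        while (count_ == 0)          // Mesa: while, not if
            nonempty_.wait(lock);
        assert(count_ > 0);
        T x = buf_[head_];
        head_ = (head_ + 1) % N;
        --count_;
        nonfull_.notify_one();
        return x;
    }
private:
    T buf_[N];
    std::size_t head_ = 0, tail_ = 0, count_ = 0;
    std::mutex m_;
    std::condition_variable nonempty_, nonfull_;
};

// Two producers each append 1..per_producer; two consumers each remove
// per_producer items. Returns the sum of everything removed.
long long run_mesa_buffer(int per_producer) {
    MesaBuffer<int, 3> buffer;
    std::mutex sum_mutex;
    long long sum = 0;
    std::vector<std::thread> threads;
    for (int p = 0; p < 2; ++p)
        threads.emplace_back([&] {
            for (int i = 1; i <= per_producer; ++i) buffer.append(i);
        });
    for (int c = 0; c < 2; ++c)
        threads.emplace_back([&] {
            long long local = 0;
            for (int i = 0; i < per_producer; ++i) local += buffer.remove();
            std::lock_guard<std::mutex> g(sum_mutex);
            sum += local;
        });
    for (auto& t : threads) t.join();
    return sum;
}
```

With separate condition variables for "not full" and "not empty", notify_one wakes only a thread of the right kind; the while loops handle the case where that thread loses the race to another consumer or producer. run_mesa_buffer(500) should return 250500, twice the sum of 1 through 500.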
Using channels
The very first producer-consumer solution in the Electrologica computers used 'channels'. Hoare defined channels: "An alternative to explicit naming of source and destination would be to name a port through which communication is to take place. The port names would be local to the processes, and the manner in which pairs of ports are to be connected by channels could be declared in the head of a parallel command." Brinch Hansen implemented channels in the programming languages Joyce and SuperPascal. Alef, the programming language of the Plan 9 operating system, and Limbo, the programming language of the Inferno operating system, have channels. The following C source code compiles on Plan 9 from User Space:

```c
#include "u.h"
#include "libc.h"
#include "thread.h"

enum { STACK = 8192 };

void producer(void *v) {
    Channel *ch = v;
    for (uint i = 1; ; ++i) {
        sleep(400);
        print("p %d\n", i);
        sendul(ch, i);
    }
}

void consumer(void *v) {
    Channel *ch = v;
    for (;;) {
        uint p = recvul(ch);
        print("\t\tc %d\n", p);
        sleep(200 + nrand(600));
    }
}

void threadmain(int argc, char **argv) {
    int (*mk)(void (*fn)(void*), void *arg, uint stack);
    mk = threadcreate;
    Channel *ch = chancreate(sizeof(ulong), 1);  /* buffer of one ulong */
    mk(producer, ch, STACK);
    mk(consumer, ch, STACK);
    /* nobody sends on this unbuffered channel, so recvp blocks forever
       and keeps threadmain alive */
    recvp(chancreate(sizeof(void*), 0));
    threadexitsall(0);
}
```

The program entry point is the function threadmain. The call ch = chancreate(sizeof(ulong), 1) creates the channel, the call sendul(ch, i) sends a value into the channel, and the call p = recvul(ch) receives a value from the channel. The programming language Go has channels, too.
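For readers without Plan 9 from User Space, the channel operations can be sketched in standard C++. The Channel class below is a hypothetical, simplified stand-in, not the Plan 9 API: send plays the role of sendul, recv the role of recvul, and the constructor argument the role of the buffer size passed to chancreate. It assumes a capacity of at least 1, so unbuffered (size 0) channels are not modeled, and the sleeps and printing of the original are left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical buffered channel: send blocks while `capacity` values are
// queued, recv blocks while the channel is empty.
template <class T>
class Channel {
public:
    explicit Channel(std::size_t capacity) : capacity_(capacity) { assert(capacity >= 1); }

    void send(T value) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(value));
        not_empty_.notify_one();
    }

    T recv() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop_front();
        not_full_.notify_one();
        return value;
    }

private:
    std::size_t capacity_;
    std::deque<T> queue_;
    std::mutex m_;
    std::condition_variable not_empty_, not_full_;
};

// Mirrors the Plan 9 example without sleeps: a channel of capacity 1,
// a producer sending 1..n, and a consumer recording what it receives.
std::vector<unsigned long> run_channel(unsigned long n) {
    Channel<unsigned long> ch(1);
    std::vector<unsigned long> received;  // touched only by the consumer
    std::thread producer([&] {
        for (unsigned long i = 1; i <= n; ++i) ch.send(i);
    });
    std::thread consumer([&] {
        for (unsigned long k = 0; k < n; ++k) received.push_back(ch.recv());
    });
    producer.join();
    consumer.join();
    return received;
}
```

Because there is one producer, one consumer, and a FIFO queue, the consumer receives the values in the order they were sent.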
A Go example:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

var sendMsg = 0

func produceMessage() int {
	time.Sleep(400 * time.Millisecond)
	sendMsg++
	fmt.Printf("sendMsg = %v\n", sendMsg)
	return sendMsg
}

func consumeMessage(recvMsg int) {
	fmt.Printf("\t\trecvMsg = %v\n", recvMsg)
	time.Sleep(time.Duration(200+rand.Intn(600)) * time.Millisecond)
}

func main() {
	ch := make(chan int, 3)
	go func() { // producer goroutine
		for {
			ch <- produceMessage()
		}
	}()
	for recvMsg := range ch { // the main goroutine is the consumer
		consumeMessage(recvMsg)
	}
}
```

The Go producer-consumer solution uses the main goroutine as the consumer and creates a new, unnamed goroutine for the producer. The two goroutines are connected by the channel ch, which can queue up to three int values. The statement ch := make(chan int, 3) creates the channel, the statement ch <- produceMessage() sends a value into the channel, and the statement for recvMsg := range ch receives values from the channel. The allocation of memory and processing resources and the synchronization are handled automatically by the language and its runtime.
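In Go, a for ... range loop over a channel ends once the channel has been closed and every queued value has been received; the example never closes ch, so its consumer loop runs forever. The following sketch models that termination rule in C++ with a hypothetical ClosableChannel class whose recv returns false once the channel is closed and drained. The finite producer, the function run_range_loop, and the choice to consume on the calling thread (like Go's main goroutine) are assumptions for illustration; the capacity of 3 is taken from the Go example.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical closable channel, loosely modeled on Go's buffered channels.
template <class T>
class ClosableChannel {
public:
    explicit ClosableChannel(std::size_t capacity) : capacity_(capacity) { assert(capacity >= 1); }

    void send(T value) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        assert(!closed_);  // sending on a closed channel is a bug (Go panics)
        queue_.push_back(std::move(value));
        not_empty_.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        not_empty_.notify_all();  // wake every waiting receiver
    }

    // Returns false only when the channel is closed and empty.
    bool recv(T& out) {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty()) return false;  // closed and drained
        out = std::move(queue_.front());
        queue_.pop_front();
        not_full_.notify_one();
        return true;
    }

private:
    std::size_t capacity_;
    std::deque<T> queue_;
    bool closed_ = false;
    std::mutex m_;
    std::condition_variable not_empty_, not_full_;
};

// Capacity 3 as in the Go example; the producer sends 1..n, then closes.
std::vector<int> run_range_loop(int n) {
    ClosableChannel<int> ch(3);
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) ch.send(i);
        ch.close();
    });
    std::vector<int> received;
    int msg;
    while (ch.recv(msg))  // plays the role of `for msg := range ch`
        received.push_back(msg);
    producer.join();
    return received;
}
```

Values queued before close() are still delivered; only after the queue is empty does recv report the end of the stream.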
Without semaphores or monitors
Leslie Lamport documented a bounded buffer producer-consumer solution for one producer and one consumer: "We assume that the buffer can hold at most b messages, b >= 1. In our solution, we let k be a constant greater than b, and let s and r be integer variables assuming values between 0 and k-1. We assume that initially s=r and the buffer is empty. By choosing k to be a multiple of b, the buffer can be implemented as an array B [0: b - 1]. The producer simply puts each new message into B[s mod b], and the consumer takes each message from B[r mod b]. The algorithm is shown below, generalized for infinite k."

```
Producer:
    L: if (s - r) mod k = b then goto L fi;
       put message in buffer;
       s := (s + 1) mod k;
       goto L;
Consumer:
    L: if (s - r) mod k = 0 then goto L fi;
       take message from buffer;
       r := (r + 1) mod k;
       goto L;
```

The Lamport solution uses busy waiting in the thread instead of waiting in the scheduler. It also neglects the impact of the scheduler switching threads at inconvenient times: if the first thread has read a variable value from memory, the scheduler then switches to the second thread, which changes the variable value, and the scheduler finally switches back to the first thread, the first thread continues with the old value of the variable, not the current value. For a variable that both threads modify, atomic read-modify-write operations solve this problem. Modern C++ offers atomic variables and operations for multi-threaded programming. The following busy-waiting C++11 solution for one producer and one consumer uses the atomic read-modify-write operations fetch_add and fetch_sub on the atomic variable count, which both threads update.
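```cpp
#include <atomic>
#include <thread>

// Message, produceMessage() and consumeMessage() are assumed to be defined elsewhere.
enum { N = 4 };
Message buffer[N];
std::atomic<unsigned> count {0};

void producer() {
    unsigned tail {0};
    for (;;) {
        Message message = produceMessage();
        while (N == count.load(std::memory_order_acquire))
            ; // busy waiting
        buffer[tail++] = message;
        tail %= N;
        count.fetch_add(1, std::memory_order_release);
    }
}

void consumer() {
    unsigned head {0};
    for (;;) {
        while (0 == count.load(std::memory_order_acquire))
            ; // busy waiting
        Message message = buffer[head++];
        head %= N;
        count.fetch_sub(1, std::memory_order_release);
        consumeMessage(message);
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
}
```

The circular buffer index variables head and tail are each used by only one thread and are therefore not relevant for memory consistency. The variable count controls the busy waiting of the producer and consumer threads. The release ordering of fetch_add and fetch_sub, paired with the acquire loads in the busy-waiting loops, guarantees that a message written into buffer is visible to the consumer before it reads that slot, and that the consumer has finished reading a slot before the producer overwrites it; std::memory_order_relaxed would give neither guarantee.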
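Lamport's own algorithm avoids a shared counter: s is written only by the producer and r only by the consumer, so no read-modify-write operation is needed. A sketch of it in C++11 follows, assuming b = 4, k = 2b and int messages (all illustrative choices, as is the function name run_lamport). It uses atomic loads and stores with acquire/release ordering so that the buffer slot accesses are properly ordered, and computes Lamport's (s - r) mod k as (s + k - r) % k to stay within unsigned arithmetic.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Assumed parameters: b buffer slots, k > b and a multiple of b.
constexpr unsigned b = 4;
constexpr unsigned k = 2 * b;

// Lamport's single-producer/single-consumer buffer; sends 1..n.
std::vector<int> run_lamport(int n) {
    int B[b];
    std::atomic<unsigned> s{0}, r{0};  // initially s = r: buffer empty
    std::vector<int> received;         // touched only by the consumer

    std::thread producer([&] {
        for (int msg = 1; msg <= n; ++msg) {
            unsigned my_s = s.load(std::memory_order_relaxed);  // only writer of s
            while ((my_s + k - r.load(std::memory_order_acquire)) % k == b)
                ;  // busy waiting: buffer full
            B[my_s % b] = msg;
            s.store((my_s + 1) % k, std::memory_order_release);
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < n; ++i) {
            unsigned my_r = r.load(std::memory_order_relaxed);  // only writer of r
            while ((s.load(std::memory_order_acquire) + k - my_r) % k == 0)
                ;  // busy waiting: buffer empty
            received.push_back(B[my_r % b]);
            r.store((my_r + 1) % k, std::memory_order_release);
        }
    });
    producer.join();
    consumer.join();
    return received;
}
```

Since k is a multiple of b, the slot index s mod b continues smoothly when s wraps from k - 1 to 0, which is why Lamport requires that choice when implementing the buffer as an array.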