This page is a collection of examples of multithreaded programming in C++11 rather than a reference.
Creating a thread
A new thread can be created using the std::thread class. An example of creating a thread is shown below.
#include <thread>
#include <iostream>

void run()
{
    std::cout << __FUNCTION__ << std::endl;
}

int main(int argc, char **argv)
{
    std::thread t(run);
    t.join();
}
The above example creates a thread object t, which executes the run function in the new thread.
If we do not create any threads, there is always one thread in the application, usually called the main thread. When we create a new thread, the main thread and the newly created thread execute concurrently. In the example above, the thread object t is created in the main thread and goes out of scope when main returns. If t is still joinable at that point, its destructor calls std::terminate, even if the newly created thread is still running. To avoid this, the main thread must wait for the newly created thread to complete its execution by calling t.join();
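To illustrate why waiting with join matters, here is a minimal sketch in which the new thread stores a value and the calling thread reads it only after join returns. The helper name computeInThread and the computed value are assumptions made for this illustration, not part of the example above.

```cpp
#include <thread>

// Hypothetical helper: runs a thread that stores a result,
// then joins it before reading the result.
int computeInThread()
{
    int result = 0;

    // the new thread writes to result through the captured reference
    std::thread t([&result] { result = 6 * 7; });

    // wait for the new thread to finish; once join() returns,
    // the value written by the thread is visible to this thread
    t.join();

    return result;
}
```

Calling computeInThread() from main returns the value written by the thread. Reading result before the join would be a data race.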
Important functions in std::thread
get_id | Retrieves the thread id |
joinable | Checks whether the thread is joinable |
join | Joins the thread, i.e. waits for the thread to complete its execution |
detach | Detaches the thread from the thread object so that it runs independently |
swap | Swaps two thread objects |
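The functions listed above can be tried together in one short sketch. The helper name threadFunctionsDemo is hypothetical; it simply collects a few checks on thread objects and returns true if all of them hold.

```cpp
#include <thread>

// Hypothetical helper: returns true if every check on the thread objects holds
bool threadFunctionsDemo()
{
    std::thread a([] {});   // a owns a thread of execution
    std::thread b;          // b is default constructed and owns no thread

    bool ok = a.joinable() && !b.joinable();

    // a joinable thread has an id different from the calling thread's id
    ok = ok && (a.get_id() != std::this_thread::get_id());
    // a non-joinable thread reports a default constructed id
    ok = ok && (b.get_id() == std::thread::id());

    // swap exchanges ownership: b now owns the thread, a owns nothing
    a.swap(b);
    ok = ok && !a.joinable() && b.joinable();

    // join waits for completion and leaves b non-joinable
    b.join();
    ok = ok && !b.joinable();

    // detach lets the thread run on its own; c is no longer joinable
    std::thread c([] {});
    c.detach();
    ok = ok && !c.joinable();

    return ok;
}
```

Note that a thread stays joinable after its function has finished, until join or detach is called on it.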
Using a mutex
A mutex can be used to protect shared data from simultaneous access by multiple threads. mutex::lock acquires the lock, so that only one thread at a time can access the shared data, and mutex::unlock releases it. An example of using std::mutex is shown below.
#include <thread>
#include <mutex>
#include <chrono>
#include <vector>
#include <memory>
#include <iostream>

void run()
{
    static std::mutex m;
    // only one thread at a time, so that both
    // of the printouts happen one after another
    m.lock();
    std::cout << __FUNCTION__ << " started" << std::endl;
    // wait for 100 milliseconds
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << __FUNCTION__ << " ended" << std::endl;
    m.unlock();
}

int main(int argc, char **argv)
{
    std::vector<std::shared_ptr<std::thread> > threads;
    for (int i=0; i<5; ++i) {
        std::shared_ptr<std::thread> t(new std::thread(run));
        threads.push_back(t);
    }
    for (std::shared_ptr<std::thread> t : threads ) {
        t->join();
    }
}
Important functions in std::mutex
lock | Locks the mutex, blocking if it is already locked |
unlock | Unlocks the mutex |
try_lock | Tries to lock the mutex; returns false immediately if it is already locked |
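As a sketch of a mutex protecting data rather than output, the first helper below increments a shared counter from several threads. It uses std::lock_guard, a standard wrapper that is not used elsewhere on this page; it calls lock in its constructor and unlock in its destructor. The second helper shows try_lock failing while another thread holds the mutex. Both helper names and their parameters are assumptions made for this illustration.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: increments a shared counter from several threads
int countWithMutex(int threadCount, int incrementsPerThread)
{
    int counter = 0;   // shared data, protected by m
    std::mutex m;

    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; ++i) {
        threads.emplace_back([&] {
            for (int j = 0; j < incrementsPerThread; ++j) {
                // locks m here and unlocks it at the end of this scope
                std::lock_guard<std::mutex> guard(m);
                ++counter;
            }
        });
    }
    for (std::thread &t : threads) {
        t.join();
    }
    return counter;
}

// Hypothetical helper: calls try_lock while another thread holds the mutex
bool tryLockWhileHeld()
{
    std::mutex m;
    m.lock();   // the calling thread holds the mutex

    bool acquired = true;
    // try_lock in the second thread returns false instead of blocking
    std::thread t([&] {
        acquired = m.try_lock();
        if (acquired) {
            m.unlock();
        }
    });
    t.join();

    m.unlock();
    return acquired;
}
```

Without the lock, the increments from different threads could overlap and the final count would be unpredictable. try_lock is called from a second thread because calling it on a std::mutex that the same thread already owns is undefined behavior.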
Condition Variable
A condition variable is a synchronization primitive that allows a thread to wait until a condition occurs.
As a simple example, let's take the first example and make it work without using the join function.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::condition_variable cv;
std::mutex m;
// set by the thread once it has finished, protected by m
bool done = false;

void run()
{
    std::cout << __FUNCTION__ << " entered" << std::endl;
    // let's put a small delay
    for (int i=0; i<5; ++i) {
        std::cout << "." << std::flush;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    std::cout << std::endl;
    std::cout << __FUNCTION__ << " exiting" << std::endl;
    // notify once we are done with the thread execution
    std::unique_lock<std::mutex> l(m);
    done = true;
    cv.notify_one();
}

int main()
{
    std::thread t(run);
    t.detach();
    // wait for the thread to finish; the predicate guards against spurious
    // wakeups and against a notification sent before we start waiting
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [] { return done; });
    return 0;
}
Important functions in std::condition_variable
wait | Waits for a notification |
notify_one | Notifies one waiting thread |
notify_all | Notifies all waiting threads |
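To show notify_all next to wait, here is a minimal sketch in which several threads wait on the same condition variable and are all released by one notification. The helper name wakeAllWaiters and the ready flag are assumptions made for this illustration. The wait overload that takes a predicate is used so that a spurious wakeup, or a notification sent before a thread starts waiting, does not break the logic.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts waiterCount threads that all wait for one flag
// and returns how many of them were released
int wakeAllWaiters(int waiterCount)
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;   // the condition, protected by m
    int woken = 0;        // waiters that have seen ready == true, protected by m

    std::vector<std::thread> waiters;
    for (int i = 0; i < waiterCount; ++i) {
        waiters.emplace_back([&] {
            std::unique_lock<std::mutex> l(m);
            // returns only when ready is true, even after a spurious wakeup
            cv.wait(l, [&] { return ready; });
            ++woken;
        });
    }

    {
        // change the condition while holding the mutex
        std::lock_guard<std::mutex> l(m);
        ready = true;
    }
    // wake every thread that is currently waiting
    cv.notify_all();

    for (std::thread &t : waiters) {
        t.join();
    }
    return woken;
}
```

With notify_one instead of notify_all, only one of the threads already waiting would be woken, and the others could block forever.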
Now let's see some examples of multithreaded programming.
Printing 1 to 10 from two threads
Let's print 1 to 10, with the odd numbers printed by one thread and the even numbers by the other.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// mutex and condition variable protecting the shared turn flag
std::mutex turnMutex;
std::condition_variable turnCV;
// true when it is the odd thread's turn to print
bool oddTurn = true;

void evenThread()
{
    for (int i=2; i<11; i+=2) {
        std::unique_lock<std::mutex> l(turnMutex);
        // wait for the odd thread to print its value and pass the turn
        turnCV.wait(l, [] { return !oddTurn; });
        std::cout << i << std::endl;
        // pass the turn back to the odd thread
        oddTurn = true;
        turnCV.notify_one();
    }
}

void oddThread()
{
    for (int i=1; i<10; i+=2) {
        std::unique_lock<std::mutex> l(turnMutex);
        // wait for the even thread to print its value and pass the turn
        turnCV.wait(l, [] { return oddTurn; });
        std::cout << i << std::endl;
        // pass the turn to the even thread
        oddTurn = false;
        turnCV.notify_one();
    }
}

int main(int argc, char **argv)
{
    std::thread ot(oddThread);
    std::thread et(evenThread);
    ot.join();
    et.join();
    return 0;
}
Thread pool
Let's create a thread pool that starts a given number of threads and hands each submitted task to an available thread. If no thread is available, the task waits in a queue until a thread finishes its current task.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool;

// worker class. It creates a thread and waits for tasks from the ThreadPool
class Worker
{
    std::thread mThread;
    ThreadPool *mPool;

public:
    Worker(ThreadPool *pool) : mPool(pool) { create(); }
    ~Worker() { join(); }

    void join()
    {
        if (mThread.joinable()) {
            mThread.join();
        }
    }

    void create();

private:
    void doCreate();
};

class ThreadPool
{
private:
    int mWorkerCount;
    // worker vector
    std::vector<std::shared_ptr<Worker>> mThreads;
    // work queue is a queue of lambda functions
    std::queue<std::function<void(void)>> mWorkQueue;
    // lock protecting the queue and the termination flag
    std::mutex mWorkQueueLock;
    // condition variable used to notify task availability or termination
    std::condition_variable mNotifier;
    // set once the pool is shutting down
    bool mTerminated;

public:
    ThreadPool(int workerCount) : mWorkerCount(workerCount), mTerminated(false)
    {
        create();
    }

    ~ThreadPool()
    {
        // let the workers finish the queued tasks, then join them
        terminate();
        mThreads.clear();
    }

    void terminate()
    {
        {
            std::unique_lock<std::mutex> ql(mWorkQueueLock);
            mTerminated = true;
        }
        // wake every worker so that it can notice the termination
        mNotifier.notify_all();
    }

    void submitWork(std::function<void()> const & work)
    {
        // add task to queue
        {
            std::unique_lock<std::mutex> ql(mWorkQueueLock);
            mWorkQueue.push(work);
        }
        // notify availability of task
        mNotifier.notify_one();
    }

    // blocks until a task is available; returns an empty function
    // once the pool is terminated and the queue is empty
    std::function<void()> getWork()
    {
        std::unique_lock<std::mutex> ql(mWorkQueueLock);
        // wait until the queue is not empty or the pool is terminated
        mNotifier.wait(ql, [this] { return mTerminated || !mWorkQueue.empty(); });
        if (mWorkQueue.empty()) {
            return std::function<void()>();
        }
        // retrieve a task
        std::function<void()> work = mWorkQueue.front();
        mWorkQueue.pop();
        return work;
    }

private:
    void create()
    {
        for (int i=0; i<mWorkerCount; ++i) {
            mThreads.push_back(std::shared_ptr<Worker>(new Worker(this)));
        }
    }
};

void Worker::create()
{
    mThread = std::thread([this] { doCreate(); });
}

void Worker::doCreate()
{
    // run tasks until the pool hands out an empty function
    while (std::function<void()> work = mPool->getWork()) {
        work();
    }
}

int main(int argc, char **argv)
{
    ThreadPool pool(2);
    // submit six tasks; each prints its name and sleeps for a second
    for (int i=1; i<=6; ++i) {
        pool.submitWork([i] {
            std::cout << "**work" << i << "**" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        });
    }
    return 0;
}