OR-Tools  8.2
threadpool.cc
Go to the documentation of this file.
1// Copyright 2010-2018 Google LLC
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
15
17
18namespace operations_research {
19void RunWorker(void* data) {
20 ThreadPool* const thread_pool = reinterpret_cast<ThreadPool*>(data);
21 std::function<void()> work = thread_pool->GetNextTask();
22 while (work != NULL) {
23 work();
24 work = thread_pool->GetNextTask();
25 }
26}
27
28ThreadPool::ThreadPool(const std::string& prefix, int num_workers)
29 : num_workers_(num_workers) {}
30
32 if (started_) {
33 std::unique_lock<std::mutex> mutex_lock(mutex_);
34 waiting_to_finish_ = true;
35 mutex_lock.unlock();
36 condition_.notify_all();
37 for (int i = 0; i < num_workers_; ++i) {
38 all_workers_[i].join();
39 }
40 }
41}
42
44 CHECK_GT(capacity, num_workers_);
45 CHECK(!started_);
46 queue_capacity_ = capacity;
47}
48
50 started_ = true;
51 for (int i = 0; i < num_workers_; ++i) {
52 all_workers_.push_back(std::thread(&RunWorker, this));
53 }
54}
55
56std::function<void()> ThreadPool::GetNextTask() {
57 std::unique_lock<std::mutex> lock(mutex_);
58 for (;;) {
59 if (!tasks_.empty()) {
60 std::function<void()> task = tasks_.front();
61 tasks_.pop_front();
62 if (tasks_.size() < queue_capacity_ && waiting_for_capacity_) {
63 waiting_for_capacity_ = false;
64 capacity_condition_.notify_all();
65 }
66 return task;
67 }
68 if (waiting_to_finish_) {
69 return nullptr;
70 } else {
71 condition_.wait(lock);
72 }
73 }
74 return nullptr;
75}
76
77void ThreadPool::Schedule(std::function<void()> closure) {
78 std::unique_lock<std::mutex> lock(mutex_);
79 while (tasks_.size() >= queue_capacity_) {
80 waiting_for_capacity_ = true;
81 capacity_condition_.wait(lock);
82 }
83 tasks_.push_back(closure);
84 if (started_) {
85 lock.unlock();
86 condition_.notify_all();
87 }
88}
89
90} // namespace operations_research
#define CHECK(condition)
Definition: base/logging.h:495
#define CHECK_GT(val1, val2)
Definition: base/logging.h:702
ThreadPool(const std::string &prefix, int num_threads)
Definition: threadpool.cc:28
void SetQueueCapacity(int capacity)
Definition: threadpool.cc:43
std::function< void()> GetNextTask()
Definition: threadpool.cc:56
void Schedule(std::function< void()> closure)
Definition: threadpool.cc:77
The vehicle routing library lets one model and solve generic vehicle routing problems ranging from th...
void RunWorker(void *data)
Definition: threadpool.cc:19
int64 capacity