/root/bitcoin/src/scheduler.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2015-2022 The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or http://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #include <scheduler.h> |
6 | | |
7 | | #include <sync.h> |
8 | | #include <util/time.h> |
9 | | |
10 | | #include <cassert> |
11 | | #include <functional> |
12 | | #include <utility> |
13 | | |
14 | 0 | CScheduler::CScheduler() = default; |
15 | | |
16 | | CScheduler::~CScheduler() |
17 | 0 | { |
18 | 0 | assert(nThreadsServicingQueue == 0); |
19 | 0 | if (stopWhenEmpty) assert(taskQueue.empty()); |
20 | 0 | } |
21 | | |
22 | | |
23 | | void CScheduler::serviceQueue() |
24 | 0 | { |
25 | 0 | WAIT_LOCK(newTaskMutex, lock); |
26 | 0 | ++nThreadsServicingQueue; |
27 | | |
28 | | // newTaskMutex is locked throughout this loop EXCEPT |
29 | | // when the thread is waiting or when the user's function |
30 | | // is called. |
31 | 0 | while (!shouldStop()) { |
32 | 0 | try { |
33 | 0 | while (!shouldStop() && taskQueue.empty()) { |
34 | | // Wait until there is something to do. |
35 | 0 | newTaskScheduled.wait(lock); |
36 | 0 | } |
37 | | |
38 | | // Wait until either there is a new task, or until |
39 | | // the time of the first item on the queue: |
40 | |
|
41 | 0 | while (!shouldStop() && !taskQueue.empty()) { |
42 | 0 | std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first; |
43 | 0 | if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { |
44 | 0 | break; // Exit loop after timeout, it means we reached the time of the event |
45 | 0 | } |
46 | 0 | } |
47 | | |
48 | | // If there are multiple threads, the queue can empty while we're waiting (another |
49 | | // thread may service the task we were waiting on). |
50 | 0 | if (shouldStop() || taskQueue.empty()) |
51 | 0 | continue; |
52 | | |
53 | 0 | Function f = taskQueue.begin()->second; |
54 | 0 | taskQueue.erase(taskQueue.begin()); |
55 | |
|
56 | 0 | { |
57 | | // Unlock before calling f, so it can reschedule itself or another task |
58 | | // without deadlocking: |
59 | 0 | REVERSE_LOCK(lock); |
60 | 0 | f(); |
61 | 0 | } |
62 | 0 | } catch (...) { |
63 | 0 | --nThreadsServicingQueue; |
64 | 0 | throw; |
65 | 0 | } |
66 | 0 | } |
67 | 0 | --nThreadsServicingQueue; |
68 | 0 | newTaskScheduled.notify_one(); |
69 | 0 | } |
70 | | |
71 | | void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t) |
72 | 0 | { |
73 | 0 | { |
74 | 0 | LOCK(newTaskMutex); |
75 | 0 | taskQueue.insert(std::make_pair(t, f)); |
76 | 0 | } |
77 | 0 | newTaskScheduled.notify_one(); |
78 | 0 | } |
79 | | |
80 | | void CScheduler::MockForward(std::chrono::seconds delta_seconds) |
81 | 0 | { |
82 | 0 | assert(delta_seconds > 0s && delta_seconds <= 1h); |
83 | | |
84 | 0 | { |
85 | 0 | LOCK(newTaskMutex); |
86 | | |
87 | | // use temp_queue to maintain updated schedule |
88 | 0 | std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue; |
89 | |
|
90 | 0 | for (const auto& element : taskQueue) { |
91 | 0 | temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); |
92 | 0 | } |
93 | | |
94 | | // point taskQueue to temp_queue |
95 | 0 | taskQueue = std::move(temp_queue); |
96 | 0 | } |
97 | | |
98 | | // notify that the taskQueue needs to be processed |
99 | 0 | newTaskScheduled.notify_one(); |
100 | 0 | } |
101 | | |
102 | | static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta) |
103 | 0 | { |
104 | 0 | f(); |
105 | 0 | s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta); |
106 | 0 | } |
107 | | |
108 | | void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta) |
109 | 0 | { |
110 | 0 | scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta); |
111 | 0 | } |
112 | | |
113 | | size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first, |
114 | | std::chrono::steady_clock::time_point& last) const |
115 | 0 | { |
116 | 0 | LOCK(newTaskMutex); |
117 | 0 | size_t result = taskQueue.size(); |
118 | 0 | if (!taskQueue.empty()) { |
119 | 0 | first = taskQueue.begin()->first; |
120 | 0 | last = taskQueue.rbegin()->first; |
121 | 0 | } |
122 | 0 | return result; |
123 | 0 | } |
124 | | |
125 | | bool CScheduler::AreThreadsServicingQueue() const |
126 | 0 | { |
127 | 0 | LOCK(newTaskMutex); |
128 | 0 | return nThreadsServicingQueue; |
129 | 0 | } |
130 | | |
131 | | |
132 | | void SerialTaskRunner::MaybeScheduleProcessQueue() |
133 | 0 | { |
134 | 0 | { |
135 | 0 | LOCK(m_callbacks_mutex); |
136 | | // Try to avoid scheduling too many copies here, but if we |
137 | | // accidentally have two ProcessQueue's scheduled at once its |
138 | | // not a big deal. |
139 | 0 | if (m_are_callbacks_running) return; |
140 | 0 | if (m_callbacks_pending.empty()) return; |
141 | 0 | } |
142 | 0 | m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); |
143 | 0 | } |
144 | | |
145 | | void SerialTaskRunner::ProcessQueue() |
146 | 0 | { |
147 | 0 | std::function<void()> callback; |
148 | 0 | { |
149 | 0 | LOCK(m_callbacks_mutex); |
150 | 0 | if (m_are_callbacks_running) return; |
151 | 0 | if (m_callbacks_pending.empty()) return; |
152 | 0 | m_are_callbacks_running = true; |
153 | |
|
154 | 0 | callback = std::move(m_callbacks_pending.front()); |
155 | 0 | m_callbacks_pending.pop_front(); |
156 | 0 | } |
157 | | |
158 | | // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue |
159 | | // to ensure both happen safely even if callback() throws. |
160 | 0 | struct RAIICallbacksRunning { |
161 | 0 | SerialTaskRunner* instance; |
162 | 0 | explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {} |
163 | 0 | ~RAIICallbacksRunning() |
164 | 0 | { |
165 | 0 | { |
166 | 0 | LOCK(instance->m_callbacks_mutex); |
167 | 0 | instance->m_are_callbacks_running = false; |
168 | 0 | } |
169 | 0 | instance->MaybeScheduleProcessQueue(); |
170 | 0 | } |
171 | 0 | } raiicallbacksrunning(this); |
172 | |
|
173 | 0 | callback(); |
174 | 0 | } |
175 | | |
176 | | void SerialTaskRunner::insert(std::function<void()> func) |
177 | 0 | { |
178 | 0 | { |
179 | 0 | LOCK(m_callbacks_mutex); |
180 | 0 | m_callbacks_pending.emplace_back(std::move(func)); |
181 | 0 | } |
182 | 0 | MaybeScheduleProcessQueue(); |
183 | 0 | } |
184 | | |
185 | | void SerialTaskRunner::flush() |
186 | 0 | { |
187 | 0 | assert(!m_scheduler.AreThreadsServicingQueue()); |
188 | 0 | bool should_continue = true; |
189 | 0 | while (should_continue) { |
190 | 0 | ProcessQueue(); |
191 | 0 | LOCK(m_callbacks_mutex); |
192 | 0 | should_continue = !m_callbacks_pending.empty(); |
193 | 0 | } |
194 | 0 | } |
195 | | |
196 | | size_t SerialTaskRunner::size() |
197 | 0 | { |
198 | 0 | LOCK(m_callbacks_mutex); |
199 | 0 | return m_callbacks_pending.size(); |
200 | 0 | } |