/root/bitcoin/src/scheduler.h
Line | Count | Source |
1 | | // Copyright (c) 2015-2022 The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or http://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_SCHEDULER_H |
6 | | #define BITCOIN_SCHEDULER_H |
7 | | |
8 | | #include <attributes.h> |
9 | | #include <sync.h> |
10 | | #include <threadsafety.h> |
11 | | #include <util/task_runner.h> |
12 | | |
13 | | #include <chrono> |
14 | | #include <condition_variable> |
15 | | #include <cstddef> |
16 | | #include <functional> |
17 | | #include <list> |
18 | | #include <map> |
19 | | #include <thread> |
20 | | #include <utility> |
21 | | |
22 | | /** |
23 | | * Simple class for background tasks that should be run |
24 | | * periodically or once "after a while" |
25 | | * |
26 | | * Usage: |
27 | | * |
28 | | * CScheduler* s = new CScheduler(); |
29 | | s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a function: void doSomething() { } |
30 | | * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); |
31 | | * std::thread* t = new std::thread([&] { s->serviceQueue(); }); |
32 | | * |
33 | | * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: |
34 | | * s->stop(); |
35 | | * t->join(); |
36 | | * delete t; |
37 | | * delete s; // Must be done after thread is interrupted/joined. |
38 | | */ |
39 | | class CScheduler |
40 | | { |
41 | | public: |
42 | | CScheduler(); |
43 | | ~CScheduler(); |
44 | | |
45 | | std::thread m_service_thread; |
46 | | |
47 | | typedef std::function<void()> Function; |
48 | | |
49 | | /** Call f at/after time t */ |
50 | | void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
51 | | |
52 | | /** Call f once after the delta has passed */ |
53 | | void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) |
54 | 0 | { |
55 | 0 | schedule(std::move(f), std::chrono::steady_clock::now() + delta); |
56 | 0 | } |
57 | | |
58 | | /** |
59 | | * Repeat f until the scheduler is stopped. First run is after delta has passed once. |
60 | | * |
61 | | * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more |
62 | | * accurate scheduling, don't use this method. |
63 | | */ |
64 | | void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
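// Editorial sketch (not part of the original header): because the next run is
// only queued once the previous run finishes, the effective period is delta
// plus the callback's own runtime. Assuming a CScheduler `s` that is being
// serviced and a hypothetical callback taking roughly 200ms:
//
//     s.scheduleEvery([] { FlushSomethingToDisk(); }, std::chrono::milliseconds{1000});
//     // starts roughly every 1200ms, not every 1000ms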
65 | | |
66 | | /** |
67 | | * Mock the scheduler to fast forward in time. |
68 | | * Iterates through items on taskQueue and reschedules them |
69 | | * to be delta_seconds sooner. |
70 | | */ |
71 | | void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
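// Editorial sketch (assumed test usage, not from the original header): a task
// queued far in the future can be pulled forward so a test does not have to
// wait in real time. `check_something` is a hypothetical callback.
//
//     s.scheduleFromNow(check_something, std::chrono::minutes{10});
//     s.MockForward(std::chrono::minutes{10}); // the task is now due immediately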
72 | | |
73 | | /** |
74 | | * Services the queue 'forever'. Should be run in a thread. |
75 | | */ |
76 | | void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
77 | | |
78 | | /** Tell any threads running serviceQueue to stop as soon as the current task is done */ |
79 | | void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) |
80 | 1 | { |
81 | 1 | WITH_LOCK(newTaskMutex, stopRequested = true); |
82 | 1 | newTaskScheduled.notify_all(); |
83 | 1 | if (m_service_thread.joinable()) m_service_thread.join(); |
84 | 1 | } |
85 | | /** Tell any threads running serviceQueue to stop when there is no work left to be done */ |
86 | | void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) |
87 | 0 | { |
88 | 0 | WITH_LOCK(newTaskMutex, stopWhenEmpty = true); |
89 | 0 | newTaskScheduled.notify_all(); |
90 | 0 | if (m_service_thread.joinable()) m_service_thread.join(); |
91 | 0 | } |
92 | | |
93 | | /** |
94 | | * Returns the number of tasks waiting to be serviced, |
95 | | * and the scheduled times of the first and last tasks |
96 | | */ |
97 | | size_t getQueueInfo(std::chrono::steady_clock::time_point& first, |
98 | | std::chrono::steady_clock::time_point& last) const |
99 | | EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
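// Editorial sketch (not part of the original header): the out-parameters are
// the scheduled times of the earliest and latest queued tasks. Treating them
// as meaningful only when the returned count is non-zero is an assumption;
// the header does not specify their value for an empty queue.
//
//     std::chrono::steady_clock::time_point first, last;
//     const size_t n_pending = s.getQueueInfo(first, last);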
100 | | |
101 | | /** Returns true if there are threads actively running in serviceQueue() */ |
102 | | bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
103 | | |
104 | | private: |
105 | | mutable Mutex newTaskMutex; |
106 | | std::condition_variable newTaskScheduled; |
107 | | std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); |
108 | | int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; |
109 | | bool stopRequested GUARDED_BY(newTaskMutex){false}; |
110 | | bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; |
111 | 4 | bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } |
112 | | }; |
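// Editorial sketch (not part of the original header): the same lifecycle as the
// usage block above, but with automatic storage and the public m_service_thread
// member instead of raw new/delete. `DumpSomeState` is a hypothetical task.
//
//     CScheduler scheduler;
//     scheduler.m_service_thread = std::thread([&scheduler] { scheduler.serviceQueue(); });
//     scheduler.scheduleEvery([] { DumpSomeState(); }, std::chrono::minutes{15});
//
//     // At shutdown: stop() abandons anything still queued, StopWhenDrained()
//     // lets the queue empty first; both join m_service_thread.
//     scheduler.stop();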
113 | | |
114 | | /** |
115 | | * Class used by CScheduler clients which may schedule multiple jobs |
116 | | * which are required to be run serially. Jobs are not guaranteed to run |
117 | | * on the same thread, but no two jobs will be executed |
118 | | * at the same time and memory will be release-acquire consistent |
119 | | * (the scheduler will internally do an acquire before invoking a callback |
120 | | * as well as a release at the end). In practice this means that a callback |
121 | | * B() will be able to observe all of the effects of callback A() which executed |
122 | | * before it. |
123 | | */ |
124 | | class SerialTaskRunner : public util::TaskRunnerInterface |
125 | | { |
126 | | private: |
127 | | CScheduler& m_scheduler; |
128 | | |
129 | | Mutex m_callbacks_mutex; |
130 | | |
131 | | // We are not allowed to assume the scheduler only runs in one thread, |
132 | | // but must ensure all callbacks happen in-order, so we end up creating |
133 | | // our own queue here :( |
134 | | std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); |
135 | | bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; |
136 | | |
137 | | void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
138 | | void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
139 | | |
140 | | public: |
141 | 0 | explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} |
142 | | |
143 | | /** |
144 | | * Add a callback to be executed. Callbacks are executed serially |
145 | | * and memory is release-acquire consistent between callback executions. |
146 | | * Practically, this means that callbacks can behave as if they are executed |
147 | | * in order by a single thread. |
148 | | */ |
149 | | void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
150 | | |
151 | | /** |
152 | | * Processes all remaining queue members on the calling thread, blocking until the queue is empty. |
153 | | * Must be called after the CScheduler has no remaining processing threads! |
154 | | */ |
155 | | void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
156 | | |
157 | | size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
158 | | }; |
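// Editorial sketch (not part of the original header): callbacks inserted into a
// SerialTaskRunner run one at a time and in order, so the second lambda below
// observes the first one's write even if the scheduler used several threads.
// The shutdown order follows the flush() comment above: stop the scheduler's
// service thread(s) first, then flush on the calling thread.
//
//     CScheduler scheduler;
//     scheduler.m_service_thread = std::thread([&scheduler] { scheduler.serviceQueue(); });
//     SerialTaskRunner runner{scheduler};
//
//     int counter{0}; // illustrative shared state
//     runner.insert([&counter] { counter = 1; });
//     runner.insert([&counter] { assert(counter == 1); }); // sees the earlier write
//
//     scheduler.stop();  // no scheduler threads remain after this
//     runner.flush();    // run any callbacks still pending, on this thread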
159 | | |
160 | | #endif // BITCOIN_SCHEDULER_H |