Skip to content

Commit f27f9fb

Browse files
authored
thread safe scheduler (#241)
Signed-off-by: turuslan <[email protected]>
1 parent f128f56 commit f27f9fb

35 files changed

Lines changed: 508 additions & 1157 deletions

housekeeping/clang-tidy-diff.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@
88
BUILD_DIR="${BUILD_DIR:?}"
99

1010
cd $(dirname $0)/..
11-
git diff -U0 origin/master | clang-tidy-diff.py -p1 -path $BUILD_DIR -iregex '(include|src)\/.*\.(h|c|hpp|cpp)' | tee clang-tidy.log
11+
git diff -U0 origin/master | clang-tidy-diff.py -p1 -path $BUILD_DIR -iregex '(include|src)\/.*\.(h|c|hpp|cpp)' -clang-tidy-binary clang-tidy-15 | tee clang-tidy.log
1212
! grep ': error:' clang-tidy.log

include/libp2p/basic/cancel.hpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <memory>
10+
11+
namespace libp2p {
12+
/**
13+
* Interface with destructor for `std::unique_ptr`.
14+
*/
15+
// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions)
16+
class CancelDtor {
17+
public:
18+
inline virtual ~CancelDtor() = 0;
19+
};
20+
CancelDtor::~CancelDtor() = default;
21+
22+
/**
23+
* RAII object to cancel operation.
24+
*/
25+
using Cancel = std::unique_ptr<CancelDtor>;
26+
27+
template <typename F>
28+
class CancelDtorFn final : public CancelDtor {
29+
F f;
30+
31+
public:
32+
explicit CancelDtorFn(F f) : f{std::move(f)} {}
33+
34+
~CancelDtorFn() override {
35+
f();
36+
}
37+
38+
// clang-tidy cppcoreguidelines-special-member-functions
39+
CancelDtorFn(const CancelDtorFn &) = delete;
40+
void operator=(const CancelDtorFn &) = delete;
41+
CancelDtorFn(CancelDtorFn &&) = delete;
42+
void operator=(CancelDtorFn &&) = delete;
43+
};
44+
45+
Cancel cancelFn(auto fn) {
46+
return std::make_unique<CancelDtorFn<decltype(fn)>>(std::move(fn));
47+
}
48+
} // namespace libp2p

include/libp2p/basic/scheduler.hpp

Lines changed: 8 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -10,64 +10,13 @@
1010
#include <functional>
1111
#include <memory>
1212

13-
#include <libp2p/outcome/outcome.hpp>
13+
#include <libp2p/basic/cancel.hpp>
1414

1515
namespace libp2p::basic {
16-
17-
constexpr std::chrono::milliseconds kZeroTime =
18-
std::chrono::milliseconds::zero();
19-
20-
/**
21-
* Feedback from scheduler backend to Scheduler implementation.
22-
*/
23-
class SchedulerBackendFeedback {
24-
public:
25-
virtual ~SchedulerBackendFeedback() = default;
26-
27-
/**
28-
* Called from backend to fire callbacks
29-
* @param current_clock
30-
* For timers: non-zero current async ticks in milliseconds since async's
31-
* epoch;
32-
* For callbacks deferred to next IO loop cycle: zero
33-
*/
34-
virtual void pulse(std::chrono::milliseconds current_clock) noexcept = 0;
35-
};
36-
37-
/**
38-
* Scheduling engine that depends on implementation:
39-
* 1) AsioSchedulerBackend (for asio-based asynchrony, it uses
40-
* std::chrono::steady_clock)
41-
* 2) ManualSchedulerBackend (for testing purposes,
42-
* uses manual time shifts as a async)
43-
*/
44-
class SchedulerBackend {
45-
public:
46-
virtual ~SchedulerBackend() = default;
47-
48-
/**
49-
* Current async
50-
* @return Milliseconds elapsed from async's epoch
51-
*/
52-
virtual std::chrono::milliseconds now() const noexcept = 0;
53-
54-
/**
55-
* Implementation-defined defer or delay function.
56-
* @param abs_time Time since async's epoch.
57-
* If abs_time == 0 then SchedulerBackendFeedback::pulse()
58-
* will be called on the next IO loop cycle with zero argument
59-
* @param scheduler Weak reference to scheduler (as it may expire)
60-
*/
61-
virtual void setTimer(
62-
std::chrono::milliseconds abs_time,
63-
std::weak_ptr<SchedulerBackendFeedback> scheduler) = 0;
64-
};
65-
6616
/**
6717
* Scheduler API. Provides callback deferring facilities and low-res timer
6818
*/
69-
class Scheduler : public SchedulerBackendFeedback,
70-
public std::enable_shared_from_this<Scheduler> {
19+
class Scheduler {
7120
public:
7221
struct Config {
7322
static constexpr std::chrono::milliseconds kMaxTimerThreshold =
@@ -80,79 +29,19 @@ namespace libp2p::basic {
8029
std::chrono::milliseconds max_timer_threshold = kMaxTimerThreshold;
8130
};
8231

83-
enum class Error {
84-
// Invalid argument passed
85-
kInvalidArgument = 1,
86-
87-
// Scheduler handle detached, cannot reschedule
88-
kHandleDetached,
89-
90-
// Scheduler item not found, cannot reschedule
91-
kItemNotFound,
92-
};
93-
94-
/**
95-
* Handle provides scoped callbacks lifetime, allows for canceling and
96-
* rescheduling
97-
*/
98-
class Handle {
99-
public:
100-
/**
101-
* { async-time, seq-number } structure for proper ordering and uniqueness
102-
* within Scheduler
103-
*/
104-
using Ticket = std::pair<std::chrono::milliseconds, uint64_t>;
105-
106-
Handle(const Handle &) = delete;
107-
Handle &operator=(const Handle &) = delete;
108-
109-
Handle() = default;
110-
Handle(Handle &&) = default;
111-
112-
/**
113-
* Ctor. Called from SchedulerImpl
114-
* @param ticket unique ticket, used for cancelling and rescheduling
115-
* @param scheduler handle's owner object
116-
*/
117-
Handle(Ticket ticket, std::weak_ptr<Scheduler> scheduler);
118-
119-
/**
120-
* Move assignment cancels existing ticket, if any
121-
*/
122-
Handle &operator=(Handle &&r) noexcept;
123-
124-
/**
125-
* Non-trivial dtor cancels existing ticket, if any
126-
*/
127-
~Handle();
128-
129-
/**
130-
* Cancels existing ticket, if any
131-
*/
132-
void cancel() noexcept;
133-
134-
/**
135-
* Reschedules existing ticket, if it is still active.
136-
* Allows reentrancy (can be called from inside the callback)
137-
* @param delay_from_now Relative time
138-
* @return success or error value from Scheduler::Error
139-
*/
140-
outcome::result<void> reschedule(
141-
std::chrono::milliseconds delay_from_now) noexcept;
142-
143-
private:
144-
Ticket ticket_;
145-
std::weak_ptr<Scheduler> scheduler_;
146-
};
32+
using Handle = Cancel;
14733

14834
using Callback = std::function<void()>;
35+
using Time = std::chrono::milliseconds;
36+
37+
virtual ~Scheduler() = default;
14938

15039
/**
15140
* Defers callback to be executed during the next IO loop cycle
15241
* @param cb callback
15342
*/
15443
void schedule(Callback &&cb) noexcept {
155-
std::ignore = scheduleImpl(std::move(cb), kZeroTime, false);
44+
std::ignore = scheduleImpl(std::move(cb), Time::zero(), false);
15645
}
15746

15847
/**
@@ -172,7 +61,7 @@ namespace libp2p::basic {
17261
* lifetime
17362
*/
17463
[[nodiscard]] Handle scheduleWithHandle(Callback &&cb) noexcept {
175-
return scheduleImpl(std::move(cb), kZeroTime, true);
64+
return scheduleImpl(std::move(cb), Time::zero(), true);
17665
}
17766

17867
/**
@@ -201,9 +90,6 @@ namespace libp2p::basic {
20190
bool) = delete;
20291

20392
protected:
204-
/// Handle calls cancel() and reschedule()
205-
friend class Handle;
206-
20793
/**
20894
* Called from schedule() and scheduleWithHandle() functions
20995
* @param cb callback
@@ -214,24 +100,5 @@ namespace libp2p::basic {
214100
virtual Handle scheduleImpl(Callback &&cb,
215101
std::chrono::milliseconds delay_from_now,
216102
bool make_handle) noexcept = 0;
217-
218-
/**
219-
* Called from Handle (from move assignment, destructor, or manually)
220-
* @param ticket handle's existing ticket
221-
*/
222-
virtual void cancel(Handle::Ticket ticket) noexcept = 0;
223-
224-
/**
225-
* Called from Handle.
226-
* Reschedules existing ticket.
227-
* Allows reentrancy (can be called from inside the callback)
228-
* @param delay_from_now Relative time
229-
* @return success or error value from Scheduler::Error
230-
*/
231-
virtual outcome::result<Handle::Ticket> reschedule(
232-
Handle::Ticket ticket,
233-
std::chrono::milliseconds delay_from_now) noexcept = 0;
234103
};
235104
} // namespace libp2p::basic
236-
237-
OUTCOME_HPP_DECLARE_ERROR(libp2p::basic, Scheduler::Error)

include/libp2p/basic/scheduler/asio_scheduler_backend.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
#include <boost/asio/io_context.hpp>
1010
#include <boost/asio/steady_timer.hpp>
1111

12-
#include <libp2p/basic/scheduler.hpp>
12+
#include <libp2p/basic/scheduler/backend.hpp>
1313

1414
namespace libp2p::basic {
1515

@@ -27,6 +27,8 @@ namespace libp2p::basic {
2727
explicit AsioSchedulerBackend(
2828
std::shared_ptr<boost::asio::io_context> io_context);
2929

30+
void post(std::function<void()> &&) override;
31+
3032
/**
3133
* @return Milliseconds since steady clock's epoch
3234
*/
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <chrono>
10+
#include <functional>
11+
#include <memory>
12+
13+
namespace libp2p::basic {
14+
/**
15+
* Feedback from scheduler backend to Scheduler implementation.
16+
*/
17+
class SchedulerBackendFeedback {
18+
public:
19+
virtual ~SchedulerBackendFeedback() = default;
20+
21+
/**
22+
* Called from backend to fire callbacks
23+
*/
24+
virtual void pulse() noexcept = 0;
25+
};
26+
27+
/**
28+
* Scheduling engine that depends on implementation:
29+
* 1) AsioSchedulerBackend (for asio-based asynchrony, it uses
30+
* std::chrono::steady_clock)
31+
* 2) ManualSchedulerBackend (for testing purposes,
32+
* uses manual time shifts as a async)
33+
*/
34+
class SchedulerBackend {
35+
public:
36+
virtual ~SchedulerBackend() = default;
37+
38+
/**
39+
* boost::asio::io_context::post
40+
*/
41+
virtual void post(std::function<void()> &&) = 0;
42+
43+
/**
44+
* Current async
45+
* @return Milliseconds elapsed from async's epoch
46+
*/
47+
virtual std::chrono::milliseconds now() const noexcept = 0;
48+
49+
/**
50+
* Implementation-defined defer or delay function.
51+
* @param abs_time Time since async's epoch.
52+
* If abs_time == 0 then SchedulerBackendFeedback::pulse()
53+
* will be called on the next IO loop cycle with zero argument
54+
* @param scheduler Weak reference to scheduler (as it may expire)
55+
*/
56+
virtual void setTimer(
57+
std::chrono::milliseconds abs_time,
58+
std::weak_ptr<SchedulerBackendFeedback> scheduler) = 0;
59+
};
60+
} // namespace libp2p::basic

include/libp2p/basic/scheduler/manual_scheduler_backend.hpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66

77
#pragma once
88

9-
#include <vector>
9+
#include <deque>
10+
#include <optional>
1011

11-
#include <libp2p/basic/scheduler.hpp>
12+
#include <libp2p/basic/scheduler/backend.hpp>
1213

1314
namespace libp2p::basic {
1415

@@ -22,6 +23,8 @@ namespace libp2p::basic {
2223
public:
2324
ManualSchedulerBackend() : current_clock_(1) {}
2425

26+
void post(std::function<void()> &&) override;
27+
2528
/**
2629
* @return Milliseconds since clock's epoch. Clock is set manually
2730
*/
@@ -54,24 +57,29 @@ namespace libp2p::basic {
5457
* @return true if no more events scheduled
5558
*/
5659
bool empty() const {
57-
return deferred_callbacks_.empty() && !timer_callback_;
60+
return deferred_callbacks_.empty() and not timer_expires_;
61+
}
62+
63+
void run() {
64+
while (not empty()) {
65+
shiftToTimer();
66+
}
5867
}
5968

6069
private:
70+
void callDeferred();
71+
6172
/// Current time, set manually
6273
std::chrono::milliseconds current_clock_;
6374

6475
/// Callbacks deferred for the next cycle
65-
std::vector<Callback> deferred_callbacks_;
66-
67-
/// Currently processed callback (reentrancy + rescheduling reasons here)
68-
std::vector<Callback> in_process_;
76+
std::deque<Callback> deferred_callbacks_;
6977

7078
/// Timer callback
71-
Callback timer_callback_;
79+
std::weak_ptr<SchedulerBackendFeedback> scheduler_;
7280

7381
/// Expiry of timer event
74-
std::chrono::milliseconds timer_expires_{};
82+
std::optional<std::chrono::milliseconds> timer_expires_;
7583
};
7684

7785
} // namespace libp2p::basic

0 commit comments

Comments
 (0)