Loading...
Searching...
No Matches
health_checker.hpp
1/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
8#define BOOST_REDIS_HEALTH_CHECKER_HPP
9
10#include <boost/redis/adapter/any_adapter.hpp>
11#include <boost/redis/config.hpp>
12#include <boost/redis/detail/connection_logger.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/request.hpp>
15#include <boost/redis/response.hpp>
16
17#include <boost/asio/compose.hpp>
18#include <boost/asio/consign.hpp>
19#include <boost/asio/coroutine.hpp>
20#include <boost/asio/post.hpp>
21#include <boost/asio/steady_timer.hpp>
22
23#include <chrono>
24
25namespace boost::redis::detail {
26
27template <class HealthChecker, class Connection>
28class ping_op {
29public:
30 HealthChecker* checker_ = nullptr;
31 Connection* conn_ = nullptr;
32 asio::coroutine coro_{};
33
34 template <class Self>
35 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
36 {
37 BOOST_ASIO_CORO_REENTER(coro_) for (;;)
38 {
39 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
40 conn_->logger_.trace("ping_op (1): timeout disabled.");
41 BOOST_ASIO_CORO_YIELD
42 asio::post(std::move(self));
43 self.complete({});
44 return;
45 }
46
47 if (checker_->checker_has_exited_) {
48 conn_->logger_.trace("ping_op (2): checker has exited.");
49 self.complete({});
50 return;
51 }
52
53 BOOST_ASIO_CORO_YIELD
54 conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
55 if (ec) {
56 conn_->logger_.trace("ping_op (3)", ec);
57 checker_->wait_timer_.cancel();
58 self.complete(ec);
59 return;
60 }
61
62 // Wait before pinging again.
63 checker_->ping_timer_.expires_after(checker_->ping_interval_);
64
65 BOOST_ASIO_CORO_YIELD
66 checker_->ping_timer_.async_wait(std::move(self));
67 if (ec) {
68 conn_->logger_.trace("ping_op (4)", ec);
69 self.complete(ec);
70 return;
71 }
72 }
73 }
74};
75
76template <class HealthChecker, class Connection>
77class check_timeout_op {
78public:
79 HealthChecker* checker_ = nullptr;
80 Connection* conn_ = nullptr;
81 asio::coroutine coro_{};
82
83 template <class Self>
84 void operator()(Self& self, system::error_code ec = {})
85 {
86 BOOST_ASIO_CORO_REENTER(coro_) for (;;)
87 {
88 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
89 conn_->logger_.trace("check_timeout_op (1): timeout disabled.");
90 BOOST_ASIO_CORO_YIELD
91 asio::post(std::move(self));
92 self.complete({});
93 return;
94 }
95
96 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
97
98 BOOST_ASIO_CORO_YIELD
99 checker_->wait_timer_.async_wait(std::move(self));
100 if (ec) {
101 conn_->logger_.trace("check_timeout_op (2)", ec);
102 self.complete(ec);
103 return;
104 }
105
106 if (checker_->resp_.has_error()) {
107 // TODO: Log the error.
108 conn_->logger_.trace("check_timeout_op (3): Response error.");
109 self.complete({});
110 return;
111 }
112
113 if (checker_->resp_.value().empty()) {
114 conn_->logger_.trace("check_timeout_op (4): pong timeout.");
115 checker_->ping_timer_.cancel();
116 conn_->cancel(operation::run);
117 checker_->checker_has_exited_ = true;
118 self.complete(error::pong_timeout);
119 return;
120 }
121
122 if (checker_->resp_.has_value()) {
123 checker_->resp_.value().clear();
124 }
125 }
126 }
127};
128
129template <class Executor>
130class health_checker {
131private:
132 using timer_type = asio::basic_waitable_timer<
133 std::chrono::steady_clock,
134 asio::wait_traits<std::chrono::steady_clock>,
135 Executor>;
136
137public:
138 health_checker(Executor ex)
139 : ping_timer_{ex}
140 , wait_timer_{ex}
141 {
142 req_.push("PING", "Boost.Redis");
143 }
144
145 void set_config(config const& cfg)
146 {
147 req_.clear();
148 req_.push("PING", cfg.health_check_id);
149 ping_interval_ = cfg.health_check_interval;
150 }
151
152 void cancel()
153 {
154 ping_timer_.cancel();
155 wait_timer_.cancel();
156 }
157
158 template <class Connection, class CompletionToken>
159 auto async_ping(Connection& conn, CompletionToken token)
160 {
161 return asio::async_compose<CompletionToken, void(system::error_code)>(
162 ping_op<health_checker, Connection>{this, &conn},
163 token,
164 conn,
165 ping_timer_);
166 }
167
168 template <class Connection, class CompletionToken>
169 auto async_check_timeout(Connection& conn, CompletionToken token)
170 {
171 checker_has_exited_ = false;
172 return asio::async_compose<CompletionToken, void(system::error_code)>(
173 check_timeout_op<health_checker, Connection>{this, &conn},
174 token,
175 conn,
176 wait_timer_);
177 }
178
179private:
180 template <class, class> friend class ping_op;
181 template <class, class> friend class check_timeout_op;
182
183 timer_type ping_timer_;
184 timer_type wait_timer_;
185 redis::request req_;
187 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
188 bool checker_has_exited_ = false;
189};
190
191} // namespace boost::redis::detail
192
193#endif // BOOST_REDIS_HEALTH_CHECKER_HPP
Creates Redis requests.
Definition request.hpp:46
void push(std::string_view cmd, Ts const &... args)
Appends a new command to the end of the request.
Definition request.hpp:147
void clear()
Clears the request preserving allocated memory.
Definition request.hpp:104
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition response.hpp:35
@ pong_timeout
Connect timeout.
@ run
Refers to connection::async_run operations.