Loading...
Searching...
No Matches
connection.hpp
1/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
9
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/adapter/any_adapter.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/redis/detail/connection_logger.hpp>
14#include <boost/redis/detail/exec_fsm.hpp>
15#include <boost/redis/detail/health_checker.hpp>
16#include <boost/redis/detail/helper.hpp>
17#include <boost/redis/detail/multiplexer.hpp>
18#include <boost/redis/detail/reader_fsm.hpp>
19#include <boost/redis/detail/redis_stream.hpp>
20#include <boost/redis/detail/resp3_handshaker.hpp>
21#include <boost/redis/error.hpp>
22#include <boost/redis/logger.hpp>
23#include <boost/redis/operation.hpp>
24#include <boost/redis/request.hpp>
25#include <boost/redis/resp3/type.hpp>
26#include <boost/redis/usage.hpp>
27
28#include <boost/asio/any_completion_handler.hpp>
29#include <boost/asio/any_io_executor.hpp>
30#include <boost/asio/basic_stream_socket.hpp>
31#include <boost/asio/bind_executor.hpp>
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/cancel_after.hpp>
34#include <boost/asio/coroutine.hpp>
35#include <boost/asio/deferred.hpp>
36#include <boost/asio/experimental/channel.hpp>
37#include <boost/asio/experimental/parallel_group.hpp>
38#include <boost/asio/immediate.hpp>
39#include <boost/asio/io_context.hpp>
40#include <boost/asio/ip/tcp.hpp>
41#include <boost/asio/prepend.hpp>
42#include <boost/asio/read_until.hpp>
43#include <boost/asio/ssl/stream.hpp>
44#include <boost/asio/steady_timer.hpp>
45#include <boost/asio/write.hpp>
46#include <boost/assert.hpp>
47#include <boost/config.hpp>
48#include <boost/core/ignore_unused.hpp>
49
50#include <array>
51#include <chrono>
52#include <cstddef>
53#include <memory>
54#include <string>
55#include <utility>
56
57namespace boost::redis {
58namespace detail {
59
60template <class AsyncReadStream, class DynamicBuffer>
61class append_some_op {
62private:
63 AsyncReadStream& stream_;
64 DynamicBuffer buf_;
65 std::size_t size_ = 0;
66 std::size_t tmp_ = 0;
67 asio::coroutine coro_{};
68
69public:
70 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
71 : stream_{stream}
72 , buf_{std::move(buf)}
73 , size_{size}
74 { }
75
76 template <class Self>
77 void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
78 {
79 BOOST_ASIO_CORO_REENTER(coro_)
80 {
81 tmp_ = buf_.size();
82 buf_.grow(size_);
83
84 BOOST_ASIO_CORO_YIELD
85 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
86 if (ec) {
87 self.complete(ec, 0);
88 return;
89 }
90
91 buf_.shrink(buf_.size() - tmp_ - n);
92 self.complete({}, n);
93 }
94 }
95};
96
97template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
98auto async_append_some(
99 AsyncReadStream& stream,
100 DynamicBuffer buffer,
101 std::size_t size,
102 CompletionToken&& token)
103{
104 return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
105 append_some_op<AsyncReadStream, DynamicBuffer>{stream, buffer, size},
106 token,
107 stream);
108}
109
110template <class Executor>
111using exec_notifier_type = asio::experimental::channel<
112 Executor,
113 void(system::error_code, std::size_t)>;
114
115template <class Conn>
116struct exec_op {
117 using executor_type = typename Conn::executor_type;
118
119 Conn* conn_ = nullptr;
120 std::shared_ptr<exec_notifier_type<executor_type>> notifier_ = nullptr;
121 detail::exec_fsm fsm_;
122
123 template <class Self>
124 void operator()(Self& self, system::error_code = {}, std::size_t = 0)
125 {
126 while (true) {
127 // Invoke the state machine
128 auto act = fsm_.resume(conn_->is_open(), self.get_cancellation_state().cancelled());
129
130 // Do what the FSM said
131 switch (act.type()) {
132 case detail::exec_action_type::setup_cancellation:
133 self.reset_cancellation_state(asio::enable_total_cancellation());
134 continue; // this action does not require yielding
135 case detail::exec_action_type::immediate:
136 asio::async_immediate(self.get_io_executor(), std::move(self));
137 return;
138 case detail::exec_action_type::notify_writer:
139 conn_->writer_timer_.cancel();
140 continue; // this action does not require yielding
141 case detail::exec_action_type::wait_for_response:
142 notifier_->async_receive(std::move(self));
143 return;
144 case detail::exec_action_type::cancel_run:
145 conn_->cancel(operation::run);
146 continue; // this action does not require yielding
147 case detail::exec_action_type::done:
148 notifier_.reset();
149 self.complete(act.error(), act.bytes_read());
150 return;
151 }
152 }
153 }
154};
155
156template <class Conn>
157struct writer_op {
158 Conn* conn_;
159 asio::coroutine coro{};
160
161 template <class Self>
162 void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
163 {
164 ignore_unused(n);
165
166 BOOST_ASIO_CORO_REENTER(coro) for (;;)
167 {
168 while (conn_->mpx_.prepare_write() != 0) {
169 BOOST_ASIO_CORO_YIELD
170 asio::async_write(
171 conn_->stream_,
172 asio::buffer(conn_->mpx_.get_write_buffer()),
173 std::move(self));
174
175 conn_->logger_.on_write(ec, conn_->mpx_.get_write_buffer().size());
176
177 if (ec) {
178 conn_->logger_.trace("writer_op (1)", ec);
179 conn_->cancel(operation::run);
180 self.complete(ec);
181 return;
182 }
183
184 conn_->mpx_.commit_write();
185
186 // A socket.close() may have been called while a
187 // successful write might had already been queued, so we
188 // have to check here before proceeding.
189 if (!conn_->is_open()) {
190 conn_->logger_.trace("writer_op (2): connection is closed.");
191 self.complete({});
192 return;
193 }
194 }
195
196 BOOST_ASIO_CORO_YIELD
197 conn_->writer_timer_.async_wait(std::move(self));
198 if (!conn_->is_open()) {
199 conn_->logger_.trace("writer_op (3): connection is closed.");
200 // Notice this is not an error of the op, stoping was
201 // requested from the outside, so we complete with
202 // success.
203 self.complete({});
204 return;
205 }
206 }
207 }
208};
209
210template <class Conn>
211struct reader_op {
212 using dyn_buffer_type = asio::dynamic_string_buffer<
213 char,
214 std::char_traits<char>,
215 std::allocator<char>>;
216
217 // TODO: Move this to config so the user can fine tune?
218 static constexpr std::size_t buffer_growth_hint = 4096;
219
220 Conn* conn_;
221 detail::reader_fsm fsm_;
222
223public:
224 reader_op(Conn& conn) noexcept
225 : conn_{&conn}
226 , fsm_{conn.mpx_}
227 { }
228
229 template <class Self>
230 void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
231 {
232 using dyn_buffer_type = asio::dynamic_string_buffer<
233 char,
234 std::char_traits<char>,
235 std::allocator<char>>;
236
237 for (;;) {
238 auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
239
240 conn_->logger_.on_fsm_resume(act);
241
242 switch (act.type_) {
243 case reader_fsm::action::type::setup_cancellation:
244 self.reset_cancellation_state(asio::enable_terminal_cancellation());
245 continue;
246 case reader_fsm::action::type::needs_more:
247 case reader_fsm::action::type::append_some:
248 async_append_some(
249 conn_->stream_,
250 dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
251 conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
252 std::move(self));
253 return;
254 case reader_fsm::action::type::notify_push_receiver:
255 if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
256 continue;
257 } else {
258 conn_->receive_channel_.async_send(ec, act.push_size_, std::move(self));
259 return;
260 }
261 return;
262 case reader_fsm::action::type::cancel_run: conn_->cancel(operation::run); continue;
263 case reader_fsm::action::type::done: self.complete(act.ec_); return;
264 }
265 }
266 }
267};
268
269inline system::error_code check_config(const config& cfg)
270{
271 if (!cfg.unix_socket.empty()) {
272#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
274#endif
275 if (cfg.use_ssl)
277 }
278 return system::error_code{};
279}
280
281template <class Conn>
282class run_op {
283private:
284 Conn* conn_ = nullptr;
285 asio::coroutine coro_{};
286 system::error_code stored_ec_;
287
288 using order_t = std::array<std::size_t, 5>;
289
290public:
291 run_op(Conn* conn) noexcept
292 : conn_{conn}
293 { }
294
295 // Called after the parallel group finishes
296 template <class Self>
297 void operator()(
298 Self& self,
299 order_t order,
300 system::error_code ec0,
301 system::error_code ec1,
302 system::error_code ec2,
303 system::error_code ec3,
304 system::error_code)
305 {
306 system::error_code final_ec;
307
308 if (order[0] == 0 && !!ec0) {
309 // The hello op finished first and with an error
310 final_ec = ec0;
311 } else if (order[0] == 2 && ec2 == error::pong_timeout) {
312 // The check ping timeout finished first. Use the ping error code
313 final_ec = ec1;
314 } else {
315 // Use the reader error code
316 final_ec = ec3;
317 }
318
319 (*this)(self, final_ec);
320 }
321
322 // TODO: this op doesn't handle per-operation cancellation correctly
323 template <class Self>
324 void operator()(Self& self, system::error_code ec = {})
325 {
326 BOOST_ASIO_CORO_REENTER(coro_)
327 {
328 // Check config
329 ec = check_config(conn_->cfg_);
330 if (ec) {
331 conn_->logger_.log(logger::level::err, "Invalid configuration", ec);
332 stored_ec_ = ec;
333 BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self));
334 self.complete(stored_ec_);
335 return;
336 }
337
338 for (;;) {
339 // Try to connect
340 BOOST_ASIO_CORO_YIELD
341 conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self));
342
343 // If we were successful, run all the connection tasks
344 if (!ec) {
345 conn_->mpx_.reset();
346
347 // Note: Order is important here because the writer might
348 // trigger an async_write before the async_hello thereby
349 // causing an authentication problem.
350 BOOST_ASIO_CORO_YIELD
351 asio::experimental::make_parallel_group(
352 [this](auto token) {
353 return conn_->handshaker_.async_hello(*conn_, token);
354 },
355 [this](auto token) {
356 return conn_->health_checker_.async_ping(*conn_, token);
357 },
358 [this](auto token) {
359 return conn_->health_checker_.async_check_timeout(*conn_, token);
360 },
361 [this](auto token) {
362 return conn_->reader(token);
363 },
364 [this](auto token) {
365 return conn_->writer(token);
366 })
367 .async_wait(asio::experimental::wait_for_one_error(), std::move(self));
368
369 // The parallel group result will be translated into a single error
370 // code by the specialized operator() overload
371
372 // The receive operation must be cancelled because channel
373 // subscription does not survive a reconnection but requires
374 // re-subscription.
375 conn_->cancel(operation::receive);
376 }
377
378 // If we are not going to try again, we're done
379 if (!conn_->will_reconnect()) {
380 self.complete(ec);
381 return;
382 }
383
384 // Wait for the reconnection interval
385 conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
386 BOOST_ASIO_CORO_YIELD
387 conn_->reconnect_timer_.async_wait(std::move(self));
388
389 // If the timer was cancelled, exit
390 if (ec) {
391 self.complete(ec);
392 return;
393 }
394
395 // If we won't reconnect, exit
396 if (!conn_->will_reconnect()) {
397 self.complete(asio::error::operation_aborted);
398 return;
399 }
400 }
401 }
402 }
403};
404
405logger make_stderr_logger(logger::level lvl, std::string prefix);
406
407} // namespace detail
408
419template <class Executor>
421public:
423
425 BOOST_DEPRECATED("This typedef is deprecated, and will be removed with next_layer().")
426 typedef asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>> next_layer_type;
427
429 using executor_type = Executor;
430
432 executor_type get_executor() noexcept { return writer_timer_.get_executor(); }
433
435 template <class Executor1>
440
450 executor_type ex,
451 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
452 logger lgr = {})
453 : stream_{ex, std::move(ctx)}
454 , writer_timer_{ex}
455 , reconnect_timer_{ex}
456 , receive_channel_{ex, 256}
457 , health_checker_{ex}
458 , logger_{std::move(lgr)}
459 {
460 set_receive_response(ignore);
461 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
462 }
463
475 std::move(ex),
476 asio::ssl::context{asio::ssl::context::tlsv12_client},
477 std::move(lgr))
478 { }
479
482 asio::io_context& ioc,
483 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
484 logger lgr = {})
485 : basic_connection(ioc.get_executor(), std::move(ctx), std::move(lgr))
486 { }
487
489 basic_connection(asio::io_context& ctx, logger lgr)
491 ctx.get_executor(),
492 asio::ssl::context{asio::ssl::context::tlsv12_client},
493 std::move(lgr))
494 { }
495
532 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
533 auto async_run(config const& cfg, CompletionToken&& token = {})
534 {
535 cfg_ = cfg;
536 health_checker_.set_config(cfg);
537 handshaker_.set_config(cfg);
538
539 return asio::async_compose<CompletionToken, void(system::error_code)>(
540 detail::run_op<this_type>{this},
541 token,
542 writer_timer_);
543 }
544
559 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
561 "The async_run overload taking a logger argument is deprecated. "
562 "Please pass the logger to the connection's constructor, instead, "
563 "and use the other async_run overloads.")
564 auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
565 {
566 set_stderr_logger(l.lvl, cfg);
567 return async_run(cfg, std::forward<CompletionToken>(token));
568 }
569
579 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
581 "Running without an explicit config object is deprecated."
582 "Please create a config object and pass it to async_run.")
583 auto async_run(CompletionToken&& token = {})
584 {
585 return async_run(config{}, std::forward<CompletionToken>(token));
586 }
587
610 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
611 auto async_receive(CompletionToken&& token = {})
612 {
613 return receive_channel_.async_receive(std::forward<CompletionToken>(token));
614 }
615
627 std::size_t receive(system::error_code& ec)
628 {
629 std::size_t size = 0;
630
631 auto f = [&](system::error_code const& ec2, std::size_t n) {
632 ec = ec2;
633 size = n;
634 };
635
636 auto const res = receive_channel_.try_receive(f);
637 if (ec)
638 return 0;
639
640 if (!res)
642
643 return size;
644 }
645
679 template <
680 class Response = ignore_t,
681 class CompletionToken = asio::default_completion_token_t<executor_type>>
682 auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
683 {
684 return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
685 }
686
692 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
693 auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {})
694 {
695 auto& adapter_impl = adapter.impl_;
696 BOOST_ASSERT_MSG(
697 req.get_expected_responses() <= adapter_impl.supported_response_size,
698 "Request and response have incompatible sizes.");
699
700 auto notifier = std::make_shared<detail::exec_notifier_type<executor_type>>(
701 get_executor(),
702 1);
703 auto info = detail::make_elem(req, std::move(adapter_impl.adapt_fn));
704
705 info->set_done_callback([notifier]() {
706 notifier->try_send(std::error_code{}, 0);
707 });
708
709 return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
710 detail::exec_op<this_type>{this, notifier, detail::exec_fsm(mpx_, std::move(info))},
711 token,
712 writer_timer_);
713 }
714
726 void cancel(operation op = operation::all)
727 {
728 switch (op) {
729 case operation::resolve: stream_.cancel_resolve(); break;
730 case operation::exec: mpx_.cancel_waiting(); break;
732 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
733 break;
734 case operation::run: cancel_run(); break;
735 case operation::receive: receive_channel_.cancel(); break;
736 case operation::health_check: health_checker_.cancel(); break;
737 case operation::all:
738 stream_.cancel_resolve();
739 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
740 health_checker_.cancel();
741 cancel_run(); // run
742 receive_channel_.cancel(); // receive
743 mpx_.cancel_waiting(); // exec
744 break;
745 default: /* ignore */;
746 }
747 }
748
749 auto run_is_canceled() const noexcept { return mpx_.get_cancel_run_state(); }
750
752 bool will_reconnect() const noexcept
753 {
754 return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();
755 }
756
759 "ssl::context has no const methods, so this function should not be called. Set up any "
760 "required TLS configuration before passing the ssl::context to the connection's constructor.")
761 auto const& get_ssl_context() const noexcept { return stream_.get_ssl_context(); }
762
765 "This function is no longer necessary and is currently a no-op. connection resets the stream "
766 "internally as required. This function will be removed in subsequent releases")
767 void reset_stream() { }
768
771 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
772 "the other member functions to interact with the connection.")
773 auto& next_layer() noexcept { return stream_.next_layer(); }
774
777 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
778 "the other member functions to interact with the connection.")
779 auto const& next_layer() const noexcept { return stream_.next_layer(); }
780
782 template <class Response>
783 void set_receive_response(Response& response)
784 {
785 mpx_.set_receive_response(response);
786 }
787
789 usage get_usage() const noexcept { return mpx_.get_usage(); }
790
791private:
792 using clock_type = std::chrono::steady_clock;
793 using clock_traits_type = asio::wait_traits<clock_type>;
794 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
795
796 using receive_channel_type = asio::experimental::channel<
798 void(system::error_code, std::size_t)>;
799 using health_checker_type = detail::health_checker<Executor>;
800 using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
801
802 auto use_ssl() const noexcept { return cfg_.use_ssl; }
803
804 void cancel_run()
805 {
806 stream_.close();
807 writer_timer_.cancel();
808 receive_channel_.cancel();
809 mpx_.cancel_on_conn_lost();
810 }
811
812 // Used by both this class and connection
813 void set_stderr_logger(logger::level lvl, const config& cfg)
814 {
815 logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix));
816 }
817
818 template <class> friend struct detail::reader_op;
819 template <class> friend struct detail::writer_op;
820 template <class> friend struct detail::exec_op;
821 template <class, class> friend struct detail::hello_op;
822 template <class, class> friend class detail::ping_op;
823 template <class> friend class detail::run_op;
824 template <class, class> friend class detail::check_timeout_op;
825 friend class connection;
826
827 template <class CompletionToken>
828 auto reader(CompletionToken&& token)
829 {
830 return asio::async_compose<CompletionToken, void(system::error_code)>(
831 detail::reader_op<this_type>{*this},
832 std::forward<CompletionToken>(token),
833 writer_timer_);
834 }
835
836 template <class CompletionToken>
837 auto writer(CompletionToken&& token)
838 {
839 return asio::async_compose<CompletionToken, void(system::error_code)>(
840 detail::writer_op<this_type>{this},
841 std::forward<CompletionToken>(token),
842 writer_timer_);
843 }
844
845 bool is_open() const noexcept { return stream_.is_open(); }
846
847 detail::redis_stream<Executor> stream_;
848
849 // Notice we use a timer to simulate a condition-variable. It is
850 // also more suitable than a channel and the notify operation does
851 // not suspend.
852 timer_type writer_timer_;
853 timer_type reconnect_timer_; // to wait the reconnection period
854 receive_channel_type receive_channel_;
855 health_checker_type health_checker_;
856 resp3_handshaker_type handshaker_;
857
858 config cfg_;
859 detail::multiplexer mpx_;
860 detail::connection_logger logger_;
861};
862
873public:
875 using executor_type = asio::any_io_executor;
876
885 explicit connection(
886 executor_type ex,
887 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
888 logger lgr = {});
889
900 : connection(
901 std::move(ex),
902 asio::ssl::context{asio::ssl::context::tlsv12_client},
903 std::move(lgr))
904 { }
905
907 explicit connection(
908 asio::io_context& ioc,
909 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
910 logger lgr = {})
911 : connection(ioc.get_executor(), std::move(ctx), std::move(lgr))
912 { }
913
915 connection(asio::io_context& ioc, logger lgr)
916 : connection(
917 ioc.get_executor(),
918 asio::ssl::context{asio::ssl::context::tlsv12_client},
919 std::move(lgr))
920 { }
921
923 executor_type get_executor() noexcept { return impl_.get_executor(); }
924
937 template <class CompletionToken = asio::deferred_t>
939 "The async_run overload taking a logger argument is deprecated. "
940 "Please pass the logger to the connection's constructor, instead, "
941 "and use the other async_run overloads.")
942 auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
943 {
944 return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
945 [](auto handler, connection* self, config const* cfg, logger l) {
946 self->async_run_impl(*cfg, std::move(l), std::move(handler));
947 },
948 token,
949 this,
950 &cfg,
951 std::move(l));
952 }
953
955 template <class CompletionToken = asio::deferred_t>
956 auto async_run(config const& cfg, CompletionToken&& token = {})
957 {
958 return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
959 [](auto handler, connection* self, config const* cfg) {
960 self->async_run_impl(*cfg, std::move(handler));
961 },
962 token,
963 this,
964 &cfg);
965 }
966
968 template <class CompletionToken = asio::deferred_t>
969 auto async_receive(CompletionToken&& token = {})
970 {
971 return impl_.async_receive(std::forward<CompletionToken>(token));
972 }
973
975 std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
976
978 template <class Response = ignore_t, class CompletionToken = asio::deferred_t>
979 auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
980 {
981 return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
982 }
983
985 template <class CompletionToken = asio::deferred_t>
986 auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {})
987 {
988 return asio::async_initiate<CompletionToken, void(boost::system::error_code, std::size_t)>(
989 [](auto handler, connection* self, request const* req, any_adapter&& adapter) {
990 self->async_exec_impl(*req, std::move(adapter), std::move(handler));
991 },
992 token,
993 this,
994 &req,
995 std::move(adapter));
996 }
997
999 void cancel(operation op = operation::all);
1000
1002 bool will_reconnect() const noexcept { return impl_.will_reconnect(); }
1003
1006 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
1007 "the other member functions to interact with the connection.")
1008 auto& next_layer() noexcept { return impl_.stream_.next_layer(); }
1009
1012 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
1013 "the other member functions to interact with the connection.")
1014 auto const& next_layer() const noexcept { return impl_.stream_.next_layer(); }
1015
1018 "This function is no longer necessary and is currently a no-op. connection resets the stream "
1019 "internally as required. This function will be removed in subsequent releases")
1020 void reset_stream() { }
1021
1023 template <class Response>
1025 {
1026 impl_.set_receive_response(response);
1027 }
1028
1030 usage get_usage() const noexcept { return impl_.get_usage(); }
1031
1034 "ssl::context has no const methods, so this function should not be called. Set up any "
1035 "required TLS configuration before passing the ssl::context to the connection's constructor.")
1036 auto const& get_ssl_context() const noexcept { return impl_.stream_.get_ssl_context(); }
1037
1038private:
1039 void async_run_impl(
1040 config const& cfg,
1041 logger&& l,
1042 asio::any_completion_handler<void(boost::system::error_code)> token);
1043
1044 void async_run_impl(
1045 config const& cfg,
1046 asio::any_completion_handler<void(boost::system::error_code)> token);
1047
1048 void async_exec_impl(
1049 request const& req,
1050 any_adapter&& adapter,
1051 asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);
1052
1054};
1055
1056} // namespace boost::redis
1057
1058#endif // BOOST_REDIS_CONNECTION_HPP
A type-erased reference to a response.
An SSL connection to the Redis server.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
basic_connection(executor_type ex, logger lgr)
Constructor.
executor_type get_executor() noexcept
Returns the associated executor.
basic_connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructs from a context.
basic_connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructor.
basic_connection(asio::io_context &ctx, logger lgr)
Constructs from a context.
auto async_run(config const &cfg, CompletionToken &&token={})
Starts underlying connection operations.
BOOST_DEPRECATED("The async_run overload taking a logger argument is deprecated. " "Please pass the logger to the connection's constructor, instead, " "and use the other async_run overloads.") auto async_run(config const &cfg
Starts underlying connection operations.
Executor executor_type
Executor type.
Rebinds the socket type to another executor.
A basic_connection that type erases the executor.
BOOST_DEPRECATED("This function is no longer necessary and is currently a no-op. connection resets the stream " "internally as required. This function will be removed in subsequent releases") void reset_stream()
Calls boost::redis::basic_connection::reset_stream.
bool will_reconnect() const noexcept
Calls boost::redis::basic_connection::will_reconnect.
asio::any_io_executor executor_type
Executor type.
connection(asio::io_context &ioc, logger lgr)
Constructs from a context.
connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructor.
BOOST_DEPRECATED("Accessing the underlying stream is deprecated and will be removed in the next release. Use " "the other member functions to interact with the connection.") auto &next_layer() noexcept
Calls boost::redis::basic_connection::next_layer.
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
BOOST_DEPRECATED("Accessing the underlying stream is deprecated and will be removed in the next release. Use " "the other member functions to interact with the connection.") auto const &next_layer() const noexcept
Calls boost::redis::basic_connection::next_layer.
BOOST_DEPRECATED("The async_run overload taking a logger argument is deprecated. " "Please pass the logger to the connection's constructor, instead, " "and use the other async_run overloads.") auto async_run(config const &cfg
Calls boost::redis::basic_connection::async_run.
connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructs from a context.
usage get_usage() const noexcept
Returns connection usage information.
connection(executor_type ex, logger lgr)
Constructor.
executor_type get_executor() noexcept
Returns the underlying executor.
BOOST_DEPRECATED("ssl::context has no const methods, so this function should not be called. Set up any " "required TLS configuration before passing the ssl::context to the connection's constructor.") auto const &get_ssl_context() const noexcept
Returns the ssl context.
Creates Redis requests.
Definition request.hpp:46
std::chrono::steady_clock::duration reconnect_wait_interval
Time waited before trying a reconnection.
Definition config.hpp:91
bool use_ssl
Uses SSL instead of a plain connection.
Definition config.hpp:32
level
Syslog-like log levels.
Definition logger.hpp:25
ignore_t ignore
Global ignore object.
std::decay_t< decltype(std::ignore)> ignore_t
Type used to ignore responses.
Definition ignore.hpp:30
operation
Connection operations that can be cancelled.
Definition operation.hpp:19
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition response.hpp:25
@ unix_sockets_ssl_unsupported
The configuration specified UNIX sockets with SSL, which is not supported.
@ sync_receive_push_failed
Can't receive push synchronously without blocking.
@ pong_timeout
Pong timeout.
@ unix_sockets_unsupported
The configuration specified a UNIX socket address, but UNIX sockets are not supported by the system.
@ health_check
Health check operation.
@ exec
Refers to connection::async_exec operations.
@ resolve
Resolve operation.
@ reconnection
Cancels reconnection.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Definition config.hpp:30
Connection usage information.
Definition usage.hpp:20
Defines logging configuration.
Definition logger.hpp:20
level lvl
Defines a severity filter for messages.
Definition logger.hpp:87