7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/adapter/any_adapter.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/redis/detail/connection_logger.hpp>
14#include <boost/redis/detail/exec_fsm.hpp>
15#include <boost/redis/detail/health_checker.hpp>
16#include <boost/redis/detail/helper.hpp>
17#include <boost/redis/detail/multiplexer.hpp>
18#include <boost/redis/detail/reader_fsm.hpp>
19#include <boost/redis/detail/redis_stream.hpp>
20#include <boost/redis/detail/resp3_handshaker.hpp>
21#include <boost/redis/error.hpp>
22#include <boost/redis/logger.hpp>
23#include <boost/redis/operation.hpp>
24#include <boost/redis/request.hpp>
25#include <boost/redis/resp3/type.hpp>
26#include <boost/redis/usage.hpp>
28#include <boost/asio/any_completion_handler.hpp>
29#include <boost/asio/any_io_executor.hpp>
30#include <boost/asio/basic_stream_socket.hpp>
31#include <boost/asio/bind_executor.hpp>
32#include <boost/asio/buffer.hpp>
33#include <boost/asio/cancel_after.hpp>
34#include <boost/asio/coroutine.hpp>
35#include <boost/asio/deferred.hpp>
36#include <boost/asio/experimental/channel.hpp>
37#include <boost/asio/experimental/parallel_group.hpp>
38#include <boost/asio/immediate.hpp>
39#include <boost/asio/io_context.hpp>
40#include <boost/asio/ip/tcp.hpp>
41#include <boost/asio/prepend.hpp>
42#include <boost/asio/read_until.hpp>
43#include <boost/asio/ssl/stream.hpp>
44#include <boost/asio/steady_timer.hpp>
45#include <boost/asio/write.hpp>
46#include <boost/assert.hpp>
47#include <boost/config.hpp>
48#include <boost/core/ignore_unused.hpp>
57namespace boost::redis {
60template <
class AsyncReadStream,
class DynamicBuffer>
63 AsyncReadStream& stream_;
65 std::size_t size_ = 0;
67 asio::coroutine coro_{};
70 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
72 , buf_{std::move(buf)}
77 void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
79 BOOST_ASIO_CORO_REENTER(coro_)
85 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
91 buf_.shrink(buf_.size() - tmp_ - n);
97template <
class AsyncReadStream,
class DynamicBuffer,
class CompletionToken>
98auto async_append_some(
99 AsyncReadStream& stream,
100 DynamicBuffer buffer,
102 CompletionToken&& token)
104 return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
105 append_some_op<AsyncReadStream, DynamicBuffer>{stream, buffer, size},
110template <
class Executor>
111using exec_notifier_type = asio::experimental::channel<
113 void(system::error_code, std::size_t)>;
117 using executor_type =
typename Conn::executor_type;
119 Conn* conn_ =
nullptr;
120 std::shared_ptr<exec_notifier_type<executor_type>> notifier_ =
nullptr;
121 detail::exec_fsm fsm_;
123 template <
class Self>
124 void operator()(Self& self, system::error_code = {}, std::size_t = 0)
128 auto act = fsm_.resume(conn_->is_open(), self.get_cancellation_state().cancelled());
131 switch (act.type()) {
132 case detail::exec_action_type::setup_cancellation:
133 self.reset_cancellation_state(asio::enable_total_cancellation());
135 case detail::exec_action_type::immediate:
136 asio::async_immediate(self.get_io_executor(), std::move(self));
138 case detail::exec_action_type::notify_writer:
139 conn_->writer_timer_.cancel();
141 case detail::exec_action_type::wait_for_response:
142 notifier_->async_receive(std::move(self));
144 case detail::exec_action_type::cancel_run:
147 case detail::exec_action_type::done:
149 self.complete(act.error(), act.bytes_read());
159 asio::coroutine coro{};
161 template <
class Self>
162 void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
166 BOOST_ASIO_CORO_REENTER(coro)
for (;;)
168 while (conn_->mpx_.prepare_write() != 0) {
169 BOOST_ASIO_CORO_YIELD
172 asio::buffer(conn_->mpx_.get_write_buffer()),
175 conn_->logger_.on_write(ec, conn_->mpx_.get_write_buffer().size());
178 conn_->logger_.trace(
"writer_op (1)", ec);
184 conn_->mpx_.commit_write();
189 if (!conn_->is_open()) {
190 conn_->logger_.trace(
"writer_op (2): connection is closed.");
196 BOOST_ASIO_CORO_YIELD
197 conn_->writer_timer_.async_wait(std::move(self));
198 if (!conn_->is_open()) {
199 conn_->logger_.trace(
"writer_op (3): connection is closed.");
212 using dyn_buffer_type = asio::dynamic_string_buffer<
214 std::char_traits<char>,
215 std::allocator<char>>;
218 static constexpr std::size_t buffer_growth_hint = 4096;
221 detail::reader_fsm fsm_;
224 reader_op(Conn& conn) noexcept
229 template <
class Self>
230 void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
232 using dyn_buffer_type = asio::dynamic_string_buffer<
234 std::char_traits<char>,
235 std::allocator<char>>;
238 auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
240 conn_->logger_.on_fsm_resume(act);
243 case reader_fsm::action::type::setup_cancellation:
244 self.reset_cancellation_state(asio::enable_terminal_cancellation());
246 case reader_fsm::action::type::needs_more:
247 case reader_fsm::action::type::append_some:
250 dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
251 conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
254 case reader_fsm::action::type::notify_push_receiver:
255 if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
258 conn_->receive_channel_.async_send(ec, act.push_size_, std::move(self));
262 case reader_fsm::action::type::cancel_run: conn_->cancel(
operation::run);
continue;
263 case reader_fsm::action::type::done: self.complete(act.ec_);
return;
269inline system::error_code check_config(
const config& cfg)
271 if (!cfg.unix_socket.empty()) {
272#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
278 return system::error_code{};
284 Conn* conn_ =
nullptr;
285 asio::coroutine coro_{};
286 system::error_code stored_ec_;
288 using order_t = std::array<std::size_t, 5>;
291 run_op(Conn* conn) noexcept
296 template <
class Self>
300 system::error_code ec0,
301 system::error_code ec1,
302 system::error_code ec2,
303 system::error_code ec3,
306 system::error_code final_ec;
308 if (order[0] == 0 && !!ec0) {
319 (*this)(self, final_ec);
323 template <
class Self>
324 void operator()(Self& self, system::error_code ec = {})
326 BOOST_ASIO_CORO_REENTER(coro_)
329 ec = check_config(conn_->cfg_);
333 BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self));
334 self.complete(stored_ec_);
340 BOOST_ASIO_CORO_YIELD
341 conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self));
350 BOOST_ASIO_CORO_YIELD
351 asio::experimental::make_parallel_group(
353 return conn_->handshaker_.async_hello(*conn_, token);
356 return conn_->health_checker_.async_ping(*conn_, token);
359 return conn_->health_checker_.async_check_timeout(*conn_, token);
362 return conn_->reader(token);
365 return conn_->writer(token);
367 .async_wait(asio::experimental::wait_for_one_error(), std::move(self));
379 if (!conn_->will_reconnect()) {
385 conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
386 BOOST_ASIO_CORO_YIELD
387 conn_->reconnect_timer_.async_wait(std::move(self));
396 if (!conn_->will_reconnect()) {
397 self.complete(asio::error::operation_aborted);
405logger make_stderr_logger(
logger::level lvl, std::string prefix);
419template <
class Executor>
425 BOOST_DEPRECATED(
"This typedef is deprecated, and will be removed with next_layer().")
426 typedef asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>
next_layer_type;
435 template <
class Executor1>
451 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
453 : stream_{ex, std::move(ctx)}
455 , reconnect_timer_{ex}
456 , receive_channel_{ex, 256}
457 , health_checker_{ex}
458 , logger_{std::move(lgr)}
460 set_receive_response(
ignore);
461 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
476 asio::ssl::context{asio::ssl::context::tlsv12_client},
482 asio::io_context& ioc,
483 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
492 asio::ssl::context{asio::ssl::context::tlsv12_client},
532 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
536 health_checker_.set_config(cfg);
537 handshaker_.set_config(cfg);
539 return asio::async_compose<CompletionToken, void(system::error_code)>(
540 detail::run_op<this_type>{
this},
559 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
561 "The async_run overload taking a logger argument is deprecated. "
562 "Please pass the logger to the connection's constructor, instead, "
563 "and use the other async_run overloads.")
566 set_stderr_logger(l.
lvl, cfg);
567 return async_run(cfg, std::forward<CompletionToken>(token));
579 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
581 "Running without an explicit config object is deprecated."
582 "Please create a config object and pass it to async_run.")
583 auto
async_run(CompletionToken&& token = {})
585 return async_run(config{}, std::forward<CompletionToken>(token));
610 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
611 auto async_receive(CompletionToken&& token = {})
613 return receive_channel_.async_receive(std::forward<CompletionToken>(token));
627 std::size_t
receive(system::error_code& ec)
629 std::size_t size = 0;
631 auto f = [&](system::error_code
const& ec2, std::size_t n) {
636 auto const res = receive_channel_.try_receive(f);
681 class CompletionToken = asio::default_completion_token_t<executor_type>>
682 auto async_exec(request
const& req, Response& resp =
ignore, CompletionToken&& token = {})
684 return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
692 template <
class CompletionToken = asio::default_completion_token_t<executor_type>>
693 auto async_exec(request
const& req, any_adapter adapter, CompletionToken&& token = {})
695 auto& adapter_impl = adapter.impl_;
697 req.get_expected_responses() <= adapter_impl.supported_response_size,
698 "Request and response have incompatible sizes.");
700 auto notifier = std::make_shared<detail::exec_notifier_type<executor_type>>(
703 auto info = detail::make_elem(req, std::move(adapter_impl.adapt_fn));
705 info->set_done_callback([notifier]() {
706 notifier->try_send(std::error_code{}, 0);
709 return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
710 detail::exec_op<this_type>{
this, notifier, detail::exec_fsm(mpx_, std::move(info))},
738 stream_.cancel_resolve();
740 health_checker_.cancel();
742 receive_channel_.cancel();
743 mpx_.cancel_waiting();
749 auto run_is_canceled() const noexcept {
return mpx_.get_cancel_run_state(); }
752 bool will_reconnect() const noexcept
759 "ssl::context has no const methods, so this function should not be called. Set up any "
760 "required TLS configuration before passing the ssl::context to the connection's constructor.")
761 auto const& get_ssl_context() const noexcept {
return stream_.get_ssl_context(); }
765 "This function is no longer necessary and is currently a no-op. connection resets the stream "
766 "internally as required. This function will be removed in subsequent releases")
767 void reset_stream() { }
771 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
772 "the other member functions to interact with the connection.")
773 auto& next_layer() noexcept {
return stream_.next_layer(); }
777 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
778 "the other member functions to interact with the connection.")
779 auto const& next_layer() const noexcept {
return stream_.next_layer(); }
782 template <
class Response>
783 void set_receive_response(Response&
response)
785 mpx_.set_receive_response(
response);
789 usage get_usage() const noexcept {
return mpx_.get_usage(); }
792 using clock_type = std::chrono::steady_clock;
793 using clock_traits_type = asio::wait_traits<clock_type>;
794 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
796 using receive_channel_type = asio::experimental::channel<
798 void(system::error_code, std::size_t)>;
799 using health_checker_type = detail::health_checker<Executor>;
800 using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
802 auto use_ssl() const noexcept {
return cfg_.
use_ssl; }
807 writer_timer_.cancel();
808 receive_channel_.cancel();
809 mpx_.cancel_on_conn_lost();
815 logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix));
818 template <
class>
friend struct detail::reader_op;
819 template <
class>
friend struct detail::writer_op;
820 template <
class>
friend struct detail::exec_op;
821 template <
class,
class>
friend struct detail::hello_op;
822 template <
class,
class>
friend class detail::ping_op;
823 template <
class>
friend class detail::run_op;
824 template <
class,
class>
friend class detail::check_timeout_op;
825 friend class connection;
827 template <
class CompletionToken>
828 auto reader(CompletionToken&& token)
830 return asio::async_compose<CompletionToken, void(system::error_code)>(
831 detail::reader_op<this_type>{*
this},
832 std::forward<CompletionToken>(token),
836 template <
class CompletionToken>
837 auto writer(CompletionToken&& token)
839 return asio::async_compose<CompletionToken, void(system::error_code)>(
840 detail::writer_op<this_type>{
this},
841 std::forward<CompletionToken>(token),
845 bool is_open() const noexcept {
return stream_.is_open(); }
847 detail::redis_stream<Executor> stream_;
852 timer_type writer_timer_;
853 timer_type reconnect_timer_;
854 receive_channel_type receive_channel_;
855 health_checker_type health_checker_;
856 resp3_handshaker_type handshaker_;
859 detail::multiplexer mpx_;
860 detail::connection_logger logger_;
887 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
902 asio::ssl::context{asio::ssl::context::tlsv12_client},
908 asio::io_context& ioc,
909 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
918 asio::ssl::context{asio::ssl::context::tlsv12_client},
937 template <
class CompletionToken = asio::deferred_t>
939 "The async_run overload taking a logger argument is deprecated. "
940 "Please pass the logger to the connection's constructor, instead, "
941 "and use the other async_run overloads.")
942 auto async_run(
config const& cfg,
logger l, CompletionToken&& token = {})
944 return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
946 self->async_run_impl(*cfg, std::move(l), std::move(handler));
955 template <
class CompletionToken = asio::deferred_t>
956 auto async_run(config
const& cfg, CompletionToken&& token = {})
958 return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
959 [](
auto handler,
connection* self, config
const* cfg) {
960 self->async_run_impl(*cfg, std::move(handler));
968 template <
class CompletionToken = asio::deferred_t>
969 auto async_receive(CompletionToken&& token = {})
971 return impl_.async_receive(std::forward<CompletionToken>(token));
975 std::size_t
receive(system::error_code& ec) {
return impl_.receive(ec); }
978 template <
class Response = ignore_t,
class CompletionToken = asio::deferred_t>
979 auto async_exec(request
const& req, Response& resp =
ignore, CompletionToken&& token = {})
981 return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
985 template <
class CompletionToken = asio::deferred_t>
986 auto async_exec(request
const& req, any_adapter adapter, CompletionToken&& token = {})
988 return asio::async_initiate<CompletionToken, void(boost::system::error_code, std::size_t)>(
989 [](
auto handler,
connection* self, request
const* req, any_adapter&& adapter) {
990 self->async_exec_impl(*req, std::move(adapter), std::move(handler));
1006 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
1007 "the other member functions to interact with the connection.")
1008 auto& next_layer() noexcept {
return impl_.stream_.next_layer(); }
1012 "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
1013 "the other member functions to interact with the connection.")
1014 auto const& next_layer() const noexcept {
return impl_.stream_.next_layer(); }
1018 "This function is no longer necessary and is currently a no-op. connection resets the stream "
1019 "internally as required. This function will be removed in subsequent releases")
1020 void reset_stream() { }
1023 template <
class Response>
1026 impl_.set_receive_response(
response);
1034 "ssl::context has no const methods, so this function should not be called. Set up any "
1035 "required TLS configuration before passing the ssl::context to the connection's constructor.")
1036 auto const& get_ssl_context() const noexcept {
return impl_.stream_.get_ssl_context(); }
1039 void async_run_impl(
1042 asio::any_completion_handler<
void(boost::system::error_code)> token);
1044 void async_run_impl(
1046 asio::any_completion_handler<
void(boost::system::error_code)> token);
1048 void async_exec_impl(
1051 asio::any_completion_handler<
void(boost::system::error_code, std::size_t)> token);
A type-erased reference to a response.
A SSL connection to the Redis server.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
basic_connection(executor_type ex, logger lgr)
Constructor.
executor_type get_executor() noexcept
Returns the associated executor.
basic_connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructs from a context.
basic_connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructor.
basic_connection(asio::io_context &ctx, logger lgr)
Constructs from a context.
auto async_run(config const &cfg, CompletionToken &&token={})
Starts underlying connection operations.
BOOST_DEPRECATED("The async_run overload taking a logger argument is deprecated. " "Please pass the logger to the connection's constructor, instead, " "and use the other async_run overloads.") auto async_run(config const &cfg
Starts underlying connection operations.
Executor executor_type
Executor type.
Rebinds the socket type to another executor.
A basic_connection that type erases the executor.
BOOST_DEPRECATED("This function is no longer necessary and is currently a no-op. connection resets the stream " "internally as required. This function will be removed in subsequent releases") void reset_stream()
Calls boost::redis::basic_connection::reset_stream.
bool will_reconnect() const noexcept
Calls boost::redis::basic_connection::will_reconnect.
asio::any_io_executor executor_type
Executor type.
connection(asio::io_context &ioc, logger lgr)
Constructs from a context.
connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructor.
BOOST_DEPRECATED("Accessing the underlying stream is deprecated and will be removed in the next release. Use " "the other member functions to interact with the connection.") auto &next_layer() noexcept
Calls boost::redis::basic_connection::next_layer.
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
BOOST_DEPRECATED("Accessing the underlying stream is deprecated and will be removed in the next release. Use " "the other member functions to interact with the connection.") auto const &next_layer() const noexcept
Calls boost::redis::basic_connection::next_layer.
BOOST_DEPRECATED("The async_run overload taking a logger argument is deprecated. " "Please pass the logger to the connection's constructor, instead, " "and use the other async_run overloads.") auto async_run(config const &cfg
Calls boost::redis::basic_connection::async_run.
connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr={})
Constructs from a context.
usage get_usage() const noexcept
Returns connection usage information.
connection(executor_type ex, logger lgr)
Constructor.
executor_type get_executor() noexcept
Returns the underlying executor.
BOOST_DEPRECATED("ssl::context has no const methods, so this function should not be called. Set up any " "required TLS configuration before passing the ssl::context to the connection's constructor.") auto const &get_ssl_context() const noexcept
Returns the ssl context.
std::chrono::steady_clock::duration reconnect_wait_interval
Time waited before trying a reconnection.
bool use_ssl
Uses SSL instead of a plain connection.
level
Syslog-like log levels.
ignore_t ignore
Global ignore object.
std::decay_t< decltype(std::ignore)> ignore_t
Type used to ignore responses.
operation
Connection operations that can be cancelled.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
@ unix_sockets_ssl_unsupported
The configuration specified UNIX sockets with SSL, which is not supported.
@ sync_receive_push_failed
Can't receive push synchronously without blocking.
@ pong_timeout
Connect timeout.
@ unix_sockets_unsupported
The configuration specified a UNIX socket address, but UNIX sockets are not supported by the system.
@ health_check
Health check operation.
@ exec
Refers to connection::async_exec operations.
@ resolve
Resolve operation.
@ reconnection
Cancels reconnection.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Connection usage information.
Defines logging configuration.
level lvl
Defines a severity filter for messages.