7#ifndef BOOST_REDIS_REDIS_STREAM_HPP
8#define BOOST_REDIS_REDIS_STREAM_HPP
10#include <boost/redis/config.hpp>
11#include <boost/redis/detail/connection_logger.hpp>
12#include <boost/redis/error.hpp>
14#include <boost/asio/basic_waitable_timer.hpp>
15#include <boost/asio/cancel_after.hpp>
16#include <boost/asio/compose.hpp>
17#include <boost/asio/connect.hpp>
18#include <boost/asio/coroutine.hpp>
19#include <boost/asio/ip/basic_resolver.hpp>
20#include <boost/asio/ip/tcp.hpp>
21#include <boost/asio/local/stream_protocol.hpp>
22#include <boost/asio/ssl/context.hpp>
23#include <boost/asio/ssl/stream.hpp>
24#include <boost/asio/ssl/stream_base.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/system/error_code.hpp>
35enum class transport_type
42template <
class Executor>
44 asio::ssl::context ssl_ctx_;
45 asio::ip::basic_resolver<asio::ip::tcp, Executor> resolv_;
46 asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>> stream_;
47#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
48 asio::basic_stream_socket<asio::local::stream_protocol, Executor> unix_socket_;
50 typename asio::steady_timer::template rebind_executor<Executor>::other timer_;
52 transport_type transport_{transport_type::tcp};
53 bool ssl_stream_used_{
false};
55 void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }
57 static transport_type transport_from_config(
const config& cfg)
59 if (cfg.unix_socket.empty()) {
61 return transport_type::tcp_tls;
63 return transport_type::tcp;
66 BOOST_ASSERT(!cfg.use_ssl);
67 return transport_type::unix_socket;
74 connection_logger* lgr;
75 asio::coroutine coro{};
82 system::error_code ec,
83 const asio::ip::tcp::endpoint& selected_endpoint)
85 lgr->on_connect(ec, selected_endpoint);
92 system::error_code ec = {},
93 asio::ip::tcp::resolver::results_type resolver_results = {})
95 BOOST_ASIO_CORO_REENTER(coro)
98 obj.transport_ = transport_from_config(*cfg);
100 if (obj.transport_ == transport_type::unix_socket) {
101#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
103 BOOST_ASIO_CORO_YIELD
104 obj.unix_socket_.async_connect(
106 asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
109 lgr->on_connect(ec, cfg->unix_socket);
123 if (cfg->use_ssl && obj.ssl_stream_used_)
126 BOOST_ASIO_CORO_YIELD
127 obj.resolv_.async_resolve(
130 asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self)));
133 lgr->on_resolve(ec, resolver_results);
142 BOOST_ASIO_CORO_YIELD
144 obj.stream_.next_layer(),
145 std::move(resolver_results),
146 asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
157 obj.ssl_stream_used_ =
true;
160 BOOST_ASIO_CORO_YIELD
161 obj.stream_.async_handshake(
162 asio::ssl::stream_base::client,
163 asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self)));
165 lgr->on_ssl_handshake(ec);
177 self.complete(system::error_code());
183 explicit redis_stream(Executor ex, asio::ssl::context&& ssl_ctx)
184 : ssl_ctx_{std::move(ssl_ctx)}
186 , stream_{ex, ssl_ctx_}
187#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
190 , timer_{std::move(ex)}
194 using executor_type = Executor;
195 executor_type get_executor() noexcept {
return resolv_.get_executor(); }
198 const auto& get_ssl_context() const noexcept {
return ssl_ctx_; }
201#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
202 if (transport_ == transport_type::unix_socket)
203 return unix_socket_.is_open();
205 return stream_.next_layer().is_open();
207 auto& next_layer() {
return stream_; }
208 const auto& next_layer()
const {
return stream_; }
211 template <
class CompletionToken>
212 auto async_connect(
const config* cfg, connection_logger* l, CompletionToken&& token)
214 return asio::async_compose<CompletionToken, void(system::error_code)>(
215 connect_op{*
this, cfg, l},
220 template <
class ConstBufferSequence,
class CompletionToken>
221 void async_write_some(
const ConstBufferSequence& buffers, CompletionToken&& token)
223 switch (transport_) {
224 case transport_type::tcp:
226 stream_.next_layer().async_write_some(buffers, std::forward<CompletionToken>(token));
229 case transport_type::tcp_tls:
231 stream_.async_write_some(buffers, std::forward<CompletionToken>(token));
234#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
235 case transport_type::unix_socket:
237 unix_socket_.async_write_some(buffers, std::forward<CompletionToken>(token));
241 default: BOOST_ASSERT(
false);
245 template <
class MutableBufferSequence,
class CompletionToken>
246 void async_read_some(
const MutableBufferSequence& buffers, CompletionToken&& token)
248 switch (transport_) {
249 case transport_type::tcp:
251 return stream_.next_layer().async_read_some(
253 std::forward<CompletionToken>(token));
256 case transport_type::tcp_tls:
258 return stream_.async_read_some(buffers, std::forward<CompletionToken>(token));
261#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
262 case transport_type::unix_socket:
264 unix_socket_.async_read_some(buffers, std::forward<CompletionToken>(token));
268 default: BOOST_ASSERT(
false);
273 void cancel_resolve() { resolv_.cancel(); }
277 system::error_code ec;
278 if (stream_.next_layer().is_open())
279 stream_.next_layer().close(ec);
280#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
281 if (unix_socket_.is_open())
282 unix_socket_.close(ec);
@ resolve_timeout
Resolve timeout.
@ connect_timeout
Connect timeout.
@ ssl_handshake_timeout
SSL handshake timeout.