Loading...
Searching...
No Matches
redis_stream.hpp
1/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
2 * Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
3 *
4 * Distributed under the Boost Software License, Version 1.0. (See
5 * accompanying file LICENSE.txt)
6 */
7#ifndef BOOST_REDIS_REDIS_STREAM_HPP
8#define BOOST_REDIS_REDIS_STREAM_HPP
9
10#include <boost/redis/config.hpp>
11#include <boost/redis/detail/connection_logger.hpp>
12#include <boost/redis/error.hpp>
13
14#include <boost/asio/basic_waitable_timer.hpp>
15#include <boost/asio/cancel_after.hpp>
16#include <boost/asio/compose.hpp>
17#include <boost/asio/connect.hpp>
18#include <boost/asio/coroutine.hpp>
19#include <boost/asio/ip/basic_resolver.hpp>
20#include <boost/asio/ip/tcp.hpp>
21#include <boost/asio/local/stream_protocol.hpp>
22#include <boost/asio/ssl/context.hpp>
23#include <boost/asio/ssl/stream.hpp>
24#include <boost/asio/ssl/stream_base.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/system/error_code.hpp>
27
28#include <utility>
29
30namespace boost {
31namespace redis {
32namespace detail {
33
34// What transport is redis_stream using?
35enum class transport_type
36{
37 tcp, // plaintext TCP
38 tcp_tls, // TLS over TCP
39 unix_socket, // UNIX domain sockets
40};
41
42template <class Executor>
43class redis_stream {
44 asio::ssl::context ssl_ctx_;
45 asio::ip::basic_resolver<asio::ip::tcp, Executor> resolv_;
46 asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>> stream_;
47#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
48 asio::basic_stream_socket<asio::local::stream_protocol, Executor> unix_socket_;
49#endif
50 typename asio::steady_timer::template rebind_executor<Executor>::other timer_;
51
52 transport_type transport_{transport_type::tcp};
53 bool ssl_stream_used_{false};
54
55 void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }
56
57 static transport_type transport_from_config(const config& cfg)
58 {
59 if (cfg.unix_socket.empty()) {
60 if (cfg.use_ssl) {
61 return transport_type::tcp_tls;
62 } else {
63 return transport_type::tcp;
64 }
65 } else {
66 BOOST_ASSERT(!cfg.use_ssl);
67 return transport_type::unix_socket;
68 }
69 }
70
71 struct connect_op {
72 redis_stream& obj;
73 const config* cfg;
74 connection_logger* lgr;
75 asio::coroutine coro{};
76
77 // This overload will be used for connects. We only need the endpoint
78 // for logging, so log it and call the coroutine
79 template <class Self>
80 void operator()(
81 Self& self,
82 system::error_code ec,
83 const asio::ip::tcp::endpoint& selected_endpoint)
84 {
85 lgr->on_connect(ec, selected_endpoint);
86 (*this)(self, ec);
87 }
88
89 template <class Self>
90 void operator()(
91 Self& self,
92 system::error_code ec = {},
93 asio::ip::tcp::resolver::results_type resolver_results = {})
94 {
95 BOOST_ASIO_CORO_REENTER(coro)
96 {
97 // Record the transport that we will be using
98 obj.transport_ = transport_from_config(*cfg);
99
100 if (obj.transport_ == transport_type::unix_socket) {
101#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
102 // Directly connect to the socket
103 BOOST_ASIO_CORO_YIELD
104 obj.unix_socket_.async_connect(
105 cfg->unix_socket,
106 asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
107
108 // Log it
109 lgr->on_connect(ec, cfg->unix_socket);
110
111 // If this failed, we can't continue
112 if (ec) {
113 self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
114 return;
115 }
116#else
117 BOOST_ASSERT(false);
118#endif
119 } else {
120 // ssl::stream doesn't support being re-used. If we're to use
121 // TLS and the stream has been used, re-create it.
122 // Must be done before anything else is done on the stream
123 if (cfg->use_ssl && obj.ssl_stream_used_)
124 obj.reset_stream();
125
126 BOOST_ASIO_CORO_YIELD
127 obj.resolv_.async_resolve(
128 cfg->addr.host,
129 cfg->addr.port,
130 asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self)));
131
132 // Log it
133 lgr->on_resolve(ec, resolver_results);
134
135 // If this failed, we can't continue
136 if (ec) {
137 self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec);
138 return;
139 }
140
141 // Connect to the address that the resolver provided us
142 BOOST_ASIO_CORO_YIELD
143 asio::async_connect(
144 obj.stream_.next_layer(),
145 std::move(resolver_results),
146 asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
147
148 // Note: logging is performed in the specialized operator() function.
149 // If this failed, we can't continue
150 if (ec) {
151 self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
152 return;
153 }
154
155 if (cfg->use_ssl) {
156 // Mark the SSL stream as used
157 obj.ssl_stream_used_ = true;
158
159 // If we were configured to use TLS, perform the handshake
160 BOOST_ASIO_CORO_YIELD
161 obj.stream_.async_handshake(
162 asio::ssl::stream_base::client,
163 asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self)));
164
165 lgr->on_ssl_handshake(ec);
166
167 // If this failed, we can't continue
168 if (ec) {
169 self.complete(
170 ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec);
171 return;
172 }
173 }
174 }
175
176 // Done
177 self.complete(system::error_code());
178 }
179 }
180 };
181
182public:
183 explicit redis_stream(Executor ex, asio::ssl::context&& ssl_ctx)
184 : ssl_ctx_{std::move(ssl_ctx)}
185 , resolv_{ex}
186 , stream_{ex, ssl_ctx_}
187#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
188 , unix_socket_{ex}
189#endif
190 , timer_{std::move(ex)}
191 { }
192
193 // Executor. Required to satisfy the AsyncStream concept
194 using executor_type = Executor;
195 executor_type get_executor() noexcept { return resolv_.get_executor(); }
196
197 // Accessors
198 const auto& get_ssl_context() const noexcept { return ssl_ctx_; }
199 bool is_open() const
200 {
201#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
202 if (transport_ == transport_type::unix_socket)
203 return unix_socket_.is_open();
204#endif
205 return stream_.next_layer().is_open();
206 }
207 auto& next_layer() { return stream_; }
208 const auto& next_layer() const { return stream_; }
209
210 // I/O
211 template <class CompletionToken>
212 auto async_connect(const config* cfg, connection_logger* l, CompletionToken&& token)
213 {
214 return asio::async_compose<CompletionToken, void(system::error_code)>(
215 connect_op{*this, cfg, l},
216 token);
217 }
218
219 // These functions should only be used with callbacks (e.g. within async_compose function bodies)
220 template <class ConstBufferSequence, class CompletionToken>
221 void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token)
222 {
223 switch (transport_) {
224 case transport_type::tcp:
225 {
226 stream_.next_layer().async_write_some(buffers, std::forward<CompletionToken>(token));
227 break;
228 }
229 case transport_type::tcp_tls:
230 {
231 stream_.async_write_some(buffers, std::forward<CompletionToken>(token));
232 break;
233 }
234#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
235 case transport_type::unix_socket:
236 {
237 unix_socket_.async_write_some(buffers, std::forward<CompletionToken>(token));
238 break;
239 }
240#endif
241 default: BOOST_ASSERT(false);
242 }
243 }
244
245 template <class MutableBufferSequence, class CompletionToken>
246 void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token)
247 {
248 switch (transport_) {
249 case transport_type::tcp:
250 {
251 return stream_.next_layer().async_read_some(
252 buffers,
253 std::forward<CompletionToken>(token));
254 break;
255 }
256 case transport_type::tcp_tls:
257 {
258 return stream_.async_read_some(buffers, std::forward<CompletionToken>(token));
259 break;
260 }
261#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
262 case transport_type::unix_socket:
263 {
264 unix_socket_.async_read_some(buffers, std::forward<CompletionToken>(token));
265 break;
266 }
267#endif
268 default: BOOST_ASSERT(false);
269 }
270 }
271
272 // Cleanup
273 void cancel_resolve() { resolv_.cancel(); }
274
275 void close()
276 {
277 system::error_code ec;
278 if (stream_.next_layer().is_open())
279 stream_.next_layer().close(ec);
280#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
281 if (unix_socket_.is_open())
282 unix_socket_.close(ec);
283#endif
284 }
285};
286
287} // namespace detail
288} // namespace redis
289} // namespace boost
290
291#endif
@ resolve_timeout
Resolve timeout.
@ connect_timeout
Connect timeout.
@ ssl_handshake_timeout
SSL handshake timeout.