Loading...
Searching...
No Matches
multiplexer.hpp
1/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_MULTIPLEXER_HPP
8#define BOOST_REDIS_MULTIPLEXER_HPP
9
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/adapter/any_adapter.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/resp3/type.hpp>
15#include <boost/redis/usage.hpp>
16
17#include <boost/asio/experimental/channel.hpp>
18
19#include <algorithm>
20#include <deque>
21#include <functional>
22#include <memory>
23#include <optional>
24#include <string_view>
25#include <utility>
26
27namespace boost::redis {
28
29class request;
30
31namespace detail {
32
// Three-state result used by multiplexer::consume_next: an empty optional
// means more data is needed; otherwise the contained bool tells whether the
// message that was consumed is a push.
using tribool = std::optional<bool>;
34
35struct multiplexer {
36 using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
37 using pipeline_adapter_type = std::function<
38 void(std::size_t, resp3::node_view const&, system::error_code&)>;
39
40 struct elem {
41 public:
42 explicit elem(request const& req, pipeline_adapter_type adapter);
43
44 void set_done_callback(std::function<void()> f) noexcept { done_ = std::move(f); };
45
46 auto notify_done() noexcept -> void
47 {
48 status_ = status::done;
49 done_();
50 }
51
52 auto notify_error(system::error_code ec) noexcept -> void;
53
54 [[nodiscard]]
55 auto is_waiting() const noexcept
56 {
57 return status_ == status::waiting;
58 }
59
60 [[nodiscard]]
61 auto is_written() const noexcept
62 {
63 return status_ == status::written;
64 }
65
66 [[nodiscard]]
67 auto is_staged() const noexcept
68 {
69 return status_ == status::staged;
70 }
71
72 [[nodiscard]]
73 bool is_done() const noexcept
74 {
75 return status_ == status::done;
76 }
77
78 void mark_written() noexcept { status_ = status::written; }
79
80 void mark_staged() noexcept { status_ = status::staged; }
81
82 void mark_waiting() noexcept { status_ = status::waiting; }
83
84 auto get_error() const -> system::error_code const& { return ec_; }
85
86 auto get_request() const -> request const& { return *req_; }
87
88 auto get_read_size() const -> std::size_t { return read_size_; }
89
90 auto get_remaining_responses() const -> std::size_t { return remaining_responses_; }
91
92 auto commit_response(std::size_t read_size) -> void;
93
94 auto get_adapter() -> adapter_type& { return adapter_; }
95
96 private:
97 enum class status
98 {
99 waiting, // the request hasn't been written yet
100 staged, // we've issued the write for this request, but it hasn't finished yet
101 written, // the request has been written successfully
102 done, // the request has completed and the done callback has been invoked
103 };
104
105 request const* req_;
106 adapter_type adapter_;
107
108 std::function<void()> done_;
109
110 // Contains the number of commands that haven't been read yet.
111 std::size_t remaining_responses_;
112 status status_;
113
114 system::error_code ec_;
115 std::size_t read_size_;
116 };
117
118 auto remove(std::shared_ptr<elem> const& ptr) -> bool;
119
120 [[nodiscard]]
121 auto prepare_write() -> std::size_t;
122
123 // Returns the number of requests that have been released because
124 // they don't have a response e.g. SUBSCRIBE.
125 auto commit_write() -> std::size_t;
126
127 // If the tribool contains no value more data is needed, otherwise
128 // if the value is true the message consumed is a push.
129 [[nodiscard]]
130 auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
131
132 auto add(std::shared_ptr<elem> const& ptr) -> void;
133 auto reset() -> void;
134
135 [[nodiscard]]
136 auto const& get_parser() const noexcept
137 {
138 return parser_;
139 }
140
141 //[[nodiscard]]
142 auto cancel_waiting() -> std::size_t;
143
144 //[[nodiscard]]
145 auto cancel_on_conn_lost() -> std::size_t;
146
147 [[nodiscard]]
148 auto get_cancel_run_state() const noexcept -> bool
149 {
150 return cancel_run_called_;
151 }
152
153 [[nodiscard]]
154 auto get_write_buffer() noexcept -> std::string_view
155 {
156 return std::string_view{write_buffer_};
157 }
158
159 [[nodiscard]]
160 auto get_read_buffer() noexcept -> std::string&
161 {
162 return read_buffer_;
163 }
164
165 [[nodiscard]]
166 auto get_read_buffer() const noexcept -> std::string const&
167 {
168 return read_buffer_;
169 }
170
171 // TODO: Change signature to receive an adapter instead of a
172 // response.
173 template <class Response>
174 void set_receive_response(Response& response)
175 {
176 using namespace boost::redis::adapter;
177 auto g = boost_redis_adapt(response);
178 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
179 }
180
181 [[nodiscard]]
182 auto get_usage() const noexcept -> usage
183 {
184 return usage_;
185 }
186
187 [[nodiscard]]
188 auto is_writing() const noexcept -> bool;
189
190private:
191 [[nodiscard]]
192 auto is_waiting_response() const noexcept -> bool;
193
194 [[nodiscard]]
195 auto on_finish_parsing(bool is_push) -> std::size_t;
196
197 [[nodiscard]]
198 auto is_next_push() const noexcept -> bool;
199
200 // Releases the number of requests that have been released.
201 [[nodiscard]]
202 auto release_push_requests() -> std::size_t;
203
204 std::string read_buffer_;
205 std::string write_buffer_;
206 std::deque<std::shared_ptr<elem>> reqs_;
207 resp3::parser parser_{};
208 bool on_push_ = false;
209 bool cancel_run_called_ = false;
210 usage usage_;
211 adapter_type receive_adapter_;
212};
213
214auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter)
215 -> std::shared_ptr<multiplexer::elem>;
216
217} // namespace detail
218} // namespace boost::redis
219
220#endif // BOOST_REDIS_MULTIPLEXER_HPP
Creates Redis requests.
Definition request.hpp:46
basic_node< std::string_view > node_view
A node view in the response tree.
Definition node.hpp:67
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition response.hpp:25
Connection usage information.
Definition usage.hpp:20