7#ifndef BOOST_REDIS_MULTIPLEXER_HPP
8#define BOOST_REDIS_MULTIPLEXER_HPP
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/adapter/any_adapter.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/resp3/type.hpp>
15#include <boost/redis/usage.hpp>
17#include <boost/asio/experimental/channel.hpp>
27namespace boost::redis {
33using tribool = std::optional<bool>;
36 using adapter_type = std::function<void(
resp3::node_view const&, system::error_code&)>;
37 using pipeline_adapter_type = std::function<
42 explicit elem(request
const& req, pipeline_adapter_type adapter);
44 void set_done_callback(std::function<
void()> f)
noexcept { done_ = std::move(f); };
46 auto notify_done() noexcept ->
void
48 status_ = status::done;
52 auto notify_error(system::error_code ec)
noexcept -> void;
55 auto is_waiting() const noexcept
57 return status_ == status::waiting;
61 auto is_written() const noexcept
63 return status_ == status::written;
67 auto is_staged() const noexcept
69 return status_ == status::staged;
73 bool is_done() const noexcept
75 return status_ == status::done;
78 void mark_written() noexcept { status_ = status::written; }
80 void mark_staged() noexcept { status_ = status::staged; }
82 void mark_waiting() noexcept { status_ = status::waiting; }
84 auto get_error() const -> system::error_code const& {
return ec_; }
86 auto get_request() const -> request const& {
return *req_; }
88 auto get_read_size() const -> std::
size_t {
return read_size_; }
90 auto get_remaining_responses() const -> std::
size_t {
return remaining_responses_; }
92 auto commit_response(std::size_t read_size) -> void;
94 auto get_adapter() -> adapter_type& {
return adapter_; }
106 adapter_type adapter_;
108 std::function<void()> done_;
111 std::size_t remaining_responses_;
114 system::error_code ec_;
115 std::size_t read_size_;
118 auto remove(std::shared_ptr<elem>
const& ptr) -> bool;
121 auto prepare_write() -> std::size_t;
125 auto commit_write() -> std::size_t;
130 auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
132 auto add(std::shared_ptr<elem>
const& ptr) -> void;
133 auto reset() -> void;
136 auto const& get_parser() const noexcept
142 auto cancel_waiting() -> std::size_t;
145 auto cancel_on_conn_lost() -> std::size_t;
148 auto get_cancel_run_state() const noexcept ->
bool
150 return cancel_run_called_;
154 auto get_write_buffer() noexcept -> std::string_view
156 return std::string_view{write_buffer_};
160 auto get_read_buffer() noexcept -> std::
string&
166 auto get_read_buffer() const noexcept -> std::
string const&
173 template <
class Response>
174 void set_receive_response(Response&
response)
176 using namespace boost::redis::adapter;
177 auto g = boost_redis_adapt(
response);
178 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
182 auto get_usage() const noexcept ->
usage
188 auto is_writing() const noexcept ->
bool;
192 auto is_waiting_response() const noexcept ->
bool;
195 auto on_finish_parsing(
bool is_push) -> std::
size_t;
198 auto is_next_push() const noexcept ->
bool;
202 auto release_push_requests() -> std::
size_t;
204 std::
string read_buffer_;
205 std::
string write_buffer_;
206 std::deque<std::shared_ptr<elem>> reqs_;
207 resp3::parser parser_{};
208 bool on_push_ =
false;
209 bool cancel_run_called_ =
false;
211 adapter_type receive_adapter_;
214auto make_elem(
request const& req, multiplexer::pipeline_adapter_type adapter)
215 -> std::shared_ptr<multiplexer::elem>;
basic_node< std::string_view > node_view
A node view in the response tree.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Connection usage information.