Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
WsConnection.hpp
1#pragma once
2
3#include "util/OverloadSet.hpp"
4#include "util/Taggable.hpp"
5#include "util/build/Build.hpp"
6#include "web/ng/Connection.hpp"
7#include "web/ng/Error.hpp"
8#include "web/ng/Request.hpp"
9#include "web/ng/Response.hpp"
10#include "web/ng/impl/Concepts.hpp"
11#include "web/ng/impl/SendingQueue.hpp"
12
13#include <boost/asio/buffer.hpp>
14#include <boost/asio/ip/tcp.hpp>
15#include <boost/asio/spawn.hpp>
16#include <boost/asio/ssl/context.hpp>
17#include <boost/asio/ssl/stream.hpp>
18#include <boost/beast/core/buffers_to_string.hpp>
19#include <boost/beast/core/flat_buffer.hpp>
20#include <boost/beast/core/role.hpp>
21#include <boost/beast/core/tcp_stream.hpp>
22#include <boost/beast/http/field.hpp>
23#include <boost/beast/http/message.hpp>
24#include <boost/beast/http/string_body.hpp>
25#include <boost/beast/ssl.hpp>
26#include <boost/beast/websocket/rfc6455.hpp>
27#include <boost/beast/websocket/stream.hpp>
28#include <boost/beast/websocket/stream_base.hpp>
29
30#include <chrono>
31#include <memory>
32#include <optional>
33#include <string>
34#include <utility>
35#include <variant>
36
37namespace web::ng::impl {
38
40public:
42
43 virtual std::expected<void, Error>
44 sendShared(std::shared_ptr<std::string> message, boost::asio::yield_context yield) = 0;
45};
46
47template <typename StreamType>
48class WsConnection : public WsConnectionBase {
49 boost::beast::websocket::stream<StreamType> stream_;
50 boost::beast::http::request<boost::beast::http::string_body> initialRequest_;
51
52 using MessageType = std::variant<Response, std::shared_ptr<std::string>>;
53 SendingQueue<MessageType> sendingQueue_;
54
55 bool closed_{false};
56
57public:
58 WsConnection(
59 StreamType&& stream,
60 std::string ip,
61 boost::beast::flat_buffer buffer,
62 boost::beast::http::request<boost::beast::http::string_body> initialRequest,
63 util::TagDecoratorFactory const& tagDecoratorFactory
64 )
65 : WsConnectionBase(std::move(ip), std::move(buffer), tagDecoratorFactory)
66 , stream_(std::move(stream))
67 , initialRequest_(std::move(initialRequest))
68 , sendingQueue_{[this](MessageType const& message, auto&& yield) {
69 boost::asio::const_buffer const buffer = std::visit(
71 [](Response const& r) -> boost::asio::const_buffer { return r.asWsResponse(); },
72 [](std::shared_ptr<std::string> const& m) -> boost::asio::const_buffer {
73 return boost::asio::buffer(*m);
74 }
75 },
76 message
77 );
78 stream_.async_write(buffer, yield);
79 }}
80 {
81 setupWsStream();
82 }
83
84 ~WsConnection() override = default;
85 WsConnection(WsConnection&&) = delete;
86 WsConnection&
87 operator=(WsConnection&&) = delete;
88 WsConnection(WsConnection const&) = delete;
89 WsConnection&
90 operator=(WsConnection const&) = delete;
91
92 std::expected<void, Error>
93 performHandshake(boost::asio::yield_context yield)
94 {
95 Error error;
96 stream_.async_accept(initialRequest_, yield[error]);
97 if (error)
98 return std::unexpected{error};
99 return {};
100 }
101
102 [[nodiscard]] bool
103 wasUpgraded() const override
104 {
105 return true;
106 }
107
108 std::expected<void, Error>
109 sendShared(std::shared_ptr<std::string> message, boost::asio::yield_context yield) override
110 {
111 return sendingQueue_.send(std::move(message), yield);
112 }
113
114 void
115 setTimeout(std::chrono::steady_clock::duration newTimeout) override
116 {
117 boost::beast::websocket::stream_base::timeout wsTimeout =
118 boost::beast::websocket::stream_base::timeout::suggested(
119 boost::beast::role_type::server
120 );
121 wsTimeout.idle_timeout = newTimeout;
122 wsTimeout.handshake_timeout = newTimeout;
123 stream_.set_option(wsTimeout);
124 }
125
126 std::expected<void, Error>
127 send(Response response, boost::asio::yield_context yield) override
128 {
129 return sendingQueue_.send(std::move(response), yield);
130 }
131
132 std::expected<Request, Error>
133 receive(boost::asio::yield_context yield) override
134 {
135 Error error;
136 stream_.async_read(buffer_, yield[error]);
137 if (error)
138 return std::unexpected{error};
139
140 auto request = boost::beast::buffers_to_string(buffer_.data());
141 buffer_.consume(buffer_.size());
142
143 return Request{std::move(request), initialRequest_};
144 }
145
146 void
147 close(boost::asio::yield_context yield) override
148 {
149 if (closed_)
150 return;
151
152 // This should be set before the async_close(). Otherwise there is a possibility to have
153 // multiple coroutines waiting on async_close(), but only one will be woken up after the
154 // actual close happened, others will hang.
155 closed_ = true;
156
157 boost::system::error_code error; // unused
158 stream_.async_close(boost::beast::websocket::close_code::normal, yield[error]);
159 }
160
161private:
162 void
163 setupWsStream()
164 {
165 // Disable the timeout. The websocket::stream uses its own timeout settings.
166 boost::beast::get_lowest_layer(stream_).expires_never();
168 stream_.set_option(
169 boost::beast::websocket::stream_base::decorator(
170 [](boost::beast::websocket::response_type& res) {
171 res.set(
172 boost::beast::http::field::server, util::build::getClioFullVersionString()
173 );
174 }
175 )
176 );
177 }
178};
179
180using PlainWsConnection = WsConnection<boost::beast::tcp_stream>;
181using SslWsConnection = WsConnection<boost::asio::ssl::stream<boost::beast::tcp_stream>>;
182
183template <typename StreamType>
184std::expected<std::unique_ptr<WsConnection<StreamType>>, Error>
185makeWsConnection(
186 StreamType&& stream,
187 std::string ip,
188 boost::beast::flat_buffer buffer,
189 boost::beast::http::request<boost::beast::http::string_body> request,
190 util::TagDecoratorFactory const& tagDecoratorFactory,
191 boost::asio::yield_context yield
192)
193{
194 auto connection = std::make_unique<WsConnection<StreamType>>(
195 std::forward<StreamType>(stream),
196 std::move(ip),
197 std::move(buffer),
198 std::move(request),
199 tagDecoratorFactory
200 );
201 auto const expectedSuccess = connection->performHandshake(yield);
202 if (not expectedSuccess.has_value())
203 return std::unexpected{expectedSuccess.error()};
204 return connection;
205}
206
207} // namespace web::ng::impl
A factory for TagDecorator instantiation.
Definition Taggable.hpp:165
std::string const & ip() const
Get the ip of the client.
Definition Connection.cpp:21
static constexpr std::chrono::steady_clock::duration kDefaultTimeout
The default timeout for send, receive, and close operations.
Definition Connection.hpp:124
Connection(std::string ip, boost::beast::flat_buffer buffer, util::TagDecoratorFactory const &tagDecoratorFactory)
Construct a new Connection object.
Definition Connection.cpp:32
Represents an HTTP or WebSocket request.
Definition Request.hpp:18
Represents an HTTP or Websocket response.
Definition Response.hpp:21
Definition SendingQueue.hpp:16
Definition WsConnection.hpp:39
Connection(std::string ip, boost::beast::flat_buffer buffer, util::TagDecoratorFactory const &tagDecoratorFactory)
Construct a new Connection object.
Definition Connection.cpp:32
std::expected< Request, Error > receive(boost::asio::yield_context yield) override
Receive a request from the client.
Definition WsConnection.hpp:133
void close(boost::asio::yield_context yield) override
Gracefully close the connection.
Definition WsConnection.hpp:147
std::expected< void, Error > send(Response response, boost::asio::yield_context yield) override
Send a response to the client.
Definition WsConnection.hpp:127
void setTimeout(std::chrono::steady_clock::duration newTimeout) override
Get the timeout for send, receive, and close operations. For WebSocket connections,...
Definition WsConnection.hpp:115
bool wasUpgraded() const override
Whether the connection was upgraded. Upgraded connections are websocket connections.
Definition WsConnection.hpp:103
Overload set for lambdas.
Definition OverloadSet.hpp:11