rippled
Loading...
Searching...
No Matches
PathRequestManager.cpp
1#include <xrpld/app/ledger/LedgerMaster.h>
2#include <xrpld/app/main/Application.h>
3#include <xrpld/rpc/detail/PathRequestManager.h>
4
5#include <xrpl/core/JobQueue.h>
6#include <xrpl/protocol/ErrorCodes.h>
7#include <xrpl/protocol/RPCErr.h>
8#include <xrpl/protocol/jss.h>
9
10#include <algorithm>
11
12namespace xrpl {
13
19{
// Returns the RippleLineCache to use for `ledger`: the currently cached one
// is reused unless one of the checks below decides it must be rebuilt.
// mLock is declared as a std::recursive_mutex, so acquiring it here also
// works when the caller already holds it (updateAll calls this under mLock).
20 std::lock_guard const sl(mLock);
21
// lineCache_ is a weak_ptr, so lock() yields null once nobody else holds
// the cache any more.
22 auto lineCache = lineCache_.lock();
23
// A cached sequence of 0 stands for "no live cache" in the checks below.
24 std::uint32_t const lineSeq = lineCache ? lineCache->getLedger()->seq() : 0;
25 std::uint32_t const lgrSeq = ledger->seq();
26 JLOG(mJournal.debug()) << "getLineCache has cache for " << lineSeq << ", considering "
27 << lgrSeq;
28
// Rebuild conditions. A non-authoritative caller only forces a rebuild when
// there is no cache or `ledger` is more than 8 sequences ahead of it; an
// authoritative caller additionally replaces the cache for any newer ledger
// or for one more than 8 sequences behind.
29 if ((lineSeq == 0) || // no ledger
30 (authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
31 (authoritative && ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
32 (lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
33 {
34 JLOG(mJournal.debug()) << "getLineCache creating new cache for " << lgrSeq;
35 // Assign to the local before the member, because the member is a
36 // weak_ptr, and will immediately discard it if there are no other
37 // references.
38 lineCache_ = lineCache =
39 std::make_shared<RippleLineCache>(ledger, app_.getJournal("RippleLineCache"));
40 }
// The returned shared_ptr is what keeps the cache alive; the member only
// observes it.
41 return lineCache;
42}
43
// Update all of the contained PathRequest instances against the given
// ledger. Work is done on a snapshot of requests_ taken under mLock, and the
// outer loop repeats while new path requests keep being reported.
// NOTE(review): this listing omits some lines (for example the declarations
// of `requests`, `cache` and `lastCache`, and the condition guarding the
// first `break` in the inner loop) - confirm against the full file.
44void
46{
// Load event tagged jtPATH_FIND; held for the duration of this function.
47 auto event = app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll");
48
51
52 // Get the ledger and cache we should be using
53 {
54 std::lock_guard const sl(mLock);
// Copy the request list so the requests can be processed without
// holding mLock.
55 requests = requests_;
// `true` = authoritative: any newer ledger replaces the cached lines.
56 cache = getLineCache(inLedger, true);
57 }
58
59 bool newRequests = app_.getLedgerMaster().isNewPathRequest();
60 bool mustBreak = false;
61
62 JLOG(mJournal.trace()) << "updateAll seq=" << cache->getLedger()->seq() << ", "
63 << requests.size() << " requests";
64
65 int processed = 0, removed = 0;
66
// Returns the request's subscriber only while that subscriber still points
// back at this same request. Otherwise the request is told to abort and
// null is returned.
67 auto getSubscriber = [](PathRequest::pointer const& request) -> InfoSub::pointer {
68 if (auto ipSub = request->getSubscriber(); ipSub && ipSub->getRequest() == request)
69 {
70 return ipSub;
71 }
72 request->doAborting();
73 return nullptr;
74 };
75
76 do
77 {
78 JLOG(mJournal.trace()) << "updateAll looping";
79 for (auto const& wr : requests)
80 {
82 break;
83
// Entries are weak pointers; a failed lock() means the request is gone.
84 auto request = wr.lock();
// Requests are dropped by default; only the branches below that clear
// this flag keep the request in requests_.
85 bool remove = true;
86 JLOG(mJournal.trace()) << "updateAll request " << (request ? "" : "not ") << "found";
87
88 if (request)
89 {
90 auto continueCallback = [&getSubscriber, &request]() {
91 // This callback is used by doUpdate to determine whether to
92 // continue working. If getSubscriber returns null, that
93 // indicates that this request is no longer relevant.
94 return (bool)getSubscriber(request);
95 };
96 if (!request->needsUpdate(newRequests, cache->getLedger()->seq()))
97 {
// Nothing to do for this request on this pass; keep it.
98 remove = false;
99 }
100 else
101 {
102 if (auto ipSub = getSubscriber(request))
103 {
// When warn() returns true the update is skipped and `remove`
// stays true, so the request is dropped below.
104 if (!ipSub->getConsumer().warn())
105 {
106 // Release the shared ptr to the subscriber so that
107 // it can be freed if the client disconnects, and
108 // thus fail to lock later.
109 ipSub.reset();
110 Json::Value update = request->doUpdate(cache, false, continueCallback);
111 request->updateComplete();
112 update[jss::type] = "path_find";
// Re-check the subscriber after the update; the result is only sent,
// and the request only kept, if it is still attached.
113 if ((ipSub = getSubscriber(request)))
114 {
115 ipSub->send(update, false);
116 remove = false;
117 ++processed;
118 }
119 }
120 }
121 else if (request->hasCompletion())
122 {
123 // One-shot request with completion function
// `remove` is left true, so the request is dropped once processed.
124 request->doUpdate(cache, false);
125 request->updateComplete();
126 ++processed;
127 }
128 }
129 }
130
131 if (remove)
132 {
133 std::lock_guard const sl(mLock);
134
135 // Remove any dangling weak pointers or weak
136 // pointers that refer to this path request.
// The predicate also counts each removed entry. When `request` is null,
// only expired entries match.
137 auto ret = std::remove_if(
138 requests_.begin(), requests_.end(), [&removed, &request](auto const& wl) {
139 auto r = wl.lock();
140
141 if (r && r != request)
142 return false;
143 ++removed;
144 return true;
145 });
146
147 requests_.erase(ret, requests_.end());
148 }
149
150 mustBreak = !newRequests && app_.getLedgerMaster().isNewPathRequest();
151
152 // We weren't handling new requests and then
153 // there was a new request
154 if (mustBreak)
155 break;
156 }
157
// Decide whether another pass is needed and in which mode it runs.
158 if (mustBreak)
159 { // a new request came in while we were working
160 newRequests = true;
161 }
162 else if (newRequests)
163 { // we only did new requests, so we always need a last pass
164 newRequests = app_.getLedgerMaster().isNewPathRequest();
165 }
166 else
167 { // if there are no new requests, we are done
168 newRequests = app_.getLedgerMaster().isNewPathRequest();
169 if (!newRequests)
170 break;
171 }
172
173 // Hold on to the line cache until after the lock is released, so it can
174 // be destroyed outside of the lock
176 {
177 // Get the latest requests, cache, and ledger for next pass
178 std::lock_guard const sl(mLock);
179
180 if (requests_.empty())
181 break;
182 requests = requests_;
183 lastCache = cache;
// `false` = non-authoritative: the current cache is kept unless it has
// expired or is far behind this ledger.
184 cache = getLineCache(cache->getLedger(), false);
185 }
186 } while (!app_.getJobQueue().isStopping());
187
188 JLOG(mJournal.debug()) << "updateAll complete: " << processed << " processed and " << removed
189 << " removed";
190}
191
// Reports whether requests_ currently holds any entries, read under mLock.
// The entries are weak pointers, so a true result does not establish that
// any of the requests is still alive.
192bool
194{
195 std::lock_guard const sl(mLock);
196 return !requests_.empty();
197}
198
// Adds `req` to requests_ under mLock, placing it just before the first
// still-alive request whose isNew() is false, or at the end when there is
// no such request.
199void
201{
202 std::lock_guard const sl(mLock);
203
204 // Insert after any older unserviced requests but before
205 // any serviced requests
206 auto ret = std::find_if(requests_.begin(), requests_.end(), [](auto const& wl) {
207 auto r = wl.lock();
208
209 // We come before handled requests
// Expired entries (r == null) never match, so they are skipped over.
210 return r && !r->isNew();
211 });
212
213 requests_.emplace(ret, req);
214}
215
216// Make a new-style path_find request
// Builds a PathRequest bound to `subscriber` with the next identifier, runs
// doCreate on it against the line cache for `inLedger`, and returns the JSON
// produced by doCreate.
// NOTE(review): the start of the signature and the statements that follow
// setRequest() are not visible in this listing - confirm against the full file.
219 std::shared_ptr<InfoSub> const& subscriber,
220 std::shared_ptr<ReadView const> const& inLedger,
221 Json::Value const& requestJson)
222{
223 auto req = std::make_shared<PathRequest>(app_, subscriber, ++mLastIdentifier, *this, mJournal);
224
// Non-authoritative lookup: an existing nearby cache is reused.
225 auto [valid, jvRes] = req->doCreate(getLineCache(inLedger, false), requestJson);
226
227 if (valid)
228 {
// Attach the request to the subscriber only when creation succeeded.
229 subscriber->setRequest(req);
232 }
// jvRes is a structured binding, which is not implicitly moved on return;
// the explicit std::move avoids copying the JSON value.
233 return std::move(jvRes);
234}
235
236// Make an old-style ripple_path_find request
// Creates a completion-based PathRequest in the caller's `req`, runs doCreate
// on it, and returns the JSON result. `req` is reset to null whenever the
// request is not left pending.
// NOTE(review): the start of the signature, the assignment to `req`, and the
// condition guarding the rpcTOO_BUSY branch are not visible in this listing -
// confirm against the full file.
240 std::function<void(void)> completion,
241 Resource::Consumer& consumer,
242 std::shared_ptr<ReadView const> const& inLedger,
243 Json::Value const& request)
244{
245 // This assignment must take place before the
246 // completion function is called
248 app_, completion, consumer, ++mLastIdentifier, *this, mJournal);
249
// Non-authoritative lookup: an existing nearby cache is reused.
250 auto [valid, jvRes] = req->doCreate(getLineCache(inLedger, false), request);
251
252 if (!valid)
253 {
// Creation failed: release the request; jvRes carries doCreate's result.
254 req.reset();
255 }
256 else
257 {
260 {
261 // The newPathRequest failed. Tell the caller.
262 jvRes = rpcError(rpcTOO_BUSY);
263 req.reset();
264 }
265 }
266
// jvRes is a structured binding, which is not implicitly moved on return;
// the explicit std::move avoids copying the JSON value.
267 return std::move(jvRes);
268}
269
// Runs a legacy path request to completion inside this call and returns its
// JSON result: doCreate's output when creation is not valid, otherwise the
// output of a single doUpdate. The visible code never adds the request to
// requests_.
// NOTE(review): the start of the signature is not visible in this listing.
272 Resource::Consumer& consumer,
273 std::shared_ptr<ReadView const> const& inLedger,
274 Json::Value const& request)
275{
// Uses a freshly built line cache for `inLedger` instead of going through
// getLineCache, so the shared lineCache_ member is not touched here.
276 auto cache = std::make_shared<RippleLineCache>(inLedger, app_.getJournal("RippleLineCache"));
277
// The completion callback is an empty lambda: the result is returned
// directly rather than delivered through a callback.
278 auto req =
279 std::make_shared<PathRequest>(app_, [] {}, consumer, ++mLastIdentifier, *this, mJournal);
280
281 auto [valid, jvRes] = req->doCreate(cache, request);
282 if (valid)
283 jvRes = req->doUpdate(cache, false);
// jvRes is a structured binding, which is not implicitly moved on return;
// the explicit std::move avoids copying the JSON value.
284 return std::move(jvRes);
285}
286
287} // namespace xrpl
Represents a JSON value.
Definition json_value.h:130
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
bool isStopping() const
Definition JobQueue.h:210
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:142
std::atomic< int > mLastIdentifier
std::vector< PathRequest::wptr > requests_
std::shared_ptr< RippleLineCache > getLineCache(std::shared_ptr< ReadView const > const &ledger, bool authoritative)
Get the current RippleLineCache, updating it if necessary.
void updateAll(std::shared_ptr< ReadView const > const &ledger)
Update all of the contained PathRequest instances.
Json::Value makePathRequest(std::shared_ptr< InfoSub > const &subscriber, std::shared_ptr< ReadView const > const &ledger, Json::Value const &request)
std::weak_ptr< RippleLineCache > lineCache_
Json::Value doLegacyPathRequest(Resource::Consumer &consumer, std::shared_ptr< ReadView const > const &inLedger, Json::Value const &request)
void insertPathRequest(PathRequest::pointer const &)
std::recursive_mutex mLock
Json::Value makeLegacyPathRequest(PathRequest::pointer &req, std::function< void(void)> completion, Resource::Consumer &consumer, std::shared_ptr< ReadView const > const &inLedger, Json::Value const &request)
An endpoint that consumes resources.
Definition Consumer.h:16
virtual JobQueue & getJobQueue()=0
virtual beast::Journal getJournal(std::string const &name)=0
virtual LedgerMaster & getLedgerMaster()=0
T find_if(T... args)
T is_same_v
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ jtPATH_FIND
Definition Job.h:63
Json::Value rpcError(error_code_i iError)
Definition RPCErr.cpp:12
@ rpcTOO_BUSY
Definition ErrorCodes.h:36
T remove_if(T... args)
T reset(T... args)
T size(T... args)