rippled
Loading...
Searching...
No Matches
OrderBookDBImpl.cpp
1#include <xrpld/app/ledger/LedgerMaster.h>
2#include <xrpld/app/ledger/OrderBookDBImpl.h>
3
4#include <xrpl/core/JobQueue.h>
5#include <xrpl/protocol/Indexes.h>
6#include <xrpl/server/NetworkOPs.h>
7#include <xrpl/tx/transactors/dex/AMMUtils.h>
8
9namespace xrpl {
10
12 : registry_(registry)
13 , pathSearchMax_(config.pathSearchMax)
14 , standalone_(config.standalone)
15 , seq_(0)
16 , j_(registry.getJournal("OrderBookDB"))
17{
18}
19
22{
23 return std::make_unique<OrderBookDBImpl>(registry, config);
24}
25
26void
28{
29 if (!standalone_ && registry_.get().getOPs().isNeedNetworkLedger())
30 {
31 JLOG(j_.warn()) << "Eliding full order book update: no ledger";
32 return;
33 }
34
35 auto seq = seq_.load();
36
37 if (seq != 0)
38 {
39 if ((ledger->seq() > seq) && ((ledger->seq() - seq) < 25600))
40 return;
41
42 if ((ledger->seq() <= seq) && ((seq - ledger->seq()) < 16))
43 return;
44 }
45
46 if (seq_.exchange(ledger->seq()) != seq)
47 return;
48
49 JLOG(j_.debug()) << "Full order book update: " << seq << " to " << ledger->seq();
50
51 if (pathSearchMax_ != 0)
52 {
53 if (standalone_)
54 {
55 update(ledger);
56 }
57 else
58 {
59 // Shorten job name to fit Linux 15-char thread name limit with "j:" prefix
60 // "OB" + seq (max 9 digits) = 11 chars, + "j:" = 13 chars (fits in 15)
61 registry_.get().getJobQueue().addJob(
62 jtUPDATE_PF, "OB" + std::to_string(ledger->seq() % 1000000000), [this, ledger]() {
63 update(ledger);
64 });
65 }
66 }
67}
68
69void
71{
72 if (pathSearchMax_ == 0)
73 return; // pathfinding has been disabled
74
75 // A newer full update job is pending
76 if (auto const seq = seq_.load(); seq > ledger->seq())
77 {
78 JLOG(j_.debug()) << "Eliding update for " << ledger->seq()
79 << " because of pending update to later " << seq;
80 return;
81 }
82
83 decltype(allBooks_) allBooks;
84 decltype(xrpBooks_) xrpBooks;
85 decltype(domainBooks_) domainBooks;
86 decltype(xrpDomainBooks_) xrpDomainBooks;
87
88 allBooks.reserve(allBooks_.size());
89 xrpBooks.reserve(xrpBooks_.size());
90
91 JLOG(j_.debug()) << "Beginning update (" << ledger->seq() << ")";
92
93 // walk through the entire ledger looking for orderbook/AMM entries
94 int cnt = 0;
95
96 try
97 {
98 for (auto& sle : ledger->sles)
99 {
100 if (registry_.get().isStopping())
101 {
102 JLOG(j_.info()) << "Update halted because the process is stopping";
103 seq_.store(0);
104 return;
105 }
106
107 if (sle->getType() == ltDIR_NODE && sle->isFieldPresent(sfExchangeRate) &&
108 sle->getFieldH256(sfRootIndex) == sle->key())
109 {
110 Book book;
111
112 book.in.currency = sle->getFieldH160(sfTakerPaysCurrency);
113 book.in.account = sle->getFieldH160(sfTakerPaysIssuer);
114 book.out.currency = sle->getFieldH160(sfTakerGetsCurrency);
115 book.out.account = sle->getFieldH160(sfTakerGetsIssuer);
116 book.domain = (*sle)[~sfDomainID];
117
118 if (book.domain)
119 {
120 domainBooks[{book.in, *book.domain}].insert(book.out);
121 }
122 else
123 {
124 allBooks[book.in].insert(book.out);
125 }
126
127 if (book.domain && isXRP(book.out))
128 {
129 xrpDomainBooks.insert({book.in, *book.domain});
130 }
131 else if (isXRP(book.out))
132 {
133 xrpBooks.insert(book.in);
134 }
135
136 ++cnt;
137 }
138 else if (sle->getType() == ltAMM)
139 {
140 auto const issue1 = (*sle)[sfAsset].get<Issue>();
141 auto const issue2 = (*sle)[sfAsset2].get<Issue>();
142 auto addBook = [&](Issue const& in, Issue const& out) {
143 allBooks[in].insert(out);
144
145 if (isXRP(out))
146 xrpBooks.insert(in);
147
148 ++cnt;
149 };
150 addBook(issue1, issue2);
151 addBook(issue2, issue1);
152 }
153 }
154 }
155 catch (SHAMapMissingNode const& mn)
156 {
157 JLOG(j_.info()) << "Missing node in " << ledger->seq() << " during update: " << mn.what();
158 seq_.store(0);
159 return;
160 }
161
162 JLOG(j_.debug()) << "Update completed (" << ledger->seq() << "): " << cnt << " books found";
163
164 {
165 std::lock_guard const sl(mLock);
166 allBooks_.swap(allBooks);
167 xrpBooks_.swap(xrpBooks);
168 domainBooks_.swap(domainBooks);
169 xrpDomainBooks_.swap(xrpDomainBooks);
170 }
171
172 registry_.get().getLedgerMaster().newOrderBookDB();
173}
174
175void
177{
178 bool const toXRP = isXRP(book.out);
179
180 std::lock_guard const sl(mLock);
181
182 if (book.domain)
183 {
184 domainBooks_[{book.in, *book.domain}].insert(book.out);
185 }
186 else
187 {
188 allBooks_[book.in].insert(book.out);
189 }
190
191 if (book.domain && toXRP)
192 {
193 xrpDomainBooks_.insert({book.in, *book.domain});
194 }
195 else if (toXRP)
196 {
197 xrpBooks_.insert(book.in);
198 }
199}
200
201// return list of all orderbooks that want this issuerID and currencyID
204{
206
207 {
208 std::lock_guard const sl(mLock);
209
210 auto getBooks = [&](auto const& container, auto const& key) {
211 if (auto it = container.find(key); it != container.end())
212 {
213 auto const& books = it->second;
214 ret.reserve(books.size());
215
216 for (auto const& gets : books)
217 ret.emplace_back(issue, gets, domain);
218 }
219 };
220
221 if (!domain)
222 {
223 getBooks(allBooks_, issue);
224 }
225 else
226 {
227 getBooks(domainBooks_, std::make_pair(issue, *domain));
228 }
229 }
230
231 return ret;
232}
233
234int
236{
237 std::lock_guard const sl(mLock);
238
239 if (!domain)
240 {
241 if (auto it = allBooks_.find(issue); it != allBooks_.end())
242 return static_cast<int>(it->second.size());
243 }
244 else
245 {
246 if (auto it = domainBooks_.find({issue, *domain}); it != domainBooks_.end())
247 return static_cast<int>(it->second.size());
248 }
249
250 return 0;
251}
252
253bool
255{
256 std::lock_guard const sl(mLock);
257 if (domain)
258 return xrpDomainBooks_.contains({issue, *domain});
259 return xrpBooks_.contains(issue);
260}
261
264{
265 std::lock_guard const sl(mLock);
266 auto ret = getBookListeners(book);
267
268 if (!ret)
269 {
271
272 mListeners[book] = ret;
273 XRPL_ASSERT(
274 getBookListeners(book) == ret,
275 "xrpl::OrderBookDB::makeBookListeners : result roundtrip "
276 "lookup");
277 }
278
279 return ret;
280}
281
284{
286 std::lock_guard const sl(mLock);
287
288 auto it0 = mListeners.find(book);
289 if (it0 != mListeners.end())
290 ret = it0->second;
291
292 return ret;
293}
294
295// Based on the meta, send the meta to the streams that are listening.
296// We need to determine which streams a given meta effects.
297void
300 AcceptedLedgerTx const& alTx,
301 MultiApiJson const& jvObj)
302{
303 std::lock_guard const sl(mLock);
304
305 // For this particular transaction, maintain the set of unique
306 // subscriptions that have already published it. This prevents sending
307 // the transaction multiple times if it touches multiple ltOFFER
308 // entries for the same book, or if it touches multiple books and a
309 // single client has subscribed to those books.
310 hash_set<std::uint64_t> havePublished;
311
312 for (auto const& node : alTx.getMeta().getNodes())
313 {
314 try
315 {
316 if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
317 {
318 auto process = [&, this](SField const& field) {
319 if (auto data = dynamic_cast<STObject const*>(node.peekAtPField(field)); data &&
320 data->isFieldPresent(sfTakerPays) && data->isFieldPresent(sfTakerGets))
321 {
322 auto listeners = getBookListeners(
323 {data->getFieldAmount(sfTakerGets).issue(),
324 data->getFieldAmount(sfTakerPays).issue(),
325 (*data)[~sfDomainID]});
326 if (listeners)
327 listeners->publish(jvObj, havePublished);
328 }
329 };
330
331 // We need a field that contains the TakerGets and TakerPays
332 // parameters.
333 if (node.getFName() == sfModifiedNode)
334 {
335 process(sfPreviousFields);
336 }
337 else if (node.getFName() == sfCreatedNode)
338 {
339 process(sfNewFields);
340 }
341 else if (node.getFName() == sfDeletedNode)
342 {
343 process(sfFinalFields);
344 }
345 }
346 }
347 catch (std::exception const& ex)
348 {
349 JLOG(j_.info()) << "processTxn: field not found (" << ex.what() << ")";
350 }
351 }
352}
353
354} // namespace xrpl
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream warn() const
Definition Journal.h:313
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
Specifies an order book.
Definition Book.h:16
Issue in
Definition Book.h:18
std::optional< uint256 > domain
Definition Book.h:20
Issue out
Definition Book.h:19
A currency issued by an account.
Definition Issue.h:13
Currency currency
Definition Issue.h:15
AccountID account
Definition Issue.h:16
hardened_hash_map< std::pair< Issue, Domain >, hardened_hash_set< Issue > > domainBooks_
void addOrderBook(Book const &book) override
Add an order book to track.
BookListeners::pointer makeBookListeners(Book const &) override
Create a new book listeners for a book.
BookListeners::pointer getBookListeners(Book const &) override
Get the book listeners for a book.
BookToListenersMap mListeners
int getBookSize(Issue const &issue, std::optional< Domain > const &domain=std::nullopt) override
Get the count of order books that want a specific issue.
hash_set< Issue > xrpBooks_
void setup(std::shared_ptr< ReadView const > const &ledger) override
Initialize or update the order book database with a new ledger.
hash_set< std::pair< Issue, Domain > > xrpDomainBooks_
OrderBookDBImpl(ServiceRegistry &registry, OrderBookDBConfig const &config)
hardened_hash_map< Issue, hardened_hash_set< Issue > > allBooks_
std::atomic< std::uint32_t > seq_
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj) override
Process a transaction for order book tracking.
std::vector< Book > getBooksByTakerPays(Issue const &issue, std::optional< Domain > const &domain=std::nullopt) override
Get all order books that want a specific issue.
void update(std::shared_ptr< ReadView const > const &ledger)
bool isBookToXRP(Issue const &issue, std::optional< Domain > const &domain=std::nullopt) override
Check if an order book to XRP exists for the given issue.
std::reference_wrapper< ServiceRegistry > registry_
beast::Journal const j_
std::recursive_mutex mLock
Identifies fields.
Definition SField.h:126
Service registry for dependency injection.
STArray & getNodes()
Definition TxMeta.h:69
T emplace_back(T... args)
T end(T... args)
T exchange(T... args)
T find(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool isXRP(AccountID const &c)
Definition AccountID.h:70
@ jtUPDATE_PF
Definition Job.h:35
std::unique_ptr< OrderBookDB > make_OrderBookDB(ServiceRegistry &registry, OrderBookDBConfig const &config)
Create an OrderBookDB instance.
T reserve(T... args)
T store(T... args)
Configuration for OrderBookDB.
T to_string(T... args)
T what(T... args)