rippled
Loading...
Searching...
No Matches
OrderBookDB.cpp
1#include <xrpld/app/ledger/LedgerMaster.h>
2#include <xrpld/app/ledger/OrderBookDB.h>
3#include <xrpld/app/main/Application.h>
4#include <xrpld/app/misc/AMMUtils.h>
5#include <xrpld/app/misc/NetworkOPs.h>
6#include <xrpld/core/Config.h>
7#include <xrpld/core/JobQueue.h>
8
9#include <xrpl/basics/Log.h>
10#include <xrpl/protocol/Indexes.h>
11
12namespace ripple {
13
15 : app_(app), seq_(0), j_(app.journal("OrderBookDB"))
16{
17}
18
19void
21{
23 {
24 JLOG(j_.warn()) << "Eliding full order book update: no ledger";
25 return;
26 }
27
28 auto seq = seq_.load();
29
30 if (seq != 0)
31 {
32 if ((ledger->seq() > seq) && ((ledger->seq() - seq) < 25600))
33 return;
34
35 if ((ledger->seq() <= seq) && ((seq - ledger->seq()) < 16))
36 return;
37 }
38
39 if (seq_.exchange(ledger->seq()) != seq)
40 return;
41
42 JLOG(j_.debug()) << "Full order book update: " << seq << " to "
43 << ledger->seq();
44
45 if (app_.config().PATH_SEARCH_MAX != 0)
46 {
47 if (app_.config().standalone())
48 update(ledger);
49 else
52 "OrderBookDB::update: " + std::to_string(ledger->seq()),
53 [this, ledger]() { update(ledger); });
54 }
55}
56
57void
59{
60 if (app_.config().PATH_SEARCH_MAX == 0)
61 return; // pathfinding has been disabled
62
63 // A newer full update job is pending
64 if (auto const seq = seq_.load(); seq > ledger->seq())
65 {
66 JLOG(j_.debug()) << "Eliding update for " << ledger->seq()
67 << " because of pending update to later " << seq;
68 return;
69 }
70
71 decltype(allBooks_) allBooks;
72 decltype(xrpBooks_) xrpBooks;
73 decltype(domainBooks_) domainBooks;
74 decltype(xrpDomainBooks_) xrpDomainBooks;
75
76 allBooks.reserve(allBooks_.size());
77 xrpBooks.reserve(xrpBooks_.size());
78
79 JLOG(j_.debug()) << "Beginning update (" << ledger->seq() << ")";
80
81 // walk through the entire ledger looking for orderbook/AMM entries
82 int cnt = 0;
83
84 try
85 {
86 for (auto& sle : ledger->sles)
87 {
88 if (app_.isStopping())
89 {
90 JLOG(j_.info())
91 << "Update halted because the process is stopping";
92 seq_.store(0);
93 return;
94 }
95
96 if (sle->getType() == ltDIR_NODE &&
97 sle->isFieldPresent(sfExchangeRate) &&
98 sle->getFieldH256(sfRootIndex) == sle->key())
99 {
100 Book book;
101
102 book.in.currency = sle->getFieldH160(sfTakerPaysCurrency);
103 book.in.account = sle->getFieldH160(sfTakerPaysIssuer);
104 book.out.currency = sle->getFieldH160(sfTakerGetsCurrency);
105 book.out.account = sle->getFieldH160(sfTakerGetsIssuer);
106 book.domain = (*sle)[~sfDomainID];
107
108 if (book.domain)
109 domainBooks_[{book.in, *book.domain}].insert(book.out);
110 else
111 allBooks[book.in].insert(book.out);
112
113 if (book.domain && isXRP(book.out))
114 xrpDomainBooks.insert({book.in, *book.domain});
115 else if (isXRP(book.out))
116 xrpBooks.insert(book.in);
117
118 ++cnt;
119 }
120 else if (sle->getType() == ltAMM)
121 {
122 auto const issue1 = (*sle)[sfAsset].get<Issue>();
123 auto const issue2 = (*sle)[sfAsset2].get<Issue>();
124 auto addBook = [&](Issue const& in, Issue const& out) {
125 allBooks[in].insert(out);
126
127 if (isXRP(out))
128 xrpBooks.insert(in);
129
130 ++cnt;
131 };
132 addBook(issue1, issue2);
133 addBook(issue2, issue1);
134 }
135 }
136 }
137 catch (SHAMapMissingNode const& mn)
138 {
139 JLOG(j_.info()) << "Missing node in " << ledger->seq()
140 << " during update: " << mn.what();
141 seq_.store(0);
142 return;
143 }
144
145 JLOG(j_.debug()) << "Update completed (" << ledger->seq() << "): " << cnt
146 << " books found";
147
148 {
150 allBooks_.swap(allBooks);
151 xrpBooks_.swap(xrpBooks);
152 domainBooks_.swap(domainBooks);
153 xrpDomainBooks_.swap(xrpDomainBooks);
154 }
155
157}
158
159void
161{
162 bool toXRP = isXRP(book.out);
163
165
166 if (book.domain)
167 domainBooks_[{book.in, *book.domain}].insert(book.out);
168 else
169 allBooks_[book.in].insert(book.out);
170
171 if (book.domain && toXRP)
172 xrpDomainBooks_.insert({book.in, *book.domain});
173 else if (toXRP)
174 xrpBooks_.insert(book.in);
175}
176
177// return list of all orderbooks that want this issuerID and currencyID
180 Issue const& issue,
181 std::optional<uint256> const& domain)
182{
184
185 {
187
188 auto getBooks = [&](auto const& container, auto const& key) {
189 if (auto it = container.find(key); it != container.end())
190 {
191 auto const& books = it->second;
192 ret.reserve(books.size());
193
194 for (auto const& gets : books)
195 ret.emplace_back(issue, gets, domain);
196 }
197 };
198
199 if (!domain)
200 getBooks(allBooks_, issue);
201 else
202 getBooks(domainBooks_, std::make_pair(issue, *domain));
203 }
204
205 return ret;
206}
207
208int
210 Issue const& issue,
211 std::optional<uint256> const& domain)
212{
214
215 if (!domain)
216 {
217 if (auto it = allBooks_.find(issue); it != allBooks_.end())
218 return static_cast<int>(it->second.size());
219 }
220 else
221 {
222 if (auto it = domainBooks_.find({issue, *domain});
223 it != domainBooks_.end())
224 return static_cast<int>(it->second.size());
225 }
226
227 return 0;
228}
229
230bool
232{
234 if (domain)
235 return xrpDomainBooks_.contains({issue, *domain});
236 return xrpBooks_.contains(issue);
237}
238
241{
243 auto ret = getBookListeners(book);
244
245 if (!ret)
246 {
248
249 mListeners[book] = ret;
250 XRPL_ASSERT(
251 getBookListeners(book) == ret,
252 "ripple::OrderBookDB::makeBookListeners : result roundtrip "
253 "lookup");
254 }
255
256 return ret;
257}
258
261{
264
265 auto it0 = mListeners.find(book);
266 if (it0 != mListeners.end())
267 ret = it0->second;
268
269 return ret;
270}
271
272// Based on the meta, send the meta to the streams that are listening.
273// We need to determine which streams a given meta effects.
274void
277 AcceptedLedgerTx const& alTx,
278 MultiApiJson const& jvObj)
279{
281
282 // For this particular transaction, maintain the set of unique
283 // subscriptions that have already published it. This prevents sending
284 // the transaction multiple times if it touches multiple ltOFFER
285 // entries for the same book, or if it touches multiple books and a
286 // single client has subscribed to those books.
287 hash_set<std::uint64_t> havePublished;
288
289 for (auto const& node : alTx.getMeta().getNodes())
290 {
291 try
292 {
293 if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
294 {
295 auto process = [&, this](SField const& field) {
296 if (auto data = dynamic_cast<STObject const*>(
297 node.peekAtPField(field));
298 data && data->isFieldPresent(sfTakerPays) &&
299 data->isFieldPresent(sfTakerGets))
300 {
301 auto listeners = getBookListeners(
302 {data->getFieldAmount(sfTakerGets).issue(),
303 data->getFieldAmount(sfTakerPays).issue(),
304 (*data)[~sfDomainID]});
305 if (listeners)
306 listeners->publish(jvObj, havePublished);
307 }
308 };
309
310 // We need a field that contains the TakerGets and TakerPays
311 // parameters.
312 if (node.getFName() == sfModifiedNode)
313 process(sfPreviousFields);
314 else if (node.getFName() == sfCreatedNode)
315 process(sfNewFields);
316 else if (node.getFName() == sfDeletedNode)
317 process(sfFinalFields);
318 }
319 }
320 catch (std::exception const& ex)
321 {
322 JLOG(j_.info())
323 << "processTxn: field not found (" << ex.what() << ")";
324 }
325 }
326}
327
328} // namespace ripple
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream warn() const
Definition Journal.h:321
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
virtual Config & config()=0
virtual bool isStopping() const =0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
Specifies an order book.
Definition Book.h:17
Issue in
Definition Book.h:19
Issue out
Definition Book.h:20
std::optional< uint256 > domain
Definition Book.h:21
bool standalone() const
Definition Config.h:317
int PATH_SEARCH_MAX
Definition Config.h:179
A currency issued by an account.
Definition Issue.h:14
AccountID account
Definition Issue.h:17
Currency currency
Definition Issue.h:16
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
virtual bool isNeedNetworkLedger()=0
BookListeners::pointer getBookListeners(Book const &)
void addOrderBook(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
std::atomic< std::uint32_t > seq_
Definition OrderBookDB.h:79
OrderBookDB(Application &app)
void update(std::shared_ptr< ReadView const > const &ledger)
int getBookSize(Issue const &, std::optional< Domain > const &domain=std::nullopt)
hardened_hash_map< std::pair< Issue, Domain >, hardened_hash_set< Issue > > domainBooks_
Definition OrderBookDB.h:65
std::recursive_mutex mLock
Definition OrderBookDB.h:73
hash_set< std::pair< Issue, Domain > > xrpDomainBooks_
Definition OrderBookDB.h:71
beast::Journal const j_
Definition OrderBookDB.h:81
BookToListenersMap mListeners
Definition OrderBookDB.h:77
hash_set< Issue > xrpBooks_
Definition OrderBookDB.h:68
hardened_hash_map< Issue, hardened_hash_set< Issue > > allBooks_
Definition OrderBookDB.h:62
void setup(std::shared_ptr< ReadView const > const &ledger)
Application & app_
Definition OrderBookDB.h:59
std::vector< Book > getBooksByTakerPays(Issue const &, std::optional< Domain > const &domain=std::nullopt)
bool isBookToXRP(Issue const &, std::optional< Domain > domain=std::nullopt)
Identifies fields.
Definition SField.h:127
STArray & getNodes()
Definition TxMeta.h:70
T emplace_back(T... args)
T end(T... args)
T exchange(T... args)
T find(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool isXRP(AccountID const &c)
Definition AccountID.h:71
@ jtUPDATE_PF
Definition Job.h:37
T reserve(T... args)
T store(T... args)
T to_string(T... args)
T what(T... args)