rippled
Loading...
Searching...
No Matches
OrderBookDBImpl.cpp
1#include <xrpld/app/ledger/LedgerMaster.h>
2#include <xrpld/app/ledger/OrderBookDBImpl.h>
3#include <xrpld/app/misc/AMMUtils.h>
4#include <xrpld/core/Config.h>
5
6#include <xrpl/basics/Log.h>
7#include <xrpl/core/JobQueue.h>
8#include <xrpl/protocol/Indexes.h>
9#include <xrpl/server/NetworkOPs.h>
10
11namespace xrpl {
12
// Constructor OrderBookDBImpl(ServiceRegistry& registry,
// OrderBookDBConfig const& config). The signature line is absent from this
// listing; only the member-initializer list and the empty body are shown.
// pathSearchMax_ and standalone_ are copied out of the config, the journal
// is obtained from the registry under the name "OrderBookDB", and seq_
// starts at 0, the value for which setup() bypasses its skip-window checks.
14 : registry_(registry)
15 , pathSearchMax_(config.pathSearchMax)
16 , standalone_(config.standalone)
17 , seq_(0)
18 , j_(registry.journal("OrderBookDB"))
19{
20}
21
// Factory make_OrderBookDB(registry, config): creates an OrderBookDB
// instance. The signature line is absent from this listing. The concrete
// object is an OrderBookDBImpl built with std::make_unique.
24{
25 return std::make_unique<OrderBookDBImpl>(registry, config);
26}
27
// setup(ledger): initialize or update the order book database with a new
// ledger. Decides whether a full rebuild is warranted and, if so, either
// runs update() inline (standalone) or hands it to a job.
// NOTE: the signature line (29), the guard condition (31) and the call that
// receives the job arguments (58) are absent from this listing.
28void
30{
// The condition guarding this early-out is not visible here; when it holds,
// the full update is skipped with a warning.
32 {
33 JLOG(j_.warn()) << "Eliding full order book update: no ledger";
34 return;
35 }
36
37 auto seq = seq_.load();
38
// seq == 0 bypasses the window checks below, so a rebuild is always
// attempted in that case.
39 if (seq != 0)
40 {
// Ledger is newer than the recorded sequence, but by fewer than 25600:
// skip.
41 if ((ledger->seq() > seq) && ((ledger->seq() - seq) < 25600))
42 return;
43
// Ledger is at or behind the recorded sequence by fewer than 16: skip.
44 if ((ledger->seq() <= seq) && ((seq - ledger->seq()) < 16))
45 return;
46 }
47
// Record the new sequence. If seq_ no longer held the value loaded above,
// it was changed in the meantime, so back off. Note that the exchange has
// already stored ledger->seq() even when this returns early.
48 if (seq_.exchange(ledger->seq()) != seq)
49 return;
50
51 JLOG(j_.debug()) << "Full order book update: " << seq << " to " << ledger->seq();
52
// pathSearchMax_ == 0 means no rebuild is performed at all (update() labels
// this case "pathfinding has been disabled").
53 if (pathSearchMax_ != 0)
54 {
55 if (standalone_)
56 update(ledger);
57 else
// The callee on the missing line 58 is presumably JobQueue::addJob reached
// via the registry -- TODO confirm. The lambda captures `this` and a copy
// of `ledger`, and calls update(ledger) when run.
59 jtUPDATE_PF, "OrderBookUpd" + std::to_string(ledger->seq()), [this, ledger]() { update(ledger); });
60 }
61}
62
// update(ledger): rebuilds the book indexes by scanning every entry in
// ledger->sles into local containers, then swaps those containers into the
// members (allBooks_, xrpBooks_, domainBooks_, xrpDomainBooks_).
// NOTE: the signature line (64), the stop-check condition (93), the first
// statement of the swap scope (150) and the final statement (157) are
// absent from this listing.
63void
65{
66 if (pathSearchMax_ == 0)
67 return; // pathfinding has been disabled
68
69 // A newer full update job is pending
70 if (auto const seq = seq_.load(); seq > ledger->seq())
71 {
72 JLOG(j_.debug()) << "Eliding update for " << ledger->seq() << " because of pending update to later " << seq;
73 return;
74 }
75
// Build into locals so the members are only touched by the swap at the end.
76 decltype(allBooks_) allBooks;
77 decltype(xrpBooks_) xrpBooks;
78 decltype(domainBooks_) domainBooks;
79 decltype(xrpDomainBooks_) xrpDomainBooks;
80
// Size hints taken from the current member containers.
81 allBooks.reserve(allBooks_.size());
82 xrpBooks.reserve(xrpBooks_.size());
83
84 JLOG(j_.debug()) << "Beginning update (" << ledger->seq() << ")";
85
86 // walk through the entire ledger looking for orderbook/AMM entries
87 int cnt = 0;
88
89 try
90 {
91 for (auto& sle : ledger->sles)
92 {
// The condition on the missing line 93 is presumably a "process is
// stopping" check (see the log text) -- TODO confirm. seq_ is reset to 0 so
// that setup() will not apply its skip window to the next ledger.
94 {
95 JLOG(j_.info()) << "Update halted because the process is stopping";
96 seq_.store(0);
97 return;
98 }
99
// Directory nodes that carry sfExchangeRate and whose sfRootIndex equals
// their own key are turned into a Book.
100 if (sle->getType() == ltDIR_NODE && sle->isFieldPresent(sfExchangeRate) &&
101 sle->getFieldH256(sfRootIndex) == sle->key())
102 {
103 Book book;
104
// in = TakerPays side, out = TakerGets side.
105 book.in.currency = sle->getFieldH160(sfTakerPaysCurrency);
106 book.in.account = sle->getFieldH160(sfTakerPaysIssuer);
107 book.out.currency = sle->getFieldH160(sfTakerGetsCurrency);
108 book.out.account = sle->getFieldH160(sfTakerGetsIssuer);
109 book.domain = (*sle)[~sfDomainID];
110
// Books with a domain are indexed only in domainBooks, keyed by
// (in, domain); all others go to allBooks keyed by in.
111 if (book.domain)
112 domainBooks[{book.in, *book.domain}].insert(book.out);
113 else
114 allBooks[book.in].insert(book.out);
115
// Separately remember which inputs have a book whose output is XRP.
116 if (book.domain && isXRP(book.out))
117 xrpDomainBooks.insert({book.in, *book.domain});
118 else if (isXRP(book.out))
119 xrpBooks.insert(book.in);
120
121 ++cnt;
122 }
123 else if (sle->getType() == ltAMM)
124 {
// An AMM entry is registered as two books, one per direction, in the
// non-domain indexes only; cnt is incremented once per direction.
125 auto const issue1 = (*sle)[sfAsset].get<Issue>();
126 auto const issue2 = (*sle)[sfAsset2].get<Issue>();
127 auto addBook = [&](Issue const& in, Issue const& out) {
128 allBooks[in].insert(out);
129
130 if (isXRP(out))
131 xrpBooks.insert(in);
132
133 ++cnt;
134 };
135 addBook(issue1, issue2);
136 addBook(issue2, issue1);
137 }
138 }
139 }
140 catch (SHAMapMissingNode const& mn)
141 {
// The scan could not complete: discard the partial result (the members are
// left untouched) and reset seq_ to 0.
142 JLOG(j_.info()) << "Missing node in " << ledger->seq() << " during update: " << mn.what();
143 seq_.store(0);
144 return;
145 }
146
147 JLOG(j_.debug()) << "Update completed (" << ledger->seq() << "): " << cnt << " books found";
148
// Install the freshly built indexes. The statement on the missing line 150
// is presumably a lock on mLock held for this scope -- TODO confirm.
149 {
151 allBooks_.swap(allBooks);
152 xrpBooks_.swap(xrpBooks);
153 domainBooks_.swap(domainBooks);
154 xrpDomainBooks_.swap(xrpDomainBooks);
155 }
156
// Line 157 (a statement executed after the swap) is absent from this
// listing.
158}
159
// addOrderBook(book): add an order book to track. Inserts directly into the
// member indexes using the same classification as update(): books with a
// domain go to domainBooks_ / xrpDomainBooks_, all others to allBooks_ /
// xrpBooks_.
// NOTE: the signature line (161) and the statement on line 165 are absent
// from this listing; line 165 is presumably a lock on mLock -- TODO confirm.
160void
162{
163 bool toXRP = isXRP(book.out);
164
166
167 if (book.domain)
168 domainBooks_[{book.in, *book.domain}].insert(book.out);
169 else
170 allBooks_[book.in].insert(book.out);
171
// Track inputs that have a book whose output is XRP.
172 if (book.domain && toXRP)
173 xrpDomainBooks_.insert({book.in, *book.domain});
174 else if (toXRP)
175 xrpBooks_.insert(book.in);
176}
177
178// return list of all orderbooks that want this issuerID and currencyID
// getBooksByTakerPays(issue, domain): looks `issue` up in allBooks_ when no
// domain is given, otherwise looks (issue, *domain) up in domainBooks_, and
// emits one Book(issue, gets, domain) per stored output issue. Returns an
// empty result when the key is not present.
// NOTE: the signature (179-180), the declaration of `ret` (182) and the
// statement on line 185 are absent from this listing; line 185 is
// presumably a lock on mLock -- TODO confirm.
181{
183
184 {
186
// Shared lookup for both index types; `key` is an Issue or an
// (Issue, Domain) pair depending on the container.
187 auto getBooks = [&](auto const& container, auto const& key) {
188 if (auto it = container.find(key); it != container.end())
189 {
190 auto const& books = it->second;
191 ret.reserve(books.size());
192
193 for (auto const& gets : books)
194 ret.emplace_back(issue, gets, domain);
195 }
196 };
197
198 if (!domain)
199 getBooks(allBooks_, issue);
200 else
201 getBooks(domainBooks_, std::make_pair(issue, *domain));
202 }
203
204 return ret;
205}
206
// getBookSize(issue, domain): get the count of order books that want a
// specific issue. Uses allBooks_ when no domain is given, otherwise
// domainBooks_ keyed by (issue, *domain). Returns 0 when the key is absent.
// NOTE: the signature line (208) and the statement on line 210 are absent
// from this listing; line 210 is presumably a lock on mLock -- TODO confirm.
207int
209{
211
212 if (!domain)
213 {
214 if (auto it = allBooks_.find(issue); it != allBooks_.end())
215 return static_cast<int>(it->second.size());
216 }
217 else
218 {
219 if (auto it = domainBooks_.find({issue, *domain}); it != domainBooks_.end())
220 return static_cast<int>(it->second.size());
221 }
222
223 return 0;
224}
225
// isBookToXRP(issue, domain): check if an order book to XRP exists for the
// given issue. With a domain the lookup is (issue, *domain) in
// xrpDomainBooks_; without one it is issue in xrpBooks_.
// NOTE: the signature line (227) and the statement on line 229 are absent
// from this listing; line 229 is presumably a lock on mLock -- TODO confirm.
226bool
228{
230 if (domain)
231 return xrpDomainBooks_.contains({issue, *domain});
232 return xrpBooks_.contains(issue);
233}
234
// makeBookListeners(book): returns the listeners object registered for
// `book`, registering one in mListeners first if the lookup comes back
// empty.
// NOTE: the signature (235-236), the statement on line 238 and the
// statement on line 243 are absent from this listing. Line 243 presumably
// assigns a newly created BookListeners to `ret` before it is stored --
// TODO confirm.
237{
239 auto ret = getBookListeners(book);
240
241 if (!ret)
242 {
244
245 mListeners[book] = ret;
// Sanity check: a lookup right after the insert must yield the same
// pointer.
246 XRPL_ASSERT(
247 getBookListeners(book) == ret,
248 "xrpl::OrderBookDB::makeBookListeners : result roundtrip "
249 "lookup");
250 }
251
252 return ret;
253}
254
// getBookListeners(book): looks `book` up in mListeners and returns the
// mapped value when found; otherwise returns `ret` as it was declared.
// NOTE: the signature (255-256) and lines 258-259, which include the
// declaration of `ret`, are absent from this listing. `ret` is presumably a
// default-constructed (null) BookListeners::pointer -- TODO confirm.
257{
260
261 auto it0 = mListeners.find(book);
262 if (it0 != mListeners.end())
263 ret = it0->second;
264
265 return ret;
266}
267
268// Based on the meta, send the meta to the streams that are listening.
269// We need to determine which streams a given meta affects.
// processTxn(ledger, alTx, jvObj): for every ltOFFER node in the
// transaction metadata, finds the listeners registered for the offer's book
// and publishes jvObj to them.
// NOTE: part of the signature (271-272) and the statement on line 276 are
// absent from this listing; line 276 is presumably a lock on mLock -- TODO
// confirm.
270void
273 AcceptedLedgerTx const& alTx,
274 MultiApiJson const& jvObj)
275{
277
278 // For this particular transaction, maintain the set of unique
279 // subscriptions that have already published it. This prevents sending
280 // the transaction multiple times if it touches multiple ltOFFER
281 // entries for the same book, or if it touches multiple books and a
282 // single client has subscribed to those books.
283 hash_set<std::uint64_t> havePublished;
284
285 for (auto const& node : alTx.getMeta().getNodes())
286 {
// The try is per node: a failure on one node is logged in the catch below
// and the loop moves on to the next node.
287 try
288 {
289 if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
290 {
// Reads TakerPays/TakerGets (and the optional DomainID) from the given
// sub-object of the node and publishes to that book's listeners, if any.
// NOTE(review): the key is built as {TakerGets issue, TakerPays issue,
// domain}, whereas update() assigns in = TakerPays and out = TakerGets --
// confirm this orientation is what subscribers register with.
291 auto process = [&, this](SField const& field) {
292 if (auto data = dynamic_cast<STObject const*>(node.peekAtPField(field));
293 data && data->isFieldPresent(sfTakerPays) && data->isFieldPresent(sfTakerGets))
294 {
295 auto listeners = getBookListeners(
296 {data->getFieldAmount(sfTakerGets).issue(),
297 data->getFieldAmount(sfTakerPays).issue(),
298 (*data)[~sfDomainID]});
299 if (listeners)
300 listeners->publish(jvObj, havePublished);
301 }
302 };
303
304 // We need a field that contains the TakerGets and TakerPays
305 // parameters.
306 if (node.getFName() == sfModifiedNode)
307 process(sfPreviousFields);
308 else if (node.getFName() == sfCreatedNode)
309 process(sfNewFields);
310 else if (node.getFName() == sfDeletedNode)
311 process(sfFinalFields);
312 }
313 }
314 catch (std::exception const& ex)
315 {
316 JLOG(j_.info()) << "processTxn: field not found (" << ex.what() << ")";
317 }
318 }
319}
320
321} // namespace xrpl
Stream debug() const
Definition Journal.h:300
Stream info() const
Definition Journal.h:306
Stream warn() const
Definition Journal.h:312
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
Specifies an order book.
Definition Book.h:16
Issue in
Definition Book.h:18
std::optional< uint256 > domain
Definition Book.h:20
Issue out
Definition Book.h:19
A currency issued by an account.
Definition Issue.h:13
Currency currency
Definition Issue.h:15
AccountID account
Definition Issue.h:16
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:145
virtual bool isNeedNetworkLedger()=0
hardened_hash_map< std::pair< Issue, Domain >, hardened_hash_set< Issue > > domainBooks_
void addOrderBook(Book const &book) override
Add an order book to track.
BookListeners::pointer makeBookListeners(Book const &) override
BookListeners::pointer getBookListeners(Book const &) override
BookToListenersMap mListeners
int getBookSize(Issue const &issue, std::optional< Domain > const &domain=std::nullopt) override
Get the count of order books that want a specific issue.
hash_set< Issue > xrpBooks_
bool isBookToXRP(Issue const &issue, std::optional< Domain > domain=std::nullopt) override
Check if an order book to XRP exists for the given issue.
void setup(std::shared_ptr< ReadView const > const &ledger) override
Initialize or update the order book database with a new ledger.
hash_set< std::pair< Issue, Domain > > xrpDomainBooks_
OrderBookDBImpl(ServiceRegistry &registry, OrderBookDBConfig const &config)
hardened_hash_map< Issue, hardened_hash_set< Issue > > allBooks_
std::atomic< std::uint32_t > seq_
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj) override
std::vector< Book > getBooksByTakerPays(Issue const &issue, std::optional< Domain > const &domain=std::nullopt) override
Get all order books that want a specific issue.
void update(std::shared_ptr< ReadView const > const &ledger)
ServiceRegistry & registry_
beast::Journal const j_
std::recursive_mutex mLock
Identifies fields.
Definition SField.h:126
Service registry for dependency injection.
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual bool isStopping() const =0
STArray & getNodes()
Definition TxMeta.h:69
T emplace_back(T... args)
T end(T... args)
T exchange(T... args)
T find(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool isXRP(AccountID const &c)
Definition AccountID.h:70
@ jtUPDATE_PF
Definition Job.h:35
std::unique_ptr< OrderBookDB > make_OrderBookDB(ServiceRegistry &registry, OrderBookDBConfig const &config)
Create an OrderBookDB instance.
T reserve(T... args)
T store(T... args)
Configuration for OrderBookDB.
T to_string(T... args)
T what(T... args)