// Response and raw-object types are taken from the LedgerLoaderType parameter.
71    using GetLedgerResponseType = 
typename LedgerLoaderType::GetLedgerResponseType;
 
   72    using RawLedgerObjectType = 
typename LedgerLoaderType::RawLedgerObjectType;
 
         // Source of fetched ledger data; popNext() is called on it in the transform loop.
   76    std::reference_wrapper<DataPipeType> pipe_;
 
         // Backend that receives the writes and exposes the cache via cache().
   77    std::shared_ptr<BackendInterface> backend_;
 
         // Used for insertTransactions().
   78    std::reference_wrapper<LedgerLoaderType> loader_;
 
         // Receives ledger headers via publish().
   79    std::reference_wrapper<LedgerPublisherType> publisher_;
 
         // Notified via notifyAmendmentBlocked() when building a ledger throws std::runtime_error.
   80    std::reference_wrapper<AmendmentBlockHandlerType> amendmentBlockHandler_;
 
         // Initial value of the sequence requested from the pipe.
   82    uint32_t startSequence_;
 
         // Externally provided state object; this code reads isStopping and
         // reads/writes writeConflict on it.
   83    std::reference_wrapper<SystemState> state_;  
 
   // Constructor parameter list, member initializers and body. The start of the
   // signature and the declarations of the `pipe` and `state` parameters used
   // below are outside this excerpt.
   96        std::shared_ptr<BackendInterface> backend,
 
   97        LedgerLoaderType& loader,
 
   98        LedgerPublisherType& publisher,
 
   99        AmendmentBlockHandlerType& amendmentBlockHandler,
 
  100        uint32_t startSequence,
 
             // Reference parameters are stored as std::reference_wrapper (not copied);
             // the backend shared_ptr is moved into the member.
  103        : pipe_{std::ref(pipe)}
 
  104        , backend_{std::move(backend)}
 
  105        , loader_{std::ref(loader)}
 
  106        , publisher_{std::ref(publisher)}
 
  107        , amendmentBlockHandler_{std::ref(amendmentBlockHandler)}
 
  108        , startSequence_{startSequence}
 
  109        , state_{std::ref(state)}
 
             // Start the worker thread running process() on this object. thread_ is
             // assigned here rather than in the initializer list above, so the members
             // listed there are already set when the thread starts.
             // NOTE(review): the lambda captures `this`, so the object must stay alive
             // until the thread has finished.
  111        thread_ = std::thread([
this]() { process(); });
 
 
  // Acts on thread_ only when it is joinable; the guarded statement and the
  // enclosing function's signature are outside this excerpt.
  119        if (thread_.joinable())
 
 
  // Precondition check: thread_ must be joinable at this point. The enclosing
  // function's signature and the statement that follows are outside this excerpt.
  129        ASSERT(thread_.joinable(), 
"Transformer thread must be joinable");
 
 
  // Body of the transform loop. Its signature is outside this excerpt
  // (presumably the process() invoked by the worker thread - confirm).
  // Sets the current thread's name via beast::setCurrentThreadName.
  137        beast::setCurrentThreadName(
"ETLService transform");
 
             // Sequence requested from the pipe; starts at startSequence_.
             // NOTE(review): the code that advances currentSequence is not visible here.
  138        uint32_t currentSequence = startSequence_;
 
             // Keep going until the write-conflict flag in state_ is set.
  140        while (not hasWriteConflict()) {
 
                 // fetchResponse is dereferenced with * and -> below; any emptiness or
                 // stop check between this line and its first use is outside this excerpt.
  141            auto fetchResponse = pipe_.get().popNext(currentSequence);
 
                 // Wall-clock timing of the write of this ledger.
  152            auto const start = std::chrono::system_clock::now();
 
  153            auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
 
                     // Figures for the log line below. The condition guarding this
                     // section is not visible (presumably `success` - confirm).
  156                auto const numTxns = fetchResponse->transactions_list().transactions_size();
 
  157                auto const numObjects = fetchResponse->ledger_objects().objects_size();
 
  158                auto const end = std::chrono::system_clock::now();
 
                     // NOTE(review): this divides the raw tick count by 1e9, which yields
                     // seconds only where system_clock ticks in nanoseconds; the tick
                     // period is implementation-defined. Confirm for all target
                     // platforms, or convert with std::chrono::duration<double>.
  159                auto const duration = ((end - start).count()) / 1000000000.0;
 
  161                LOG(log_.
info()) << 
"Load phase of ETL. Successfully wrote ledger! Ledger info: " 
  163                                 << 
". object count = " << numObjects << 
". load time = " << duration
 
  164                                 << 
". load txns per second = " << numTxns / duration
 
  165                                 << 
". load objs per second = " << numObjects / duration;
 
                     // Hand the header returned by buildNextLedger to the publisher.
  168                publisher_.get().publish(lgrInfo);
 
                 // A failed build sets the write-conflict flag, which makes the loop
                 // condition above false on its next evaluation.
  173            setWriteConflict(not success);
 
  // Writes the ledger carried by rawData to the backend.
  //
  // rawData: fetched ledger response; several of its fields are moved from here
  //          and in the helpers called below, so it is consumed by this call.
  // Returns: {ledger header, success flag}. When a std::runtime_error is caught
  //          from the cache-update / transaction-insert step, the amendment-block
  //          handler is notified and {ripple::LedgerHeader{}, false} is returned.
  //
  // The declaration of lgrInfo and the initializer of `success`/`duration` are
  // outside this excerpt.
  184    std::pair<ripple::LedgerHeader, bool>
 
  185    buildNextLedger(GetLedgerResponseType& rawData)
 
  187        LOG(log_.debug()) << 
"Beginning ledger update";
 
  190        LOG(log_.debug()) << 
"Deserialized ledger header. " << 
::util::toString(lgrInfo);
 
             // Open the backend write phase, then write the header; the serialized
             // header is moved out of rawData.
  191        backend_->startWrites();
 
  192        backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
 
  194        writeSuccessors(lgrInfo, rawData);
 
             // Declared outside the guarded section so the inserted-transaction data
             // is still available after it; filled by emplace() below.
  195        std::optional<FormattedTransactionsData> insertTxResultOp;
 
  197            updateCache(lgrInfo, rawData);
 
  199            LOG(log_.debug()) << 
"Inserted/modified/deleted all objects. Number of objects = " 
  200                              << rawData.ledger_objects().objects_size();
 
  202            insertTxResultOp.emplace(loader_.get().insertTransactions(lgrInfo, rawData));
 
             // NOTE(review): only std::runtime_error is handled here. updateCache throws
             // std::logic_error, which does not derive from std::runtime_error, so unless
             // another handler exists in lines not shown it propagates to the caller -
             // confirm this is intended.
  203        } 
catch (std::runtime_error 
const& e) {
 
  204            LOG(log_.fatal()) << 
"Failed to build next ledger: " << e.what();
 
  206            amendmentBlockHandler_.get().notifyAmendmentBlocked();
 
  207            return {ripple::LedgerHeader{}, 
false};
 
  210        LOG(log_.debug()) << 
"Inserted all transactions. Number of transactions  = " 
  211                          << rawData.transactions_list().transactions_size();
 
             // Write the per-transaction side data produced by insertTransactions.
             // accountTxData is moved; the other three are passed as lvalues.
  213        backend_->writeAccountTransactions(std::move(insertTxResultOp->accountTxData));
 
  214        backend_->writeNFTs(insertTxResultOp->nfTokensData);
 
  215        backend_->writeNFTTransactions(insertTxResultOp->nfTokenTxData);
 
  216        backend_->writeMPTHolders(insertTxResultOp->mptHoldersData);
 
             // The expression producing this pair is outside this excerpt.
  218        auto [success, duration] =
 
  221        LOG(log_.debug()) << 
"Finished writes. Total time: " << std::to_string(duration);
 
  222        LOG(log_.debug()) << 
"Finished ledger update: " << 
::util::toString(lgrInfo);
 
  224        return {lgrInfo, success};
 
  // Writes every ledger object in rawData to the backend and updates the backend
  // cache for lgrInfo.seq. When rawData does not include object neighbors, the
  // cache is used to work out successor updates: first for book bases (collected
  // during the object loop), then for each non-modified object.
  //
  // Throws std::logic_error if neighbors are not included and the cache is not
  // full or not at the expected sequence.
  //
  // The key and data of each object in rawData are moved from.
  // Several conditions and the successor-writing statements of this function are
  // outside this excerpt; comments below are limited to what is visible.
  234    updateCache(ripple::LedgerHeader 
const& lgrInfo, GetLedgerResponseType& rawData)
 
  236        std::vector<data::LedgerObject> cacheUpdates;
 
  237        cacheUpdates.reserve(rawData.ledger_objects().objects_size());
 
             // Book bases whose successor must be recomputed after the cache update.
  240        std::set<ripple::uint256> bookSuccessorsToCalculate;
 
             // Keys of objects with mod type MODIFIED; these are skipped in the
             // successor pass further down.
  241        std::set<ripple::uint256> modified;
 
  243        for (
auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
 
  244            auto key = ripple::uint256::fromVoidChecked(obj.key());
 
  245            ASSERT(key.has_value(), 
"Failed to deserialize key from void");
 
                 // The data bytes are copied into cacheUpdates here because the
                 // original buffer is moved into the backend write at the end of this
                 // iteration.
  247            cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
 
  248            LOG(log_.debug()) << 
"key = " << ripple::strHex(*key) << 
" - mod type = " << obj.mod_type();
 
                 // Non-MODIFIED objects need successor bookkeeping; without neighbors
                 // in the response it is derived from the cache.
  250            if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included()) {
 
  251                LOG(log_.debug()) << 
"object neighbors not included. using cache";
 
                     // The cache must be full and positioned at the previous sequence.
  253                if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
 
  254                    throw std::logic_error(
"Cache is not full, but object neighbors were not included");
 
  256                auto const blob = obj.mutable_data();
 
  257                auto checkBookBase = 
false;
 
                     // An empty data blob marks the object as deleted.
  258                auto const isDeleted = (blob->size() == 0);
 
                         // Previous-sequence version of the object (the condition
                         // selecting this branch is outside this excerpt).
  261                    auto const old = backend_->cache().get(*key, lgrInfo.seq - 1);
 
  262                    ASSERT(old.has_value(), 
"Deleted object {} must be in cache", ripple::strHex(*key));
 
  269                    LOG(log_.debug()) << 
"Is book dir. Key = " << ripple::strHex(*key);
 
                         // Successor of the book base as of the previous sequence
                         // (the computation of bookBase is outside this excerpt).
  272                    auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
 
  274                        oldFirstDir.has_value(),
 
  275                        "Book base must have a successor for lgrInfo.seq - 1 = {}",
 
                         // Recalculation is needed when the old first directory is being
                         // deleted, or a non-deleted key sorts before it.
  281                    if ((isDeleted && key == oldFirstDir->key) || (!isDeleted && key < oldFirstDir->key)) {
 
  283                            << 
"Need to recalculate book base successor. base = " << ripple::strHex(bookBase)
 
  284                            << 
" - key = " << ripple::strHex(*key) << 
" - isDeleted = " << isDeleted
 
  285                            << 
" - seq = " << lgrInfo.seq;
 
  286                        bookSuccessorsToCalculate.insert(bookBase);
 
  291            if (obj.mod_type() == RawLedgerObjectType::MODIFIED)
 
  292                modified.insert(*key);
 
                 // Key and data are moved out of obj into the backend write.
  294            backend_->writeLedgerObject(std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()));
 
             // Apply all collected objects to the cache at this ledger's sequence.
  297        backend_->cache().update(cacheUpdates, lgrInfo.seq);
 
             // Successor pass, using the cache as updated just above.
  300        if (!rawData.object_neighbors_included()) {
 
  301            LOG(log_.debug()) << 
"object neighbors not included. using cache";
 
                 // Same check as in the object loop, but against the current sequence.
  302            if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq)
 
  303                throw std::logic_error(
"Cache is not full, but object neighbors were not included");
 
  305            for (
auto const& obj : cacheUpdates) {
 
                     // The statement guarded by this check is outside this excerpt
                     // (presumably skips modified objects - confirm).
  306                if (modified.contains(obj.key))
 
                     // Neighbours of the key in the cache; the visible assignments fall
                     // back to data::kFIRST_KEY / data::kLAST_KEY (their guarding
                     // conditions are outside this excerpt).
  309                auto lb = backend_->cache().getPredecessor(obj.key, lgrInfo.seq);
 
  311                    lb = {.key = data::kFIRST_KEY, .blob = {}};
 
  313                auto ub = backend_->cache().getSuccessor(obj.key, lgrInfo.seq);
 
  315                    ub = {.key = data::kLAST_KEY, .blob = {}};
 
                     // Empty blob: deleted object. The write statements for both
                     // branches are outside this excerpt.
  317                if (obj.blob.empty()) {
 
  318                    LOG(log_.debug()) << 
"writing successor for deleted object " << ripple::strHex(obj.key) << 
" - " 
  319                                      << ripple::strHex(lb->key) << 
" - " << ripple::strHex(ub->key);
 
  326                    LOG(log_.debug()) << 
"writing successor for new object " << ripple::strHex(lb->key) << 
" - " 
  327                                      << ripple::strHex(obj.key) << 
" - " << ripple::strHex(ub->key);
 
                 // Recompute the successor of each book base flagged in the object loop.
  331            for (
auto const& base : bookSuccessorsToCalculate) {
 
  332                auto succ = backend_->cache().getSuccessor(base, lgrInfo.seq);
 
  336                    LOG(log_.debug()) << 
"Updating book successor " << ripple::strHex(base) << 
" - " 
  337                                      << ripple::strHex(succ->key);
 
  341                    LOG(log_.debug()) << 
"Updating book successor " << ripple::strHex(base) << 
" - " 
  342                                      << ripple::strHex(data::kLAST_KEY);
 
  // Writes successor links to the backend when rawData includes object neighbors.
  // Book successors are written first, then the predecessor/successor links of
  // every object that is not MODIFIED.
  //
  // Strings in rawData (book base, first book, predecessor, successor) are moved
  // from. The statements guarded by the three emptiness checks below are outside
  // this excerpt.
  355    writeSuccessors(ripple::LedgerHeader 
const& lgrInfo, GetLedgerResponseType& rawData)
 
  358        if (rawData.object_neighbors_included()) {
 
  359            LOG(log_.debug()) << 
"object neighbors included";
 
  361            for (
auto& obj : *(rawData.mutable_book_successors())) {
 
  362                auto firstBook = std::move(*obj.mutable_first_book());
 
  363                if (!firstBook.size())
 
  365                LOG(log_.debug()) << 
"writing book successor " << ripple::strHex(obj.book_base()) << 
" - " 
  366                                  << ripple::strHex(firstBook);
 
  368                backend_->writeSuccessor(std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook));
 
  371            for (
auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
 
  372                if (obj.mod_type() != RawLedgerObjectType::MODIFIED) {
 
  373                    std::string* predPtr = obj.mutable_predecessor();
 
  374                    if (predPtr->empty())
 
  376                    std::string* succPtr = obj.mutable_successor();
 
  377                    if (succPtr->empty())
 
                         // Deleted object: link its predecessor directly to its successor.
  380                    if (obj.mod_type() == RawLedgerObjectType::DELETED) {
 
  381                        LOG(log_.debug()) << 
"Modifying successors for deleted object " << ripple::strHex(obj.key())
 
  382                                          << 
" - " << ripple::strHex(*predPtr) << 
" - " << ripple::strHex(*succPtr);
 
  384                        backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::move(*succPtr));
 
  386                        LOG(log_.debug()) << 
"adding successor for new object " << ripple::strHex(obj.key()) << 
" - " 
  387                                          << ripple::strHex(*predPtr) << 
" - " << ripple::strHex(*succPtr);
 
                             // New object: insert it between its neighbours with two
                             // links, predecessor -> key and key -> successor. The key is
                             // copied each time, so obj.key() stays intact.
  389                        backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::string{obj.key()});
 
  390                        backend_->writeSuccessor(std::string{obj.key()}, lgrInfo.seq, std::move(*succPtr));
 
  393                    LOG(log_.debug()) << 
"object modified " << ripple::strHex(obj.key());
 
  // Returns the isStopping member of the object referenced by state_.
  // The enclosing function's signature is outside this excerpt.
  402        return state_.get().isStopping;
 
  // Returns the writeConflict member of the object referenced by state_.
  // Used as the exit condition of the transform loop.
  407    hasWriteConflict()
 const 
  409        return state_.get().writeConflict;
 
  // Stores `conflict` into the writeConflict member of the object referenced by
  // state_. Passing false clears the flag.
  418    setWriteConflict(
bool conflict)
 
  420        state_.get().writeConflict = conflict;