Skip to content

Commit

Permalink
ingest/ledgerbackend: Create functional producer for CDP (#5462)
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Oct 3, 2024
1 parent 9a19888 commit ba11f09
Show file tree
Hide file tree
Showing 6 changed files with 467 additions and 2 deletions.
6 changes: 6 additions & 0 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/).

## Pending

### New Features
* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform).
* Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).

### Stellar Core Protocol 21 Configuration Update:
* BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated.
* A new mandatory parameter, `DEPRECATED_SQL_LEDGER_STATE`, has been added with a default value of false which equivalent to `EXPERIMENTAL_BUCKETLIST_DB` being set to true.
Expand Down
147 changes: 147 additions & 0 deletions ingest/cdp/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cdp

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

// provide testing hooks to inject mocks of these
var datastoreFactory = datastore.NewDataStore

// Generate a default buffered storage config with values
// set to optimize buffered performance to some degree based
// on number of ledgers per file expected in the underlying
// datastore used by an instance of BufferedStorageBackend.
//
// these numbers were derived empirically from benchmarking analysis:
// https://github.com/stellar/go/issues/5390
//
// ledgersPerFile - number of ledgers per file from remote datastore schema.
// return - preconfigured instance of BufferedStorageBackendConfig
func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig {

config := ledgerbackend.BufferedStorageBackendConfig{
RetryLimit: 5,
RetryWait: 30 * time.Second,
}

switch {
case ledgersPerFile < 2:
config.BufferSize = 500
config.NumWorkers = 5
return config
case ledgersPerFile < 101:
config.BufferSize = 10
config.NumWorkers = 5
return config
default:
config.BufferSize = 10
config.NumWorkers = 2
return config
}
}

type PublisherConfig struct {
// Registry, optional, include to capture buffered storage backend metrics
Registry *prometheus.Registry
// RegistryNamespace, optional, include to emit buffered storage backend
// under this namespace
RegistryNamespace string
// BufferedStorageConfig, required
BufferedStorageConfig ledgerbackend.BufferedStorageBackendConfig
//DataStoreConfig, required
DataStoreConfig datastore.DataStoreConfig
// Log, optional, if nil uses go default logger
Log *log.Entry
}

// ApplyLedgerMetadata - creates an internal instance
// of BufferedStorageBackend using provided config and emits
// ledger metadata for the requested range by invoking the provided callback
// once per ledger.
//
// The function is blocking, it will only return when a bounded range
// is completed, the ctx is canceled, or an error occurs.
//
// ledgerRange - the requested range, can be bounded or unbounded.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the internal ledger processing,
// when canceled, the function will return asap with that error.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the processing will stop and return an error asap.
//
// return - error, function only returns if requested range is bounded or an error occured.
// nil will be returned only if bounded range requested and completed processing with no errors.
// otherwise return will always be an error.
func ApplyLedgerMetadata(ledgerRange ledgerbackend.Range,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}

dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
return fmt.Errorf("failed to create datastore: %w", err)
}

var ledgerBackend ledgerbackend.LedgerBackend
ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
return fmt.Errorf("failed to create buffered storage backend: %w", err)
}

if publisherConfig.Registry != nil {
ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() {
return fmt.Errorf("invalid end value for bounded range, must be greater than start")
}

if !ledgerRange.Bounded() && ledgerRange.To() > 0 {
return fmt.Errorf("invalid end value for unbounded range, must be zero")
}

from := ordered.Max(2, ledgerRange.From())
ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
return fmt.Errorf("error getting ledger, %w", err)
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
return fmt.Errorf("received an error from callback invocation: %w", err)
}
}
return nil
}
Loading

0 comments on commit ba11f09

Please sign in to comment.