From 1b3f07e94dc6e1883a578011a10096bfe2367105 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 16 Sep 2024 17:53:18 -0700 Subject: [PATCH 01/14] #5412: functional producer for bufferedstorage backend --- .../ledgerbackend/buffered_storage_backend.go | 156 ++++++++++ .../buffered_storage_backend_test.go | 283 +++++++++++++++++- support/datastore/mocks.go | 6 +- 3 files changed, 443 insertions(+), 2 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go index aa70336295..bc54bc8a6f 100644 --- a/ingest/ledgerbackend/buffered_storage_backend.go +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -5,18 +5,26 @@ package ledgerbackend import ( "context" + "fmt" + "math" "sync" "time" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/ordered" "github.com/stellar/go/xdr" ) // Ensure BufferedStorageBackend implements LedgerBackend var _ LedgerBackend = (*BufferedStorageBackend)(nil) +// provide testing hooks to inject mocks of these +var datastoreFactory = datastore.NewDataStore + type BufferedStorageBackendConfig struct { BufferSize uint32 `toml:"buffer_size"` NumWorkers uint32 `toml:"num_workers"` @@ -24,6 +32,39 @@ type BufferedStorageBackendConfig struct { RetryWait time.Duration `toml:"retry_wait"` } +// Generate a default buffered storage config with values +// set to optimize buffered performance to some degree based +// on number of ledgers per file expected in the underlying +// datastore used by an instance of BufferedStorageBackend. +// +// these numbers were derived empirically from benchmarking analysis: +// https://github.com/stellar/go/issues/5390 +// +// ledgersPerFile - number of ledgers per file from remote datastore schema. +// return - preconfigured instance of BufferedStorageBackendConfig +func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) BufferedStorageBackendConfig { + + config := BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + } + + switch { + case ledgersPerFile < 2: + config.BufferSize = 500 + config.NumWorkers = 5 + return config + case ledgersPerFile < 101: + config.BufferSize = 10 + config.NumWorkers = 5 + return config + default: + config.BufferSize = 10 + config.NumWorkers = 2 + return config + } +} + // BufferedStorageBackend is a ledger backend that reads from a storage service. // The storage service contains files generated from the ledgerExporter. type BufferedStorageBackend struct { @@ -64,6 +105,121 @@ func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore da return bsBackend, nil } +type PublisherConfig struct { + // Registry, optional, include to capture buffered storage backend metrics + Registry *prometheus.Registry + // RegistryNamespace, optional, include to emit buffered storage backend + // under this namespace + RegistryNamespace string + // BufferedStorageConfig, required + BufferedStorageConfig BufferedStorageBackendConfig + //DataStoreConfig, required + DataStoreConfig datastore.DataStoreConfig + // Log, optional, if nil uses go default logger + Log *log.Entry +} + +// PublishFromBufferedStorageBackend is asynchronous. +// Proceeds to create an internal instance of BufferedStorageBackend +// using provided configs and emit ledgers asynchronously to the provided +// callback fn for all ledgers in the requested range. +// +// ledgerRange - the requested range. If bounded range, will close resultCh +// after last ledger is emitted. +// +// publisherConfig - PublisherConfig. Provide configuration settings for DataStore +// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create +// optimized BufferedStorageBackendConfig. +// +// ctx - the context. Caller uses this to cancel the asynchronousledger processing. +// If caller does cancel, can sync on resultCh to receive an error to confirm +// all asynchronous processing stopped. +// +// callback - function. Invoked for every LedgerCloseMeta. If callback invocation +// returns an error, the publishing will shut down and indicate with error on resultCh. +// +// return - channel, used to signal to caller when publishing has stopped. +// If stoppage was due to an error, the error will be sent on +// channel and then closed. If no errors and ledgerRange is bounded, +// the channel will be closed when range is completed. If ledgerRange +// is unbounded, then the channel is never closed until an error +// or caller cancels. +func PublishFromBufferedStorageBackend(ledgerRange Range, + publisherConfig PublisherConfig, + ctx context.Context, + callback func(xdr.LedgerCloseMeta) error) chan error { + + logger := publisherConfig.Log + if logger == nil { + logger = log.DefaultLogger + } + resultCh := make(chan error, 1) + + go func() { + dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) + if err != nil { + resultCh <- fmt.Errorf("failed to create datastore: %w", err) + return + } + + var ledgerBackend LedgerBackend + ledgerBackend, err = NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) + if err != nil { + resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) + return + } + + if publisherConfig.Registry != nil { + ledgerBackend = WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) + } + + if ledgerRange.bounded && ledgerRange.to <= ledgerRange.from { + resultCh <- errors.New("invalid end value for bounded range, must be greater than start") + return + } + + if !ledgerRange.bounded && ledgerRange.to > 0 { + resultCh <- errors.New("invalid end value for unbounded ranged, must be zero") + return + } + + from := ordered.Max(2, ledgerRange.from) + to := ledgerRange.to + if !ledgerRange.bounded { + to = math.MaxUint32 + } + + ledgerBackend.PrepareRange(ctx, ledgerRange) + + for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") + startTime := time.Now() + ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) + + if err != nil { + resultCh <- errors.Wrap(err, "error getting ledger") + return + } + + log.WithFields(log.F{ + "sequence": ledgerSeq, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + + err = callback(ledgerCloseMeta) + if err != nil { + resultCh <- errors.Wrap(err, "received an error from callback invocation") + return + } + } + close(resultCh) + }() + + return resultCh +} + // GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer. func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { bsb.bsBackendLock.RLock() diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 6c95b465f7..08b77149b5 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -76,7 +77,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) readCloser = createLCMBatchReader(i, i, count) objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i) } - mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil).Times(1) } mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ LedgersPerFile: count, @@ -302,6 +303,286 @@ func TestBSBPrepareRange(t *testing.T) { assert.NotNil(t, bsb.prepared) } +func TestBSBProducerFn(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(3) + ctx := context.Background() + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + dsConfig := datastore.DataStoreConfig{} + pubConfig := PublisherConfig{ + DataStoreConfig: dsConfig, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + + // inject the mock datastore using the package private testing factory override + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + assert.Equal(t, datastoreConfig, dsConfig) + return mockDataStore, nil + } + + expectedLcmSeqWasPublished := []bool{false, false} + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 2 { + if expectedLcmSeqWasPublished[0] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[0] = true + } + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 3 { + if expectedLcmSeqWasPublished[1] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[1] = true + } + return nil + } + + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) + + assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") + +} + +func TestBSBProducerFnDataStoreError(t *testing.T) { + ctx := context.Background() + ledgerRange := BoundedRange(uint32(2), uint32(3)) + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: BufferedStorageBackendConfig{}, + } + + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + return &datastore.MockDataStore{}, errors.New("uhoh") + } + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "failed to create datastore:") + } else { + assert.Fail(t, "", "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnConfigError(t *testing.T) { + ctx := context.Background() + ledgerRange := BoundedRange(uint32(2), uint32(3)) + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: BufferedStorageBackendConfig{}, + } + mockDataStore := new(datastore.MockDataStore) + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "failed to create buffered storage backend") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnInvalidRange(t *testing.T) { + ctx := context.Background() + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) + + resultCh = PublishFromBufferedStorageBackend(Range{from: uint32(2), to: uint32(3), bounded: false}, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "invalid end value for unbounded ranged, must be zero") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnGetLedgerError(t *testing.T) { + ctx := context.Background() + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + // we don't want to wait for retries, forece the first error to propagate + pubConfig.BufferedStorageConfig.RetryLimit = 0 + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(createLCMBatchReader(3, 3, 1), nil).Once() + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "error getting ledger") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3000, + time.Millisecond*50) +} + +func TestBSBProducerFnCallbackError(t *testing.T) { + ctx := context.Background() + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := createMockdataStore(t, 2, 3, partitionSize, ledgerPerFileCount) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return errors.New("uhoh") + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "received an error from callback invocation") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestDefaultBSBConfigs(t *testing.T) { + smallConfig := BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 500, + NumWorkers: 5, + } + + mediumConfig := BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 5, + } + + largeConfig := BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 2, + } + + assert.Equal(t, DefaultBufferedStorageBackendConfig(1), smallConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(2), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(100), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(101), largeConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(1000), largeConfig) +} + func TestBSBIsPrepared_Bounded(t *testing.T) { startLedger := uint32(3) endLedger := uint32(5) diff --git a/support/datastore/mocks.go b/support/datastore/mocks.go index 2fa39a4712..a8c10438ab 100644 --- a/support/datastore/mocks.go +++ b/support/datastore/mocks.go @@ -29,7 +29,11 @@ func (m *MockDataStore) GetFileMetadata(ctx context.Context, path string) (map[s func (m *MockDataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) { args := m.Called(ctx, path) - return args.Get(0).(io.ReadCloser), args.Error(1) + closer := (io.ReadCloser)(nil) + if args.Get(0) != nil { + closer = args.Get(0).(io.ReadCloser) + } + return closer, args.Error(1) } func (m *MockDataStore) PutFile(ctx context.Context, path string, in io.WriterTo, metadata map[string]string) error { From a26ff3dd0d879b052ec4386902140559dfdf8634 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 20 Sep 2024 09:25:51 -0700 Subject: [PATCH 02/14] #5412: updated changelog to reflect new function feature --- ingest/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index ed168de74e..5203957d28 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -2,6 +2,11 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Pending + +### New Features +* Create functional producer for `BufferedStorageBackend`. `PublishFromBufferedStorageBackend` is a new function which can be used to assist in developing streaming data pipelines from pre-computed `LedgerCloseMeta` on a `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). + ### Stellar Core Protocol 21 Configuration Update: * BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated. * A new mandatory parameter, `DEPRECATED_SQL_LEDGER_STATE`, has been added with a default value of false which equivalent to `EXPERIMENTAL_BUCKETLIST_DB` being set to true. From cd7bb34d6a03a876cfc5c8ae164f80fc43b808d5 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 24 Sep 2024 22:05:46 -0700 Subject: [PATCH 03/14] #5412: moved the producer fn into new cdp package under ingest --- .../ledgerbackend/buffered_storage_backend.go | 140 --------- .../buffered_storage_backend_test.go | 281 ------------------ ingest/ledgerbackend/range.go | 12 + 3 files changed, 12 insertions(+), 421 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go index bc54bc8a6f..4bbf05a3c0 100644 --- a/ingest/ledgerbackend/buffered_storage_backend.go +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -5,8 +5,6 @@ package ledgerbackend import ( "context" - "fmt" - "math" "sync" "time" @@ -15,16 +13,12 @@ import ( "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/log" - "github.com/stellar/go/support/ordered" "github.com/stellar/go/xdr" ) // Ensure BufferedStorageBackend implements LedgerBackend var _ LedgerBackend = (*BufferedStorageBackend)(nil) -// provide testing hooks to inject mocks of these -var datastoreFactory = datastore.NewDataStore - type BufferedStorageBackendConfig struct { BufferSize uint32 `toml:"buffer_size"` NumWorkers uint32 `toml:"num_workers"` @@ -32,39 +26,6 @@ type BufferedStorageBackendConfig struct { RetryWait time.Duration `toml:"retry_wait"` } -// Generate a default buffered storage config with values -// set to optimize buffered performance to some degree based -// on number of ledgers per file expected in the underlying -// datastore used by an instance of BufferedStorageBackend. -// -// these numbers were derived empirically from benchmarking analysis: -// https://github.com/stellar/go/issues/5390 -// -// ledgersPerFile - number of ledgers per file from remote datastore schema. -// return - preconfigured instance of BufferedStorageBackendConfig -func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) BufferedStorageBackendConfig { - - config := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - } - - switch { - case ledgersPerFile < 2: - config.BufferSize = 500 - config.NumWorkers = 5 - return config - case ledgersPerFile < 101: - config.BufferSize = 10 - config.NumWorkers = 5 - return config - default: - config.BufferSize = 10 - config.NumWorkers = 2 - return config - } -} - // BufferedStorageBackend is a ledger backend that reads from a storage service. // The storage service contains files generated from the ledgerExporter. type BufferedStorageBackend struct { @@ -119,107 +80,6 @@ type PublisherConfig struct { Log *log.Entry } -// PublishFromBufferedStorageBackend is asynchronous. -// Proceeds to create an internal instance of BufferedStorageBackend -// using provided configs and emit ledgers asynchronously to the provided -// callback fn for all ledgers in the requested range. -// -// ledgerRange - the requested range. If bounded range, will close resultCh -// after last ledger is emitted. -// -// publisherConfig - PublisherConfig. Provide configuration settings for DataStore -// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create -// optimized BufferedStorageBackendConfig. -// -// ctx - the context. Caller uses this to cancel the asynchronousledger processing. -// If caller does cancel, can sync on resultCh to receive an error to confirm -// all asynchronous processing stopped. -// -// callback - function. Invoked for every LedgerCloseMeta. If callback invocation -// returns an error, the publishing will shut down and indicate with error on resultCh. -// -// return - channel, used to signal to caller when publishing has stopped. -// If stoppage was due to an error, the error will be sent on -// channel and then closed. If no errors and ledgerRange is bounded, -// the channel will be closed when range is completed. If ledgerRange -// is unbounded, then the channel is never closed until an error -// or caller cancels. -func PublishFromBufferedStorageBackend(ledgerRange Range, - publisherConfig PublisherConfig, - ctx context.Context, - callback func(xdr.LedgerCloseMeta) error) chan error { - - logger := publisherConfig.Log - if logger == nil { - logger = log.DefaultLogger - } - resultCh := make(chan error, 1) - - go func() { - dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) - if err != nil { - resultCh <- fmt.Errorf("failed to create datastore: %w", err) - return - } - - var ledgerBackend LedgerBackend - ledgerBackend, err = NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) - if err != nil { - resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) - return - } - - if publisherConfig.Registry != nil { - ledgerBackend = WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) - } - - if ledgerRange.bounded && ledgerRange.to <= ledgerRange.from { - resultCh <- errors.New("invalid end value for bounded range, must be greater than start") - return - } - - if !ledgerRange.bounded && ledgerRange.to > 0 { - resultCh <- errors.New("invalid end value for unbounded ranged, must be zero") - return - } - - from := ordered.Max(2, ledgerRange.from) - to := ledgerRange.to - if !ledgerRange.bounded { - to = math.MaxUint32 - } - - ledgerBackend.PrepareRange(ctx, ledgerRange) - - for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ { - var ledgerCloseMeta xdr.LedgerCloseMeta - - logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") - startTime := time.Now() - ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) - - if err != nil { - resultCh <- errors.Wrap(err, "error getting ledger") - return - } - - log.WithFields(log.F{ - "sequence": ledgerSeq, - "duration": time.Since(startTime).Seconds(), - }).Info("Ledger returned from the backend") - - err = callback(ledgerCloseMeta) - if err != nil { - resultCh <- errors.Wrap(err, "received an error from callback invocation") - return - } - } - close(resultCh) - }() - - return resultCh -} - // GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer. func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { bsb.bsBackendLock.RLock() diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 08b77149b5..0d461cff07 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -303,286 +302,6 @@ func TestBSBPrepareRange(t *testing.T) { assert.NotNil(t, bsb.prepared) } -func TestBSBProducerFn(t *testing.T) { - startLedger := uint32(2) - endLedger := uint32(3) - ctx := context.Background() - ledgerRange := BoundedRange(startLedger, endLedger) - mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) - dsConfig := datastore.DataStoreConfig{} - pubConfig := PublisherConfig{ - DataStoreConfig: dsConfig, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - - // inject the mock datastore using the package private testing factory override - datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { - assert.Equal(t, datastoreConfig, dsConfig) - return mockDataStore, nil - } - - expectedLcmSeqWasPublished := []bool{false, false} - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 2 { - if expectedLcmSeqWasPublished[0] { - assert.Fail(t, "producer fn had multiple callback invocations for same lcm") - } - expectedLcmSeqWasPublished[0] = true - } - if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 3 { - if expectedLcmSeqWasPublished[1] { - assert.Fail(t, "producer fn had multiple callback invocations for same lcm") - } - expectedLcmSeqWasPublished[1] = true - } - return nil - } - - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) - - assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") - -} - -func TestBSBProducerFnDataStoreError(t *testing.T) { - ctx := context.Background() - ledgerRange := BoundedRange(uint32(2), uint32(3)) - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: BufferedStorageBackendConfig{}, - } - - datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { - return &datastore.MockDataStore{}, errors.New("uhoh") - } - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create datastore:") - } else { - assert.Fail(t, "", "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestBSBProducerFnConfigError(t *testing.T) { - ctx := context.Background() - ledgerRange := BoundedRange(uint32(2), uint32(3)) - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: BufferedStorageBackendConfig{}, - } - mockDataStore := new(datastore.MockDataStore) - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create buffered storage backend") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestBSBProducerFnInvalidRange(t *testing.T) { - ctx := context.Background() - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - mockDataStore := new(datastore.MockDataStore) - mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ - LedgersPerFile: 1, - FilesPerPartition: 1, - }) - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) - - resultCh = PublishFromBufferedStorageBackend(Range{from: uint32(2), to: uint32(3), bounded: false}, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "invalid end value for unbounded ranged, must be zero") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestBSBProducerFnGetLedgerError(t *testing.T) { - ctx := context.Background() - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - // we don't want to wait for retries, forece the first error to propagate - pubConfig.BufferedStorageConfig.RetryLimit = 0 - mockDataStore := new(datastore.MockDataStore) - mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ - LedgersPerFile: 1, - FilesPerPartition: 1, - }) - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(createLCMBatchReader(3, 3, 1), nil).Once() - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return nil - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "error getting ledger") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3000, - time.Millisecond*50) -} - -func TestBSBProducerFnCallbackError(t *testing.T) { - ctx := context.Background() - pubConfig := PublisherConfig{ - DataStoreConfig: datastore.DataStoreConfig{}, - BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), - } - mockDataStore := createMockdataStore(t, 2, 3, partitionSize, ledgerPerFileCount) - - appCallback := func(lcm xdr.LedgerCloseMeta) error { - return errors.New("uhoh") - } - - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { - return mockDataStore, nil - } - resultCh := PublishFromBufferedStorageBackend(BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "received an error from callback invocation") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) -} - -func TestDefaultBSBConfigs(t *testing.T) { - smallConfig := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - BufferSize: 500, - NumWorkers: 5, - } - - mediumConfig := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - BufferSize: 10, - NumWorkers: 5, - } - - largeConfig := BufferedStorageBackendConfig{ - RetryLimit: 5, - RetryWait: 30 * time.Second, - BufferSize: 10, - NumWorkers: 2, - } - - assert.Equal(t, DefaultBufferedStorageBackendConfig(1), smallConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(2), mediumConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(100), mediumConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(101), largeConfig) - assert.Equal(t, DefaultBufferedStorageBackendConfig(1000), largeConfig) -} - func TestBSBIsPrepared_Bounded(t *testing.T) { startLedger := uint32(3) endLedger := uint32(5) diff --git a/ingest/ledgerbackend/range.go b/ingest/ledgerbackend/range.go index f0c80695a1..99b4dfc800 100644 --- a/ingest/ledgerbackend/range.go +++ b/ingest/ledgerbackend/range.go @@ -46,6 +46,18 @@ func (r Range) String() string { return fmt.Sprintf("[%d,latest)", r.from) } +func (r Range) Bounded() bool { + return r.bounded +} + +func (r Range) To() uint32 { + return r.to +} + +func (r Range) From() uint32 { + return r.from +} + func (r Range) Contains(other Range) bool { if r.bounded && !other.bounded { return false From 3414d24bc37f843c19342938923f56c6e4d47f75 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 24 Sep 2024 22:07:04 -0700 Subject: [PATCH 04/14] #5412: forgot to include new files on last commit --- ingest/cdp/producer.go | 152 ++++++++++++++++ ingest/cdp/producer_test.go | 337 ++++++++++++++++++++++++++++++++++++ 2 files changed, 489 insertions(+) create mode 100644 ingest/cdp/producer.go create mode 100644 ingest/cdp/producer_test.go diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go new file mode 100644 index 0000000000..05178b1164 --- /dev/null +++ b/ingest/cdp/producer.go @@ -0,0 +1,152 @@ +package cdp + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" +) + +// provide testing hooks to inject mocks of these +var datastoreFactory = datastore.NewDataStore + +// Generate a default buffered storage config with values +// set to optimize buffered performance to some degree based +// on number of ledgers per file expected in the underlying +// datastore used by an instance of BufferedStorageBackend. +// +// these numbers were derived empirically from benchmarking analysis: +// https://github.com/stellar/go/issues/5390 +// +// ledgersPerFile - number of ledgers per file from remote datastore schema. +// return - preconfigured instance of BufferedStorageBackendConfig +func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig { + + config := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + } + + switch { + case ledgersPerFile < 2: + config.BufferSize = 500 + config.NumWorkers = 5 + return config + case ledgersPerFile < 101: + config.BufferSize = 10 + config.NumWorkers = 5 + return config + default: + config.BufferSize = 10 + config.NumWorkers = 2 + return config + } +} + +// PublishFromBufferedStorageBackend is asynchronous. +// Proceeds to create an internal instance of BufferedStorageBackend +// using provided configs and emit ledgers asynchronously to the provided +// callback fn for all ledgers in the requested range. +// +// ledgerRange - the requested range. If bounded range, will close resultCh +// after last ledger is emitted. +// +// publisherConfig - PublisherConfig. Provide configuration settings for DataStore +// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create +// optimized BufferedStorageBackendConfig. +// +// ctx - the context. Caller uses this to cancel the asynchronousledger processing. +// If caller does cancel, can sync on resultCh to receive an error to confirm +// all asynchronous processing stopped. +// +// callback - function. Invoked for every LedgerCloseMeta. If callback invocation +// returns an error, the publishing will shut down and indicate with error on resultCh. +// +// return - channel, used to signal to caller when publishing has stopped. +// If stoppage was due to an error, the error will be sent on +// channel and then closed. If no errors and ledgerRange is bounded, +// the channel will be closed when range is completed. If ledgerRange +// is unbounded, then the channel is never closed until an error +// or caller cancels. +func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, + publisherConfig ledgerbackend.PublisherConfig, + ctx context.Context, + callback func(xdr.LedgerCloseMeta) error) chan error { + + logger := publisherConfig.Log + if logger == nil { + logger = log.DefaultLogger + } + resultCh := make(chan error, 1) + + go func() { + dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) + if err != nil { + resultCh <- fmt.Errorf("failed to create datastore: %w", err) + return + } + + var ledgerBackend ledgerbackend.LedgerBackend + ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) + if err != nil { + resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) + return + } + + if publisherConfig.Registry != nil { + ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) + } + + if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { + resultCh <- errors.New("invalid end value for bounded range, must be greater than start") + return + } + + if !ledgerRange.Bounded() && ledgerRange.To() > 0 { + resultCh <- errors.New("invalid end value for unbounded ranged, must be zero") + return + } + + from := ordered.Max(2, ledgerRange.From()) + to := ledgerRange.To() + if !ledgerRange.Bounded() { + to = math.MaxUint32 + } + + ledgerBackend.PrepareRange(ctx, ledgerRange) + + for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") + startTime := time.Now() + ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) + + if err != nil { + resultCh <- errors.Wrap(err, "error getting ledger") + return + } + + log.WithFields(log.F{ + "sequence": ledgerSeq, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + + err = callback(ledgerCloseMeta) + if err != nil { + resultCh <- errors.Wrap(err, "received an error from callback invocation") + return + } + } + close(resultCh) + }() + + return resultCh +} diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go new file mode 100644 index 0000000000..d783d1209a --- /dev/null +++ b/ingest/cdp/producer_test.go @@ -0,0 +1,337 @@ +package cdp + +import ( + "bytes" + "context" + "fmt" + "io" + "math" + "os" + "testing" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestDefaultBSBConfigs(t *testing.T) { + smallConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 500, + NumWorkers: 5, + } + + mediumConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 5, + } + + largeConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 2, + } + + assert.Equal(t, DefaultBufferedStorageBackendConfig(1), smallConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(2), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(100), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(101), largeConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(1000), largeConfig) +} + +func TestBSBProducerFn(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(3) + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, 64000) + dsConfig := datastore.DataStoreConfig{} + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: dsConfig, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + + // inject the mock datastore using the package private testing factory override + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + assert.Equal(t, datastoreConfig, dsConfig) + return mockDataStore, nil + } + + expectedLcmSeqWasPublished := []bool{false, false} + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 2 { + if expectedLcmSeqWasPublished[0] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[0] = true + } + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 3 { + if expectedLcmSeqWasPublished[1] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[1] = true + } + return nil + } + + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) + + assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") + +} + +func TestBSBProducerFnDataStoreError(t *testing.T) { + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, + } + + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + return &datastore.MockDataStore{}, errors.New("uhoh") + } + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "failed to create datastore:") + } else { + assert.Fail(t, "", "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnConfigError(t *testing.T) { + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, + } + mockDataStore := new(datastore.MockDataStore) + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "failed to create buffered storage backend") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnInvalidRange(t *testing.T) { + ctx := context.Background() + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func TestBSBProducerFnGetLedgerError(t *testing.T) { + ctx := context.Background() + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + // we don't want to wait for retries, forece the first error to propagate + pubConfig.BufferedStorageConfig.RetryLimit = 0 + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil).Once() + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "error getting ledger") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3000, + time.Millisecond*50) +} + +func TestBSBProducerFnCallbackError(t *testing.T) { + ctx := context.Background() + pubConfig := ledgerbackend.PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := createMockdataStore(t, 2, 3, 64000) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return errors.New("uhoh") + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) + assert.Eventually(t, func() bool { + select { + case chErr, ok := <-resultCh: + if ok { + assert.ErrorContains(t, chErr, "received an error from callback invocation") + } else { + assert.Fail(t, "producer fn should not have closed the result ch") + } + return true + default: + } + return false + }, + time.Second*3, + time.Millisecond*50) +} + +func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datastore.MockDataStore { + mockDataStore := new(datastore.MockDataStore) + partition := partitionSize - 1 + for i := start; i <= end; i++ { + objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(makeSingleLCMBatch(i), nil).Times(1) + } + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: partitionSize, + }) + + t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + return mockDataStore +} + +func makeSingleLCMBatch(seq uint32) io.ReadCloser { + lcm := xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(seq), + EndSequence: xdr.Uint32(seq), + LedgerCloseMetas: []xdr.LedgerCloseMeta{ + createLedgerCloseMeta(seq), + }, + } + encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, lcm) + var buf bytes.Buffer + encoder.WriteTo(&buf) + capturedBuf := buf.Bytes() + reader := bytes.NewReader(capturedBuf) + return io.NopCloser(reader) +} + +func createLedgerCloseMeta(ledgerSeq uint32) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V: int32(0), + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(ledgerSeq), + }, + }, + TxSet: xdr.TransactionSet{}, + TxProcessing: nil, + UpgradesProcessing: nil, + ScpInfo: nil, + }, + V1: nil, + } +} From 68f7f430136861944bf0d504df274a1c932b7cb8 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 24 Sep 2024 22:18:24 -0700 Subject: [PATCH 05/14] #5412: moved PublisherConfig to cdp package --- ingest/cdp/producer.go | 17 ++++++++++++++++- ingest/cdp/producer_test.go | 12 ++++++------ .../ledgerbackend/buffered_storage_backend.go | 16 ---------------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index 05178b1164..24041c58ad 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -6,6 +6,7 @@ import ( "math" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/errors" @@ -50,6 +51,20 @@ func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.Bu } } +type PublisherConfig struct { + // Registry, optional, include to capture buffered storage backend metrics + Registry *prometheus.Registry + // RegistryNamespace, optional, include to emit buffered storage backend + // under this namespace + RegistryNamespace string + // BufferedStorageConfig, required + BufferedStorageConfig ledgerbackend.BufferedStorageBackendConfig + //DataStoreConfig, required + DataStoreConfig datastore.DataStoreConfig + // Log, optional, if nil uses go default logger + Log *log.Entry +} + // PublishFromBufferedStorageBackend is asynchronous. // Proceeds to create an internal instance of BufferedStorageBackend // using provided configs and emit ledgers asynchronously to the provided @@ -76,7 +91,7 @@ func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.Bu // is unbounded, then the channel is never closed until an error // or caller cancels. func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, - publisherConfig ledgerbackend.PublisherConfig, + publisherConfig PublisherConfig, ctx context.Context, callback func(xdr.LedgerCloseMeta) error) chan error { diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index d783d1209a..96d764cb0f 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -55,7 +55,7 @@ func TestBSBProducerFn(t *testing.T) { ledgerRange := ledgerbackend.BoundedRange(startLedger, endLedger) mockDataStore := createMockdataStore(t, startLedger, endLedger, 64000) dsConfig := datastore.DataStoreConfig{} - pubConfig := ledgerbackend.PublisherConfig{ + pubConfig := PublisherConfig{ DataStoreConfig: dsConfig, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } @@ -107,7 +107,7 @@ func TestBSBProducerFn(t *testing.T) { func TestBSBProducerFnDataStoreError(t *testing.T) { ctx := context.Background() ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) - pubConfig := ledgerbackend.PublisherConfig{ + pubConfig := PublisherConfig{ DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, } @@ -141,7 +141,7 @@ func TestBSBProducerFnDataStoreError(t *testing.T) { func TestBSBProducerFnConfigError(t *testing.T) { ctx := context.Background() ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) - pubConfig := ledgerbackend.PublisherConfig{ + pubConfig := PublisherConfig{ DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, } @@ -173,7 +173,7 @@ func TestBSBProducerFnConfigError(t *testing.T) { func TestBSBProducerFnInvalidRange(t *testing.T) { ctx := context.Background() - pubConfig := ledgerbackend.PublisherConfig{ + pubConfig := PublisherConfig{ DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } @@ -210,7 +210,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) { func TestBSBProducerFnGetLedgerError(t *testing.T) { ctx := context.Background() - pubConfig := ledgerbackend.PublisherConfig{ + pubConfig := PublisherConfig{ DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } @@ -252,7 +252,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { func TestBSBProducerFnCallbackError(t *testing.T) { ctx := context.Background() - pubConfig := ledgerbackend.PublisherConfig{ + pubConfig := PublisherConfig{ DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go index 4bbf05a3c0..aa70336295 100644 --- a/ingest/ledgerbackend/buffered_storage_backend.go +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -9,10 +9,8 @@ import ( "time" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/support/datastore" - "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -66,20 +64,6 @@ func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore da return bsBackend, nil } -type PublisherConfig struct { - // Registry, optional, include to capture buffered storage backend metrics - Registry *prometheus.Registry - // RegistryNamespace, optional, include to emit buffered storage backend - // under this namespace - RegistryNamespace string - // BufferedStorageConfig, required - BufferedStorageConfig BufferedStorageBackendConfig - //DataStoreConfig, required - DataStoreConfig datastore.DataStoreConfig - // Log, optional, if nil uses go default logger - Log *log.Entry -} - // GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer. func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { bsb.bsBackendLock.RLock() From 4876f2e984396f45a25e7d879ea9ade7de20efe7 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 26 Sep 2024 13:54:25 -0700 Subject: [PATCH 06/14] #5412: review feedback --- ingest/cdp/producer.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index 24041c58ad..d9395fbfbc 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/datastore" - "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" "github.com/stellar/go/support/ordered" "github.com/stellar/go/xdr" @@ -102,6 +101,7 @@ func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, resultCh := make(chan error, 1) go func() { + defer close(resultCh) dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) if err != nil { resultCh <- fmt.Errorf("failed to create datastore: %w", err) @@ -120,12 +120,12 @@ func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, } if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { - resultCh <- errors.New("invalid end value for bounded range, must be greater than start") + resultCh <- fmt.Errorf("invalid end value for bounded range, must be greater than start") return } if !ledgerRange.Bounded() && ledgerRange.To() > 0 { - resultCh <- errors.New("invalid end value for unbounded ranged, must be zero") + resultCh <- fmt.Errorf("invalid end value for unbounded range, must be zero") return } @@ -145,7 +145,7 @@ func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) if err != nil { - resultCh <- errors.Wrap(err, "error getting ledger") + resultCh <- fmt.Errorf("error getting ledger, %w", err) return } @@ -156,11 +156,10 @@ func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, err = callback(ledgerCloseMeta) if err != nil { - resultCh <- errors.Wrap(err, "received an error from callback invocation") + resultCh <- fmt.Errorf("received an error from callback invocation: %w", err) return } } - close(resultCh) }() return resultCh From 94285c0aadc49892eaadf81a2d4625c01e18d5d9 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 26 Sep 2024 14:31:42 -0700 Subject: [PATCH 07/14] #5412: review feedback on loop logic --- ingest/cdp/producer.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index d9395fbfbc..44cc7d3a6d 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -3,7 +3,6 @@ package cdp import ( "context" "fmt" - "math" "time" "github.com/prometheus/client_golang/prometheus" @@ -130,14 +129,9 @@ func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, } from := ordered.Max(2, ledgerRange.From()) - to := ledgerRange.To() - if !ledgerRange.Bounded() { - to = math.MaxUint32 - } - ledgerBackend.PrepareRange(ctx, ledgerRange) - for ledgerSeq := from; ledgerSeq <= to; ledgerSeq++ { + for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ { var ledgerCloseMeta xdr.LedgerCloseMeta logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") From 8c3f694b3eda98f4116eb17dbbf90b8c686f9ad2 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 27 Sep 2024 11:35:21 -0700 Subject: [PATCH 08/14] #5412: converted producer function to sync signature, per review feedback on api best practice --- ingest/cdp/producer.go | 117 ++++++++++++++++-------------------- ingest/cdp/producer_test.go | 114 ++++++----------------------------- 2 files changed, 69 insertions(+), 162 deletions(-) diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index 44cc7d3a6d..ef863e0a55 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -63,98 +63,85 @@ type PublisherConfig struct { Log *log.Entry } -// PublishFromBufferedStorageBackend is asynchronous. -// Proceeds to create an internal instance of BufferedStorageBackend -// using provided configs and emit ledgers asynchronously to the provided -// callback fn for all ledgers in the requested range. +// PublishFromBufferedStorageBackend - create an internal instance +// of BufferedStorageBackend using provided config and emit +// ledger metadata for the requested range by invoking the provided callback +// once per ledger. // -// ledgerRange - the requested range. If bounded range, will close resultCh -// after last ledger is emitted. +// The function is blocking, it will only return when a bounded range +// is completed, the ctx is canceled, or an error occurs. +// +// ledgerRange - the requested range, can be bounded or unbounded. // // publisherConfig - PublisherConfig. Provide configuration settings for DataStore // and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create // optimized BufferedStorageBackendConfig. // -// ctx - the context. Caller uses this to cancel the asynchronousledger processing. -// If caller does cancel, can sync on resultCh to receive an error to confirm -// all asynchronous processing stopped. +// ctx - the context. Caller uses this to cancel the internal ledger processing, +// when canceled, the function will return asap with that error. // // callback - function. Invoked for every LedgerCloseMeta. If callback invocation -// returns an error, the publishing will shut down and indicate with error on resultCh. +// returns an error, the processing will stop and return an error asap. // -// return - channel, used to signal to caller when publishing has stopped. -// If stoppage was due to an error, the error will be sent on -// channel and then closed. If no errors and ledgerRange is bounded, -// the channel will be closed when range is completed. If ledgerRange -// is unbounded, then the channel is never closed until an error -// or caller cancels. +// return - error, function only returns if requested range is bounded or an error occured. +// nil will be returned only if bounded range requested and completed processing with no errors. +// otherwise return will always be an error. func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, publisherConfig PublisherConfig, ctx context.Context, - callback func(xdr.LedgerCloseMeta) error) chan error { + callback func(xdr.LedgerCloseMeta) error) error { logger := publisherConfig.Log if logger == nil { logger = log.DefaultLogger } - resultCh := make(chan error, 1) - go func() { - defer close(resultCh) - dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) - if err != nil { - resultCh <- fmt.Errorf("failed to create datastore: %w", err) - return - } + dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) + if err != nil { + return fmt.Errorf("failed to create datastore: %w", err) + } - var ledgerBackend ledgerbackend.LedgerBackend - ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) - if err != nil { - resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) - return - } + var ledgerBackend ledgerbackend.LedgerBackend + ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) + if err != nil { + return fmt.Errorf("failed to create buffered storage backend: %w", err) + } - if publisherConfig.Registry != nil { - ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) - } + if publisherConfig.Registry != nil { + ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) + } - if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { - resultCh <- fmt.Errorf("invalid end value for bounded range, must be greater than start") - return - } + if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { + return fmt.Errorf("invalid end value for bounded range, must be greater than start") + } - if !ledgerRange.Bounded() && ledgerRange.To() > 0 { - resultCh <- fmt.Errorf("invalid end value for unbounded range, must be zero") - return - } + if !ledgerRange.Bounded() && ledgerRange.To() > 0 { + return fmt.Errorf("invalid end value for unbounded range, must be zero") + } - from := ordered.Max(2, ledgerRange.From()) - ledgerBackend.PrepareRange(ctx, ledgerRange) + from := ordered.Max(2, ledgerRange.From()) + ledgerBackend.PrepareRange(ctx, ledgerRange) - for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ { - var ledgerCloseMeta xdr.LedgerCloseMeta + for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ { + var ledgerCloseMeta xdr.LedgerCloseMeta - logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") - startTime := time.Now() - ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) + logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") + startTime := time.Now() + ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) - if err != nil { - resultCh <- fmt.Errorf("error getting ledger, %w", err) - return - } + if err != nil { + return fmt.Errorf("error getting ledger, %w", err) + } - log.WithFields(log.F{ - "sequence": ledgerSeq, - "duration": time.Since(startTime).Seconds(), - }).Info("Ledger returned from the backend") + log.WithFields(log.F{ + "sequence": ledgerSeq, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") - err = callback(ledgerCloseMeta) - if err != nil { - resultCh <- fmt.Errorf("received an error from callback invocation: %w", err) - return - } + err = callback(ledgerCloseMeta) + if err != nil { + return fmt.Errorf("received an error from callback invocation: %w", err) } - }() - - return resultCh + } + return nil } diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 96d764cb0f..1b0dfa98f4 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -84,24 +84,8 @@ func TestBSBProducerFn(t *testing.T) { return nil } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) - + assert.Nil(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)) assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") - } func TestBSBProducerFnDataStoreError(t *testing.T) { @@ -120,22 +104,9 @@ func TestBSBProducerFnDataStoreError(t *testing.T) { return nil } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create datastore:") - } else { - assert.Fail(t, "", "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + "failed to create datastore:") } func TestBSBProducerFnConfigError(t *testing.T) { @@ -153,22 +124,9 @@ func TestBSBProducerFnConfigError(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create buffered storage backend") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + "failed to create buffered storage backend") } func TestBSBProducerFnInvalidRange(t *testing.T) { @@ -190,22 +148,10 @@ func TestBSBProducerFnInvalidRange(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), + "invalid end value for bounded range, must be greater than start") } func TestBSBProducerFnGetLedgerError(t *testing.T) { @@ -232,22 +178,9 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "error getting ledger") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3000, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + "error getting ledger") } func TestBSBProducerFnCallbackError(t *testing.T) { @@ -265,22 +198,9 @@ func TestBSBProducerFnCallbackError(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "received an error from callback invocation") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + "received an error from callback invocation") } func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datastore.MockDataStore { From f1e9d27fe4f7f3b55d1b96a61067f4b226a37248 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 27 Sep 2024 11:48:12 -0700 Subject: [PATCH 09/14] #5412: add unit test to assert caller ctx cancellation outcome --- ingest/cdp/producer_test.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 1b0dfa98f4..eec0d77ecc 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -160,7 +160,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } - // we don't want to wait for retries, forece the first error to propagate + // we don't want to wait for retries, force the first error to propagate pubConfig.BufferedStorageConfig.RetryLimit = 0 mockDataStore := new(datastore.MockDataStore) mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ @@ -183,6 +183,28 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { "error getting ledger") } +func TestBSBProducerCallerCancelsCtx(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := createMockdataStore(t, 2, 3, 64000) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + cancel() + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + assert.ErrorIs(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + context.Canceled) +} + func TestBSBProducerFnCallbackError(t *testing.T) { ctx := context.Background() pubConfig := PublisherConfig{ From 952d4d67a851dcc9d223a254ab75a87456f40e1f Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 30 Sep 2024 11:44:43 -0700 Subject: [PATCH 10/14] #5412: fixed unit test for producer caller cancel ctx --- ingest/cdp/producer_test.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index eec0d77ecc..2992a2a33b 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -127,6 +127,7 @@ func TestBSBProducerFnConfigError(t *testing.T) { assert.ErrorContains(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), "failed to create buffered storage backend") + mockDataStore.AssertExpectations(t) } func TestBSBProducerFnInvalidRange(t *testing.T) { @@ -152,6 +153,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) { assert.ErrorContains(t, PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), "invalid end value for bounded range, must be greater than start") + mockDataStore.AssertExpectations(t) } func TestBSBProducerFnGetLedgerError(t *testing.T) { @@ -181,6 +183,8 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { assert.ErrorContains(t, PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), "error getting ledger") + + mockDataStore.AssertExpectations(t) } func TestBSBProducerCallerCancelsCtx(t *testing.T) { @@ -189,7 +193,18 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) { DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } - mockDataStore := createMockdataStore(t, 2, 3, 64000) + + // the buffering runs async, test needs to stub datastore methods for potential invocation, + // but is race, since test also cancels the backend context which started the buffer, + // so, not deterministic, no assert + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(makeSingleLCMBatch(2), nil) + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil) appCallback := func(lcm xdr.LedgerCloseMeta) error { return nil @@ -230,7 +245,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datast partition := partitionSize - 1 for i := start; i <= end; i++ { objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i) - mockDataStore.On("GetFile", mock.Anything, objectName).Return(makeSingleLCMBatch(i), nil).Times(1) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(makeSingleLCMBatch(i), nil).Once() } mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ LedgersPerFile: 1, From 4447f6857c6a3662c1ca7fc94740c2a872e973f9 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 30 Sep 2024 12:35:13 -0700 Subject: [PATCH 11/14] #5412: fixed unit test for producer get ledger error case --- ingest/cdp/producer_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 2992a2a33b..546e869b83 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -162,7 +162,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { DataStoreConfig: datastore.DataStoreConfig{}, BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } - // we don't want to wait for retries, force the first error to propagate + // we don't want to let buffer do real retries, force the first error to propagate pubConfig.BufferedStorageConfig.RetryLimit = 0 mockDataStore := new(datastore.MockDataStore) mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ @@ -171,7 +171,9 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { }) mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil).Once() + // since buffer is multi-worker async, it may get to this on other worker, but not deterministic, + // don't assert on it + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil) appCallback := func(lcm xdr.LedgerCloseMeta) error { return nil From 03cd4b60de5a96207edd3dbe44686425f778eb71 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 30 Sep 2024 12:54:40 -0700 Subject: [PATCH 12/14] #5412: fixed unit test for producer get ledger error case, mock asserts --- ingest/cdp/producer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 546e869b83..1e93955052 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -173,7 +173,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() // since buffer is multi-worker async, it may get to this on other worker, but not deterministic, // don't assert on it - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil) + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil).Maybe() appCallback := func(lcm xdr.LedgerCloseMeta) error { return nil @@ -198,7 +198,7 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) { // the buffering runs async, test needs to stub datastore methods for potential invocation, // but is race, since test also cancels the backend context which started the buffer, - // so, not deterministic, no assert + // so, not deterministic, no assert on these. mockDataStore := new(datastore.MockDataStore) mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ LedgersPerFile: 1, From d25a506ea0b31072c5f871604252ab08fd5bf76e Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 30 Sep 2024 14:21:12 -0700 Subject: [PATCH 13/14] #5412: included changelog on new ingest/cdp package --- ingest/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index 5203957d28..04fb9b7bbe 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -5,7 +5,8 @@ All notable changes to this project will be documented in this file. This projec ## Pending ### New Features -* Create functional producer for `BufferedStorageBackend`. `PublishFromBufferedStorageBackend` is a new function which can be used to assist in developing streaming data pipelines from pre-computed `LedgerCloseMeta` on a `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). +* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform). +* Add new functional producer, `cdp.PublishFromBufferedStorageBackend`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). ### Stellar Core Protocol 21 Configuration Update: * BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated. From 1b1244bd9181e69a47a3389ea1c668ea2297e366 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 3 Oct 2024 09:50:23 -0700 Subject: [PATCH 14/14] #5412: renamed the producer fn to ApplyLedgerMetadata --- ingest/CHANGELOG.md | 2 +- ingest/cdp/producer.go | 6 +++--- ingest/cdp/producer_test.go | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index 04fb9b7bbe..3d3835a799 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. This projec ### New Features * Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform). -* Add new functional producer, `cdp.PublishFromBufferedStorageBackend`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). +* Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). ### Stellar Core Protocol 21 Configuration Update: * BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated. diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index ef863e0a55..0c5ff6de49 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -63,8 +63,8 @@ type PublisherConfig struct { Log *log.Entry } -// PublishFromBufferedStorageBackend - create an internal instance -// of BufferedStorageBackend using provided config and emit +// ApplyLedgerMetadata - creates an internal instance +// of BufferedStorageBackend using provided config and emits // ledger metadata for the requested range by invoking the provided callback // once per ledger. // @@ -86,7 +86,7 @@ type PublisherConfig struct { // return - error, function only returns if requested range is bounded or an error occured. // nil will be returned only if bounded range requested and completed processing with no errors. // otherwise return will always be an error. -func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, +func ApplyLedgerMetadata(ledgerRange ledgerbackend.Range, publisherConfig PublisherConfig, ctx context.Context, callback func(xdr.LedgerCloseMeta) error) error { diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 1e93955052..2c1b39c247 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -84,7 +84,7 @@ func TestBSBProducerFn(t *testing.T) { return nil } - assert.Nil(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)) + assert.Nil(t, ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback)) assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") } @@ -105,7 +105,7 @@ func TestBSBProducerFnDataStoreError(t *testing.T) { } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback), "failed to create datastore:") } @@ -125,7 +125,7 @@ func TestBSBProducerFnConfigError(t *testing.T) { return mockDataStore, nil } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback), "failed to create buffered storage backend") mockDataStore.AssertExpectations(t) } @@ -151,7 +151,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) { } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), "invalid end value for bounded range, must be greater than start") mockDataStore.AssertExpectations(t) } @@ -183,7 +183,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { return mockDataStore, nil } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), "error getting ledger") mockDataStore.AssertExpectations(t) @@ -218,7 +218,7 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) { return mockDataStore, nil } assert.ErrorIs(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), context.Canceled) } @@ -238,7 +238,7 @@ func TestBSBProducerFnCallbackError(t *testing.T) { return mockDataStore, nil } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), "received an error from callback invocation") }