Skip to content

Commit

Permalink
Merge branch 'master' into reap-query
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored Jul 23, 2024
2 parents be19207 + 2674e20 commit 775c8fd
Show file tree
Hide file tree
Showing 70 changed files with 1,487 additions and 830 deletions.
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
name: Ledger Exporter release
name: Galexie Release

on:
push:
tags: ['ledgerexporter-v*']
tags: ['galexie-v*']

jobs:

publish-docker:
name: Test and push the Ledger Exporter images
name: Test and push docker image
runs-on: ubuntu-latest
env:
LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true"
LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
GALEXIE_INTEGRATION_TESTS_ENABLED: "true"
GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
# this pins to a version of quickstart:testing that has the same version as STELLAR_CORE_VERSION
# this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing'
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false"
GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f
GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false"
STELLAR_CORE_VERSION: 21.1.0-1921.b3aeb14cc.focal
steps:
- name: Set VERSION
run: |
echo "VERSION=${GITHUB_REF_NAME#ledgerexporter-v}" >> $GITHUB_ENV
echo "VERSION=${GITHUB_REF_NAME#galexie-v}" >> $GITHUB_ENV
- uses: actions/checkout@v3
with:
ref: ${{ github.sha }}
- name: Pull Quickstart image
shell: bash
run: |
docker pull "$LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE"
docker pull "$GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE"
- name: Install captive core
run: |
# Workaround for https://github.com/actions/virtual-environments/issues/5245,
Expand All @@ -42,11 +42,11 @@ jobs:
sudo apt-get update && sudo apt-get install -y stellar-core="$STELLAR_CORE_VERSION"
echo "Using stellar core version $(stellar-core version)"
- name: Run Ledger Exporter test
run: go test -v -race -run TestLedgerExporterTestSuite ./exp/services/ledgerexporter/...
- name: Run tests
run: go test -v -race -run TestGalexieTestSuite ./exp/services/galexie/...

- name: Build Ledger Exporter docker
run: make -C exp/services/ledgerexporter docker-build
- name: Build docker
run: make -C exp/services/galexie docker-build

# Push images
- name: Login to DockerHub
Expand All @@ -56,4 +56,4 @@ jobs:
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Push to DockerHub
run: make -C exp/services/ledgerexporter docker-push
run: make -C exp/services/galexie docker-push
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
name: LedgerExporter
name: Galexie

on:
push:
branches: [master]
pull_request:

jobs:
ledger-exporter:
name: Test Ledger Exporter
galexie:
name: Test
runs-on: ubuntu-latest
env:
CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal
LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true"
LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
# this pins to a version of quickstart:testing that has the same version as LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN
GALEXIE_INTEGRATION_TESTS_ENABLED: "true"
GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
# this pins to a version of quickstart:testing that has the same version as GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN
# this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing'
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false"
GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:5c8186f53cc98571749054dd782dce33b0aca2d1a622a7610362f7c15b79b1bf
GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false"
steps:
- name: Install captive core
run: |
Expand All @@ -34,12 +34,12 @@ jobs:
- name: Pull Quickstart image
shell: bash
run: |
docker pull "$LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE"
docker pull "$GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE"
- uses: actions/checkout@v3
with:
# For pull requests, build and test the PR head not a merge of the PR with the destination.
ref: ${{ github.event.pull_request.head.sha || github.ref }}

- name: Run Ledger Exporter test
run: go test -v -race -run TestLedgerExporterTestSuite ./exp/services/ledgerexporter/...
- name: Run test
run: go test -v -race -run TestGalexieTestSuite ./services/galexie/...
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ friendbot:
horizon:
$(MAKE) -C services/horizon/ binary-build

ledger-exporter:
$(MAKE) -C exp/services/ledgerexporter/ docker-build
galexie:
$(MAKE) -C services/galexie/ docker-build

webauth:
$(MAKE) -C exp/services/webauth/ docker-build
Expand Down
20 changes: 7 additions & 13 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import (
var _ LedgerBackend = (*BufferedStorageBackend)(nil)

type BufferedStorageBackendConfig struct {
LedgerBatchConfig datastore.DataStoreSchema
DataStore datastore.DataStore
BufferSize uint32
NumWorkers uint32
RetryLimit uint32
RetryWait time.Duration
BufferSize uint32 `toml:"buffer_size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

// BufferedStorageBackend is a ledger backend that reads from a storage service.
Expand All @@ -45,7 +43,7 @@ type BufferedStorageBackend struct {
}

// NewBufferedStorageBackend returns a new BufferedStorageBackend instance.
func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error) {
if config.BufferSize == 0 {
return nil, errors.New("buffer size must be > 0")
}
Expand All @@ -54,17 +52,13 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
return nil, errors.New("number of workers must be <= BufferSize")
}

if config.DataStore == nil {
return nil, errors.New("no DataStore provided")
}

if config.LedgerBatchConfig.LedgersPerFile <= 0 {
if dataStore.GetSchema().LedgersPerFile <= 0 {
return nil, errors.New("ledgersPerFile must be > 0")
}

bsBackend := &BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}

return bsBackend, nil
Expand Down
64 changes: 39 additions & 25 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,21 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
param := make(map[string]string)
param["destination_bucket_path"] = "testURL"

ledgerBatchConfig := datastore.DataStoreSchema{
LedgersPerFile: 1,
FilesPerPartition: 64000,
}

dataStore := new(datastore.MockDataStore)

return BufferedStorageBackendConfig{
LedgerBatchConfig: ledgerBatchConfig,
DataStore: dataStore,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
}
}

func createBufferedStorageBackendForTesting() BufferedStorageBackend {
config := createBufferedStorageBackendConfigForTesting()

dataStore := new(datastore.MockDataStore)
return BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}
}

Expand All @@ -86,6 +78,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
}
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
})

t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
Expand Down Expand Up @@ -126,15 +122,18 @@ func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
}

func TestNewBufferedStorageBackend(t *testing.T) {
ctx := context.Background()
config := createBufferedStorageBackendConfigForTesting()

bsb, err := NewBufferedStorageBackend(ctx, config)
mockDataStore := new(datastore.MockDataStore)
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(1),
FilesPerPartition: partitionSize,
})
bsb, err := NewBufferedStorageBackend(config, mockDataStore)
assert.NoError(t, err)

assert.Equal(t, bsb.dataStore, config.DataStore)
assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
assert.Equal(t, bsb.dataStore, mockDataStore)
assert.Equal(t, uint32(1), bsb.dataStore.GetSchema().LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.dataStore.GetSchema().FilesPerPartition)
assert.Equal(t, uint32(100), bsb.config.BufferSize)
assert.Equal(t, uint32(5), bsb.config.NumWorkers)
assert.Equal(t, uint32(3), bsb.config.RetryLimit)
Expand Down Expand Up @@ -210,12 +209,14 @@ func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
lcmArray := createLCMForTesting(startLedger, endLedger)
bsb := createBufferedStorageBackendForTesting()
ctx := context.Background()
bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2)
ledgerRange := BoundedRange(startLedger, endLedger)

mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(2),
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)

Expand Down Expand Up @@ -451,6 +452,10 @@ func TestLedgerBufferClose(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})

objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
afterPrepareRange := make(chan struct{})
Expand Down Expand Up @@ -483,7 +488,10 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
t.Cleanup(func() {
Expand All @@ -509,7 +517,10 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
iteration := &atomic.Int32{}
cancelAfter := int32(bsb.config.RetryLimit) + 2
Expand Down Expand Up @@ -551,7 +562,10 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
})

bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange))

bsb.ledgerBuffer.wg.Wait()
Expand Down
6 changes: 3 additions & 3 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (lb *ledgerBuffer) pushTaskQueue() {
return
}
lb.taskQueue <- lb.nextTaskLedger
lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.nextTaskLedger += lb.dataStore.GetSchema().LedgersPerFile
}

// sleepWithContext returns true upon sleeping without interruption from the context
Expand Down Expand Up @@ -163,7 +163,7 @@ func (lb *ledgerBuffer) worker(ctx context.Context) {
}

func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) {
objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)
objectKey := lb.dataStore.GetSchema().GetObjectKeyFromSequenceNumber(sequence)

reader, err := lb.dataStore.GetFile(ctx, objectKey)
if err != nil {
Expand Down Expand Up @@ -198,7 +198,7 @@ func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) {
item := lb.ledgerPriorityQueue.Pop()
lb.ledgerQueue <- item.payload
lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.currentLedger += lb.dataStore.GetSchema().LedgersPerFile
}
}

Expand Down
Empty file added services/galexie/CHANGELOG.md
Empty file.
Loading

0 comments on commit 775c8fd

Please sign in to comment.