Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin_public/main' into validate_only_…
Browse files Browse the repository at this point in the history
…changed
  • Loading branch information
wwoytenko committed Mar 6, 2024
2 parents f2bfc72 + 75168c5 commit 8b3a6e2
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 20 deletions.
51 changes: 44 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:

env:
go-version: '1.21.5'
python-version: '3.12'
cmd-name: 'greenmask'
docker-registry: greenmask/greenmask

Expand All @@ -30,10 +31,10 @@ jobs:
uses: actions/setup-go@v5
with:
go-version: ${{ env.go-version }}

- name: Echo Go version
run: go version

- name: Run tests
run: make tests

Expand All @@ -47,17 +48,17 @@ jobs:

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Run integration tests
run: |
docker-compose -f docker-compose-integration.yml -p greenmask up \
--renew-anon-volumes --force-recreate --build --exit-code-from greenmask \
--abort-on-container-exit greenmask
binaries:
build-binaries:
runs-on: ubuntu-22.04
needs:
- unit-tests
Expand All @@ -84,7 +85,7 @@ jobs:
uses: actions/setup-go@v5
with:
go-version: ${{ env.go-version }}

- name: Build with different arch
run: |
export GOOS=$(echo ${{ matrix.platforms }} | cut -d '/' -f 1)
Expand Down Expand Up @@ -130,7 +131,7 @@ jobs:
build-docker-images-and-push:
runs-on: ubuntu-22.04
needs:
- binaries
- build-binaries
if: startsWith(github.ref, 'refs/tags/v')
steps:
- name: Checkout repository
Expand Down Expand Up @@ -160,3 +161,39 @@ jobs:
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ env.docker-registry }}:${{ env.TAG }},${{ env.docker-registry }}:latest

deploy-docs:
runs-on: self-hosted
needs:
- build-binaries
- build-docker-images-and-push
if: startsWith(github.ref, 'refs/tags/v')
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Setup python
uses: actions/setup-python@v5
with:
python-version: ${{ env.python-version }}

- name: Install dependicies
run: pip install -r requirements.txt

- name: Build docs
run: mkdocs build

- name: Create docs directory
run: sudo mkdir -p ${{ secrets.DOCS_DEPLOY_DIR }}

- name: Move html files to docs directory
run: sudo mv site ${{ secrets.DOCS_DEPLOY_DIR }}/html-${{ github.ref_name}}

- name: Remove old symlink
run: sudo unlink ${{ secrets.DOCS_DEPLOY_DIR }}/html

- name: Create new symlink
run: sudo ln -s ${{ secrets.DOCS_DEPLOY_DIR }}/html-${{ github.ref_name}} ${{ secrets.DOCS_DEPLOY_DIR }}/html

- name: Restart web service
run: sudo systemctl restart nginx
37 changes: 24 additions & 13 deletions internal/db/postgres/restorers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewTableRestorer(entry *toc.Entry, st storages.Storager) *TableRestorer {
}

func (td *TableRestorer) Execute(ctx context.Context, tx pgx.Tx) error {
// TODO: Refactor this logic
// 1. Decompose the Execute method into separate functions
// 2. Add tests
// 3. Get rid of the anonymous functions below

return func() error {
if td.Entry.FileName == nil {
Expand All @@ -63,12 +67,10 @@ func (td *TableRestorer) Execute(ctx context.Context, tx pgx.Tx) error {

log.Debug().Str("copyStmt", *td.Entry.CopyStmt).Msgf("performing pgcopy statement")
frontend := tx.Conn().PgConn().Frontend()
frontend.Send(&pgproto3.Query{
String: *td.Entry.CopyStmt,
})

if err = frontend.Flush(); err != nil {
return err
err = sendMessage(frontend, &pgproto3.Query{String: *td.Entry.CopyStmt})
if err != nil {
return fmt.Errorf("error sending Query message: %w", err)
}

// Prepare for streaming the pgcopy data
Expand Down Expand Up @@ -108,19 +110,19 @@ func (td *TableRestorer) Execute(ctx context.Context, tx pgx.Tx) error {
n, err = gz.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
frontend.Send(&pgproto3.CopyDone{})
completionErr := sendMessage(frontend, &pgproto3.CopyDone{})
if completionErr != nil {
return fmt.Errorf("error sending CopyDone message: %w", err)
}
break
}
return fmt.Errorf("error readimg from table dump: %w", err)
}

frontend.Send(&pgproto3.CopyData{
Data: buf[:n],
})
}

if err = frontend.Flush(); err != nil {
return err
err = sendMessage(frontend, &pgproto3.CopyData{Data: buf[:n]})
if err != nil {
return fmt.Errorf("error sending DopyData message: %w", err)
}
}

// Perform post streaming handling
Expand Down Expand Up @@ -152,3 +154,12 @@ func (td *TableRestorer) Execute(ctx context.Context, tx pgx.Tx) error {
func (td *TableRestorer) DebugInfo() string {
return fmt.Sprintf("table %s.%s", *td.Entry.Namespace, *td.Entry.Tag)
}

// sendMessage - send a message to the PostgreSQL backend and flush a buffer
func sendMessage(frontend *pgproto3.Frontend, msg pgproto3.FrontendMessage) error {
frontend.Send(msg)
if err := frontend.Flush(); err != nil {
return fmt.Errorf("error flushing pgx frontend buffer: %w", err)
}
return nil
}

0 comments on commit 8b3a6e2

Please sign in to comment.