Commit
Merge branch 'airbytehq:master' into teradata_master
sc250072 authored Jan 21, 2025
2 parents 0f52263 + 1f40c82 commit 37f06b0
Showing 1,484 changed files with 67,351 additions and 63,205 deletions.
37 changes: 24 additions & 13 deletions .github/actions/install-airbyte-ci/action.yml
@@ -30,21 +30,26 @@ runs:
- name: "Determine how Airbyte CI should be installed"
shell: bash
id: determine-install-mode
# When the PR is from a fork, we always install from binary
if: inputs.is_fork == 'false'
run: |
if [[ "${{ github.ref }}" != "refs/heads/master" ]] && [[ "${{ steps.changes.outputs.pipelines_any_changed }}" == "true" ]]; then
echo "Making changes to Airbyte CI on a non-master branch. Airbyte-CI will be installed from source."
echo "install-mode=source" >> $GITHUB_OUTPUT
echo "SENTRY_ENVIRONMENT=dev" >> $GITHUB_ENV
else
echo "install-mode=binary" >> $GITHUB_OUTPUT
echo "SENTRY_ENVIRONMENT=production" >> $GITHUB_ENV
fi
echo "install-mode=source" >> $GITHUB_OUTPUT
echo "SENTRY_ENVIRONMENT=dev" >> $GITHUB_ENV
# When the PR is from a fork, we always install from binary
# if: inputs.is_fork == 'false'
# run: |
# if [[ "${{ github.ref }}" != "refs/heads/master" ]] && [[ "${{ steps.changes.outputs.pipelines_any_changed }}" == "true" ]]; then
# echo "Making changes to Airbyte CI on a non-master branch. Airbyte-CI will be installed from source."
# echo "install-mode=source" >> $GITHUB_OUTPUT
# echo "SENTRY_ENVIRONMENT=dev" >> $GITHUB_ENV
# else
# echo "install-mode=binary" >> $GITHUB_OUTPUT
# echo "SENTRY_ENVIRONMENT=production" >> $GITHUB_ENV
# fi

- name: Install Airbyte CI from binary
id: install-airbyte-ci-binary
if: steps.determine-install-mode.outputs.install-mode == 'binary' || ${{ inputs.is_fork }} == 'true'
if: false
# if: steps.determine-install-mode.outputs.install-mode == 'binary' || ${{ inputs.is_fork }} == 'true'
shell: bash
run: |
curl -sSL ${{ inputs.airbyte_ci_binary_url }} --output airbyte-ci-bin
@@ -54,21 +59,27 @@ runs:
- name: Install Python 3.10
id: install-python-3-10
uses: actions/setup-python@v4
if: steps.determine-install-mode.outputs.install-mode == 'source'
# if: steps.determine-install-mode.outputs.install-mode == 'source'
with:
python-version: "3.10"
token: ${{ inputs.github_token }}

- name: Install Airbyte CI from source
id: install-airbyte-ci-source
if: steps.determine-install-mode.outputs.install-mode == 'source'
if: true
# if: steps.determine-install-mode.outputs.install-mode == 'source'
shell: bash
run: |
pip install --upgrade pip
pip install pipx
pipx ensurepath
pipx install ${{ inputs.path_to_airbyte_ci_source }}
- name: Print installed `airbyte-ci` version
shell: bash
run: |
airbyte-ci --version
- name: Get dagger engine image name
id: get-dagger-engine-image-name
shell: bash
4 changes: 3 additions & 1 deletion .github/workflows/airbyte-ci-release.yml
@@ -43,6 +43,8 @@ jobs:
- name: Install Poetry
id: install_poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5

- name: Install Dependencies
id: install_dependencies
@@ -54,7 +56,7 @@
working-directory: airbyte-ci/connectors/pipelines/
run: poetry run poe build-release-binary ${{ env.BINARY_FILE_NAME }}

- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
name: airbyte-ci-${{ matrix.os }}-${{ steps.get_short_sha.outputs.sha }}
path: airbyte-ci/connectors/pipelines/dist/${{ env.BINARY_FILE_NAME }}
2 changes: 2 additions & 0 deletions .github/workflows/auto_merge.yml
@@ -18,6 +18,8 @@ jobs:
python-version: "3.10"
- name: Install and configure Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5
- name: Run auto merge
shell: bash
working-directory: airbyte-ci/connectors/auto_merge
1 change: 1 addition & 0 deletions .github/workflows/connectors_insights.yml
@@ -31,6 +31,7 @@ jobs:
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true
34 changes: 27 additions & 7 deletions .github/workflows/connectors_nightly_build.yml
@@ -5,18 +5,38 @@ on:
# 0AM UTC is 2AM CEST, 3AM EEST, 5PM PDT.
- cron: "0 0 * * *"
workflow_dispatch:
inputs:
test-connectors-options:
default: --concurrency=5 --support-level=certified
required: true

run-name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Certified connectors' }}"

jobs:
generate_matrix:
name: Generate matrix
runs-on: ubuntu-24.04
outputs:
generated_matrix: ${{ steps.generate_matrix.outputs.generated_matrix }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Run airbyte-ci connectors list [SCHEDULED TRIGGER]
id: airbyte-ci-connectors-list-scheduled
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
subcommand: "connectors --support-level=certified list --output=selected_connectors.json"
- name: Generate matrix - 30 connectors per job
id: generate_matrix
run: |
matrix=$(jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 30 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json)
echo "generated_matrix=$matrix" >> $GITHUB_OUTPUT
test_connectors:
needs: generate_matrix
name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Certified connectors' }}"
timeout-minutes: 720 # 12 hours
runs-on: connector-nightly-xlarge
continue-on-error: true
strategy:
matrix: ${{fromJson(needs.generate_matrix.outputs.generated_matrix)}}

steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
@@ -32,7 +52,7 @@ jobs:
with:
context: "master"
ci_job_key: "nightly_builds"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
# dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
@@ -41,4 +61,4 @@
github_token: ${{ secrets.GITHUB_TOKEN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors ${{ inputs.test-connectors-options || '--concurrency=8 --support-level=certified' }} test"
subcommand: "connectors ${{ matrix.connector_names}} test"
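
The generate_matrix job above does its chunking with a single jq expression: prefix every connector name with --name=, index the list with to_entries, bucket the indices by integer division by 30, and join each bucket into one space-separated string. A minimal sketch of that logic against hypothetical sample data (the real selected_connectors.json is written by the airbyte-ci list step):

    # Hypothetical stand-in for the airbyte-ci output: a flat JSON array of names.
    echo '["source-postgres","source-mysql","source-faker"]' > selected_connectors.json
    # Same expression as the workflow step.
    jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 30 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json
    # => {"include":[{"connector_names":"--name=source-postgres --name=source-mysql --name=source-faker"}]}

Each element of include becomes one test_connectors job via the matrix, so no single nightly job tests more than 30 connectors.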
2 changes: 1 addition & 1 deletion .github/workflows/connectors_tests.yml
@@ -78,7 +78,7 @@ jobs:
# If the condition is not met the job will be skipped (it will not fail)
if: (github.event_name == 'pull_request' && needs.changes.outputs.connectors == 'true' && github.event.pull_request.head.repo.fork != true) || github.event_name == 'workflow_dispatch'
name: Connectors CI
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 360 # 6 hours
steps:
- name: Checkout Airbyte
2 changes: 1 addition & 1 deletion .github/workflows/connectors_version_increment_check.yml
@@ -21,7 +21,7 @@ on:
jobs:
connectors_ci:
name: Connectors Version Increment Check
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
if: github.event.pull_request.head.repo.fork != true
timeout-minutes: 22
steps:
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
@@ -49,7 +49,7 @@ jobs:
# Any revision upwards should be based on a performance analysis of gradle scans.
# See https://github.com/airbytehq/airbyte/pull/36055 for an example of this,
# which explains why we went down from 64 cores to 16.
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
name: Gradle Check
timeout-minutes: 30
steps:
4 changes: 3 additions & 1 deletion .github/workflows/live_tests.yml
@@ -44,7 +44,7 @@ on:
jobs:
live_tests:
name: Live Tests
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 360 # 6 hours
steps:
- name: Checkout Airbyte
@@ -63,6 +63,8 @@ jobs:
- name: Install Poetry
id: install_poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5

- name: Make poetry venv in project
id: poetry_venv
2 changes: 1 addition & 1 deletion .github/workflows/publish-bulk-cdk.yml
@@ -25,7 +25,7 @@ env:
jobs:
publish-bulk-cdk:
name: Publish Bulk CDK
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 30
steps:
- name: Checkout Airbyte
2 changes: 1 addition & 1 deletion .github/workflows/publish-java-cdk-command.yml
@@ -61,7 +61,7 @@ env:
jobs:
publish-cdk:
name: Publish Java CDK
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 30
steps:
- name: Link comment to Workflow Run
2 changes: 1 addition & 1 deletion .github/workflows/publish_connectors.yml
@@ -20,7 +20,7 @@ on:
jobs:
publish_connectors:
name: Publish connectors
runs-on: connector-publish-large
runs-on: ubuntu-24.04-4core
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
10 changes: 9 additions & 1 deletion .github/workflows/regression_tests.yml
@@ -44,9 +44,15 @@ on:
jobs:
regression_tests:
name: Regression Tests
runs-on: connector-test-large
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 360 # 6 hours
steps:
- name: Install Python
id: install_python
uses: actions/setup-python@v4
with:
python-version: "3.10"

- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Check PAT rate limits
@@ -63,6 +69,8 @@
- name: Install Poetry
id: install_poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5

- name: Make poetry venv in project
id: poetry_venv
4 changes: 2 additions & 2 deletions .github/workflows/test-performance-command.yml
@@ -131,7 +131,7 @@ jobs:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
- name: Archive test reports artifacts
if: github.event.inputs.comment-id && failure()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-reports
path: |
@@ -145,7 +145,7 @@
- name: Test coverage reports artifacts
if: github.event.inputs.comment-id && success()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-reports
path: |
@@ -33,11 +33,18 @@ import java.util.function.Consumer

/** Emits the [AirbyteMessage] instances produced by the connector. */
@DefaultImplementation(StdoutOutputConsumer::class)
interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
val emittedAt: Instant
abstract class OutputConsumer(private val clock: Clock) : Consumer<AirbyteMessage>, AutoCloseable {
/**
* The constant emittedAt timestamp we use for record timestamps.
*
* TODO: use the correct emittedAt time for each record. Ryan: not changing this now as it could
* have performance implications for sources given the delicate serialization logic in place
* here.
*/
val recordEmittedAt: Instant = Instant.ofEpochMilli(clock.millis())

fun accept(record: AirbyteRecordMessage) {
record.emittedAt = emittedAt.toEpochMilli()
open fun accept(record: AirbyteRecordMessage) {
record.emittedAt = recordEmittedAt.toEpochMilli()
accept(AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(record))
}

@@ -66,7 +73,9 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
}

fun accept(trace: AirbyteTraceMessage) {
trace.emittedAt = emittedAt.toEpochMilli().toDouble()
// Use the correct emittedAt timestamp for trace messages. This allows platform and other
// downstream consumers to take emission time into account for error classification.
trace.emittedAt = clock.millis().toDouble()
accept(AirbyteMessage().withType(AirbyteMessage.Type.TRACE).withTrace(trace))
}

@@ -107,7 +116,7 @@ const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
@Secondary
private class StdoutOutputConsumer(
val stdout: PrintStream,
clock: Clock,
private val clock: Clock,
/**
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
* buffer's size (in bytes) grows past this value.
@@ -132,9 +141,7 @@
*/
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
val bufferByteSizeThresholdForFlush: Int,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)

) : OutputConsumer(clock) {
private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
@@ -233,7 +240,7 @@ private class StdoutOutputConsumer(
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
}
return streamToTemplateMap.getOrPut(stream) {
RecordTemplate.create(stream, namespace, emittedAt)
RecordTemplate.create(stream, namespace, recordEmittedAt)
}
}

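The OutputConsumer change above turns the interface into an abstract class so the emitted-at handling lives in one place: records keep the constant recordEmittedAt captured at construction, while trace messages now read clock.millis() at emission time. Concrete consumers inherit all the typed accept() overloads and only supply the terminal accept(AirbyteMessage) plus close(). A minimal sketch of a subclass under the new API — the class is hypothetical, and the AirbyteMessage import assumes the protocol models package used elsewhere in the CDK:

    import io.airbyte.protocol.models.v0.AirbyteMessage
    import java.time.Clock

    // Hypothetical consumer that only counts messages: every typed overload
    // (records, states, traces, ...) funnels into the override below.
    class CountingOutputConsumer(clock: Clock) : OutputConsumer(clock) {
        var messageCount: Long = 0
            private set

        override fun accept(airbyteMessage: AirbyteMessage) {
            messageCount++
        }

        override fun close() {} // nothing buffered, nothing to flush
    }

BufferingOutputConsumer in the next hunk is the real in-tree example of the same pattern.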
@@ -15,16 +15,14 @@ import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.time.Clock
import java.time.Instant

/** [OutputConsumer] implementation for unit tests. Collects everything into thread-safe buffers. */
@Singleton
@Requires(notEnv = [Environment.CLI])
@Replaces(OutputConsumer::class)
class BufferingOutputConsumer(
clock: Clock,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)
) : OutputConsumer(clock) {

private val records = mutableListOf<AirbyteRecordMessage>()
private val states = mutableListOf<AirbyteStateMessage>()
@@ -2,6 +2,7 @@
package io.airbyte.cdk.discover

import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.DoubleCodec
import io.airbyte.cdk.data.JsonDecoder
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.JsonStringCodec
@@ -63,8 +64,8 @@ interface MetaField : FieldOrMetaField {
enum class CommonMetaField(
override val type: FieldType,
) : MetaField {
CDC_UPDATED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_DELETED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_UPDATED_AT(CdcStringMetaFieldType),
CDC_DELETED_AT(CdcStringMetaFieldType),
;

override val id: String
@@ -89,3 +90,9 @@ data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec
override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec
}

data object CdcNumberMetaFieldType : LosslessFieldType {
override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.NUMBER
override val jsonEncoder: JsonEncoder<Double> = DoubleCodec
override val jsonDecoder: JsonDecoder<Double> = DoubleCodec
}
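
For context on the CommonMetaField change above: CDC_UPDATED_AT and CDC_DELETED_AT move from the OffsetDateTime-based field type to CdcStringMetaFieldType, whose definition is not part of this hunk. By analogy with the two data objects shown here, it presumably pairs a STRING leaf schema type with the JsonStringCodec imported at the top of the file — a sketch under that assumption, not the actual definition:

    // Assumed shape only, inferred from the sibling field types above.
    data object CdcStringMetaFieldType : LosslessFieldType {
        override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.STRING
        override val jsonEncoder: JsonEncoder<String> = JsonStringCodec
        override val jsonDecoder: JsonDecoder<String> = JsonStringCodec
    }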