diff --git a/.github/workflows/connector-performance-command.yml b/.github/workflows/connector-performance-command.yml index dd8c46a1bea06..c458e08957584 100644 --- a/.github/workflows/connector-performance-command.yml +++ b/.github/workflows/connector-performance-command.yml @@ -19,15 +19,171 @@ on: uuid: description: "Custom UUID of workflow run. Used because GitHub dispatches endpoint does not return workflow run id." required: false - connector-acceptance-test-version: - description: "Set a specific connector acceptance test version to use. Enter 'dev' to test, build and use a local version of Connector Acceptance Test." - required: false - default: "latest" - local_cdk: - description: "Run Connector Acceptance Tests against the CDK version on the current branch." + dataset: + description: "Name of dataset to use for performance measurement. Currently supports 1m, 10m, 20m." required: false + default: "1m" jobs: - Stub: + uuid: + name: "Custom UUID of workflow run" + timeout-minutes: 10 + runs-on: ubuntu-latest + steps: + - name: UUID ${{ github.event.inputs.uuid }} + run: true + start-test-runner: + name: Start Build EC2 Runner + needs: uuid + timeout-minutes: 10 + runs-on: ubuntu-latest + outputs: + label: ${{ steps.start-ec2-runner.outputs.label }} + ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} + steps: + - name: Checkout Airbyte + uses: actions/checkout@v3 + with: + repository: ${{ github.event.inputs.repo }} + ref: ${{ github.event.inputs.gitref }} + - name: Check PAT rate limits + run: | + ./tools/bin/find_non_rate_limited_PAT \ + ${{ secrets.GH_PAT_BUILD_RUNNER_OSS }} \ + ${{ secrets.GH_PAT_BUILD_RUNNER_BACKUP }} + - name: Start AWS Runner + id: start-ec2-runner + uses: ./.github/actions/start-aws-runner + with: + aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} + github-token: ${{ env.PAT }} + performance-test: + timeout-minutes: 240 + needs: start-test-runner + runs-on: ${{ needs.start-test-runner.outputs.label }} + steps: + - name: Link comment to workflow run + if: github.event.inputs.comment-id + uses: peter-evans/create-or-update-comment@v1 + with: + comment-id: ${{ github.event.inputs.comment-id }} + body: | + #### Note: The following `dataset=` values are supported: `1m`(default), `10m`, `20m`, `bottleneck_stream1` + > :runner: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} + - name: Search for valid connector name format + id: regex + uses: AsasInnab/regex-action@v1 + with: + regex_pattern: "^((connectors|bases)/)?[a-zA-Z0-9-_]+$" + regex_flags: "i" # required to be set for this plugin + search_string: ${{ github.event.inputs.connector }} + - name: Validate input workflow format + if: steps.regex.outputs.first_match != github.event.inputs.connector + run: echo "The connector provided has an invalid format!" && exit 1 + - name: Filter supported connectors + if: "${{ github.event.inputs.connector != 'connectors/source-postgres' }}" + run: echo "Only connectors/source-postgres currently supported by harness" && exit 1 + - name: Checkout Airbyte + uses: actions/checkout@v3 + with: + repository: ${{ github.event.inputs.repo }} + ref: ${{ github.event.inputs.gitref }} + - name: Install Java + uses: actions/setup-java@v3 + with: + distribution: "zulu" + java-version: "17" + - name: Install Python + uses: actions/setup-python@v4 + with: + python-version: "3.9" + - name: Install CI scripts + # all CI python packages have the prefix "ci_" + run: | + pip install --quiet -e ./tools/ci_* + - name: Write source-harness credentials + run: | + ci_credentials connectors-performance/source-harness write-to-storage + env: + GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} + - name: build harness + shell: bash + run: | + echo "Building... source-harness" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + ./gradlew :airbyte-integrations:connectors-performance:source-harness:build -x check + - name: build connector + shell: bash + run: | + echo "Building... ${{github.event.inputs.connector}}" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY # this is a blank line + echo "Running ./gradlew :airbyte-integrations:connectors:source-postgres:build -x check" + ./gradlew :airbyte-integrations:connectors:source-postgres:build -x check + - name: KIND Kubernetes Cluster Setup + uses: helm/kind-action@v1.4.0 + with: + config: ./tools/bin/source-harness-kind-cluster-config.yaml + - name: Run harness + id: run-harness + shell: bash + env: + CONN: ${{ github.event.inputs.connector }} + DS: ${{ github.event.inputs.dataset }} + prefix: '{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.p.PerformanceTest(runTest):165' + suffix: '"}}' + run: | + kubectl apply -f ./tools/bin/admin-service-account.yaml + kind load docker-image airbyte/source-postgres:dev --name chart-testing + kind load docker-image airbyte/source-harness:dev --name chart-testing + export CONNECTOR_IMAGE_NAME=${CONN/connectors/airbyte}:dev + export DATASET=$DS + envsubst < ./tools/bin/source-harness-process.yaml | kubectl create -f - + POD=$(kubectl get pod -l app=source-harness -o jsonpath="{.items[0].metadata.name}") + kubectl wait --for=condition=Ready --timeout=20s "pod/$POD" + kubectl logs --follow $POD + EOF=$(dd if=/dev/urandom bs=15 count=1 status=none | base64) + echo "RUN_RESULT<<$EOF" >> $GITHUB_OUTPUT + kubectl logs --tail=1 $POD | while read line ; do line=${line#"$prefix"}; line=${line%"$suffix"}; echo $line >> $GITHUB_OUTPUT ; done + echo "$EOF" >> $GITHUB_OUTPUT + - name: Link comment to workflow run + uses: peter-evans/create-or-update-comment@v2 + with: + reactions: '+1' + comment-id: ${{ github.event.inputs.comment-id }} + body: | + ## Performance test Result: + ``` + ${{ steps.run-harness.outputs.RUN_RESULT }} + ``` + # need to add credentials here + # In case of self-hosted EC2 errors, remove this block. + stop-test-runner: + name: Stop Build EC2 Runner + timeout-minutes: 10 + needs: + - start-test-runner # required to get output from the start-runner job + - performance-test # required to wait when the main job is done + - uuid runs-on: ubuntu-latest + if: ${{ always() }} # required to stop the runner even if the error happened in the previous jobs steps: - - run: echo "Connector Performance harness stub" + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-2 + - name: Checkout Airbyte + uses: actions/checkout@v3 + - name: Check PAT rate limits + run: | + ./tools/bin/find_non_rate_limited_PAT \ + ${{ secrets.GH_PAT_BUILD_RUNNER_OSS }} \ + ${{ secrets.GH_PAT_BUILD_RUNNER_BACKUP }} + - name: Stop EC2 runner + uses: supertopher/ec2-github-runner@base64v1.0.10 + with: + mode: stop + github-token: ${{ env.PAT }} + label: ${{ needs.start-test-runner.outputs.label }} + ec2-instance-id: ${{ needs.start-test-runner.outputs.ec2-instance-id }} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 49e29b561d7b4..879e0010595e5 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -64,6 +64,16 @@ public String fieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg); } + @Override + public String strictComparisonNormalizationWorkspaces() { + return ""; + } + + @Override + public String strictComparisonNormalizationTag() { + return ""; + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index aa20550474225..e9c4ad76564e3 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -37,4 +37,20 @@ public interface FeatureFlags { */ String fieldSelectionWorkspaces(); + /** + * Get the workspaces allow-listed for strict incremental comparison in normalization. This takes + * precedence over the normalization version in destination_definitions.yaml. + * + * @return a comma-separated list of workspace ids where strict incremental comparison should be + * enabled in normalization. + */ + String strictComparisonNormalizationWorkspaces(); + + /** + * Get the Docker image tag representing the normalization version with strict-comparison. + * + * @return The Docker image tag representing the normalization version with strict-comparison + */ + String strictComparisonNormalizationTag(); + } diff --git a/airbyte-integrations/connectors-performance/source-harness/.dockerignore b/airbyte-integrations/connectors-performance/source-harness/.dockerignore new file mode 100644 index 0000000000000..c8f982b06349c --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/.dockerignore @@ -0,0 +1,4 @@ +* +!Dockerfile +!build +!base.sh diff --git a/airbyte-integrations/connectors-performance/source-harness/Dockerfile b/airbyte-integrations/connectors-performance/source-harness/Dockerfile new file mode 100644 index 0000000000000..460f4afe40805 --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/Dockerfile @@ -0,0 +1,26 @@ +FROM airbyte/integration-base-java:dev AS build +WORKDIR /airbyte + +ENV APPLICATION source-harness + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar + +FROM airbyte/integration-base-java:dev +ARG TARGETARCH +WORKDIR /airbyte + +ENV APPLICATION source-harness +# Kubectl and socat are needed in order to create a pod similar to cloud orchestrator +# It brings up in cluster the source pod to measure, and communicates over socat +RUN curl -O https://s3.us-west-2.amazonaws.com/amazon-eks/1.25.6/2023-01-30/bin/linux/${TARGETARCH}/kubectl +RUN chmod +x ./kubectl +RUN mv ./kubectl /bin/ +RUN yum install -y socat && yum clean all + +COPY --from=build /airbyte /airbyte +COPY base.sh . + +LABEL io.airbyte.version=0.1 +LABEL io.airbyte.name=airbyte/source-harness diff --git a/airbyte-integrations/connectors-performance/source-harness/README.md b/airbyte-integrations/connectors-performance/source-harness/README.md new file mode 100644 index 0000000000000..3e30efba8b1a7 --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/README.md @@ -0,0 +1,6 @@ +# source-harness + +Performance harness for source connectors + +This component is used by the `/connector-performance` github action and is used in order to test throughput of +source connectors on a number of datasets. diff --git a/airbyte-integrations/connectors-performance/source-harness/base.sh b/airbyte-integrations/connectors-performance/source-harness/base.sh new file mode 100755 index 0000000000000..ba73c6bd5014c --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/base.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +set -e + +function echo2() { + echo >&2 "$@" +} + +function error() { + echo2 "$@" + exit 1 +} + +# todo: make it easy to select source or destination and validate based on selection by adding an integration type env variable. +function main() { + nohup bash -c "socat tcp-listen:9000,reuseaddr,fork \"exec:printf \'HTTP/1.0 200 OK\r\n\r\n\'\" &"; + cat <&0 | /airbyte/bin/"$APPLICATION" "$@" +} + +main "$@" diff --git a/airbyte-integrations/connectors-performance/source-harness/build.gradle b/airbyte-integrations/connectors-performance/source-harness/build.gradle new file mode 100644 index 0000000000000..0b6dc51d4a37c --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/build.gradle @@ -0,0 +1,25 @@ +plugins { + id 'application' + id 'airbyte-docker' +} + +repositories { + maven { + url 'https://airbyte.mycloudrepo.io/public/repositories/airbyte-public-jars/' + } +} + +application { + mainClass = 'io.airbyte.integrations.performance.Main' + applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] +} +dependencies { + implementation project(':airbyte-db:db-lib') + implementation project(':airbyte-integrations:bases:base-java') + implementation libs.airbyte.protocol + implementation 'io.fabric8:kubernetes-client:5.12.2' + implementation 'org.apache.commons:commons-lang3:3.11' + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + implementation 'io.airbyte:airbyte-commons-worker:0.42.0' + implementation 'io.airbyte.airbyte-config:config-models:0.42.0' +} diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/performance/Main.java b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/performance/Main.java new file mode 100644 index 0000000000000..7ceb761128a73 --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/performance/Main.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.performance; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +@Slf4j +public class Main { + + private static final String CREDENTIALS_PATH = "secrets/%s_credentials.json"; + + public static void main(final String[] args) { + log.info("args: {}", Arrays.toString(args)); + String image = null; + String dataset = "1m"; + + switch (args.length) { + case 1 -> image = args[0]; + case 2 -> { + image = args[0]; + dataset = args[1]; + } + } + + final Path credsPath = Path.of(CREDENTIALS_PATH.formatted(dataset)); + + if (!Files.exists(credsPath)) { + throw new IllegalStateException("{module-root}/" + credsPath + " not found. Must provide path to a source-harness credentials file."); + } + + final JsonNode config = Jsons.deserialize(IOs.readFile(credsPath)); + + final JsonNode catalog; + try { + catalog = getCatalog(dataset); + } catch (final IOException ex) { + throw new IllegalStateException("Failed to read catalog", ex); + } + + if (StringUtils.isAnyBlank(config.toString(), catalog.toString(), image)) { + throw new IllegalStateException("Missing harness configuration: config [%s] catalog [%s] image [%s]".formatted(config, catalog, image)); + } + + log.info("Starting performance harness for {} ({})", image, dataset); + try { + final PerformanceTest test = new PerformanceTest( + image, + config.toString(), + catalog.toString()); + + // final ExecutorService executors = Executors.newFixedThreadPool(2); + // final CompletableFuture readSrcAndWriteDstThread = CompletableFuture.runAsync(() -> { + // try { + // test.runTest(); + // } catch (final Exception e) { + // throw new RuntimeException(e); + // } + // }, executors); + + // Uncomment to add destination + /* + * final CompletableFuture readFromDstThread = CompletableFuture.runAsync(() -> { try { + * Thread.sleep(20_000); test.readFromDst(); } catch (final InterruptedException e) { throw new + * RuntimeException(e); } }, executors); + */ + + // CompletableFuture.anyOf(readSrcAndWriteDstThread/* , readFromDstThread */).get(); + test.runTest(); + } catch (final Exception e) { + log.error("Test failed", e); + System.exit(1); + + } + System.exit(0); + } + + static JsonNode getCatalog(final String dataset) throws IOException { + final ObjectMapper objectMapper = new ObjectMapper(); + final String catalogFilename = "catalogs/%s_catalog.json".formatted(dataset); + final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(catalogFilename); + return objectMapper.readTree(is); + } + +} diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/performance/PerformanceTest.java b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/performance/PerformanceTest.java new file mode 100644 index 0000000000000..b9564863d0a9d --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/performance/PerformanceTest.java @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.performance; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.AllowedHosts; +import io.airbyte.config.EnvConfigs; +import io.airbyte.config.ResourceRequirements; +import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.workers.RecordSchemaValidator; +import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.internal.DefaultAirbyteSource; +import io.airbyte.workers.internal.HeartbeatMonitor; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.KubePortManagerSingleton; +import io.airbyte.workers.process.KubeProcessFactory; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import java.net.InetAddress; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.ImmutablePair; + +@Slf4j +public class PerformanceTest { + + public static final int PORT1 = 9877; + public static final int PORT2 = 9878; + public static final int PORT3 = 9879; + public static final int PORT4 = 9880; + + public static final Set PORTS = Set.of(PORT1, PORT2, PORT3, PORT4); + + private final String imageName; + private final JsonNode config; + private final ConfiguredAirbyteCatalog catalog; + + // private DefaultAirbyteDestination destination; + + PerformanceTest(final String imageName, final String config, final String catalog) throws JsonProcessingException { + final ObjectMapper mapper = new ObjectMapper(); + this.imageName = imageName; + this.config = mapper.readTree(config); + this.catalog = Jsons.deserialize(catalog, ConfiguredAirbyteCatalog.class); + } + + void runTest() throws Exception { + KubePortManagerSingleton.init(PORTS); + + final KubernetesClient fabricClient = new DefaultKubernetesClient(); + final String localIp = InetAddress.getLocalHost().getHostAddress(); + final String kubeHeartbeatUrl = localIp + ":" + 9000; + final var workerConfigs = new WorkerConfigs(new EnvConfigs()); + final var processFactory = new KubeProcessFactory(workerConfigs, "default", fabricClient, kubeHeartbeatUrl, false); + final ResourceRequirements resourceReqs = new ResourceRequirements() + .withCpuLimit("2.5") + .withCpuRequest("2.5") + .withMemoryLimit("2Gi") + .withMemoryRequest("2Gi"); + final var heartbeatMonitor = new HeartbeatMonitor(Duration.ofMillis(1)); + final var allowedHosts = new AllowedHosts().withHosts(List.of("*")); + final var integrationLauncher = + new AirbyteIntegrationLauncher("1", 0, this.imageName, processFactory, resourceReqs, allowedHosts, false, new EnvVariableFeatureFlags()); + final var source = new DefaultAirbyteSource(integrationLauncher, new EnvVariableFeatureFlags(), heartbeatMonitor); + final var jobRoot = "/"; + final WorkerSourceConfig sourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(this.config) + .withState(null) + .withCatalog(convertProtocolObject(this.catalog, io.airbyte.protocol.models.ConfiguredAirbyteCatalog.class)); + + // Uncomment to add destination + /* + * /////////// destiantion /////////// final var dstIntegtationLauncher = new + * AirbyteIntegrationLauncher("2", 0, "airbyte/destination-dev-null:0.2.7", processFactory, + * resourceReqs, allowedHosts, false, new EnvVariableFeatureFlags()); this.destination = new + * DefaultAirbyteDestination(dstIntegtationLauncher); final WorkerDestinationConfig dstConfig = new + * WorkerDestinationConfig() + * .withDestinationConnectionConfiguration(Jsons.jsonNode(Collections.singletonMap("type", + * "SILENT"))); destination.start(dstConfig, Path.of(jobRoot)); /////////////////////////////////// + */ + + // final ConcurrentHashMap, Integer>> + // validationErrors = new ConcurrentHashMap(); + // final Map> streamToSelectedFields = new HashMap(); + // final Map> streamToAllFields = new HashMap(); + // final Map> unexpectedFields = new HashMap(); + // populateStreamToAllFields(this.catalog, streamToAllFields); + // final String streamName0 = sourceConfig.getCatalog().getStreams().get(0).getStream().getName(); + // final String streamNamespace0 = + // sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace(); + // final String streamName1 = sourceConfig.getCatalog().getStreams().get(1).getStream().getName(); + // final String streamNamespace1 = + // sourceConfig.getCatalog().getStreams().get(1).getStream().getNamespace(); + // final String streamName2 = sourceConfig.getCatalog().getStreams().get(2).getStream().getName(); + // final String streamNamespace2 = + // sourceConfig.getCatalog().getStreams().get(2).getStream().getNamespace(); + + // final var recordSchemaValidator = new RecordSchemaValidator( + // Map.of( + // new AirbyteStreamNameNamespacePair(streamName0, streamNamespace0), + // sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema(), + // new AirbyteStreamNameNamespacePair(streamName1, streamNamespace1), + // sourceConfig.getCatalog().getStreams().get(1).getStream().getJsonSchema(), + // new AirbyteStreamNameNamespacePair(streamName2, streamNamespace2), + // sourceConfig.getCatalog().getStreams().get(2).getStream().getJsonSchema()), + // true); + + log.info("Source starting"); + source.start(sourceConfig, Path.of(jobRoot)); + var totalBytes = 0.0; + var counter = 0L; + final var start = System.currentTimeMillis(); + log.info("Starting Test"); + while (!source.isFinished()) { + final Optional airbyteMessageOptional = source.attemptRead(); + if (airbyteMessageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = airbyteMessageOptional.get(); + + if (airbyteMessage.getRecord() != null) { + totalBytes += Jsons.getEstimatedByteSize(airbyteMessage.getRecord().getData()); + counter++; + + // validateSchema(recordSchemaValidator, streamToAllFields, unexpectedFields, validationErrors, + // airbyteMessage); + // airbyteMessage.getRecord().setStream(airbyteMessage.getRecord().getStream() + "SUFFIX"); + // ((ObjectNode) airbyteMessage.getRecord().getData()).retain("id", "user_id", "product_id", + // "added_to_cart_at", "purchased_at", "name", + // "email", + // "title", "gender", "height", "language", "blood_type", "created_at", "occupation", "updated_at", + // "nationality"); + // destination.accept(airbyteMessage); + } + + } + + if (counter > 0 && counter % 1_000_000 == 0) { + log.info("current throughput: {} total MB {}", (totalBytes / 1_000_000.0) / ((System.currentTimeMillis() - start) / 1000.0), + totalBytes / 1_000_000.0); + } + } + log.info("Test ended successfully"); + final var end = System.currentTimeMillis(); + final var totalMB = totalBytes / 1_000_000.0; + final var totalTimeSecs = (end - start) / 1000.0; + final var rps = counter / totalTimeSecs; + log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, totalMB / totalTimeSecs); + source.close(); + } + + private static void populateStreamToAllFields(final ConfiguredAirbyteCatalog catalog, + final Map> streamToAllFields) { + final Iterator var2 = catalog.getStreams().iterator(); + + while (var2.hasNext()) { + final ConfiguredAirbyteStream s = (ConfiguredAirbyteStream) var2.next(); + final Set fields = new HashSet(); + final JsonNode propertiesNode = s.getStream().getJsonSchema().findPath("properties"); + if (!propertiesNode.isObject()) { + throw new RuntimeException("No properties node in stream schema"); + } + + propertiesNode.fieldNames().forEachRemaining((fieldName) -> { + fields.add(fieldName); + }); + streamToAllFields.put(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(s), fields); + } + + } + + private static void validateSchema(final RecordSchemaValidator recordSchemaValidator, + final Map> streamToAllFields, + final Map> unexpectedFields, + final ConcurrentHashMap, Integer>> validationErrors, + final AirbyteMessage message) { + if (message.getRecord() != null) { + final AirbyteRecordMessage record = message.getRecord(); + final AirbyteStreamNameNamespacePair messageStream = AirbyteStreamNameNamespacePair.fromRecordMessage(record); + final boolean streamHasLessThenTenErrs = + validationErrors.get(messageStream) == null || (Integer) ((ImmutablePair) validationErrors.get(messageStream)).getRight() < 10; + if (streamHasLessThenTenErrs) { + recordSchemaValidator.validateSchema(record, messageStream, validationErrors); + final Set unexpectedFieldNames = (Set) unexpectedFields.getOrDefault(messageStream, new HashSet()); + populateUnexpectedFieldNames(record, (Set) streamToAllFields.get(messageStream), unexpectedFieldNames); + unexpectedFields.put(messageStream, unexpectedFieldNames); + } + + } + } + + private static void populateUnexpectedFieldNames(final AirbyteRecordMessage record, + final Set fieldsInCatalog, + final Set unexpectedFieldNames) { + final JsonNode data = record.getData(); + if (data.isObject()) { + final Iterator fieldNamesInRecord = data.fieldNames(); + + while (fieldNamesInRecord.hasNext()) { + final String fieldName = (String) fieldNamesInRecord.next(); + if (!fieldsInCatalog.contains(fieldName)) { + unexpectedFieldNames.add(fieldName); + } + } + } + + } + + private static V0 convertProtocolObject(final V1 v1, final Class klass) { + return Jsons.object(Jsons.jsonNode(v1), klass); + } + + // Uncomment to add destination + /* + * void readFromDst() { if (this.destination != null) { log.info("Start read from destination"); + * while (!this.destination.isFinished()) { final Optional messageOptional = + * this.destination.attemptRead(); + * + * if (messageOptional.isPresent()) { log.info("msg"); final AirbyteMessage message = + * (AirbyteMessage)messageOptional.get(); if (message.getType() == Type.STATE) { message.getState(); + * } } } } log.info("Done read from destination"); } + */ +} diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/10m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/10m_catalog.json new file mode 100644 index 0000000000000..07b22645a2a94 --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/10m_catalog.json @@ -0,0 +1,161 @@ +{ + "streams": [ + { + "stream": { + "name": "purchases", + "namespace": "10m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "user_id": { + "type": "number", + "airbyte_type": "integer" + }, + "product_id": { + "type": "number", + "airbyte_type": "integer" + }, + "returned_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "purchased_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "added_to_cart_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["id"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "users", + "namespace": "10m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "age": { + "type": "number", + "airbyte_type": "integer" + }, + "name": { + "type": "string" + }, + "email": { + "type": "string" + }, + "title": { + "type": "string" + }, + "gender": { + "type": "string" + }, + "height": { + "type": "number" + }, + "weight": { + "type": "number", + "airbyte_type": "integer" + }, + "language": { + "type": "string" + }, + "telephone": { + "type": "string" + }, + "blood_type": { + "type": "string" + }, + "created_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "occupation": { + "type": "string" + }, + "updated_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "nationality": { + "type": "string" + }, + "academic_degree": { + "type": "string" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["updated_at"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "products", + "namespace": "10m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "make": { + "type": "string" + }, + "year": { + "type": "string" + }, + "model": { + "type": "string" + }, + "price": { + "type": "number" + }, + "created_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["created_at"], + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/1m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/1m_catalog.json new file mode 100644 index 0000000000000..5e46b7b5d5cbd --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/1m_catalog.json @@ -0,0 +1,161 @@ +{ + "streams": [ + { + "stream": { + "name": "purchases", + "namespace": "1m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "user_id": { + "type": "number", + "airbyte_type": "integer" + }, + "product_id": { + "type": "number", + "airbyte_type": "integer" + }, + "returned_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "purchased_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "added_to_cart_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["id"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "users", + "namespace": "1m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "age": { + "type": "number", + "airbyte_type": "integer" + }, + "name": { + "type": "string" + }, + "email": { + "type": "string" + }, + "title": { + "type": "string" + }, + "gender": { + "type": "string" + }, + "height": { + "type": "number" + }, + "weight": { + "type": "number", + "airbyte_type": "integer" + }, + "language": { + "type": "string" + }, + "telephone": { + "type": "string" + }, + "blood_type": { + "type": "string" + }, + "created_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "occupation": { + "type": "string" + }, + "updated_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "nationality": { + "type": "string" + }, + "academic_degree": { + "type": "string" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["updated_at"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "products", + "namespace": "1m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "make": { + "type": "string" + }, + "year": { + "type": "string" + }, + "model": { + "type": "string" + }, + "price": { + "type": "number" + }, + "created_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["created_at"], + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/20m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/20m_catalog.json new file mode 100644 index 0000000000000..93302af014f6f --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/20m_catalog.json @@ -0,0 +1,161 @@ +{ + "streams": [ + { + "stream": { + "name": "purchases", + "namespace": "20m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "user_id": { + "type": "number", + "airbyte_type": "integer" + }, + "product_id": { + "type": "number", + "airbyte_type": "integer" + }, + "returned_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "purchased_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "added_to_cart_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["id"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "users", + "namespace": "20m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "age": { + "type": "number", + "airbyte_type": "integer" + }, + "name": { + "type": "string" + }, + "email": { + "type": "string" + }, + "title": { + "type": "string" + }, + "gender": { + "type": "string" + }, + "height": { + "type": "number" + }, + "weight": { + "type": "number", + "airbyte_type": "integer" + }, + "language": { + "type": "string" + }, + "telephone": { + "type": "string" + }, + "blood_type": { + "type": "string" + }, + "created_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "occupation": { + "type": "string" + }, + "updated_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "nationality": { + "type": "string" + }, + "academic_degree": { + "type": "string" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["updated_at"], + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "products", + "namespace": "20m_users", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number", + "airbyte_type": "integer" + }, + "make": { + "type": "string" + }, + "year": { + "type": "string" + }, + "model": { + "type": "string" + }, + "price": { + "type": "number" + }, + "created_at": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["id"]], + "cursor_field": ["created_at"], + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/bottleneck_stream1_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/bottleneck_stream1_catalog.json new file mode 100644 index 0000000000000..7c2a48f1f4b70 --- /dev/null +++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/bottleneck_stream1_catalog.json @@ -0,0 +1,37 @@ +{ + "streams": [ + { + "stream": { + "name": "stream1", + "namespace": "postgres", + "json_schema": { + "type": "object", + "properties": { + "field1": { + "type": "string" + }, + "field2": { + "type": "string" + }, + "field3": { + "type": "string" + }, + "field4": { + "type": "string" + }, + "field5": { + "type": "string" + } + } + }, + "default_cursor_field": [], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_primary_key": [["field1"]] + }, + "sync_mode": "full_refresh", + "primary_key": [["field1"]], + "cursor_field": ["field1"], + "destination_sync_mode": "append" + } + ] +} diff --git a/settings.gradle b/settings.gradle index 0fa20cf01aaf8..1c6d6c8b16aab 100644 --- a/settings.gradle +++ b/settings.gradle @@ -131,6 +131,8 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-integrations:connectors:destination-gcs' include ':tools:code-generator' + + include 'airbyte-integrations:connectors-performance:source-harness' } if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD") == "OCTAVIA_CLI") { diff --git a/tools/bin/admin-service-account.yaml b/tools/bin/admin-service-account.yaml new file mode 100644 index 0000000000000..10c8c941c5691 --- /dev/null +++ b/tools/bin/admin-service-account.yaml @@ -0,0 +1,20 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: airbyte-admin-role +rules: + - apiGroups: ["*"] + resources: ["jobs", "pods", "pods/log", "pods/exec", "pods/attach"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] # over-permission for now +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: default-admin-binding +roleRef: + apiGroup: "" + kind: Role + name: airbyte-admin-role +subjects: + - kind: ServiceAccount + name: default diff --git a/tools/bin/source-harness-kind-cluster-config.yaml b/tools/bin/source-harness-kind-cluster-config.yaml new file mode 100644 index 0000000000000..382b2582798c7 --- /dev/null +++ b/tools/bin/source-harness-kind-cluster-config.yaml @@ -0,0 +1,7 @@ +apiVersion: kind.x-k8s.io/v1alpha4 +kind: Cluster +nodes: + - role: control-plane + extraMounts: + - hostPath: /actions-runner/_work/airbyte/airbyte/airbyte-integrations/connectors/source-harness/secrets + containerPath: /secrets diff --git a/tools/bin/source-harness-process.yaml b/tools/bin/source-harness-process.yaml new file mode 100644 index 0000000000000..3ff69be8b1f9d --- /dev/null +++ b/tools/bin/source-harness-process.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Pod +metadata: + generateName: run-source-harness + labels: + app: source-harness +spec: + containers: + - name: main + image: airbyte/source-harness:dev + args: ["$CONNECTOR_IMAGE_NAME", "$DATASET"] + volumeMounts: + - name: secrets-volume + mountPath: /airbyte/secrets + resources: + limits: + cpu: "2.5" + memory: "2Gi" + requests: + cpu: "2.5" + memory: "2Gi" + volumes: + - name: secrets-volume + hostPath: + path: /secrets + type: Directory + imagePullSecrets: + - name: regcred + restartPolicy: Never