Merge pull request #2002 from keboola/feat-add-adjustable-payload-k6
feat: Add adjustable payload size in k6.
Matovidlo authored Sep 11, 2024
2 parents d52c437 + 18b568e commit b452c43
Showing 9 changed files with 260 additions and 246 deletions.
464 changes: 232 additions & 232 deletions .github/workflows/test-k8s-service-stream.yml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docker-compose.yml
@@ -128,6 +128,7 @@ services:
- K6_RAMPING_UP_DURATION
- K6_RAMPING_STABLE_DURATION
- K6_RAMPING_DOWN_DURATION
- STREAM_PAYLOAD_SIZE

sandboxesMock:
image: mockserver/mockserver:latest
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/config/config_test.go
@@ -117,6 +117,8 @@ source:
writeBufferSize: 4KB
# Max size of the HTTP request body. Validation rules: required
maxRequestBodySize: 1MB
# Whether the request body should be streamed to the server.
streamRequestBody: false
sink:
table:
keboola:
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/source/httpsource/config.go
@@ -16,6 +16,7 @@ type Config struct {
ReadBufferSize datasize.ByteSize `configKey:"readBufferSize" configUsage:"Read buffer size, all HTTP headers must fit in" validate:"required"`
WriteBufferSize datasize.ByteSize `configKey:"writeBufferSize" configUsage:"Write buffer size." validate:"required"`
MaxRequestBodySize datasize.ByteSize `configKey:"maxRequestBodySize" configUsage:"Max size of the HTTP request body." validate:"required"`
StreamRequestBody bool `configKey:"streamRequestBody" configUsage:"Whether the request body should be streamed to the server."`
}

func NewConfig() Config {
@@ -28,5 +29,6 @@ func NewConfig() Config {
ReadBufferSize: 16 * datasize.KB,
WriteBufferSize: 4 * datasize.KB,
MaxRequestBodySize: 1 * datasize.MB,
StreamRequestBody: false,
}
}
@@ -129,6 +129,7 @@ func Start(ctx context.Context, d dependencies, cfg Config) error {
WriteBufferSize: int(cfg.WriteBufferSize.Bytes()),
ReduceMemoryUsage: false, // aggressively reduces memory usage at the cost of higher CPU usage
MaxRequestBodySize: int(cfg.MaxRequestBodySize.Bytes()),
StreamRequestBody: cfg.StreamRequestBody,
TCPKeepalive: true,
NoDefaultServerHeader: true,
DisablePreParseMultipartForm: true,
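The new StreamRequestBody flag is passed straight through to fasthttp's server option of the same name: instead of buffering the whole request body (up to maxRequestBodySize) before calling the handler, the server exposes it as a stream. A minimal standalone sketch of that behaviour, assuming fasthttp's RequestCtx.RequestBodyStream() API; it is an illustration only, not the HTTP source's real handler:

```go
// Minimal sketch, not the stream service's actual handler: with StreamRequestBody
// enabled, fasthttp hands the handler an io.Reader instead of buffering the whole
// body in memory first.
package main

import (
	"io"
	"log"

	"github.com/valyala/fasthttp"
)

func main() {
	srv := &fasthttp.Server{
		StreamRequestBody:  true,            // mirrors the new streamRequestBody config key
		MaxRequestBodySize: 1 * 1024 * 1024, // mirrors maxRequestBodySize: 1MB
		Handler: func(ctx *fasthttp.RequestCtx) {
			// Consume the body incrementally; here we only count the bytes.
			n, err := io.Copy(io.Discard, ctx.RequestBodyStream())
			if err != nil {
				ctx.Error("cannot read body", fasthttp.StatusBadRequest)
				return
			}
			ctx.SetStatusCode(fasthttp.StatusOK)
			log.Printf("read %d bytes", n)
		},
	}
	log.Fatal(srv.ListenAndServe(":8000"))
}
```

The intended trade-off is flatter memory usage for large bodies, at the cost of the handler reading the body itself.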
2 changes: 2 additions & 0 deletions provisioning/stream/kubernetes/templates/benchmark/job.yaml
@@ -102,6 +102,8 @@ spec:
value: "4m"
- name: K6_RAMPING_DOWN_DURATION
value: "2m"
- name: STREAM_PAYLOAD_SIZE
value: "1"
tolerations:
- key: app
operator: Exists
@@ -103,6 +103,8 @@ data:
writeBufferSize: 4KB
# Max size of the HTTP request body. Validation rules: required
maxRequestBodySize: 1MB
# Whether the request body should be streamed to the server. Validation rules: required
streamRequestBody: false
storage:
# Mounted volumes path, each volume is in "{type}/{label}" subdir. Validation rules: required
volumesPath: "/stream/volumes"
26 changes: 13 additions & 13 deletions scripts/k6/stream-api/api.js
@@ -1,14 +1,14 @@
import { sleep } from 'k6';
import http from "k6/http";
import { SharedArray } from 'k6/data';
import {randomString} from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
import { randomString } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

export class Api {
constructor(apiHost, apiToken) {
if (!apiHost) throw new Error("Please set the `K6_API_HOST` env var.");
if (!apiToken) throw new Error("Please set the `K6_API_TOKEN` env var.");
this.apiHost = apiHost
this.headers = {"X-StorageApi-Token": apiToken}
this.headers = { "X-StorageApi-Token": apiToken }
}

createHTTPSource() {
@@ -20,7 +20,7 @@ export class Api {
}

// Create source
let res = http.post(`${this.apiHost}/v1/branches/default/sources`, JSON.stringify(body), {headers: this.headers, responseType:"text"});
let res = http.post(`${this.apiHost}/v1/branches/default/sources`, JSON.stringify(body), { headers: this.headers, responseType: "text" });
if (res.status !== 202) {
console.error(res);
throw new Error("failed to create source task");
@@ -43,7 +43,7 @@ export class Api {
// awaitTask(res.json().url)

// Get source
res = http.get(`${this.apiHost}/v1/branches/default/sources/${sourceId}`, {headers: this.headers, responseType:"text"});
res = http.get(`${this.apiHost}/v1/branches/default/sources/${sourceId}`, { headers: this.headers, responseType: "text" });
if (res.status !== 200) {
throw new Error("failed to get source");
}
@@ -54,7 +54,7 @@
throw new Error("missing source URL");
}

return {id: sourceId, url: sourceUrl}
return { id: sourceId, url: sourceUrl }
}

createKeboolaTableSink(sourceId, mapping) {
@@ -70,15 +70,15 @@
}

// Create sink
let res = http.post(`${this.apiHost}/v1/branches/default/sources/${sourceId}/sinks`, JSON.stringify(body), {headers: this.headers, responseType:"text"});
let res = http.post(`${this.apiHost}/v1/branches/default/sources/${sourceId}/sinks`, JSON.stringify(body), { headers: this.headers, responseType: "text" });
if (res.status !== 202) {
console.error(res);
throw new Error("failed to create sink task");
}
this.awaitTask(res.json().url)

// Get sink
res = http.get(`${this.apiHost}/v1/branches/default/sources/${sourceId}/sinks/${body.sinkId}`, {headers: this.headers, responseType:"text"});
res = http.get(`${this.apiHost}/v1/branches/default/sources/${sourceId}/sinks/${body.sinkId}`, { headers: this.headers, responseType: "text" });
if (res.status !== 200) {
throw new Error("failed to get sink");
}
@@ -89,13 +89,13 @@
throw new Error("missing sink ID");
}

return {id: sinkId}
return { id: sinkId }
}

deleteSource(sourceId) {
console.info("waiting 100s before source deletion")
sleep(100)
const res = http.del(`${this.apiHost}/v1/branches/default/sources/${sourceId}`, null, {headers: this.headers, responseType:"text"});
const res = http.del(`${this.apiHost}/v1/branches/default/sources/${sourceId}`, null, { headers: this.headers, responseType: "text" });
if (res.status !== 202) {
console.error(res);
throw new Error("failed to delete source");
@@ -105,7 +105,7 @@
awaitTask(taskUrl) {
const taskTimeout = 60
for (let retries = taskTimeout; retries > 0; retries--) {
let res = http.get(taskUrl, {headers: this.headers, responseType:"text"})
let res = http.get(taskUrl, { headers: this.headers, responseType: "text" })
if (res.status !== 200) {
console.error(res);
throw new Error("failed to get task");
@@ -121,11 +121,11 @@
}
}

export function randomPayloads() {
return new SharedArray('payloads', function () {
export function randomPayloads(payloadLength) {
return new SharedArray('payloads', function() {
let payloads = []
for (let i = 0; i < 100; i++) {
payloads.push(JSON.stringify({a: "b", c: {d: "e", f: {g: randomString(10)}}}))
payloads.push(JSON.stringify({ a: "b", c: { d: "e", f: { g: randomString(10), h: "a".repeat(payloadLength) } } }))
}
return payloads
})
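randomPayloads now takes a payloadLength argument and pads each record with payloadLength characters of "a" in the new h field, so the serialized size grows linearly: the JSON envelope around it is 53 bytes, giving 54 B for payloadLength=1 and 1077 B for 1024, which matches the comment added to main.js below. A small standalone k6 sketch (not part of this change) that verifies the arithmetic:

```js
// Standalone verification sketch, not part of the PR: print the serialized size of the
// payload shape used by randomPayloads() for a few payloadLength values.
import { randomString } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

export default function () {
  for (const payloadLength of [1, 1024, 1048500]) {
    const payload = JSON.stringify({ a: "b", c: { d: "e", f: { g: randomString(10), h: "a".repeat(payloadLength) } } });
    // The payload is ASCII only, so string length equals byte size.
    console.log(`payloadLength=${payloadLength} -> ${payload.length} B`);
  }
}
```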
6 changes: 5 additions & 1 deletion scripts/k6/stream-api/main.js
@@ -1,5 +1,6 @@
import { check } from 'k6';
import http from "k6/http";
import { TextEncoder } from "https://raw.githubusercontent.com/inexorabletash/text-encoding/master/index.js"
import { Counter } from 'k6/metrics';
import { randomItem } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
import { URL } from 'https://jslib.k6.io/url/1.0.0/index.js';
@@ -31,6 +32,9 @@ const RAMPING_DOWN_DURATION = __ENV.K6_RAMPING_DOWN_DURATION || "2m";
const SYNC_MODE = __ENV.STREAM_SYNC_MODE || "disk"; // cache / disk
const SYNC_WAIT = __ENV.STREAM_SYNC_WAIT || "1"; // 1 = enabled, 0 = disabled

// Payload configuration
const PAYLOAD_SIZE = __ENV.STREAM_PAYLOAD_SIZE || 1 // 1 = 54B, 1024 = ~1KB (1077B), 1048500 = ~1MB (1048576)

// Available scenarios, use K6_SCENARIO env to select one
const scenarios = {
constant: {
@@ -105,7 +109,7 @@ const api = new Api(API_HOST, API_TOKEN)

const errors_metrics = new Counter("failed_imports");

const payloads = randomPayloads()
const payloads = randomPayloads(PAYLOAD_SIZE)

export function setup() {
// Create source
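The payload size is now driven by the STREAM_PAYLOAD_SIZE environment variable, and main.js also pulls in a TextEncoder polyfill. Its exact use is outside the shown hunks; one plausible reading (an assumption, not something the diff confirms) is byte-accurate logging of the generated payload size, along these lines:

```js
// Hypothetical sketch, not the actual main.js logic: log the byte size of one generated
// payload for the configured STREAM_PAYLOAD_SIZE.
import { TextEncoder } from "https://raw.githubusercontent.com/inexorabletash/text-encoding/master/index.js";
import { randomPayloads } from "./api.js";

const PAYLOAD_SIZE = __ENV.STREAM_PAYLOAD_SIZE || 1;
const payloads = randomPayloads(PAYLOAD_SIZE);

export function setup() {
  const bytes = new TextEncoder().encode(payloads[0]).length;
  console.log(`STREAM_PAYLOAD_SIZE=${PAYLOAD_SIZE} -> ~${bytes} B per record`);
}

export default function () {
  // the real benchmark posts a random payload to the HTTP source URL here
}
```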
