Skip to content

Commit

Permalink
Prevent GCP Pub/Sub input blockage by increasing default value
Browse files Browse the repository at this point in the history
  • Loading branch information
kcreddy committed Apr 16, 2024
1 parent 3f1d32b commit 0e5e3b3
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 148 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917]
- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38917[38917]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
#subscription.num_goroutines: 1

# Maximum number of unprocessed messages to allow at any time.
#subscription.max_outstanding_messages: 1000
# This must be at least queue.mem.flush.min_events to prevent input blockage.
#subscription.max_outstanding_messages: 1600

# Path to a JSON file containing the credentials and key used to subscribe.
credentials_file: ${path.config}/my-pubsub-subscriber-credentials.json
Expand Down
5 changes: 4 additions & 1 deletion x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ set `subscription.max_outstanding_messages`. Default is 1.

The maximum number of unprocessed messages (unacknowledged but not yet expired).
If the value is negative, then there will be no limit on the number of
unprocessed messages. Default is 1000.
unprocessed messages. Due to the presence of internal queue, the input gets
blocked until `queue.mem.flush.min_events` or `queue.mem.flush.timeout`
is reached. To prevent this blockage, this option must be at least
`queue.mem.flush.min_events`. Default is 1600.

[float]
==== `credentials_file`
Expand Down
287 changes: 144 additions & 143 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1500,147 +1500,147 @@ filebeat.modules:
#var.password:

#------------------------------ Salesforce Module ------------------------------
# Configuration file for Salesforce module in Filebeat

# Common Configurations:
# - enabled: Set to true to enable ingestion of Salesforce module fileset
# - initial_interval: Initial interval for log collection. This setting determines the time period for which the logs will be initially collected when the ingestion process starts, i.e. 1d/h/m/s
# - api_version: API version for Salesforce, version should be greater than 46.0

# Authentication Configurations:
# User-Password Authentication:
# - enabled: Set to true to enable user-password authentication
# - client.id: Client ID for user-password authentication
# - client.secret: Client secret for user-password authentication
# - token_url: Token URL for user-password authentication
# - username: Username for user-password authentication
# - password: Password for user-password authentication

# JWT Authentication:
# - enabled: Set to true to enable JWT authentication
# - client.id: Client ID for JWT authentication
# - client.username: Username for JWT authentication
# - client.key_path: Path to client key for JWT authentication
# - url: Audience URL for JWT authentication

# Event Monitoring:
# - real_time: Set to true to enable real-time logging using object type data collection
# - real_time_interval: Interval for real-time logging

# Event Log File:
# - event_log_file: Set to true to enable event log file type data collection
# - elf_interval: Interval for event log file
# - log_file_interval: Interval type for log file collection, either Hourly or Daily

- module: salesforce

apex:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "<YourClientSecretHere>"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.event_log_file: true
var.elf_interval: 1h
var.log_file_interval: "Hourly"

login:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "client-secret"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.event_log_file: true
var.elf_interval: 1h
var.log_file_interval: "Hourly"

var.real_time: true
var.real_time_interval: 5m

logout:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "client-secret"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.event_log_file: true
var.elf_interval: 1h
var.log_file_interval: "Hourly"

var.real_time: true
var.real_time_interval: 5m

setupaudittrail:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "client-secret"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.real_time: true
# Configuration file for Salesforce module in Filebeat

# Common Configurations:
# - enabled: Set to true to enable ingestion of Salesforce module fileset
# - initial_interval: Initial interval for log collection. This setting determines the time period for which the logs will be initially collected when the ingestion process starts, i.e. 1d/h/m/s
# - api_version: API version for Salesforce, version should be greater than 46.0

# Authentication Configurations:
# User-Password Authentication:
# - enabled: Set to true to enable user-password authentication
# - client.id: Client ID for user-password authentication
# - client.secret: Client secret for user-password authentication
# - token_url: Token URL for user-password authentication
# - username: Username for user-password authentication
# - password: Password for user-password authentication

# JWT Authentication:
# - enabled: Set to true to enable JWT authentication
# - client.id: Client ID for JWT authentication
# - client.username: Username for JWT authentication
# - client.key_path: Path to client key for JWT authentication
# - url: Audience URL for JWT authentication

# Event Monitoring:
# - real_time: Set to true to enable real-time logging using object type data collection
# - real_time_interval: Interval for real-time logging

# Event Log File:
# - event_log_file: Set to true to enable event log file type data collection
# - elf_interval: Interval for event log file
# - log_file_interval: Interval type for log file collection, either Hourly or Daily

- module: salesforce

apex:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "<YourClientSecretHere>"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.event_log_file: true
var.elf_interval: 1h
var.log_file_interval: "Hourly"

login:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "client-secret"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.event_log_file: true
var.elf_interval: 1h
var.log_file_interval: "Hourly"

var.real_time: true
var.real_time_interval: 5m

logout:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "client-secret"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.event_log_file: true
var.elf_interval: 1h
var.log_file_interval: "Hourly"

var.real_time: true
var.real_time_interval: 5m

setupaudittrail:
enabled: false
var.initial_interval: 1d
var.api_version: 56

var.authentication:
user_password_flow:
enabled: true
client.id: "<YourClientIdHere>"
client.secret: "client-secret"
token_url: "<YourTokenURLHere>"
username: "<YourUsernameHere>"
password: "<YourPasswordHere>"
jwt_bearer_flow:
enabled: false
client.id: "<YourClientIdHere>"
client.username: "<YourClientUsernameHere>"
client.key_path: "<YourClientKeyPathHere>"
url: "https://login.salesforce.com"

var.url: "https://instance_id.my.salesforce.com"

var.real_time: true
var.real_time_interval: 5m
#----------------------------- Google Santa Module -----------------------------
- module: santa
Expand Down Expand Up @@ -2963,8 +2963,9 @@ filebeat.inputs:
# Number of goroutines to create to read from the subscription.
#subscription.num_goroutines: 1

# Maximum number of unprocessed messages to allow at any time.
#subscription.max_outstanding_messages: 1000
# Maximum number of unprocessed messages to allow at any time.
# This must be at least queue.mem.flush.min_events to prevent input blockage.
#subscription.max_outstanding_messages: 1600

# Path to a JSON file containing the credentials and key used to subscribe.
credentials_file: ${path.config}/my-pubsub-subscriber-credentials.json
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/gcppubsub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func defaultConfig() config {
Type: "gcp-pubsub",
}
c.Subscription.NumGoroutines = 1
c.Subscription.MaxOutstandingMessages = 1000
// The input gets blocked until flush.min_events or flush.timeout is reached.
// Hence max_outstanding_message has to be at least flush.min_events to avoid this blockage.
c.Subscription.MaxOutstandingMessages = 1600
c.Subscription.Create = true
return c
}
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/gcppubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package gcppubsub
import (
"context"
"errors"
"io/ioutil"
"io"
"net/http"
"os"
"strconv"
Expand Down Expand Up @@ -70,7 +70,7 @@ func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) {
}
defer resp.Body.Close()

_, err = ioutil.ReadAll(resp.Body)
_, err = io.ReadAll(resp.Body)
if err != nil {
t.Fatal("failed to read response", err)
}
Expand Down

0 comments on commit 0e5e3b3

Please sign in to comment.