Skip to content

Commit

Permalink
Merge pull request #168 from maikelmclauflin/fix-erring-configuration
Browse files Browse the repository at this point in the history
omitted sasl when none required by protocol
  • Loading branch information
rob3000 authored Dec 23, 2020
2 parents b5bf604 + e23b735 commit 58483db
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions src/lib/kafkajs/JSProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { EventEmitter } from "events";
import { v4 as uuidv4} from "uuid";
import { murmur } from "murmurhash";
import { murmur2Partitioner } from "murmur2-partitioner";
import { Kafka, SASLMechanism, Admin, Producer, RecordMetadata, CompressionTypes } from "kafkajs";
import { Kafka, KafkaConfig, SASLMechanism, Admin, Producer, RecordMetadata, CompressionTypes } from "kafkajs";
import { Metadata, ProducerAnalytics, ProducerHealth, Check, ProducerRunResult, defaultAnalyticsInterval } from "../shared";
import { MessageReturn, JSKafkaProducerConfig, ProducerStats, AnalyticsConfig, KafkaLogger } from "../interfaces";
import fs from "fs";
Expand Down Expand Up @@ -51,7 +51,7 @@ export class JSProducer extends EventEmitter {
private _murmurHashVersion: string = DEFAULT_MURMURHASH_VERSION;
private _murmur;
private _errors = 0;

defaultPartitionCount = 1;

/**
Expand All @@ -75,7 +75,7 @@ export class JSProducer extends EventEmitter {
config.options = {};
}

const {
const {
"metadata.broker.list": brokerList,
"client.id": clientId,
"security.protocol": securityProtocol,
Expand All @@ -94,25 +94,28 @@ export class JSProducer extends EventEmitter {
throw new Error("You are missing a broker or group configs");
}

const conf = {
brokers,
clientId,
} as KafkaConfig
if (securityProtocol) {
this.kafkaClient = new Kafka({
brokers,
clientId,
ssl: {
if (securityProtocol.includes('sasl')) {
conf.sasl = {
mechanism: mechanism as SASLMechanism,
username: username as string,
password: password as string,
}
}
if (securityProtocol.includes('ssl')) {
conf.ssl = {
ca: [fs.readFileSync(sslCALocation as string, "utf-8")],
cert: fs.readFileSync(sslCertLocation as string, "utf-8"),
key: fs.readFileSync(sslKeyLocation as string, "utf-8"),
passphrase: sslKeyPassword,
},
sasl: {
mechanism: mechanism as SASLMechanism,
username: username as string,
password: password as string,
},
});
} else {
this.kafkaClient = new Kafka({ brokers, clientId });
}
}
}
this.kafkaClient = new Kafka(conf);

this.config = config;
this._health = new ProducerHealth(this, this.config.health);
Expand Down Expand Up @@ -334,7 +337,7 @@ export class JSProducer extends EventEmitter {
this._totalSentMessages++;
const timestamp = producedAt.toString();
const acks = this.config && this.config.tconf && this.config.tconf["request.required.acks"] || 1;
const compression = (this.config.noptions)
const compression = (this.config.noptions)
? this.config.noptions["compression.codec"]
: CompressionTypes.None;

Expand All @@ -344,7 +347,7 @@ export class JSProducer extends EventEmitter {
acks,
compression,
messages: [{
key,
key,
value: convertedMessage,
partition,
timestamp
Expand Down

0 comments on commit 58483db

Please sign in to comment.