
KAFKA-6157; Fix repeated words words in JavaDoc and comments.
Author: Adem Efe Gencer <[email protected]>

Reviewers: Jiangjie Qin <[email protected]>

Closes apache#4170 from efeg/bug/typoFix
efeg authored and becketqin committed Nov 6, 2017
1 parent 520b313 commit 86062e9
Showing 40 changed files with 55 additions and 54 deletions.
@@ -60,7 +60,7 @@ private Deque<NetworkClient.InFlightRequest> requestQueue(String node) {
}

/**
* Get the oldest request (the one that that will be completed next) for the given node
* Get the oldest request (the one that will be completed next) for the given node
*/
public NetworkClient.InFlightRequest completeNext(String node) {
return requestQueue(node).pollLast();
@@ -167,5 +167,5 @@ public List<String> getNodesWithTimedOutRequests(long now, int requestTimeoutMs)
}
return nodeIds;
}

}
@@ -369,7 +369,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
if (!isInternalRequest) {
// If this request came from outside the NetworkClient, validate
// that we can send data. If the request is internal, we trust
// that that internal code has done this validation. Validation
// that internal code has done this validation. Validation
// will be slightly different for some internal requests (for
// example, ApiVersionsRequests can be sent prior to being in
// READY state.)
@@ -423,7 +423,7 @@
* <p>
* Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
* In order for this to work, consumers reading from these partitions should be configured to only read committed data.
* This can be achieved by by setting the {@code isolation.level=read_committed} in the consumer's configuration.
* This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
*
* <p>
* In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
@@ -704,9 +704,9 @@ private KafkaConsumer(ConsumerConfig config,
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
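As a brief aside on the read_committed isolation level mentioned in the KafkaConsumer javadoc above, a minimal consumer-configuration sketch (broker address, group id, and topic name are illustrative assumptions):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadCommittedConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-group");     // assumption
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Only return records from committed transactions (plus non-transactional records).
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("transactional-topic")); // assumption
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.offset() + ": " + record.value());
            }
        }
    }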
@@ -951,7 +951,7 @@ private void ensureValidRecordSize(int size) {
* </p>
* <p>
* Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
* flush all buffered records before performing the commit. This ensures that all the the {@link #send(ProducerRecord)}
* flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
* calls made since the previous {@link #beginTransaction()} are completed before the commit.
* </p>
*
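To illustrate the flush() note above, a minimal transactional-producer sketch (broker address, transactional id, and topic name are illustrative assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumption
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-transactional-id"); // assumption
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("example-topic", "key", "value"));      // assumption
                // No explicit flush() needed: commitTransaction() flushes buffered records first.
                producer.commitTransaction();
            }
        }
    }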
@@ -384,7 +384,7 @@ private int writeLegacyCompressedWrapperHeader() {
}

/**
* Append a record and return its checksum for message format v0 and v1, or null for for v2 and above.
* Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
*/
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
@@ -186,7 +186,7 @@ public void authenticate() throws IOException {
if (authenticateVersion != null)
saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
// Fall through to send send handshake request with the latest supported version
// Fall through to send handshake request with the latest supported version
}
case SEND_HANDSHAKE_REQUEST:
SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);
@@ -1213,7 +1213,7 @@ public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
log.info("Rebalance started");

// Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance,
// Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
// it is still important to have a leader that can write configs, offsets, etc.

if (rebalanceResolved) {
@@ -316,7 +316,7 @@ public void run() {

synchronized (KafkaBasedLog.this) {
// Only invoke exactly the number of callbacks we found before triggering the read to log end
// since it is possible for another write + readToEnd to sneak in in the meantime
// since it is possible for another write + readToEnd to sneak in the meantime
for (int i = 0; i < numCallbacks; i++) {
Callback<Void> cb = readLogEndOffsetCallbacks.poll();
cb.onCompletion(null, null);
@@ -282,7 +282,7 @@ public void testPutTaskConfigs() throws Exception {
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));

// Writing task task configs should block until all the writes have been performed and the root record update
// Writing task configs should block until all the writes have been performed and the root record update
// has completed
List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
configStorage.putTaskConfigs("connector1", taskConfigs);
@@ -335,7 +335,7 @@ public void testPutTaskConfigsZeroTasks() throws Exception {
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());

// Writing task task configs should block until all the writes have been performed and the root record update
// Writing task configs should block until all the writes have been performed and the root record update
// has completed
List<Map<String, String>> taskConfigs = Collections.emptyList();
configStorage.putTaskConfigs("connector1", taskConfigs);
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -325,7 +325,7 @@ class Partition(val topic: String,
}

/**
* Update the the follower's state in the leader based on the last fetch request. See
* Update the follower's state in the leader based on the last fetch request. See
* [[kafka.cluster.Replica#updateLogReadResult]] for details.
*
* @return true if the leader's log start offset or high watermark have been updated
@@ -380,7 +380,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")

val currentOffsetOpt = offsets.get(topicPartition)
if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
@@ -405,6 +405,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState

def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
topicPartitions.flatMap { topicPartition =>

pendingOffsetCommits.remove(topicPartition)
pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
pendingOffsets.remove(topicPartition)
@@ -601,7 +601,7 @@ class TransactionStateManager(brokerId: Int,

val append: Boolean = metadata.inLock {
if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
// the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR
// the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
responseCallback(Errors.NOT_COORDINATOR)
false
} else {
@@ -62,7 +62,7 @@ class DelayedDeleteRecords(delayMs: Long,
/**
* The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following:
*
* 1) There was an error while checking if all replicas have caught up to to the deleteRecordsOffset: set an error in response
* 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in response
* 2) The low watermark of the partition has caught up to the deleteRecordsOffset. set the low watermark in response
*
*/
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/JmxTool.scala
@@ -67,7 +67,7 @@ object JmxTool extends Logging {
.describedAs("format")
.ofType(classOf[String])
val jmxServiceUrlOpt =
parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
.withRequiredArg
.describedAs("service-url")
.ofType(classOf[String])
@@ -60,7 +60,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
private val topic1 = "topic-1"

/**
* With replication, producer should able able to find new leader after it detects broker failure
* With replication, producer should able to find new leader after it detects broker failure
*/
@Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837)
@Test
@@ -141,7 +141,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete)

TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
"Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
"Consumer group info on deleted topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
"Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
}
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -211,7 +211,7 @@ class LogManagerTest {
log.appendAsLeader(set, leaderEpoch = 0)
}
time.sleep(logManager.InitialTaskDelayMs)
assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
assertTrue("Time based flush should have been triggered", lastFlush != log.lastFlushTime)
}

/**
@@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite {
TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed")
TestUtils.waitUntilTrue(() => openOrClosingChannel.isDefined, "Channel removed without processing staged receives")

// Create new connection with same id when when `channel1` is in Selector.closingChannels
// Create new connection with same id when `channel1` is in Selector.closingChannels
// Check that new connection is closed and openOrClosingChannel still contains `channel1`
connectAndWaitForConnectionRegister()
TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel")
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1431,7 +1431,7 @@ object TestUtils extends Logging {

private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8)

// Verifies that the record was intended to be committed by checking the the headers for an expected transaction status
// Verifies that the record was intended to be committed by checking the headers for an expected transaction status
// If true, this will return the value as a string. It is expected that the record in question should have been created
// by the `producerRecordWithExpectedTransactionStatus` method.
def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {
2 changes: 1 addition & 1 deletion docs/connect.html
@@ -329,7 +329,7 @@ <h5><a id="connect_taskexample" href="#connect_taskexample">Task Example - Sourc
}
</pre>

<p>These are slightly simplified versions, but show that that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
<p>These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>

<p>Next, we implement the main functionality of the task, the <code>poll()</code> method which gets events from the input system and returns a <code>List&lt;SourceRecord&gt;</code>:</p>
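A minimal, hypothetical sketch of such a poll() implementation, assuming a line-oriented source; the reader, filename, topic, and offset bookkeeping below are illustrative assumptions rather than the documented example:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public abstract class LineSourceTaskSketch extends SourceTask {
        private BufferedReader reader;   // assumed to be opened in start() and closed in stop()
        private String filename;         // assumption
        private String topic;            // assumption
        private long lineOffset = 0L;

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            List<SourceRecord> records = new ArrayList<>();
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Source partition identifies the file; source offset tracks the line number.
                    records.add(new SourceRecord(
                            Collections.singletonMap("filename", filename),
                            Collections.singletonMap("position", lineOffset++),
                            topic, Schema.STRING_SCHEMA, line));
                }
            } catch (IOException e) {
                // A real task would log this and decide whether to retry or fail.
            }
            return records;
        }
    }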

2 changes: 1 addition & 1 deletion docs/documentation/streams/architecture.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/architecture.html" -->
2 changes: 1 addition & 1 deletion docs/documentation/streams/core-concepts.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/core-concepts.html" -->
2 changes: 1 addition & 1 deletion docs/documentation/streams/developer-guide.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/developer-guide.html" -->
2 changes: 1 addition & 1 deletion docs/documentation/streams/index.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/index.html" -->
2 changes: 1 addition & 1 deletion docs/documentation/streams/quickstart.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/quickstart.html" -->
2 changes: 1 addition & 1 deletion docs/documentation/streams/tutorial.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/tutorial.html" -->
2 changes: 1 addition & 1 deletion docs/documentation/streams/upgrade-guide.html
@@ -15,5 +15,5 @@
limitations under the License.
-->

<!-- should always link the the latest release's documentation -->
<!-- should always link the latest release's documentation -->
<!--#include virtual="../../streams/upgrade-guide.html" -->
2 changes: 1 addition & 1 deletion docs/implementation.html
@@ -99,7 +99,7 @@ <h5><a id="recordheader" href="#recordheader">5.4.2.1 Record Header</a></h5>
headerValueLength: varint
Value: byte[]
</pre></p>
<p>We use the the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
<p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
is also encoded as a varint.</p>
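For reference, a short sketch of the base-128 varint encoding used by Protobuf (illustrative only, not Kafka's internal implementation):

    import java.nio.ByteBuffer;

    public class VarintSketch {
        // Base-128 varint: seven payload bits per byte, high bit set on all but the last byte.
        static void writeUnsignedVarint(int value, ByteBuffer out) {
            while ((value & 0xFFFFFF80) != 0) {
                out.put((byte) ((value & 0x7F) | 0x80)); // low 7 bits plus continuation flag
                value >>>= 7;
            }
            out.put((byte) value);                       // final byte, continuation flag clear
        }

        // Signed values are usually zig-zag mapped first so small negatives stay small.
        static void writeVarint(int value, ByteBuffer out) {
            writeUnsignedVarint((value << 1) ^ (value >> 31), out);
        }

        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(5);
            writeUnsignedVarint(300, buf);               // encodes as 0xAC 0x02
            System.out.println(buf.position() + " bytes used");
        }
    }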

<h3><a id="log" href="#log">5.4 Log</a></h3>
@@ -59,7 +59,7 @@ public class StreamsBuilder {
final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;

private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);

/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -237,7 +237,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* store name. Note that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
*
@@ -258,7 +258,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic) {
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* store name. Note that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
*
@@ -312,7 +312,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* store name. Note that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -343,7 +343,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
* store name. Note that that store name may not be queriable through Interactive Queries.
* store name. Note that store name may not be queriable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
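A brief usage sketch of the table() and globalTable() variants described above (topic names are illustrative assumptions):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KTable;

    public class StreamsBuilderSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Materialized in a local KeyValueStore under an internal store name; no changelog
            // topic is created because the source topic itself can be replayed for recovery.
            KTable<String, String> users = builder.table("users-topic");                   // assumption

            // A GlobalKTable is fully replicated to every application instance.
            GlobalKTable<String, String> products = builder.globalTable("products-topic"); // assumption

            System.out.println(builder.build().describe());
        }
    }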
