Added CloseableIterator interface (#115)
* Added CloseableIterator interface
* Use parseLong(CharSequence s, int beginIndex, int endIndex, int radix) everywhere

Signed-off-by: Laurent Klock <[email protected]>
klockla authored Nov 27, 2024
1 parent dd80796 commit a20e241
Showing 5 changed files with 46 additions and 13 deletions.
AbstractFrontierService.java

@@ -938,7 +938,7 @@ public void listURLs(
                     continue;
                 }

-                Iterator<URLItem> urliter = urlIterator(e);
+                CloseableIterator<URLItem> urliter = urlIterator(e);

                 while (urliter.hasNext()) {
                     totalCount++;
@@ -951,17 +951,24 @@ public void listURLs(
                         break;
                     }
                 }
+
+                try {
+                    urliter.close();
+                } catch (IOException e1) {
+                    LOG.warn("Error closing URLIterator", e1);
+                }
             }
         }

         responseObserver.onCompleted();
     }

-    protected Iterator<URLItem> urlIterator(Entry<QueueWithinCrawl, QueueInterface> qentry) {
+    protected CloseableIterator<URLItem> urlIterator(
+            Entry<QueueWithinCrawl, QueueInterface> qentry) {
         return urlIterator(qentry, 0L, Long.MAX_VALUE);
     }

-    protected abstract Iterator<URLItem> urlIterator(
+    protected abstract CloseableIterator<URLItem> urlIterator(
             Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max);

     /**
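Because CloseableIterator extends Closeable, the close-and-log block added above could also be written with try-with-resources; a minimal sketch of that alternative (loop body abridged, not the committed code):

    try (CloseableIterator<URLItem> urliter = urlIterator(e)) {
        while (urliter.hasNext()) {
            totalCount++;
            // ... filter and stream matching URLItems to responseObserver, as above
        }
    } catch (IOException e1) {
        // close() runs automatically, but its checked IOException must still be handled
        LOG.warn("Error closing URLIterator", e1);
    }

The committed version keeps an explicit close() instead, which leaves the existing loop structure untouched and keeps the diff small.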
CloseableIterator.java

@@ -0,0 +1,15 @@
+// SPDX-FileCopyrightText: 2020 Crawler-commons
+// SPDX-License-Identifier: Apache-2.0
+
+package crawlercommons.urlfrontier.service;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+/**
+ * Adds close() to Iterator. Needed when the Iterator holds resources that must be released
+ * (e.g. the RocksIterator in the RocksDB implementation).
+ *
+ * @param <T> the type of elements returned by this iterator
+ */
+public interface CloseableIterator<T> extends Closeable, Iterator<T> {}
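For illustration, a minimal implementation over an in-memory collection; the class and field names here are hypothetical, not part of the commit:

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical example: adapts a plain Iterator; there is nothing to
    // release, so close() is a no-op, just like MemoryURLItemIterator below.
    class ListBackedIterator<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;

        ListBackedIterator(List<T> source) {
            this.delegate = source.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() throws IOException {
            // no underlying resource for an in-memory source
        }
    }

Extending both Closeable and Iterator means implementations work with try-with-resources as well as with the explicit close() pattern used in AbstractFrontierService.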
MemoryFrontierService.java

@@ -11,6 +11,7 @@
 import crawlercommons.urlfrontier.Urlfrontier.URLItem;
 import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
 import crawlercommons.urlfrontier.service.AbstractFrontierService;
+import crawlercommons.urlfrontier.service.CloseableIterator;
 import crawlercommons.urlfrontier.service.QueueInterface;
 import crawlercommons.urlfrontier.service.QueueWithinCrawl;
 import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
@@ -224,12 +225,12 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
         }
     }

-    public Iterator<URLItem> urlIterator(
+    public CloseableIterator<URLItem> urlIterator(
             Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long maxURLs) {
         return new MemoryURLItemIterator(qentry, start, maxURLs);
     }

-    class MemoryURLItemIterator implements Iterator<URLItem> {
+    class MemoryURLItemIterator implements CloseableIterator<URLItem> {

         private final org.slf4j.Logger LOG = LoggerFactory.getLogger(MemoryURLItemIterator.class);

@@ -298,5 +299,10 @@ public URLItem next() {
             }
             return null; // shouldn't happen
         }
+
+        @Override
+        public void close() {
+            // No need to close anything here
+        }
     }
 }
RocksDBService.java

@@ -12,6 +12,7 @@
 import crawlercommons.urlfrontier.Urlfrontier.URLItem;
 import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
 import crawlercommons.urlfrontier.service.AbstractFrontierService;
+import crawlercommons.urlfrontier.service.CloseableIterator;
 import crawlercommons.urlfrontier.service.QueueInterface;
 import crawlercommons.urlfrontier.service.QueueWithinCrawl;
 import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
@@ -26,7 +27,6 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -300,7 +300,7 @@ protected int sendURLsForQueue(
         }

         // too early for it?
-        long scheduled = Long.parseLong(currentKey.substring(pos2 + 1, pos3));
+        long scheduled = Long.parseLong(currentKey, pos2 + 1, pos3, 10);
         if (scheduled > now) {
             // they are sorted by date no need to go further
             return alreadySent;
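This hunk, and the two matching ones in getURLStatus and RocksDBURLItemIterator.next() below, switch to the range overload Long.parseLong(CharSequence s, int beginIndex, int endIndex, int radix), available since Java 9: it parses the digits in place instead of allocating a temporary String via substring(). A small demonstration with a hypothetical key layout (the real keys may differ):

    // Hypothetical key: crawlID _ queueID _ epoch-millis _ URL
    String currentKey = "crawl_queue_1732700000000_http://example.com/";
    int pos = currentKey.indexOf('_');
    int pos2 = currentKey.indexOf('_', pos + 1);
    int pos3 = currentKey.indexOf('_', pos2 + 1);

    // Before: substring() allocates an intermediate String just to parse it.
    long before = Long.parseLong(currentKey.substring(pos2 + 1, pos3));

    // After (Java 9+): parse the [beginIndex, endIndex) range in place, radix 10.
    long after = Long.parseLong(currentKey, pos2 + 1, pos3, 10);

    assert before == after; // both yield 1732700000000L

On a hot path such as sendURLsForQueue, avoiding one short-lived allocation per key adds up.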
@@ -823,7 +823,7 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
             final int pos2 = currentKey.indexOf('_', pos + 1);
             final int pos3 = currentKey.indexOf('_', pos2 + 1);

-            fromEpoch = Long.parseLong(currentKey.substring(pos2 + 1, pos3));
+            fromEpoch = Long.parseLong(currentKey, pos2 + 1, pos3, 10);

             try {
                 info =
@@ -856,12 +856,12 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
         }
     }

-    public Iterator<URLItem> urlIterator(
+    public CloseableIterator<URLItem> urlIterator(
             Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long maxURLs) {
         return new RocksDBURLItemIterator(qentry, start, maxURLs);
     }

-    class RocksDBURLItemIterator implements Iterator<URLItem> {
+    class RocksDBURLItemIterator implements CloseableIterator<URLItem> {

         private final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBURLItemIterator.class);

@@ -960,7 +960,7 @@ public URLItem next() {
             final int pos2 = schedulingKey.indexOf('_', pos1 + 1);
             final int pos3 = schedulingKey.indexOf('_', pos2 + 1);

-            fromEpoch = Long.parseLong(schedulingKey.substring(pos2 + 1, pos3));
+            fromEpoch = Long.parseLong(schedulingKey, pos2 + 1, pos3, 10);

             try {
                 info = URLInfo.parseFrom(scheduled);
@@ -998,5 +998,10 @@ public URLItem next() {

             return null; // Shouldn't happen
         }
+
+        @Override
+        public void close() {
+            this.rocksIterator.close();
+        }
     }
 }
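Unlike the in-memory iterator, close() here matters: a RocksDB RocksIterator wraps native, off-heap resources that the JVM garbage collector does not reclaim promptly, so iterators left open can pin memory and interfere with a clean shutdown of the database. The listURLs change in AbstractFrontierService guarantees close() runs; a sketch of the same guarantee at a call site that stops early (the stop condition is hypothetical):

    CloseableIterator<URLItem> it = urlIterator(qentry, 0L, Long.MAX_VALUE);
    try {
        while (it.hasNext()) {
            URLItem item = it.next();
            if (maxReached(item)) { // hypothetical stop condition
                break; // early exit: the finally block still closes the iterator
            }
        }
    } finally {
        try {
            it.close(); // releases the RocksIterator's native handle
        } catch (IOException e) {
            LOG.warn("Error closing URLIterator", e);
        }
    }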
ShardedRocksDBService.java

@@ -8,14 +8,14 @@
 import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
 import crawlercommons.urlfrontier.Urlfrontier.URLItem;
 import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
+import crawlercommons.urlfrontier.service.CloseableIterator;
 import crawlercommons.urlfrontier.service.QueueInterface;
 import crawlercommons.urlfrontier.service.QueueWithinCrawl;
 import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
 import crawlercommons.urlfrontier.service.cluster.DistributedFrontierService;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -101,7 +101,7 @@ public void listURLs(ListUrlParams request, StreamObserver<URLItem> responseObserver) {

     @Override
     // TODO Implementation of urlIterator for ShardedRocksDB
-    protected Iterator<URLItem> urlIterator(
+    protected CloseableIterator<URLItem> urlIterator(
             Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max) {
         throw new UnsupportedOperationException(
                 "Feature not implemented for ShardedRocksDB backend");
