Skip to content

Commit

Permalink
Add URL search filter on CountURLs and ListURLs with optional case se…
Browse files Browse the repository at this point in the history
…nsitivity

Signed-off-by: Laurent Klock <[email protected]>
  • Loading branch information
klockla committed Nov 20, 2024
1 parent fcf741f commit 4b9da25
Show file tree
Hide file tree
Showing 8 changed files with 1,420 additions and 271 deletions.
342 changes: 179 additions & 163 deletions API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java

Large diffs are not rendered by default.

1,099 changes: 1,006 additions & 93 deletions API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions API/urlfrontier.proto
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,21 @@ message ListUrlParams {
string crawlID = 4;
// only for the current local instance
bool local = 5;
// Search filter on url (can be empty, default is empty)
optional string filter = 6;
// Case sensitivity for search filter (default is false)
optional bool caseSensitive = 7;
}

// Parameters of a count-URLs request: which queue / crawl to count, an optional
// URL search filter with its case sensitivity, and whether to restrict the count
// to the local instance.
message CountUrlParams {
/** ID for the queue **/
string key = 1;
// crawl ID
string crawlID = 2;
// only for the current local instance
// NOTE(review): as shown here, field number 3 and the name `local` are each
// declared twice (here and further down). Presumably this declaration is the
// pre-change line of the diff; if so, `local` moves from 3 to 5 and number 3 is
// reused for a string, which changes the wire format for clients built against
// the previous definition - confirm this is intended.
bool local = 3;
// Search filter on url (can be empty, default is empty)
optional string filter = 3;
// Case sensitivity for search filter (default is false)
optional bool caseSensitive = 4;
// only for the current local instance (default is false)
optional bool local = 5;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public class CountURLs implements Runnable {
"restricts the scope to this frontier instance instead of aggregating over the cluster")
private Boolean local;

/**
 * String filter applied to URLs, set with {@code -f} / {@code --filter}. Defaults to the empty
 * string and is copied into the request through {@code builder.setFilter(filter)}.
 */
@Option(
names = {"-f", "--filter"},
defaultValue = "",
paramLabel = "STRING",
description = "String filter applied to URLs")
private String filter;

/**
 * Whether the search filter is case sensitive, set with {@code -s} / {@code --case-sensitive}.
 * Defaults to {@code false} and is copied into the request through {@code
 * builder.setCaseSensitive(caseSensitive)}.
 */
@Option(
names = {"-s", "--case-sensitive"},
defaultValue = "false",
paramLabel = "BOOLEAN",
description = "Search filter is case sensitive")
private Boolean caseSensitive;

@Override
public void run() {
ManagedChannel channel =
Expand All @@ -55,6 +69,11 @@ public void run() {
builder.setCrawlID(crawl);
builder.setLocal(local);

builder.setFilter(filter);
builder.setCaseSensitive(caseSensitive);

builder.setFilter(filter);

Long s = blockingFrontier.countURLs(builder.build());
System.out.println(s.getValue() + " URLs in frontier");

Expand Down
8 changes: 7 additions & 1 deletion service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<logback.version>1.5.8</logback.version>
<mockito.version>5.13.0</mockito.version>
<commons.io.version>2.16.1</commons.io.version>
<commons.lang.version>3.17.0</commons.lang.version>
</properties>

<build>
Expand Down Expand Up @@ -118,7 +119,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.LoggerFactory;

public abstract class AbstractFrontierService
Expand Down Expand Up @@ -905,6 +906,9 @@ public void listURLs(
long start = request.getStart();
String key = request.getKey();

String filter = request.getFilter();
boolean caseSensitive = request.getCaseSensitive();

final String normalisedCrawlID = CrawlID.normaliseCrawlID(request.getCrawlID());

// 100 by default
Expand All @@ -919,7 +923,7 @@ public void listURLs(
normalisedCrawlID,
key);

long totalCount = -1;
long totalCount = 0;
long sentCount = 0;

synchronized (getQueues()) {
Expand All @@ -942,14 +946,23 @@ public void listURLs(
CloseableIterator<URLItem> urliter = urlIterator(e);

while (urliter.hasNext()) {
totalCount++;
if (totalCount < start) {
urliter.next();
} else if (sentCount < maxURLs) {
responseObserver.onNext(urliter.next());
sentCount++;
} else {
break;
URLItem cur = urliter.next();

if (StringUtils.isEmpty(filter)
|| (caseSensitive && cur.getKnown().getInfo().getUrl().contains(filter))
|| (!caseSensitive
&& StringUtils.containsIgnoreCase(
cur.getKnown().getInfo().getUrl(), filter))) {

if (totalCount < start) {
totalCount++;
}
if (sentCount < maxURLs) {
sentCount++;
responseObserver.onNext(cur);
} else {
break;
}
}
}

Expand Down Expand Up @@ -1001,10 +1014,17 @@ public void countURLs(
StreamObserver<crawlercommons.urlfrontier.Urlfrontier.Long> responseObserver) {

String key = request.getKey();
String filter = request.getFilter();
boolean caseSensitive = request.getCaseSensitive();

final String normalisedCrawlID = CrawlID.normaliseCrawlID(request.getCrawlID());

LOG.info("Received request to count URLs [crawlId {}, key {}]", normalisedCrawlID, key);
LOG.info(
"Received request to count URLs [crawlId={}, key={}, filter={}, caseSensitive={}]",
normalisedCrawlID,
key,
filter,
caseSensitive);

long totalCount = 0;

Expand All @@ -1028,8 +1048,15 @@ public void countURLs(
CloseableIterator<URLItem> urliter = urlIterator(e);

while (urliter.hasNext()) {
urliter.next();
totalCount++;
URLItem cur = urliter.next();

if (StringUtils.isBlank(filter)
|| (caseSensitive && cur.getKnown().getInfo().getUrl().contains(filter))
|| (!caseSensitive
&& StringUtils.containsIgnoreCase(
cur.getKnown().getInfo().getUrl(), filter))) {
totalCount++;
}
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,96 @@ void testMemoryIteratorSingleQueue() {
assertEquals(3, nbUrls);
}

/**
 * Lists the URLs of crawl {@code crawl_id} with the filter {@code COMPLETED} applied without case
 * sensitivity and expects exactly one URL to be streamed back.
 */
@Test
@Order(9)
void testListAllURLsCaseInsensitive() {

    ListUrlParams params =
            ListUrlParams.newBuilder()
                    .setCrawlID("crawl_id")
                    .setStart(0)
                    .setSize(100)
                    .setFilter("COMPLETED")
                    .setCaseSensitive(false)
                    .build();

    final AtomicInteger count = new AtomicInteger(0);
    final AtomicInteger errors = new AtomicInteger(0);

    StreamObserver<URLItem> statusObserver =
            new StreamObserver<>() {

                @Override
                public void onNext(URLItem value) {
                    // called once per URL returned by the listing
                    logURLItem(value);
                    count.incrementAndGet();
                }

                @Override
                public void onError(Throwable t) {
                    // recorded so that a failed call cannot pass as a valid result
                    t.printStackTrace();
                    errors.incrementAndGet();
                }

                @Override
                public void onCompleted() {
                    LOG.info("completed testListAllURLsCaseInsensitive");
                }
            };

    // the assertions below rely on the observer being invoked before listURLs returns
    memoryFrontierService.listURLs(params, statusObserver);
    assertEquals(0, errors.get());
    assertEquals(1, count.get());
}

/**
 * Lists the URLs of crawl {@code crawl_id} with the filter {@code COMPLETED} applied with case
 * sensitivity and expects no URL to be streamed back.
 */
@Test
@Order(10)
void testListAllURLsCaseSensitive() {

    ListUrlParams params =
            ListUrlParams.newBuilder()
                    .setCrawlID("crawl_id")
                    .setStart(0)
                    .setSize(100)
                    .setFilter("COMPLETED")
                    .setCaseSensitive(true)
                    .build();

    final AtomicInteger count = new AtomicInteger(0);
    final AtomicInteger errors = new AtomicInteger(0);

    StreamObserver<URLItem> statusObserver =
            new StreamObserver<>() {

                @Override
                public void onNext(URLItem value) {
                    // called once per URL returned by the listing
                    logURLItem(value);
                    count.incrementAndGet();
                }

                @Override
                public void onError(Throwable t) {
                    // recorded explicitly: this test expects zero results, so a swallowed
                    // error would otherwise make it pass vacuously
                    t.printStackTrace();
                    errors.incrementAndGet();
                }

                @Override
                public void onCompleted() {
                    LOG.info("completed testListAllURLsCaseSensitive");
                }
            };

    // the assertions below rely on the observer being invoked before listURLs returns
    memoryFrontierService.listURLs(params, statusObserver);
    assertEquals(0, errors.get());
    assertEquals(0, count.get());
}

@Test
@Order(99)
void testNoRescheduleCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,76 @@ public void onCompleted() {
rocksDBService.countURLs(builder.build(), responseObserver);
}

/**
 * Counts the URLs of queue {@code queue_mysite} in crawl {@code crawl_id} with the filter {@code
 * COMPLETED} applied with case sensitivity and expects a count of 0.
 */
@Test
@Order(10)
void testCountURLsCaseSensitive() {

    Urlfrontier.CountUrlParams.Builder builder = Urlfrontier.CountUrlParams.newBuilder();

    builder.setKey("queue_mysite");
    builder.setCrawlID("crawl_id");
    builder.setFilter("COMPLETED");
    builder.setCaseSensitive(true);

    StreamObserver<Urlfrontier.Long> responseObserver =
            new StreamObserver<>() {

                @Override
                public void onNext(Urlfrontier.Long value) {
                    // value carries the number of URLs matching the filter
                    assertEquals(0, value.getValue());
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                    fail();
                }

                @Override
                public void onCompleted() {
                    // log message now names this test instead of a copy-pasted one
                    LOG.info("completed testCountURLsCaseSensitive");
                }
            };

    rocksDBService.countURLs(builder.build(), responseObserver);
}

/**
 * Counts the URLs of queue {@code queue_mysite} in crawl {@code crawl_id} with the filter {@code
 * COMPLETED} applied without case sensitivity and expects a count of 1.
 */
@Test
@Order(9)
void testCountURsLCaseInsensitive() {

    Urlfrontier.CountUrlParams.Builder builder = Urlfrontier.CountUrlParams.newBuilder();

    builder.setKey("queue_mysite");
    builder.setCrawlID("crawl_id");
    builder.setFilter("COMPLETED");
    builder.setCaseSensitive(false);

    StreamObserver<Urlfrontier.Long> responseObserver =
            new StreamObserver<>() {

                @Override
                public void onNext(Urlfrontier.Long value) {
                    // value carries the number of URLs matching the filter
                    assertEquals(1, value.getValue());
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                    fail();
                }

                @Override
                public void onCompleted() {
                    // log message now refers to this test instead of a copy-pasted one
                    LOG.info("completed testCountURLsCaseInsensitive");
                }
            };

    rocksDBService.countURLs(builder.build(), responseObserver);
}

@Test
@Order(99)
void testNoRescheduleCompleted() {
Expand Down

0 comments on commit 4b9da25

Please sign in to comment.