Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement exact string searching using FM index and LZ index #57

Merged
merged 24 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
backup*/*
benchar/cbenchar/build/*
.vscode
*/__pycache__
*/*/__pycache__

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove it from the PR - it's IDE-dependent, shouldn't be in the commit

prolik123 marked this conversation as resolved.
Show resolved Hide resolved
Binary file not shown.
prolik123 marked this conversation as resolved.
Show resolved Hide resolved
Binary file not shown.
118 changes: 118 additions & 0 deletions string_indexing/fm_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

class FMIndex:
Copy link
Owner

@krzysztof-turowski krzysztof-turowski Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I'd avoid creating classes in favor of procedural (i.e. bunch of functions) approach, please check e.g. suffix_array.py for details. Of course you can build a structure which will be used in methods like contains - see suffix_array.contains


# All strings begin with '#' (reason unclear).
# I suppose that patterns do not start with '#'.

def __init__ (self, SA, BWT, text, n):
self.L = BWT
self.F = '#$' + ''.join(text[SA[i]] for i in range(1, n + 1))
self.n = n
self.SA = SA
self.sampleSize = 8 # const for sampling

#prepare char mapping for F
self.mapperOfChar = { self.F[2] : 0}
self.begginings = [2]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beginnings

last = self.F[2]
lenOfBeginings = 1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove and replace with len(beginnings), it's still $O(1)$

for i in range(3, n+2):
if self.F[i] != last:
last = self.F[i]
self.begginings.append(i)
self.mapperOfChar[last] = lenOfBeginings
lenOfBeginings += 1

self.lenOfAlphabet = len(self.mapperOfChar)

#prepare closest samplings
currentSample = 0
self.closestSample = [0]
for i in range(1, n+2):
if abs(currentSample-i) > abs(currentSample + self.sampleSize-i) and (i + self.sampleSize < self.n):
currentSample += self.sampleSize
self.closestSample.append(currentSample)

#Generate values for occ for given samples O(|A|*n)
self.occInSampleForChar = { self.L[i]: [0] for i in range(1, n+2)}
for c in self.mapperOfChar:
currValue = 0
nextSample = self.sampleSize
for i in range(1, n+2):
if self.L[i] == c:
currValue += 1
if i == nextSample:
self.occInSampleForChar[c].append(currValue)
nextSample = nextSample + self.sampleSize

# should be private
def getRangeOfOccurence(self, p, size):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS. Please check spelling of "occurrences", "beginnings" etc., because you have several variants of both ;)

if size > self.n:
return [-1, -1]

currChar = p[size-1]
if currChar not in self.mapperOfChar:
return [-1, -1]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if p[-1] not in self.mapperOfChar:
  return -1, -1


mapIdx = self.mapperOfChar[currChar]
l = self.begginings[mapIdx]
r = self.n + 1
if mapIdx != self.lenOfAlphabet - 1:
r = self.begginings[mapIdx + 1] - 1

for i in range(size-2, -1, -1):
currChar = p[i]
if currChar not in self.mapperOfChar:
return [-1, -1]
occurencesBefore = self._getOcc(currChar, l - 1)
occurencesAfter = self._getOcc(currChar, r)
if occurencesBefore == occurencesAfter:
return [-1, -1]
mapIdx = self.mapperOfChar[currChar]
l = self.begginings[mapIdx] + occurencesBefore
r = self.begginings[mapIdx] + occurencesAfter - 1
if r < l:
return [-1, -1]
return [l, r]

# O(|p|)
def count(self, p, size):
ran = self.getRangeOfOccurence(p, size)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unpack on return e.g.
low, high = self.getRangeOfOccurence(p, size)

if ran[0] == -1:
return 0
return max(ran[1] - ran[0] + 1, 0)
Copy link
Owner

@krzysztof-turowski krzysztof-turowski Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return max(ran[1] - ran[0] + 1, 0) if ran[0] > -1 else 0
or even better
return max(high - low + 1, 0) if low > -1 else 0 with tuple unpacking above



#Should be private
def _getOcc(self, c, i):
closestSample = self.closestSample[i]
toAdd = 0
if closestSample < i:
for j in range(closestSample + 1, i + 1):
if self.L[j] == c:
toAdd += 1
elif closestSample > i:
for j in range(i+1, closestSample + 1):
if self.L[j] == c:
toAdd -= 1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if closest_sample < i:
  to_add = sum(1 for c_prim in self.L[closest_sample[i] + 1:i + 1] if c_prim == c)
else:
  to_add = sum(1 for c_prim in self.L[i + 1:closest_sample[i] + 1] if c_prim == c)
return self.occInSampleForChar[c][closest_sample // self.sample_size] + to_add


return self.occInSampleForChar[c][(closestSample)//self.sampleSize] + toAdd

#O(|p|)
def query(self, p, l):
    """Tell whether pattern ``p`` occurs in the indexed text.

    ``p`` and ``l`` are forwarded unchanged to ``count``; the result is
    True exactly when ``count`` reports more than zero occurrences.
    """
    occurrences = self.count(p, l)
    return occurrences > 0

# O(|p| + k), where k is the number of occurrences of p in the text
def get_all_occurrance(self, p, l):
    """Collect the suffix-array entries for every match of pattern ``p``.

    ``p`` and ``l`` are passed to ``getRangeOfOccurence``. When the first
    element of the returned pair is -1 the result is an empty list;
    otherwise every index ``i`` of the inclusive range contributes
    ``self.SA[i - 1]``.
    """
    bounds = self.getRangeOfOccurence(p, l)
    if bounds[0] == -1:
        return []
    # Shift the inclusive range down by one up front instead of per lookup.
    return [self.SA[row] for row in range(bounds[0] - 1, bounds[1])]

# O(|p|)
def get_any_occurrance(self, p, l):
    """Return one suffix-array entry for a match of ``p``, or -1 if none.

    Only the first element of the pair returned by ``getRangeOfOccurence``
    is used: -1 is passed through as the "not found" value, any other
    value ``i`` selects ``self.SA[i - 1]``.
    """
    start = self.getRangeOfOccurence(p, l)[0]
    return -1 if start == -1 else self.SA[start - 1]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please replace with a function "contains" like in suffix_tree and suffix_array packages, which returns values sequentially using yield

69 changes: 69 additions & 0 deletions test/test_fm_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import itertools
import os
import unittest

from compression import burrows_wheeler
from string_indexing import suffix_array
from string_indexing import fm_index

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from string_indexing import suffix_array, fm_index

from generator import rand

class TestFMIndex(unittest.TestCase):
run_large = unittest.skipUnless(
os.environ.get('LARGE', False), 'Skip test in small runs')

def get_all_occurences_of_pattern_naive(self, text, n, pattern, l):
Copy link
Owner

@krzysztof-turowski krzysztof-turowski Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can move retrieving all occurrences to `test_exact_string_matching.py` (see implementations of suffix tree and suffix array there)

Is there any other operation that you would like to test aside from get_all_occurrance and get_any_occurrance (i.e. contains after renaming)? If not, then maybe this file is redundant - or if you want to test query, please restrict this file only to doing so.

result = []
for i in range(1, n-l + 2):
occurs = True
for j in range(0, l):
if text[i+j] != pattern[j]:
occurs = False
break
if occurs:
result.append(i)
return result


def check_fm_api_for_pattern(self, FMIndex, all_occurences_of_pattern, pattern, l):
    """Assert that all four index queries agree with the expected matches.

    ``FMIndex`` is the index object under test and
    ``all_occurences_of_pattern`` the reference list of positions.
    Checks, in order: the count, the full (order-insensitive) occurrence
    list, that the single reported occurrence is either a reference
    position or -1 when there are none, and the boolean ``query`` result.
    """
    expected = all_occurences_of_pattern

    # Query the index first, in a fixed order, then run the assertions.
    counted = FMIndex.count(pattern, l)
    reported = FMIndex.get_all_occurrance(pattern, l)
    single = FMIndex.get_any_occurrance(pattern, l)
    found = FMIndex.query(pattern, l)

    self.assertEqual(counted, len(expected))
    self.assertEqual(sorted(reported), sorted(expected))

    is_listed = single in expected
    is_valid_miss = single == -1 and len(expected) == 0
    self.assertTrue(is_listed or is_valid_miss)

    self.assertTrue(found == (len(expected) > 0))


def check_patterns_for_text_naive(self, text, n, patterns):
    """Build an FM index over ``text`` and validate it for each pattern.

    The suffix array comes from ``suffix_array.naive`` and the transform
    from ``burrows_wheeler.transform_from_suffix_array``; both feed the
    ``fm_index.FMIndex`` constructor. For every pattern, the naive
    reference occurrences are computed and compared against the index.
    """
    suffixes = suffix_array.naive(text, n)
    transformed = burrows_wheeler.transform_from_suffix_array(suffixes, text, n)
    index = fm_index.FMIndex(suffixes, transformed, text, n)
    for pattern in patterns:
        length = len(pattern)
        expected = self.get_all_occurences_of_pattern_naive(
            text, n, pattern, length)
        self.check_fm_api_for_pattern(index, expected, pattern, length)


# Hand-written cases consumed by test_fm_api_naive. Each entry is
# [text, patterns]: the text starts with '#', the test passes
# len(text) - 1 as n, and every listed pattern is checked against the text.
api_naive_test_cases = [
['#ababa', ['a', 'a', 'aba', 'aa', 'ba', 'ab', 'bb', 'c', 'abc', 'ababa', 'ababaa']],
['#aaababcaaabba', ['a', 'b', 'c', 'aab', 'aabb', 'aaababcaaabba']],
['#aaabaababaababaababaaababaaabaabaaa', ['a', 'ab', 'aab', 'aaab', 'aaaab', 'aba', 'abaa',
'abaaa', 'aaba', 'aabaa', 'aabaaa', 'aaaba', 'aaabaa']]
]

def test_fm_api_naive(self):
    """Run the FM-index API checks on every hand-written test case."""
    for case in self.api_naive_test_cases:
        text, patterns = case[0], case[1]
        # n excludes the first character of the text.
        self.check_patterns_for_text_naive(text, len(text) - 1, patterns)


@run_large
def test_large_random(self):
    """Cross-check the FM index on a random text over the alphabet {a, b}.

    A text of 10000 random characters (prefixed with '#') is generated
    first, followed by 1000 random patterns of length 100; all patterns
    are then validated through ``check_patterns_for_text_naive``.
    """
    n, q = 10000, 1000
    text = '#' + rand.random_word(n, ['a', 'b'])
    patterns = [rand.random_word(100, ['a', 'b']) for _ in range(q)]
    self.check_patterns_for_text_naive(text, n, patterns)