-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.py
308 lines (235 loc) · 8.49 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"06/07/2013"
"""
- This class is used to create, maintain, and update the index directory used to store the indices for the documents to search against.
- For this version, only document names (paths) can be queried, but not the text within (full-text search will be added later to compare speed).
- The methods provided include:
-> initialize the Engine with a directory: if it exists it is reused as-is, otherwise it is created
-> re-index a directory, incrementally index it, add/remove individual documents, and query the index
"""
"""
important note about the ix.writer():
opening the writer locks the index for writing, so opening another writer while one is already open will raise an exception (whoosh.store.LockError)
=> use whoosh.writing.AsyncWriter if you need to work around it.
- Opening a writer on the index does not prevent readers from opening and viewing the index
"""
import os
import sys
import time
from datetime import datetime
import random
import timeit
import Image
from whoosh.fields import Schema, TEXT, STORED, KEYWORD, DATETIME, ID
from whoosh.index import create_in, open_dir, exists_in
from whoosh.qparser import QueryParser, MultifieldParser
from whoosh.query import Query, Term
from whoosh.analysis import NgramTokenizer, LowercaseFilter
from whoosh.writing import BufferedWriter, AsyncWriter
"""
This class will be used as the medium to interact with the search index directory.
Documents can be added to, removed from, and modified in the directory that stores all the document indices (index_directory).
"""
class Engine:
    """
    Medium to interact with the search index directory: documents can be
    added to, removed from, and modified in the directory that stores the
    document indices (index_directory).
    """
    # N-gram analyzer: indexes every 2- to 15-character substring,
    # lowercased, so queries can match partial filenames.
    ANALYZER = NgramTokenizer(minsize=2, maxsize=15) | LowercaseFilter()
    # Search based on only pathname
    FILENAME_SCHEMA = Schema(path = ID(stored=True),            # unique ID: the file path
                             mod_time = DATETIME(stored=True),  # last-modification time
                             content = TEXT(stored=True, analyzer=ANALYZER)  # path text, n-grammed for substring search
                             )
    # Extensions considered images (see filter_images below).
    IMAGE_EXTENSIONS = ['png', 'jpg', 'jpeg']
"""
initializes the engine with:
- index_directory: where the indeces will be stored
- fresh = False: set to True if engine should wipe out existing index_directory and start fresh
- schema: set to FULLTEXT_SCHEMA if a full text search is required. Note that engine will automatically wipe out existing
index_directory to account for change in schema
"""
def __init__(self, index_directory, fresh = False, schema = FILENAME_SCHEMA):
self.index_directory = index_directory
self.schema = schema
self.ix = None
self.error_log = '/home/labuser/Code/FileIndexer/error_log.txt'
if not os.path.exists(index_directory):
os.mkdir(index_directory)
self.ix = create_in(index_directory, self.schema)
else:
if fresh:
self.ix = create_in(index_directory, self.schema)
else:
self.ix = open_dir(index_directory)
self.buffered_writer = None
"""
delete all currently stored indeces, and index contents of 'directory_path'
"""
def re_index(self, directory_path):
#first clearing the existing index in the "indexDirectory" specified when Engine was initialized
t0 = time.time()
#writer = AsyncWriter(self.ix, writerargs={'procs':4, 'limitmb':128})
#writer = self.ix.writer(procs=4, limitmb=128)
writer = self.ix.writer(limitmb=512)
print "writer is loaded"
directoryExists = self.__add_directory(directory_path, writer)
print "directory exists: ", directoryExists
writer.commit()
print "commited writer"
t1 = time.time()
return (t1 - t0)
def __add_directory(self, directory_path, writer):
    """
    Recursively index every file under directory_path (directories
    themselves are not indexed, only traversed).
    Returns True when directory_path is an existing directory, else False.
    """
    if not os.path.isdir(directory_path):
        return False
    self.__add_path(directory_path, writer, 0)
    return True
# recursive function that indexes an object if it is a file, else it traverses into that directory and continues
def __add_path(self, path, writer, counter):
if os.path.isfile(path):
t0 = time.time()
self.add_document(path, writer)
t1 = time.time()
time_taken = t1 - t0
if random.choice(range(1000)) < 10:
print "time taken to index: ", [str(datetime.now()), path, time_taken]
else:
# This means that the path is to a directory, not a file
print "not adding this path, it's a directory: ", [str(datetime.now()), path]
ls = os.listdir(path)
for i in ls:
self.__add_path(os.path.join(path, i), writer, counter)
"""
indexes the filepath to the index_directory through the provided writer
"""
def add_document(self, filepath, writer):
if not os.path.isfile(filepath):
#print "not a file: ", filepath
pass
else:
mtime = os.path.getmtime(filepath)
try:
writer.add_document(path = unicode(filepath), mod_time = datetime.fromtimestamp(mtime), content = unicode(filepath))
except UnicodeDecodeError:
print "The following gets unicode error: ", filepath
f = open(self.error_log, 'w')
f.write('UnicodeDecodeError: ' + filepath)
f.close()
"""
removes the document with filepath as its ID.
return: number of documents deleted
"""
def remove_document(self, filepath, writer):
removed = writer.delete_by_term("path", unicode(filepath))
return removed
"""
removes documents matching query (as would be entered in search)
return: number deleted
"""
def remove_by_query(self, query, writer):
removed = writer.delete_by_query(query, searcher=None)
return removed
def __my_docs(self, path):
    """
    Return a flat list of all file paths under `path`; used by
    incremental_index to enumerate the file system.

    Paths containing 'DS_Store' are excluded. The exclusion now runs
    before the isfile test: previously an actual .DS_Store file hit the
    isfile branch first and was returned despite the filter.
    """
    if 'DS_Store' in path:
        return []
    if os.path.isfile(path):
        return [path]
    out = []
    for entry in os.listdir(path):
        out.extend(self.__my_docs(os.path.join(path, entry)))
    return out
"""
incrementally indexes the difference of the directory specified by dirname and the index stored at and for the dirname
note: This is not the event based index, but will be used for testing purposes.
"""
def incremental_index(self, dirname):
files_added = 0
files_deleted = 0
ix = open_dir(self.index_directory)
indexed_paths = set()
to_index = set()
with self.ix.searcher() as searcher:
writer = ix.writer()
for fields in searcher.all_stored_fields():
indexed_path = fields['path']
indexed_paths.add(indexed_path)
if not os.path.exists(indexed_path):
# Thise path was deleted since the last index
writer.delete_by_term('path', indexed_path)
files_deleted += 1
else:
# Checking if file was modified since last index:
indexed_time = fields['mod_time']
if datetime.fromtimestamp(os.path.getmtime(indexed_path)) > indexed_time:
# file has been modified, delete and reindex
writer.delete_by_term('path', indexed_path)
files_deleted += 1
to_index.add(indexed_path)
# loop over all files in file system
for path in self.__my_docs(dirname):
if path in to_index or path not in indexed_paths:
self.add_document(path, writer)
files_added += 1
writer.commit()
return [files_added, files_deleted]
"""
takes in a query and returns the hits from the indeces.
return: results as list of strings, time for query
"""
def query(self, query):
t0 = time.time()
qp = QueryParser("content", schema=self.schema)
q = qp.parse(query)
results_as_strings = []
with self.ix.searcher() as searcher:
results = searcher.search(q, limit=None)
for hit in results:
results_as_strings.append(hit['path'])
t1 = time.time()
return [results_as_strings, (t1 - t0)]
"""
A fresh index of everything from directory to the index_directory
"""
def timed_index(self, directory):
def test():
self.re_index(directory)
time = min(timeit.Timer(test).repeat(7, 1))
return time
"""
returns an async writer for the index_directory
"""
def new_buffered_writer(self, period, limit, writer_limitmb):
self.buffered_writer = BufferedWriter(self.ix, period=period, limit=limit, writerargs={'limitmb' : writer_limitmb})
return self.buffered_writer
def async_writer(self):
    """Return a new AsyncWriter for the index; per the module note above,
    this works around the one-open-writer lock (whoosh.store.LockError)."""
    return AsyncWriter(self.ix)
"""
# should this ever be used?
def writer(self):
return
"""
"""
takes in a list of paths, and outputs the paths that are of images
"""
def filter_images(results_as_strings):
image_list = []
for filepath in results_as_strings:
try:
im = Image.open(filepath)
chars = list(filepath)
for i in range(len(chars)):
if chars[i] == ' ':
chars[i] = '%'
image_list.append("".join(chars))
except:
pass
return image_list
def main():
index_directory = '/Users/karthikuppuluri/Desktop/index/'
directory = '/Users/karthikuppuluri/Desktop/RawData/'
search_engine = Engine(index_directory, fresh=True)
time = search_engine.re_index(directory)
print "time taken to index raw data: ", time
# Script entry point: only run the demo when executed directly.
if __name__ == '__main__':
    main()