-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsharedFunctions.py
180 lines (142 loc) · 4.8 KB
/
sharedFunctions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def incrementCount(parent, child, d):
    """
    Increment the count of [parent][child] in dictionary d

    Missing entries are created on first use, so d may start out empty.
    d is modified in place; nothing is returned.
    @param parent: outer key (any hashable value)
    @param child: inner key (any hashable value)
    @param d: Dict: {parent: {child: count}}
    """
    # setdefault returns the existing inner dict, or installs a fresh one
    children = d.setdefault(parent, {})
    children[child] = children.get(child, 0) + 1
def estEmissions(file, k=1):
    """
    Given training file, return emission parameters

    Each non-empty line is read as "<word> <tag>", split on the LAST
    space, so a word may itself contain spaces. Words are lower-cased.
    Lines without any space carry no tag and are skipped.
    @param file: path of the training file (read as UTF-8)
    @param k: pseudo-count reserved for unseen words. Every tag gets an
    extra "#UNK#" entry with emission k / (count(tag) + k), and
    each observed word gets count(tag -> word) / (count(tag) + k)
    @return Dict: {tag: {word: emission}}
    """
    # counts[tag][word] = number of times tag emitted word
    counts = {}
    with open(file, encoding="utf-8") as f:
        for line in f:
            temp = line.strip()
            # ignore empty lines
            if not temp:
                continue
            word, sep, tag = temp.rpartition(" ")
            # no separator means no tag; slicing with rfind's -1 would
            # silently drop the word's last character, so skip instead
            if not sep:
                continue
            per_tag = counts.setdefault(tag, {})
            word = word.lower()
            per_tag[word] = per_tag.get(word, 0) + 1

    # convert counts to emissions
    emissions = {}
    for tag, per_tag in counts.items():
        # count(tag) is the sum of its word counts; k is added for #UNK#
        denom = float(sum(per_tag.values()) + k)
        probs = {word: n / denom for word, n in per_tag.items()}
        probs["#UNK#"] = k / denom
        emissions[tag] = probs
    return emissions
def estTransitions(file):
    """
    Given training file, return transition parameters

    The tag of each non-empty line is the text after its last space.
    Sentences are separated by blank lines; each sentence is wrapped as
    _START -> tag_1 -> ... -> tag_n -> _STOP. A blank line only closes a
    sentence when one is open, so leading or repeated blank lines do not
    create empty _START -> _STOP sentences. A final sentence that is not
    followed by a blank line is still closed with _STOP.
    @param file: path of the training file (read as UTF-8)
    @return Dict: {y_prev: {y_curr: transition}}, where transition is
    count(y_prev -> y_curr) / count(y_prev). Empty dict if the
    file contains no tagged lines.
    """
    start = "_START"
    stop = "_STOP"
    transitions = {}

    def bump(parent, child):
        # add one to count(parent -> child)
        row = transitions.setdefault(parent, {})
        row[child] = row.get(child, 0) + 1

    prev = start
    with open(file, encoding="utf-8") as f:
        for line in f:
            temp = line.strip()
            if not temp:
                # sentence has ended (ignore blank lines between sentences)
                if prev != start:
                    bump(prev, stop)
                    prev = start
                continue
            # part of a sentence
            curr = temp.rpartition(" ")[2]
            bump(prev, curr)
            prev = curr
    # add count(prev, stop) if no blank lines at EOF
    if prev != start:
        bump(prev, stop)

    # convert counts to transitions; every occurrence of a tag has exactly
    # one outgoing transition, so the row total equals count(y_prev)
    for row in transitions.values():
        total = float(sum(row.values()))
        for tag in row:
            row[tag] /= total
    return transitions
def estTransitions2(file):
    """
    Given training file, return second-order transition parameters

    The tag of each non-empty line is the text after its last space.
    Sentences are separated by blank lines; each sentence starts from
    the context (_START, _START) and ends with a transition to _STOP.
    A blank line only closes a sentence when one is open, so leading or
    repeated blank lines are ignored. A final sentence that is not
    followed by a blank line is still closed with _STOP.
    @param file: path of the training file (read as UTF-8)
    @return Dict: {(y_jm2,y_jm1): {y_j: transition}}, where transition
    is count((y_jm2,y_jm1) -> y_j) / count((y_jm2,y_jm1)).
    Empty dict if the file contains no tagged lines.
    """
    start = "_START"
    stop = "_STOP"
    transitions = {}

    def bump(context, tag):
        # add one to count(context -> tag)
        following = transitions.setdefault(context, {})
        following[tag] = following.get(tag, 0) + 1

    y_jm2 = y_jm1 = start
    with open(file, encoding="utf-8") as f:
        for line in f:
            temp = line.strip()
            if not temp:
                # sentence has ended (ignore blank lines between sentences)
                if y_jm1 != start:
                    bump((y_jm2, y_jm1), stop)
                    y_jm2 = y_jm1 = start
                continue
            # part of a sentence
            y_j = temp.rpartition(" ")[2]
            bump((y_jm2, y_jm1), y_j)
            # slide the two-tag window forward
            y_jm2, y_jm1 = y_jm1, y_j
    # close the last sentence if there was no blank line at EOF
    if y_jm1 != start:
        bump((y_jm2, y_jm1), stop)

    # convert counts to transitions; every occurrence of a context has
    # exactly one outgoing transition, so the row total is count(context)
    for following in transitions.values():
        total = float(sum(following.values()))
        for tag in following:
            following[tag] /= total
    return transitions
def getDictionary(file):
    """
    Given training file, return set of all words

    Each non-empty line is read as "<word> <tag>", split on the LAST
    space, so a word may itself contain spaces. Words are lower-cased.
    A line with no space is taken to be a word on its own.
    @param file: path of the training file (read as UTF-8)
    @return Set: set of all words in file
    """
    out = set()
    with open(file, encoding="utf-8") as f:
        for line in f:
            temp = line.strip()
            # ignore empty lines
            if not temp:
                continue
            word, sep, _tag = temp.rpartition(" ")
            # without a separator rpartition puts the whole line in the
            # last slot; slicing with rfind's -1 would drop its last char
            if not sep:
                word = temp
            out.add(word.lower())
    return out