forked from bojone/SimCSE
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
115 lines (98 loc) · 3.06 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#! -*- coding: utf-8 -*-
# 数据读取函数
from tqdm import tqdm
import numpy as np
import scipy.stats
from bert4keras.backend import keras, K
from bert4keras.models import build_transformer_model
from bert4keras.tokenizers import Tokenizer
from bert4keras.snippets import open
from bert4keras.snippets import sequence_padding
from keras.models import Model
def load_data(filename):
    """Load labelled text pairs from a tab-separated file.

    Each usable line has the form ``text1<TAB>text2<TAB>label``. Lines are
    stripped of surrounding whitespace before splitting, and any line that
    does not yield exactly three tab-separated fields is skipped.

    Args:
        filename: Path of the file to read (opened as UTF-8).

    Returns:
        A list of ``(text1, text2, label)`` tuples, with ``label``
        converted to ``float``.
    """
    samples = []
    with open(filename, encoding='utf-8') as reader:
        for raw_line in reader:
            fields = raw_line.strip().split('\t')
            if len(fields) != 3:
                # Not a (text1, text2, label) record; ignore it.
                continue
            text_a, text_b, score = fields
            samples.append((text_a, text_b, float(score)))
    return samples
def get_tokenizer(dict_path, pre_tokenize=None):
    """Build the tokenizer.

    Args:
        dict_path: Vocabulary path handed to ``Tokenizer``.
        pre_tokenize: Optional value forwarded unchanged as the
            ``pre_tokenize`` argument of ``Tokenizer``.

    Returns:
        A ``Tokenizer`` created with ``do_lower_case=True``.
    """
    tokenizer_options = {
        'do_lower_case': True,
        'pre_tokenize': pre_tokenize,
    }
    return Tokenizer(dict_path, **tokenizer_options)
def get_encoder(
    config_path,
    checkpoint_path,
    model='bert',
    pooling='first-last-avg',
    dropout_rate=0.1
):
    """Build the sentence encoder.

    A transformer is created with ``build_transformer_model`` and wrapped in
    a Keras ``Model`` whose output depends on ``pooling``:

    * ``'first-last-avg'``: average of the global-average-pooled outputs of
      the first and the last ``Transformer-%d-FeedForward-Norm`` layers.
    * ``'last-avg'``: global average pooling of the last such layer.
    * ``'cls'``: index 0 along the second axis of the last such layer.
    * ``'pooler'``: the transformer is built with ``with_pool='linear'`` and
      its own output is used directly.

    Args:
        config_path: Forwarded to ``build_transformer_model``.
        checkpoint_path: Forwarded to ``build_transformer_model``.
        model: Forwarded as the ``model`` argument.
        pooling: One of the four pooling modes listed above.
        dropout_rate: Forwarded as the ``dropout_rate`` argument.

    Returns:
        A Keras ``Model`` mapping the transformer inputs to the pooled output.

    Raises:
        AssertionError: If ``pooling`` is not a supported mode.
        ValueError: If a layer-based pooling mode is requested but the model
            has no ``Transformer-%d-FeedForward-Norm`` layers.
    """
    assert pooling in ['first-last-avg', 'last-avg', 'cls', 'pooler'], (
        'unsupported pooling: %r' % (pooling,)
    )

    # Only the 'pooler' mode needs the extra pooling head on the transformer.
    build_kwargs = {'model': model, 'dropout_rate': dropout_rate}
    if pooling == 'pooler':
        build_kwargs['with_pool'] = 'linear'
    bert = build_transformer_model(config_path, checkpoint_path, **build_kwargs)

    # Collect the output of every consecutively numbered feed-forward norm
    # layer. Keras' get_layer raises ValueError when the name is unknown,
    # which marks the end of the stack; anything else is a real error and
    # must not be swallowed.
    outputs, count = [], 0
    while True:
        try:
            layer = bert.get_layer('Transformer-%d-FeedForward-Norm' % count)
        except ValueError:
            break
        outputs.append(layer.output)
        count += 1

    if pooling != 'pooler' and not outputs:
        raise ValueError(
            "pooling=%r needs 'Transformer-%%d-FeedForward-Norm' layers, "
            'but the model has none' % (pooling,)
        )

    if pooling == 'first-last-avg':
        pooled = [
            keras.layers.GlobalAveragePooling1D()(outputs[0]),
            keras.layers.GlobalAveragePooling1D()(outputs[-1])
        ]
        output = keras.layers.Average()(pooled)
    elif pooling == 'last-avg':
        output = keras.layers.GlobalAveragePooling1D()(outputs[-1])
    elif pooling == 'cls':
        output = keras.layers.Lambda(lambda x: x[:, 0])(outputs[-1])
    else:
        # pooling == 'pooler': use the model's own output.
        output = bert.output

    # The final encoder.
    encoder = Model(bert.inputs, output)
    return encoder
def convert_to_ids(data, tokenizer, maxlen=64):
    """Convert text data into token-id form.

    Args:
        data: Iterable of records whose items 0 and 1 are the two texts and
            whose item 2 is the label.
        tokenizer: Object whose ``encode(text, maxlen=...)`` result holds the
            token ids at index 0.
        maxlen: Forwarded to ``tokenizer.encode``.

    Returns:
        ``(a_token_ids, b_token_ids, labels)``: the two id collections after
        ``sequence_padding`` and the labels as a plain list.
    """
    def encode(text):
        # Keep only element 0 of the encode result (the token ids).
        return tokenizer.encode(text, maxlen=maxlen)[0]

    first_ids, second_ids, targets = [], [], []
    for sample in tqdm(data):
        first_ids.append(encode(sample[0]))
        second_ids.append(encode(sample[1]))
        targets.append(sample[2])

    return sequence_padding(first_ids), sequence_padding(second_ids), targets
def l2_normalize(vecs):
    """Normalize vectors to unit L2 length along axis 1.

    Args:
        vecs: NumPy array with at least two dimensions.

    Returns:
        ``vecs`` divided by its L2 norm taken over axis 1. Norms are
        floored at ``1e-8`` so all-zero vectors do not divide by zero.
    """
    squared = vecs ** 2
    lengths = squared.sum(axis=1, keepdims=True) ** 0.5
    # Lower-bound the norms to avoid division by zero.
    safe_lengths = np.maximum(lengths, 1e-8)
    return vecs / safe_lengths
def compute_corrcoef(x, y):
    """Spearman rank correlation coefficient between ``x`` and ``y``.

    Args:
        x: First set of observations, passed to ``scipy.stats.spearmanr``.
        y: Second set of observations, passed to ``scipy.stats.spearmanr``.

    Returns:
        The correlation value computed by ``scipy.stats.spearmanr``; the
        accompanying p-value is discarded.
    """
    rho, _pvalue = scipy.stats.spearmanr(x, y)
    return rho