fastNLP_module.py
from fastNLP.embeddings.embedding import TokenEmbedding
from fastNLP.core import Vocabulary
from fastNLP.io.file_utils import PRETRAIN_STATIC_FILES, _get_embedding_url, cached_path
import os
import warnings
from collections import defaultdict
from copy import deepcopy
import numpy as np
import torch
import torch.nn as nn
from fastNLP.core import logger
from fastNLP.modules.utils import _get_file_name_base_on_postfix
from utils import MyDropout
from fastNLP.embeddings.contextual_embedding import ContextualEmbedding
from fastNLP.embeddings.bert_embedding import _WordBertModel
from fastNLP.io.file_utils import PRETRAINED_BERT_MODEL_DIR
class StaticEmbedding(TokenEmbedding):
"""
StaticEmbedding component. Given the name or path of a pre-trained embedding, it extracts the corresponding vectors
for the words in vocab (only words that appear in vocab are extracted; a word that is not found gets a randomly
initialized vector, unless it is marked as no_create_entry, in which case no separate vector is created and the word
is mapped to the unk index instead).
The pre-trained vectors that can currently be downloaded automatically are the following (to be completed):
Example::
>>> from fastNLP import Vocabulary
>>> from fastNLP.embeddings import StaticEmbedding
>>> vocab = Vocabulary().add_word_lst("The weather is good .".split())
>>> embed = StaticEmbedding(vocab, model_dir_or_name='en-glove-50d')
>>> vocab = Vocabulary().add_word_lst(["The", 'the', "THE"])
>>> embed = StaticEmbedding(vocab, model_dir_or_name="en-glove-50d", lower=True)
>>> # "The", "the" and "THE" share one vector, and "the" is used to look up their initial representation in the pre-trained vocabulary.
>>> vocab = Vocabulary().add_word_lst(["The", "the", "THE"])
>>> embed = StaticEmbedding(vocab, model_dir_or_name=None, embedding_dim=5, lower=True)
>>> words = torch.LongTensor([[vocab.to_index(word) for word in ["The", "the", "THE"]]])
>>> embed(words)
>>> tensor([[[ 0.5773, 0.7251, -0.3104, 0.0777, 0.4849],
[ 0.5773, 0.7251, -0.3104, 0.0777, 0.4849],
[ 0.5773, 0.7251, -0.3104, 0.0777, 0.4849]]],
grad_fn=<EmbeddingBackward>) # every variant of the word yields the same output.
"""
def __init__(self, vocab: Vocabulary, model_dir_or_name: str = 'en', embedding_dim=-1, requires_grad: bool = True,
init_method=None, lower=False, dropout=0, word_dropout=0, normalize=False, min_freq=1, **kwargs):
"""
:param vocab: Vocabulary. If None, all embeddings in the file are read.
:param model_dir_or_name: a pre-trained static embedding can be specified in two ways: either pass an embedding
folder (the folder should contain exactly one file with a .txt suffix) or a file path, or pass the name of an
embedding, in which case the cache is checked first and the model is downloaded automatically if it is not there.
If None, an embedding of dimension embedding_dim is randomly initialized.
:param int embedding_dim: dimension of the randomly initialized embedding. When this value is greater than 0,
model_dir_or_name is ignored.
:param bool requires_grad: whether gradients are required. Defaults to True.
:param callable init_method: how to initialize words that are not found. Any method from torch.nn.init.* can be
used; the callable should take a tensor and modify its values in place.
:param bool lower: whether to lowercase the words in vocab before matching them against the pre-trained
vocabulary. If your vocabulary contains uppercase words, or uppercase words should get their own vectors, set
lower to False.
:param float dropout: probability of applying dropout to the embedding representation. 0.1 means 10% of the
values are randomly set to 0.
:param float word_dropout: probability of replacing a word with unk. This both trains the unk vector and acts as
a form of regularization.
:param bool normalize: whether to normalize the vectors so that each vector has norm 1.
:param int min_freq: words whose frequency in the Vocabulary is below this value are mapped to unk.
:param dict kwargs: only_train_min_freq, apply the min_freq filter only to words that occur in the training data;
only_norm_found_vector, whether to normalize only the words that were found in the pre-trained embedding.
"""
super(StaticEmbedding, self).__init__(vocab, word_dropout=word_dropout, dropout=dropout)
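# The rest of this constructor builds a words_to_words buffer that maps every index of the original vocab to a row of
# the stored embedding matrix (an identity mapping when neither lower nor min_freq shrinks the vocabulary, a
# many-to-one mapping otherwise); forward() applies this remapping before the nn.Embedding lookup.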
if embedding_dim > 0:
model_dir_or_name = None
# resolve the model/cache path
if model_dir_or_name is None:
assert embedding_dim >= 1, "The dimension of embedding should be at least 1."
embedding_dim = int(embedding_dim)
model_path = None
elif model_dir_or_name.lower() in PRETRAIN_STATIC_FILES:
model_url = _get_embedding_url('static', model_dir_or_name.lower())
model_path = cached_path(model_url, name='embedding')
# check whether the given path exists
elif os.path.isfile(os.path.abspath(os.path.expanduser(model_dir_or_name))):
model_path = os.path.abspath(os.path.expanduser(model_dir_or_name))
elif os.path.isdir(os.path.abspath(os.path.expanduser(model_dir_or_name))):
model_path = _get_file_name_base_on_postfix(os.path.abspath(os.path.expanduser(model_dir_or_name)), '.txt')
else:
raise ValueError(f"Cannot recognize {model_dir_or_name}.")
# shrink the vocab according to min_freq
truncate_vocab = (vocab.min_freq is None and min_freq > 1) or (vocab.min_freq and vocab.min_freq < min_freq)
if truncate_vocab:
truncated_vocab = deepcopy(vocab)
truncated_vocab.min_freq = min_freq
truncated_vocab.word2idx = None
if lower: # when lower is set, the frequencies of the cased and lowercased forms have to be counted together
lowered_word_count = defaultdict(int)
for word, count in truncated_vocab.word_count.items():
lowered_word_count[word.lower()] += count
for word in truncated_vocab.word_count.keys():
word_count = truncated_vocab.word_count[word]
if lowered_word_count[word.lower()] >= min_freq and word_count < min_freq:
truncated_vocab.add_word_lst([word] * (min_freq - word_count),
no_create_entry=truncated_vocab._is_word_no_create_entry(word))
# apply the min_freq filter only to words that occur in the training data
if kwargs.get('only_train_min_freq', False) and model_dir_or_name is not None:
for word in truncated_vocab.word_count.keys():
if truncated_vocab._is_word_no_create_entry(word) and truncated_vocab.word_count[word] < min_freq:
truncated_vocab.add_word_lst([word] * (min_freq - truncated_vocab.word_count[word]),
no_create_entry=True)
truncated_vocab.build_vocab()
truncated_words_to_words = torch.arange(len(vocab)).long()
for word, index in vocab:
truncated_words_to_words[index] = truncated_vocab.to_index(word)
logger.info(
f"{len(vocab) - len(truncated_vocab)} out of {len(vocab)} words have frequency less than {min_freq}.")
vocab = truncated_vocab
self.only_norm_found_vector = kwargs.get('only_norm_found_vector', False)
# load the embedding
if lower:
lowered_vocab = Vocabulary(padding=vocab.padding, unknown=vocab.unknown)
for word, index in vocab:
if vocab._is_word_no_create_entry(word):
lowered_vocab.add_word(word.lower(), no_create_entry=True)
else:
lowered_vocab.add_word(word.lower()) # add the words that need their own entries first
logger.info(f"All words in the vocab have been lowered. There are {len(vocab)} words, {len(lowered_vocab)} "
f"unique lowered words.")
if model_path:
embedding = self._load_with_vocab(model_path, vocab=lowered_vocab, init_method=init_method)
else:
embedding = self._randomly_init_embed(len(vocab), embedding_dim, init_method)
self.register_buffer('words_to_words', torch.arange(len(vocab)).long())
if lowered_vocab.unknown:
unknown_idx = lowered_vocab.unknown_idx
else:
unknown_idx = embedding.size(0) - 1 # otherwise the last row serves as unknown
self.register_buffer('words_to_words', torch.arange(len(vocab)).long())
words_to_words = torch.full((len(vocab),), fill_value=unknown_idx).long()
for word, index in vocab:
if word not in lowered_vocab:
word = word.lower()
if word not in lowered_vocab and lowered_vocab._is_word_no_create_entry(word):
continue # no entry needs to be created; it already maps to unknown by default
words_to_words[index] = self.words_to_words[lowered_vocab.to_index(word)]
self.register_buffer('words_to_words', words_to_words)
self._word_unk_index = lowered_vocab.unknown_idx # update the unknown index
else:
if model_path:
embedding = self._load_with_vocab(model_path, vocab=vocab, init_method=init_method)
else:
embedding = self._randomly_init_embed(len(vocab), embedding_dim, init_method)
self.register_buffer('words_to_words', torch.arange(len(vocab)).long())
if not self.only_norm_found_vector and normalize:
embedding /= (torch.norm(embedding, dim=1, keepdim=True) + 1e-12)
if truncate_vocab:
for i in range(len(truncated_words_to_words)):
index_in_truncated_vocab = truncated_words_to_words[i]
truncated_words_to_words[i] = self.words_to_words[index_in_truncated_vocab]
del self.words_to_words
self.register_buffer('words_to_words', truncated_words_to_words)
self.embedding = nn.Embedding(num_embeddings=embedding.shape[0], embedding_dim=embedding.shape[1],
padding_idx=vocab.padding_idx,
max_norm=None, norm_type=2, scale_grad_by_freq=False,
sparse=False, _weight=embedding)
self._embed_size = self.embedding.weight.size(1)
self.requires_grad = requires_grad
self.dropout = MyDropout(dropout)
def _randomly_init_embed(self, num_embedding, embedding_dim, init_embed=None):
"""
:param int num_embedding: number of embedding entries
:param int embedding_dim: dimension of each embedding vector
:param callable init_embed: initialization method
:return: torch.FloatTensor
"""
embed = torch.zeros(num_embedding, embedding_dim)
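# When init_embed is None, values are drawn from U(-sqrt(3/embedding_dim), sqrt(3/embedding_dim)), which gives each
# dimension a variance of 1/embedding_dim.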
if init_embed is None:
nn.init.uniform_(embed, -np.sqrt(3 / embedding_dim), np.sqrt(3 / embedding_dim))
else:
init_embed(embed)
return embed
def _load_with_vocab(self, embed_filepath, vocab, dtype=np.float32, padding='<pad>', unknown='<unk>',
error='ignore', init_method=None):
"""
Extracts the embeddings of the words in vocab from the pre-trained word vectors at embed_filepath. EmbedLoader
automatically detects whether embed_filepath is in word2vec format (the first line has only two fields) or in
GloVe format.
:param str embed_filepath: path to the pre-trained embedding.
:param vocab: a :class:`~fastNLP.Vocabulary`; only the embeddings of words that appear in vocab are read.
Words in vocab that are not found are sampled from a normal distribution fitted to the embeddings that were
found, so that the whole Embedding follows the same distribution.
:param dtype: dtype of the loaded embedding
:param str padding: the padding token of the vocabulary
:param str unknown: the unknown token of the vocabulary
:param str error: `ignore` or `strict`; with `ignore`, errors are skipped automatically; with `strict`, errors
are raised. Errors typically come from empty lines in the file or from inconsistent dimensions.
:param init_method: how to initialize words that are not found. Any method from torch.nn.init.* can be used.
Defaults to torch.nn.init.zeros_.
:return torch.tensor: shape [len(vocab), dimension], where dimension is determined by the pre-trained embedding.
"""
assert isinstance(vocab, Vocabulary), "Only fastNLP.Vocabulary is supported."
if not os.path.exists(embed_filepath):
raise FileNotFoundError("`{}` does not exist.".format(embed_filepath))
with open(embed_filepath, 'r', encoding='utf-8') as f:
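# Illustrative file layouts (hypothetical contents): a word2vec-style file starts with a header line such as
# "400000 50" (vocabulary size and dimension), while a GloVe-style file starts directly with a vector line such as
# "the 0.418 0.24968 -0.41242 ...". The check below (exactly two fields on the first line) tells the two apart.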
line = f.readline().strip()
parts = line.split()
start_idx = 0
if len(parts) == 2:
dim = int(parts[1])
start_idx += 1
else:
dim = len(parts) - 1
f.seek(0)
matrix = {}
if vocab.padding:
matrix[vocab.padding_idx] = torch.zeros(dim)
if vocab.unknown:
matrix[vocab.unknown_idx] = torch.zeros(dim)
found_count = 0
found_unknown = False
for idx, line in enumerate(f, start_idx):
try:
parts = line.strip().split()
word = ''.join(parts[:-dim])
nums = parts[-dim:]
# align unk and pad tokens
if word == padding and vocab.padding is not None:
word = vocab.padding
elif word == unknown and vocab.unknown is not None:
word = vocab.unknown
found_unknown = True
if word in vocab:
index = vocab.to_index(word)
matrix[index] = torch.from_numpy(np.fromstring(' '.join(nums), sep=' ', dtype=dtype, count=dim))
if self.only_norm_found_vector:
matrix[index] = matrix[index] / np.linalg.norm(matrix[index])
found_count += 1
except Exception as e:
if error == 'ignore':
warnings.warn("Error occurred at the {} line.".format(idx))
else:
logger.error("Error occurred at the {} line.".format(idx))
raise e
logger.info("Found {} out of {} words in the pre-training embedding.".format(found_count, len(vocab)))
for word, index in vocab:
if index not in matrix and not vocab._is_word_no_create_entry(word):
if found_unknown: # if an unknown vector was found, use it for initialization
matrix[index] = matrix[vocab.unknown_idx]
else:
matrix[index] = None
# the entries remaining in matrix are the words that need their own entry
vectors = self._randomly_init_embed(len(matrix), dim, init_method)
if vocab.unknown is None: # create a dedicated unknown vector
unknown_idx = len(matrix)
vectors = torch.cat((vectors, torch.zeros(1, dim)), dim=0).contiguous()
else:
unknown_idx = vocab.unknown_idx
self.register_buffer('words_to_words', torch.full((len(vocab),), fill_value=unknown_idx).long())
for index, (index_in_vocab, vec) in enumerate(matrix.items()):
if vec is not None:
vectors[index] = vec
self.words_to_words[index_in_vocab] = index
return vectors
def drop_word(self, words):
"""
Randomly replaces words with unknown_index according to the configured probability.
:param torch.LongTensor words: batch_size x max_len
:return:
"""
if self.word_dropout > 0 and self.training:
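# torch.rand draws uniform [0, 1) noise; positions with a value below self.word_dropout are marked for replacement,
# padding positions are excluded via pad_mask, and the marked positions are filled with the unk index.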
mask = torch.rand(words.size())
mask = mask.to(words.device)
mask = mask.lt(self.word_dropout)
# mask = torch.full_like(words, fill_value=self.word_dropout, dtype=torch.float)
# # mask = torch.bernoulli(mask).eq(1) # the larger dropout_word is, the more positions become 1
# # mask = mask.to(words.device)
pad_mask = words.ne(self._word_pad_index)
mask = mask.__and__(pad_mask)
words = words.masked_fill(mask, self._word_unk_index)
return words
def forward(self, words):
"""
Takes word indices as input.
:param words: torch.LongTensor, [batch_size, max_len]
:return: torch.FloatTensor, [batch_size, max_len, embed_size]
"""
if hasattr(self, 'words_to_words'):
words = self.words_to_words[words]
words = self.drop_word(words)
words = self.embedding(words)
words = self.dropout(words)
return words
class BertEmbedding(ContextualEmbedding):
"""
An Embedding that encodes words with BERT. It is recommended to limit the input to at most 430 words rather than
512 (the exact limit may vary with the pre-trained model). The pre-trained BERT model is limited to 512 tokens,
and since the input words are not yet split into word pieces (BertEmbedding performs the word-piece split when the
words are fed in), the length after splitting may exceed the maximum limit.
BertEmbedding supports automatic downloading of weights. The models currently supported are the following (to be
completed):
Example::
>>> import torch
>>> from fastNLP import Vocabulary
>>> from fastNLP.embeddings import BertEmbedding
>>> vocab = Vocabulary().add_word_lst("The weather is good .".split())
>>> embed = BertEmbedding(vocab, model_dir_or_name='en-base-uncased', requires_grad=False, layers='4,-2,-1')
>>> words = torch.LongTensor([[vocab.to_index(word) for word in "The weather is good .".split()]])
>>> outputs = embed(words)
>>> outputs.size()
>>> # torch.Size([1, 5, 2304])
"""
def __init__(self, vocab: Vocabulary, model_dir_or_name: str = 'en-base-uncased', layers: str = '-1',
pool_method: str = 'first', word_dropout=0, dropout=0, include_cls_sep: bool = False,
pooled_cls=True, requires_grad: bool = True, auto_truncate: bool = False):
"""
:param ~fastNLP.Vocabulary vocab: the vocabulary
:param str model_dir_or_name: directory of the model or name of the model. When a directory is passed, it should
contain a vocabulary file (with a .txt suffix), a weight file (with a .bin suffix) and a config file (with a
.json suffix).
:param str layers: which layers the output embedding is taken from; the results of the different layers are
concatenated along the last dimension in the order given in layers. Layer indices are separated by ',', start at
0, and negative indices count from the last layer.
:param str pool_method: in BERT each word is represented by several word pieces; this controls how the
representation of a word is computed from its word pieces. Supports ``last``, ``first``, ``avg`` and ``max``.
:param float word_dropout: probability of replacing a word with unk. This both trains the unk vector and acts as
a form of regularization.
:param float dropout: probability of applying dropout to the embedding representation. 0.1 means 10% of the
values are randomly set to 0.
:param bool include_cls_sep: when BERT computes the sentence representation, [CLS] and [SEP] are added at the
front and the end; this flag controls whether their representations are kept in the output. If kept, the word
embedding output is two tokens longer than the input, which may cause a length mismatch with other embedding
types when used in a :class::StackEmbedding.
:param bool pooled_cls: whether the returned [CLS] is mapped through the pre-trained BertPool; only effective
when include_cls_sep is set. If the downstream task predicts from [CLS] only, this is usually set to True.
:param bool requires_grad: whether gradients are required, i.e. whether the BERT weights are updated.
:param bool auto_truncate: when the word pieces of the input words exceed the maximum BERT length (usually 512),
automatically truncate everything beyond the first 510 word pieces and set the 512th word piece to [SEP]; the
encodings of the truncated part are set to zero. Usually only tasks that classify from [CLS] alone set
auto_truncate to True.
"""
super(BertEmbedding, self).__init__(vocab, word_dropout=word_dropout, dropout=dropout)
self.device_cpu = torch.device('cpu')
if model_dir_or_name.lower() in PRETRAINED_BERT_MODEL_DIR:
if 'cn' in model_dir_or_name.lower() and pool_method not in ('first', 'last'):
logger.warning("For Chinese BERT, pool_method should be chosen from 'first' or 'last' to achieve"
" faster speed.")
warnings.warn("For Chinese BERT, pool_method should be chosen from 'first' or 'last' to achieve"
" faster speed.")
self.dropout_p = dropout
self._word_sep_index = None
if '[SEP]' in vocab:
self._word_sep_index = vocab['[SEP]']
self.model = _WordBertModel(model_dir_or_name=model_dir_or_name, vocab=vocab, layers=layers,
pool_method=pool_method, include_cls_sep=include_cls_sep,
pooled_cls=pooled_cls, auto_truncate=auto_truncate, min_freq=2)
self.requires_grad = requires_grad
self._embed_size = len(self.model.layers) * self.model.encoder.hidden_size
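# e.g. with layers='4,-2,-1' and bert-base (hidden size 768), self._embed_size is 3 * 768 = 2304, matching the
# example in the class docstring.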
def _delete_model_weights(self):
del self.model
def forward(self, words):
"""
Computes the BERT embedding of words. Before the computation, [CLS] is added at the beginning and [SEP] at the
end of each sentence, and include_cls_sep determines whether the representations of these two tokens are kept in
the output.
:param torch.LongTensor words: [batch_size, max_len]
:return: torch.FloatTensor. batch_size x max_len x (768*len(self.layers))
"""
words = self.drop_word(words)
outputs = self._get_sent_reprs(words)
if outputs is not None:
if self.dropout_p > 1e-5:
return self.dropout(outputs)
else:
return outputs
outputs = self.model(words)
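# self.model returns one tensor per selected layer; concatenating them along the last dimension below yields the
# (hidden_size * len(layers))-dimensional representation per token.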
# print(outputs.size())
outputs = torch.cat([*outputs], dim=-1)
# print(outputs.size())
# exit()
if self.dropout_p > 1e-5:
return self.dropout(outputs)
else:
return outputs
def drop_word(self, words):
"""
Randomly replaces words with unknown_index according to the configured probability.
:param torch.LongTensor words: batch_size x max_len
:return:
"""
if self.word_dropout > 0 and self.training:
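# Unlike StaticEmbedding.drop_word, this version samples the replacement mask with torch.bernoulli and afterwards
# restores any [SEP] positions, so the separator token is never replaced with unk.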
with torch.no_grad():
if self._word_sep_index: # [SEP] must not be dropped
sep_mask = words.eq(self._word_sep_index)
mask = torch.full(words.size(), fill_value=self.word_dropout, dtype=torch.float)
# print(mask.device)
# print(mask)
# print(mask.device)
# exit()
# mask = mask.to(self.device_cpu)
mask = torch.bernoulli(mask).eq(1) # the larger dropout_word is, the more positions are set to 1
mask = mask.to(words.device)
pad_mask = words.ne(0)
mask = pad_mask.__and__(mask) # padding positions are never replaced with unk
words = words.masked_fill(mask, self._word_unk_index)
if self._word_sep_index:
words.masked_fill_(sep_mask, self._word_sep_index)
return words