Skip-Gram and CBOW Model Code Implementation

Table of Contents
  1. Steps to Reproduce the Word2Vec Model
  2. Skip-Gram Model Implementation
  3. CBOW Model Implementation

Word vectors are a foundation of NLP and one of the most important parts of learning it. While filling in the basics, I felt these two models were worth studying and implementing by hand in order to really understand the related concepts.
The Skip-Gram and CBOW models come from the paper Efficient Estimation of Word Representations in Vector Space.
Mikolov's work is truly impressive and well worth learning from.
Links to my earlier word2vec study notes:
cs224n-Lecture-2 Word Vector Representations: word2vec
cs224n-Lecture-3 Advanced Word Vector Representations

Before implementing the models, we need two key definitions: the word vector and the word vector matrix.
Word vector: a word is mapped to a vector that encodes the word's syntactic and semantic information.
Word vector matrix: each word vector is a row vector; stacking many row vectors together gives the word vector matrix.
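To make these two definitions concrete, here is a minimal NumPy sketch (not part of the original code; the toy vocabulary, the matrix E, and the helper word_vector are illustrative assumptions): each word maps to an index, and looking up that row of the matrix yields its word vector.

import numpy as np

# Toy vocabulary and a 4-dimensional word vector matrix (illustrative only):
# row i of E is the word vector of the word whose index is i.
vocab = {"the": 0, "cat": 1, "sat": 2}
E = np.random.uniform(-1.0, 1.0, size=(len(vocab), 4))

def word_vector(word):
    # look up the row of the word vector matrix that belongs to `word`
    return E[vocab[word]]

print(word_vector("cat"))  # one row of E, i.e. one word vector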

Steps to Reproduce the Word2Vec Model

My own summary:
1. Pick a corpus (download it directly if crawling is not an option);
2. Text preprocessing:
(1) extract the words to build a vocabulary;
(2) assign each raw word in the vocabulary an index;
(3) count word frequencies and keep the num most frequent words as the new working vocabulary;
(4) number the words of the new vocabulary against the raw vocabulary, producing a forward dictionary and a reverse dictionary (a small sketch of this step follows the list);
3. Generate the batches needed for training (the generation differs slightly between CBOW and skip-gram);
4. Set the parameters TensorFlow trains with (loss, optimizer, etc.);
5. Start training.
Code link: word2vec
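As a small aside to step 2, the sketch below shows how the forward and reverse dictionaries can be built with collections.Counter. It assumes a tiny whitespace-tokenized corpus; corpus_words and vocab_size here are illustrative, while the real script uses text8 and a 50,000-word vocabulary. The build_data function in the full code further down does the same thing and additionally converts the whole corpus to indices.

import collections

# Hypothetical tokenized corpus; in the real script this comes from text8.zip.
corpus_words = "the cat sat on the mat the cat slept".split()
vocab_size = 5  # keep the (vocab_size - 1) most frequent words, plus UNK

count = [["UNK", -1]]
count.extend(collections.Counter(corpus_words).most_common(vocab_size - 1))

# Forward dictionary: word -> index (index 0 is reserved for UNK).
dictionary = {word: idx for idx, (word, _) in enumerate(count)}
# Reverse dictionary: index -> word.
reverse_dictionary = {idx: word for word, idx in dictionary.items()}

print(dictionary)
print(reverse_dictionary)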

Skip-Gram Model Implementation

The Skip-Gram model predicts the surrounding context words from the center word; its structure is shown below:

The specific definitions and operations are all explained in the code comments. Because of my personal coding habits, the original code has been slightly modified; the full script follows the short warm-up sketch below.
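Before the full script, here is a minimal standalone sketch, independent of the original code, of the (center, context) training pairs that skip-gram learns from, for a toy sentence and a window of 1:

# Minimal sketch: skip-gram training pairs for a toy sentence and window size 1.
# Each center word predicts every word within the window on both sides.
tokens = ["us", "against", "the", "world"]
window = 1

pairs = []
for i, center in enumerate(tokens):
    for j in range(max(0, i - window), min(len(tokens), i + window + 1)):
        if j != i:
            pairs.append((center, tokens[j]))

print(pairs)
# [('us', 'against'), ('against', 'us'), ('against', 'the'),
#  ('the', 'against'), ('the', 'world'), ('world', 'the')]

In the actual generator below, each center word is sampled num_skips times without repetition, which for skip_window = 1 and num_skips = 2 yields exactly the two context words on either side.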

#!/usr/bin/env python
#coding:utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import math
import os
import random
import zipfile

import numpy as np
from six.moves import urllib
from six.moves import xrange
import tensorflow as tf
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

url='http://mattmahoney.net/dc/'

# Download the corpus file if it is not already present
def maybe_download(filename,expected_bytes):
    if not os.path.exists(filename):
        print('start downloading...')
        filename,_=urllib.request.urlretrieve(url+filename,filename)
    statinfo=os.stat(filename)
    if statinfo.st_size==expected_bytes:
        print('Found and verified',filename)
    else:
        print(statinfo.st_size)
        raise Exception('Failed to verify ' + filename + '. Can you get to it with a browser?')
    return filename

# Unzip the corpus and convert it into a list of words
def read_data(filename):
    with zipfile.ZipFile(filename) as f:
        data=tf.compat.as_str(f.read(f.namelist()[0])).split()
        # tf.compat.as_str converts the raw bytes to a string; split() gives the word list
    return data

# Convert the raw words into integer indices
def build_data(words,n_words):
    count=[['UNK',-1]]
    count.extend(collections.Counter(words).most_common(n_words-1))
    dictionary=dict()
    # assign an index to each word
    # words are ordered from the most frequent to the least frequent
    for word,_ in count:
        dictionary[word]=len(dictionary)
    data=list()
    unk_count=0
    for word in words:
        if word in dictionary:
            index=dictionary[word]
        else:
            index=0 # words outside the vocabulary get the UNK index 0
            unk_count+=1
        # data holds the frequency-based index of every word in the corpus
        data.append(index)
    # print(unk_count)
    count[0][1]=unk_count
    reversed_dictionary=dict(zip(dictionary.values(),dictionary.keys()))
    # data: the index of every word in the corpus
    # count: each vocabulary word and its frequency
    # dictionary: the 50,000 most frequent words mapped to their indices
    # reversed_dictionary: the reverse mapping, index -> word
    return data,count,dictionary,reversed_dictionary

data_index=0
# Next, define a function that generates batches for the skip-gram model
# batch_size: number of samples per training batch
# num_skips: number of (center, context) samples drawn per center word; at most 2*skip_window
# skip_window: number of context words considered on each side of the center word
def generate_batch(batch_size,num_skips,skip_window):
    # data_index acts as a pointer into data, starting at 0
    # every generated batch moves data_index forward accordingly
    global data_index
    # each center word must contribute a whole number of samples
    assert batch_size%num_skips==0
    # cannot draw more samples than there are context positions (2*skip_window)
    assert num_skips<=skip_window*2
    batch=np.ndarray(shape=(batch_size),dtype=np.int32)
    labels=np.ndarray(shape=(batch_size,1),dtype=np.int32)
    # span: the window covering the context words plus the center word
    span=2*skip_window+1
    # buffer: a sliding window over the data; when a new word enters,
    # the leftmost word is pushed out to make room for it
    buffer=collections.deque(maxlen=span)

    # data_index marks where the current window starts
    # and advances by one position after each window
    for _ in range(span):
        buffer.append(data[data_index])
        data_index=(data_index+1)%len(data)
    for i in range(batch_size//num_skips):
        # each buffer position yields num_skips samples
        # target is the word in the middle of the window
        target=skip_window
        # targets_to_avoid ensures no context word is sampled twice
        targets_to_avoid=[skip_window]
        for j in range(num_skips):
            while target in targets_to_avoid:
                target=random.randint(0,span-1)
            targets_to_avoid.append(target)
            # the core of the skip-gram model: the center word predicts a context word
            batch[i*num_skips+j]=buffer[skip_window]
            labels[i*num_skips+j,0]=buffer[target]
        buffer.append(data[data_index])
        data_index=(data_index+1)%len(data)
    data_index=(data_index+len(data)-span)%len(data)
    return batch,labels


# Visualize the embeddings with t-SNE
def plot_with_labels(low_dim_embs,labels,filename='tsne_skip.png'):
    assert low_dim_embs.shape[0]>=len(labels),'More labels than embeddings'
    plt.figure(figsize=(18,18)) # in inches
    for i,label in enumerate(labels):
        x,y=low_dim_embs[i,:]
        plt.scatter(x,y)
        plt.annotate(label,
                     xy=(x,y),
                     xytext=(5,2),
                     textcoords='offset points',
                     ha='right',
                     va='bottom')
    plt.savefig(filename)

if __name__=="__main__":
    #filename=maybe_download('text8.zip',31344016)

    filename='text8.zip'
    # read the corpus as a list of words
    vocabulary=read_data(filename)
    print('Data size:',len(vocabulary))
    # total corpus length: 17005207 words
    print(vocabulary[:100])

    # Next, build the vocabulary and map rare words to the UNK token
    # Only the 50,000 most frequent words are kept
    vocabulary_size=50000
    data,count,dictionary,reverse_dictionary=build_data(vocabulary,vocabulary_size)
    del vocabulary # free memory
    # print the 5 most common words
    print('Most common words(+UNK)',count[:5])
    # print the first 10 indices in data together with the original words
    print('Sample data:',data[:10],[reverse_dictionary[i] for i in data[:10]])

    # With the defaults skip_window=1 and num_skips=2,
    # three consecutive words ['us','against','world']
    # yield the two samples: against->us and against->world
    batch,labels=generate_batch(batch_size=8,num_skips=2,skip_window=1)
    for i in range(8):
        print(batch[i],reverse_dictionary[batch[i]],
              '->',labels[i,0],reverse_dictionary[labels[i,0]])

    # Build the training model
    # Hyperparameters
    batch_size = 128
    embedding_size = 128 # dimensionality of the embedding space
    skip_window = 1 # same as above
    num_skips = 2 # same as above

    # During training the model is validated periodically
    # by printing the words most similar to a few validation words.
    # Only words within valid_window are used, since they are the most frequent.
    valid_size = 16 # validate 16 words at a time
    valid_window = 100 # the 16 words are drawn from the 100 most frequent words
    valid_examples = np.random.choice(valid_window, valid_size, replace=False)

    # number of negative (noise) words sampled for the loss
    num_sampled = 64
    graph = tf.Graph()

    with graph.as_default():
        # input batch
        train_inputs=tf.placeholder(tf.int32,shape=[batch_size])
        train_labels=tf.placeholder(tf.int32,shape=[batch_size,1])
        # validation words
        valid_dataset=tf.constant(valid_examples,dtype=tf.int32)

        # run on the CPU
        with tf.device('/cpu:0'):
            # the embeddings variable: one row stores the embedding of one word
            embeddings=tf.Variable(
                tf.random_uniform([vocabulary_size,embedding_size],-1.0,1.0))
            # embedding_lookup fetches the embeddings of all words in the batch,
            # i.e. it looks up rows of the embedding matrix by index
            embed=tf.nn.embedding_lookup(embeddings,train_inputs)
            # two variables for the NCE loss (a binary classification
            # against randomly sampled noise words)
            nce_weights=tf.Variable(
                tf.truncated_normal([vocabulary_size,embedding_size],
                                    stddev=1.0/math.sqrt(embedding_size)))
            nce_biases=tf.Variable(tf.zeros([vocabulary_size]))

        # tf.nn.nce_loss samples the noise words automatically and builds the loss,
        # drawing num_sampled noise words at random
        loss=tf.reduce_mean(
            tf.nn.nce_loss(weights=nce_weights,
                           biases=nce_biases,
                           labels=train_labels,
                           inputs=embed,
                           num_sampled=num_sampled,
                           num_classes=vocabulary_size))

        # with the loss defined, build the optimizer
        optimizer=tf.train.GradientDescentOptimizer(0.01).minimize(loss)

        # word-to-word similarity, used for validation
        # L2-normalize each embedding row (sum, not mean, of squares gives the squared L2 norm)
        norm=tf.sqrt(tf.reduce_sum(tf.square(embeddings),1,keep_dims=True))
        normalized_embeddings=embeddings/norm
        # look up the embeddings of the validation words and compute
        # their similarity to every word in the vocabulary
        valid_embeddings=tf.nn.embedding_lookup(
            normalized_embeddings,valid_dataset)
        similarity=tf.matmul(
            valid_embeddings,normalized_embeddings,transpose_b=True)

        # variable initializer
        init=tf.global_variables_initializer()

    # start training
    num_steps=100001

    with tf.Session(graph=graph) as session:
        # initialize the variables
        init.run()
        print('Initialized')

        average_loss=0
        for step in xrange(num_steps):
            batch_inputs,batch_labels=generate_batch(
                batch_size,num_skips,skip_window)
            feed_dict={train_inputs:batch_inputs,train_labels:batch_labels}

            # one optimization step
            _,loss_val=session.run([optimizer,loss],feed_dict=feed_dict)
            average_loss+=loss_val

            # report the training loss every 2000 steps
            if step%2000==0:
                if step>0:
                    average_loss/=2000
                # average loss over the last 2000 batches
                print('Average loss at step',step,':',average_loss)
                average_loss=0

            # run the validation every 10000 steps
            if step%10000==0:
                # sim: similarity between the validation words and all words
                sim=similarity.eval()
                # there are valid_size validation words
                for i in xrange(valid_size):
                    valid_word=reverse_dictionary[valid_examples[i]]
                    top_k=8 # print the 8 nearest neighbours
                    # sort by descending similarity; index 0 is the word itself, so skip it
                    nearest=(-sim[i,:]).argsort()[1:top_k+1]
                    log_str='Nearest to %s:'%valid_word
                    for k in xrange(top_k):
                        close_word=reverse_dictionary[nearest[k]]
                        log_str='%s %s,'%(log_str,close_word)
                    print(log_str)
        # final_embeddings holds the trained embedding vectors,
        # with shape [vocabulary_size,embedding_size];
        # each row is the embedding of the word with that index
        final_embeddings=normalized_embeddings.eval()
    # reduce the embeddings to 2D with t-SNE
    tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
    # plot only the first 500 words
    plot_only = 500
    low_dim_embs = tsne.fit_transform(final_embeddings[:plot_only, :])
    labels = [reverse_dictionary[i] for i in xrange(plot_only)]
    plot_with_labels(low_dim_embs, labels)

The training loop also computes similarities between the validation words and the rest of the vocabulary; the resulting t-SNE image is shown below:
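As a side note, once final_embeddings (together with dictionary and reverse_dictionary) has been saved, the same nearest-neighbour query can be reproduced offline with plain NumPy. The sketch below is only an illustration under that assumption; the helper nearest_words and the example query word are not part of the original script.

import numpy as np

# Assumes final_embeddings (rows already L2-normalized) and the two dictionaries
# from the script above are available, e.g. saved beforehand with np.save.
def nearest_words(query, final_embeddings, dictionary, reverse_dictionary, top_k=8):
    vec = final_embeddings[dictionary[query]]
    # cosine similarity reduces to a dot product for normalized vectors
    sims = final_embeddings @ vec
    # sort by descending similarity and skip index 0, which is the query word itself
    nearest = (-sims).argsort()[1:top_k + 1]
    return [reverse_dictionary[i] for i in nearest]

# Hypothetical usage:
# print(nearest_words('three', final_embeddings, dictionary, reverse_dictionary))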

CBOW Model Implementation

The CBOW model predicts the center word from its context words; its structure is shown below:

The CBOW model implementation is given below, after a short sketch of the training samples it consumes.
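As a quick illustration, independent of the script, the sketch below lists the (context window, center word) samples CBOW trains on for a toy sentence with a window of 1:

# Minimal sketch: CBOW training samples for a toy sentence and cbow_window = 1.
# The words on both sides of a position form the input; the middle word is the label.
tokens = ["us", "against", "the", "world"]
window = 1

samples = []
for i in range(window, len(tokens) - window):
    context = tokens[i - window:i] + tokens[i + 1:i + window + 1]
    samples.append((context, tokens[i]))

print(samples)
# [(['us', 'the'], 'against'), (['against', 'world'], 'the')]

The generate_batch function in the script builds the same kind of samples, with the context indices stored in batch and the center word in labels.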

#!/usr/bin/env python
#coding:utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import math
import os
import random
import zipfile

import numpy as np
from six.moves import urllib
from six.moves import xrange
import tensorflow as tf
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

url='http://mattmahoney.net/dc/'

# Download the corpus file if it is not already present
def maybe_download(filename,expected_bytes):
    if not os.path.exists(filename):
        print('start downloading...')
        filename,_=urllib.request.urlretrieve(url+filename,filename)
    statinfo=os.stat(filename)
    if statinfo.st_size==expected_bytes:
        print('Found and verified',filename)
    else:
        print(statinfo.st_size)
        raise Exception('Failed to verify ' + filename + '. Can you get to it with a browser?')
    return filename

# Unzip the corpus and convert it into a list of words
def read_data(filename):
    with zipfile.ZipFile(filename) as f:
        data=tf.compat.as_str(f.read(f.namelist()[0])).split()
        # tf.compat.as_str converts the raw bytes to a string; split() gives the word list
    return data

# Convert the raw words into integer indices
def build_data(words,n_words):
    count=[['UNK',-1]]
    count.extend(collections.Counter(words).most_common(n_words-1))
    dictionary=dict()
    # assign an index to each word
    # words are ordered from the most frequent to the least frequent
    for word,_ in count:
        dictionary[word]=len(dictionary)
    data=list()
    unk_count=0
    for word in words:
        if word in dictionary:
            index=dictionary[word]
        else:
            index=0 # words outside the vocabulary get the UNK index 0
            unk_count+=1
        # data holds the frequency-based index of every word in the corpus
        data.append(index)
    # print(unk_count)
    count[0][1]=unk_count
    reversed_dictionary=dict(zip(dictionary.values(),dictionary.keys()))
    # data: the index of every word in the corpus
    # count: each vocabulary word and its frequency
    # dictionary: the 50,000 most frequent words mapped to their indices
    # reversed_dictionary: the reverse mapping, index -> word
    return data,count,dictionary,reversed_dictionary

data_index=0
# batch generator for the CBOW model
def generate_batch(batch_size,cbow_window):
    global data_index
    assert cbow_window%2==1
    span=2*cbow_window+1
    # the center word is excluded, leaving span-1 context words per sample
    batch=np.ndarray(shape=(batch_size,span-1),dtype=np.int32)
    labels=np.ndarray(shape=(batch_size,1),dtype=np.int32)
    buffer=collections.deque(maxlen=span)
    for _ in range(span):
        buffer.append(data[data_index])
        # cycle through data, wrapping around to the start at the end
        data_index=(data_index+1)%len(data)

    for i in range(batch_size):
        # the target word sits in the middle of the span
        target=cbow_window
        # only the context words are needed as inputs, not the center word
        target_to_avoid=[cbow_window]
        col_idx=0
        for j in range(span):
            # skip the center word
            if j==span//2:
                continue
            batch[i,col_idx]=buffer[j]
            col_idx+=1
        labels[i,0]=buffer[target]
        # slide the buffer forward by one word
        buffer.append(data[data_index])
        data_index=(data_index+1)%len(data)
    return batch,labels

# Visualize the embeddings with t-SNE
def plot_with_labels(low_dim_embs, labels, filename='tsne_cbow.png'):
    assert low_dim_embs.shape[0] >= len(labels), 'More labels than embeddings'
    plt.figure(figsize=(18, 18)) # in inches
    for i, label in enumerate(labels):
        x, y = low_dim_embs[i, :]
        plt.scatter(x, y)
        plt.annotate(label,
                     xy=(x, y),
                     xytext=(5, 2),
                     textcoords='offset points',
                     ha='right',
                     va='bottom')

    plt.savefig(filename)

if __name__=="__main__":
    filename = 'text8.zip'
    # read the corpus as a list of words
    vocabulary = read_data(filename)
    vocabulary_size = 50000
    data, count, dictionary, reverse_dictionary = build_data(vocabulary, vocabulary_size)
    del vocabulary # free memory
    # Build the training model
    # Hyperparameters
    batch_size = 128
    embedding_size = 128 # dimensionality of the embedding space
    cbow_window = 1 # same window size as before
    num_skips = 2 # kept for consistency with the skip-gram script (not used here)

    # During training the model is validated periodically
    # by printing the words most similar to a few validation words.
    valid_size = 16 # validate 16 words at a time
    valid_window = 100 # half of the validation words come from the 100 most frequent words,
                       # the other half from the words ranked 1000-1100
    valid_examples = np.array(random.sample(range(valid_window), valid_size // 2))
    valid_examples = np.append(valid_examples, random.sample(range(1000, 1000 + valid_window), valid_size // 2))

    # number of negative (noise) words sampled for the loss
    num_sampled = 64
    graph = tf.Graph()

    # number of training steps
    num_steps=100001
    with graph.as_default(),tf.device('/cpu:0'):
        # input data
        train_dataset=tf.placeholder(tf.int32,shape=[batch_size,2*cbow_window])
        train_labels=tf.placeholder(tf.int32,shape=[batch_size,1])
        valid_dataset=tf.constant(valid_examples,dtype=tf.int32)

        # Variables
        # embeddings: one vector per vocabulary word
        embeddings=tf.Variable(tf.random_uniform([vocabulary_size,embedding_size],-1.0,1.0))
        nce_weights=tf.Variable(tf.truncated_normal([vocabulary_size,embedding_size],
                                                    stddev=1.0/math.sqrt(embedding_size)))
        nce_biases=tf.Variable(tf.zeros([vocabulary_size]))

        # Model: look up the embedding of every context word and average them
        embeds=None
        for i in range(2*cbow_window):
            embedding_i=tf.nn.embedding_lookup(embeddings,train_dataset[:,i])
            print('embedding %d shape: %s'%(i,embedding_i.get_shape().as_list()))
            emb_x,emb_y=embedding_i.get_shape().as_list()
            if embeds is None:
                embeds=tf.reshape(embedding_i,[emb_x,emb_y,1])
            else:
                embeds=tf.concat([embeds,tf.reshape(embedding_i,[emb_x,emb_y,1])],2)

        assert embeds.get_shape().as_list()[2]==2*cbow_window
        print("Concat embedding size: %s"%embeds.get_shape().as_list())
        avg_embed=tf.reduce_mean(embeds,2,keep_dims=False)
        print("Avg embedding size: %s"%avg_embed.get_shape().as_list())

        loss = tf.reduce_mean(tf.nn.nce_loss(nce_weights, nce_biases,
                                             labels=train_labels,
                                             inputs=avg_embed,
                                             num_sampled=num_sampled,
                                             num_classes=vocabulary_size))
        optimizer = tf.train.AdagradOptimizer(0.1).minimize(loss)
        norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))
        normalized_embeddings = embeddings / norm
        valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
        similarity = tf.matmul(valid_embeddings, tf.transpose(normalized_embeddings))

    with tf.Session(graph=graph) as session:
        tf.global_variables_initializer().run()
        print('Initialized')
        average_loss = 0
        for step in range(num_steps):
            batch_data, batch_labels = generate_batch(batch_size, cbow_window)
            feed_dict = {train_dataset: batch_data, train_labels: batch_labels}
            _, l = session.run([optimizer, loss], feed_dict=feed_dict)
            average_loss += l
            if step % 2000 == 0:
                if step > 0:
                    average_loss = average_loss / 2000
                # the average loss is an estimate of the loss over the last 2000 batches
                print('Average loss at step %d: %f' % (step, average_loss))
                average_loss = 0
            # note that this is expensive (~20% slowdown if computed every 500 steps)
            if step % 10000 == 0:
                sim = similarity.eval()
                for i in range(valid_size):
                    valid_word = reverse_dictionary[valid_examples[i]]
                    top_k = 8 # number of nearest neighbours
                    # sort by descending similarity; index 0 is the word itself, so skip it
                    nearest = (-sim[i, :]).argsort()[1:top_k + 1]
                    log = 'Nearest to %s:' % valid_word
                    for k in range(top_k):
                        close_word = reverse_dictionary[nearest[k]]
                        log = '%s %s,' % (log, close_word)
                    print(log)
        final_embeddings = normalized_embeddings.eval()
    # reduce the embeddings to 2D with t-SNE
    tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
    # plot only the first 500 words
    plot_only = 500
    low_dim_embs = tsne.fit_transform(final_embeddings[:plot_only, :])
    labels = [reverse_dictionary[i] for i in xrange(plot_only)]
    plot_with_labels(low_dim_embs, labels)

The visualization produced by the CBOW model is shown below: