1.Tensorflow学习笔记(一) 基础
2.Tensorflow学习笔记(二) Toy Demo
3.Tensorflow学习笔记(三) 使用Skip-Gram和CBOW训练Word Embedding
Tensorflow学习笔记(四) 命名实体识别模型(NER-Model)
- NER的大致过程
- Config和Model的解耦
- Model内部将不同类型的节点解耦,分别用函数封装,整个建图过程在__init__中完成
- 提供模型的predict函数,用于test
- 如何使用dev集数据(在每次迭代后用dev集合测试目前模型的准确性)
- session的变量保存(使用tf.train.Saver(), save()函数)与加载(使用restore()函数)
- 通过混淆矩阵(confusion matrix,有时也译作困惑矩阵)求每个类别的查准率(precision)和查全率(recall)
class Model(object):
    """Abstracts a Tensorflow graph for a learning task.

    We use various Model classes as usual abstractions to encapsulate tensorflow
    computational graphs. Each algorithm you will construct in this homework will
    inherit from a Model object.
    """

    def load_data(self):
        """Loads data from disk and stores it in memory.

        Feel free to add instance variables to Model object that store loaded data.
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def add_placeholders(self):
        """Adds placeholder variables to tensorflow computational graph.

        Tensorflow uses placeholder variables to represent locations in a
        computational graph where data is inserted. These placeholders are used as
        inputs by the rest of the model building code and will be fed data during
        training.

        See the TensorFlow documentation on placeholders for more information.
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def create_feed_dict(self, input_batch, label_batch):
        """Creates the feed_dict for training the given step.

        A feed_dict takes the form of:

        feed_dict = {
            <placeholder>: <tensor of values to be passed for placeholder>,
            ....
        }

        If label_batch is None, then no labels are added to feed_dict.

        Hint: The keys for the feed_dict should be a subset of the placeholder
              tensors created in add_placeholders.

        Args:
          input_batch: A batch of input data.
          label_batch: A batch of label data.
        Returns:
          feed_dict: The feed dictionary mapping from placeholders to values.
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def add_model(self, input_data):
        """Implements core of model that transforms input_data into predictions.

        The core transformation for this model which transforms a batch of input
        data into a batch of predictions.

        Args:
          input_data: A tensor of shape (batch_size, n_features).
        Returns:
          out: A tensor of shape (batch_size, n_classes)
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def add_loss_op(self, pred):
        """Adds ops for loss to the computational graph.

        Args:
          pred: A tensor of shape (batch_size, n_classes)
        Returns:
          loss: A 0-d tensor (scalar) output
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def run_epoch(self, sess, input_data, input_labels):
        """Runs an epoch of training.

        Trains the model for one-epoch.

        Args:
          sess: tf.Session() object
          input_data: np.ndarray of shape (n_samples, n_features)
          input_labels: np.ndarray of shape (n_samples, n_classes)
        Returns:
          average_loss: scalar. Average minibatch loss of model on epoch.
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def fit(self, sess, input_data, input_labels):
        """Fit model on provided data.

        Args:
          sess: tf.Session()
          input_data: np.ndarray of shape (n_samples, n_features)
          input_labels: np.ndarray of shape (n_samples, n_classes)
        Returns:
          losses: list of loss per epoch
        """
        raise NotImplementedError("Each Model must re-implement this method.")

    def predict(self, sess, input_data, input_labels=None):
        """Make predictions from the provided model.

        Args:
          sess: tf.Session()
          input_data: np.ndarray of shape (n_samples, n_features)
          input_labels: np.ndarray of shape (n_samples, n_classes)
        Returns:
          average_loss: Average loss of model.
          predictions: Predictions of model on input_data
        """
        raise NotImplementedError("Each Model must re-implement this method.")
class LanguageModel(Model):
    """Abstracts a Tensorflow graph for learning language models.

    Adds ability to do embedding.
    """

    def add_embedding(self):
        """Add embedding layer that maps from vocabulary to vectors."""
        raise NotImplementedError("Each Model must re-implement this method.")
# -*- encoding: utf8 -*-
import sys
import time
import os
import numpy as np
import tensorflow as tf
from q2_initialization import xavier_weight_init
import data_utils.utils as du
import data_utils.ner as ner
from utils import data_iterator
from model import LanguageModel
# ————————————————————
# ————————————————————
# All hyper-parameters live in a single Config class, decoupled from the model.
class Config(object):
    """Hyper-parameters read by the NER model and the training loop."""

    # Dimensionality of each word vector.
    embed_size = 50
    # Number of examples per mini-batch.
    batch_size = 64
    # Number of distinct tag classes.
    label_size = 5
    # Number of units in the hidden layer.
    hidden_size = 100
    # Maximum number of training epochs.
    max_epochs = 24
    # Training stops once the best validation loss is more than this many
    # epochs old.
    early_stopping = 2
    # Value fed to the dropout placeholder during training.
    dropout = 0.9
    # Learning rate.
    lr = 0.001
    # L2 regularisation strength.
    l2 = 0.001
    # Number of tokens in each input window.
    window_size = 3
# Session configuration: allow_soft_placement lets ops that cannot run on the
# GPU be placed on the CPU instead; log_device_placement logs the device
# chosen for each op.
cf = tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)
# Use at most 20% of the GPU memory for this process.
cf.gpu_options.per_process_gpu_memory_fraction = 0.2
# NER model
class NERModel(LanguageModel):
    """Window-based named-entity tagger with one tanh hidden layer.

    Each kind of graph node (placeholders, embedding, network, loss, training
    op) is built by its own method; __init__ wires them together.
    """

    def __init__(self, config):
        """Loads the data and builds the complete graph.

        Args:
          config: Object exposing the hyper-parameters read by this class
            (window_size, label_size, embed_size, hidden_size, l2, lr,
            dropout, batch_size).
        """
        self.config = config
        # The data and the placeholders must exist before any node that reads
        # them: add_embedding uses self.wv and self.input_placeholder.
        self.load_data(debug=False)
        self.add_placeholders()
        window = self.add_embedding()
        y = self.add_model(window)
        self.predictions = tf.nn.softmax(y)
        predicted_class = tf.argmax(self.predictions, 1)
        is_correct = tf.equal(
            predicted_class, tf.argmax(self.labels_placeholder, 1))
        # Number of correctly classified examples in the fed batch.
        self.correct_predictions = tf.reduce_sum(tf.cast(is_correct, tf.int32))
        self.loss = self.add_loss_op(y)
        self.train_op = self.add_training_op(self.loss)

    def load_data(self, debug=False):
        """Loads the word vectors and the train/dev/test sets from data/ner.

        Stores self.wv, self.num2tag and the X_*/y_* arrays for the three
        splits. The exact layout of those arrays is decided by
        du.docs_to_windows -- presumably index windows and integer tag ids;
        confirm against data_utils.

        Args:
          debug: If true, keep only the first 1024 train and dev examples.
        """
        self.wv, word2num, _ = ner.load_wv(
            'data/ner/vocab.txt', 'data/ner/wordVectors.txt')
        # Keep a single float type (float32) so the vectors can be used as a
        # variable initializer.
        self.wv = self.wv.astype(np.float32)
        tags = ["O", "LOC", "MISC", "ORG", "PER"]
        self.num2tag = dict(enumerate(tags))
        tag2num = dict(zip(self.num2tag.values(), self.num2tag.keys()))

        # Training set.
        docs = du.load_dataset('data/ner/train')
        self.X_train, self.y_train = du.docs_to_windows(
            docs, word2num, tag2num, wsize=self.config.window_size)
        if debug:
            self.X_train = self.X_train[:1024]
            self.y_train = self.y_train[:1024]

        # Dev set.
        docs = du.load_dataset('data/ner/dev')
        self.X_dev, self.y_dev = du.docs_to_windows(
            docs, word2num, tag2num, wsize=self.config.window_size)
        if debug:
            self.X_dev = self.X_dev[:1024]
            self.y_dev = self.y_dev[:1024]

        # Test set.
        docs = du.load_dataset('data/ner/test.masked')
        self.X_test, self.y_test = du.docs_to_windows(
            docs, word2num, tag2num, wsize=self.config.window_size)

    def add_placeholders(self):
        """Creates the input, labels and dropout placeholders.

        input is int32 of shape (None, window_size), labels is float32 of
        shape (None, label_size) and dropout is a float32 placeholder.
        """
        self.input_placeholder = tf.placeholder(
            tf.int32, [None, self.config.window_size], 'input')
        self.labels_placeholder = tf.placeholder(
            tf.float32, [None, self.config.label_size], 'labels')
        self.dropout_placeholder = tf.placeholder(tf.float32, name='dropout')

    def create_feed_dict(self, input_batch, dropout, label_batch=None):
        """Builds the feed_dict for one step.

        Args:
          input_batch: Values for the input placeholder.
          dropout: Value for the dropout placeholder; omitted when None.
          label_batch: Values for the labels placeholder; omitted when None.
        Returns:
          The dictionary mapping placeholders to the given values.
        """
        feed_dict = {self.input_placeholder: input_batch}
        if dropout is not None:
            feed_dict[self.dropout_placeholder] = dropout
        if label_batch is not None:
            feed_dict[self.labels_placeholder] = label_batch
        return feed_dict

    def add_embedding(self):
        """Turns each input window into one concatenated word-vector row.

        Returns:
          A tensor reshaped to (-1, embed_size * window_size).
        """
        with tf.device('/cpu:0'):
            with tf.variable_scope('embedding'):
                # The embedding is trained from scratch here. To start from
                # the pre-trained vectors instead, create it with
                # tf.get_variable('Embedding', initializer=self.wv); self.wv
                # must then be float32.
                embedding = tf.get_variable(
                    'embed', [len(self.wv), self.config.embed_size])
                window = tf.nn.embedding_lookup(
                    embedding, self.input_placeholder)
                window = tf.reshape(
                    window,
                    [-1, self.config.embed_size * self.config.window_size])
        return window

    def add_model(self, window):
        """Builds the two-layer network on top of the embedded window.

        Variables are created under scopes whose initializer is
        xavier_weight_init(). When config.l2 is truthy, an L2 penalty on each
        weight matrix is added to the 'total_loss' collection.

        Args:
          window: Tensor whose second dimension is embed_size * window_size.
        Returns:
          The output-layer values after dropout, with label_size columns.
        """
        with tf.device('/cpu:0'):
            # First layer.
            with tf.variable_scope('Layer1', initializer=xavier_weight_init()):
                W = tf.get_variable(
                    'w1',
                    [self.config.embed_size * self.config.window_size,
                     self.config.hidden_size])
                b1 = tf.get_variable('b1', [self.config.hidden_size])
                h = tf.nn.tanh(tf.matmul(window, W) + b1)
                if self.config.l2:
                    tf.add_to_collection(
                        'total_loss',
                        0.5 * self.config.l2 * tf.nn.l2_loss(W))
            # Second layer.
            with tf.variable_scope('Layer2', initializer=xavier_weight_init()):
                U = tf.get_variable(
                    'w2', [self.config.hidden_size, self.config.label_size])
                b2 = tf.get_variable('b2', [self.config.label_size])
                y = tf.matmul(h, U) + b2
                if self.config.l2:
                    tf.add_to_collection(
                        'total_loss',
                        0.5 * self.config.l2 * tf.nn.l2_loss(U))
            # Dropout is applied to the output layer only.
            output = tf.nn.dropout(y, self.dropout_placeholder)
        return output

    def add_loss_op(self, pred):
        """Adds the loss node: mean cross-entropy plus collected penalties.

        Args:
          pred: Logits with label_size columns.
        Returns:
          The sum of every tensor in the 'total_loss' collection.
        """
        # Keyword arguments keep the call valid whatever the positional order
        # of logits/labels is in the installed TensorFlow version.
        ce = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
            logits=pred, labels=self.labels_placeholder))
        tf.add_to_collection('total_loss', ce)
        # add_n sums tensors of identical shape.
        loss = tf.add_n(tf.get_collection('total_loss'))
        return loss

    def add_training_op(self, loss):
        """Adds the training node.

        Args:
          loss: Scalar loss tensor to minimise.
        Returns:
          The op that applies one Adam update with learning rate config.lr.
        """
        opt = tf.train.AdamOptimizer(self.config.lr).minimize(loss)
        return opt

    def run_epoch(self, sess, input_data, input_labels, shuffle=True, verbose=True):
        """Trains for one pass over the given data.

        Args:
          sess: tf.Session() object.
          input_data: Training inputs, passed to data_iterator.
          input_labels: Training labels, passed to data_iterator.
          shuffle: Forwarded to data_iterator.
          verbose: If truthy, progress is written to stdout every `verbose`
            steps (True means every step).
        Returns:
          (mean mini-batch loss, fraction of examples classified correctly).
        """
        orig_X, orig_y = input_data, input_labels
        dp = self.config.dropout
        # Loss of every mini-batch.
        total_loss = []
        # Correctly predicted / processed examples so far.
        total_correct_examples = 0
        total_processed_examples = 0
        # Floor division keeps the step count an integer on Python 2 and 3.
        total_steps = len(orig_X) // self.config.batch_size
        batches = data_iterator(
            orig_X, orig_y, batch_size=self.config.batch_size,
            label_size=self.config.label_size, shuffle=shuffle)
        for step, (x, y) in enumerate(batches):
            feed = self.create_feed_dict(
                input_batch=x, dropout=dp, label_batch=y)
            # One optimisation step; also fetch the loss and the hit count.
            loss, total_correct, _ = sess.run(
                [self.loss, self.correct_predictions, self.train_op],
                feed_dict=feed)
            total_processed_examples += len(x)
            total_correct_examples += total_correct
            total_loss.append(loss)
            if verbose and step % verbose == 0:
                sys.stdout.write('\r{} / {} : loss = {}'.format(
                    step, total_steps, np.mean(total_loss)))
                sys.stdout.flush()
        if verbose:
            # Return the cursor to the start of the progress line.
            sys.stdout.write('\r')
            sys.stdout.flush()
        return (np.mean(total_loss),
                total_correct_examples / float(total_processed_examples))

    def predict(self, session, X, y=None):
        """Predicts labels for X with dropout disabled.

        Args:
          session: tf.Session() object.
          X: Inputs, passed to data_iterator.
          y: Optional labels. The loss is only computed when np.any(y) is
            true, so None and an all-zero label array both skip it.
        Returns:
          (mean loss over the batches, list of predicted class indices). The
          mean is taken over an empty list (nan) when no loss was computed.
        """
        # A value of 1 is fed to the dropout placeholder at prediction time.
        dp = 1
        losses = []
        results = []
        if np.any(y):
            data = data_iterator(
                X, y, batch_size=self.config.batch_size,
                label_size=self.config.label_size, shuffle=False)
        else:
            data = data_iterator(
                X, batch_size=self.config.batch_size,
                label_size=self.config.label_size, shuffle=False)
        for x_batch, y_batch in data:
            feed = self.create_feed_dict(input_batch=x_batch, dropout=dp)
            if np.any(y_batch):
                # Labels available: fetch the loss along with the predictions.
                feed[self.labels_placeholder] = y_batch
                loss, preds = session.run(
                    [self.loss, self.predictions], feed_dict=feed)
                losses.append(loss)
            else:
                # No labels: only the predictions can be fetched.
                preds = session.run(self.predictions, feed_dict=feed)
            predicted_indices = preds.argmax(axis=1)
            results.extend(predicted_indices)
        return np.mean(losses), results
# Print the confusion matrix and per-tag precision / recall.
def print_confusion(confusion, num_to_tag):
    """Prints the confusion matrix followed by one precision/recall line per tag.

    Args:
      confusion: Square array indexed [true label, guessed label].
      num_to_tag: Dict mapping a label index to its tag name.
    """
    # Column sums: how often each tag was guessed.
    total_guessed_tags = confusion.sum(axis=0)
    # Row sums: how often each tag is the true label.
    total_true_tags = confusion.sum(axis=1)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(confusion)
    for i, tag in sorted(num_to_tag.items()):
        # Precision: correct guesses of this tag / all guesses of this tag.
        prec = confusion[i, i] / float(total_guessed_tags[i])
        # Recall: correct guesses of this tag / all true occurrences of it.
        recall = confusion[i, i] / float(total_true_tags[i])
        print('Tag: {} - P {:2.4f} / R {:2.4f}'.format(tag, prec, recall))
# Build the confusion matrix: large diagonal entries are good, large
# off-diagonal entries are bad.
def calculate_confusion(config, predicted_indices, y_indices):
    """Counts (true label, guessed label) pairs.

    Args:
      config: Object whose label_size gives the number of classes.
      predicted_indices: Sequence of guessed label indices.
      y_indices: Sequence of true label indices; one entry of
        predicted_indices is read for each of its positions.
    Returns:
      int32 array of shape (label_size, label_size) indexed
      [true label, guessed label].
    """
    confusion = np.zeros((config.label_size, config.label_size), dtype=np.int32)
    for i, correct_label in enumerate(y_indices):
        guessed_label = predicted_indices[i]
        confusion[correct_label, guessed_label] += 1
    return confusion
# Save the predictions to a file.
def save_predictions(predictions, filename):
    """Writes one prediction per line to filename.

    Args:
      predictions: Iterable of values; each is written as str(value).
      filename: Path of the file to create or overwrite.
    """
    # The file is opened in binary mode, so every line is encoded explicitly;
    # this keeps the bytes written identical on Python 2 and Python 3.
    with open(filename, "wb") as f:
        for prediction in predictions:
            f.write((str(prediction) + "\n").encode("ascii"))
# Train, validate and test the NER model.
# When debugging, set max_epochs = 1 in Config to iterate quickly.
def test_NER():
    """Trains the NER model with early stopping and writes test predictions.

    After every epoch the model is evaluated on the dev set. The session
    variables are saved to ./weights/ner.weights whenever the validation loss
    improves, and training stops once the best epoch is more than
    config.early_stopping epochs old. The saved variables are then restored
    and the test-set predictions are written to q2_test.predicted.
    """
    config = Config()
    # Build everything inside a fresh default graph.
    with tf.Graph().as_default():
        model = NERModel(config)
        init = tf.initialize_all_variables()
        # Saver used to store and reload the session variables.
        saver = tf.train.Saver()
        # cf is the module-level tf.ConfigProto defined in this file.
        with tf.Session(config=cf) as session:
            # Lowest validation loss so far and the epoch it was reached in.
            best_val_loss = float('inf')
            best_val_epoch = 0
            session.run(init)
            for epoch in range(config.max_epochs):
                print('Epoch {}'.format(epoch))
                start = time.time()
                # Loss and accuracy of one training epoch.
                train_loss, train_acc = model.run_epoch(
                    session, model.X_train, model.y_train)
                # Loss and predictions on the dev set.
                val_loss, predictions = model.predict(
                    session, model.X_dev, model.y_dev)
                print('Training loss: {}'.format(train_loss))
                print('Training acc: {}'.format(train_acc))
                print('Validation loss: {}'.format(val_loss))
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    best_val_epoch = epoch
                    if not os.path.exists("./weights"):
                        os.makedirs("./weights")
                    # Keep the variables of the best epoch seen so far.
                    saver.save(session, './weights/ner.weights')
                # Early stopping: no improvement for too many epochs.
                if epoch - best_val_epoch > config.early_stopping:
                    break
                confusion = calculate_confusion(
                    config, predictions, model.y_dev)
                print_confusion(confusion, model.num2tag)
                print('Total time: {}'.format(time.time() - start))
            # Reload the variables stored at the best epoch.
            saver.restore(session, './weights/ner.weights')
            print('Test')
            print('=-=-=')
            print('Writing predictions to q2_test.predicted')
            _, predictions = model.predict(
                session, model.X_test, model.y_test)
            save_predictions(predictions, "q2_test.predicted")
if __name__ == "__main__":