【推荐算法工程师技术栈系列】分布式&数据库--tensorflow
混沌战神阿瑞斯 人气:0
[TOC]
# TensorFlow 高阶API
## Dataset(tf.data)
源码定义:[tensorflow/pythonhttps://img.qb5200.com/download-x/data/opshttps://img.qb5200.com/download-x/dataset_ops.py](https://www.github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/pythonhttps://img.qb5200.com/download-x/data/opshttps://img.qb5200.com/download-x/dataset_ops.py)
用户向导:[Dataset Input Pipeline](https://tensorflow.google.cn/versions/r1.10/api_guides/python/input_dataset)
**Dataset**可以用来表示输入管道元素集合(张量的嵌套结构)和“逻辑计划“对这些元素的转换操作。在Dataset中元素可以是向量,元组或字典等形式。
另外,Dataset需要配合另外一个类Iterator进行使用,Iterator对象是一个迭代器,可以对Dataset中的元素进行迭代提取。
api | 说明
---|---
from_generator(generator,output_types,output_shapes=None) | Creates a Dataset whose elements are generated by generator.
from_tensor_slices(tensors) | 创建dataset,其元素是给定张量的切片的元素
from_tensors(tensors) | 创建一个Dataset包含给定张量的单个元素。
interleave(map_func,cycle_length,block_length=1) | 对数据集进行map转换并合并插入结果
list_files(file_pattern,shuffle=None) | A dataset of all files matching a pattern.
range(*args) | Creates a Dataset of a step-separated range of values.
skip(count) | Creates a Dataset that skips count elements from this dataset.
take(count) | Creates a Dataset with at most count elements from this dataset.
zip(datasets) | Creates a Dataset by zipping together the given datasets.
prefetch(buffer_size) | 创建dataset,在请求输入数据集之前从输入数据集中预提取元素
apply(transformation_func) | Apply a transformation function to this dataset.
cache(filename='') | Caches the elements in this dataset.
concatenate(dataset) | Creates a Dataset by concatenating given dataset with this dataset.
filter(predicate) | Filters this dataset according to predicate.
map(map_func,num_parallel_calls=None) | Maps map_func across this dataset.
flat_map(map_func) | Maps map_func across this dataset and flattens the result.
batch(batch_size) | 将数据集的连续元素合成批次
padded_batch(batch_size,padded_shapes,padding_values=None) | 将数据集的连续元素组合到填充批次中,此转换将输入数据集的多个连续元素组合为单个元素。
repeat(count=None) | Repeats this dataset count times.
shard(num_shards,index) | 将Dataset分割成num_shards个子数据集。这个函数在分布式训练中非常有用,它允许每个设备读取唯一子集。
shuffle(buffer_size,seed=None,reshuffle_each_iteration=None) | 随机混洗数据集的元素。
make_initializable_iterator(shared_name=None) | Creates an Iterator for enumerating the elements of this dataset.
make_one_shot_iterator() | 创建Iterator用于枚举此数据集的元素。(可自动初始化)
**Iterator**
源码定义:[tensorflow/pythonhttps://img.qb5200.com/download-x/data/ops/iterator_ops.py](https://www.github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/pythonhttps://img.qb5200.com/download-x/data/ops/iterator_ops.py)
用户向导:[Dataset Input Pipeline > Iterating over datasets](https://tensorflow.google.cn/versions/r1.10/api_guides/python/input_dataset#Iterating_over_datasets)
api | 说明
---|---
get_next(name=None) | In graph mode, you should typically call this method once and use its result as the input to another computation.
## Estimator(tf.estimator)
源码定义:[tensorflow/python/estimator/estimator.py](https://www.github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/python/estimator/estimator.py)
用户向导:[Regression Examples](https://tensorflow.google.cn/versions/r1.10/api_guides/python/regression_examples)
Estimator对象包装(wraps)由model_fn指定的模型,model_fn由给定输入和其他一些参数,返回需要进行训练、计算,或预测的操作.
所有输出(检查点,事件文件等)都被写入model_dir或其子目录.如果model_dir未设置,则使用临时目录.
可以通过RunConfig对象(包含了有关执行环境的信息)传递config参数.它被传递给model_fn,如果model_fn有一个名为“config”的参数(和输入函数以相同的方式).Estimator使得模型可配置化,并且还使用其一些字段来控制内部,特别是关于检查点的控制.
该params参数包含hyperparameter,如果model_fn有一个名为“PARAMS”的参数,并且以相同的方式传递给输入函数,则将它传递给model_fn. Estimator只是沿着参数传递,并不检查它.因此,params的结构完全取决于开发人员.
不能在子类中重写任何Estimator方法(其构造函数强制执行此操作).子类应使用model_fn来配置基类,并且可以添加实现专门功能的方法.
api | 说明
---|---
__init__(model_fn, model_dir=None, config=None, params=None) | Estimator的构造方法,返回EstimatorSpec实例
evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None) | 返回一个包含按name为键的model_fn中指定的计算指标的词典。
export_savedmodel(export_dir_base, serving_input_receiver_fn) | 将推理图作为SavedModel导出到给定的目录中.该方法通过首先调用serving_input_receiver_fn来获取特征Tensors来构建一个新图,然后调用这个Estimator的model_fn来基于这些特征生成模型图.它在新的会话中将给定的检查点恢复到该图中.最后它会在给定的export_dir_base下面创建一个时间戳导出目录,并在其中写入一个SavedModel,其中包含从此会话保存的单个MetaGraphDef.
get_variable_names() | 返回此模型中所有变量名称的列表.
get_variable_value(name) | 返回由名称给出的变量的值. 返回值类型:Numpy数组
latest_checkpoint() | 查找model_dir中最新保存的检查点文件的文件名.
predict(self,input_fn,predict_keys=None) | 对给定的features产生预测,返回predictions字典张量的计算值.
train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None) | 训练给定训练数据input_fn的模型.
## FeatureColumns(tf.feature_column)
源码定义:[tensorflow/python/feature_column/feature_column.py](https://www.github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/python/feature_column/feature_column.py)
用户向导:[feature_columns](https://www.tensorflow.org/guide/feature_columns?hl=zh-cn)
[feature_columns2](http://xudongyang.coding.me/tensorflow-feature-columns/)
Feature cloumns是原始数据和Estimator模型之间的桥梁,它们被用来把各种形式的原始数据转换为模型能够使用的格式
api | 返回值 | 用法
---|---|---
tf.feature_column.bucketized_column(source_column,boundaries) | Represents discretized dense input.| Bucketized column用来把numeric column的值按照提供的边界(boundaries)离散化为多个值
tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string) | Represents sparse feature where ids are set by hashing.| 用户指定类别的总数,通过hash的方式来得到最终的类别ID
tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None) | A _CategoricalColumn that returns identity values.|与Bucketized column类似,Categorical identity column用单个唯一值表示bucket。
tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file) | A _CategoricalColumn with a vocabulary file.|类别特征做one-hot编码,字典通过文件给出
tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list) | A _CategoricalColumn with in-memory vocabulary. | 类别特征做one-hot编码,字典通过list给出
tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None) | Returns a column for performing crosses of categorical features.| 对keys的笛卡尔积执行hash操作,再把hash的结果对hash_bucket_size取模得到最终的结果
tf.feature_column.embedding_column(categorical_column,dimension,combiner='mean') | _DenseColumn that converts from sparse, categorical input.|把原始特征映射为一个低维稠密的实数向量
tf.feature_column.indicator_column(categorical_column) | Represents multi-hot representation of given categorical column. | 把categorical column得到的稀疏tensor转换为one-hot或者multi-hot形式的稠密tensor
tf.feature_column.input_layer(features,feature_columns) | Returns a dense Tensor as input layer based on given feature_columns.
tf.feature_column.linear_model(features,feature_columns) | Returns a linear prediction Tensor based on given feature_columns.
tf.feature_column.make_parse_example_spec(feature_columns) | Creates parsing spec dictionary from input feature_columns.
tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None) | Represents real valued or numerical features.
tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner='mean') | List of dense columns that convert from sparse, categorical input.|把多个特征共享相同的embeding映射空间
tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32)| |给一个类别特征赋予一定的权重,给一个类别特征赋予一定的权重
## tf.nn
激活函数op,loss部分抽象
源码定义:[tensorflow/python/ops/gen_nn_ops.py](https://www.github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/python/ops/gen_nn_ops.py)
用户向导:[Neural Network](https://tensorflow.google.cn/versions/r1.10/api_guides/python/nn)
api | 说明
---|---
tf.nn.relu(...) | Computes rectified linear: max(features, 0).
tf.nn.sigmoid(...) | Computes sigmoid of x element-wise.
tf.nn.bias_add(...) |
tf.nn.tanh(...) | Computes hyperbolic tangent of x element-wise.
tf.nn.dropout() |
tf.nn.softmax() |
tf.nn.sigmoid_cross_entropy_with_logits(...) | Computes sigmoid cross entropy given logits.
tf.nn.top_k(...) | Finds values and indices of the k largest entries for the last dimension.
tf.nn.embedding_lookup(params,ids,partition_strategy='mod') | 按照ids查找params里面的vector然后输出
tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights) | Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights.
tf.nn.zero_fraction(value,name=None) | Returns the fraction of zeros in value.
tf.nn.nce_loss() | 负采样的NCE loss实现
## tf.layers
网络模块抽象
api | 说明
---|---
average_pooling1d(...) | Average Pooling layer for 1D inputs.
average_pooling2d(...) | Average pooling layer for 2D inputs (e.g. images).
average_pooling3d(...) | Average pooling layer for 3D inputs (e.g. volumes).
batch_normalization(...) | Functional interface for the batch normalization layer.
conv1d(...) | Functional interface for 1D convolution layer (e.g. temporal convolution).
conv2d(...) | Functional interface for the 2D convolution layer.
conv2d_transpose(...) | Functional interface for transposed 2D convolution layer.
conv3d(...) | Functional interface for the 3D convolution layer.
conv3d_transpose(...) | Functional interface for transposed 3D convolution layer.
dense(...) | Functional interface for the densely-connected layer.
dropout(...) | Applies Dropout to the input.
flatten(...) | Flattens an input tensor while preserving the batch axis (axis 0).
max_pooling1d(...) | Max Pooling layer for 1D inputs.
max_pooling2d(...) | Max pooling layer for 2D inputs (e.g. images).
max_pooling3d(...) | Max pooling layer for 3D inputs (e.g. volumes).
separable_conv1d(...) | Functional interface for the depthwise separable 1D convolution layer.
separable_conv2d(...) | Functional interface for the depthwise separable 2D convolution layer.
## tf.train
tf.train provides a set of classes and functions that help train models.
用户向导:[Training](https://tensorflow.google.cn/versions/r1.10/api_guides/python/train)
api | 说明
---|---
tf.train.Optimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.GradientDescentOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdadeltaOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdagradOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdamOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SyncReplicasOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SessionRunHook | Training Hooks,Hook to extend calls to MonitoredSession.run().
tf.train.SessionRunArgs | Training Hooks,Represents arguments to be added to a Session.run() call.
tf.train.global_step | sess每执行完一个batch,就给global_step加1,用来统计目前执行的batch数
tf.train.basic_train_loop |
tf.train.get_global_step() | 返回global_step作为name的tensor
tf.train.get_or_create_global_step() | Returns and create (if necessary) the global step tensor.
tf.train.write_graph |
tf.train.string_input_producer | 输出字符串到一个输入管道队列
tf.train.Saver(max_to_keep=5) | 保存模型
tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None) | Finds the filename of latest saved checkpoint file.
tf.train.Int64List | Int64List
tf.train.FloatList | FloatList
tf.train.Features | Features
tf.train.Example | Example
tf.train.batch(tensors,batch_size) | Creates batches of tensors in tensors.
tf.trainable_variables() |
tf.train.Scaffold | Scaffold
## tf.linalg
TensorFlow 的线性代数库。
[w3cschool TensorFlow官方文档-linalg](https://www.w3cschool.cn/tensorflow_python/tensorflow_python-pcx62vdu.html)
api | 说明
---|---
adjoint(...) | 转置最后两个维度和共轭张量matrix.
band_part(...) | 复制张量设置每个最内层矩阵中心带外的所有内容
cholesky(...) | 计算一个或多个方阵的Cholesky分解.
cholesky_solve(...) | A X = RHS给出的Cholesky因子分解,求解线性方程组.
det(...) | 计算一个或多个方阵的行列式.
diag(...) | 返回具有给定批处理对角线值的批处理对角线张量.
diag_part(...) | 返回批处理张量的批处理对角线部分.
eigh(...) | 计算了一批自共轭矩阵的特征分解.
eigvalsh(...) | 计算一个或多个自共轭矩阵的特征值.
einsum(...) | 任意维度的张量之间的广义收缩.
expm(...) | 计算一个或多个方阵的矩阵指数.
eye(...) | 构造一个单位矩阵或批矩阵.
inv(...) | 计算一个或多个平方可逆矩阵或它们的倒数
logdet(...) | 计算hermitian正定矩阵的行列式的对数.
logm(...) | 计算一个或多个方阵的矩阵对数:
lstsq(...) | 解决一个或多个线性最小二乘问题.
norm(...) | 计算向量,矩阵和张量的范数.(不赞成的参数)
qr(...) | 计算一个或多个矩阵的QR分解.
set_diag(...) | 返回具有新批处理对角线值的批处理矩阵张量.
slogdet(...) | 计算行列式的绝对值的符号和日志
solve(...) | 求解线性方程组.
svd(...) | 计算一个或多个矩阵的奇异值分解.
tensordot(...) | a和b沿指定轴的张量收缩.
trace(...) | 计算张量x的轨迹.
transpose(...) | 转置张量a的最后两个维度.
triangular_solve(...) | 求解具有上三角矩阵或下三角矩阵的线性方程组.
## checkpoint(模型保存与恢复)
Estimator 将一个检查点保存到 model_dir 中后,每次调用 Estimator 的 train、eval 或 predict 方法时,都会发生下列情况:
a) Estimator 通过运行 model_fn() 构建模型图。(要详细了解 model_fn(),请参阅创建自定义 Estimator。)
b) Estimator 根据最近写入的检查点中存储的数据来初始化新模型的权重。
换言之,一旦存在检查点,TensorFlow 就会在您每次调用 train()、evaluate() 或 predict() 时重建模型。
# Tensorflow Serving
## 官方例子
### half_plus_two的例子
```
# Download the TensorFlow Serving Docker image and repo
docker pull tensorflow/serving
git clone https://github.com/tensorflow/serving
# Location of demo models
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"
# Start TensorFlow Serving container and open the REST API port
# docker run -t 表示是否允许伪TTY
# docker run --rm表示如果实例已经存在,则先remove掉该实例再重新启动新实例
# docker -p设置端口映射
# docker -v设置磁盘映射
# docker -e设置环境变量
docker run -t --rm -p 8501:8501 \
-v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" \
-e MODEL_NAME=half_plus_two \
tensorflow/serving &
# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}' \
-X POST http://localhost:8501/v1/models/half_plus_two:predict
# Returns => { "predictions": [2.5, 3.0, 4.5] }
```
serving镜像提供了两种调用方式:gRPC和HTTP请求。gRPC默认端口是8500,HTTP请求的默认端口是8501,serving镜像中的程序会自动加载镜像内/models下的模型,通过MODEL_NAME指定/models下的哪个模型。
### 创建自定义镜像
```
docker run --rm -p 8501:8501 \
--mount type=bind,source=${model_dir},target=/modelshttps://img.qb5200.com/download-x/dpp_model \
-e MODEL_NAME=dpp_model -t tensorflow/serving &
```
[官方文档](https://tensorflow.google.cn/tfx/servinghttps://img.qb5200.com/download-x/docker)
## 架构
TensorFlow Serving可抽象为一些组件构成,每个组件实现了不同的API任务,其中最重要的是Servable, Loader, Source, 和 Manager,我们看一下组件之间是如何交互的。
### Source
TF Serving发现磁盘上的模型文件,该模型服务的生命周期就开始了。**Source组件负责发现模型文件**,找出需要加载的新模型。实际上,该组件监控文件系统,当发现一个新的模型版本,就为该模型创建一个Loader。
### Loader
Loader需要知道模型的相关信息,包括如何加载模型如何估算模型需要的资源,包括需要请求的RAM、GPU内存。**Loader带一个指针,连接到磁盘上存储的模型,其中包含加载模型需要的相关元数据**。不过记住,Loader现在还不允许加载模型。
### Manager
Manager收到待加载模型版本,开始模型服务流程。此处有两种可能性,第一种情况是模型首次推送部署,Manager先确保模型需要的资源可用,一旦获取相应的资源,Manager赋予Loader权限去加载模型。
第二种情况是为已上线模型部署一个新版本。Manager会先查询**Version Policy**插件,决定加载新模型的流程如何进行。
具体来说,当加载新模型时,可选择保持 (1) 可用性(the Availability Preserving Policy)或 (2) 资源(the Resource Preserving Policy)。
### Servable
最后,当用户请求模型的句柄,Manager返回句柄给Servable。
Servables 是 TensorFlow Serving 中最核心的抽象,是客户端用于执行计算 (例如:查找或推断) 的底层对象。
## 部署服务
### 模型导出
将TensorFlow构建的模型用作服务,首先需要确保导出为正确的格式,可以采用TensorFlow提供的SavedModel类。TensorFlow Saver提供模型checkpoint磁盘文件的保存/恢复。事实上SavedModel封装了TensorFlow Saver,对于模型服务是一种标准的导出方法。
SignatureDefs定义了一组TensorFlow支持的计算签名,便于在计算图中找到适合的输入输出张量。简单的说,使用这些计算签名,可以准确指定特定的输入输出节点。目前有3个服务API: 分类、预测和回归。每个签名定义关联一个RPC API。分类SignatureDef用于分类RPC API,预测SignatureDef用于RPC API等等。对于分类SignatureDef,需要一个输入张量(接收数据)以及可能的输出张量: 类别或得分。回归SignatureDef需要一个输入张量以及另一个输出张量。最后预测SignatureDef需要一个可变长度的输入输出张量。
[提供tensorflow模型](https://tensorflow.google.cn/tfx/serving/serving_basic)
#### tf.saved_model API
```
with sess.graph.as_default() as graph:
builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir)
# 1.8版本,1.12之后调整为tf.saved_model.predict_signature_def
signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image},
outputs={'prediction': graph.get_tensor_by_name('final_result:0')},)
builder.add_meta_graph_and_variables(sess=sess,
tags=["serve"],
signature_def_map={'predict':signature,
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature
})
builder.save()
```
#### tf.estimator API
```
def export_savedmodel(self,
export_dir,
serving_input_receiver_fn=None,
as_text=False,
checkpoint_path=None):
if serving_input_receiver_fn is None and self.feature_cfgs is None:
raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.')
if serving_input_receiver_fn is None:
feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
# 自己封装的estimator
savedmodel_path = self.estimator.export_savedmodel(
export_dir_base=export_dir,
serving_input_receiver_fn=serving_input_receiver_fn,
as_text=as_text,
checkpoint_path=checkpoint_path
)
return savedmodel_path
```
其中self.feature_cfgs.to_feature_spec函数得到如下结果:
```
{'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}
```
说明了输入】
TensorFlow Serving ModelServer 用于发现新导出的模型,并启动 gRPC 用于提供模型服务。
## API请求(predict)
TensorFlow ModelServer通过host:port接受下面这种RESTful API请求:
POST http://host:port/
加载全部内容