tf.contrib.learn Reading Notes

Basic Usage of tf.contrib.learn

https://www.tensorflow.org/get_started/tflearn

Basic workflow

  1. Load file containing training/test data into a TensorFlow Dataset
  2. Construct a neural network classifier
  3. Fit the model using the training data
  4. Evaluate the accuracy of the model
  5. Classify new samples (inference); a minimal sketch of these steps follows
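A minimal sketch of the five steps above, following the tflearn iris tutorial linked above; the CSV file names, layer sizes, and new-sample values are illustrative and assume the tutorial's data files are available locally.

import numpy as np
import tensorflow as tf

# 1. Load the training and test data.
training_set = tf.contrib.learn.datasets.base.load_csv_with_header(
    filename="iris_training.csv", target_dtype=np.int, features_dtype=np.float32)
test_set = tf.contrib.learn.datasets.base.load_csv_with_header(
    filename="iris_test.csv", target_dtype=np.int, features_dtype=np.float32)

# 2. Construct a neural network classifier.
feature_columns = [tf.contrib.layers.real_valued_column("", dimension=4)]
classifier = tf.contrib.learn.DNNClassifier(feature_columns=feature_columns,
                                            hidden_units=[10, 20, 10],
                                            n_classes=3,
                                            model_dir="/tmp/iris_model")

# 3. Fit the model using the training data.
classifier.fit(x=training_set.data, y=training_set.target, steps=2000)

# 4. Evaluate accuracy on the test data.
accuracy = classifier.evaluate(x=test_set.data, y=test_set.target)["accuracy"]

# 5. Classify two new flower samples.
new_samples = np.array([[6.4, 3.2, 4.5, 1.5], [5.8, 3.1, 5.0, 1.7]], dtype=np.float32)
predictions = list(classifier.predict(new_samples))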

    Logging and Monitoring Basics with tf.contrib.learn

    https://www.tensorflow.org/get_started/monitors
    When training a model, you will want to track and evaluate progress in real time.
    Without any logging, model training feels like a bit of a black box; you can’t see what’s happening as TensorFlow steps through gradient descent, get a sense of whether the model is converging appropriately, or audit to determine whether early stopping might be appropriate.
    One way to address this problem would be to split model training into multiple fit calls with smaller numbers of steps in order to evaluate accuracy more progressively. However, this is not recommended practice, as it greatly slows down model training.
    tf.contrib.learn offers another solution: a Monitor API designed to help you log metrics and evaluate your model while training is in progress.

    Enabling Logging

    TensorFlow uses five different levels for log messages: DEBUG, INFO, WARN, ERROR, and FATAL.
    tf.logging.set_verbosity(tf.logging.INFO)

When tracking model training, you will want to adjust the level to INFO, which provides additional feedback as fit operations are in progress.

INFO:tensorflow:loss = 1.18812, step = 1
INFO:tensorflow:loss = 0.210323, step = 101
INFO:tensorflow:loss = 0.109025, step = 201

With INFO-level logging, tf.contrib.learn automatically outputs training-loss metrics to stderr after every 100 steps.

Configuring a ValidationMonitor for Streaming Evaluation

tf.contrib.learn provides several high-level Monitors you can attach to your fit operations to further track metrics and/or debug lower-level TensorFlow operations during model training, including:

  • CaptureVariable: saves a specified variable's values into a collection every n steps of training.
  • PrintTensor: logs a specified tensor's values every n steps of training.
  • SummarySaver: saves tf.Summary protocol buffers for a given tensor using a tf.summary.FileWriter every n steps of training.
  • ValidationMonitor: logs a specified set of evaluation metrics every n steps of training and, if desired, implements early stopping under certain conditions.

Evaluating Every N Steps

While logging training loss, you might also want to simultaneously evaluate against test data to see how well the model is generalizing.
You can accomplish this by configuring a ValidationMonitor with the test data and setting how often to evaluate with every_n_steps. The default value of every_n_steps is 100.

validation_monitor = tf.contrib.learn.monitors.ValidationMonitor(
    test_set.data,
    test_set.target,
    every_n_steps=50)

ValidationMonitors rely on saved checkpoints to perform evaluation operations, so you’ll want to modify instantiation of the classifier to add a tf.contrib.learn.RunConfig that includes save_checkpoints_secs, which specifies how many seconds should elapse between checkpoint saves during training.
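A sketch of this wiring, following the monitors tutorial; the classifier arguments (feature columns, layer sizes, model_dir) are illustrative.

classifier = tf.contrib.learn.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[10, 20, 10],
    n_classes=3,
    model_dir="/tmp/iris_model",
    # Save a checkpoint every second so the monitor always has a fresh one to evaluate.
    config=tf.contrib.learn.RunConfig(save_checkpoints_secs=1))

# Attach the monitor to the fit call.
classifier.fit(x=training_set.data,
               y=training_set.target,
               steps=2000,
               monitors=[validation_monitor])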

Customizing the Evaluation Metrics with MetricSpec

To specify the exact metrics you’d like to run in each evaluation pass, you can add a metrics param to the ValidationMonitor constructor.
metrics takes a dict of key/value pairs, where each key is the name you’d like logged for the metric, and the corresponding value is a MetricSpec object.
The MetricSpec constructor takes four parameters: metric_fn, prediction_key, label_key, and weights_key; an example follows.
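A sketch of a metrics dict built with MetricSpec, based on the monitors tutorial; the streaming metric functions and the CLASSES prediction key are the ones that tutorial uses for a classifier.

validation_metrics = {
    "accuracy":
        tf.contrib.learn.MetricSpec(
            metric_fn=tf.contrib.metrics.streaming_accuracy,
            prediction_key=tf.contrib.learn.PredictionKey.CLASSES),
    "precision":
        tf.contrib.learn.MetricSpec(
            metric_fn=tf.contrib.metrics.streaming_precision,
            prediction_key=tf.contrib.learn.PredictionKey.CLASSES),
}
validation_monitor = tf.contrib.learn.monitors.ValidationMonitor(
    test_set.data,
    test_set.target,
    every_n_steps=50,
    metrics=validation_metrics)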

Early Stopping with ValidationMonitor

In addition to logging evaluation metrics, a ValidationMonitor makes it easy to implement early stopping when specified conditions are met, via three params (an example follows the list below):

  • early_stopping_metric: the metric that triggers early stopping (e.g., loss or accuracy) under the conditions specified in early_stopping_rounds and early_stopping_metric_minimize. Default is "loss".
  • early_stopping_metric_minimize: True if the desired model behavior is to minimize the value of early_stopping_metric; False if it is to maximize it. Default is True.
  • early_stopping_rounds: a number of steps during which, if early_stopping_metric does not decrease (when early_stopping_metric_minimize is True) or increase (when False), training is stopped. Default is None, which means early stopping never occurs.
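A sketch of a ValidationMonitor configured for early stopping, as in the monitors tutorial; the metrics dict is the one defined above and the thresholds are illustrative.

validation_monitor = tf.contrib.learn.monitors.ValidationMonitor(
    test_set.data,
    test_set.target,
    every_n_steps=50,
    metrics=validation_metrics,
    early_stopping_metric="loss",
    early_stopping_metric_minimize=True,
    # Stop if loss has not decreased for 200 consecutive steps.
    early_stopping_rounds=200)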

Building Input Functions

https://www.tensorflow.org/get_started/input_fn

How to construct an input_fn to preprocess and feed data into your models.
tf.contrib.learn supports using a custom input function (input_fn) to encapsulate the logic for preprocessing and piping data into your models.

Anatomy of an input_fn

The basic skeleton of an input function:

def my_input_fn():
    # Preprocess your data here...

    # ...then return 1) a mapping of feature columns to Tensors with
    # the corresponding feature data, and 2) a Tensor containing labels
    return feature_cols, labels

The body of the input function contains the specific logic for preprocessing your input data, such as scrubbing out bad examples or feature scaling.
Input functions must return the following two values containing the final feature and label data to be fed into your model (as shown in the above code skeleton); a concrete example follows the list:

  1. feature_cols
    A dict containing key/value pairs that map feature column names to Tensors (or SparseTensors) containing the corresponding feature data.
  2. labels
    A Tensor containing your label (target) values: the values your model aims to predict.
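A concrete sketch of such an input function, loosely following the input_fn tutorial; it assumes data_set is a pandas DataFrame whose columns include the feature names below plus a "medv" label column, all of which are illustrative.

import tensorflow as tf

FEATURES = ["crim", "zn", "indus", "nox", "rm", "age", "dis", "tax", "ptratio"]
LABEL = "medv"

def input_fn(data_set):
    # Map each feature-column name to a Tensor built from the DataFrame column.
    feature_cols = {k: tf.constant(data_set[k].values) for k in FEATURES}
    # Labels are the values the model should learn to predict.
    labels = tf.constant(data_set[LABEL].values)
    return feature_cols, labels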

Converting Feature Data to Tensors

For continuous data, you can create and populate a Tensor using tf.constant:

feature_column_data = [1, 2.4, 0, 9.9, 3, 120]
feature_tensor = tf.constant(feature_column_data)

For sparse, categorical data (data where the majority of values are 0), you’ll instead want to populate a SparseTensor, which is instantiated with three arguments:

  1. dense_shape
    The shape of the tensor. Takes a list indicating the number of elements in each dimension.
  2. indices.
    The indices of the elements in your tensor that contain nonzero values. Takes a list of terms, where each term is itself a list containing the index of a nonzero element.
  3. values
    A one-dimensional tensor of values. Term i in values corresponds to term i in indices and specifies its value.
    sparse_tensor = tf.SparseTensor(indices=[[0, 1], [2, 4]],
                                    values=[6, 0.5],
                                    dense_shape=[3, 5])

    This corresponds to the dense tensor:

    [[0, 6, 0, 0, 0]
     [0, 0, 0, 0, 0]
     [0, 0, 0, 0, 0.5]]

Passing input_fn Data to Your Model

classifier.fit(input_fn=my_input_fn, steps=2000)

Threading and Queues

Queues are a powerful mechanism for asynchronous computation using TensorFlow.
A queue is a node in a TensorFlow graph. In particular, nodes can enqueue new items into the queue, or dequeue existing items from the queue.
N.B. Queue methods (such as q.enqueue(…)) must run on the same device as the queue. Incompatible device placement directives will be ignored when creating these operations.

Queue usage overview

Queues, such as tf.FIFOQueue and tf.RandomShuffleQueue, are important TensorFlow objects for computing tensors asynchronously in a graph.
A typical input architecture is to use a RandomShuffleQueue to prepare inputs for training a model:

  • Multiple threads prepare training examples and push them into the queue.
  • A training thread executes a training op that dequeues mini-batches from the queue.
benefits
  • Used for the input pipelines described in the Reading data section below.
  • The Session object is multithreaded, so multiple threads can easily use the same session and run ops in parallel.
    However, this is not always easy to get right in Python,
    because all threads must be able to stop together, exceptions must be caught and reported, and queues must be properly closed when stopping.
    So, TensorFlow provides two classes to help: tf.train.Coordinator and tf.train.QueueRunner.
  • tf.train.Coordinator
    1. helps multiple threads stop together,
    2. report exceptions to a program that waits for them to stop.
  • QueueRunner
    1. create a number of threads cooperating to enqueue tensors in the same queue.

Coordinator

helps multiple threads stop together

Methods:

  • tf.train.Coordinator.should_stop: returns True if the threads should stop.
  • tf.train.Coordinator.request_stop: requests that threads should stop.
  • tf.train.Coordinator.join: waits until the specified threads have stopped.
Basic Usage
  1. first create a Coordinator object,
  2. then create a number of threads that use the coordinator.
  3. The threads typically run loops that stop when coordinator.should_stop() returns True.
  4. Any thread can decide that the computation should stop by calling coordinator.request_stop(), to ask for all the threads to stop. To cooperate with the requests, each thread must check for coord.should_stop() on a regular basis.
    coord.should_stop() returns True as soon as coord.request_stop() has been called.
# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
    while not coord.should_stop():
        ...do something...
        if ...some condition...:
            coord.request_stop()

# Main thread: create a coordinator.
coord = tf.train.Coordinator()

# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]

# Start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)

more detail

https://www.tensorflow.org/api_docs/python/tf/train/Coordinator

QueueRunner

The QueueRunner class creates a number of threads that repeatedly run an enqueue op.
These threads can use a coordinator to stop together.
In addition, a queue runner runs a closer thread that automatically closes the queue if an exception is reported to the coordinator.

basic usage
  1. First create a queue (e.g. a tf.RandomShuffleQueue).
  2. Add ops that process examples and enqueue them in the queue.
  3. Create dequeue ops from the queue, and return a data tensor.
  4. Use the data tensor to build the TensorFlow graph.
    example = ...ops to create one example...
    # Create a queue, and an op that enqueues examples one at a time in the queue.
    queue = tf.RandomShuffleQueue(...)
    enqueue_op = queue.enqueue(example)
    # Create a training graph that starts by dequeuing a batch of examples.
    inputs = queue.dequeue_many(batch_size)
    train_op = ...use 'inputs' to build the training part of the graph...
2. Create a QueueRunner and combine it with a Coordinator
  1. Create a QueueRunner that will run a few threads to process and enqueue examples.
  2. Launch the graph.
  3. Create a coordinator, launch the queue runner threads.
  4. Run the training loop, controlling termination with the coordinator.
  5. When done, ask the threads to stop.
  6. wait for all threads to actually stop.
# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)

Handling exceptions

Threads started by queue runners do more than just run the enqueue ops: they also catch and handle the exceptions generated by queues, including the tf.errors.OutOfRangeError exception, which is used to report that a queue was closed.
A training program that uses a coordinator must similarly catch and report exceptions in its main loop.

try:
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
except Exception as e:
    # Report exceptions to the coordinator.
    coord.request_stop(e)
finally:
    # Terminate as usual. It is safe to call `coord.request_stop()` twice.
    coord.request_stop()
    coord.join(threads)

Reading data

https://www.tensorflow.org/programmers_guide/reading_data

Reading from files

A typical pipeline for reading records from files has the following stages:

Step 1: Create string_input_producer

Creates a FIFO queue for holding the filenames until the reader needs them
Arguments:

  • filenames list.
    • Each filename in the list is either a constant string Tensor or the output of the tf.train.match_filenames_once function.
  • shuffle and maximum number of epochs.
    • A queue runner adds the whole list of filenames to the queue once for each epoch
    • shuffling the filenames within an epoch if shuffle=True.
    • This procedure provides a uniform sampling of files, so that examples are not under- or over- sampled relative to each other.
    • The queue runner works in a thread separate from the reader that pulls filenames from the queue, so the shuffling and enqueuing process does not block the reader.
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

Step 2: create a Reader

A. Select the reader that matches your input file format
B. Then pass the filename queue to the reader’s read method.

  • The read method outputs a key and a scalar string value.
    • key: identifying the file and record (useful for debugging if you have some weird records)
    • Each execution of read reads a single line from the file.

C. Use one (or more) of the decoder and conversion ops to decode this string into the tensors that make up an example.
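A sketch of steps A-C for CSV input, following the reading-data guide; it assumes records with five integer columns, of which the first four are features, and supplies a per-column default.

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# record_defaults fixes the type of each decoded column and supplies a default
# for missing values.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])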

Step 3: call tf.train.start_queue_runners

call tf.train.start_queue_runners to populate the queue before you call run or eval to execute the read.

Standard TensorFlow format - TFRecords

more details:
https://www.tensorflow.org/api_guides/python/python_io#tfrecords_format_details
https://www.github.com/tensorflow/tensorflow/blob/r1.2/tensorflow/examples/how_tos/reading_data/convert_to_records.py
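A minimal sketch of writing TFRecords, based on the convert_to_records example linked above; the output path, field names, and the examples iterable (assumed to yield raw image bytes and an integer label) are illustrative.

writer = tf.python_io.TFRecordWriter("/tmp/data.tfrecords")
for image_raw, label in examples:
    # Wrap each record in a tf.train.Example protocol buffer and serialize it.
    example = tf.train.Example(features=tf.train.Features(feature={
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
        "image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw])),
    }))
    writer.write(example.SerializeToString())
writer.close()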

Preprocessing

You can then do any preprocessing of these examples you want.
Examples include normalization of your data, picking a random slice, adding noise or distortions
more details:
https://github.com/tensorflow/models/tree/master/tutorials/image/cifar10/cifar10_input.py

Batching

At the end of the pipeline we use another queue to batch together examples for training, evaluation, or inference. For this we use a queue that randomizes the order of examples, using the tf.train.shuffle_batch.

def read_my_file_format(filename_queue):
    reader = tf.SomeReader()
    key, record_string = reader.read(filename_queue)
    example, label = tf.some_decoder(record_string)
    processed_example = some_processing(example)
    return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=True)
    example, label = read_my_file_format(filename_queue)
    # min_after_dequeue defines how big a buffer we will randomly sample
    # from -- bigger means better shuffling but slower start up and more
    # memory used.
    # capacity must be larger than min_after_dequeue and the amount larger
    # determines the maximum we will prefetch. Recommendation:
    # min_after_dequeue + (num_threads + a small safety margin) * batch_size
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch

Multiple readers and a single filename queue

If you need more parallelism or shuffling of examples between files, use multiple reader instances with tf.train.shuffle_batch_join. For example:

def read_my_file_format(filename_queue):
    # Same as above
    ...

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=True)
    example_list = [read_my_file_format(filename_queue)
                    for _ in range(read_threads)]
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch_join(
        example_list, batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch

We still only use a single filename queue that is shared by all the readers.

  • That way we ensure that the different readers use different files from the same epoch until all the files from the epoch have been started.
  • It is also usually sufficient for a single thread to fill the filename queue.

How many threads do you need?
the tf.train.shuffle_batch* functions add a summary to the graph that indicates how full the example queue is. If you have enough reading threads, that summary will stay above zero.

Creating threads to prefetch using QueueRunner objects

Many of the tf.train functions listed above add tf.train.QueueRunner objects to your graph.
These require that you call tf.train.start_queue_runners before running any training or inference steps, or it will hang forever.
This is best combined with a tf.train.Coordinator to cleanly shut down these threads when there are errors.
The recommended code pattern:

# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)
except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()

Summary of the read-data process

First we create the graph.
It will have a few pipeline stages that are connected by queues.

  • The first stage: generate filenames to read and enqueue them in the filename queue.
  • The second stage: consumes filenames (using a Reader), produces examples, and enqueues them in an example queue.
    • Depending on how you have set things up, you may actually have a few independent copies of the second stage (in other words, several readers), so that you can read from multiple files in parallel.
    • At the end is an enqueue operation, which enqueues into a queue that the next stage dequeues from.

We want to start threads running these enqueuing operations, so that our training loop can dequeue examples from the example queue.
Method: add a tf.train.QueueRunner to the graph using the tf.train.add_queue_runner function.

  • Each QueueRunner is responsible for one stage, and holds the list of enqueue operations that need to be run in threads.

Once the graph is constructed, the tf.train.start_queue_runners function asks each QueueRunner in the graph to start its threads running the enqueuing operations.

If all goes well, you can now run your training steps and the queues will be filled by the background threads.
If you have set an epoch limit, at some point an attempt to dequeue examples will raise a tf.errors.OutOfRangeError (analogous to EOF).

The last ingredient is the tf.train.Coordinator. This is responsible for letting all the threads know if anything has signalled a shut down.

  • Most commonly this would be because an exception was raised, for example one of the threads got an error when running some operation (or an ordinary Python exception).
Filtering records or producing multiple examples per record

Instead of emitting examples with shape [x, y, z], your reader produces a batch of examples with shape [batch, x, y, z]. The batch size can be 0 if you want to filter this record out (maybe it is in a hold-out set?), or bigger than 1 if you are producing multiple examples per record. Then simply set enqueue_many=True when calling one of the batching functions (such as shuffle_batch or shuffle_batch_join), as sketched below.
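A sketch of this pattern, assuming read_my_file_format_many is a hypothetical decoder returning tensors whose first dimension is the (possibly zero) number of examples produced from one record; batch_size, capacity, and min_after_dequeue are as defined in the earlier pipeline.

examples, labels = read_my_file_format_many(filename_queue)
example_batch, label_batch = tf.train.shuffle_batch(
    [examples, labels], batch_size=batch_size, capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    # Each enqueued tensor already holds many (or zero) examples.
    enqueue_many=True)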

Using the Dataset

https://www.tensorflow.org/programmers_guide/datasets

Two API abstractions
tf.contrib.data.Dataset

A Dataset represents a sequence of elements.
There are two distinct ways to create a dataset:

  • Creating a source (e.g. Dataset.from_tensor_slices()) constructs a dataset from one or more tf.Tensor objects.
  • Applying a transformation (e.g. Dataset.batch()) constructs a dataset from one or more tf.contrib.data.Dataset objects.
tf.contrib.data.Iterator

Provides the main way to extract elements from a dataset.
The operation returned by Iterator.get_next() yields the next element of a Dataset when executed, and typically acts as the interface between input pipeline code and your model.
For more sophisticated uses, the Iterator.initializer operation enables you to reinitialize and parameterize an iterator with different datasets, so that you can, for example, iterate over training and validation data multiple times in the same program.

Basic mechanics
Define a source
  • construct a Dataset from some tensors in memory
    • use tf.contrib.data.Dataset.from_tensors()
    • use tf.contrib.data.Dataset.from_tensor_slices()
  • construct a Dataset when your input data are on disk in the recommended TFRecord format
    • use tf.contrib.data.TFRecordDataset
  • transform a Dataset into a new Dataset
    • apply per-element transformations such as Dataset.map() (to apply a function to each element), and multi-element transformations such as Dataset.batch().
  • The most common way to consume values from a Dataset
    • make an iterator object that provides access to one element of the dataset at a time.
    • for example, by calling Dataset.make_one_shot_iterator().
    • A tf.contrib.data.Iterator provides two operations:
      1. Iterator.initializer
        • enables you to (re)initialize the iterator’s state
      2. Iterator.get_next()
        • returns tf.Tensor objects that correspond to the symbolic next element.
    • Depending on your use case, you might choose a different type of iterator, and the options are outlined below.
Dataset structure

A dataset comprises elements that each have the same structure, called components.

  • Each component has a tf.DType and a tf.TensorShape.
  • The Dataset.output_types and Dataset.output_shapes properties allow you to inspect the inferred types and shapes of each component of a dataset element.
  • The nested structure of these properties maps to the structure of an element, which may be a single tensor, a tuple of tensors, or a nested tuple of tensors.
dataset1 = tf.contrib.data.Dataset.from_tensor_slices(tf.random_uniform([4, 10]))
print(dataset1.output_types)   # ==> "tf.float32"
print(dataset1.output_shapes)  # ==> "(10,)"

dataset2 = tf.contrib.data.Dataset.from_tensor_slices(
    (tf.random_uniform([4]),
     tf.random_uniform([4, 100], maxval=100, dtype=tf.int32)))
print(dataset2.output_types)   # ==> "(tf.float32, tf.int32)"
print(dataset2.output_shapes)  # ==> "((), (100,))"

dataset3 = tf.contrib.data.Dataset.zip((dataset1, dataset2))
print(dataset3.output_types)   # ==> (tf.float32, (tf.float32, tf.int32))
print(dataset3.output_shapes)  # ==> "(10, ((), (100,)))"
  • use collections.namedtuple or a dictionary mapping strings to tensors to represent a single element of a Dataset.
dataset = tf.contrib.data.Dataset.from_tensor_slices(
    {"a": tf.random_uniform([4]),
     "b": tf.random_uniform([4, 100], maxval=100, dtype=tf.int32)})
print(dataset.output_types)   # ==> "{'a': tf.float32, 'b': tf.int32}"
print(dataset.output_shapes)  # ==> "{'a': (), 'b': (100,)}"

The Dataset transformations support datasets of any structure. When using the Dataset.map(), Dataset.flat_map(), and Dataset.filter() transformations, which apply a function to each element, the element structure determines the arguments of the function:

dataset1 = dataset1.map(lambda x: ...)
dataset2 = dataset2.flat_map(lambda x, y: ...)
# Note: Argument destructuring is not available in Python 3.
dataset3 = dataset3.filter(lambda x, (y, z): ...)

Creating an iterator

After building a Dataset, the next step is to create an Iterator to access elements from that dataset.
The Dataset API currently supports three kinds of iterator, in increasing level of sophistication:

one-shot
  • supports iterating once through a dataset
  • no need for explicit initialization
  • handle almost all of the cases that the existing queue-based input pipelines support
dataset = tf.contrib.data.Dataset.range(100)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

for i in range(100):
    value = sess.run(next_element)
    assert i == value
initializable
  • must run an explicit iterator.initializer operation before using it.
  • enables you to parameterize the definition of the dataset
  • using one or more tf.placeholder() tensors that can be fed when you initialize the iterator.
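A sketch of an initializable iterator parameterized by a placeholder, as in the datasets guide; sess is assumed to be an existing tf.Session, as in the other snippets here.

max_value = tf.placeholder(tf.int64, shape=[])
dataset = tf.contrib.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

# Initialize the iterator over a dataset with 10 elements.
sess.run(iterator.initializer, feed_dict={max_value: 10})
for i in range(10):
    value = sess.run(next_element)
    assert i == value
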
reinitializable
  • can be initialized from multiple different Dataset objects
  • A reinitializable iterator is defined by its structure.
  • For example, the training input pipeline applies random perturbations to the input, while the validation input pipeline evaluates predictions on unmodified data.
  • These pipelines will typically use different Dataset objects that have the same structure (i.e. the same types and compatible shapes for each component).
# Define training and validation datasets with the same structure.
training_dataset = tf.contrib.data.Dataset.range(100).map(
    lambda x: x + tf.random_uniform([], -10, 10, tf.int64))
validation_dataset = tf.contrib.data.Dataset.range(50)

# A reinitializable iterator is defined by its structure. We could use the
# `output_types` and `output_shapes` properties of either `training_dataset`
# or `validation_dataset` here, because they are compatible.
iterator = tf.contrib.data.Iterator.from_structure(training_dataset.output_types,
                                                   training_dataset.output_shapes)
next_element = iterator.get_next()

training_init_op = iterator.make_initializer(training_dataset)
validation_init_op = iterator.make_initializer(validation_dataset)

# Run 20 epochs in which the training dataset is traversed, followed by the
# validation dataset.
for _ in range(20):
    # Initialize an iterator over the training dataset.
    sess.run(training_init_op)
    for _ in range(100):
        sess.run(next_element)

    # Initialize an iterator over the validation dataset.
    sess.run(validation_init_op)
    for _ in range(50):
        sess.run(next_element)
feedable
  • used together with tf.placeholder to select what Iterator to use in each call to tf.Session.run
  • offers the same functionality as a reinitializable iterator
  • does not require initializing the iterator from the start of a dataset when you switch between iterators
  • use tf.contrib.data.Iterator.from_string_handle to define a feedable iterator that allows you to switch between the two datasets
# Define training and validation datasets with the same structure.
training_dataset = tf.contrib.data.Dataset.range(100).map(
    lambda x: x + tf.random_uniform([], -10, 10, tf.int64)).repeat()
validation_dataset = tf.contrib.data.Dataset.range(50)

# A feedable iterator is defined by a handle placeholder and its structure. We
# could use the `output_types` and `output_shapes` properties of either
# `training_dataset` or `validation_dataset` here, because they have
# identical structure.
handle = tf.placeholder(tf.string, shape=[])
iterator = tf.contrib.data.Iterator.from_string_handle(
    handle, training_dataset.output_types, training_dataset.output_shapes)
next_element = iterator.get_next()

# You can use feedable iterators with a variety of different kinds of iterator
# (such as one-shot and initializable iterators).
training_iterator = training_dataset.make_one_shot_iterator()
validation_iterator = validation_dataset.make_initializable_iterator()

# The `Iterator.string_handle()` method returns a tensor that can be evaluated
# and used to feed the `handle` placeholder.
training_handle = sess.run(training_iterator.string_handle())
validation_handle = sess.run(validation_iterator.string_handle())

# Loop forever, alternating between training and validation.
while True:
    # Run 200 steps using the training dataset. Note that the training dataset is
    # infinite, and we resume from where we left off in the previous `while` loop
    # iteration.
    for _ in range(200):
        sess.run(next_element, feed_dict={handle: training_handle})

    # Run one pass over the validation dataset.
    sess.run(validation_iterator.initializer)
    for _ in range(50):
        sess.run(next_element, feed_dict={handle: validation_handle})

Consuming values from an iterator

If the iterator reaches the end of the dataset, executing the Iterator.get_next() operation will raise a tf.errors.OutOfRangeError.
You must initialize the iterator again if you want to use it further.
If each element of the dataset has a nested structure, the return value of Iterator.get_next() will be one or more tf.Tensor objects in the same nested structure:

dataset1 = tf.contrib.data.Dataset.range(50)
dataset2 = tf.contrib.data.Dataset.range(50)
dataset3 = tf.contrib.data.Dataset.zip((dataset1, dataset2))

iterator = dataset3.make_initializable_iterator()
sess = tf.Session()
sess.run(iterator.initializer)

next1, next2 = iterator.get_next()
print(sess.run([next1]))
print(sess.run([next2]))
# Evaluating either next1 or next2 will advance the iterator for all components.
# A typical consumer of an iterator will therefore include all components in a
# single expression.
print(sess.run([next1, next2 + 1]))

Notice:

  • evaluating either next1 or next2 will advance the iterator for all components
  • A typical consumer of an iterator will include all components in a single expression.

Reading input data

Consuming NumPy arrays

Dataset + tf.placeholder() + Iterator + feed

# Load the training data into two NumPy arrays, for example using `np.load()`.
with np.load("/var/data/training_data.npy") as data:
    features = data["features"]
    labels = data["labels"]

# Assume that each row of `features` corresponds to the same row as `labels`.
assert features.shape[0] == labels.shape[0]

features_placeholder = tf.placeholder(features.dtype, features.shape)
labels_placeholder = tf.placeholder(labels.dtype, labels.shape)

dataset = tf.contrib.data.Dataset.from_tensor_slices(
    (features_placeholder, labels_placeholder))
# [Other transformations on `dataset`...]
dataset = ...
iterator = dataset.make_initializable_iterator()

sess.run(iterator.initializer, feed_dict={features_placeholder: features,
                                          labels_placeholder: labels})

Consuming TFRecord data

tf.contrib.data.TFRecordDataset

  • if you have two sets of files for training and validation
    • tf.placeholder + iterator
filenames = tf.placeholder(tf.string, shape=[None])
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(...)  # Parse the record into tensors.
dataset = dataset.repeat()  # Repeat the input indefinitely.
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()

# You can feed the initializer with the appropriate filenames for the current
# phase of execution, e.g. training vs. validation.

# Initialize `iterator` with training data.
training_filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
sess.run(iterator.initializer, feed_dict={filenames: training_filenames})

# Initialize `iterator` with validation data.
validation_filenames = ["/var/data/validation1.tfrecord", ...]
sess.run(iterator.initializer, feed_dict={filenames: validation_filenames})

Consuming text data

tf.contrib.data.TextLineDataset

  • Given one or more filenames,
  • produce one string-valued element per line of those files.
  • can use tf.placeholder(tf.string)
  • remove unrelated lines (e.g. headers or comments) using Dataset.skip() and Dataset.filter(); with more than one filename, additionally use Dataset.flat_map() to apply them per file, as sketched below.
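A sketch of this per-file preprocessing, following the datasets guide; the file names are illustrative, and each file is assumed to begin with a header line and to use "#" for comment lines.

filenames = ["/var/data/file1.txt", "/var/data/file2.txt"]
dataset = tf.contrib.data.Dataset.from_tensor_slices(filenames)

# Use `Dataset.flat_map()` to treat each file as a separate nested dataset and
# concatenate their contents sequentially into a single "flat" dataset:
# skip the header line and filter out lines beginning with "#".
dataset = dataset.flat_map(
    lambda filename: (
        tf.contrib.data.TextLineDataset(filename)
        .skip(1)
        .filter(lambda line: tf.not_equal(tf.substr(line, 0, 1), "#"))))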

Preprocessing data with Dataset.map()

The Dataset.map() transformation applies a given function to each element of the dataset.

Parsing tf.Example protocol buffer messages

For TFRecordDataset and TFRecord-format file
Each tf.train.Example record contains one or more “features”, and the input pipeline typically converts these features into tensors.

# Transforms a scalar string `example_proto` into a pair of a scalar string and
# a scalar integer, representing an image and its label, respectively.
def _parse_function(example_proto):
    features = {"image": tf.FixedLenFeature((), tf.string, default_value=""),
                "label": tf.FixedLenFeature((), tf.int64, default_value=0)}
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features["image"], parsed_features["label"]

# Creates a dataset that reads all of the examples from two files, and extracts
# the image and label features.
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(_parse_function)

Decoding image data and resizing it

It is often necessary to convert images of different sizes to a common size, so that they can be batched into a fixed shape.

# Reads an image from a file, decodes it into a dense tensor, and resizes it
# to a fixed shape.
def _parse_function(filename, label):
    image_string = tf.read_file(filename)
    image_decoded = tf.image.decode_image(image_string)
    image_resized = tf.image.resize_images(image_decoded, [28, 28])
    return image_resized, label

# A vector of filenames.
filenames = tf.constant(["/var/data/image1.jpg", "/var/data/image2.jpg", ...])

# `labels[i]` is the label for the image in `filenames[i]`.
labels = tf.constant([0, 37, ...])

dataset = tf.contrib.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(_parse_function)

Applying arbitrary Python logic with tf.py_func()

It is sometimes useful to call upon external Python libraries when parsing your input data.

import cv2

# Use a custom OpenCV function to read the image, instead of the standard
# TensorFlow `tf.read_file()` operation.
def _read_py_function(filename, label):
    image_decoded = cv2.imread(filename.decode(), cv2.IMREAD_GRAYSCALE)
    return image_decoded, label

# Use standard TensorFlow operations to resize the image to a fixed shape.
def _resize_function(image_decoded, label):
    image_decoded.set_shape([None, None, None])
    image_resized = tf.image.resize_images(image_decoded, [28, 28])
    return image_resized, label

filenames = ["/var/data/image1.jpg", "/var/data/image2.jpg", ...]
labels = [0, 37, 29, 1, ...]

dataset = tf.contrib.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(
    lambda filename, label: tf.py_func(
        _read_py_function, [filename, label], [tf.uint8, label.dtype]))
dataset = dataset.map(_resize_function)

Batching dataset elements

Simple batching

Dataset.batch() transformation
batching stacks n consecutive elements of a dataset into a single element

inc_dataset = tf.contrib.data.Dataset.range(100)
dec_dataset = tf.contrib.data.Dataset.range(0, -100, -1)
dataset = tf.contrib.data.Dataset.zip((inc_dataset, dec_dataset))
batched_dataset = dataset.batch(4)

iterator = batched_dataset.make_one_shot_iterator()
next_element = iterator.get_next()

sess = tf.Session()
print(sess.run(next_element))  # ==> ([0, 1, 2, 3],   [ 0, -1,  -2,  -3])
print(sess.run(next_element))  # ==> ([4, 5, 6, 7],   [-4, -5,  -6,  -7])
print(sess.run(next_element))  # ==> ([8, 9, 10, 11], [-8, -9, -10, -11])

Batching tensors with padding

Dataset.padded_batch() transformation

dataset = tf.contrib.data.Dataset.range(100)
dataset = dataset.map(lambda x: tf.fill([tf.cast(x, tf.int32)], x))
dataset = dataset.padded_batch(4, padded_shapes=[None])

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

print(sess.run(next_element))  # ==> [[0, 0, 0], [1, 0, 0], [2, 2, 0], [3, 3, 3]]
print(sess.run(next_element))  # ==> [[4, 4, 4, 4, 0, 0, 0],
                               #      [5, 5, 5, 5, 5, 0, 0],
                               #      [6, 6, 6, 6, 6, 6, 0],
                               #      [7, 7, 7, 7, 7, 7, 7]]

The Dataset.padded_batch() transformation allows you to set different padding for each dimension of each component
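A small sketch (not from the guide) of per-component padding: padded_shapes mirrors the element structure, so the scalar component needs no padding while the variable-length component is padded to the longest sequence in the batch; sess is assumed to be an existing tf.Session.

dataset = tf.contrib.data.Dataset.range(100)
dataset = dataset.map(lambda x: (x, tf.fill([tf.cast(x, tf.int32)], x)))
# One padded shape per component: scalar first, variable-length vector second.
dataset = dataset.padded_batch(
    4, padded_shapes=(tf.TensorShape([]), tf.TensorShape([None])))

iterator = dataset.make_one_shot_iterator()
print(sess.run(iterator.get_next()))
# ==> ([0, 1, 2, 3], [[0, 0, 0], [1, 0, 0], [2, 2, 0], [3, 3, 3]])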

Training workflows

Processing multiple epochs

Dataset API offers two main ways to process multiple epochs of the same data.

  • The simplest way: Dataset.repeat(count)
    • with no arguments will repeat the input indefinitely
    • without signaling the end of one epoch and the beginning of the next epoch
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.repeat(10)
dataset = dataset.batch(32)
  • loop + catch tf.errors.OutOfRangeError
    • If you want to receive a signal at the end of each epoch, use a training loop that catches the tf.errors.OutOfRangeError.
    • No repeat() is needed.
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

# Compute for 100 epochs.
for _ in range(100):
    sess.run(iterator.initializer)
    while True:
        try:
            sess.run(next_element)
        except tf.errors.OutOfRangeError:
            break

    # [Perform end-of-epoch calculations here.]

Randomly shuffling input data

Dataset.shuffle() transformation

  • Randomly shuffles the input dataset using a similar algorithm to tf.RandomShuffleQueue
    • maintains a fixed-size buffer and chooses the next element uniformly at random from that buffer.
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.repeat()

Using high-level APIs
tf.train.MonitoredTrainingSession

tf.train.MonitoredTrainingSession simplifies many aspects of running TensorFlow in a distributed setting.

  • It uses the tf.errors.OutOfRangeError to signal that training has completed.
    When using it with the Dataset API, we recommend Dataset.make_one_shot_iterator().

Demo:

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.contrib.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(32)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()

next_example, next_label = iterator.get_next()
loss = model_function(next_example, next_label)

training_op = tf.train.AdagradOptimizer(...).minimize(loss)

with tf.train.MonitoredTrainingSession(...) as sess:
    while not sess.should_stop():
        sess.run(training_op)

tf.estimator.Estimator

When using a Dataset in the input_fn of a tf.estimator.Estimator, we also recommend using Dataset.make_one_shot_iterator().
Demo:

def dataset_input_fn():
    filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
    dataset = tf.contrib.data.TFRecordDataset(filenames)

    # Use `tf.parse_single_example()` to extract data from a `tf.Example`
    # protocol buffer, and perform any additional per-record preprocessing.
    def parser(record):
        keys_to_features = {
            "image_data": tf.FixedLenFeature((), tf.string, default_value=""),
            "date_time": tf.FixedLenFeature((), tf.int64, default_value=0),
            "label": tf.FixedLenFeature((), tf.int64,
                                        default_value=tf.zeros([], dtype=tf.int64)),
        }
        parsed = tf.parse_single_example(record, keys_to_features)

        # Perform additional preprocessing on the parsed data.
        image = tf.image.decode_jpeg(parsed["image_data"])
        image = tf.reshape(image, [299, 299, 1])
        label = tf.cast(parsed["label"], tf.int32)

        return {"image_data": image, "date_time": parsed["date_time"]}, label

    # Use `Dataset.map()` to build a pair of a feature dictionary and a label
    # tensor for each example.
    dataset = dataset.map(parser)
    dataset = dataset.shuffle(buffer_size=10000)
    dataset = dataset.batch(32)
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_one_shot_iterator()

    # `features` is a dictionary in which each value is a batch of values for
    # that feature; `labels` is a batch of labels.
    features, labels = iterator.get_next()
    return features, labels
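A hypothetical usage of this input function with an Estimator; my_model_fn, the model_dir, and the step counts are assumptions for illustration only.

estimator = tf.estimator.Estimator(model_fn=my_model_fn, model_dir="/tmp/my_model")
estimator.train(input_fn=dataset_input_fn, steps=1000)
estimator.evaluate(input_fn=dataset_input_fn, steps=100)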

To read later:

https://stackoverflow.com/questions/41175011/tf-contrib-learn-tutorial-deprecation-warning