https://github.com/GPflow/GPflow
Revision 65532774c1b0257b20e50fb1f6f7b84502e9d700 authored by Jesper Nielsen on 17 October 2022, 10:33:06 UTC, committed by GitHub on 17 October 2022, 10:33:06 UTC
1 parent ca6c523
Rework GPflow introduction. (#1958)
run.py
# Copyright 2022 The GPflow Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Main script for running benchmarks.
Usage::
python -m benchmark.run <benchmark_suite> <output_directory>
Runs all benchmarks defined in the given suite, and plots the results. If you want to plot results
from across multiple runs, use the ``plot.py`` script.
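
For larger suites the work can be split across several workers. A sketch of a sharded run
(``my_suite`` stands in for whatever suite name is registered in ``BENCHMARK_SUITES``)::

    dest=$(python -m benchmark.run --shard=start my_suite results)
    python -m benchmark.run --no_subdir --shard=0/2 my_suite "$dest"
    python -m benchmark.run --no_subdir --shard=1/2 my_suite "$dest"
    python -m benchmark.run --no_subdir --shard=collect my_suite "$dest"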
"""
import argparse
import json
import multiprocessing as mp
import traceback
from datetime import timedelta
from pathlib import Path
from time import perf_counter
from typing import Mapping, Optional
import numpy as np
import pandas as pd
from tabulate import tabulate
import benchmark.benchmarks # pylint: disable=unused-import # Make sure registry is loaded.
import benchmark.metrics as mt
import gpflow
from benchmark.benchmark_api import BENCHMARK_SUITES, BenchmarkSuite, BenchmarkTask
from benchmark.dataset_api import DATASET_FACTORIES
from benchmark.metadata import BenchmarkMetadata
from benchmark.metric_api import Metric
from benchmark.model_api import MODEL_FACTORIES
from benchmark.paths import setup_dest
from benchmark.plot import plot as plot_
from benchmark.sharding import ShardingStrategy
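
# Base value for the random seeds used by benchmark repetitions, to keep runs reproducible.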
BASE_SEED = 20220721
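# How long to wait for a single benchmark task to finish, in seconds.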
ONE_HOUR_S = 60 * 60


def _collect_metrics(
task: BenchmarkTask,
data_cache_dir: Path,
random_seed: int,
) -> Optional[Mapping[Metric, float]]:
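    """
    Run a single benchmark task: load its dataset, build its model, and collect the requested
    metrics.
    """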
print("Loading dataset:", task.dataset_name)
dataset_fac = DATASET_FACTORIES.get(task.dataset_name)
dataset = dataset_fac.create_dataset(data_cache_dir)
print(tabulate(dataset.stats, showindex="never"))
train_data = dataset.train
test_data = dataset.test
rng = np.random.default_rng(random_seed)
model_fac = MODEL_FACTORIES.get(task.model_name)
model = model_fac.create_model(train_data, rng)
metrics = {}
    model.predict_y(test_data.X)  # Warm up TF, so one-off setup costs are not timed below.
print("Model before training:")
gpflow.utilities.print_summary(model)
if task.do_optimise:
t_before = perf_counter()
loss_fn = gpflow.models.training_loss_closure(model, train_data.XY, compile=task.do_compile)
opt_log = gpflow.optimizers.Scipy().minimize(
loss_fn,
variables=model.trainable_variables,
compile=task.do_compile,
options={"disp": 10, "maxiter": 1_000},
)
t_after = perf_counter()
n_iter = opt_log.nit
t_train = t_after - t_before
metrics[mt.n_training_iterations] = n_iter
metrics[mt.training_time] = t_train
metrics[mt.training_iteration_time] = t_train / n_iter
print(f"Training took {t_after - t_before}s for {n_iter} iterations.")
print("Model after training:")
gpflow.utilities.print_summary(model)
likelihood = model.likelihood
if task.do_predict:
t_before = perf_counter()
f_m, f_v = model.predict_f(test_data.X)
t_after = perf_counter()
metrics[mt.prediction_time] = t_after - t_before
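        # Negative log predictive density (NLPD) of the test targets under the model.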
metrics[mt.nlpd] = -np.sum(
likelihood.predict_log_density(test_data.X, f_m, f_v, test_data.Y)
)
y_m, _y_v = likelihood.predict_mean_and_var(test_data.X, f_m, f_v)
error = test_data.Y - y_m
metrics[mt.mae] = np.average(np.abs(error))
metrics[mt.rmse] = np.average(error ** 2) ** 0.5
if task.do_posterior:
t_before = perf_counter()
posterior = model.posterior()
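        # Predicting on an empty input forces any deferred computation, so that it counts
        # towards build time rather than prediction time.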
posterior.predict_f(np.zeros((0, test_data.D)))
t_after = perf_counter()
metrics[mt.posterior_build_time] = t_after - t_before
t_before = perf_counter()
f_m, f_v = posterior.predict_f(test_data.X)
t_after = perf_counter()
metrics[mt.posterior_prediction_time] = t_after - t_before
metrics[mt.posterior_nlpd] = -np.sum(
likelihood.predict_log_density(test_data.X, f_m, f_v, test_data.Y)
)
y_m, _y_v = likelihood.predict_mean_and_var(test_data.X, f_m, f_v)
error = test_data.Y - y_m
metrics[mt.posterior_mae] = np.average(np.abs(error))
metrics[mt.posterior_rmse] = np.average(error ** 2) ** 0.5
metrics_values = list(metrics.values())
# pylint: disable=no-member
assert np.isfinite(metrics_values).all(), f"{metrics} not all finite."
    return metrics


def _collect_metrics_process(
task: BenchmarkTask,
data_cache_dir: Path,
random_seed: int,
metrics_queue: "mp.Queue[Optional[Mapping[Metric, float]]]",
) -> None:
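    """
    Worker-process entry point: run ``_collect_metrics`` and put the result, or ``None`` on
    failure, on ``metrics_queue``.
    """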
metrics: Optional[Mapping[Metric, float]] = None
try:
metrics = _collect_metrics(task, data_cache_dir, random_seed)
except Exception: # pylint: disable=broad-except
traceback.print_exc()
    metrics_queue.put(metrics)


def _run_benchmarks(
metadata: BenchmarkMetadata,
shard: ShardingStrategy,
suite: BenchmarkSuite,
dest: Path,
cache_dir: Path,
) -> Optional[pd.DataFrame]:
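    """
    Run this shard's share of the suite's tasks, each repetition in a fresh process, writing
    metrics to a CSV file in ``dest`` as they accumulate.

    Returns the collected metrics, or ``None`` if this shard ran no repetitions.
    """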
data_cache_dir = cache_dir / "data"
    # Run each task in a separate process, to make sure memory and other resources are released
    # between tasks.
mp_ctx = mp.get_context("spawn")
tasks = suite.get_tasks()
global_repetition = 0
total_repetitions = sum(t.repetitions for t in tasks)
max_task_repetitions = max(t.repetitions for t in tasks)
metrics_list = []
metrics_df: Optional[pd.DataFrame] = None
t_before = perf_counter()
for task_repetition in range(max_task_repetitions):
for task in tasks:
if task_repetition >= task.repetitions:
continue
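            # Round-robin sharding: this shard only runs repetitions whose global index is
            # congruent to shard_i modulo shard_n.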
if global_repetition % shard.shard_n != shard.shard_i:
global_repetition += 1
continue
for retry in range(3):
t_now = perf_counter()
fraction_done = global_repetition / total_repetitions
if global_repetition == 0:
eta_str = "???"
else:
velocity = (t_now - t_before) / global_repetition
t_end = velocity * total_repetitions + t_before
t_remaining = t_end - t_now
eta_str = f"{timedelta(seconds=t_remaining)}"
print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
print(f"Running: {task.name}/rep={task_repetition} (retry {retry})")
print(f"{fraction_done: .2%} done. ETA: {eta_str}.")
print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
random_seed = BASE_SEED + task_repetition + retry * task.repetitions
metrics_queue: "mp.Queue[Optional[Mapping[Metric, float]]]" = mp_ctx.Queue()
metrics_process = mp_ctx.Process(
target=_collect_metrics_process,
args=(task, data_cache_dir, random_seed, metrics_queue),
)
metrics_process.start()
metrics = metrics_queue.get(timeout=ONE_HOUR_S)
metrics_process.join()
assert metrics_process.exitcode == 0
if metrics is None:
print("Model failed. Retrying with another random seed.")
continue
for metric, metric_value in metrics.items():
metrics_list.append(
(
metadata.run_id,
task.dataset_name,
task.model_name,
task.do_compile,
task.do_optimise,
task_repetition,
metric.name,
metric_value,
)
)
metrics_df = pd.DataFrame(
metrics_list,
columns=[
"run_id",
"dataset",
"model",
"do_compile",
"do_optimise",
"repetition",
"metric",
"value",
],
)
print(tabulate(metrics_df, headers="keys", showindex="never"))
metrics_path = dest / f"metrics{shard.file_suffix}.csv"
metrics_df.to_csv(metrics_path, index=False)
break
global_repetition += 1
    return metrics_df


def run(
shard: ShardingStrategy,
suite: BenchmarkSuite,
dest: Path,
no_subdir: bool,
cache_dir: Path,
plot: bool,
) -> None:
""" Collect data for a benchmark suite. """
if shard.setup_dest:
metadata = BenchmarkMetadata.create(suite.name)
dest = setup_dest(metadata, dest, no_subdir)
else:
        assert no_subdir, f"Must use 'no_subdir=True' with 'shard={shard}'."
with open(dest / "metadata.json", "rt") as f:
metadata = BenchmarkMetadata.from_json(json.load(f))
assert metadata.suite_name == suite.name
metadata = metadata.for_shard()
if shard.print_dest:
print(dest.resolve())
if shard.write_metadata:
metadata_path = dest / f"metadata{shard.file_suffix}.json"
with open(metadata_path, "wt") as f:
json.dump(metadata.to_json(), f, indent=4)
metrics_df: Optional[pd.DataFrame] = None
if shard.run_benchmarks:
metrics_df = _run_benchmarks(
metadata,
shard,
suite,
dest,
cache_dir,
)
if shard.collect:
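        # Merge the partial per-shard CSVs written by the '<i>/<n>' runs into a single file.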
assert metrics_df is None
metrics_path = dest / "metrics.csv"
metrics_df_list = [pd.read_csv(p) for p in shard.find_shards(metrics_path)]
metrics_df = pd.concat(metrics_df_list, axis="index", ignore_index=True)
metrics_df.to_csv(metrics_path, index=False)
if shard.plot and plot and (metrics_df is not None):
plot_(
metadata=metadata,
suite=suite,
results_df=metrics_df,
results_metadata=[metadata],
dest=dest,
        )


def run_from_argv() -> None:
""" Collect data for a benchmark suite, based on command line arguments. """
parser = argparse.ArgumentParser(description="Run a benchmark suite.")
parser.add_argument(
"--no_subdir",
"--no-subdir",
action="store_true",
help="By default this script will create a subdirectory inside `dest` and write output to"
" that. Setting this flag will suppress the subdirectory and use `dest` directly.",
)
parser.add_argument(
"--no_plot",
"--no-plot",
action="store_true",
help="By default this script will create plots of data."
" Setting this flag will suppress plotting.",
)
parser.add_argument(
"--cache_dir",
"--cache-dir",
type=Path,
default=Path("/tmp/gpflow_benchmark_data"),
help="Where to cache stuff, like datasets.",
)
parser.add_argument(
"--shard",
default=ShardingStrategy("no"),
type=ShardingStrategy,
help=(
"Sharding strategy:"
" If set to 'no' this script performs all necessary work."
" If set to 'start' this script creates an output directory for the shards, and"
" prints its path."
" If set to the format '<i>/<n>', where 0 <= i < n then this script only runs"
" benchmarks for shard 'i' out of 'n', and stores partial results in 'dest'."
" This requires '--no_subdir'."
" If set to 'collect' then this script assumes all benchmarks already have been"
" computed, using the '<i>/<n>' commands, and merges their results into one file."
" This requires '--no_subdir'."
),
)
parser.add_argument(
"suite",
type=str,
choices=BENCHMARK_SUITES.names(),
help="Which benchmark suite to run.",
)
parser.add_argument(
"dest",
type=Path,
help="Directory to write results to.",
)
args = parser.parse_args()
suite = BENCHMARK_SUITES.get(args.suite)
run(
args.shard,
suite,
args.dest,
args.no_subdir,
args.cache_dir,
not args.no_plot,
    )


if __name__ == "__main__":
run_from_argv()