Source code for nofas

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from linfa.mlp import FNN
from os import path

torch.set_default_tensor_type(torch.DoubleTensor)

[docs]class Surrogate(object): """Class to create surrogate models for inference with NoFAS Args: model_name (string): name of the true model to be approximated model_func (function): with only one input of torch.Tensor input_size (int): input dimension of true model. output_size (int): output dimension of true model. dnn_arch (list of int): list containing the number of neurons for each hidden layer. Default None for [64,32] architecture. dnn_activation (stirng): type of activation function, either 'relu', model_folder (string): folder where surrogate model is stored. limits (list or lists): bounds for all inputs, in format of [[low_0, high_0], [low_1, high_1], ... ] memory_len (int): the maximal number of batches stored in buffer. Default: 20 surrogate (None or torch.nn.Module): the implementation of surrogate model used. Default: FNN This allows the user to spacify any NN architecture for the surrogate. If None the parameters "dnn_arch" and "dnn_activation" can be used to specify a dense neural network. """ def __init__(self, model_name, model_func, input_size, output_size, dnn_arch=None, dnn_activation='relu', model_folder='./', limits=None, memory_len=20, surrogate=None, device='cpu'): self.device = device self.input_size = input_size self.output_size = output_size self.model_name = model_name self.model_folder = model_folder self.mf = model_func self.pre_out = None self.m = None self.sd = None self.tm = None self.tsd = None self.limits = limits self.pre_grid = None self.surrogate = FNN(input_size, output_size, arch=dnn_arch, device=self.device) if surrogate is None else surrogate self.beta_0 = 0.5 self.beta_1 = 0.1 self.memory_grid = [] self.memory_out = [] self.memory_len = memory_len self.weights = torch.Tensor([np.exp(-self.beta_1 * i) for i in range(memory_len)]).to(self.device) self.grid_record = None @property def limits(self): """Retrieves the limits of all inputs """ return self.__limits @limits.setter def limits(self, limits): """Sets the limits of all inputs Args: limits (list or tuple): Lists lower and upper bound in the format *[[low_0, high_0], [low_1, high_1], ...]* """ limits = torch.Tensor(limits).tolist() if len(limits) != self.input_size: print("Error: Invalid input size for limit. Abort.") exit(-1) elif any(len(item) != 2 for item in limits): print("Error: Limits should be two bounds. Abort.") exit(-1) elif any(item[0] > item[1] for item in limits): print("Error: Upper bound should not be smaller than lower bound. Abort.") exit(-1) self.__limits = limits @property def pre_grid(self): return self.__pre_grid @pre_grid.setter def pre_grid(self, pre_grid): """Assign the pregrid for the surrogate model. f nofas is not enabled, this serves as the whole training grid. Args: pre_grid (None or torch.Tensor): Specifies a matrix of model inputs with size [data_num, feature_dim]. If None, will try to search for *npz* containing this info. Returns: None """ if pre_grid is None: if path.exists(self.model_folder + self.model_name + '.npz'): container = np.load(self.model_folder + self.model_name + '.npz') if 'pre_grid' in container: self.pre_grid = container['pre_grid'] print("Success: Pre-Grid found.") else: print("Warning: " + self.model_folder + self.model_name + ".npz does not found, please generate pre-grid.") print("Suggestion: Use Surrogate.gen_grid(input_limits=None, grid_num=5, store=True)") else: self.__pre_grid = torch.Tensor(pre_grid).to(self.device) self.m = torch.mean(self.pre_grid, 0) self.sd = torch.std(self.pre_grid, 0) # Evaluate model at pre-grid self.pre_out = self.mf(self.pre_grid) self.tm = torch.mean(self.pre_out, 0) self.tsd = torch.std(self.pre_out, 0) self.grid_record = self.__pre_grid.clone()
[docs] def gen_grid(self, input_limits=None, grid_type='tensor', gridnum=4, store=True): """Generates a pre-grid. Args: input_limits (None or list[list]): If None, use self.limits. If list[list], rewrite self.limits and use it. gridnum (int): Contains the number of grid points per dimension. store (bool): Flag indicating the pre-grid is store in self.pre_grid. If False then self.pre_grid is None. Returns: torch.Tensor: Input values for the full tensor grid in *dim* dimensions stored in a matrix [gridnum ** dim, dim]. """ if (grid_type == 'tensor'): meshpoints = [] if input_limits is not None: self.limits = input_limits print("Warning: Input limits recorded in surrogate.") for lim in self.limits: meshpoints.append(torch.linspace(lim[0], lim[1], steps=gridnum)) grid = torch.meshgrid(meshpoints,indexing='ij') grid = torch.cat([item.reshape(gridnum ** len(self.limits), 1) for item in grid], 1) elif (grid_type == 'sobol'): # Generate sobol samples in [0,1]^d soboleng = torch.quasirandom.SobolEngine(dimension=len(self.limits)) grid = soboleng.draw(gridnum) for i,lim in enumerate(self.limits): grid[:,i] = lim[0] + (lim[1] - lim[0]) * grid[:,i] else: print('Invalid type for pre-grid generation') exit(-1) # Store pre-grid if store: self.pre_grid = grid self.grid_record = self.pre_grid.clone() self.surrogate_save() # Return grid return grid
[docs] def surrogate_save(self): """Save surrogate model to [self.name].sur and [self.name].npz Returns: None """ torch.save(self.surrogate.state_dict(), self.model_folder + self.model_name + '.sur') np.savez(self.model_folder + self.model_name, limits=self.limits, pre_grid=self.pre_grid.clone().cpu().numpy(), grid_record=self.grid_record.clone().cpu().numpy())
[docs] def surrogate_load(self): """Load surrogate model from [self.name].sur and [self.name].npz Returns: None """ self.surrogate.load_state_dict(torch.load(self.model_folder + self.model_name + '.sur')) container = np.load(self.model_folder + self.model_name + '.npz') for key in container: try: setattr(self, key, torch.Tensor(container[key])) print("Success: [" + key + "] loaded.") except: print("Warning: [" + key + "] is not a surrogate variables.")
[docs] def pre_train(self, max_iters, lr, lr_exp, record_interval, store=True, reg=False): """Train surrogate model with pre-grid. Training is performed with the RMSprop optimizer and with an exponential learning rate scheduler. Args: max_iters (int): The maximal number of iterations. lr (double): Learning rate for RMSprop. lr_exp (double): Decay factor for exponential scheduler. record_interval (int): The number of iterations to print loss info store (bool): If true, self.surrogate_save() will be called. reg (bool): If true, L1 regularization will be used with parameter 0.0001 Returns: None """ print('') print('--- Pre-training surrogate model') print('') grid = (self.pre_grid - self.m) / self.sd out = (self.pre_out - self.tm) / self.tsd optimizer = torch.optim.RMSprop(self.surrogate.parameters(), lr=lr) scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, lr_exp) for i in range(max_iters): self.surrogate.train() y = self.surrogate(grid) loss = torch.sum((y - out) ** 2) / y.size(0) if reg: reg_loss = 0 for param in self.surrogate.parameters(): reg_loss += torch.abs(param).sum() * 0.0001 loss += reg_loss optimizer.zero_grad() loss.backward() optimizer.step() scheduler.step() if i % record_interval == 0: if reg: print('SUR: PRE: it: %7d | loss: %8.3e | reg_loss: %8.3e' % (i, loss, reg_loss)) else: print('SUR: PRE: it: %7d | loss: %8.3e' % (i, loss)) print('') print('--- Surrogate model pre-train complete') print('') if store: self.surrogate_save()
[docs] def update(self, x, max_iters=10000, lr=0.01, lr_exp=0.999, record_interval=500, store=False, tol=1e-5, reg=False): """Model calibration for NoFAS. Fine tuning is performed with the RMSprop optimizer and an exponential learning rate scheduler. Args: x (torch.Tensor): Input feature that needs true model output. max_iters (int): Maximum number of iteration. Default 10000. lr (double): Learning rate for RMSprop. Default 0.01. lr_exp (double): Decay factor of exponential scheduler. record_interval (int): The number of iterations to print loss information. store (bool): If ture, self.surrogate_save() will be called. tol (double): Optimization will be terminated if loss < tol ** 2. reg (bool): If ture, L1 regularizaiton will be used with parameter 0.1. Returns: None """ self.grid_record = torch.cat((self.grid_record.to(self.device), x), dim=0) s = torch.std(x, dim=0) thresh = torch.tensor(0.1).to(self.device) if torch.any(s < thresh): p = x[:, s < thresh] x[:, s < thresh] += torch.normal(0, 1, size=tuple(p.size())).to(self.device) * thresh s_aft = torch.std(x, dim=0) # Print the std for each dimension before and after inflation print('') print('--- Updating surrogate model') print('') print('Std before inflation -> Std after inflation') for loopA in range(s.size(0)): print("%8.3e -> %8.3e" % (s[loopA],s_aft[loopA])) print('') if len(self.memory_grid) >= self.memory_len: self.memory_grid.pop() self.memory_out.pop() self.memory_grid.insert(0, (x - self.m) / self.sd) self.memory_out.insert(0, (self.mf(x) - self.tm) / self.tsd) sizes = [list(self.pre_grid.size())[0]] + [list(item.size())[0] for item in self.memory_grid] optimizer = torch.optim.RMSprop(self.surrogate.parameters(), lr=lr) scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, lr_exp) for i in range(max_iters): self.surrogate.train() y = self.surrogate(torch.cat(((self.pre_grid - self.m) / self.sd, *self.memory_grid), dim=0)) out = torch.cat(((self.pre_out - self.tm) / self.tsd, *self.memory_out), dim=0) raw_loss = torch.stack([item.mean() for item in torch.split(torch.sum((y - out) ** 2, dim=1), sizes)]) loss = raw_loss[0] * 2 * self.beta_0 * self.weights[:len(self.memory_grid)].sum() + \ torch.sum(raw_loss[1:] * self.weights[:len(self.memory_grid)]) * (1 - self.beta_0) * 2 # loss = raw_loss[0] * self.weights[:len(self.memory_grid)].sum() + torch.sum( # raw_loss[1:] * self.weights[:len(self.memory_grid)]) if reg: for param in self.surrogate.parameters(): loss += torch.abs(param).sum() * 0.1 optimizer.zero_grad() loss.backward() optimizer.step() scheduler.step() if i % record_interval == 0: # print('Updating: {}\t loss {}'.format(i, loss), end='\r') print('SUR: UPD: it: %7d | loss: %8.3e' % (i, loss)) if loss < tol ** 2: break # print(' ', end='\r') print('') print('--- Surrogate model updated') print('') if store: self.surrogate_save()
[docs] def forward(self, x): """Function to evaluate the surrogate Args: x (torch.Tensor): Contains a matrix of model inputs in the form [data_num, feature_dim] Returns: Value of the surrogate at x. """ return self.surrogate((x - self.m) / self.sd) * self.tsd + self.tm
[docs]def test_surrogate(): import matplotlib.pyplot as plt # Define function to approximate def ishigami(z): z1, z2, z3 = torch.chunk(z, chunks=3, dim=1) return torch.sin(z1) + 7.0 * (torch.sin(z2))**2 + 0.1 * z3**4 * torch.sin(z1) # Set surrogate architecture # arch=[64,64,64,64] arch=None activation='silu' # Define emulator and pre-train on global grid curr_limits = [[-torch.pi,torch.pi],[-torch.pi,torch.pi],[-torch.pi,torch.pi]] emulator = Surrogate(model_name='ishigami', model_func=ishigami, input_size=3, output_size=1, limits = curr_limits, dnn_arch=arch, dnn_activation=activation) emulator.gen_grid(gridnum=10) emulator.pre_train(40000, 0.03, 0.9999, 500, store=True) # Eval MSE on Sobol point collection soboleng = torch.quasirandom.SobolEngine(dimension=3) # 2**10 = 1024 testing points z = (soboleng.draw_base2(10) - 0.5) * 2 * torch.pi true_fc = ishigami(z) emul_fc = emulator.forward(z) mse = torch.sum((true_fc - emul_fc) ** 2) / z.size(0) print('Test MSE: %.3f' % (mse.item())) # Check loss on pregrid true_fc_pg = ishigami(emulator.pre_grid) emul_fc_pg = emulator.forward(emulator.pre_grid) mse = torch.sum((true_fc_pg - emul_fc_pg) ** 2) / emulator.pre_grid.size(0) print('Pre-grid MSE: %.3f' % (mse.item())) print('Standard deviation on testing set: %.3f' % (emulator.tsd.item())) # Plot output histograms plt.figure(figsize=(5,5)) plt.hist(true_fc.detach().numpy(),bins='auto',alpha=0.5,density=True) plt.hist(emul_fc.detach().numpy(),bins='auto',alpha=0.5,density=True) plt.show()
# TEST SURROGATE if __name__ == '__main__': test_surrogate()