import torch
import torch.nn as nn
import warnings
from tqdm import tqdm
[docs]
class FixedPoint(nn.Module):
    r"""
    Fixed-point iterations module.

    This module implements the fixed-point iteration algorithm given a specific fixed-point iterator (e.g.
    proximal gradient iteration, the ADMM iteration, see :meth:`deepinv.optim.optim_iterators`), that is
    for :math:`k=1,2,...`

    .. math::
        \qquad (x_{k+1}, u_{k+1}) = \operatorname{FixedPoint}(x_k, u_k, f, g, A, y, ...) \hspace{2cm} (1)

    where :math:`f` is the data-fidelity term, :math:`g` is the prior, :math:`A` is the physics model, :math:`y` is the data.

    :Examples:

        This example shows how to use the :class:`FixedPoint` class to solve the problem
        :math:`\min_x 0.5*||Ax-y||_2^2 + \lambda*||x||_1` with the PGD algorithm, where A is the identity operator,
        :math:`\lambda = 1` and :math:`y = [2, 2]`.

        >>> import deepinv as dinv
        >>> # Create the measurement operator A
        >>> A = torch.tensor([[1, 0], [0, 1]], dtype=torch.float64)
        >>> A_forward = lambda v: A @ v
        >>> A_adjoint = lambda v: A.transpose(0, 1) @ v
        >>> # Define the physics model associated to this operator
        >>> physics = dinv.physics.LinearPhysics(A=A_forward, A_adjoint=A_adjoint)
        >>> # Define the measurement y
        >>> y = torch.tensor([2, 2], dtype=torch.float64)
        >>> # Define the data fidelity term
        >>> data_fidelity = dinv.optim.data_fidelity.L2()
        >>> # Define the prior term
        >>> prior = dinv.optim.prior.L1Prior()
        >>> # Define the parameters of the algorithm
        >>> params_algo = {"g_param": 1.0, "stepsize": 1.0, "lambda": 1.0, "beta": 1.0}
        >>> # Choose the iterator associated to the PGD algorithm
        >>> iterator = dinv.optim.optim_iterators.PGDIteration()
        >>> # Iterate the iterator
        >>> x_init = torch.tensor([2, 2], dtype=torch.float64)  # Define initialisation of the algorithm
        >>> X = {"est": (x_init ,), "cost": []}  # Iterates are stored in a dictionary of the form {'est': (x,z), 'cost': F}
        >>> max_iter = 50
        >>> for it in range(max_iter):
        ...     X = iterator(X, data_fidelity, prior, params_algo, y, physics)
        >>> # Return the solution
        >>> X["est"][0]
        tensor([1., 1.], dtype=torch.float64)

    :param deepinv.optim.optim_iterators.optim_iterator iterator: function that takes as input the current iterate, as
        well as parameters of the optimization problem (prior, measurements, etc.)
    :param function update_params_fn: function that returns the parameters to be used at each iteration.
        Called as ``update_params_fn(it)``. Default: ``None``.
    :param function update_data_fidelity_fn: function that returns the data-fidelity term to be used at each
        iteration. Called as ``update_data_fidelity_fn(it)``. Default: ``None``.
    :param function update_prior_fn: function that returns the prior to be used at each iteration.
        Called as ``update_prior_fn(it)``. Default: ``None``.
    :param function init_iterate_fn: function that returns the initial iterate.
        Called as ``init_iterate_fn(*args, F_fn=iterator.F_fn)``. Default: ``None``.
    :param function init_metrics_fn: function that returns the initial metrics.
        Called as ``init_metrics_fn(X, x_gt=x_gt)``. Default: ``None``.
    :param function update_metrics_fn: function that returns the updated metrics after an accepted iteration.
        Called as ``update_metrics_fn(metrics, X_prev, X, x_gt=x_gt)``. Default: ``None``.
    :param function check_iteration_fn: function that performs a check on the last iteration and returns a bool
        indicating if we can proceed to next iteration. Called as ``check_iteration_fn(X_prev, X)``. Default: ``None``.
    :param function check_conv_fn: function that checks the convergence after each iteration, returns a bool
        indicating if convergence has been reached. Called as ``check_conv_fn(it, X_prev, X)``. Default: ``None``.
    :param int max_iter: maximum number of iterations. Default: ``50``.
    :param bool early_stop: if True, the algorithm stops when the convergence criterion is reached.
        It is forced to ``False`` (with a warning) when no ``check_conv_fn`` is given. Default: ``True``.
    :param bool anderson_acceleration: if True, the Anderson acceleration is used. Default: ``False``.
    :param int history_size: size of the history used for the Anderson acceleration. Default: ``5``.
    :param float beta_anderson_acc: momentum of the Anderson acceleration step. Default: ``1.0``.
    :param float eps_anderson_acc: regularization parameter of the Anderson acceleration step. Default: ``1e-4``.
    :param bool verbose: the progress bar is only displayed when both ``verbose`` and ``show_progress_bar``
        are ``True``. Default: ``False``.
    :param bool show_progress_bar: the progress bar is only displayed when both ``verbose`` and
        ``show_progress_bar`` are ``True``. Default: ``False``.
    """

    def __init__(
        self,
        iterator=None,
        update_params_fn=None,
        update_data_fidelity_fn=None,
        update_prior_fn=None,
        init_iterate_fn=None,
        init_metrics_fn=None,
        update_metrics_fn=None,
        check_iteration_fn=None,
        check_conv_fn=None,
        max_iter=50,
        early_stop=True,
        anderson_acceleration=False,
        history_size=5,
        beta_anderson_acc=1.0,
        eps_anderson_acc=1e-4,
        verbose=False,
        show_progress_bar=False,
    ):
        super().__init__()
        self.iterator = iterator
        self.max_iter = max_iter
        self.early_stop = early_stop
        self.update_params_fn = update_params_fn
        self.update_data_fidelity_fn = update_data_fidelity_fn
        self.update_prior_fn = update_prior_fn
        self.init_iterate_fn = init_iterate_fn
        self.init_metrics_fn = init_metrics_fn
        self.update_metrics_fn = update_metrics_fn
        self.check_conv_fn = check_conv_fn
        self.check_iteration_fn = check_iteration_fn
        self.anderson_acceleration = anderson_acceleration
        self.history_size = history_size
        self.beta_anderson_acc = beta_anderson_acc
        self.eps_anderson_acc = eps_anderson_acc
        self.verbose = verbose
        self.show_progress_bar = show_progress_bar

        # Early stopping is meaningless without a convergence criterion:
        # warn the user and fall back to running all ``max_iter`` iterations.
        if self.check_conv_fn is None and self.early_stop:
            warnings.warn(
                "early_stop is set to True but no check_conv_fn has been defined."
            )
            self.early_stop = False

    def init_anderson_acceleration(self, X):
        r"""
        Initialize the Anderson acceleration algorithm.

        The first axis of the primal iterate ``X["est"][0]`` is treated as the batch axis; all remaining
        axes are flattened, so iterates of any dimensionality are supported.

        :param dict X: initial iterate.
        :return: tuple ``(x_hist, T_hist, H, q)`` of zero-initialised buffers, where ``x_hist`` and ``T_hist``
            have shape ``(b, history_size, n)``, ``H`` has shape ``(b, history_size + 1, history_size + 1)`` and
            ``q`` has shape ``(b, history_size + 1, 1)``, with ``b`` the batch size and ``n`` the number of
            elements per batch item.
        """
        x = X["est"][0]
        b = x.shape[0]
        n = x.shape[1:].numel()  # flattened size of one batch element
        x_hist = torch.zeros(
            b, self.history_size, n, dtype=x.dtype, device=x.device
        )  # history of iterates.
        T_hist = torch.zeros(
            b, self.history_size, n, dtype=x.dtype, device=x.device
        )  # history of T(x_k) with T the fixed point operator.
        H = torch.zeros(
            b,
            self.history_size + 1,
            self.history_size + 1,
            dtype=x.dtype,
            device=x.device,
        )  # H in the Anderson acceleration linear system Hp = q .
        # First row / first column (except the corner) are set to one.
        H[:, 0, 1:] = H[:, 1:, 0] = 1.0
        q = torch.zeros(
            b, self.history_size + 1, 1, dtype=x.dtype, device=x.device
        )  # q in the Anderson acceleration linear system Hp = q .
        q[:, 0] = 1
        return x_hist, T_hist, H, q

    def anderson_acceleration_step(
        self,
        it,
        X_prev,
        TX_prev,
        x_hist,
        T_hist,
        H,
        q,
        cur_data_fidelity,
        cur_prior,
        cur_params,
        *args,
    ):
        r"""
        Anderson acceleration step.

        The buffers ``x_hist``, ``T_hist`` and ``H`` are updated in place.

        :param int it: current iteration.
        :param dict X_prev: previous iterate.
        :param dict TX_prev: output of the fixed-point operator evaluated at X_prev
        :param torch.Tensor x_hist: history of last ``history-size`` iterates.
        :param torch.Tensor T_hist: history of T evaluation at the last ``history-size`` iterates, where T is the
            fixed-point operator.
        :param torch.Tensor H: H in the Anderson acceleration linear system Hp = q .
        :param torch.Tensor q: q in the Anderson acceleration linear system Hp = q .
        :param deepinv.optim.DataFidelity cur_data_fidelity: Instance of the DataFidelity class defining the current data_fidelity.
        :param deepinv.optim.prior cur_prior: Instance of the Prior class defining the current prior.
        :param dict cur_params: Dictionary containing the current parameters of the algorithm.
        :param args: arguments for the iterator.
        :return: dictionary ``{"est": est, "cost": F}`` where ``est`` is the list of estimates of ``TX_prev`` with
            its first entry replaced by the accelerated iterate, and ``F`` is the cost at that iterate if the
            iterator has a cost (``iterator.has_cost``), ``None`` otherwise.
        """
        x_prev = X_prev["est"][0]  # previous iterate x
        Tx_prev = TX_prev["est"][0]  # T(x): fixed-point operator applied to x
        b = x_prev.shape[0]  # batchsize
        # The history buffers are circular: slot ``it % history_size`` is overwritten.
        x_hist[:, it % self.history_size] = x_prev.reshape(
            b, -1
        )  # prepare history of x
        T_hist[:, it % self.history_size] = Tx_prev.reshape(
            b, -1
        )  # prepare history of Tx
        m = min(it + 1, self.history_size)  # number of filled history slots
        G = T_hist[:, :m] - x_hist[:, :m]
        H[:, 1 : m + 1, 1 : m + 1] = (
            torch.bmm(G, G.transpose(1, 2))
            + self.eps_anderson_acc
            * torch.eye(m, dtype=Tx_prev.dtype, device=Tx_prev.device)[None]
        )
        p = torch.linalg.solve(H[:, : m + 1, : m + 1], q[:, : m + 1])[
            :, 1 : m + 1, 0
        ]  # solve the linear system H p = q.
        x = (
            self.beta_anderson_acc * (p[:, None] @ T_hist[:, :m])[:, 0]
            + (1 - self.beta_anderson_acc) * (p[:, None] @ x_hist[:, :m])[:, 0]
        )  # Anderson acceleration step.
        x = x.reshape(x_prev.shape)
        F = (
            self.iterator.F_fn(x, cur_data_fidelity, cur_prior, cur_params, *args)
            if self.iterator.has_cost
            else None
        )
        est = list(TX_prev["est"])
        est[0] = x
        return {"est": est, "cost": F}

    def forward(self, *args, compute_metrics=False, x_gt=None, **kwargs):
        r"""
        Loops over the fixed-point iterator as (1) and returns the fixed point.

        The iterates are stored in a dictionary of the form ``X = {'est': (x_k, u_k), 'cost': F_k}`` where:

        * ``est`` is a tuple containing the current primal and auxiliary iterates,
        * ``cost`` is the value of the cost function at the current iterate.

        Since the prior and parameters (stepsize, regularisation parameter, etc.) can change at each iteration,
        the prior and parameters are updated before each call to the iterator.

        :param bool compute_metrics: if ``True``, the metrics are computed along the iterations. Default: ``False``.
        :param torch.Tensor x_gt: ground truth solution. Default: ``None``.
        :param args: optional arguments for the iterator. Commonly (y,physics) where ``y`` (torch.Tensor y) is the measurement and
            ``physics`` (deepinv.physics) is the physics model.
        :param kwargs: optional keyword arguments for the iterator.
        :return tuple: ``(x,metrics)`` with ``x`` the fixed-point solution (dict) and
            ``metrics`` the metrics computed along the iterations if ``compute_metrics`` is ``True`` or ``None``
            otherwise.
        """
        X = (
            self.init_iterate_fn(*args, F_fn=self.iterator.F_fn)
            if self.init_iterate_fn
            else None
        )
        metrics = (
            self.init_metrics_fn(X, x_gt=x_gt)
            if self.init_metrics_fn and compute_metrics
            else None
        )
        self.check_iteration = True

        if self.anderson_acceleration:
            # The acceleration buffers are stored on the module and rebuilt at every call.
            self.x_hist, self.T_hist, self.H, self.q = self.init_anderson_acceleration(
                X
            )

        for it in tqdm(
            range(self.max_iter),
            disable=(not self.verbose or not self.show_progress_bar),
        ):
            X_prev = X
            X = self.single_iteration(
                X,
                it,
                *args,
                compute_metrics=compute_metrics,
                metrics=metrics,
                x_gt=x_gt,
                **kwargs,
            )
            # ``single_iteration`` sets ``self.check_iteration``; metrics and the convergence
            # test are only evaluated when the iteration has been accepted.
            if self.check_iteration:
                metrics = (
                    self.update_metrics_fn(metrics, X_prev, X, x_gt=x_gt)
                    if self.update_metrics_fn and compute_metrics
                    else None
                )
                # The convergence test is skipped for the first two iterations (it = 0, 1).
                if (
                    self.early_stop
                    and (self.check_conv_fn is not None)
                    and it > 1
                    and self.check_conv_fn(it, X_prev, X)
                ):
                    break
        return X, metrics

    def single_iteration(self, X, it, *args, **kwargs):
        r"""
        Performs one fixed-point iteration, optionally followed by an Anderson acceleration step.

        The outcome of ``check_iteration_fn`` is stored in ``self.check_iteration`` (``True`` when no
        ``check_iteration_fn`` was provided).

        :param dict X: current iterate.
        :param int it: current iteration index, passed to the ``update_*_fn`` functions.
        :param args: arguments forwarded to the iterator.
        :param kwargs: accepted for interface compatibility; not used by this method.
        :return: the new iterate (dict) if the iteration is accepted, the input iterate ``X`` otherwise.
        """
        cur_params = self.update_params_fn(it) if self.update_params_fn else None
        cur_data_fidelity = (
            self.update_data_fidelity_fn(it) if self.update_data_fidelity_fn else None
        )
        cur_prior = self.update_prior_fn(it) if self.update_prior_fn else None
        X_prev = X
        X = self.iterator(X_prev, cur_data_fidelity, cur_prior, cur_params, *args)
        if self.anderson_acceleration:
            X = self.anderson_acceleration_step(
                it,
                X_prev,
                X,
                self.x_hist,
                self.T_hist,
                self.H,
                self.q,
                cur_data_fidelity,
                cur_prior,
                cur_params,
                *args,
            )
        self.check_iteration = (
            self.check_iteration_fn(X_prev, X) if self.check_iteration_fn else True
        )
        return X if self.check_iteration else X_prev