def soft_thresholding(y: np.ndarray, mu: float):
"""
Soft thresholding operator as explained in Section 6.5.2 of https://web.stanford.edu/~boyd/papers/pdf/prox_algs.pdf
Solves the following problem:
argmin_x (1/2)*||x-y||_F^2 + lmb*||x||_1
Parameters
----------
y : np.ndarray
Target vector/matrix
lmb : float
Penalty parameter
Returns
-------
x : np.ndarray
argmin solution
"""
return np.sign(y) * np.clip(np.abs(y) - mu, a_min=0, a_max=None)
def svd_shrinkage(y: np.ndarray, tau: float):
"""
SVD shrinakge operator as explained in Theorem 2.1 of https://statweb.stanford.edu/~candes/papers/SVT.pdf
Solves the following problem:
argmin_x (1/2)*||x-y||_F^2 + tau*||x||_*
Parameters
----------
y : np.ndarray
Target vector/matrix
tau : float
Penalty parameter
Returns
-------
x : np.ndarray
argmin solution
"""
U, s, Vh = np.linalg.svd(y, full_matrices=False)
s_t = soft_thresholding(s, tau)
return U.dot(np.diag(s_t)).dot(Vh)
class RobustPCA:
"""
Solves robust PCA using Inexact ALM as explained in Algorithm 5 of https://arxiv.org/pdf/1009.5055.pdf
Parameters
----------
lmb:
penalty on sparse errors
mu_0:
initial lagrangian penalty
rho:
learning rate
tau:
mu update criterion parameter
max_iter:
max number of iterations for the algorithm to run
tol_rel:
relative tolerance
"""
def __init__(self, lmb: float, mu_0: float=1e-5, rho: float=2, tau: float=10,
max_iter: int=10, tol_rel: float=1e-3):
assert mu_0 > 0
assert lmb > 0
assert rho > 1
assert tau > 1
assert max_iter > 0
assert tol_rel > 0
self.mu_0_ = mu_0
self.lmb_ = lmb
self.rho_ = rho
self.tau_ = tau
self.max_iter_ = max_iter
self.tol_rel_ = tol_rel
def fit(self, X: np.ndarray):
"""
Fits robust PCA to X and returns the low-rank and sparse components
Parameters
----------
X:
Original data matrix
Returns
-------
L:
Low rank component of X
S:
Sparse error component of X
"""
assert X.ndim == 2
mu = self.mu_0_
Y = X / self._J(X, mu)
S = np.zeros_like(X)
S_last = np.empty_like(S)
for k in range(self.max_iter_):
# Solve argmin_L ||X - (L + S) + Y/mu||_F^2 + (lmb/mu)*||L||_*
L = svd_shrinkage(X - S + Y/mu, 1/mu)
# Solve argmin_S ||X - (L + S) + Y/mu||_F^2 + (lmb/mu)*||S||_1
S_last = S.copy()
S = soft_thresholding(X - L + Y/mu, self.lmb_/mu)
# Update dual variables Y <- Y + mu * (X - S - L)
Y += mu*(X - S - L)
r, h = self._get_residuals(X, S, L, S_last, mu)
# Check stopping cirteria
tol_r, tol_h = self._update_tols(X, L, S, Y)
if r < tol_r and h < tol_h:
break
# Update mu
mu = self._update_mu(mu, r, h)
return L, S
def _J(self, X: np.ndarray, lmb: float):
"""
The function J() required for initialization of dual variables as advised in Section 3.1 of
https://people.eecs.berkeley.edu/~yima/matrix-rank/Files/rpca_algorithms.pdf
"""
return max(np.linalg.norm(X), np.max(np.abs(X))/lmb)
@staticmethod
def _get_residuals(X: np.ndarray, S: np.ndarray, L: np.ndarray, S_last: np.ndarray, mu: float):
primal_residual = np.linalg.norm(X - S - L, ord="fro")
dual_residual = mu * np.linalg.norm(S - S_last, ord="fro")
return primal_residual, dual_residual
def _update_mu(self, mu: float, r: float, h: float):
if r > self.tau_ * h:
return mu * self.rho_
elif h > self.tau_ * r:
return mu / self.rho_
else:
return mu
def _update_tols(self, X, S, L, Y):
tol_primal = self.tol_rel_ * max(np.linalg.norm(X), np.linalg.norm(S), np.linalg.norm(L))
tol_dual = self.tol_rel_ * np.linalg.norm(Y)
return tol_primal, tol_dual