from sklearn.utils.extmath import randomized_svd
def nuclear_norm(X):
'''
Nuclear norm of input matrix
'''
return np.sum(np.linalg.svd(X)[1])
def f_objective(X, Y, W = None, mask = None):
'''
Objective function
Y is observed, X is estimated
W is the weight of each observation.
'''
Xmask = X if mask is None else X * mask
Wmask = W if mask is None else W * mask
# The * operator can be used as a shorthand for np.multiply on ndarrays.
if Wmask is None:
f_obj = 0.5 * np.linalg.norm(Y - Xmask, 'fro')**2
else:
f_obj = 0.5 * np.linalg.norm(Wmask * (Y - Xmask), 'fro')**2
return f_obj
def f_gradient(X, Y, W = None, mask = None):
'''
Gradient of the objective function.
'''
Xmask = X if mask is None else X * mask
Wmask = W if mask is None else W * mask
if Wmask is None:
f_grad = Xmask - Y
else:
f_grad = np.square(Wmask) * (Xmask - Y)
return f_grad
def linopt_oracle(grad, r = 1.0, max_iter = 10, method = 'power'):
'''
Linear optimization oracle,
where the feasible region is a nuclear norm ball for some r
'''
if method == 'power':
U1, V1_T = singular_vectors_power_method(grad, max_iter = max_iter)
elif method == 'randomized':
U1, V1_T = singular_vectors_randomized_method(grad, max_iter = max_iter)
S = - r * U1 @ V1_T
return S
def singular_vectors_randomized_method(X, max_iter = 10):
u, s, vh = randomized_svd(X, n_components = 1, n_iter = max_iter,
power_iteration_normalizer = 'none',
random_state = 0)
return u, vh
def singular_vectors_power_method(X, max_iter = 10):
'''
Power method.
Computes approximate top left and right singular vector.
Parameters:
-----------
X : array {m, n},
input matrix
max_iter : integer, optional
number of steps
Returns:
--------
u, v : (n, 1), (p, 1)
two arrays representing approximate top left and right
singular vectors.
'''
n, p = X.shape
u = np.random.normal(0, 1, n)
u /= np.linalg.norm(u)
v = X.T.dot(u)
v /= np.linalg.norm(v)
for _ in range(max_iter):
u = X.dot(v)
u /= np.linalg.norm(u)
v = X.T.dot(u)
v /= np.linalg.norm(v)
return u.reshape(-1, 1), v.reshape(1, -1)
def do_step_size(dg, D, W = None, old_step = None):
if W is None:
denom = np.linalg.norm(D, 'fro')**2
else:
denom = np.linalg.norm(W * D, 'fro')**2
step_size = dg / denom
step_size = min(step_size, 1.0)
if step_size < 0:
print ("Warning: Step Size is less than 0")
if old_step is not None and old_step > 0:
print ("Using previous step size")
step_size = old_step
else:
step_size = 1.0
return step_size
def frank_wolfe_minimize_step(X, Y, r, istep, W = None, mask = None, old_step = None, svd_iter = None, svd_method = 'power'):
# 1. Gradient for X_(t-1)
G = f_gradient(X, Y, W = W, mask = mask)
# 2. Linear optimization subproblem
if svd_iter is None:
svd_iter = 10 + int(istep / 20)
svd_iter = min(svd_iter, 25)
S = linopt_oracle(G, r, max_iter = svd_iter, method = svd_method)
# 3. Define D
D = X - S
# 4. Duality gap
dg = np.trace(D.T @ G)
# 5. Step size
step = do_step_size(dg, D, W = W, old_step = old_step)
# 6. Update
Xnew = X - step * D
return Xnew, G, dg, step
def frank_wolfe_minimize(Y, r, X0 = None,
weight = None,
mask = None,
max_iter = 1000,
svd_iter = None,
svd_method = 'power',
tol = 1e-4, step_tol = 1e-3, rel_tol = 1e-8,
return_all = True,
debug = False, debug_step = 10):
# Step 0
old_X = np.zeros_like(Y) if X0 is None else X0.copy()
dg = np.inf
step = 1.0
if return_all:
dg_list = [dg]
fx_list = [f_objective(old_X, Y, W = weight, mask = mask)]
st_list = [1]
# Steps 1, ..., max_iter
for istep in range(max_iter):
X, G, dg, step = \
frank_wolfe_minimize_step(old_X, Y, r, istep, W = weight, mask = mask, old_step = step, svd_iter = svd_iter, svd_method = svd_method)
f_obj = f_objective(X, Y, W = weight, mask = mask)
fx_list.append(f_obj)
if return_all:
dg_list.append(dg)
st_list.append(step)
if debug:
if (istep % debug_step == 0):
print (f"Iteration {istep}. Step size {step:.3f}. Duality Gap {dg:g}")
# Stopping criteria
# duality gap
if np.abs(dg) <= tol:
break
# step size
if step > 0 and step <= step_tol:
break
# relative tolerance of objective function
f_rel = np.abs((f_obj - fx_list[-2]) / f_obj)
if f_rel <= rel_tol:
break
old_X = X.copy()
if return_all:
return X, dg_list, fx_list, st_list
else:
return X