import numpy as np
from sklearn.utils.extmath import randomized_svd
def proj_simplex_sort(y, a=1.0):
    '''
    Projection of y onto the simplex {x : sum(x) = a, x >= 0},
    computed with the sort-based algorithm.
    '''
    if np.sum(y) == a and np.all(y >= 0):
        return y
    u = np.sort(y)[::-1]
    ukvals = (np.cumsum(u) - a) / np.arange(1, y.shape[0] + 1)
    K = np.nonzero(ukvals < u)[0][-1]
    tau = ukvals[K]
    x = np.clip(y - tau, a_min=0, a_max=None)
    return x
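# Illustrative sanity check (a minimal sketch, not part of the original
# routines): the simplex projection should return a non-negative vector
# summing to a.
def example_proj_simplex():
    y = np.array([0.5, 1.2, -0.3])
    x = proj_simplex_sort(y, a=1.0)
    # Expected result is approximately [0.15, 0.85, 0.0].
    assert np.isclose(np.sum(x), 1.0) and np.all(x >= 0)
    return x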
def proj_l1ball_sort(y, a=1.0):
    '''
    Projection of y onto the l1 ball {x : ||x||_1 <= a},
    via the simplex projection of |y|.
    '''
    if np.sum(np.abs(y)) <= a:
        # Already inside the ball: the projection is y itself.
        return y
    if y.ndim == 2:
        n, p = y.shape
        yflat = y.flatten()
        yproj = np.sign(y) * proj_simplex_sort(np.abs(yflat), a=a).reshape(n, p)
    else:
        yproj = np.sign(y) * proj_simplex_sort(np.abs(y), a=a)
    return yproj
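# Illustrative example (a minimal sketch, not part of the original routines):
# the l1 ball projection keeps signs and returns a point with l1 norm at
# most a; with the interior check above, a point already inside the ball
# comes back unchanged.
def example_proj_l1ball():
    y = np.array([[0.5, -1.2], [0.1, 0.3]])
    x = proj_l1ball_sort(y, a=1.0)             # ||y||_1 = 2.1 > 1, so y is shrunk
    inside = proj_l1ball_sort(0.1 * y, a=1.0)  # ||0.1 * y||_1 = 0.21 <= 1
    assert np.sum(np.abs(x)) <= 1.0 + 1e-12
    assert np.allclose(inside, 0.1 * y)
    return x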
def l1_norm(x):
    return np.sum(np.abs(x))
def nuclear_norm(X):
    '''
    Nuclear norm (sum of singular values) of the input matrix.
    '''
    return np.sum(np.linalg.svd(X, compute_uv=False))
def f_objective(X, Y, W=None, mask=None):
    '''
    Objective function.
    Y is observed, X is estimated.
    W is the weight of each observation.
    '''
    Xmask = X if mask is None else X * mask
    Wmask = W if mask is None else W * mask
    # The * operator is shorthand for np.multiply (elementwise) on ndarrays.
    if Wmask is None:
        f_obj = 0.5 * np.linalg.norm(Y - Xmask, 'fro')**2
    else:
        f_obj = 0.5 * np.linalg.norm(Wmask * (Y - Xmask), 'fro')**2
    return f_obj
def f_gradient(X, Y, W=None, mask=None):
    '''
    Gradient of the objective function with respect to X.
    '''
    Xmask = X if mask is None else X * mask
    Wmask = W if mask is None else W * mask
    if Wmask is None:
        f_grad = Xmask - Y
    else:
        f_grad = np.square(Wmask) * (Xmask - Y)
    return f_grad
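# Illustrative finite-difference check (a minimal sketch, not part of the
# original routines): with no mask, f_gradient should agree with a numerical
# gradient of f_objective entry by entry.
def example_gradient_check(eps=1e-6):
    rng = np.random.default_rng(0)
    X = rng.normal(size=(4, 3))
    Y = rng.normal(size=(4, 3))
    W = rng.uniform(0.5, 1.5, size=(4, 3))
    G = f_gradient(X, Y, W=W)
    E = np.zeros_like(X)
    E[1, 2] = eps
    num = (f_objective(X + E, Y, W=W) - f_objective(X - E, Y, W=W)) / (2 * eps)
    assert np.isclose(num, G[1, 2], atol=1e-5)
    return num, G[1, 2]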
def linopt_oracle_l1norm(grad, r=1.0, max_iter=10):
    '''
    Linear optimization oracle,
    where the feasible region is an l1 norm ball of radius r.
    The minimizer of <S, grad> over the ball is the signed vertex
    -r * sign(grad[i*]) * e_{i*}, with i* the largest-magnitude entry of grad.
    '''
    maxidx = np.unravel_index(np.argmax(np.abs(grad)), grad.shape)
    S = np.zeros_like(grad)
    S[maxidx] = -r * np.sign(grad[maxidx])
    return S
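# Illustrative example (a minimal sketch, assuming the oracle above picks the
# largest-magnitude gradient entry with opposing sign): the l1 LMO returns a
# signed vertex of the ball, so <S, grad> is minimized over ||S||_1 <= r.
def example_l1_oracle():
    grad = np.array([[0.2, -3.0], [1.0, 0.5]])
    S = linopt_oracle_l1norm(grad, r=2.0)
    # Largest-magnitude entry is grad[0, 1] = -3.0, so S[0, 1] = +2.0.
    assert np.isclose(S[0, 1], 2.0) and np.isclose(np.sum(np.abs(S)), 2.0)
    return S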
def linopt_oracle_nucnorm(grad, r=1.0, max_iter=10, method='power'):
    '''
    Linear optimization oracle,
    where the feasible region is a nuclear norm ball of radius r.
    Returns -r * u1 @ v1.T built from the top singular vectors of grad.
    '''
    if method == 'power':
        U1, V1_T = singular_vectors_power_method(grad, max_iter=max_iter)
    elif method == 'randomized':
        U1, V1_T = singular_vectors_randomized_method(grad, max_iter=max_iter)
    else:
        raise ValueError(f"Unknown SVD method: {method}")
    S = -r * U1 @ V1_T
    return S
def singular_vectors_randomized_method(X, max_iter=10):
    '''
    Approximate top singular vectors via scikit-learn's randomized SVD.
    Returns u of shape (n, 1) and vh of shape (1, p).
    '''
    u, s, vh = randomized_svd(X, n_components=1, n_iter=max_iter,
                              power_iteration_normalizer='none',
                              random_state=0)
    return u, vh
def singular_vectors_power_method(X, max_iter=10):
    '''
    Power method.
    Computes approximate top left and right singular vectors.
    Parameters:
    -----------
    X : array (n, p)
        input matrix
    max_iter : integer, optional
        number of power iterations
    Returns:
    --------
    u, vt : arrays of shape (n, 1) and (1, p),
        the approximate top left singular vector and the transposed
        top right singular vector.
    '''
    n, p = X.shape
    u = np.random.normal(0, 1, n)
    u /= np.linalg.norm(u)
    v = X.T.dot(u)
    v /= np.linalg.norm(v)
    for _ in range(max_iter):
        u = X.dot(v)
        u /= np.linalg.norm(u)
        v = X.T.dot(u)
        v /= np.linalg.norm(v)
    return u.reshape(-1, 1), v.reshape(1, -1)
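# Illustrative check (a minimal sketch, not part of the original routines):
# on a matrix with a dominant rank-1 component, the power method should
# recover the leading singular value to high accuracy.
def example_power_method():
    rng = np.random.default_rng(1)
    a = rng.normal(size=6)
    b = rng.normal(size=4)
    X = np.outer(a, b) + 0.01 * rng.normal(size=(6, 4))
    u, vt = singular_vectors_power_method(X, max_iter=100)
    s = np.linalg.svd(X, compute_uv=False)
    sigma1_est = (u.T @ X @ vt.T).item()
    assert np.isclose(sigma1_est, s[0], rtol=1e-3)
    return sigma1_est, s[0]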
def do_step_size(dg, D, W=None, old_step=None):
    '''
    Exact line-search step size for the quadratic objective:
    <D, grad> / ||W * D||_F^2 (unweighted: ||D||_F^2), capped at 1.
    Falls back to the previous step (or 1.0) if the computed step is negative.
    '''
    if W is None:
        denom = np.linalg.norm(D, 'fro')**2
    else:
        denom = np.linalg.norm(W * D, 'fro')**2
    step_size = dg / denom
    step_size = min(step_size, 1.0)
    if step_size < 0:
        print("Warning: Step Size is less than 0")
        if old_step is not None and old_step > 0:
            print("Using previous step size")
            step_size = old_step
        else:
            step_size = 1.0
    return step_size
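# Illustrative example (a minimal sketch, not part of the original routines):
# for the unweighted quadratic objective the exact line-search step along a
# direction D is <D, G> / ||D||_F^2, which do_step_size caps at 1.
def example_step_size():
    rng = np.random.default_rng(2)
    X = rng.normal(size=(5, 5))
    Y = rng.normal(size=(5, 5))
    G = f_gradient(X, Y)
    D = 2.0 * G                    # a direction with positive <D, G>
    dg = np.sum(D * G)             # <D, G> = 2 * ||G||_F^2
    gamma = do_step_size(dg, D)    # closed form: 2||G||^2 / 4||G||^2 = 0.5
    assert np.isclose(gamma, 0.5)
    return gamma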
def frank_wolfe_minimize_step(L, M, Y, rl, rm, istep, W=None, mask=None,
                              old_step=None, svd_iter=None, svd_method='power'):
    #
    # 1. Gradient at X_(t-1) = L + M
    #
    G = f_gradient(L + M, Y, W=W, mask=mask)
    #
    # 2. Linear optimization subproblems
    #
    if svd_iter is None:
        svd_iter = 10 + int(istep / 20)
        svd_iter = min(svd_iter, 25)
    SL = linopt_oracle_nucnorm(G, rl, max_iter=svd_iter, method=svd_method)
    SM = linopt_oracle_l1norm(G, rm)
    #
    # 3. Define the directions D
    #
    DL = L - SL
    DM = M - SM
    #
    # 4. Duality gap
    #
    dg = np.trace(DL.T @ G) + np.trace(DM.T @ G)
    #
    # 5. Step size
    #
    step = do_step_size(dg, DL + DM, W=W, old_step=old_step)
    #
    # 6. Update
    #
    Lnew = L - step * DL
    Mnew = M - step * DM
    #
    # 7. l1 projection of a gradient half-step on M
    #
    G_half = f_gradient(Lnew + Mnew, Y, W=W, mask=mask)
    Mnew = proj_l1ball_sort(Mnew - G_half, rm)
    return Lnew, Mnew, G, dg, step
def frank_wolfe_minimize(Y, r, X0=None,
                         weight=None,
                         mask=None,
                         max_iter=1000,
                         svd_iter=None,
                         svd_method='power',
                         tol=1e-4, step_tol=1e-3, rel_tol=1e-8,
                         return_all=True,
                         debug=False, debug_step=10):
    rl, rm = r
    # Step 0
    old_L = np.zeros_like(Y) if X0 is None else X0.copy()
    old_M = np.zeros_like(Y)
    dg = np.inf
    step = 1.0
    # History is always tracked; fx_list is needed for the relative-tolerance
    # stopping criterion even when return_all is False.
    dg_list = [dg]
    fx_list = [f_objective(old_L + old_M, Y, W=weight, mask=mask)]
    fl_list = [f_objective(old_L, Y, W=weight, mask=mask)]
    fm_list = [f_objective(old_M, Y, W=weight, mask=mask)]
    st_list = [1.0]
    # Steps 1, ..., max_iter
    for istep in range(max_iter):
        L, M, G, dg, step = \
            frank_wolfe_minimize_step(old_L, old_M, Y, rl, rm, istep,
                                      W=weight, mask=mask, old_step=step,
                                      svd_iter=svd_iter, svd_method=svd_method)
        f_obj = f_objective(L + M, Y, W=weight, mask=mask)
        fl_obj = f_objective(L, Y, W=weight, mask=mask)
        fm_obj = f_objective(M, Y, W=weight, mask=mask)
        fx_list.append(f_obj)
        fl_list.append(fl_obj)
        fm_list.append(fm_obj)
        dg_list.append(dg)
        st_list.append(step)
        if debug and (istep % debug_step == 0):
            print(f"Iteration {istep}. Step size {step:.3f}. Duality Gap {dg:g}")
        # Stopping criteria
        # duality gap
        if np.abs(dg) <= tol:
            break
        # step size
        if step > 0 and step <= step_tol:
            break
        # relative tolerance of objective function
        f_rel = np.abs((f_obj - fx_list[-2]) / f_obj)
        if f_rel <= rel_tol:
            break
        old_L = L.copy()
        old_M = M.copy()
    if return_all:
        return L, M, dg_list, fx_list, st_list, fl_list, fm_list
    else:
        return L, M
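# Usage sketch (a minimal example, not from the original module): fit a
# low-rank plus sparse decomposition of a synthetic matrix. The radii rl and
# rm are taken from the ground truth here purely for illustration.
if __name__ == "__main__":
    rng = np.random.default_rng(3)
    n, p, k = 30, 20, 2
    L_true = rng.normal(size=(n, k)) @ rng.normal(size=(k, p))
    M_true = np.zeros((n, p))
    idx = rng.choice(n * p, size=10, replace=False)
    M_true.flat[idx] = rng.normal(scale=5.0, size=10)
    Y = L_true + M_true
    radii = (nuclear_norm(L_true), l1_norm(M_true))
    L_hat, M_hat, dg_hist, fx_hist, st_hist, fl_hist, fm_hist = \
        frank_wolfe_minimize(Y, radii, max_iter=200, debug=True, debug_step=50)
    print("final objective:", fx_hist[-1])
    print("relative error in L:", np.linalg.norm(L_hat - L_true) / np.linalg.norm(L_true))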