Logistic model quasi-Laplace with Mr.Ash
- About
- Generate toy data
- Implementation of quasi-Laplace with variational approximation
- Fitting
- Results
About
This is a proof of concept implementation of quasi-Laplace approximation with Mr.Ash. The quasi-Laplace approximation introduces a regularizer on the likelihood and approximates the posterior with a Gaussian distribution. This approach can be used for generalized linear model with any link function.
Bayesian logistic regression with a Gaussian prior (ridge regression) uses the Laplace approximation to approximate the posterior with a Gaussian distribution. The variational treatment of logistic model by Jordan and Jaakkola (2000) also leads to a Gaussian approximation of the posterior distribution. The greater flexibility of the variational approximation leads to improved accuracy compared to Laplace method.
Here, I compare the quasi-Laplace approach with cross-validated Lasso ($L_1$ penalty). I used the $R^2$ statistic by Tjur1 to quantify the quality of classification, and the Gini index2 to quantify the sparsity of the coefficients.
I found that the optimization depends on the choice of Gaussian mixture family.
#collapse_hide
import numpy as np
np.set_printoptions(precision = 4, suppress=True)
import pandas as pd
from scipy import optimize
from scipy import stats
from scipy.special import digamma
from sklearn import linear_model
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib import ticker as plticker
from mpl_toolkits.axes_grid1 import make_axes_locatable
import sys
sys.path.append("../../utils/")
import mpl_stylesheet
mpl_stylesheet.banskt_presentation(fontfamily = 'latex-clearsans', fontsize = 18, colors = 'banskt', dpi = 72)
Generate toy data
Let us consider a logistic model with sparse coefficients, so that the number of causal variables ncausal
is much less than the number of variables nvar
in the model. To ensure sparsity of the coefficients, we use a Gaussian mixture prior of nGcomp
components with variances given by $\sigma_k^2$ (sigmak2
) and unknown mixture coefficients. The sparsity of the initialization is controlled by the variable sparsity
that specifies the mixture coefficient of the zero-th component $\mathcal{N}(0, 0)$ (or the delta function). We are particularly interested in systems where the number of samples nsample
is less than nvar
. Validation is performed on separate test dataset with ntest
samples.
nsample = [50, 200]
nvar = [100, 100]
ncausal = 5
nGcomp = 10
sparsity = 0.95
prior_variance = 5
ntest = 2000
hg2 = 0.8
sigmak2 = np.array([prior_variance * np.square(np.power(2, (i)/nGcomp) - 1) for i in range(nGcomp)])
The data is generated from a liability threshold model such that the explanatory variables explain only a given fraction hg2
of the response variable. $\mathbf{X}$ is centered and scaled such that for each variable $j$, the variance is $\mathrm{var}(\mathbf{x}_j) = 1$.
# collapse-hide
def standardize(X):
Xnorm = (X - np.mean(X, axis = 0))
Xstd = Xnorm / np.std(Xnorm, axis = 0)
return Xstd
def logistic_data(X, beta):
Xbeta = np.dot(X, beta)
pred = 1 / (1 + np.exp(-Xbeta))
Y = np.random.binomial(1, pred)
return Y, pred
def liability_model(X, beta, prevalence = 0.5):
hg2 = np.sum(np.square(beta))
eff = np.dot(X, beta)
rand_std = np.sqrt(1 - hg2)
rand_eff = np.random.normal(0, rand_std, X.shape[0])
liability = eff + rand_eff
cutoff = stats.norm.ppf(1 - prevalence)
cases = np.where(liability >= cutoff)[0]
ctrls = np.where(liability < cutoff)[0]
Y = np.zeros(X.shape[0])
Y[cases] = 1
return Y, liability
def tjur_R2(X, Y, beta):
Xbeta = np.dot(X, beta)
pred = 1 / (1 + np.exp(-Xbeta))
R2 = np.mean(pred[Y == 1]) - np.mean(pred[Y == 0])
return R2
def gini(x):
n = len(x)
xabs = abs(x)
xsort = xabs[np.argsort(xabs)]
t1 = xsort / np.sum(xabs)
t2 = (n - np.arange(n) - 1 + 0.5) / n
gini = 1 - 2 * np.sum(t1 * t2)
return gini
def simulate_data(nsample, nvar, ncausal, ntest, hg2):
X = np.random.rand(nsample * nvar).reshape(nsample, nvar)
X = standardize(X)
Xtest = np.random.rand(ntest * nvar).reshape(ntest, nvar)
Xtest = standardize(Xtest)
beta = np.zeros(nvar)
causal_idx = np.random.choice(nvar, ncausal, replace = False)
beta[causal_idx] = np.random.normal(loc = 0., scale = 1., size = ncausal)
beta = beta * np.sqrt(hg2 / np.sum(np.square(beta)))
Y, Y_liab = liability_model(X, beta)
Ytest, Ytest_liab = liability_model(Xtest, beta)
#Y, Y_liab = logistic_data(X, beta)
#Ytest, Ytest_liab = logistic_data(Xtest, beta)
return X, Y, beta, Xtest, Ytest, Y_liab, Ytest_liab
Let us have a look at the generated data.
# collapse-hide
X = [None for n in nsample]
Xtest = [None for n in nsample]
Y = [None for n in nsample]
Ytest = [None for n in nsample]
Y_liab = [None for n in nsample]
Ytest_liab = [None for n in nsample]
beta = [None for n in nsample]
for i, n in enumerate(nsample):
X[i], Y[i], beta[i], Xtest[i], Ytest[i], Y_liab[i], Ytest_liab[i] \
= simulate_data(n, nvar[i], ncausal, ntest, hg2)
print(f'There are {ncausal} non-zero coefficients with heritability {hg2:.4f}')
fig = plt.figure(figsize = (12,6))
ax = [fig.add_subplot(121),
fig.add_subplot(122)]
for i, n in enumerate(nsample):
Xbeta = np.dot(X[i], beta[i])
pred = 1 / (1 + np.exp(-Xbeta))
ax[i].scatter(Xbeta, Y[i], s = 10)
ax[i].scatter(Xbeta, pred, s = 10)
ax[i].set_xlabel(r'$\sum_i X_{ni} \beta_i$')
ax[i].set_ylabel(r'$Y_n$')
plt.tight_layout()
plt.show()
Implementation of quasi-Laplace with variational approximation
See Latex notes for details of the theory. Current implementation focus on readability and debug. Computational efficiency has been ignored.
There must be an error in theory or implementation. Debug!
# collapse-hide
def regularized_log_likelihood(beta, X, Y, L):
nvar = beta.shape[0]
Xbeta = np.dot(X, beta)
## Function
llb = np.sum(np.dot(Y, Xbeta) - np.log(1 + np.exp(Xbeta)))
reg = - 0.5 * np.einsum('i,i->i', np.square(beta), L)
loglik = llb + reg
## Gradient
pred = 1 / (1 + np.exp(-Xbeta))
der = np.einsum('i,ij->j', (Y - pred), X) - np.multiply(beta, L)
return -loglik, -der
def precisionLL(X, beta, L):
nvar = X.shape[1]
Xbeta = np.einsum('i,ji->j', beta, X)
pred = 1 / (1 + np.exp(-Xbeta))
Sinv = np.einsum('i,i,ij,ik->jk', pred, (1 - pred), X, X)
Sinv[np.diag_indices(nvar)] += L
return Sinv
def get_regularized_mode(X, Y, beta0, L):
nsample, nvar = X.shape
args = X, Y, L
gmode = optimize.minimize(regularized_log_likelihood,
beta0,
args=args,
method='L-BFGS-B',
jac=True,
bounds=None,
options={'maxiter': 20000000,
'maxfun': 20000000,
'ftol': 1e-9,
'gtol': 1e-9
#'disp': True
})
return gmode.x
def get_elbo_qL(alpha, mu, s2, pi, sigmak2, XPX, SinvM, M, L, logdetS, debug = False):
Ebeta = np.sum(np.multiply(alpha, mu)[:, 1:], axis = 1)
mu2 = np.square(mu)
t1 = - 0.5 * np.dot(Ebeta.T, np.dot(XPX, Ebeta))
t2 = np.dot(Ebeta.T, SinvM)
t3 = 0
t4 = 0
for j in range(nvar):
Ebeta2j = 0
for k in range(nGcomp):
mom2jk = mu2[j, k] + s2[j, k]
Ebeta2j += alpha[j, k] * mom2jk
t4jk = 0
if alpha[j, k] != 0:
t4jk += np.log(pi[k])
t4jk += - np.log(alpha[j, k])
if k > 0:
t4jk += 0.5 * np.log(s2[j, k])
t4jk += - 0.5 * np.log(sigmak2[k])
t4jk += - 0.5 * mom2jk / sigmak2[k]
t4jk += 0.5
t4jk *= alpha[j, k]
t4 += t4jk
t3 += 0.5 * XPX[j, j] * (Ebeta[j] * Ebeta[j] - Ebeta2j)
logdetL = np.sum(np.log(L))
t0 = - 0.5 * (logdetL + logdetS + np.dot(M.T, SinvM))
elbo = t0 + t1 + t2 + t3 + t4
if debug:
print(t0, t1 + t2, t3, t4)
return elbo
#def get_elbo(alpha, mu, s2, pi, sigmak2, XPX, SinvM, M, L, logdetS):
def get_elbo(alpha, mu, s2, logpi, sigmak2, XPX, SinvM, M, L, logdetS):
nvar, nGcomp = alpha.shape
Ebeta = np.sum(np.multiply(alpha, mu)[:, 1:], axis = 1)
mu2 = np.square(mu)
logdetL = np.sum(np.log(L))
t0 = - 0.5 * (logdetL + logdetS + np.dot(M.T, SinvM))
t1 = - 0.5 * np.dot(Ebeta.T, np.dot(XPX, Ebeta))
t2 = np.dot(Ebeta.T, SinvM)
t3 = 0
t4 = 0
for j in range(nvar):
Ebeta2j = 0
for k in range(nGcomp):
mom2jk = mu2[j, k] + s2[j, k]
Ebeta2j += alpha[j, k] * mom2jk
t4jk = 0
if alpha[j, k] != 0:
#t4jk += np.log(pi[k])
t4jk += logpi[k]
t4jk += - np.log(alpha[j, k])
if k > 0:
t4jk += 0.5 * np.log(s2[j, k])
t4jk += - 0.5 * np.log(sigmak2[k])
t4jk += - 0.5 * mom2jk / sigmak2[k]
t4jk += 0.5
t4jk *= alpha[j, k]
t4 += t4jk
t3 += 0.5 * XPX[j, j] * (Ebeta[j] * Ebeta[j] - Ebeta2j)
t5 = np.sum(np.multiply(- np.sum(alpha, axis = 0), logpi))
elbo = t0 + t1 + t2 + t3 + t4 + t5
return elbo
def ash_qL_regularizer(X, Y, binit, L, mode_optim = True):
nsample, nvar = X.shape
if mode_optim:
M = get_regularized_mode(X, Y, binit, L)
else:
M = binit.copy()
Sinv = precisionLL(X, M, L)
S = np.linalg.inv(Sinv)
sgndetS, logdetS = np.linalg.slogdet(S)
XPX = Sinv.copy()
XPX[np.diag_indices(nvar)] -= L
return M, S, Sinv, logdetS, XPX
def ash_qL_pi_init(nGcomp, pi0, sparse = True):
'''
By default, pi is initialized with sparsity, i.e. pi[0] > pi[j], for j > 0
'''
pi = np.zeros(nGcomp)
pi[0] = pi0
pi[1:(nGcomp - 1)] = (1 - pi0) / (nGcomp - 1)
pi[nGcomp - 1] = 1 - np.sum(pi)
if not sparse:
pi = np.repeat(1 / nGcomp, nGcomp)
return pi
def ash_logistic_qL(X, Y, nGcomp, sigmak2, binit, Linit,
a_dirich = None,
maxiter = 1000, tol = 1e-10, pi0 = 0.8, sparse_init = True,
reg_step = 1, reg_tol = 0.1,
debug = True, debug_step = 10):
'''
Current focus is on readability and debug.
'''
nsample, nvar = X.shape
# default dirichlet prior on pi
if a_dirich is None:
a_dirich = np.repeat(1., nGcomp)
a_dirich[0] = 1. # soft max at pi[0]
'''
Initialize
'''
L = Linit.copy()
M, S, Sinv, logdetS, XPX = ash_qL_regularizer(X, Y, binit, L)
SinvM = np.dot(Sinv, M)
SinvRtj = np.dot(Sinv, M) - np.multiply(np.diag(Sinv), M)
pi = ash_qL_pi_init(nGcomp, pi0, sparse = sparse_init)
logpi = np.log(pi)
alpha = np.repeat(np.array([pi]), nvar, axis = 0)
logalpha = np.log(alpha)
s2 = np.zeros((nvar, nGcomp))
s2[:, 1:] = 1 / (np.diag(XPX).reshape(-1, 1) + 1 / sigmak2[1:])
mu = np.multiply(s2, (SinvM - SinvRtj).reshape(-1,1))
R = np.sum(np.multiply(alpha, mu)[:, 1:], axis = 1)
'''
Iteration
'''
elbo_old = get_elbo(alpha, mu, s2, logpi, sigmak2, XPX, SinvM, M, L, logdetS)
pi_old = pi.copy()
pi_reg = pi.copy()
if debug:
print(f"Starting ELBO: {elbo_old:.4f} Pi0: {pi[0]:.4f}")
for it in range(maxiter):
# E-step
for j in range(nvar):
SinvRtj = np.sum(np.multiply(Sinv[:, j], R[:])) - Sinv[j, j] * R[j]
s2[j, 0] = 0.
mu[j, 0] = 0.
#logalpha[j, 0] = np.log(pi[0])
logalpha[j, 0] = logpi[0]
for k in range(1, nGcomp):
s2[j, k] = 1 / (XPX[j, j] + (1 / sigmak2[k]))
mu[j, k] = s2[j, k] * (SinvM[j] - SinvRtj)
ssf = np.log(s2[j, k] / sigmak2[k])
ssr = mu[j, k] * mu[j, k] / s2[j, k]
logalpha[j, k] = logpi[k] + 0.5 * ssf + 0.5 * ssr
#logalpha[j, k] = np.log(pi[k]) + 0.5 * ssf + 0.5 * ssr
maxL = np.max(logalpha[j, :])
alpha[j, :] = np.exp(logalpha[j, :] - maxL)
alpha[j, :] /= np.sum(alpha[j, :])
R[j] = np.sum(np.multiply(alpha[j, 1:], mu[j, 1:]))
# M-step
bk = np.sum(alpha, axis = 0) + a_dirich
pi = bk / np.sum(bk)
logpi = digamma(bk) - digamma(np.sum(bk))
#for k in range(nGcomp):
# pi[k] = np.sum(alpha[:, k]) / nvar
# Conditional M-step
if (it + 1) % reg_step == 0:
eps_reg = max([abs(x - y) for x, y in zip(pi, pi_reg)])
if eps_reg > reg_tol:
print(f"Recalculating regularizer, step {it}")
L = 1 / np.sum(alpha * sigmak2, axis = 1)
M, S, Sinv, logdetS, XPX = ash_qL_regularizer(X, Y, R, L)
SinvM = np.dot(Sinv, M)
pi_reg = pi.copy()
elbo = get_elbo(alpha, mu, s2, logpi, sigmak2, XPX, SinvM, M, L, logdetS)
eps_elbo = elbo - elbo_old
eps_pi = max([abs(x - y) for x, y in zip(pi, pi_old)])
if debug and (it % debug_step == 0):
print(f"Iteration {it}. ELBO: {elbo:.4f} Pi0: {pi[0]:.4f} epsilon: {eps_pi:.4f}, {eps_elbo:.4f}")
'''
Check termination, continue otherwise.
'''
if eps_pi < tol:
print(eps_pi)
break
'''
Keep this step in memory.
Use deep copy to avoid in place substitutions.
'''
pi_old = pi.copy()
alpha_old = alpha.copy()
mu_old = mu.copy()
s2_old = s2.copy()
R_old = R.copy()
elbo_old = elbo
return pi, alpha, mu, s2, R, L
Hold the result summary statistic in a data frame
# collapse-hide
res_df = [pd.DataFrame(columns = ['Method', 'RMSE', 'Tjur R2', 'Gini Index']) for n in nsample]
data_Xbeta = [np.dot(Xtest[i], beta[i]) for i in range(len(nsample))]
data_tjur = [tjur_R2(Xtest[i], Ytest[i], beta[i]) for i in range(len(nsample))]
data_rmse = [np.sqrt(np.mean(np.square(data_Xbeta[i] -
np.dot(Xtest[i], beta[i])))) for i in range(len(nsample))]
data_gini = [gini(beta[i]) for i in range(len(nsample))]
for i in range(len(nsample)):
res_df[i] = res_df[i].append(pd.DataFrame([['Data', data_rmse[i], data_tjur[i], data_gini[i]]],
columns = res_df[i].columns))
# collapse-hide
clf = [linear_model.LogisticRegressionCV(cv = 5, penalty='l1', solver='saga',
fit_intercept = False, max_iter = 5000) for n in nsample]
for i, n in enumerate(nsample):
clf[i].fit(X[i], Y[i])
clf_Xbeta = [np.dot(Xtest[i], clf[i].coef_[0]) for i in range(len(nsample))]
clf_tjur = [tjur_R2(Xtest[i], Ytest[i], clf[i].coef_[0]) for i in range(len(nsample))]
clf_rmse = [np.sqrt(np.mean(np.square(clf_Xbeta[i] -
np.dot(Xtest[i], beta[i])))) for i in range(len(nsample))]
clf_gini = [gini(clf[i].coef_[0]) for i in range(len(nsample))]
for i in range(len(nsample)):
res_df[i] = res_df[i].append(pd.DataFrame([['LASSO', clf_rmse[i], clf_tjur[i], clf_gini[i]]],
columns = res_df[i].columns))
- QL Ash
# collapse-hide
pi = [None for n in nsample]
alpha = [None for n in nsample]
mu = [None for n in nsample]
s2 = [None for n in nsample]
R = [None for n in nsample]
L = [None for n in nsample]
#probk = ash_qL_pi_init(nGcomp, sparsity)
for i in range(len(nsample)):
gold_reg = np.repeat(1e4, nvar[i])
for j, b in enumerate(beta[i]):
if b != 0:
gold_reg[j] = 1 / prior_variance
#fix_reg = np.repeat(1 / sigmak2[nGcomp - 1], nvar[i])
#fix_reg = np.repeat( 1 / np.dot(probk, sigmak2), nvar[i])
fix_reg = np.repeat(1 / prior_variance, nvar[i])
binit = np.zeros(nvar[i])
Linit = fix_reg
pi[i], alpha[i], mu[i], s2[i], R[i], L[i] \
= ash_logistic_qL(X[i], Y[i], nGcomp, sigmak2, binit, Linit,
tol = 1e-4, debug_step = 1, maxiter = 1000,
reg_step = 10,
pi0 = sparsity, sparse_init = True)
qLash_Xbeta = [np.dot(Xtest[i], R[i]) for i in range(len(nsample))]
qLash_rmse = [np.sqrt(np.mean(np.square(qLash_Xbeta[i] - np.dot(Xtest[i], beta[i]))))
for i in range(len(nsample))]
qLash_r2 = [tjur_R2(Xtest[i], Ytest[i], R[i]) for i in range(len(nsample))]
qLash_gini = [gini(R[i]) for i in range(len(nsample))]
for i in range(len(nsample)):
res_df[i] = res_df[i].append(pd.DataFrame([['qLash', qLash_rmse[i], qLash_r2[i], qLash_gini[i]]],
columns = res_df[i].columns))
i = 0
print (f'N = {nsample[i]}, P = {nvar[i]}')
res_df[i]
i = 1
print (f'N = {nsample[i]}, P = {nvar[i]}')
res_df[i]
# collapse-hide
fig = plt.figure(figsize = (12, 7))
ax = [fig.add_subplot(121),
fig.add_subplot(122)]
for i, n in enumerate(nsample):
whichX = Xtest[i].copy()
whichY = Ytest[i].copy()
Xbeta = np.dot(whichX, beta[i])
pred_data = 1 / (1 + np.exp(-Xbeta))
ax[i].scatter(Xbeta, whichY, s = 1)
Xbeta_clf = np.dot(whichX, clf[i].coef_[0])
pred_clf = 1 / (1 + np.exp(-Xbeta_clf))
ax[i].scatter(Xbeta, pred_clf, s = 1, label = 'Lasso')
Xbeta_qL = np.dot(whichX, R[i])
pred_qL = 1 / (1 + np.exp(-Xbeta_qL))
ax[i].scatter(Xbeta, pred_qL, s = 1, label = 'qLash')
leg = ax[i].legend(markerscale = 10)
ax[i].set_xlabel(r'$\sum_i X_{ni} \beta_i$')
ax[i].set_ylabel(r'$p_n$')
ax[i].set_title(f'N = {nsample[i]}, P = {nvar[i]}', pad = 20)
plt.tight_layout()
plt.show()
1. Tjur, T. (2009). Coefficients of Determination in Logistic Regression Models - A New Proposal: The Coefficient of Discrimination. The American Statistician, 63(4), 366-372. DOI: 10.1198/tast.2009.08210↩
2. For a review of sparsity indices, see Harley and Rickard (2009). Comparing Measures of Sparsity. arXiv. DOI: arXiv:0811.4706v2↩