@RobertTLange
Created August 29, 2019 14:33
Logistic Regression with SGD using forward accumulation mode + dual numbers
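For context: a dual number x + x'·ε with ε² = 0 carries a value and a derivative through every operation, which is what the DualTensor class below does for vectors. A minimal scalar sketch of the idea (illustrative only, not part of the gist):

# Forward-mode AD on f(x) = x**2 + 3*x at x = 2 via a (value, derivative) pair
real, dual = 2.0, 1.0            # seed the derivative dx/dx = 1
f_real = real**2 + 3*real        # value:      f(2)  = 10
f_dual = 2*real*dual + 3*dual    # derivative: f'(2) = 7
print(f_real, f_dual)            # -> 10.0 7.0

The gist below applies the same bookkeeping to the coefficient vector of a logistic regression, so that every SGD step gets an exact gradient from a single forward pass.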
import numpy as np
from sklearn.linear_model import LogisticRegression
class DataLoader(object):
    # Small helper class: generates toy data & serves mini-batches by batch id
    def __init__(self, n, d, batch_size, binary=False):
        self.total_dim = d + 1
        self.X, self.y = self.generate_regression_data(n, d, binary)
        # Assign a batch id to every observation
        self.num_batches = np.ceil(n/batch_size).astype(int)
        self.batch_ids = np.array([np.repeat(i, batch_size)
                                   for i in range(self.num_batches)]).flatten()[:n]
    def generate_regression_data(self, n, d, binary=False):
        self.b_true = self.generate_coefficients(d)
        X = np.random.normal(0, 1, n*d).reshape((n, d))
        noise = np.random.normal(0, 1, n).reshape((n, 1))
        inter = np.ones(n).reshape((n, 1))
        X = np.hstack((inter, X))
        y = np.matmul(X, self.b_true) + noise
        if binary:
            # Threshold the noisy linear model at 0 to obtain binary labels
            y[y > 0] = 1
            y[y < 0] = 0
        return X, y
    def generate_coefficients(self, d, intercept=True):
        b_random = np.random.randint(-5, 5, d + intercept)
        return b_random.reshape((d + intercept, 1))
    def shuffle_arrays(self):
        assert len(self.X) == len(self.y)
        p = np.random.permutation(len(self.X))
        self.X, self.y = self.X[p], self.y[p]
    def get_batch_idx(self, batch_id, get_batch=False):
        # Subselect the current batch processed in forward-mode differentiation
        idx = np.where(self.batch_ids == batch_id)[0]
        if get_batch:
            return self.X[idx, :], self.y[idx].flatten()
        else:
            return idx
class DualTensor(object):
    # Class object for the dual representation of a vector (real & dual part)
    def __init__(self, real, dual):
        self.real = real
        self.dual = dual


def dot_product(b_dual, x):
    # Apply the same linear map x to both the real and the dual part
    real_part = np.dot(x, b_dual.real)
    dual_part = np.dot(x, b_dual.dual)
    return DualTensor(real_part, dual_part)
def add_duals(dual_a, dual_b):
    # Operator non-"overload": add two dual numbers component-wise
    real_part = dual_a.real + dual_b.real
    dual_part = dual_a.dual + dual_b.dual
    return DualTensor(real_part, dual_part)
def log(dual_tensor):
    real_part = np.log(dual_tensor.real)
    temp_1 = 1/dual_tensor.real
    # Fill matrix with diagonal entries of the log derivative (chain rule)
    temp_2 = np.zeros((temp_1.shape[0], temp_1.shape[0]))
    np.fill_diagonal(temp_2, temp_1)
    dual_part = np.dot(temp_2, dual_tensor.dual)
    return DualTensor(real_part, dual_part)
def sigmoid(dual_tensor):
    real_part = 1/(1 + np.exp(-dual_tensor.real))
    temp_1 = np.multiply(real_part, 1 - real_part)
    # Fill matrix with diagonal entries of the sigmoid derivative (chain rule)
    temp_2 = np.zeros((temp_1.shape[0], temp_1.shape[0]))
    np.fill_diagonal(temp_2, temp_1)
    dual_part = np.dot(temp_2, dual_tensor.dual)
    return DualTensor(real_part, dual_part)
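# Note: log() and sigmoid() apply the chain rule by left-multiplying the dual
# part (the Jacobian w.r.t. the coefficients) with a diagonal matrix of
# elementwise derivatives, e.g. sigma'(x) = sigma(x)*(1 - sigma(x)).
# An equivalent, cheaper broadcasting form would be
#     dual_part = temp_1[:, None] * dual_tensor.dual
# (sketch only; the explicit diagonal matrix above keeps the linear algebra
# visible at the cost of an O(batch**2) intermediate).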
def binary_cross_entropy_dual(data_loader, batch_id, b_current):
    # Transform the coefficients into dual numbers (dual part = identity seed)
    b_dual = np.zeros((len(b_current), len(b_current)))
    np.fill_diagonal(b_dual, 1)
    b_dual = DualTensor(b_current, b_dual)
    # Select the current batch
    X, y = data_loader.get_batch_idx(batch_id, get_batch=True)
    # Apply the element-wise sigmoid activation
    y_pred = sigmoid(dot_product(b_dual, X))
    y_pred_2 = DualTensor(1 - y_pred.real, -y_pred.dual)
    # Clip the probabilities for numerical stability
    y_pred.real = np.clip(y_pred.real, 1e-15, 1 - 1e-15)
    y_pred_2.real = np.clip(y_pred_2.real, 1e-15, 1 - 1e-15)
    # Compute the actual binary cross-entropy terms
    log_y_pred, log_y_pred_2 = log(y_pred), log(y_pred_2)
    bce_l1 = dot_product(log_y_pred, -y)
    bce_l2 = dot_product(log_y_pred_2, -(1 - y))
    bce = add_duals(bce_l1, bce_l2)
    # Calculate the batch classification accuracy
    acc = (y == (y_pred.real > 0.5)).sum()/y.shape[0]
    return bce, acc
def train_logistic_regression(n, d, n_epoch, batch_size, b_init, l_rate):
    # Generate the data for a coefficient vector & init the progress trackers
    data_loader = DataLoader(n, d, batch_size, binary=True)
    init_bce, _ = binary_cross_entropy_dual(data_loader, 0, b_init)
    b_hist, func_val_hist, param_error = [b_init], [init_bce.real], []
    # Reference fit via sklearn's LBFGS; X already contains an intercept
    # column, so fit_intercept=False keeps the coefficient comparison valid
    logreg = LogisticRegression(C=1e5, solver='lbfgs', fit_intercept=False)
    logreg.fit(data_loader.X, data_loader.y.ravel())
    for epoch in range(n_epoch):
        # Shuffle the observations at the beginning of each epoch
        data_loader.shuffle_arrays()
        for batch_id in range(data_loader.num_batches):
            # Forward-AD pass: real part = function value, dual part = gradient
            current_dual, acc = binary_cross_entropy_dual(data_loader, batch_id,
                                                          b_hist[-1])
            # Perform the gradient step & append results to the history lists
            b_hist.append(b_hist[-1] - l_rate*np.array(current_dual.dual).flatten())
            func_val_hist.append(current_dual.real)
            param_error.append(np.linalg.norm(logreg.coef_.ravel() - b_hist[-1]))
        if epoch % 1 == 0:
            print("Accuracy: {} | Euclidean Param Error: {} | fct min: {}".format(
                acc, param_error[-1], current_dual.real))
    return b_hist[1:], func_val_hist[1:], param_error
if __name__ == "__main__":
    np.random.seed(1)
    b, f, error = train_logistic_regression(1000, 4, 50, 32, np.zeros(5), 0.005)
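A quick way to convince yourself that the dual parts really carry exact gradients is to compare them against a central finite-difference estimate of the batch loss. A minimal sketch, assuming the DataLoader and binary_cross_entropy_dual definitions above (the helper finite_diff_grad is hypothetical, not part of the gist):

def finite_diff_grad(data_loader, batch_id, b, eps=1e-6):
    # Hypothetical helper: central finite differences of the batch BCE
    grad = np.zeros_like(b)
    for i in range(len(b)):
        b_hi, b_lo = b.copy(), b.copy()
        b_hi[i] += eps
        b_lo[i] -= eps
        f_hi, _ = binary_cross_entropy_dual(data_loader, batch_id, b_hi)
        f_lo, _ = binary_cross_entropy_dual(data_loader, batch_id, b_lo)
        grad[i] = (f_hi.real - f_lo.real) / (2*eps)
    return grad

loader = DataLoader(100, 4, 32, binary=True)
b0 = np.zeros(5)
bce, _ = binary_cross_entropy_dual(loader, 0, b0)
print(np.allclose(bce.dual.flatten(),
                  finite_diff_grad(loader, 0, b0), atol=1e-4))  # expect True

Since each batch loss is differentiated w.r.t. only d+1 = 5 coefficients, forward mode is a reasonable fit here; for models with many more parameters than outputs, reverse mode would be the cheaper choice.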