Logistic Regression with SGD using Forward Accumulation Mode + Dual Numbers
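Forward accumulation mode propagates derivatives alongside function values using dual numbers: with a symbol eps defined by eps**2 = 0, evaluating f(a + b*eps) yields f(a) + f'(a)*b*eps, so the dual part carries the exact derivative. Seeding the dual part of the coefficient vector with the identity matrix then produces the full gradient of the batch loss in a single forward pass. A minimal scalar sketch of the mechanism (this Dual class is illustrative only, not part of the gist below):

class Dual:
    # Minimal scalar dual number: real value + derivative ("epsilon") part
    def __init__(self, real, dual=0.0):
        self.real, self.dual = real, dual

    def __mul__(self, other):
        # (a + b*eps) * (c + d*eps) = a*c + (a*d + b*c)*eps, since eps**2 = 0
        return Dual(self.real * other.real,
                    self.real * other.dual + self.dual * other.real)


# d/dx x**2 at x = 3: seed the dual part with 1
x = Dual(3.0, 1.0)
print((x * x).dual)  # 6.0 - the exact derivative, no finite-difference error

The gist below lifts the same idea to vectors via the DualTensor class.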
import numpy as np
from sklearn.linear_model import LogisticRegression


class DataLoader(object):
    # Small helper class - generates a synthetic dataset & serves mini-batches
    def __init__(self, n, d, batch_size, binary=False):
        self.total_dim = d + 1
        self.X, self.y = self.generate_regression_data(n, d, binary)
        # Assign a batch id to each of the n observations
        self.num_batches = np.ceil(n/batch_size).astype(int)
        self.batch_ids = np.array([np.repeat(i, batch_size)
                                   for i in range(self.num_batches)]).flatten()[:n]

    def generate_regression_data(self, n, d, binary=False):
        self.b_true = self.generate_coefficients(d)
        X = np.random.normal(0, 1, n*d).reshape((n, d))
        noise = np.random.normal(0, 1, n).reshape((n, 1))
        inter = np.ones(n).reshape((n, 1))
        X = np.hstack((inter, X))
        y = np.matmul(X, self.b_true) + noise
        if binary:
            y[y > 0] = 1
            y[y < 0] = 0
        return X, y

    def generate_coefficients(self, d, intercept=True):
        b_random = np.random.randint(-5, 5, d + intercept)
        return b_random.reshape((d + intercept, 1))

    def shuffle_arrays(self):
        assert len(self.X) == len(self.y)
        p = np.random.permutation(len(self.X))
        self.X, self.y = self.X[p], self.y[p]

    def get_batch_idx(self, batch_id, get_batch=False):
        # Subselect the current batch processed in forward-mode differentiation
        idx = np.where(self.batch_ids == batch_id)[0]
        if get_batch:
            return self.X[idx, :], self.y[idx].flatten()
        else:
            return idx


class DualTensor(object):
    # Class object for the dual representation of a vector - real & dual part
    def __init__(self, real, dual):
        self.real = real
        self.dual = dual


def dot_product(b_dual, x):
    # Dual dot product: apply the data matrix to both the real & dual part
    real_part = np.dot(x, b_dual.real)
    dual_part = np.dot(x, b_dual.dual)
    return DualTensor(real_part, dual_part)


def add_duals(dual_a, dual_b):
    # Operator non-"overload": add two dual numbers component-wise
    real_part = dual_a.real + dual_b.real
    dual_part = dual_a.dual + dual_b.dual
    return DualTensor(real_part, dual_part)


def log(dual_tensor):
    # Dual log: chain rule with d/dx log(x) = 1/x applied to the dual part
    real_part = np.log(dual_tensor.real)
    temp_1 = 1/dual_tensor.real
    # Fill matrix with diagonal entries of the log derivative
    temp_2 = np.zeros((temp_1.shape[0], temp_1.shape[0]))
    np.fill_diagonal(temp_2, temp_1)
    dual_part = np.dot(temp_2, dual_tensor.dual)
    return DualTensor(real_part, dual_part)


def sigmoid(dual_tensor):
    # Dual sigmoid: chain rule with d/dx sigma(x) = sigma(x)*(1 - sigma(x))
    real_part = 1/(1 + np.exp(-dual_tensor.real))
    temp_1 = np.multiply(real_part, 1 - real_part)
    # Fill matrix with diagonal entries of the sigmoid derivative
    temp_2 = np.zeros((temp_1.shape[0], temp_1.shape[0]))
    np.fill_diagonal(temp_2, temp_1)
    dual_part = np.dot(temp_2, dual_tensor.dual)
    return DualTensor(real_part, dual_part)


def binary_cross_entropy_dual(data_loader, batch_id, b_current):
    # Transform the coefficients into dual numbers: seeding the dual part
    # with the identity matrix yields all d+1 partial derivatives of the
    # batch loss in a single forward pass
    b_dual = np.zeros((len(b_current), len(b_current)))
    np.fill_diagonal(b_dual, 1)
    b_dual = DualTensor(b_current, b_dual)
    # Select the current batch
    X, y = data_loader.get_batch_idx(batch_id, get_batch=True)
    # Apply the element-wise sigmoid activation
    y_pred = sigmoid(dot_product(b_dual, X))
    y_pred_2 = DualTensor(1 - y_pred.real, -y_pred.dual)
    # Clip the probabilities to make the log numerically stable
    y_pred.real = np.clip(y_pred.real, 1e-15, 1 - 1e-15)
    y_pred_2.real = np.clip(y_pred_2.real, 1e-15, 1 - 1e-15)
    # Compute the actual binary cross-entropy terms
    log_y_pred, log_y_pred_2 = log(y_pred), log(y_pred_2)
    bce_l1, bce_l2 = dot_product(log_y_pred, -y), dot_product(log_y_pred_2, -(1 - y))
    bce = add_duals(bce_l1, bce_l2)
    # Calculate the batch classification accuracy
    acc = (y == (y_pred.real > 0.5)).sum()/y.shape[0]
    return bce, acc


def train_logistic_regression(n, d, n_epoch, batch_size, b_init, l_rate):
    # Generate the data for a coefficient vector & init the progress trackers
    data_loader = DataLoader(n, d, batch_size, binary=True)
    init_dual, _ = binary_cross_entropy_dual(data_loader, 0, b_init)
    b_hist, func_val_hist, param_error = [b_init], [init_dual.real], []
    # Fit a weakly regularized sklearn model as the reference solution
    logreg = LogisticRegression(C=1e5, solver='lbfgs', multi_class='multinomial')
    logreg.fit(data_loader.X, data_loader.y.ravel())
    for epoch in range(n_epoch):
        # Shuffle the observations at the beginning of each epoch
        data_loader.shuffle_arrays()
        for batch_id in range(data_loader.num_batches):
            # Forward-mode AD pass - real part = loss value, dual part = gradient
            current_dual, acc = binary_cross_entropy_dual(data_loader, batch_id, b_hist[-1])
            # Perform a gradient step & append the results to the tracking lists
            b_hist.append(b_hist[-1] - l_rate*np.array(current_dual.dual).flatten())
            func_val_hist.append(current_dual.real)
            param_error.append(np.linalg.norm(logreg.coef_.ravel() - b_hist[-1]))
        if epoch % 1 == 0:
            print("Accuracy: {} | Euclidean Param Norm: {} | fct min: {}".format(
                acc, param_error[-1], current_dual.real))
    return b_hist[1:], func_val_hist[1:], param_error


if __name__ == "__main__":
    np.random.seed(1)
    b, f, error = train_logistic_regression(1000, 4, 50, 32,
                                            np.array([0, 0, 0, 0, 0]).astype(float),
                                            0.005)
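A quick way to sanity-check the forward-mode gradient is to compare the dual part of the returned loss against a central finite difference of its real part. A minimal sketch using the functions above (the grad_check helper is an illustrative assumption, not part of the original gist):

def grad_check(data_loader, b, eps=1e-6):
    # Hypothetical helper: compare the AD gradient (dual part) on batch 0
    # against a central finite-difference estimate of the loss gradient
    bce, _ = binary_cross_entropy_dual(data_loader, 0, b)
    fd_grad = np.zeros_like(b)
    for j in range(len(b)):
        b_plus, b_minus = b.copy(), b.copy()
        b_plus[j] += eps
        b_minus[j] -= eps
        f_plus, _ = binary_cross_entropy_dual(data_loader, 0, b_plus)
        f_minus, _ = binary_cross_entropy_dual(data_loader, 0, b_minus)
        fd_grad[j] = (f_plus.real - f_minus.real)/(2*eps)
    # Maximum absolute deviation between the two gradient estimates
    return np.max(np.abs(np.array(bce.dual).flatten() - fd_grad))


# e.g. grad_check(DataLoader(100, 4, 32, binary=True), np.zeros(5))
# should return a value on the order of 1e-6 or smaller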