import numpy as np
import matplotlib.pyplot as plt
import re
# Expanded dataset of motivational quotes for better training
quotes = [
    "The only way to do great work is to love what you do.",
    "You are never too old to set another goal or to dream a new dream.",
    "The future belongs to those who believe in the beauty of their dreams.",
    "Success is not final, failure is not fatal: it is the courage to continue that counts.",
    "The only impossible journey is the one you never begin.",
    "In the middle of difficulty lies opportunity.",
    "Believe you can and you're halfway there.",
    "It does not matter how slowly you go as long as you do not stop.",
    "Everything you've ever wanted is on the other side of fear.",
    "Life is what happens to you while you're busy making other plans.",
    "It is during our darkest moments that we must focus to see the light.",
    "The only person you are destined to become is the person you decide to be.",
    "Go confidently in the direction of your dreams. Live the life you have imagined.",
    "The way to get started is to quit talking and begin doing.",
    "Your time is limited, so don't waste it living someone else's life.",
    "Innovation distinguishes between a leader and a follower.",
    "Stay hungry, stay foolish.",
    "The greatest glory in living lies not in never falling, but in rising every time we fall.",
    "If you want to live a happy life, tie it to a goal, not to people or things.",
    "The only thing we have to fear is fear itself.",
] * 15  # Diverse quotes, replicated to enlarge the training set
# Simple word-level tokenizer
class Tokenizer:
    def __init__(self, corpus):
        words = set(word.lower() for quote in corpus for word in re.findall(r'\w+|[^\w\s]', quote))
        self.vocab = ['<pad>', '<sos>', '<eos>'] + sorted(words)[:497]  # Cap vocabulary at ~500 tokens
        self.word2idx = {w: i for i, w in enumerate(self.vocab)}
        self.idx2word = {i: w for i, w in enumerate(self.vocab)}

    def encode(self, text):
        # Out-of-vocabulary words fall back to index 0 (<pad>); an <eos> id is appended.
        return [self.word2idx.get(word.lower(), 0) for word in re.findall(r'\w+|[^\w\s]', text)] + [self.word2idx['<eos>']]

    def decode(self, indices):
        # Skip <pad> (0) and <eos> when reconstructing text.
        return ' '.join(self.idx2word.get(i, '<unk>') for i in indices if i not in [0, self.word2idx['<eos>']])
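# Usage sketch (illustrative; the exact integer ids depend on the corpus above):
#   tok = Tokenizer(quotes)
#   ids = tok.encode("stay hungry")   # word ids plus a trailing <eos> id
#   tok.decode(ids)                   # -> "stay hungry"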
# Sinusoidal positional encoding: each even/odd pair of dimensions shares one frequency,
# PE(pos, 2i) = sin(pos / 10000^(2i/d_model)), PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)).
def positional_encoding(max_len, d_model):
    pe = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            angle = pos / 10000 ** (i / d_model)
            pe[pos, i] = np.sin(angle)
            if i + 1 < d_model:
                pe[pos, i + 1] = np.cos(angle)  # paired cos uses the same frequency as its sin
    return pe
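# Worked example (illustrative): with d_model = 4 and pos = 1 the two frequencies are
# 1 / 10000**0 = 1 and 1 / 10000**0.5 = 0.01, so the row is
# [sin(1), cos(1), sin(0.01), cos(0.01)].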
# Scaled Dot-Product Attention
def scaled_dot_product_attention(Q, K, V, mask=None):
    d_k = Q.shape[-1]
    # Use swapaxes to transpose the last two dimensions of K
    K_T = np.swapaxes(K, -2, -1)
    scores = np.matmul(Q, K_T) / np.sqrt(d_k)
    if mask is not None:
        scores = np.where(mask == 0, -1e9, scores)
    # Numerically stable softmax over the key dimension
    exp_scores = np.exp(scores - np.max(scores, axis=-1, keepdims=True))
    weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
    return np.matmul(weights, V), weights
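# This implements Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V. Masked positions
# receive a score of -1e9, which the softmax maps to an effectively zero weight.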
# Multi-Head Attention
class MultiHeadAttention:
    def __init__(self, d_model, num_heads):
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = np.random.randn(d_model, d_model) * 0.02
        self.W_k = np.random.randn(d_model, d_model) * 0.02
        self.W_v = np.random.randn(d_model, d_model) * 0.02
        self.W_o = np.random.randn(d_model, d_model) * 0.02

    def forward(self, x, mask=None):
        batch_size, seq_len, _ = x.shape
        # Project, then split into heads: (batch_size, num_heads, seq_len, d_k)
        Q = np.matmul(x, self.W_q).reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(0, 2, 1, 3)
        K = np.matmul(x, self.W_k).reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(0, 2, 1, 3)
        V = np.matmul(x, self.W_v).reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(0, 2, 1, 3)
        if mask is not None:
            # Expand mask to match multi-head attention dimensions: (batch_size, num_heads, seq_len, seq_len)
            mask = mask[np.newaxis, np.newaxis, :, :]
            mask = np.broadcast_to(mask, (batch_size, self.num_heads, seq_len, seq_len))
        output, weights = scaled_dot_product_attention(Q, K, V, mask)
        # Merge heads back to (batch_size, seq_len, d_model) and apply the output projection
        output = output.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, self.d_model)
        output = np.matmul(output, self.W_o)
        return output, weights
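# Shape sketch for the defaults used further down (d_model=64, num_heads=2, so d_k=32):
# x (batch, seq, 64) -> Q/K/V (batch, 2, seq, 32) -> per-head attention output merged
# back to (batch, seq, 64) -> W_o keeps (batch, seq, 64).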
# Feedforward Network
class FeedForward:
    def __init__(self, d_model, d_ff):
        self.W1 = np.random.randn(d_model, d_ff) * 0.02
        self.b1 = np.zeros((1, d_ff))
        self.W2 = np.random.randn(d_ff, d_model) * 0.02
        self.b2 = np.zeros((1, d_model))

    def forward(self, x):
        h = np.maximum(0, np.matmul(x, self.W1) + self.b1)  # ReLU
        return np.matmul(h, self.W2) + self.b2
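# Position-wise feed-forward: ReLU(x W1 + b1) W2 + b2, applied independently at every
# position. Note that the defaults below use d_ff == d_model == 64 rather than the usual
# 4x expansion.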
# Transformer Decoder Layer (pre-norm: normalization is applied before each sub-layer)
class DecoderLayer:
    def __init__(self, d_model, num_heads, d_ff):
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        # Parameter-free layer normalization (no learned gain or bias)
        self.norm1 = lambda x: (x - np.mean(x, axis=-1, keepdims=True)) / (np.std(x, axis=-1, keepdims=True) + 1e-6)
        self.norm2 = lambda x: (x - np.mean(x, axis=-1, keepdims=True)) / (np.std(x, axis=-1, keepdims=True) + 1e-6)

    def forward(self, x, mask):
        x_norm = self.norm1(x)
        attn_output, weights = self.mha.forward(x_norm, mask)
        x = x + attn_output  # Residual connection
        x_norm = self.norm2(x)
        ffn_output = self.ffn.forward(x_norm)
        x = x + ffn_output  # Residual connection
        return x, weights
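# With pre-norm residual blocks, one decoder layer computes
#   y = x + MHA(LayerNorm(x))  and  z = y + FFN(LayerNorm(y)),
# which matches forward() above (masked self-attention only; there is no encoder,
# so no cross-attention sub-layer).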
# Toy Transformer: token embedding + positional encoding + a single decoder layer
class ToyTransformer:
    def __init__(self, vocab_size, d_model=64, num_heads=2, d_ff=64, max_len=50):
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.max_len = max_len
        self.embedding = np.random.randn(vocab_size, d_model) * 0.02
        self.pos_encoding = positional_encoding(max_len, d_model)
        self.decoder = DecoderLayer(d_model, num_heads, d_ff)
        self.output_layer = np.random.randn(d_model, vocab_size) * 0.02
        self.output_bias = np.zeros((1, vocab_size))

    def create_mask(self, seq_len):
        # Lower-triangular causal mask: position i may attend only to positions <= i
        mask = np.tril(np.ones((seq_len, seq_len)))
        return mask

    def forward(self, x):
        batch_size, seq_len = x.shape
        x_emb = self.embedding[x] + self.pos_encoding[:seq_len]
        mask = self.create_mask(seq_len)
        x, attn_weights = self.decoder.forward(x_emb, mask)
        logits = np.matmul(x, self.output_layer) + self.output_bias
        # Numerically stable softmax over the vocabulary
        exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
        return probs, attn_weights
    def predict(self, tokenizer, prompt, max_len=20, temperature=0.8, top_k=10):
        """Generate text with temperature and top-k sampling."""
        # Drop the trailing <eos> that encode() appends, so generation continues the prompt.
        tokens = [tokenizer.word2idx['<sos>']] + tokenizer.encode(prompt)[:-1]
        for _ in range(max_len):
            x = np.array([tokens[-self.max_len:]])  # Keep the context within the positional-encoding length
            probs, _ = self.forward(x)
            # Apply temperature for more controlled randomness
            logits = np.log(probs[0, -1] + 1e-10) / temperature
            probs_temp = np.exp(logits) / np.sum(np.exp(logits))
            # Top-k sampling for better quality
            if top_k > 0:
                top_k_indices = np.argpartition(probs_temp, -top_k)[-top_k:]
                top_k_probs = probs_temp[top_k_indices]
                top_k_probs = top_k_probs / np.sum(top_k_probs)
                next_token = top_k_indices[np.random.choice(len(top_k_indices), p=top_k_probs)]
            else:
                next_token = np.random.choice(self.vocab_size, p=probs_temp)
            tokens.append(next_token)
            if next_token == tokenizer.word2idx['<eos>']:
                break
        return tokenizer.decode(tokens[1:])
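# Sampling notes (illustrative numbers): dividing log-probabilities by a temperature
# T < 1 sharpens the distribution (T = 0.5 roughly squares and renormalizes it, e.g.
# [0.7, 0.2, 0.1] -> ~[0.91, 0.07, 0.02]), while T > 1 flattens it. Top-k then keeps
# only the k most probable tokens and renormalizes before sampling.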
# Training loop: Adam updates, learning-rate decay, and simple loss tracking
def train(model, tokenizer, corpus, epochs=50, batch_size=8, lr=0.001):
    optimizer = AdamOptimizer(model, lr)
    best_loss = float('inf')
    for epoch in range(epochs):
        np.random.shuffle(corpus)
        total_loss = 0
        num_batches = 0
        # Learning rate decay every 15 epochs
        if epoch > 0 and epoch % 15 == 0:
            optimizer.lr *= 0.8
            print(f"Learning rate reduced to: {optimizer.lr:.6f}")
        for i in range(0, len(corpus), batch_size):
            batch = corpus[i:i + batch_size]
            # Prepend <sos> so training sequences match what predict() feeds the model
            inputs = [[tokenizer.word2idx['<sos>']] + tokenizer.encode(quote) for quote in batch]
            max_len = max(len(x) for x in inputs)
            inputs = np.array([np.pad(x, (0, max_len - len(x)), constant_values=0) for x in inputs])
            # Next-token prediction: targets are the inputs shifted left by one position
            targets = inputs[:, 1:]
            inputs = inputs[:, :-1]
            probs, _ = model.forward(inputs)
            # Cross-entropy over predicted next tokens (padding positions are included for simplicity)
            actual_batch_size = len(batch)
            loss = -np.mean(np.log(probs[np.arange(actual_batch_size)[:, None], np.arange(max_len - 1)[None, :], targets] + 1e-10))
            optimizer.zero_grad()
            grads = compute_gradients(model, inputs, targets, probs)
            optimizer.step(grads)
            total_loss += loss
            num_batches += 1
        avg_loss = total_loss / num_batches
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")
        # Track the best loss (a full implementation would checkpoint the weights here)
        if avg_loss < best_loss:
            best_loss = avg_loss
            print(f"New best loss: {best_loss:.4f}")
        # Flag a plateau; this only warns and does not actually stop training
        if epoch > 20 and avg_loss > best_loss * 1.1:
            print("Loss plateaued, consider stopping early")
# Simple Adam Optimizer
class AdamOptimizer:
    def __init__(self, model, lr, beta1=0.9, beta2=0.999, eps=1e-8):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        # Parameters updated by the optimizer, in the same order as the gradient list
        self.param_refs = [
            (model, 'embedding'),
            (model.decoder.mha, 'W_q'), (model.decoder.mha, 'W_k'),
            (model.decoder.mha, 'W_v'), (model.decoder.mha, 'W_o'),
            (model.decoder.ffn, 'W1'), (model.decoder.ffn, 'b1'),
            (model.decoder.ffn, 'W2'), (model.decoder.ffn, 'b2'),
            (model, 'output_layer'), (model, 'output_bias'),
        ]
        self.m = [np.zeros_like(getattr(obj, name)) for obj, name in self.param_refs]
        self.v = [np.zeros_like(p) for p in self.m]
        self.t = 0

    def zero_grad(self):
        # Gradients are recomputed from scratch each step, so there is nothing to reset
        pass

    def step(self, grads):
        self.t += 1
        for i, ((obj, name), g) in enumerate(zip(self.param_refs, grads)):
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (g ** 2)
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            setattr(obj, name, getattr(obj, name) - self.lr * m_hat / (np.sqrt(v_hat) + self.eps))
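# Standard Adam update used above, per parameter tensor with gradient g_t:
#   m_t = beta1 * m_{t-1} + (1 - beta1) * g_t
#   v_t = beta2 * v_{t-1} + (1 - beta2) * g_t**2
#   m_hat = m_t / (1 - beta1**t),  v_hat = v_t / (1 - beta2**t)
#   theta <- theta - lr * m_hat / (sqrt(v_hat) + eps)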
# Gradient computation (simplified: only the output projection receives real gradients;
# the embedding, attention, and feed-forward weights get zero gradients)
def compute_gradients(model, inputs, targets, probs):
    batch_size, seq_len = inputs.shape
    # d(cross-entropy)/d(logits) = softmax(logits) - one_hot(targets), averaged over the batch
    grad_logits = probs.copy()
    grad_logits[np.arange(batch_size)[:, None], np.arange(seq_len), targets] -= 1
    grad_logits /= batch_size
    # Recompute the decoder output for the backward pass (no activations are cached)
    x_emb = model.embedding[inputs] + model.pos_encoding[:seq_len]
    mask = model.create_mask(seq_len)
    decoder_output, _ = model.decoder.forward(x_emb, mask)
    grad_output_layer = np.sum(np.matmul(decoder_output.transpose(0, 2, 1), grad_logits), axis=0)
    grad_output_bias = np.sum(grad_logits, axis=(0, 1), keepdims=True).reshape(1, -1)
    # Zero gradients for everything upstream of the output layer (kept zero for brevity)
    zero_grads = [np.zeros_like(p) for p in [
        model.decoder.mha.W_q, model.decoder.mha.W_k, model.decoder.mha.W_v, model.decoder.mha.W_o,
        model.decoder.ffn.W1, model.decoder.ffn.b1, model.decoder.ffn.W2, model.decoder.ffn.b2]]
    return [np.zeros_like(model.embedding), *zero_grads, grad_output_layer, grad_output_bias]
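# Note on the gradient above: for softmax followed by cross-entropy, dL/dlogits = p - y,
# where y is the one-hot target -- hence the "-= 1" at the target index. For example
# (illustrative numbers), probs [0.2, 0.5, 0.3] with true class 1 give a logit gradient
# of [0.2, -0.5, 0.3] before batch averaging.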
# Visualize attention weights for a prompt
def visualize_attention(tokenizer, model, prompt):
    tokens = [tokenizer.word2idx['<sos>']] + tokenizer.encode(prompt)
    x = np.array([tokens[-50:]])
    _, attn_weights = model.forward(x)
    # Use the actual sequence length from the input
    seq_len = x.shape[1]
    token_labels = [tokenizer.idx2word.get(i, '<unk>') for i in x[0]]
    plt.figure(figsize=(8, 6))
    plt.imshow(attn_weights[0, 0][:seq_len, :seq_len], cmap='hot', interpolation='nearest')
    plt.colorbar(label='Attention Weight')
    plt.title(f'Attention Weights for: {prompt}')
    plt.xlabel('Key Tokens')
    plt.ylabel('Query Tokens')
    plt.xticks(range(seq_len), token_labels, rotation=45)
    plt.yticks(range(seq_len), token_labels)
    plt.tight_layout()
    plt.show()
# Main
def main():
    print("Initializing tokenizer...")
    tokenizer = Tokenizer(quotes)
    print(f"Vocabulary size: {len(tokenizer.vocab)}")
    print(f"Training on {len(quotes)} quotes")
    print("Initializing model...")
    model = ToyTransformer(vocab_size=len(tokenizer.vocab))
    print("Starting training...")
    train(model, tokenizer, quotes, epochs=30, batch_size=8, lr=0.002)  # Shorter run with a higher learning rate than the defaults
    print("\n" + "=" * 60)
    print("GENERATING TEXT WITH DIFFERENT SETTINGS")
    print("=" * 60)
    # Test different prompts and generation settings
    test_prompts = [
        "The only way to do great",
        "Success is",
        "Life is",
        "Dreams are",
        "The future belongs"
    ]
    for prompt in test_prompts:
        print(f"\n--- Prompt: '{prompt}' ---")
        # Conservative generation (low temperature)
        conservative = model.predict(tokenizer, prompt, max_len=15, temperature=0.5, top_k=5)
        print(f"Conservative (T=0.5): {conservative}")
        # Balanced generation
        balanced = model.predict(tokenizer, prompt, max_len=15, temperature=0.8, top_k=10)
        print(f"Balanced (T=0.8): {balanced}")
        # Creative generation (higher temperature)
        creative = model.predict(tokenizer, prompt, max_len=15, temperature=1.2, top_k=15)
        print(f"Creative (T=1.2): {creative}")
    # Visualize attention (uncomment if a display is available)
    # visualize_attention(tokenizer, model, test_prompts[0])

if __name__ == "__main__":
    main()