import numpy as np
import matplotlib.pyplot as plt
import re
import json
# Expanded dataset of motivational quotes for better training
quotes = [
    "The only way to do great work is to love what you do.",
    "You are never too old to set another goal or to dream a new dream.",
    "The future belongs to those who believe in the beauty of their dreams.",
    "Success is not final, failure is not fatal: it is the courage to continue that counts.",
    "The only impossible journey is the one you never begin.",
    "In the middle of difficulty lies opportunity.",
    "Believe you can and you're halfway there.",
    "It does not matter how slowly you go as long as you do not stop.",
    "Everything you've ever wanted is on the other side of fear.",
    "Life is what happens to you while you're busy making other plans.",
    "It is during our darkest moments that we must focus to see the light.",
    "The only person you are destined to become is the person you decide to be.",
    "Go confidently in the direction of your dreams. Live the life you have imagined.",
    "The way to get started is to quit talking and begin doing.",
    "Your time is limited, so don't waste it living someone else's life.",
    "Innovation distinguishes between a leader and a follower.",
    "Stay hungry, stay foolish.",
    "The greatest glory in living lies not in never falling, but in rising every time we fall.",
    "If you want to live a happy life, tie it to a goal, not to people or things.",
    "The only thing we have to fear is fear itself.",
] * 15  # 20 distinct quotes, replicated to provide more training examples
# Simple word-level tokenizer
class Tokenizer:
    def __init__(self, corpus):
        words = set(word.lower() for quote in corpus for word in re.findall(r'\w+|[^\w\s]', quote))
        self.vocab = ['<pad>', '<sos>', '<eos>'] + sorted(words)[:497]  # Cap the vocabulary at ~500 tokens
        self.word2idx = {w: i for i, w in enumerate(self.vocab)}
        self.idx2word = {i: w for i, w in enumerate(self.vocab)}
    def encode(self, text):
        # Unknown words fall back to index 0 (<pad>); a trailing <eos> is always appended
        return [self.word2idx.get(word.lower(), 0) for word in re.findall(r'\w+|[^\w\s]', text)] + [self.word2idx['<eos>']]
    def decode(self, indices):
        return ' '.join(self.idx2word.get(i, '<unk>') for i in indices if i not in [0, self.word2idx['<eos>']])
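# Illustrative round trip (not part of the original gist; assumes the quotes corpus above, whose
# words all fit inside the capped vocabulary):
#   >>> tok = Tokenizer(quotes)
#   >>> tok.decode(tok.encode("Stay hungry, stay foolish."))
#   'stay hungry , stay foolish .'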
# Sinusoidal Positional Encoding
def positional_encoding(max_len, d_model):
    pe = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            # Each sin/cos pair shares the frequency 1 / 10000**(i / d_model), as in the original Transformer
            pe[pos, i] = np.sin(pos / 10000 ** (i / d_model))
            if i + 1 < d_model:
                pe[pos, i + 1] = np.cos(pos / 10000 ** (i / d_model))
    return pe
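# Quick sanity check (illustrative, not in the original gist): the encoding is (max_len, d_model)
# and every entry is a sine or cosine, so all values lie in [-1, 1].
#   >>> pe = positional_encoding(50, 64)
#   >>> pe.shape
#   (50, 64)
#   >>> bool(np.abs(pe).max() <= 1.0)
#   True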
# Scaled Dot-Product Attention
def scaled_dot_product_attention(Q, K, V, mask=None):
    d_k = Q.shape[-1]
    # Use swapaxes to transpose the last two dimensions of K
    K_T = np.swapaxes(K, -2, -1)
    scores = np.matmul(Q, K_T) / np.sqrt(d_k)
    if mask is not None:
        scores = np.where(mask == 0, -1e9, scores)
    # Numerically stable softmax over the key dimension
    exp_scores = np.exp(scores - np.max(scores, axis=-1, keepdims=True))
    weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
    return np.matmul(weights, V), weights
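# Shape/normalization check (illustrative): with inputs of shape (batch, heads, seq, d_k),
# the attention weights are (batch, heads, seq, seq) and each row sums to 1.
#   >>> Q = K = V = np.random.randn(1, 2, 4, 8)
#   >>> out, w = scaled_dot_product_attention(Q, K, V)
#   >>> out.shape, w.shape
#   ((1, 2, 4, 8), (1, 2, 4, 4))
#   >>> bool(np.allclose(w.sum(axis=-1), 1.0))
#   True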
# Multi-Head Attention
class MultiHeadAttention:
    def __init__(self, d_model, num_heads):
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = np.random.randn(d_model, d_model) * 0.02
        self.W_k = np.random.randn(d_model, d_model) * 0.02
        self.W_v = np.random.randn(d_model, d_model) * 0.02
        self.W_o = np.random.randn(d_model, d_model) * 0.02
    def forward(self, x, mask=None):
        batch_size, seq_len, _ = x.shape
        Q = np.matmul(x, self.W_q).reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(0, 2, 1, 3)
        K = np.matmul(x, self.W_k).reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(0, 2, 1, 3)
        V = np.matmul(x, self.W_v).reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(0, 2, 1, 3)
        if mask is not None:
            # Expand mask to match multi-head attention dimensions: (batch_size, num_heads, seq_len, seq_len)
            mask = mask[np.newaxis, np.newaxis, :, :]  # Add batch and head dimensions
            mask = np.broadcast_to(mask, (batch_size, self.num_heads, seq_len, seq_len))
        output, weights = scaled_dot_product_attention(Q, K, V, mask)
        output = output.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, self.d_model)
        output = np.matmul(output, self.W_o)
        return output, weights
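# Illustrative forward pass (not part of the original gist): with d_model=64 and 2 heads, each head
# attends in a d_k=32 subspace and the concatenated heads are projected back to d_model.
#   >>> mha = MultiHeadAttention(64, 2)
#   >>> out, w = mha.forward(np.random.randn(1, 5, 64))
#   >>> out.shape, w.shape
#   ((1, 5, 64), (1, 2, 5, 5))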
# Feedforward Network
class FeedForward:
    def __init__(self, d_model, d_ff):
        self.W1 = np.random.randn(d_model, d_ff) * 0.02
        self.b1 = np.zeros((1, d_ff))
        self.W2 = np.random.randn(d_ff, d_model) * 0.02
        self.b2 = np.zeros((1, d_model))
    def forward(self, x):
        h = np.maximum(0, np.matmul(x, self.W1) + self.b1)  # ReLU
        return np.matmul(h, self.W2) + self.b2
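# Illustrative check (not in the original gist): the hidden layer expands to d_ff with a ReLU,
# then projects back to d_model, so the output shape matches the input shape.
#   >>> ffn = FeedForward(64, 64)
#   >>> ffn.forward(np.random.randn(1, 5, 64)).shape
#   (1, 5, 64)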
# Transformer Decoder Layer
class DecoderLayer:
    def __init__(self, d_model, num_heads, d_ff):
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = lambda x: (x - np.mean(x, axis=-1, keepdims=True)) / (np.std(x, axis=-1, keepdims=True) + 1e-6)
        self.norm2 = lambda x: (x - np.mean(x, axis=-1, keepdims=True)) / (np.std(x, axis=-1, keepdims=True) + 1e-6)
    def forward(self, x, mask):
        x_norm = self.norm1(x)
        attn_output, weights = self.mha.forward(x_norm, mask)
        x = x + attn_output  # Residual connection
        x_norm = self.norm2(x)
        ffn_output = self.ffn.forward(x_norm)
        x = x + ffn_output  # Residual connection
        return x, weights
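# Architecture note (illustrative, not in the original gist): this is a pre-norm block, meaning the
# (parameter-free) normalization is applied before each sub-layer while the residual adds the
# un-normalized input; pre-norm tends to train more stably than post-norm for small models.
#   >>> layer = DecoderLayer(64, 2, 64)
#   >>> y, w = layer.forward(np.random.randn(1, 5, 64), np.tril(np.ones((5, 5))))
#   >>> y.shape, w.shape
#   ((1, 5, 64), (1, 2, 5, 5))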
# Toy Transformer
class ToyTransformer:
    def __init__(self, vocab_size, d_model=64, num_heads=2, d_ff=64, max_len=50):
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.max_len = max_len
        self.embedding = np.random.randn(vocab_size, d_model) * 0.02
        self.pos_encoding = positional_encoding(max_len, d_model)
        self.decoder = DecoderLayer(d_model, num_heads, d_ff)
        self.output_layer = np.random.randn(d_model, vocab_size) * 0.02
        self.output_bias = np.zeros((1, vocab_size))
    def create_mask(self, seq_len):
        mask = np.tril(np.ones((seq_len, seq_len)))
        return mask
    def forward(self, x):
        batch_size, seq_len = x.shape
        x_emb = self.embedding[x] + self.pos_encoding[:seq_len]
        mask = self.create_mask(seq_len)
        x, attn_weights = self.decoder.forward(x_emb, mask)
        logits = np.matmul(x, self.output_layer) + self.output_bias
        exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
        return probs, attn_weights
    def predict(self, tokenizer, prompt, max_len=20, temperature=0.8, top_k=10):
        """Generate text with improved sampling strategies"""
        # Drop the trailing <eos> that encode() appends so generation continues from the prompt
        tokens = [tokenizer.word2idx['<sos>']] + tokenizer.encode(prompt)[:-1]
        for _ in range(max_len):
            x = np.array([tokens[-50:]])  # Limit context to the model's maximum sequence length
            probs, _ = self.forward(x)
            # Apply temperature for more controlled randomness
            logits = np.log(probs[0, -1] + 1e-10) / temperature
            probs_temp = np.exp(logits) / np.sum(np.exp(logits))
            # Top-k sampling for better quality
            if top_k > 0:
                top_k_indices = np.argpartition(probs_temp, -top_k)[-top_k:]
                top_k_probs = probs_temp[top_k_indices]
                top_k_probs = top_k_probs / np.sum(top_k_probs)
                next_token = top_k_indices[np.random.choice(len(top_k_indices), p=top_k_probs)]
            else:
                next_token = np.random.choice(self.vocab_size, p=probs_temp)
            tokens.append(next_token)
            if next_token == tokenizer.word2idx['<eos>']:
                break
        return tokenizer.decode(tokens[1:])
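# Causal-mask illustration (not part of the original gist): create_mask(n) is lower-triangular,
# so position t can only attend to positions <= t during both training and generation.
#   >>> ToyTransformer(vocab_size=10).create_mask(4)
#   array([[1., 0., 0., 0.],
#          [1., 1., 0., 0.],
#          [1., 1., 1., 0.],
#          [1., 1., 1., 1.]])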
# Training with improvements
def train(model, tokenizer, corpus, epochs=50, batch_size=8, lr=0.001):
    optimizer = AdamOptimizer(model, lr)
    best_loss = float('inf')
    for epoch in range(epochs):
        np.random.shuffle(corpus)
        total_loss = 0
        num_batches = 0
        # Learning rate decay
        if epoch > 0 and epoch % 15 == 0:
            optimizer.lr *= 0.8  # Reduce learning rate
            print(f"Learning rate reduced to: {optimizer.lr:.6f}")
        for i in range(0, len(corpus), batch_size):
            batch = corpus[i:i + batch_size]
            # Prepend <sos> so training sequences match the format used in predict()
            inputs = [[tokenizer.word2idx['<sos>']] + tokenizer.encode(quote) for quote in batch]
            max_len = max(len(x) for x in inputs)
            inputs = np.array([np.pad(x, (0, max_len - len(x)), constant_values=0) for x in inputs])
            targets = inputs[:, 1:]
            inputs = inputs[:, :-1]
            probs, _ = model.forward(inputs)
            # Handle variable batch sizes properly
            actual_batch_size = len(batch)
            # Cross-entropy over next-token predictions (padded positions are included for simplicity)
            loss = -np.mean(np.log(probs[np.arange(actual_batch_size)[:, None], np.arange(max_len - 1)[None, :], targets] + 1e-10))
            optimizer.zero_grad()
            grads = compute_gradients(model, inputs, targets, probs)
            optimizer.step(grads)
            total_loss += loss
            num_batches += 1
        avg_loss = total_loss / num_batches
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")
        # Track the best loss (conceptually, where the best model would be saved)
        if avg_loss < best_loss:
            best_loss = avg_loss
            print(f"New best loss: {best_loss:.4f}")
        # Flag a plateau (loss more than 10% above the best seen so far)
        if epoch > 20 and avg_loss > best_loss * 1.1:
            print("Loss plateaued, consider stopping early")
# Simple Adam Optimizer
class AdamOptimizer:
    def __init__(self, model, lr, beta1=0.9, beta2=0.999, eps=1e-8):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.model = model  # Store model reference
        # All parameters are updated in place, so holding references to them here stays valid
        self.params = [model.embedding,
                       model.decoder.mha.W_q, model.decoder.mha.W_k,
                       model.decoder.mha.W_v, model.decoder.mha.W_o,
                       model.decoder.ffn.W1, model.decoder.ffn.b1,
                       model.decoder.ffn.W2, model.decoder.ffn.b2,
                       model.output_layer, model.output_bias]
        self.m = [np.zeros_like(param) for param in self.params]
        self.v = [np.zeros_like(param) for param in self.params]
        self.t = 0
    def zero_grad(self):
        # Gradients are recomputed from scratch each step, so there is nothing to reset
        pass
    def step(self, grads):
        self.t += 1
        for i, (param, g) in enumerate(zip(self.params, grads)):
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (g ** 2)
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)  # In-place parameter update
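# Reference for the update rule (illustrative): this follows the standard Adam update,
#   m_t = beta1 * m_{t-1} + (1 - beta1) * g_t
#   v_t = beta2 * v_{t-1} + (1 - beta2) * g_t**2
#   theta_t = theta_{t-1} - lr * (m_t / (1 - beta1**t)) / (sqrt(v_t / (1 - beta2**t)) + eps)
# with the bias-corrected moments m_hat and v_hat computed above.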
# Gradient Computation (Simplified)
def compute_gradients(model, inputs, targets, probs):
    batch_size, seq_len = inputs.shape
    # Gradient of softmax + cross-entropy w.r.t. the logits: probs - one_hot(target), averaged over the batch
    grad_logits = probs.copy()
    grad_logits[np.arange(batch_size)[:, None], np.arange(seq_len), targets] -= 1
    grad_logits /= batch_size
    # Re-run the decoder to obtain its output for the output-layer gradient
    x_emb = model.embedding[inputs] + model.pos_encoding[:seq_len]
    mask = model.create_mask(seq_len)
    decoder_output, _ = model.decoder.forward(x_emb, mask)
    grad_output_layer = np.sum(np.matmul(decoder_output.transpose(0, 2, 1), grad_logits), axis=0)
    grad_output_bias = np.sum(grad_logits, axis=(0, 1), keepdims=True).reshape(1, -1)
    # Simplified backprop: only the output projection gets real gradients; all other parameters get zeros (see note below)
    zero_grads = [np.zeros_like(p) for p in [model.embedding,
                                             model.decoder.mha.W_q, model.decoder.mha.W_k,
                                             model.decoder.mha.W_v, model.decoder.mha.W_o,
                                             model.decoder.ffn.W1, model.decoder.ffn.b1,
                                             model.decoder.ffn.W2, model.decoder.ffn.b2]]
    return zero_grads + [grad_output_layer, grad_output_bias]
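# Note (illustrative): for softmax + cross-entropy, d(loss)/d(logits) = probs - one_hot(target),
# which is exactly what the "-= 1" on the target entries implements before averaging over the batch.
# Because only grad_output_layer and grad_output_bias are non-zero, training only adjusts the final
# projection; the embedding, attention, and FFN weights keep their random initial values.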
# Visualize Attention
def visualize_attention(tokenizer, model, prompt):
    tokens = [tokenizer.word2idx['<sos>']] + tokenizer.encode(prompt)
    x = np.array([tokens[-50:]])
    _, attn_weights = model.forward(x)
    # Use the actual sequence length from the input
    seq_len = x.shape[1]
    token_labels = [tokenizer.idx2word.get(i, '<unk>') for i in x[0]]
    plt.figure(figsize=(8, 6))
    plt.imshow(attn_weights[0, 0][:seq_len, :seq_len], cmap='hot', interpolation='nearest')
    plt.colorbar(label='Attention Weight')
    plt.title(f'Attention Weights for: {prompt}')
    plt.xlabel('Key Tokens')
    plt.ylabel('Query Tokens')
    plt.xticks(range(seq_len), token_labels, rotation=45)
    plt.yticks(range(seq_len), token_labels)
    plt.tight_layout()
    plt.show()
# Main
def main():
    print("Initializing tokenizer...")
    tokenizer = Tokenizer(quotes)
    print(f"Vocabulary size: {len(tokenizer.vocab)}")
    print(f"Training on {len(quotes)} quotes")
    print("Initializing model...")
    model = ToyTransformer(vocab_size=len(tokenizer.vocab))
    print("Starting training...")
    train(model, tokenizer, quotes, epochs=30, batch_size=8, lr=0.002)  # Slightly higher learning rate, fewer epochs for this toy run
    print("\n" + "=" * 60)
    print("GENERATING TEXT WITH DIFFERENT SETTINGS")
    print("=" * 60)
    # Test different prompts and generation settings
    test_prompts = [
        "The only way to do great",
        "Success is",
        "Life is",
        "Dreams are",
        "The future belongs"
    ]
    for prompt in test_prompts:
        print(f"\n--- Prompt: '{prompt}' ---")
        # Conservative generation (low temperature)
        conservative = model.predict(tokenizer, prompt, max_len=15, temperature=0.5, top_k=5)
        print(f"Conservative (T=0.5): {conservative}")
        # Balanced generation
        balanced = model.predict(tokenizer, prompt, max_len=15, temperature=0.8, top_k=10)
        print(f"Balanced (T=0.8): {balanced}")
        # Creative generation (higher temperature)
        creative = model.predict(tokenizer, prompt, max_len=15, temperature=1.2, top_k=15)
        print(f"Creative (T=1.2): {creative}")
    # Uncomment to visualize attention weights (requires a display)
    # visualize_attention(tokenizer, model, test_prompts[0])

if __name__ == "__main__":
    main()