-
-
Save JonasLoos/c935738d191dcc49b3603f2688d103b3 to your computer and use it in GitHub Desktop.
microgpt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""microgpt by @karpathy, edited by @jonasloos: train a language model on its own code, without dependencies"""
import math
import random
random.seed(42)  # Let there be order among chaos
# Let there be data (this file) and tokenize it
with open(__file__) as f:  # context manager so the file handle is closed promptly
    text = f.read().lower()
# training documents: non-empty, non-comment lines of this very file
lines = [line for line in text.split('\n') if line.strip() and not line.strip().startswith('#')]
uchars = [*sorted(set(text)), 'BOS']  # unique characters in the dataset become token ids 0..n-1
BOS = len(uchars) - 1  # token id for the special Beginning of Sequence (BOS) token
print(f"train size: {len(text)} tokens\nvocab size: {len(uchars)}")
# Let there be Autograd, to recursively apply the chain rule through a computation graph
class Value:
    """Scalar autograd node: holds one number plus the bookkeeping needed to
    backpropagate gradients through the graph that produced it."""
    __slots__ = ('data', 'grad', '_children', '_local_grads')  # no per-instance __dict__, saves memory

    def __init__(self, data, children=(), local_grads=()):
        self.data = data                  # scalar computed during the forward pass
        self.grad = 0                     # d(loss)/d(this node), filled in by backward()
        self._children = children         # graph predecessors of this node
        self._local_grads = local_grads   # d(this node)/d(child), one per child

    def __add__(self, other):
        if not isinstance(other, Value):
            other = Value(other)
        return Value(self.data + other.data, (self, other), (1, 1))

    def __mul__(self, other):
        if not isinstance(other, Value):
            other = Value(other)
        return Value(self.data * other.data, (self, other), (other.data, self.data))

    def __pow__(self, other):
        # other is a plain number; power rule for the local gradient
        return Value(self.data ** other, (self,), (other * self.data ** (other - 1),))

    def log(self):
        return Value(math.log(self.data), (self,), (1 / self.data,))

    def exp(self):
        e = math.exp(self.data)  # d/dx exp(x) = exp(x), so reuse the result
        return Value(e, (self,), (e,))

    def relu(self):
        return Value(max(0, self.data), (self,), (float(self.data > 0),))

    # derived operators, all expressed through add/mul/pow above
    def __neg__(self): return self * -1
    def __radd__(self, other): return self + other
    def __sub__(self, other): return self + (-other)
    def __rsub__(self, other): return other + (-self)
    def __rmul__(self, other): return self * other
    def __truediv__(self, other): return self * other ** -1
    def __rtruediv__(self, other): return other * self ** -1

    def backward(self):
        """Topologically sort the graph below self, then apply the chain rule in reverse."""
        order, seen = [], set()
        def visit(node):
            if node not in seen:
                seen.add(node)
                for child in node._children:
                    visit(child)
                order.append(node)
        visit(self)
        self.grad = 1  # seed: d(self)/d(self)
        for node in reversed(order):
            for child, local in zip(node._children, node._local_grads):
                child.grad += local * node.grad  # accumulate, nodes may be reused
# Model config
n_embd = 24      # embedding dimension
n_head = 3       # number of attention heads
n_layer = 2      # number of layers
block_size = 48  # maximum sequence length
mlp_ratio = 2    # MLP expansion ratio

def matrix(nout, nin, std=0.1):
    """Fresh (nout x nin) weight matrix of Values with gaussian init."""
    return [[Value(random.gauss(0, std)) for _ in range(nin)] for _ in range(nout)]

def dot(a, b):
    """Dot product of two Value vectors as a single graph node: the forward
    value is computed on raw floats and the local grads are written by hand,
    which keeps the graph much smaller than chaining adds and muls."""
    return Value(sum(ai.data * bi.data for ai, bi in zip(a, b)), tuple(a + b), tuple(x.data for x in b + a))

def linear(x, w):
    """Matrix-vector product: one dot product per output row of w."""
    return [dot(row, x) for row in w]
def softmax(logits):
    """Numerically stable softmax over a list of Value logits."""
    peak = max(v.data for v in logits)       # subtract the max so exp() cannot overflow
    exps = [(v - peak).exp() for v in logits]
    denom = sum(exps)
    return [e / denom for e in exps]
def rmsnorm(x):
    """Normalize the vector x to (approximately) unit root-mean-square."""
    mean_sq = sum(v * v for v in x) / len(x) + 1e-5  # epsilon guards against division by zero
    inv_rms = mean_sq ** -0.5
    return [v * inv_rms for v in x]
# Let there be Attention, the core mechanism that lets tokens communicate
class Attention:
    """Multi-head self-attention over an externally held, growing KV cache
    (one token is processed per call, so causality is implicit)."""
    def __init__(self):
        # query-, key-, value-, output-projection matrices
        self.wq, self.wk, self.wv, self.wo = (matrix(n_embd, n_embd) for _ in range(4))
    def __call__(self, x, k, v):
        head_dim = n_embd // n_head
        q = linear(x, self.wq)
        # append this position's keys/values to the cache before attending,
        # so the current token attends to itself as well
        k.append(linear(x, self.wk))
        v.append(linear(x, self.wv))
        out = []
        for h in range(n_head):
            lo, hi = h * head_dim, (h + 1) * head_dim
            q_h = q[lo:hi]
            keys = [step[lo:hi] for step in k]
            vals = [step[lo:hi] for step in v]
            scale = head_dim ** -0.5  # scaled dot-product attention
            weights = softmax([dot(q_h, key) * scale for key in keys])
            # weighted sum over cached values, one output coordinate at a time
            for j in range(head_dim):
                out.append(dot(weights, [val[j] for val in vals]))
        return linear(out, self.wo)
# Let there be the Transformer, blessed among the GPTs
# Follows GPT-2, with minor differences: layernorm -> rmsnorm, no biases, GeLU -> ReLU
class Transformer:
    """Decoder-only transformer; processes one token per call via the KV cache."""
    def __init__(self):
        self.wte = matrix(len(uchars), n_embd)  # token embeddings
        self.wpe = matrix(block_size, n_embd)   # position embeddings
        self.attns = [Attention() for _ in range(n_layer)]
        self.mlp_fc1s = [matrix(mlp_ratio * n_embd, n_embd) for _ in range(n_layer)]
        self.mlp_fc2s = [matrix(n_embd, mlp_ratio * n_embd) for _ in range(n_layer)]
        self.lm_head = matrix(len(uchars), n_embd)  # project back to vocabulary
    def __call__(self, token_id, pos_id, kv_cache):
        # embed: token vector plus position vector, then pre-norm
        x = [t + p for t, p in zip(self.wte[token_id], self.wpe[pos_id])]
        x = rmsnorm(x)
        for layer in range(n_layer):
            # 1) multi-head attention block with residual connection
            skip = x
            x = self.attns[layer](rmsnorm(x), *kv_cache[layer])
            x = [a + b for a, b in zip(x, skip)]
            # 2) MLP block (expand, ReLU, contract) with residual connection
            skip = x
            hidden = [h.relu() for h in linear(rmsnorm(x), self.mlp_fc1s[layer])]
            x = linear(hidden, self.mlp_fc2s[layer])
            x = [a + b for a, b in zip(x, skip)]
        return linear(x, self.lm_head)  # logits over the vocabulary
    def parameters(self):
        """Flatten every weight matrix into one list of Value parameters."""
        mats = self.wte + self.wpe
        for layer in range(n_layer):
            attn = self.attns[layer]
            mats += attn.wq + attn.wk + attn.wv + attn.wo
            mats += self.mlp_fc1s[layer] + self.mlp_fc2s[layer]
        mats += self.lm_head
        return [p for row in mats for p in row]
# Let there be Training, with Adam the blessed optimizer
def train(model, num_steps=500, lr=0.03, lr_warmup=20):
    """Train the model on random 3-line snippets of this file using Adam,
    with linear learning-rate warmup followed by linear decay to zero."""
    print(f"\n--- training for {num_steps} steps with learning rate {lr} ---")
    params = model.parameters()
    beta1, beta2, eps_adam = 0.9, 0.999, 1e-8
    m = [0.0] * len(params)  # Adam first-moment (mean of gradients) buffer
    v = [0.0] * len(params)  # Adam second-moment (mean of squared gradients) buffer
    for step in range(num_steps):
        # sample a random 3-line document and tokenize it, delimited by BOS
        start = random.randint(0, len(lines) - 3)
        doc = "\n".join(lines[start : start + 3])
        tokens = [BOS] + [uchars.index(ch) for ch in doc] + [BOS]
        n = min(block_size, len(tokens) - 1)
        # forward pass: build the computation graph up to the per-position losses
        kv_cache = [([], []) for _ in range(n_layer)]
        losses = []
        for pos_id in range(n):
            token_id, target_id = tokens[pos_id], tokens[pos_id + 1]
            probs = softmax(model(token_id, pos_id, kv_cache))
            losses.append(-probs[target_id].log())  # cross-entropy for this position
        loss = (1 / n) * sum(losses)  # mean NLL; may your loss be low
        loss.backward()
        # learning-rate schedule: linear warmup, then linear decay to zero
        if step < lr_warmup:
            lr_t = lr * (step + 1) / lr_warmup
        else:
            lr_t = lr * (1 - (step - lr_warmup) / (num_steps - lr_warmup))
        # Adam update with bias correction, then reset the gradients
        for i, p in enumerate(params):
            m[i] = beta1 * m[i] + (1 - beta1) * p.grad
            v[i] = beta2 * v[i] + (1 - beta2) * p.grad ** 2
            m_hat = m[i] / (1 - beta1 ** (step + 1))
            v_hat = v[i] / (1 - beta2 ** (step + 1))
            p.data -= lr_t * m_hat / (v_hat ** 0.5 + eps_adam)
            p.grad = 0
        print(f"step {step+1:4d} / {num_steps} | loss {loss.data:.4f}")
# Let the model babble back to us
def inference(model, num_samples=10, temperature=0.7):
    """Autoregressively sample num_samples sequences from the model.

    temperature < 1 sharpens the distribution; sampling stops early if the
    model emits BOS or after block_size tokens.
    """
    print("\n--- inference (new, hallucinated code) ---")
    for sample_idx in range(num_samples):
        kv_cache = [([], []) for _ in range(n_layer)]
        token_id = BOS
        sample = []
        for pos_id in range(block_size):
            logits = model(token_id, pos_id, kv_cache)
            probs = softmax([l / temperature for l in logits])
            token_id = random.choices(range(len(uchars)), weights=[p.data for p in probs])[0]
            if token_id == BOS: break  # the model chose to end the sequence
            sample.append(uchars[token_id])
        # build the output outside the f-string: a backslash ('\t') inside an
        # f-string expression is a SyntaxError on Python < 3.12 (PEP 701)
        text_out = ''.join(sample).replace('\t', ' ')
        print(f"sample {sample_idx+1:2d}: {text_out}")
# Initialize and run. May the loss be low and the samples be coherent
model = Transformer()
print(f"num params: {len(model.parameters())}")
inference(model)  # before training: samples from the random init (noise baseline)
train(model)
inference(model)  # after training: hopefully code-shaped samples
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
updated version: