Last active
October 2, 2025 12:13
-
-
Save alexlib/a90a343564aedc801428e7e7bcea3c3a to your computer and use it in GitHub Desktop.
Self-tutoring demo of reservoir computing using the list sorting algorithm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "nbformat": 4, | |
| "nbformat_minor": 0, | |
| "metadata": { | |
| "colab": { | |
| "provenance": [], | |
| "authorship_tag": "ABX9TyPBdjEo9IvNJatonsTNrAz8", | |
| "include_colab_link": true | |
| }, | |
| "kernelspec": { | |
| "name": "python3", | |
| "display_name": "Python 3" | |
| }, | |
| "language_info": { | |
| "name": "python" | |
| } | |
| }, | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "view-in-github", | |
| "colab_type": "text" | |
| }, | |
| "source": [ | |
| "<a href=\"https://colab.research.google.com/gist/alexlib/a90a343564aedc801428e7e7bcea3c3a/untitled14.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 14, | |
| "metadata": { | |
| "colab": { | |
| "base_uri": "https://localhost:8080/" | |
| }, | |
| "id": "oZWhSzZ6z4Ei", | |
| "outputId": "76f28c63-7f10-42f1-cfc7-634f7cb5b67e" | |
| }, | |
| "outputs": [ | |
| { | |
| "output_type": "stream", | |
| "name": "stdout", | |
| "text": [ | |
| "Initializing a sorter for lists of length 5.\n", | |
| "Training on 10000 samples...\n", | |
| "Training complete.\n", | |
| "\n", | |
| "--- Testing the Sorter ---\n", | |
| "Original Unseen List: [26, 99, 34, 50, 50]\n", | |
| "Predicted Sorted List (Nearest Neighbors): [np.int64(26), np.int64(34), np.int64(50), np.int64(50), np.int64(99)]\n", | |
| "Actual Sorted List: [26, 34, 50, 50, 99]\n", | |
| "\n", | |
| "Success! The prediction matches the actual sorted list.\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "import numpy as np\n", | |
| "\n", | |
class ReservoirSorter:
    """
    A conceptual sorting algorithm using Reservoir Computing.

    A fixed random recurrent network (the "reservoir") is driven by the
    elements of a list, one per time step. Only a linear readout (``W_out``)
    is trained; it maps the final reservoir state to an estimate of the
    sorted list. The sorter handles lists of one fixed length only and is a
    demonstration of RC principles, not a practical sorting algorithm.
    """

    def __init__(self, list_length, reservoir_size=100, spectral_radius=0.95,
                 sparsity=0.1, seed=None):
        """
        Initializes the Reservoir Sorter with random, untrained weights.

        Args:
            list_length (int): The fixed length of lists this sorter will handle.
            reservoir_size (int): The number of neurons in the reservoir.
            spectral_radius (float): Target largest absolute eigenvalue of the
                reservoir weight matrix after rescaling.
            sparsity (float): Probability with which each reservoir connection
                is set to zero (i.e. the expected proportion of zero weights).
            seed (int | None): Optional seed for reproducible weights. When
                None (the default) the global ``np.random`` state is used.
        """
        print(f"Initializing a sorter for lists of length {list_length}.")
        self.list_length = list_length
        self.reservoir_size = reservoir_size

        # With no seed, draw from the global NumPy state; with a seed, use a
        # private generator so results are reproducible without touching the
        # global state.
        rng = np.random if seed is None else np.random.RandomState(seed)

        # Input weights (map a single number to the reservoir), in [-0.5, 0.5).
        self.W_in = rng.rand(self.reservoir_size, 1) - 0.5

        # Reservoir internal weights, in [-0.5, 0.5), then sparsified.
        W_res_raw = rng.rand(self.reservoir_size, self.reservoir_size) - 0.5
        W_res_raw[rng.rand(*W_res_raw.shape) < sparsity] = 0

        # Rescale so the largest absolute eigenvalue equals spectral_radius.
        radius = np.max(np.abs(np.linalg.eigvals(W_res_raw)))
        if radius > 0:
            self.W_res = W_res_raw * (spectral_radius / radius)
        else:
            # A zero radius (e.g. every connection removed) cannot be
            # rescaled; keep the matrix as is instead of producing NaNs.
            self.W_res = W_res_raw

        # Output weights are learned during training.
        self.W_out = None

    def _run_reservoir(self, input_list):
        """
        Feeds a list through the reservoir and returns the final state.

        The state starts at zero and is updated once per element. Elements
        are fed in as given (no rescaling is applied here).

        Returns:
            numpy.ndarray: Column vector of shape (reservoir_size, 1).
        """
        state = np.zeros((self.reservoir_size, 1))
        for number in input_list:
            # The core reservoir update equation.
            input_signal = np.array([[number]])
            state = np.tanh(self.W_res @ state + self.W_in @ input_signal)
        return state

    def train(self, training_data):
        """
        Trains the readout layer (W_out) to map final states to sorted lists.

        Args:
            training_data (list): A list of tuples, where each tuple is
                (unsorted_list, sorted_list). Pairs in which either list does
                not have length ``list_length`` are skipped.

        Raises:
            ValueError: If no pair in ``training_data`` has the right length.
        """
        print(f"Training on {len(training_data)} samples...")

        # Collect final reservoir states and targets for all usable examples.
        states = []
        targets = []
        for unsorted, sorted_list in training_data:
            if (len(unsorted) != self.list_length
                    or len(sorted_list) != self.list_length):
                continue  # Skip data that doesn't match our fixed size
            states.append(self._run_reservoir(unsorted).flatten())
            targets.append(sorted_list)

        if not states:
            raise ValueError(
                f"No training samples of length {self.list_length} were provided."
            )

        X = np.array(states)   # Each row is a final state
        Y = np.array(targets)  # Each row is a target sorted list

        # Linear regression via the pseudo-inverse:
        # we want W_out @ X.T = Y.T, so W_out = Y.T @ pinv(X.T).
        self.W_out = Y.T @ np.linalg.pinv(X.T)
        print("Training complete.")

    def sort(self, unsorted_list):
        """
        Uses the trained model to "sort" a new list.

        The readout predicts continuous values for each output position; each
        prediction is then matched, in order, to the nearest not-yet-used
        element of the input. The result is therefore always a permutation of
        the input, but it is only correctly sorted when the predictions are
        accurate enough.

        Args:
            unsorted_list (list): The list of numbers to sort.

        Returns:
            list: The original list elements (unchanged objects) ordered
                according to the reservoir's prediction.

        Raises:
            RuntimeError: If the sorter has not been trained.
            ValueError: If the list length differs from ``list_length``.
        """
        if self.W_out is None:
            raise RuntimeError("The sorter has not been trained yet. Call .train() first.")

        if len(unsorted_list) != self.list_length:
            raise ValueError(f"This sorter only works for lists of length {self.list_length}.")

        # Get the "fingerprint" of the new list.
        final_state = self._run_reservoir(unsorted_list)

        # Continuous predictions, one per output position.
        predicted_values = (self.W_out @ final_state).flatten()

        # Work on a plain-list copy so the caller's own elements are returned
        # (not NumPy scalars) and duplicates are each used exactly once.
        remaining = list(unsorted_list)
        ordered = []
        for predicted_val in predicted_values:
            distances = np.abs(np.asarray(remaining, dtype=float) - predicted_val)
            nearest_index = int(np.argmin(distances))
            ordered.append(remaining.pop(nearest_index))

        return ordered
| "\n", | |
| "\n", | |
# --- Demo script: train a sorter on random lists, then try one unseen list ---
if __name__ == "__main__":
    # Fixed problem size and amount of training data.
    LIST_LENGTH = 5      # The sorter will ONLY work for lists of this length
    NUM_SAMPLES = 10000  # Number of random lists to train on

    # Build (unsorted, sorted) pairs from random integers in [0, 100].
    training_data = [
        (sample, sorted(sample))
        for sample in (
            np.random.randint(0, 101, size=LIST_LENGTH).tolist()
            for _ in range(NUM_SAMPLES)
        )
    ]

    # Create the reservoir and fit its readout layer.
    sorter = ReservoirSorter(list_length=LIST_LENGTH, reservoir_size=200)
    sorter.train(training_data)

    # Evaluate on one freshly drawn list that was not part of training.
    print("\n--- Testing the Sorter ---")
    test_list = np.random.randint(0, 101, size=LIST_LENGTH).tolist()
    predicted_sorted_nn = sorter.sort(test_list)
    actual_sorted = sorted(test_list)

    print(f"Original Unseen List: {test_list}")
    print(f"Predicted Sorted List (Nearest Neighbors): {predicted_sorted_nn}")
    print(f"Actual Sorted List: {actual_sorted}")

    # Report whether the reservoir's ordering equals the true sorted order.
    prediction_matches = np.array_equal(predicted_sorted_nn, actual_sorted)
    print(
        "\nSuccess! The prediction matches the actual sorted list."
        if prediction_matches
        else "\nFailure. The prediction does not match the actual sorted list."
    )
| ] | |
| } | |
| ] | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment