Created
July 7, 2025 03:00
-
-
Save calilisantos/cd1dc6a39f0fed5a5291a5ddff6e304c to your computer and use it in GitHub Desktop.
Pyspark custom datasource for python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## **Dependencies**" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 31, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "from pyspark.sql import Row, SparkSession, types as T # pyspark==4.0.0\n", | |
| "from pyspark.sql.datasource import DataSource, DataSourceReader # pyarrow required, used: pyarrow==19.0.1\n", | |
| "import requests # used: requests==2.32.4\n", | |
| "# optional, for jupyter notebook: ipykernel==6.26.0" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## **SparkSession**" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 32, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "spark = (\n", | |
| " SparkSession.builder\n", | |
| " .master(\"local[3]\")\n", | |
| " .appName(\"pyspark_custom_datasource\")\n", | |
| " .getOrCreate()\n", | |
| ")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## **ChuckNorrisDataSource statement**" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 33, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "25/07/06 23:57:30 WARN DataSourceManager: The data source chucknorris replaced a previously registered data source.\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "class ChuckNorrisDataSource(DataSource):\n", | |
| " \"\"\"\n", | |
| " A DataSource for reading jokes from the Chuck Norris API.\n", | |
| "\n", | |
| " Name: `chucknorris`\n", | |
| "\n", | |
| " Schema: `id string, joke string, category string`\n", | |
| "\n", | |
| " Options:\n", | |
| " count: int \n", | |
| " The number of facts to be returned. Default is 1\n", | |
| "\n", | |
| " Examples\n", | |
| " --------\n", | |
| " Register the data source:\n", | |
| "\n", | |
| " >>> from your_module import ChuckNorrisDataSource\n", | |
| " >>> spark.dataSource.register(ChuckNorrisDataSource)\n", | |
| "\n", | |
| " Load a few jokes (you can specify how many):\n", | |
| "\n", | |
| "\n", | |
| " >>> spark.read.format(\"chucknorris\").option(\"count\", 5).load().show()\n", | |
| " +--------------------+--------------------+-----------+\n", | |
| " | id | joke| category|\n", | |
| " +--------------------+--------------------+-----------+\n", | |
| " | ykC28btrRCm4Vqev...| Chuck Norris can...| animal|\n", | |
| " | ... | ... | ... |\n", | |
| " +--------------------+--------------------+-----------+\n", | |
| " \"\"\"\n", | |
| "\n", | |
| " @classmethod\n", | |
| " def name(cls):\n", | |
| " return \"chucknorris\"\n", | |
| "\n", | |
| " def schema(self):\n", | |
| " return T.StructType([\n", | |
| " T.StructField(name=\"id\", dataType=T.StringType(), nullable=False),\n", | |
| " T.StructField(name=\"joke\", dataType=T.StringType(), nullable=False),\n", | |
| " T.StructField(name=\"category\", dataType=T.StringType(), nullable=False)\n", | |
| " ])\n", | |
| "\n", | |
| " def reader(self, schema):\n", | |
| " return ChuckNorrisReader(self.options)\n", | |
| "\n", | |
| "\n", | |
| "class ChuckNorrisReader(DataSourceReader):\n", | |
| " def __init__(self, options):\n", | |
| " self.count = int(options.get(\"count\", 1)) # Default: 1 joke\n", | |
| "\n", | |
| " def read(self, partition):\n", | |
| " url = \"https://api.chucknorris.io/jokes/random\"\n", | |
| " for _ in range(self.count):\n", | |
| " response = requests.get(url)\n", | |
| " response.raise_for_status()\n", | |
| " joke_data = response.json()\n", | |
| " yield Row(\n", | |
| " id=joke_data.get(\"id\"),\n", | |
| " joke=joke_data.get(\"value\"),\n", | |
| " category=(\n", | |
| " joke_data.get(\"categories\")[0]\n", | |
| " if joke_data.get(\"categories\")\n", | |
| " else \"uncategorized\"\n", | |
| " )\n", | |
| " )\n", | |
| "\n", | |
| "spark.dataSource.register(ChuckNorrisDataSource)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## **Reading ChuckNorrisDataSource**" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 34, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "[Stage 5:> (0 + 1) / 1]\r" | |
| ] | |
| }, | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "+----------------------+-------------------------------------------------------------------------------------------------------+-------------+\n", | |
| "|id |joke |category |\n", | |
| "+----------------------+-------------------------------------------------------------------------------------------------------+-------------+\n", | |
| "|mhmp-zhqszwuimbxfbkx8q|Chuck Norris's programs can pass the Turing Test by staring at the interrogator. |dev |\n", | |
| "|xsycfh8_siooxxibfyb8nq|One time, at band camp, Chuck Norris ate a percussionist. |uncategorized|\n", | |
| "|CktaV6IqRTCdj6ReXjKxjw|Lightning never strikes the same place twice, so that it is less likely to be caught by Chuck Norris. |uncategorized|\n", | |
| "|ji2JjxdfQQ6e1YcgRaJ94Q|Chuck Norris put Lysol out of business, he kills odor too |uncategorized|\n", | |
| "|wbLFE4n3Ri2jnPXDGlUPpA|Can Chuck Norris make a rock heavy enough that he can't lift it? Of course he can....he's Chuck Norris.|uncategorized|\n", | |
| "+----------------------+-------------------------------------------------------------------------------------------------------+-------------+\n", | |
| "\n" | |
| ] | |
| }, | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| " \r" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "# Getting 5 random facts\n", | |
| "(\n", | |
| " spark.read.format(\"chucknorris\")\n", | |
| " .option(\"count\", 5)\n", | |
| " .load()\n", | |
| " .show(truncate=False)\n", | |
| ")\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 35, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "[Stage 6:> (0 + 1) / 1]\r" | |
| ] | |
| }, | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "+----------------------+----------------------------------------------+-------------+\n", | |
| "|id |joke |category |\n", | |
| "+----------------------+----------------------------------------------+-------------+\n", | |
| "|iG2l1Co7SGOi58W_ha7Olg|Chuck Norris can \"shit fire and save matches\"!|uncategorized|\n", | |
| "+----------------------+----------------------------------------------+-------------+\n", | |
| "\n" | |
| ] | |
| }, | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| " \r" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "(\n", | |
| " spark.read.format(\"chucknorris\")\n", | |
| " .load()\n", | |
| " .show(truncate=False)\n", | |
| ")" | |
| ] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "venv", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.10.12" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 2 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment