Skip to content

Instantly share code, notes, and snippets.

@calilisantos
Created July 7, 2025 03:00
Show Gist options
  • Select an option

  • Save calilisantos/cd1dc6a39f0fed5a5291a5ddff6e304c to your computer and use it in GitHub Desktop.

Select an option

Save calilisantos/cd1dc6a39f0fed5a5291a5ddff6e304c to your computer and use it in GitHub Desktop.
Pyspark custom datasource for python.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## **Dependencies**"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import Row, SparkSession, types as T # pyspark==4.0.0\n",
"from pyspark.sql.datasource import DataSource, DataSourceReader # pyarrow required, used: pyarrow==19.0.1\n",
"import requests # used: requests==2.32.4\n",
"# optional, for jupyter notebook: ipykernel==6.26.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## **SparkSession**"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"spark = (\n",
" SparkSession.builder\n",
" .master(\"local[3]\")\n",
" .appName(\"pyspark_custom_datasource\")\n",
" .getOrCreate()\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## **ChuckNorrisDataSource statement**"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"25/07/06 23:57:30 WARN DataSourceManager: The data source chucknorris replaced a previously registered data source.\n"
]
}
],
"source": [
"class ChuckNorrisDataSource(DataSource):\n",
" \"\"\"\n",
" A DataSource for reading jokes from the Chuck Norris API.\n",
"\n",
" Name: `chucknorris`\n",
"\n",
" Schema: `id string, joke string, category string`\n",
"\n",
" Options:\n",
" count: int \n",
" The number of facts to be returned. Default is 1\n",
"\n",
" Examples\n",
" --------\n",
" Register the data source:\n",
"\n",
" >>> from your_module import ChuckNorrisDataSource\n",
" >>> spark.dataSource.register(ChuckNorrisDataSource)\n",
"\n",
" Load a few jokes (you can specify how many):\n",
"\n",
"\n",
" >>> spark.read.format(\"chucknorris\").option(\"count\", 5).load().show()\n",
" +--------------------+--------------------+-----------+\n",
" | id | joke| category|\n",
" +--------------------+--------------------+-----------+\n",
" | ykC28btrRCm4Vqev...| Chuck Norris can...| animal|\n",
" | ... | ... | ... |\n",
" +--------------------+--------------------+-----------+\n",
" \"\"\"\n",
"\n",
" @classmethod\n",
" def name(cls):\n",
" return \"chucknorris\"\n",
"\n",
" def schema(self):\n",
" return T.StructType([\n",
" T.StructField(name=\"id\", dataType=T.StringType(), nullable=False),\n",
" T.StructField(name=\"joke\", dataType=T.StringType(), nullable=False),\n",
" T.StructField(name=\"category\", dataType=T.StringType(), nullable=False)\n",
" ])\n",
"\n",
" def reader(self, schema):\n",
" return ChuckNorrisReader(self.options)\n",
"\n",
"\n",
"class ChuckNorrisReader(DataSourceReader):\n",
" def __init__(self, options):\n",
" self.count = int(options.get(\"count\", 1)) # Default: 1 joke\n",
"\n",
" def read(self, partition):\n",
" url = \"https://api.chucknorris.io/jokes/random\"\n",
" for _ in range(self.count):\n",
" response = requests.get(url)\n",
" response.raise_for_status()\n",
" joke_data = response.json()\n",
" yield Row(\n",
" id=joke_data.get(\"id\"),\n",
" joke=joke_data.get(\"value\"),\n",
" category=(\n",
" joke_data.get(\"categories\")[0]\n",
" if joke_data.get(\"categories\")\n",
" else \"uncategorized\"\n",
" )\n",
" )\n",
"\n",
"spark.dataSource.register(ChuckNorrisDataSource)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## **Reading ChuckNorrisDataSource**"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 5:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------------+-------------------------------------------------------------------------------------------------------+-------------+\n",
"|id |joke |category |\n",
"+----------------------+-------------------------------------------------------------------------------------------------------+-------------+\n",
"|mhmp-zhqszwuimbxfbkx8q|Chuck Norris's programs can pass the Turing Test by staring at the interrogator. |dev |\n",
"|xsycfh8_siooxxibfyb8nq|One time, at band camp, Chuck Norris ate a percussionist. |uncategorized|\n",
"|CktaV6IqRTCdj6ReXjKxjw|Lightning never strikes the same place twice, so that it is less likely to be caught by Chuck Norris. |uncategorized|\n",
"|ji2JjxdfQQ6e1YcgRaJ94Q|Chuck Norris put Lysol out of business, he kills odor too |uncategorized|\n",
"|wbLFE4n3Ri2jnPXDGlUPpA|Can Chuck Norris make a rock heavy enough that he can't lift it? Of course he can....he's Chuck Norris.|uncategorized|\n",
"+----------------------+-------------------------------------------------------------------------------------------------------+-------------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Getting 5 random facts\n",
"(\n",
" spark.read.format(\"chucknorris\")\n",
" .option(\"count\", 5)\n",
" .load()\n",
" .show(truncate=False)\n",
")\n"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 6:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------------+----------------------------------------------+-------------+\n",
"|id |joke |category |\n",
"+----------------------+----------------------------------------------+-------------+\n",
"|iG2l1Co7SGOi58W_ha7Olg|Chuck Norris can \"shit fire and save matches\"!|uncategorized|\n",
"+----------------------+----------------------------------------------+-------------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"(\n",
" spark.read.format(\"chucknorris\")\n",
" .load()\n",
" .show(truncate=False)\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment