@hgarrereyn
Created June 3, 2020 23:22
Colab Beam Demo.ipynb
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "Colab Beam Demo.ipynb",
"provenance": [],
"authorship_tag": "ABX9TyO22+1zWktMDTJZNWajwm3K",
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/hgarrereyn/64ce87cbbcbe9c34ccdd13eafe49e3fb/colab-beam-demo.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2OfgUDz8Ytl9",
"colab_type": "text"
},
"source": [
"# Install Beam 2.22.0 from source\n",
"\n",
"**Restart runtime after this cell completes**"
]
},
{
"cell_type": "code",
"metadata": {
"id": "An2-QIb4YkJF",
"colab_type": "code",
"colab": {}
},
"source": [
"!git clone https://github.com/apache/beam\n",
"!cd beam && git checkout release-2.22.0\n",
"!cd beam/sdks/python && \\\n",
" pip install -q -r build-requirements.txt && \\\n",
" pip install -q -e .[gcp]"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "wNPqZV5XZjIa",
"colab_type": "text"
},
"source": [
"# Authenticate with GCP and configure a Pub/Sub topic"
]
},
{
"cell_type": "code",
"metadata": {
"id": "rI2NnfTTZmQ9",
"colab_type": "code",
"colab": {}
},
"source": [
"from google.colab import auth\n",
"auth.authenticate_user()"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "XBimhtr8Zs1K",
"colab_type": "code",
"colab": {}
},
"source": [
"PROJECT = '<my_gcp_project>'\n",
"TOPIC = '<my_topic>'\n",
"\n",
"import os\n",
"os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT\n",
"TOPIC_STR = f'projects/{PROJECT}/topics/{TOPIC}'"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "i_4d8iaUaNXn",
"colab_type": "code",
"colab": {}
},
"source": [
"import apache_beam as beam\n",
"from apache_beam.options import pipeline_options"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "dgxNMqJKajcj",
"colab_type": "text"
},
"source": [
"# Example pipeline\n",
"\n",
"Run this pipeline, then publish a message to the Pub/Sub topic to see it printed here. After you kill and re-run the pipeline, each message is printed twice; kill and re-run it again and each message is printed three times, and so on.\n",
"\n",
"If you manually delete the Beam-created subscriptions in the GCP console before re-running the pipeline, each message is printed only once."
]
},
{
"cell_type": "code",
"metadata": {
"id": "-23Lfu9NaV-t",
"colab_type": "code",
"colab": {}
},
"source": [
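"def passthrough(x):\n",
" # Print each element so it appears in the cell output, then pass it on unchanged.\n",
" print(x)\n",
" return x\n",
"\n",
"# ReadFromPubSub is an unbounded source, so the pipeline must run in streaming mode.\n",
"options = pipeline_options.PipelineOptions(streaming=True)\n",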
"\n",
"with beam.Pipeline(options=options) as p:\n",
" # read messages from pubsub\n",
" messages = p | beam.io.ReadFromPubSub(\n",
" topic=TOPIC_STR\n",
" )\n",
"\n",
" # log messages\n",
" messages | beam.Map(passthrough)"
],
"execution_count": 0,
"outputs": []
}
]
}
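The example-pipeline note in the notebook reports that every kill-and-re-run adds another copy of each message until the Beam subscriptions are deleted by hand. When `ReadFromPubSub` is given `topic=`, Beam creates a temporary subscription for that run; when it is given `subscription=`, it reads from an existing subscription instead (the two arguments are mutually exclusive). The following is a minimal sketch that reads from one pre-created subscription on every run. It assumes you have already created that subscription on the topic, for example with `gcloud pubsub subscriptions create <my_subscription> --topic=<my_topic>`. The names `subscription_path`, `run`, and `<my_subscription>` are hypothetical, and this has not been confirmed to remove the duplication described in the notebook.

```python
# Sketch: read from one existing subscription instead of letting
# ReadFromPubSub create a temporary subscription on every run.
# subscription_path() and run() are hypothetical helper names.


def subscription_path(project: str, subscription: str) -> str:
    """Build the 'projects/<project>/subscriptions/<name>' path used by subscription=."""
    if not project or not subscription:
        raise ValueError('project and subscription must be non-empty')
    return f'projects/{project}/subscriptions/{subscription}'


def passthrough(x):
    # Print each element so it appears in the cell output, then pass it on unchanged.
    print(x)
    return x


def run(project: str, subscription: str) -> None:
    # Beam is imported here so subscription_path() works without Beam installed.
    import apache_beam as beam
    from apache_beam.options import pipeline_options

    options = pipeline_options.PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        # Pass subscription= (not topic=) so no new subscription is created per run.
        messages = p | beam.io.ReadFromPubSub(
            subscription=subscription_path(project, subscription))
        messages | beam.Map(passthrough)


# Usage in the notebook, with the same placeholders as the original cells:
# run(PROJECT, '<my_subscription>')
```

Keeping the Beam imports inside `run` is only so the path helper can be used on its own; inside the notebook, the module-level imports from the original cell work just as well.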