Skip to content

Instantly share code, notes, and snippets.

@gcleaves
Last active January 7, 2026 07:37
Show Gist options
  • Select an option

  • Save gcleaves/d78f0538008d0ccba112703d381baead to your computer and use it in GitHub Desktop.

Select an option

Save gcleaves/d78f0538008d0ccba112703d381baead to your computer and use it in GitHub Desktop.
Keycloak Superset

Some tweaks to make Keycloak and GizmoSQL work in my testing.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# This is a copy/edit of the duckdb.py file.
# This file has been created solely for the purpose of fixing
# Time Grains for GizmoSQL in Superset. It seems to work
# but almost no testing has been done.
from __future__ import annotations
import re
from datetime import datetime
from re import Pattern
from typing import Any, TYPE_CHECKING, TypedDict
from apispec import APISpec
from apispec.ext.marshmallow import MarshmallowPlugin
from flask_babel import gettext as __
from marshmallow import fields, Schema
from sqlalchemy import types
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import URL
from superset.config import VERSION_STRING
from superset.constants import TimeGrain #, USER_AGENT
from superset.databases.utils import make_url_safe
from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
if TYPE_CHECKING:
# prevent circular imports
from superset.models.core import Database
# Matches "no such column: <name>" engine error messages so the offending
# column name can be surfaced in a friendly Superset error (see custom_errors).
COLUMN_DOES_NOT_EXIST_REGEX = re.compile("no such column: (?P<column_name>.+)")

# Sentinel/placeholder value for the access-token field: it is the MotherDuck
# token-request page URL, not a real token, and is treated as "no token given".
DEFAULT_ACCESS_TOKEN_URL = (
    "https://app.motherduck.com/token-request?appName=Superset&close=y"  # noqa: S105
)
# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
class GizmoSQLParametersSchema(Schema):
    """Marshmallow schema for the form-based (parameter) connection flow."""

    # Optional MotherDuck token; defaults to the token-request URL sentinel
    # rather than a real token value.
    access_token = fields.String(
        allow_none=True,
        metadata={"description": __("MotherDuck token")},
        load_default=DEFAULT_ACCESS_TOKEN_URL,
    )
    # Database name or file path (e.g. ":memory:" or "md:<db>").
    database = fields.String(
        required=False, metadata={"description": __("Database name")}
    )
    # Free-form extra query-string parameters for the SQLAlchemy URI.
    query = fields.Dict(
        keys=fields.Str(),
        values=fields.Raw(),
        metadata={"description": __("Additional parameters")},
    )
class GizmoSQLParametersType(TypedDict, total=False):
    """Shape of the ``parameters`` dict used to build/parse a GizmoSQL URI."""

    # Optional MotherDuck token (may be the DEFAULT_ACCESS_TOKEN_URL sentinel).
    access_token: str | None
    # Database name or path.
    database: str
    # Extra query-string parameters appended to the URI.
    query: dict[str, Any]
class GizmoSQLPropertiesType(TypedDict):
    """Top-level payload passed to validate_parameters."""

    # The connection parameters being validated.
    parameters: GizmoSQLParametersType
class GizmoSQLParametersMixin:
    """
    Mixin for configuring DB engine specs via a dictionary.

    With this mixin the SQLAlchemy engine can be configured through
    individual parameters, instead of the full SQLAlchemy URI. This
    mixin is for DuckDB-style URIs:

        duckdb:///file_path[?key=value&key=value...]
        duckdb:///md:database[?key=value&key=value...]
    """

    engine = "gizmosql"

    # schema describing the parameters used to configure the DB
    parameters_schema = GizmoSQLParametersSchema()

    # recommended driver name for the DB engine spec
    default_driver = ""

    # query parameter to enable encryption in the database connection
    # for Postgres this would be `{"sslmode": "verify-ca"}`, eg.
    encryption_parameters: dict[str, str] = {}

    @staticmethod
    def _is_motherduck(database: str) -> bool:
        # MotherDuck databases are addressed with an "md:" prefix.
        return "md:" in database

    @classmethod
    def build_sqlalchemy_uri(  # pylint: disable=unused-argument
        cls,
        parameters: GizmoSQLParametersType,
        encrypted_extra: dict[str, str] | None = None,
    ) -> str:
        """
        Build SQLAlchemy URI for connecting to a DuckDB database.

        If an access token is specified (and is not the placeholder URL),
        delegate to the MotherDuck engine spec instead.
        """
        if parameters is None:
            parameters = {}
        query = parameters.get("query", {})
        database = parameters.get("database", ":memory:")
        token = parameters.get("access_token")
        if cls._is_motherduck(database) or (
            token and token != DEFAULT_ACCESS_TOKEN_URL
        ):
            # Fix: MotherDuckEngineSpec is not defined anywhere in this file
            # (it lives in the upstream duckdb spec this file was copied
            # from), so the original raised NameError on the MotherDuck path.
            # Import lazily to avoid a circular import at module load time.
            # TODO(review): confirm the installed Superset version exposes it.
            from superset.db_engine_specs.duckdb import MotherDuckEngineSpec

            return MotherDuckEngineSpec.build_sqlalchemy_uri(parameters)
        return str(URL(drivername=cls.engine, database=database, query=query))

    @classmethod
    def get_parameters_from_uri(  # pylint: disable=unused-argument
        cls, uri: str, encrypted_extra: dict[str, Any] | None = None
    ) -> GizmoSQLParametersType:
        """Decompose a SQLAlchemy URI back into the parameters dict."""
        url = make_url_safe(uri)
        # Drop encryption-related query params; they are managed separately
        # from the user-visible parameters.
        query = {
            key: value
            for (key, value) in url.query.items()
            if (key, value) not in cls.encryption_parameters.items()
        }
        access_token = query.pop("motherduck_token", "")
        return {
            "access_token": access_token,
            "database": url.database,
            "query": query,
        }

    @classmethod
    def validate_parameters(
        cls, properties: GizmoSQLPropertiesType
    ) -> list[SupersetError]:
        """
        Validates any number of parameters, for progressive validation.

        An access token is only required when the database is a MotherDuck
        ("md:") database; otherwise nothing is mandatory.
        """
        errors: list[SupersetError] = []
        parameters = properties.get("parameters", {})
        if cls._is_motherduck(parameters.get("database", "")):
            required = {"access_token"}
        else:
            required = set()
        # A key counts as "present" only when it has a truthy value.
        present = {key for key in parameters if parameters.get(key, ())}
        if missing := sorted(required - present):
            errors.append(
                SupersetError(
                    message=f'One or more parameters are missing: {", ".join(missing)}',
                    error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
                    level=ErrorLevel.WARNING,
                    extra={"missing": missing},
                ),
            )
        return errors

    @classmethod
    def parameters_json_schema(cls) -> Any:
        """
        Return configuration parameters as an OpenAPI (JSON) schema dict,
        or None when no parameters schema is defined.
        """
        if not cls.parameters_schema:
            return None
        spec = APISpec(
            title="Database Parameters",
            version="1.0.0",
            openapi_version="3.0.2",
            plugins=[MarshmallowPlugin()],
        )
        spec.components.schema(cls.__name__, schema=cls.parameters_schema)
        return spec.to_dict()["components"]["schemas"][cls.__name__]
class GizmoSQLEngineSpec(GizmoSQLParametersMixin, BaseEngineSpec):
    """
    Engine spec for GizmoSQL.

    Copied/edited from Superset's duckdb.py primarily to provide working
    time-grain expressions for GizmoSQL; lightly tested (per the file header).
    """

    engine = "gizmosql"
    engine_name = "GizmoSQL"
    default_driver = "gizmosql_engine"
    sqlalchemy_uri_placeholder = (
        "gizmosql://gizmosql_username:gizmosql_password@localhost:31337"
        "?disableCertificateVerification=True&useEncryption=True"
    )

    # SQL templates Superset uses to truncate a timestamp column to the
    # selected time grain.
    _time_grain_expressions = {
        None: "{col}",
        TimeGrain.SECOND: "DATE_TRUNC('second', {col})",
        TimeGrain.MINUTE: "DATE_TRUNC('minute', {col})",
        TimeGrain.HOUR: "DATE_TRUNC('hour', {col})",
        TimeGrain.DAY: "DATE_TRUNC('day', {col})",
        TimeGrain.WEEK: "DATE_TRUNC('week', {col})",
        TimeGrain.MONTH: "DATE_TRUNC('month', {col})",
        TimeGrain.QUARTER: "DATE_TRUNC('quarter', {col})",
        TimeGrain.YEAR: "DATE_TRUNC('year', {col})",
    }

    # Map raw engine error messages to friendly Superset errors.
    # NOTE(review): the "no such column" wording is inherited from the copied
    # duckdb spec — confirm GizmoSQL emits the same message text.
    custom_errors: dict[Pattern[str], tuple[str, SupersetErrorType, dict[str, Any]]] = {
        COLUMN_DOES_NOT_EXIST_REGEX: (
            __('We can\'t seem to resolve the column "%(column_name)s"'),
            SupersetErrorType.COLUMN_DOES_NOT_EXIST_ERROR,
            {},
        ),
    }

    @classmethod
    def epoch_to_dttm(cls) -> str:
        # NOTE(review): datetime(..., 'unixepoch') is SQLite-style syntax
        # inherited from the copied spec — confirm GizmoSQL/DuckDB accepts it.
        return "datetime({col}, 'unixepoch')"

    @classmethod
    def convert_dttm(
        cls, target_type: str, dttm: datetime, db_extra: dict[str, Any] | None = None
    ) -> str | None:
        """
        Render `dttm` as a quoted ISO-8601 literal for string/datetime target
        columns; return None (no conversion) for any other column type.
        """
        sqla_type = cls.get_sqla_column_type(target_type)
        if isinstance(sqla_type, (types.String, types.DateTime)):
            return f"""'{dttm.isoformat(sep=" ", timespec="microseconds")}'"""
        return None

    @classmethod
    def get_table_names(
        cls, database: Database, inspector: Inspector, schema: str | None
    ) -> set[str]:
        """Return the (deduplicated) table names in the given schema."""
        return set(inspector.get_table_names(schema))

    # Removed: a large triple-quoted string holding a commented-out copy of
    # get_extra_params (user-agent injection). It was dead code — a bare
    # string expression with no effect — so BaseEngineSpec.get_extra_params
    # is used unchanged, exactly as before.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This file is included in the final Docker image and SHOULD be overridden when
# deploying the image to prod. Settings configured here are intended for use in local
# development environments. Also note that superset_config_docker.py is imported
# as a final step as a means to override "defaults" configured here
#
import logging
import os
import sys
from celery.schedules import crontab
from flask_caching.backends.filesystemcache import FileSystemCache
import urllib
import json
import jwt
from flask import flash
from flask_appbuilder.security.manager import AUTH_OAUTH
from superset.security import SupersetSecurityManager
from flask_appbuilder.security.views import AuthOAuthView
from flask_appbuilder.security.views import expose
# Authenticate via OAuth2/OIDC (Keycloak) instead of Superset's local DB auth.
AUTH_TYPE = AUTH_OAUTH

# Keycloak end-session endpoint; overwritten per-request with client_id and
# post_logout_redirect_uri in CustomSsoAuthOAuthView.logout below.
LOGOUT_REDIRECT_URL='https://<hidden>/realms/CleavesAI/protocol/openid-connect/logout'

OAUTH_PROVIDERS = [
    {
        'name':'Keycloak',
        'token_key':'access_token',
        'icon':'fa-key',
        'remote_app': {
            'client_id':'superset',
            'client_secret':'<hidden>',
            "client_kwargs": {"scope": "openid email profile", 'roles_key': 'client_access.roles'},
            # NOTE(review): realm is lowercase here ('cleavesai') but
            # 'CleavesAI' in the other endpoints — confirm which is correct.
            'server_metadata_url': 'https://<hidden>/realms/cleavesai/.well-known/openid-configuration',
            'api_base_url': 'https://<hidden>/realms/CleavesAI/protocol',
            'access_token_url': 'https://<hidden>/realms/CleavesAI/protocol/openid-connect/token',
            'authorize_url': 'https://<hidden>/realms/CleavesAI/protocol/openid-connect/auth',
            "request_token_url": None,
        }
    }
]

# Auto-register users on first login, defaulting to the Gamma role, and
# re-sync their roles from the token on every login.
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Gamma"
AUTH_ROLES_SYNC_AT_LOGIN = True

# Map Keycloak client roles (role_keys) to Superset roles.
AUTH_ROLES_MAPPING = {
    "superset": ["Public"],
    "admin": ["Admin"],
    "alpha": ["Alpha"],
    "gamma": ["Gamma"],
}

# Keycloak signs tokens with RS256; the public key is fetched below.
JWT_ALGORITHM = "RS256"

# URL to the public key endpoint
public_key_url = "https://<hidden>/realms/CleavesAI"
def fetch_keycloak_rs256_public_cert():
    """
    Fetch the realm's RS256 public key from Keycloak and wrap it in PEM markers.

    Returns the PEM string, or the literal "No cert found" when the realm
    endpoint returns an empty ``public_key`` (fallback preserved from the
    original behavior).
    """
    # Fix: the module only does `import urllib`, which does not load the
    # `urllib.request` submodule; import it explicitly so urlopen is
    # guaranteed to be available (previously it only worked if another
    # library happened to import urllib.request first).
    import urllib.request

    with urllib.request.urlopen(public_key_url) as response:  # noqa: S310
        public_key_url_response = json.load(response)
    public_key = public_key_url_response["public_key"]
    if public_key:
        pem_lines = [
            "-----BEGIN PUBLIC KEY-----",
            public_key,
            "-----END PUBLIC KEY-----",
        ]
        cert_pem = "\n".join(pem_lines)
    else:
        cert_pem = "No cert found"
    return cert_pem
# NOTE(review): this makes a blocking network call to Keycloak at import time;
# if Keycloak is unreachable, Superset fails to start — confirm acceptable.
JWT_PUBLIC_KEY = fetch_keycloak_rs256_public_cert()
# Source - https://stackoverflow.com/a/77344061
# Posted by user22786882
# Retrieved 2026-01-02, License - CC BY-SA 4.0
class CustomSsoAuthOAuthView(AuthOAuthView):
    """
    OAuth view that points LOGOUT_REDIRECT_URL at Keycloak's end-session
    endpoint so the SSO session is terminated along with the local one.
    """

    @expose("/logout/")
    def logout(self):
        """Rewrite LOGOUT_REDIRECT_URL, then delegate to FAB's logout."""
        from urllib.parse import urlencode

        logging.info("CustomSsoAuthOAuthView logout")
        keycloak_logout_url = "https://<hidden>/realms/CleavesAI/protocol/openid-connect/logout"
        logout_redirect_url = 'https://superset.cleaves.ai'
        client_id = 'superset'
        # Fix: query parameter values — notably post_logout_redirect_uri,
        # which contains ':' and '/' — must be percent-encoded per the OIDC
        # spec; the original interpolated them raw into the URL.
        params = urlencode(
            {
                "client_id": client_id,
                "post_logout_redirect_uri": logout_redirect_url,
            }
        )
        self.appbuilder.app.config["LOGOUT_REDIRECT_URL"] = f"{keycloak_logout_url}?{params}"
        logging.info("LOGOUT_REDIRECT_URL: %s", self.appbuilder.app.config.get("LOGOUT_REDIRECT_URL"))
        return super().logout()
class CustomSsoSecurityManager(SupersetSecurityManager):
    """
    Security manager that maps Keycloak users and client roles into Superset.

    - Reads user claims from the Keycloak userinfo endpoint.
    - Extracts client roles ('resource_access.superset.roles') from the
      access token payload.
    - Restricts login to a single email domain.
    - Resolves JWT-authenticated API users via load_user_jwt.
    """

    authoauthview = CustomSsoAuthOAuthView

    def oauth_user_info(self, provider, response=None):
        """Return the FAB userinfo dict for the given OAuth provider."""
        logging.debug("Oauth2 provider: %s.", provider)
        if provider == 'Keycloak':
            # GET base_url + 'protocol/openid-connect/userinfo' with Bearer
            # authentication; Keycloak validates the token and returns the
            # user's claims.
            me = self.appbuilder.sm.oauth_remotes[provider].get("protocol/openid-connect/userinfo")
            me.raise_for_status()
            data = me.json()
            logging.info("User info from Keycloak: %s", data)
            # Roles live in the access token, not the userinfo response.
            # Signature verification is skipped because the token was just
            # obtained directly from Keycloak over TLS.
            access_token = response.get('access_token')
            decoded_token = jwt.decode(access_token, options={"verify_signature": False})
            logging.info("decoded token: %s", decoded_token)
            roles = decoded_token.get('resource_access', {}).get('superset', {}).get('roles', ['none'])
            logging.info("roles: %s", roles)
            return {
                "username": data.get("preferred_username", ""),
                "first_name": data.get("given_name", ""),
                "last_name": data.get("family_name", ""),
                "email": data.get("email", ""),
                "role_keys": roles,
            }

    def auth_user_oauth(self, userinfo):
        """Override to restrict login to @<hidden> domain only"""
        email = userinfo.get("email", "")
        if not email.endswith("@<hidden>"):
            logging.warning("Login denied for user with email: %s - not in allowed domain", email)
            flash(
                "Access denied.",
                "danger"
            )
            return None
        return super().auth_user_oauth(userinfo)

    def load_user_jwt(self, _jwt_header, jwt_data):
        """Resolve and activate the Superset user for a verified JWT."""
        # Fix: `g` was referenced below but never imported anywhere in this
        # module, causing a NameError on every JWT-authenticated request.
        from flask import g

        username = jwt_data["preferred_username"]
        user = self.find_user(username=username)
        logging.info("User jwt: %s", jwt_data)
        # Fix: find_user() returns None for unknown usernames; the original
        # dereferenced user.is_active unconditionally and would crash.
        if user is not None and user.is_active:
            # Set flask g.user to JWT user, we can't do it on before request
            g.user = user
            return user
        return None
# Use the Keycloak-aware security manager defined above.
CUSTOM_SECURITY_MANAGER = CustomSsoSecurityManager

# Extra Jinja macros available in templated SQL: S3 glob patterns for the
# hourly and daily parquet datasets.
JINJA_CONTEXT_ADDONS = {
    's3_hourly': "'s3://<hidden>/**/*H*.parquet'",
    's3_daily': "'s3://<hidden>/**/data-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]-*.parquet'",
}
logger = logging.getLogger()

# Metadata-database connection pieces, supplied via environment variables
# (set by docker-compose / the deployment).
DATABASE_DIALECT = os.getenv("DATABASE_DIALECT")
DATABASE_USER = os.getenv("DATABASE_USER")
DATABASE_PASSWORD = os.getenv("DATABASE_PASSWORD")
DATABASE_HOST = os.getenv("DATABASE_HOST")
DATABASE_PORT = os.getenv("DATABASE_PORT")
DATABASE_DB = os.getenv("DATABASE_DB")

# Connection pieces for the optional "examples" database.
EXAMPLES_USER = os.getenv("EXAMPLES_USER")
EXAMPLES_PASSWORD = os.getenv("EXAMPLES_PASSWORD")
EXAMPLES_HOST = os.getenv("EXAMPLES_HOST")
EXAMPLES_PORT = os.getenv("EXAMPLES_PORT")
EXAMPLES_DB = os.getenv("EXAMPLES_DB")

# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = (
    f"{DATABASE_DIALECT}://"
    f"{DATABASE_USER}:{DATABASE_PASSWORD}@"
    f"{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_DB}"
)

# The examples DB reuses the main DATABASE_DIALECT (no EXAMPLES_DIALECT).
SQLALCHEMY_EXAMPLES_URI = (
    f"{DATABASE_DIALECT}://"
    f"{EXAMPLES_USER}:{EXAMPLES_PASSWORD}@"
    f"{EXAMPLES_HOST}:{EXAMPLES_PORT}/{EXAMPLES_DB}"
)

# Redis settings for the Celery broker/results and the Superset caches.
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
REDIS_CELERY_DB = os.getenv("REDIS_CELERY_DB", "0")
REDIS_RESULTS_DB = os.getenv("REDIS_RESULTS_DB", "1")

# SQL Lab query results are stored on the local filesystem, not in Redis.
RESULTS_BACKEND = FileSystemCache("/app/superset_home/sqllab")

# Metadata/chart cache in Redis; also reused for the data cache below.
CACHE_CONFIG = {
    "CACHE_TYPE": "RedisCache",
    "CACHE_DEFAULT_TIMEOUT": 300,
    "CACHE_KEY_PREFIX": "superset_",
    "CACHE_REDIS_HOST": REDIS_HOST,
    "CACHE_REDIS_PORT": REDIS_PORT,
    "CACHE_REDIS_DB": REDIS_RESULTS_DB,
}
DATA_CACHE_CONFIG = CACHE_CONFIG
class CeleryConfig:
    """Celery settings for async SQL Lab queries, reports and thumbnails."""

    broker_url = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"
    imports = (
        "superset.sql_lab",
        "superset.tasks.scheduler",
        "superset.tasks.thumbnails",
        "superset.tasks.cache",
    )
    result_backend = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_RESULTS_DB}"
    worker_prefetch_multiplier = 1
    task_acks_late = False
    beat_schedule = {
        # Check every minute for scheduled reports/alerts that are due.
        "reports.scheduler": {
            "task": "reports.scheduler",
            "schedule": crontab(minute="*", hour="*"),
        },
        # Prune the report execution log daily at 00:10.
        "reports.prune_log": {
            "task": "reports.prune_log",
            "schedule": crontab(minute=10, hour=0),
        },
    }
CELERY_CONFIG = CeleryConfig

FEATURE_FLAGS = {"ALERT_REPORTS": True, "DASHBOARD_RBAC": True, "ENABLE_TEMPLATE_PROCESSING": True}

# Render alerts/reports without actually sending notifications.
ALERT_REPORTS_NOTIFICATION_DRY_RUN = True
WEBDRIVER_BASEURL = "http://superset:8088/"  # When using docker compose baseurl should be http://superset_app:8088/ # noqa: E501
# The base URL for the email report hyperlinks.
WEBDRIVER_BASEURL_USER_FRIENDLY = WEBDRIVER_BASEURL

# Do not apply a row limit to CREATE TABLE AS SELECT from SQL Lab.
SQLLAB_CTAS_NO_LIMIT = True

# Log level from the environment; unknown values fall back to INFO.
log_level_text = os.getenv("SUPERSET_LOG_LEVEL", "INFO")
LOG_LEVEL = getattr(logging, log_level_text.upper(), logging.INFO)

if os.getenv("CYPRESS_CONFIG") == "true":
    # When running the service as a cypress backend, we need to import the config
    # located @ tests/integration_tests/superset_test_config.py
    base_dir = os.path.dirname(__file__)
    module_folder = os.path.abspath(
        os.path.join(base_dir, "../../tests/integration_tests/")
    )
    sys.path.insert(0, module_folder)
    from superset_test_config import *  # noqa
    sys.path.pop(0)
#
# Optionally import superset_config_docker.py (which will have been included on
# the PYTHONPATH) in order to allow for local settings to be overridden
#
try:
    import superset_config_docker
    from superset_config_docker import *  # noqa

    logger.info(
        f"Loaded your Docker configuration at " f"[{superset_config_docker.__file__}]"
    )
except ImportError:
    # No override module present — the defaults above stand.
    logger.info("Using default Docker config...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment