Databricks Integrations with Python

Problem

You are building an interactive Python application which needs to act as the person visiting the application when accessing a private Databricks resource. The content must authenticate to Databricks using the viewer’s Databricks credentials. For example, reading data from a Databricks Unity Catalog table that has row-level permissions and each content viewer has a different level of data access.

Solution

The examples below illustrate how to use the OAuth integrations feature of Posit Connect to delegate authorization to the data provider. These examples use the viewer’s OAuth access token to call the Databricks API on behalf of the content viewer, allowing content to access private Databricks resources as the content viewer.

When publishing the content to Connect make sure the following environment variables are set for the deployed content:

  • DATABRICKS_HOST
  • DATABRICKS_PATH
requirements.txt
databricks-sql-connector==3.3.0
databricks-sdk==0.29.0
dash==2.15.0
posit-sdk>=0.4.0
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors
import os

import flask
import pandas as pd
from dash import Dash, Input, Output, dash_table, html
from databricks import sql
from databricks.sdk.core import ApiClient, Config, databricks_cli
from databricks.sdk.service.iam import CurrentUserAPI
from posit.connect.external.databricks import PositCredentialsStrategy

DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_HOST_URL = f"https://{DATABRICKS_HOST}"
SQL_HTTP_PATH = os.getenv("DATABRICKS_PATH")

df = None
app = Dash(__name__)

app.layout = html.Div(
    children=[
        html.Div(id="greeting", children="Loading..."),
        html.Div(id="table-container"),
        html.Div(id="dummy"),  # dummy element to trigger callback on page load
    ]
)


@app.callback(
    [Output("table-container", "children"), Output("greeting", "children")],
    Input("dummy", "children"),
)
def update_page(_):
    """
    Dash example application that shows user information and
    the first few rows from a table hosted in Databricks.
    """
    session_token = flask.request.headers.get("Posit-Connect-User-Session-Token")
    posit_strategy = PositCredentialsStrategy(
        local_strategy=databricks_cli, user_session_token=session_token
    )
    cfg = Config(
        host=DATABRICKS_HOST_URL,
        # uses Posit's custom credential_strategy if running on Connect,
        # otherwise falls back to the strategy defined by local_strategy
        credentials_strategy=posit_strategy,
    )

    def get_greeting():
        databricks_user_info = CurrentUserAPI(ApiClient(cfg)).me()
        return f"Hello, {databricks_user_info.display_name}!"

    def get_table():
        global df

        if df is None:
            query = "SELECT * FROM samples.nyctaxi.trips LIMIT 10;"

            with sql.connect(
                server_hostname=DATABRICKS_HOST,
                http_path=SQL_HTTP_PATH,
                # https://github.com/databricks/databricks-sql-python/issues/148#issuecomment-2271561365
                credentials_provider=posit_strategy.sql_credentials_provider(cfg),
            ) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query)
                    rows = cursor.fetchall()
                    df = pd.DataFrame(
                        rows, columns=[col[0] for col in cursor.description]
                    )

        table = dash_table.DataTable(
            id="table",
            columns=[{"name": i, "id": i} for i in df.columns],
            data=df.to_dict("records"),
            style_table={"overflowX": "scroll"},
        )
        return table

    return get_table(), get_greeting()


if __name__ == "__main__":
    app.run(debug=True)

Running the app locally

Terminal
export DATABRICKS_HOST="<databricks-sql-warehouse-server-hostname>"
export DATABRICKS_PATH="<databricks-sql-warehouse-http-path>"
python app.py
requirements.txt
databricks-sql-connector==3.3.0
databricks-sdk==0.29.0
fastapi==0.110.0
posit-sdk>=0.4.0
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors
import os
from typing import Annotated

from databricks import sql
from databricks.sdk.core import Config, databricks_cli
from fastapi import FastAPI, Header
from fastapi.responses import JSONResponse
from posit.connect.external.databricks import PositCredentialsStrategy

DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_HOST_URL = f"https://{DATABRICKS_HOST}"
SQL_HTTP_PATH = os.getenv("DATABRICKS_PATH")

rows = None
app = FastAPI()


@app.get("/fares")
async def get_fares(
    posit_connect_user_session_token: Annotated[str | None, Header()] = None,
) -> JSONResponse:
    """
    FastAPI example API that returns the first few rows from
    a table hosted in Databricks.
    """
    global rows

    posit_strategy = PositCredentialsStrategy(
        local_strategy=databricks_cli,
        user_session_token=posit_connect_user_session_token,
    )
    cfg = Config(
        host=DATABRICKS_HOST_URL,
        # uses Posit's custom credential_strategy if running on Connect,
        # otherwise falls back to the strategy defined by local_strategy
        credentials_strategy=posit_strategy,
    )

    if rows is None:
        query = "SELECT * FROM samples.nyctaxi.trips LIMIT 10;"

        with sql.connect(
            server_hostname=DATABRICKS_HOST,
            http_path=SQL_HTTP_PATH,
            # https://github.com/databricks/databricks-sql-python/issues/148#issuecomment-2271561365
            credentials_provider=posit_strategy.sql_credentials_provider(cfg),
        ) as connection:
            with connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()

    return [row.asDict() for row in rows]

Running the app locally

Terminal
export DATABRICKS_HOST="<databricks-sql-warehouse-server-hostname>"
export DATABRICKS_PATH="<databricks-sql-warehouse-http-path>"
uvicorn app:app --reload
requirements.txt
databricks-sql-connector==3.3.0
databricks-sdk==0.29.0
flask==3.0.2
posit-sdk>=0.4.0
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors
import os

from databricks import sql
from databricks.sdk.core import Config, databricks_cli
from flask import Flask, request
from posit.connect.external.databricks import PositCredentialsStrategy

DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_HOST_URL = f"https://{DATABRICKS_HOST}"
SQL_HTTP_PATH = os.getenv("DATABRICKS_PATH")

rows = None
app = Flask(__name__)


@app.route("/")
def usage():
    return "<p>Try: <pre>GET /fares<pre></p>"


@app.route("/fares")
def get_fares():
    """
    Flask example API that returns the first few rows from
    a table hosted in Databricks.
    """
    global rows

    session_token = request.headers.get("Posit-Connect-User-Session-Token")
    posit_strategy = PositCredentialsStrategy(
        local_strategy=databricks_cli, user_session_token=session_token
    )
    cfg = Config(
        host=DATABRICKS_HOST_URL,
        # uses Posit's custom credential_strategy if running on Connect,
        # otherwise falls back to the strategy defined by local_strategy
        credentials_strategy=posit_strategy,
    )

    if rows is None:
        query = "SELECT * FROM samples.nyctaxi.trips LIMIT 10;"

        with sql.connect(
            server_hostname=DATABRICKS_HOST,
            http_path=SQL_HTTP_PATH,
            # https://github.com/databricks/databricks-sql-python/issues/148#issuecomment-2271561365
            credentials_provider=posit_strategy.sql_credentials_provider(cfg),
        ) as connection:
            with connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()

    return [row.asDict() for row in rows]


if __name__ == "__main__":
    app.run(debug=True)

Running the app locally

Terminal
export DATABRICKS_HOST="<databricks-sql-warehouse-server-hostname>"
export DATABRICKS_PATH="<databricks-sql-warehouse-http-path>"
flask --app app run
requirements.txt
databricks-sql-connector==3.3.0
databricks-sdk==0.29.0
shiny==0.7.1
posit-sdk>=0.4.0
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors
import os

import pandas as pd
from databricks import sql
from databricks.sdk.core import ApiClient, Config, databricks_cli
from databricks.sdk.service.iam import CurrentUserAPI
from posit.connect.external.databricks import PositCredentialsStrategy
from shiny import App, Inputs, Outputs, Session, render, ui

DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_HOST_URL = f"https://{DATABRICKS_HOST}"
SQL_HTTP_PATH = os.getenv("DATABRICKS_PATH")

app_ui = ui.page_fluid(ui.output_text("text"), ui.output_data_frame("result"))


def server(i: Inputs, o: Outputs, session: Session):
    """
    Shiny for Python example application that shows user information and
    the first few rows from a table hosted in Databricks.
    """
    session_token = session.http_conn.headers.get("Posit-Connect-User-Session-Token")
    posit_strategy = PositCredentialsStrategy(
        local_strategy=databricks_cli, user_session_token=session_token
    )
    cfg = Config(
        host=DATABRICKS_HOST_URL,
        # uses Posit's custom credential_strategy if running on Connect,
        # otherwise falls back to the strategy defined by local_strategy
        credentials_strategy=posit_strategy,
    )

    @render.data_frame
    def result():
        query = "SELECT * FROM samples.nyctaxi.trips LIMIT 10;"

        with sql.connect(
            server_hostname=DATABRICKS_HOST,
            http_path=SQL_HTTP_PATH,
            # https://github.com/databricks/databricks-sql-python/issues/148#issuecomment-2271561365
            credentials_provider=posit_strategy.sql_credentials_provider(cfg),
        ) as connection:
            with connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
                df = pd.DataFrame(rows, columns=[col[0] for col in cursor.description])
                return df

    @render.text
    def text():
        databricks_user_info = CurrentUserAPI(ApiClient(cfg)).me()
        return f"Hello, {databricks_user_info.display_name}!"


app = App(app_ui, server)

Running the app locally

Terminal
export DATABRICKS_HOST="<databricks-sql-warehouse-server-hostname>"
export DATABRICKS_PATH="<databricks-sql-warehouse-http-path>"
shiny run app.py
requirements.txt
databricks-sql-connector==3.3.0
databricks-sdk==0.29.0
streamlit==1.37.0
posit-sdk>=0.4.0
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors
import os

import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import ApiClient, Config, databricks_cli
from databricks.sdk.service.iam import CurrentUserAPI
from posit.connect.external.databricks import PositCredentialsStrategy

DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_HOST_URL = f"https://{DATABRICKS_HOST}"
SQL_HTTP_PATH = os.getenv("DATABRICKS_PATH")

session_token = st.context.headers.get("Posit-Connect-User-Session-Token")
posit_strategy = PositCredentialsStrategy(
    local_strategy=databricks_cli, user_session_token=session_token
)
cfg = Config(
    host=DATABRICKS_HOST_URL,
    # uses Posit's custom credential_strategy if running on Connect,
    # otherwise falls back to the strategy defined by local_strategy
    credentials_strategy=posit_strategy,
)

databricks_user = CurrentUserAPI(ApiClient(cfg)).me()
st.write(f"Hello, {databricks_user.display_name}!")

with sql.connect(
    server_hostname=DATABRICKS_HOST,
    http_path=SQL_HTTP_PATH,
    # https://github.com/databricks/databricks-sql-python/issues/148#issuecomment-2271561365
    credentials_provider=posit_strategy.sql_credentials_provider(cfg),
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 10;")
        result = cursor.fetchall()
        st.table(pd.DataFrame(result))

Running the app locally

Terminal
export DATABRICKS_HOST="<databricks-sql-warehouse-server-hostname>"
export DATABRICKS_PATH="<databricks-sql-warehouse-http-path>"
streamlit run app.py