StreamlitAuth
Authentication for Streamlit applications.
StreamlitAuth
Bases: BaseAuth
Authentication for Streamlit apps.
Example
import streamlit as st
from cognito_auth.streamlit import StreamlitAuth
Auto-loads from environment variables
auth = StreamlitAuth()
user = auth.get_auth_user()
st.write(f"Welcome {user.email}!")
st.write(f"Groups: {', '.join(user.groups)}")
Source code in src/cognito_auth/streamlit.py
| class StreamlitAuth(BaseAuth):
"""
Authentication for Streamlit apps.
Example:
import streamlit as st
from cognito_auth.streamlit import StreamlitAuth
# Auto-loads from environment variables
auth = StreamlitAuth()
user = auth.get_auth_user()
st.write(f"Welcome {user.email}!")
st.write(f"Groups: {', '.join(user.groups)}")
"""
def get_auth_user(self) -> User:
"""
Get the authenticated and authorised user for this request.
Validates user from Cognito headers and checks authorisation.
Stops execution with error message if authentication or authorisation fails.
Note: Unlike other frameworks, Streamlit has no native redirect function.
This method displays an error message and stops execution using st.stop(),
which prevents any code after this call from running.
Returns:
Authenticated and authorised User
Example:
auth = StreamlitAuth()
user = auth.get_auth_user()
st.write(f"Hello {user.email}")
"""
try:
# Try headers first (ALB might have refreshed tokens)
headers = st.context.headers
logger.debug(
f"Attempting authentication from headers "
f"(available keys: {list(dict(headers).keys())})"
)
user = self._get_user_from_headers(dict(headers))
return self._authorize_and_cache_user(user)
except MissingTokenError as e:
# Headers missing - this shouldn't happen with proper ALB configuration
# Indicates misconfiguration or ALB bypass
logger.error(
f"Authentication headers missing - possible misconfiguration. "
f"Headers present: {list(dict(st.context.headers).keys())}"
)
self._handle_auth_error(e)
except ExpiredTokenError as e:
# Token in headers expired (stale headers from initial page load)
# This is normal - ALB token expires after 3 minutes
# Check cache - may have longer-lived access token (60 min)
logger.debug(
"Checked headers but ALB token expired (stale), "
"attempting cache fallback"
)
return self._get_cached_user_or_fail(e)
except Exception as e:
self._handle_auth_error(e)
def _authorize_and_cache_user(self, user: User) -> User:
"""Check authorization and cache user in session state."""
if not self._is_authorised(user):
logger.warning(
f"User not authorized: email={user.email} groups={user.groups}"
)
st.error(
"🔒 Access denied. You don't have permission to access "
"this application."
)
st.info(
"Please contact your administrator if you believe this is an error."
)
st.stop()
# Cache user in session state
st.session_state["_cognito_auth_user"] = user
logger.info(
f"User authenticated: email={user.email} groups={user.groups} "
f"exp={user.exp}"
)
return user
def _get_cached_user_or_fail(
self, error: MissingTokenError | ExpiredTokenError
) -> User:
"""Get cached user if available, otherwise fail with error."""
cached_user = st.session_state.get("_cognito_auth_user")
if not cached_user:
logger.error(
f"Authentication failed - missing headers and no cache: {error}"
)
st.error("🔒 Session initialization failed. Please refresh the page.")
st.stop()
# Check if cached token has expired
if cached_user.exp and cached_user.exp.timestamp() < time.time():
logger.warning(f"Cached token expired at {cached_user.exp}")
st.session_state.pop("_cognito_auth_user", None)
st.error(
"🔒 Your session has expired. Please refresh the page to continue."
)
st.info("Sessions expire after 60 minutes for security.")
st.stop()
logger.debug(
f"Headers missing (WebSocket reconnect?) - "
f"using cached user (expires at {cached_user.exp})"
)
return cached_user
def _handle_expired_token(self, error: ExpiredTokenError) -> None:
"""Handle expired token error."""
logger.warning(f"Token expired: {error}")
st.session_state.pop("_cognito_auth_user", None)
st.error("🔒 Your session has expired. Please refresh the page to continue.")
st.info("Sessions expire after 60 minutes for security.")
st.stop()
def _handle_auth_error(self, error: Exception) -> None:
"""Handle unexpected authentication error."""
logger.error(
f"Authentication failed with unexpected error: {error}", exc_info=True
)
st.error("🔒 Authentication failed. Please try refreshing the page.")
st.info("If the problem persists, contact support.")
st.stop()
|
Functions
get_auth_user()
Get the authenticated and authorised user for this request.
Validates user from Cognito headers and checks authorisation.
Stops execution with error message if authentication or authorisation fails.
Note: Unlike other frameworks, Streamlit has no native redirect function.
This method displays an error message and stops execution using st.stop(),
which prevents any code after this call from running.
Returns:
| Type |
Description |
User
|
Authenticated and authorised User
|
Example
auth = StreamlitAuth()
user = auth.get_auth_user()
st.write(f"Hello {user.email}")
Source code in src/cognito_auth/streamlit.py
| def get_auth_user(self) -> User:
"""
Get the authenticated and authorised user for this request.
Validates user from Cognito headers and checks authorisation.
Stops execution with error message if authentication or authorisation fails.
Note: Unlike other frameworks, Streamlit has no native redirect function.
This method displays an error message and stops execution using st.stop(),
which prevents any code after this call from running.
Returns:
Authenticated and authorised User
Example:
auth = StreamlitAuth()
user = auth.get_auth_user()
st.write(f"Hello {user.email}")
"""
try:
# Try headers first (ALB might have refreshed tokens)
headers = st.context.headers
logger.debug(
f"Attempting authentication from headers "
f"(available keys: {list(dict(headers).keys())})"
)
user = self._get_user_from_headers(dict(headers))
return self._authorize_and_cache_user(user)
except MissingTokenError as e:
# Headers missing - this shouldn't happen with proper ALB configuration
# Indicates misconfiguration or ALB bypass
logger.error(
f"Authentication headers missing - possible misconfiguration. "
f"Headers present: {list(dict(st.context.headers).keys())}"
)
self._handle_auth_error(e)
except ExpiredTokenError as e:
# Token in headers expired (stale headers from initial page load)
# This is normal - ALB token expires after 3 minutes
# Check cache - may have longer-lived access token (60 min)
logger.debug(
"Checked headers but ALB token expired (stale), "
"attempting cache fallback"
)
return self._get_cached_user_or_fail(e)
except Exception as e:
self._handle_auth_error(e)
|
Quick Start
import streamlit as st
from cognito_auth.streamlit import StreamlitAuth
# Auto-loads from environment variables
auth = StreamlitAuth()
user = auth.get_auth_user()
st.write(f"Welcome {user.name}!")
st.write(f"Groups: {', '.join(user.groups)}")
Configuration
StreamlitAuth inherits from BaseAuth and accepts these parameters:
authoriser (optional): Pre-configured Authoriser instance. If not provided, auto-loads from environment variables
redirect_url (optional): Not used in Streamlit (defaults to "https://public.gds-idea.io/401.html")
region (optional): AWS region (default: "eu-west-2")
from cognito_auth import Authoriser
from cognito_auth.streamlit import StreamlitAuth
# Custom configuration
authoriser = Authoriser.from_lists(allowed_groups=["developers"])
auth = StreamlitAuth(
authoriser=authoriser,
region="us-east-1"
)
Behavior
Unlike other frameworks, Streamlit cannot redirect users. When authentication or authorisation fails, get_auth_user():
- Displays an error message using
st.error()
- Displays an info message using
st.info()
- Stops execution using
st.stop()
This prevents any code after get_auth_user() from running for unauthorised users.
Development Mode
Enable dev mode for local development without ALB. See Development Mode for full details.
export COGNITO_AUTH_DEV_MODE=true
Complete Example
import streamlit as st
from cognito_auth.streamlit import StreamlitAuth
# Initialize auth
auth = StreamlitAuth()
# This line protects your entire app
user = auth.get_auth_user()
# Only authenticated and authorised users reach here
st.title("Protected Dashboard")
st.write(f"Logged in as: {user.name} ({user.email})")
# Use user information in your app
if user.is_admin:
st.write("You have admin access")
st.subheader("Admin Panel")
for group in user.groups:
st.write(f"Group: {group}")