flask.globals session Example Code

session is function in the Flask flask.globals module and is an instance of LocalProxy from the Werkzeug library. session stores data about the user session for the current request and it can be used to access session data.

Note that session is usually imported directly from flask instead of from flask.globals, even though it is defined within the globals module. It's the same function that is imported, but it's less characters to type when you leave off the .globals part.

current_app, g, and request are several other callables with code examples from the same flask.globals package.

Example 1 from CTFd

CTFd (homepage) is a capture the flag (CTF) hacking web app built with Flask. The application can be used as-is to run CTF events, or modified for custom rules for related scenarios. CTFd is open sourced under the Apache License 2.0.

CTFd / CTFd / auth.py

# auth.py
import base64

import requests
from flask import Blueprint, abort
from flask import current_app as app
from flask import redirect, render_template, request, session, url_for
from itsdangerous.exc import BadSignature, BadTimeSignature, SignatureExpired

from CTFd.cache import clear_team_session, clear_user_session
from CTFd.models import Teams, UserFieldEntries, UserFields, Users, db
from CTFd.utils import config, email, get_app_config, get_config
from CTFd.utils import user as current_user
from CTFd.utils import validators
from CTFd.utils.config import is_teams_mode
from CTFd.utils.config.integrations import mlc_registration
from CTFd.utils.config.visibility import registration_visible
from CTFd.utils.crypto import verify_password
from CTFd.utils.decorators import ratelimit
from CTFd.utils.decorators.visibility import check_registration_visibility
from CTFd.utils.helpers import error_for, get_errors, markup
from CTFd.utils.logging import log
from CTFd.utils.modes import TEAMS_MODE
from CTFd.utils.security.auth import login_user, logout_user
from CTFd.utils.security.signing import unserialize
from CTFd.utils.validators import ValidationError

auth = Blueprint("auth", __name__)

@auth.route("/confirm", methods=["POST", "GET"])

## ... source file abbreviated to get to session examples ...

        return render_template("register.html", errors=errors)

@auth.route("/login", methods=["POST", "GET"])
@ratelimit(method="POST", limit=10, interval=5)
def login():
    errors = get_errors()
    if request.method == "POST":
        name = request.form["name"]

        if validators.validate_email(name) is True:
            user = Users.query.filter_by(email=name).first()
            user = Users.query.filter_by(name=name).first()

        if user:
            if user.password is None:
                    "Your account was registered with a 3rd party authentication provider. "
                    "Please try logging in with a configured authentication provider."
                return render_template("login.html", errors=errors)

            if user and verify_password(request.form["password"], user.password):

                log("logins", "[{date}] {ip} - {name} logged in", name=user.name)

                if request.args.get("next") and validators.is_safe_url(
                    return redirect(request.args.get("next"))
                return redirect(url_for("challenges.listing"))

                    "[{date}] {ip} - submitted invalid password for {name}",
                errors.append("Your username or password is incorrect")
                return render_template("login.html", errors=errors)
            log("logins", "[{date}] {ip} - submitted invalid account information")
            errors.append("Your username or password is incorrect")

## ... source file continues with no further session examples...

Example 2 from Flask AppBuilder

Flask-AppBuilder (documentation and example apps) is a web application generator that uses Flask to automatically create the code for database-driven applications based on parameters set by the user. The generated applications include default security settings, forms, and internationalization support.

Flask App Builder is provided under the BSD 3-Clause "New" or "Revised" license.

Flask AppBuilder / flask_appbuilder / security / registerviews.py

# registerviews.py
__author__ = "Daniel Gaspar"

import logging

from flask import flash, redirect, request, session, url_for
from flask_babel import lazy_gettext

from .forms import LoginForm_oid, RegisterUserDBForm, RegisterUserOIDForm
from .. import const as c
from .._compat import as_unicode
from ..validators import Unique
from ..views import expose, PublicFormView

log = logging.getLogger(__name__)

def get_first_last_name(fullname):
    names = fullname.split()
    if len(names) > 1:
        return names[0], " ".join(names[1:])
    elif names:
        return names[0], ""

class BaseRegisterUser(PublicFormView):

    route_base = "/register"
    email_template = "appbuilder/general/security/register_mail.html"
    email_subject = lazy_gettext("Account activation")

## ... source file abbreviated to get to session examples ...


class RegisterUserOIDView(BaseRegisterUser):

    route_base = "/register"

    form = RegisterUserOIDForm
    default_view = "form_oid_post"

    @expose("/formoidone", methods=["GET", "POST"])
    def form_oid_post(self, flag=True):
        if flag:
            self.oid_login_handler(self.form_oid_post, self.appbuilder.sm.oid)
        form = LoginForm_oid()
        if form.validate_on_submit():
            session["remember_me"] = form.remember_me.data
            return self.appbuilder.sm.oid.try_login(
                form.openid.data, ask_for=["email", "fullname"]
        resp = session.pop("oid_resp", None)
        if resp:
            form = self.form.refresh()
            form.username.data = resp.email
            first_name, last_name = get_first_last_name(resp.fullname)
            form.first_name.data = first_name
            form.last_name.data = last_name
            form.email.data = resp.email
            widgets = self._get_edit_widget(form=form)
            return self.render_template(
            flash(as_unicode(self.error_message), "warning")
            return redirect(self.get_redirect())

    def oid_login_handler(self, f, oid):
        from flask_openid import OpenIDResponse, SessionWrapper
        from openid.consumer.consumer import CANCEL, Consumer, SUCCESS

## ... source file continues with no further session examples...

Example 3 from FlaskBB

FlaskBB (project website) is a Flask-based forum web application. The web app allows users to chat in an open message board or send private messages in plain text or Markdown.

FlaskBB is provided as open source under this license.

FlaskBB / flaskbb / utils / helpers.py

# helpers.py
import ast
import itertools
import logging
import operator
import os
import re
import time
import warnings
from datetime import datetime, timedelta
from email import message_from_string
from functools import wraps

import pkg_resources
import requests
import unidecode
from babel.core import get_locale_identifier
from babel.dates import format_date as babel_format_date
from babel.dates import format_datetime as babel_format_datetime
from babel.dates import format_timedelta as babel_format_timedelta
from babel.dates import format_time as babel_format_time
from flask import current_app, flash, g, redirect, request, session, url_for
from flask_allows import Permission
from flask_babelplus import lazy_gettext as _
from flask_login import current_user
from flask_themes2 import get_themes_list, render_theme_template
from jinja2 import Markup
from PIL import ImageFile
from pytz import UTC
from werkzeug.local import LocalProxy
from werkzeug.utils import ImportStringError, import_string

from flaskbb.extensions import babel, redis_store
from flaskbb.utils.http import is_safe_url
from flaskbb.utils.settings import flaskbb_config

logger = logging.getLogger(__name__)

_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>[email protected]\[\\\]^_`{|},.]+')

def to_bytes(text, encoding="utf-8"):
    if isinstance(text, str):
        text = text.encode(encoding)
    return text

## ... source file abbreviated to get to session examples ...

    return str(delim.join(result))

def redirect_url(endpoint, use_referrer=True):
    targets = [endpoint]
    allowed_hosts = current_app.config["ALLOWED_HOSTS"]
    if use_referrer:
        targets.insert(0, request.referrer)
    for target in targets:
        if target and is_safe_url(target, allowed_hosts):
            return target

def redirect_or_next(endpoint, use_referrer=True):
    return redirect(
        redirect_url(request.args.get("next"), use_referrer)
        or redirect_url(endpoint, use_referrer)

def render_template(template, **context):  # pragma: no cover
    if current_user.is_authenticated and current_user.theme:
        theme = current_user.theme
        theme = session.get("theme", flaskbb_config["DEFAULT_THEME"])
    return render_theme_template(theme, template, **context)

def do_topic_action(topics, user, action, reverse):  # noqa: C901
    if not topics:
        return False

    from flaskbb.utils.requirements import (

    if not Permission(IsAtleastModeratorInForum(forum=topics[0].forum)):
            _("You do not have the permissions to execute this action."),
        return False

    modified_topics = 0
    if action not in {"delete", "hide", "unhide"}:
        for topic in topics:
            if getattr(topic, action) and not reverse:

## ... source file continues with no further session examples...

Example 4 from flaskex

Flaskex is a working example Flask web application intended as a base to build your own applications upon. The application comes with pre-built sign up, log in and related screens, as well as a database backend. Flaskex is provided as open source under the MIT license.

flaskex / app.py

# app.py

from scripts import tabledef
from scripts import forms
from scripts import helpers
from flask import Flask, redirect, url_for, render_template, request, session
import json
import sys
import os

app = Flask(__name__)
app.secret_key = os.urandom(12)  # Generic key for dev purposes only

@app.route('/', methods=['GET', 'POST'])
def login():
    if not session.get('logged_in'):
        form = forms.LoginForm(request.form)
        if request.method == 'POST':
            username = request.form['username'].lower()
            password = request.form['password']
            if form.validate():
                if helpers.credentials_valid(username, password):
                    session['logged_in'] = True
                    session['username'] = username
                    return json.dumps({'status': 'Login successful'})
                return json.dumps({'status': 'Invalid user/pass'})
            return json.dumps({'status': 'Both fields required'})
        return render_template('login.html', form=form)
    user = helpers.get_user()
    return render_template('home.html', user=user)

def logout():
    session['logged_in'] = False
    return redirect(url_for('login'))

@app.route('/signup', methods=['GET', 'POST'])
def signup():
    if not session.get('logged_in'):
        form = forms.LoginForm(request.form)
        if request.method == 'POST':
            username = request.form['username'].lower()
            password = helpers.hash_password(request.form['password'])
            email = request.form['email']
            if form.validate():
                if not helpers.username_taken(username):
                    helpers.add_user(username, password, email)
                    session['logged_in'] = True
                    session['username'] = username
                    return json.dumps({'status': 'Signup successful'})
                return json.dumps({'status': 'Username taken'})
            return json.dumps({'status': 'User/Pass required'})
        return render_template('login.html', form=form)
    return redirect(url_for('login'))

@app.route('/settings', methods=['GET', 'POST'])
def settings():
    if session.get('logged_in'):
        if request.method == 'POST':
            password = request.form['password']
            if password != "":
                password = helpers.hash_password(password)
            email = request.form['email']
            helpers.change_user(password=password, email=email)
            return json.dumps({'status': 'Saved'})
        user = helpers.get_user()
        return render_template('settings.html', user=user)
    return redirect(url_for('login'))

if __name__ == "__main__":
    app.run(debug=True, use_reloader=True, host="")

## ... source file continues with no further session examples...

Example 5 from Flask-HTTPAuth

Flask-HTTPAuth (documentation and PyPI package information) is a Flask framework extension that creates Basic and Digest HTTP authentication for routes. This project is primarily built and maintained by Miguel Grinberg. It is provided as open source under the MIT license.

Flask-HTTPAuth / flask_httpauth.py

# flask_httpauth.py

from base64 import b64decode
from functools import wraps
from hashlib import md5
from random import Random, SystemRandom
from flask import request, make_response, session, g, Response
from werkzeug.datastructures import Authorization
from werkzeug.security import safe_str_cmp

__version__ = '4.2.1dev'

class HTTPAuth(object):
    def __init__(self, scheme=None, realm=None, header=None):
        self.scheme = scheme
        self.realm = realm or "Authentication Required"
        self.header = header
        self.get_password_callback = None
        self.get_user_roles_callback = None
        self.auth_error_callback = None

        def default_get_password(username):
            return None

        def default_auth_error(status):
            return "Unauthorized Access", status


## ... source file abbreviated to get to session examples ...

class HTTPDigestAuth(HTTPAuth):
    def __init__(self, scheme=None, realm=None, use_ha1_pw=False):
        super(HTTPDigestAuth, self).__init__(scheme or 'Digest', realm)
        self.use_ha1_pw = use_ha1_pw
        self.random = SystemRandom()
        except NotImplementedError:  # pragma: no cover
            self.random = Random()

        self.generate_nonce_callback = None
        self.verify_nonce_callback = None
        self.generate_opaque_callback = None
        self.verify_opaque_callback = None

        def _generate_random():
            return md5(str(self.random.random()).encode('utf-8')).hexdigest()

        def default_generate_nonce():
            session["auth_nonce"] = _generate_random()
            return session["auth_nonce"]

        def default_verify_nonce(nonce):
            session_nonce = session.get("auth_nonce")
            if nonce is None or session_nonce is None:
                return False
            return safe_str_cmp(nonce, session_nonce)

        def default_generate_opaque():
            session["auth_opaque"] = _generate_random()
            return session["auth_opaque"]

        def default_verify_opaque(opaque):
            session_opaque = session.get("auth_opaque")
            if opaque is None or session_opaque is None:  # pragma: no cover
                return False
            return safe_str_cmp(opaque, session_opaque)


    def generate_nonce(self, f):
        self.generate_nonce_callback = f
        return f

    def verify_nonce(self, f):
        self.verify_nonce_callback = f
        return f

    def generate_opaque(self, f):
        self.generate_opaque_callback = f
        return f

    def verify_opaque(self, f):
        self.verify_opaque_callback = f
        return f

## ... source file continues with no further session examples...

Example 6 from flask-login

Flask-Login (project documentation and PyPI package) is a Flask extension that provides user session management, which handles common tasks such as logging in and out of a web application and managing associated user session data. Flask-Login is open sourced under the MIT license.

flask-login / flask_login / utils.py

# utils.py

import hmac
from hashlib import sha512
from functools import wraps
from werkzeug.local import LocalProxy
from werkzeug.security import safe_str_cmp
from werkzeug.urls import url_decode, url_encode

from flask import (_request_ctx_stack, current_app, request, session, url_for,

from ._compat import text_type, urlparse, urlunparse
from .config import COOKIE_NAME, EXEMPT_METHODS
from .signals import user_logged_in, user_logged_out, user_login_confirmed

current_user = LocalProxy(lambda: _get_user())

def encode_cookie(payload, key=None):
    return u'{0}|{1}'.format(payload, _cookie_digest(payload, key=key))

def decode_cookie(cookie, key=None):
        payload, digest = cookie.rsplit(u'|', 1)
        if hasattr(digest, 'decode'):
            digest = digest.decode('ascii')  # pragma: no cover
    except ValueError:

    if safe_str_cmp(_cookie_digest(payload, key=key), digest):
        return payload

## ... source file abbreviated to get to session examples ...

        return login_view
        if request.view_args is None:
            return url_for(login_view)
            return url_for(login_view, **request.view_args)

def login_url(login_view, next_url=None, next_field='next'):
    base = expand_login_view(login_view)

    if next_url is None:
        return base

    parsed_result = urlparse(base)
    md = url_decode(parsed_result.query)
    md[next_field] = make_next_param(base, next_url)
    netloc = current_app.config.get('FORCE_HOST_FOR_REDIRECTS') or \
    parsed_result = parsed_result._replace(netloc=netloc,
                                           query=url_encode(md, sort=True))
    return urlunparse(parsed_result)

def login_fresh():
    return session.get('_fresh', False)

def login_user(user, remember=False, duration=None, force=False, fresh=True):
    if not force and not user.is_active:
        return False

    user_id = getattr(user, current_app.login_manager.id_attribute)()
    session['_user_id'] = user_id
    session['_fresh'] = fresh
    session['_id'] = current_app.login_manager._session_identifier_generator()

    if remember:
        session['_remember'] = 'set'
        if duration is not None:
                session['_remember_seconds'] = (duration.microseconds +
                                                (duration.seconds +
                                                 duration.days * 24 * 3600) *
                                                10**6) / 10.0**6
            except AttributeError:
                raise Exception('duration must be a datetime.timedelta, '
                                'instead got: {0}'.format(duration))

    user_logged_in.send(current_app._get_current_object(), user=_get_user())
    return True

def logout_user():

    user = _get_user()

    if '_user_id' in session:

    if '_fresh' in session:

    if '_id' in session:

    cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
    if cookie_name in request.cookies:
        session['_remember'] = 'clear'
        if '_remember_seconds' in session:

    user_logged_out.send(current_app._get_current_object(), user=user)

    return True

def confirm_login():
    session['_fresh'] = True
    session['_id'] = current_app.login_manager._session_identifier_generator()

def login_required(func):
    def decorated_view(*args, **kwargs):
        if request.method in EXEMPT_METHODS:
            return func(*args, **kwargs)
        elif current_app.config.get('LOGIN_DISABLED'):
            return func(*args, **kwargs)
        elif not current_user.is_authenticated:
            return current_app.login_manager.unauthorized()
        return func(*args, **kwargs)
    return decorated_view

## ... source file continues with no further session examples...

Example 7 from Flask-WTF

Flask-WTF (project documentation and PyPI page) provides a bridge between Flask and the the WTForms form-handling library. It makes it easier to use WTForms by reducing boilerplate code and shorter examples for common form operations as well as common security practices such as CSRF.

Flask-WTF / flask_wtf / csrf.py

# csrf.py
import hashlib
import logging
import os
import warnings
from urllib.parse import urlparse
from functools import wraps

from flask import Blueprint, current_app, g, request, session
from itsdangerous import BadData, SignatureExpired, URLSafeTimedSerializer
from werkzeug.exceptions import BadRequest
from werkzeug.security import safe_str_cmp
from wtforms import ValidationError
from wtforms.csrf.core import CSRF

from ._compat import FlaskWTFDeprecationWarning

__all__ = ('generate_csrf', 'validate_csrf', 'CSRFProtect')
logger = logging.getLogger(__name__)

def generate_csrf(secret_key=None, token_key=None):

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'

    if field_name not in g:
        s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

        if field_name not in session:
            session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

            token = s.dumps(session[field_name])
        except TypeError:
            session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()
            token = s.dumps(session[field_name])

        setattr(g, field_name, token)

    return g.get(field_name)

def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')

def _get_config(
    value, config_name, default=None,
    required=True, message='CSRF is not configured.'

    if value is None:
        value = current_app.config.get(config_name, default)

    if required and value is None:

## ... source file continues with no further session examples...

Example 8 from flaskSaaS

flaskSaas is a boilerplate starter project to build a software-as-a-service (SaaS) web application in Flask, with Stripe for billing. The boilerplate relies on many common Flask extensions such as Flask-WTF, Flask-Login, Flask-Admin, and many others. The project is provided as open source under the MIT license.

flaskSaaS / app / logger_setup.py

# logger_setup.py

import datetime as dt
import logging
from logging.handlers import RotatingFileHandler
import pytz

from flask import request, session
from structlog import wrap_logger
from structlog.processors import JSONRenderer

from app import app



TZ = pytz.timezone(app.config['TIMEZONE'])

def add_fields(_, level, event_dict):
    now = dt.datetime.now()
    event_dict['timestamp'] = TZ.localize(now, True).astimezone(pytz.utc).isoformat()
    event_dict['level'] = level

    if session:
        event_dict['session_id'] = session.get('session_id')

    if request:
            event_dict['ip_address'] = request.headers['X-Forwarded-For'].split(',')[0].strip()
            event_dict['ip_address'] = 'unknown'

    return event_dict

if app.config.get('LOG_FILENAME'):
    file_handler = RotatingFileHandler(filename=app.config['LOG_FILENAME'],

logger = wrap_logger(

## ... source file continues with no further session examples...

Example 9 from Flask-Security-Too

Flask-Security-Too (PyPi page and project documentation) is a maintained fork of the original Flask-Security project that makes it easier to add common security features to Flask web applications. A few of the critical goals of the Flask-Security-Too project are ensuring JavaScript client-based single-page applications (SPAs) can work securely with Flask-based backends and that guidance by the OWASP organization is followed by default.

The Flask-Security-Too project is provided as open source under the MIT license.

Flask-Security-Too / flask_security / twofactor.py

# twofactor.py

import typing as t

from flask import current_app as app, redirect, request, session
from werkzeug.datastructures import MultiDict

from .proxies import _security, _datastore
from .utils import (
from .signals import (

if t.TYPE_CHECKING:  # pragma: no cover
    from flask import Response

def tf_clean_session():
    if config_value("TWO_FACTOR"):
        for k in [
            session.pop(k, None)

def tf_send_security_token(user, method, totp_secret, phone_number):
    token_to_be_sent = _security._totp_factory.generate_totp_password(totp_secret)
    if method == "email" or method == "mail":
    elif method == "sms":
        msg = "Use this code to log in: %s" % token_to_be_sent
        from_number = config_value("SMS_SERVICE_CONFIG")["PHONE_NUMBER"]
        to_number = phone_number
        sms_sender = SmsSenderFactory.createSender(config_value("SMS_SERVICE"))
        sms_sender.send_sms(from_number=from_number, to_number=to_number, msg=msg)

    elif method == "google_authenticator" or method == "authenticator":

## ... source file continues with no further session examples...

Example 10 from Flask-SocketIO

Flask-SocketIO (PyPI package information, official tutorial and project documentation) is a code library by Miguel Grinberg that provides Socket.IO integration for Flask applications. This extension makes it easier to add bi-directional communications on the web via the WebSockets protocol.

The Flask-SocketIO project is open source under the MIT license.

Flask-SocketIO / example / sessions.py

# sessions.py
from flask import Flask, render_template, session, request, jsonify
from flask_login import LoginManager, UserMixin, current_user, login_user, \
from flask_session import Session
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'
app.config['SESSION_TYPE'] = 'filesystem'
login = LoginManager(app)
socketio = SocketIO(app, manage_session=False)

class User(UserMixin, object):
    def __init__(self, id=None):
        self.id = id

def load_user(id):
    return User(id)

def index():
    return render_template('sessions.html')

@app.route('/session', methods=['GET', 'POST'])
def session_access():
    if request.method == 'GET':
        return jsonify({
            'session': session.get('value', ''),
            'user': current_user.id
                if current_user.is_authenticated else 'anonymous'
    data = request.get_json()
    if 'session' in data:
        session['value'] = data['session']
    elif 'user' in data:
        if data['user']:
    return '', 204

def get_session():
    emit('refresh-session', {
        'session': session.get('value', ''),
        'user': current_user.id
            if current_user.is_authenticated else 'anonymous'

def set_session(data):
    if 'session' in data:
        session['value'] = data['session']
    elif 'user' in data:
        if data['user'] is not None:

if __name__ == '__main__':

## ... source file continues with no further session examples...

Example 11 from Flask-User

Flask-User (PyPI information and project documentation) is a Flask extension that makes it easier to add custom user account management and authentication to the projects you are building. The extension supports persistent data storage through both relational databases and MongoDB. The project is provided as open source under the MIT license.

Flask-User / flask_user / user_manager.py

# user_manager.py

import datetime

from flask import abort, Blueprint, current_app, Flask, session
from flask_login import LoginManager
from wtforms import ValidationError

from . import ConfigError
from . import forms
from .db_manager import DBManager
from .email_manager import EmailManager
from .password_manager import PasswordManager
from .token_manager import TokenManager
from .translation_utils import lazy_gettext as _  # map _() to lazy_gettext()
from .user_manager__settings import UserManager__Settings
from .user_manager__utils import UserManager__Utils
from .user_manager__views import UserManager__Views

class UserManager(UserManager__Settings, UserManager__Utils, UserManager__Views):

    def __init__(self, app, db, UserClass, **kwargs):

        self.app = app
        if app:
            self.init_app(app, db, UserClass, **kwargs)

    def init_app(

## ... source file abbreviated to get to session examples ...

            if attrib_name[0:5] == 'USER_':
                default_value = getattr(UserManager, attrib_name)
                setattr(self, attrib_name, app.config.get(attrib_name, default_value))

        if not self.USER_EMAIL_SENDER_EMAIL:
            default_sender = app.config.get('DEFAULT_MAIL_SENDER', None)
            default_sender = app.config.get('MAIL_DEFAULT_SENDER', default_sender)
            if default_sender:
                if default_sender[-1:] == '>':
                    start = default_sender.rfind('<')
                    if start >= 1:
                        self.USER_EMAIL_SENDER_EMAIL = default_sender[start + 1:-1]
                        if not self.USER_EMAIL_SENDER_NAME:
                            self.USER_EMAIL_SENDER_NAME = default_sender[0:start].strip(' "')
                    self.USER_EMAIL_SENDER_EMAIL = default_sender

        if not self.USER_EMAIL_SENDER_NAME:
            self.USER_EMAIL_SENDER_NAME = self.USER_APP_NAME

            app.permanent_session_lifetime = datetime.timedelta(seconds=self.USER_USER_SESSION_EXPIRATION)

            def advance_session_timeout():
                session.permanent = True    # Timeout after app.permanent_session_lifetime period
                session.modified = True     # Advance session timeout each time a user visits a page

        self.login_manager = LoginManager(app)
        self.login_manager.login_view = 'user.login'

        def load_user_by_user_token(user_token):
            user = self.db_manager.UserClass.get_user_by_token(user_token)
            return user

        self.babel = app.extensions.get('babel', None)
        from .translation_utils import init_translations

        if not hasattr(app.jinja_env, 'install_gettext_callables'):

        def flask_user_context_processor():
            def call_or_get(function_or_property):
                return function_or_property() if callable(function_or_property) else function_or_property

            return dict(

## ... source file continues with no further session examples...

Example 12 from indico

indico (project website, documentation and sandbox demo) is a Flask-based web app for event management. The code is open sourced under the MIT license.

indico / indico / util / i18n.py

# i18n.py

import ast
import re
from collections import Counter
from contextlib import contextmanager

from babel import negotiate_locale
from babel.core import LOCALE_ALIASES, Locale
from babel.messages.pofile import read_po
from babel.support import NullTranslations
from flask import current_app, g, has_app_context, has_request_context, request, session
from flask_babel import Babel, Domain, get_domain
from flask_pluginengine import current_plugin
from speaklater import is_lazy_string, make_lazy_string
from werkzeug.utils import cached_property

from indico.core.config import config
from indico.util.caching import memoize_request

RE_TR_FUNCTION = re.compile(r'''_\("([^"]*)"\)|_\('([^']*)'\)''', re.DOTALL | re.MULTILINE)

babel = Babel()
_use_context = object()

def get_translation_domain(plugin_name=_use_context):
    if plugin_name is None:
        return get_domain()
        plugin = None
        if has_app_context():
            from indico.core.plugins import plugin_engine
            plugin = plugin_engine.get_plugin(plugin_name) if plugin_name is not _use_context else current_plugin

## ... source file abbreviated to get to session examples ...

    def weekday(self, daynum, short=True):
        return self.days['format']['abbreviated' if short else 'wide'][daynum]

    def time_formats(self):
        formats = super().time_formats
        for k, v in formats.items():
            v.format = v.format.replace(':%(ss)s', '')
        return formats

def _remove_locale_script(locale):
    parts = locale.split('_')  # e.g. `en_GB` or `zh_Hans_CN`
    return f'{parts[0]}_{parts[-1]}'

def set_best_lang(check_session=True):
    from indico.core.config import config

    if not has_request_context():
        return 'en_GB' if current_app.config['TESTING'] else config.DEFAULT_LOCALE
    elif 'lang' in g:
        return g.lang
    elif check_session and session.lang is not None:
        return session.lang

    all_locales = {_remove_locale_script(loc).lower(): loc for loc in get_all_locales()}

    preferred = [x.replace('-', '_') for x in request.accept_languages.values()]
    resolved_lang = negotiate_locale(preferred, list(all_locales), aliases=LOCALE_ALIASES)

    if not resolved_lang:
        if current_app.config['TESTING']:
            return 'en_GB'

        resolved_lang = config.DEFAULT_LOCALE

        resolved_lang = all_locales[resolved_lang.lower()]
    except KeyError:
        return 'en_GB'

    resolved_lang = re.sub(r'^([a-zA-Z]+)_([a-zA-Z]+)$',
                           lambda m: f'{m.group(1).lower()}_{m.group(2).upper()}',

    g.lang = resolved_lang
    return resolved_lang

def get_current_locale():
    return IndicoLocale.parse(set_best_lang())

def get_all_locales():
    if babel.app is None:
        return {}
        missing = object()
        languages = {str(t): config.CUSTOM_LANGUAGES.get(str(t), (t.language_name.title(), t.territory_name))
                     for t in babel.list_translations()
                     if config.CUSTOM_LANGUAGES.get(str(t), missing) is not None}
        counts = Counter(x[0] for x in languages.values())
        return {code: (name, territory, counts[name] > 1) for code, (name, territory) in languages.items()}

def set_session_lang(lang):
    session.lang = lang

def session_language(lang):
    old_lang = session.lang


def parse_locale(locale):
    return IndicoLocale.parse(locale)

def extract_node(node, keywords, commentTags, options, parents=[None]):
    if isinstance(node, ast.Str) and isinstance(parents[-1], (ast.Assign, ast.Call)):
        matches = RE_TR_FUNCTION.findall(node.s)
        for m in matches:
            line = m[0] or m[1]
            yield (node.lineno, '', line.split('\n'), ['old style recursive strings'])
        for cnode in ast.iter_child_nodes(node):
            yield from extract_node(cnode, keywords, commentTags, options, parents=(parents + [node]))

def po_to_json(po_file, locale=None, domain=None):
    with open(po_file, 'rb') as f:
        po_data = read_po(f, locale=locale, domain=domain)

## ... source file continues with no further session examples...

Example 13 from tedivms-flask

tedivm's flask starter app is a base of Flask code and related projects such as Celery which provides a template to start your own Flask web app. The project comes baked with an admin panel, API authentication and authorization, SQLAlchemy and many other common libraries that are often used with Flask.

The project's code is provided as open source under the BSD 2-Clause "Simplified" license.

tedivms-flask / app / init.py

# __init__.py
import boto3
from celery import Celery
from datetime import datetime
import os
import requests
import yaml

from flask import Flask, render_template
from flask import session as current_session
from flask_mail import Mail
from flask_migrate import Migrate, MigrateCommand
from flask.sessions import SessionInterface
from flask_sqlalchemy import SQLAlchemy
from flask_user import user_logged_out
from flask_wtf.csrf import CSRFProtect

from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from beaker.middleware import SessionMiddleware

db = SQLAlchemy()
csrf_protect = CSRFProtect()
mail = Mail()
migrate = Migrate()

def get_config():
    app = Flask(__name__)

    if 'APPLICATION_SETTINGS' in os.environ:
    if 'AWS_SECRETS_MANAGER_CONFIG' in os.environ:

## ... source file abbreviated to get to session examples ...

    if 'CACHE_TYPE' not in app.config or not app.config['CACHE_TYPE']:
        app.config['CACHE_TYPE'] = 'file'

    if app.config['CACHE_TYPE'] == 'file':
        if 'CACHE_ROOT' not in app.config or not app.config['CACHE_ROOT']:
            app.config['CACHE_ROOT'] = '/tmp/%s' % __name__

    session_opts['session.type'] = app.config['CACHE_TYPE']

    if 'CACHE_ROOT' in app.config and app.config['CACHE_ROOT']:
        session_opts['session.data_dir'] = app.config['CACHE_ROOT'] + '/session'

    if 'CACHE_URL' in app.config and app.config['CACHE_URL']:
        session_opts['session.url'] = app.config['CACHE_URL']

    session_opts['session.auto'] = app.config.get('SESSION_AUTO', True)
    session_opts['session.cookie_expires'] = app.config.get('SESSION_COOKIE_EXPIRES', 86400)
    session_opts['session.secret'] = app.secret_key

    class BeakerSessionInterface(SessionInterface):
        def open_session(self, app, request):
            session = request.environ['beaker.session']
            return session

        def save_session(self, app, session, response):

    app.wsgi_app = SessionMiddleware(app.wsgi_app, session_opts)
    app.session_interface = BeakerSessionInterface()

    def clear_session(sender, user, **extra):

def init_celery_service(app):

def init_error_handlers(app):

    def show_error(status, message='An unknown error has occured.'):
        return render_template('pages/errors.html', error_code=status, message=message), status

    def error_unauthorized(e):
        return show_error(401, 'Unauthorized')

    def error_forbidden(e):

## ... source file continues with no further session examples...

Example 14 from trape

trape is a research tool for tracking people's activities that are logged digitally. The tool uses Flask to create a web front end to view aggregated data on an individual the application is set to track. The source code is provided as open source under the MIT license, according to the README.

trape / core / sockets.py

# sockets.py
from socket import gethostname, gethostbyname 
from threading import Lock
from flask import Flask, render_template, session, request, json
from flask_socketio import SocketIO, emit, join_room, rooms, disconnect
import core.stats 
import core.user
from core.user_objects import attacks_hook_message
from core.utils import utils
from core.db import Database
import sys

trape = core.stats.trape
app = core.stats.app

db = Database()

async_mode = None
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

db.sentences_victim('clean_online', None, 2)

def background_thread():
    count = 0

@socketio.on("join", namespace="/trape")
def join(message):
        session['receive_count'] = session.get('receive_count', 0) + 1
    except Exception as error:

@socketio.on("my_room_event", namespace="/trape")
def send_room_message(message):
        session['receive_count'] = session.get('receive_count', 0) + 1
        hookAction = attacks_hook_message(message['data']['type'])
        utils.Go(utils.Color['white'] + "[" + utils.Color['blueBold'] + "@" + utils.Color['white'] + "]" + " " + hookAction + utils.Color['blue'] + message['data']['message'] + utils.Color['white'] + ' in '  + utils.Color['green'] + message['room'] + utils.Color['white'])
        emit('my_response', {'data': message['data'], 'count': session['receive_count']},room = message['room'])
    except Exception as error:

@socketio.on("disconnect_request", namespace="/trape")
def disconnect_request(d):
        session['receive_count'] = session.get('receive_count', 0) + 1
        emit('my_response', {'data': 'Disconnected!', 'count': session['receive_count']})
        utils.Go(utils.Color['white'] + "[" + utils.Color['redBold'] + "-" + utils.Color['white'] + "]" + utils.Color['red'] + " " + "A victim has closed her connection with the following id:" + " " + utils.Color['green'] + d['vId'] + utils.Color['white'])
        db.sentences_victim('disconnect_victim', d['vId'], 2)
    except Exception as error:

@socketio.on("error", namespace="/trape")
def socket_def_error(d):

def error_handler(e):

@app.route("/" + trape.home_path)
def home():
    gMaps_free_api_key = 'AIzaSyBUPHAjZl3n8Eza66ka6B78iVyPteC5MgM'
    if (trape.gmaps != ''):
        gMaps_free_api_key = trape.gmaps

    shorten_api = 'AIzaSyCPzcppCT27KTHnxAIQvYhtvB_l8sKGYBs'

    html = trape.injectCSS_Paths(render_template("home.html", async_mode=socketio.async_mode).replace('[OWN_API_KEY_HERE]', gMaps_free_api_key).replace('[LIBS_SRC]', trape.JSFiles[1]['src']).replace('[TRAPE_SRC]', trape.JSFiles[4]['src']))
    return html

## ... source file continues with no further session examples...

Full Stack Python

Full Stack Python is an open book that explains concepts in plain language and provides helpful resources for those topics.
Updates via Twitter & Facebook.

Matt Makai 2012-2022