sqlalchemy.engine Connection Example Code

Connection is a class within the sqlalchemy.engine module of the SQLAlchemy project.

Example 1 from alembic

Alembic (project documentation and PyPI page) is a data migrations tool used with SQLAlchemy to make database schema changes. The Alembic project is open sourced under the MIT license.

alembic / alembic / runtime / migration.py

# migration.py
from contextlib import contextmanager
import logging
import sys

from sqlalchemy import Column
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.engine import Connection
from sqlalchemy.engine import url as sqla_url
from sqlalchemy.engine.strategies import MockEngineStrategy

from .. import ddl
from .. import util
from ..util import sqla_compat
from ..util.compat import callable
from ..util.compat import EncodedIO

log = logging.getLogger(__name__)


class _ProxyTransaction(object):
    def __init__(self, migration_context):
        self.migration_context = migration_context

    @property
    def _proxied_transaction(self):
        return self.migration_context._transaction

    def rollback(self):
        self._proxied_transaction.rollback()

    def commit(self):


## ... source file abbreviated to get to Connection examples ...


            log.info("Generating static SQL")
        log.info(
            "Will assume %s DDL.",
            "transactional"
            if self.impl.transactional_ddl
            else "non-transactional",
        )

    @classmethod
    def configure(
        cls,
        connection=None,
        url=None,
        dialect_name=None,
        dialect=None,
        environment_context=None,
        dialect_opts=None,
        opts=None,
    ):
        if opts is None:
            opts = {}
        if dialect_opts is None:
            dialect_opts = {}

        if connection:
            if not isinstance(connection, Connection):
                util.warn(
                    "'connection' argument to configure() is expected "
                    "to be a sqlalchemy.engine.Connection instance, "
                    "got %r" % connection,
                    stacklevel=3,
                )

            dialect = connection.dialect
        elif url:
            url = sqla_url.make_url(url)
            dialect = url.get_dialect()(**dialect_opts)
        elif dialect_name:
            url = sqla_url.make_url("%s://" % dialect_name)
            dialect = url.get_dialect()(**dialect_opts)
        elif not dialect:
            raise Exception("Connection, url, or dialect_name is required.")

        return MigrationContext(dialect, connection, opts, environment_context)

    @contextmanager
    def autocommit_block(self):
        _in_connection_transaction = self._in_connection_transaction()

        if self.impl.transactional_ddl:


## ... source file continues with no further Connection examples...

Example 2 from GINO

GINO (project documentation and PyPI package information) is an object-relational mapper (ORM) built on SQLAlchemy that is non-blocking and therefore designed to work properly with asynchronously-run code, for example, an application written with asyncio.

GINO is open sourced under the BSD License.

GINO / src/gino / engine.py

# engine.py
import asyncio
import collections
import functools
import sys
import time
from contextvars import ContextVar

from sqlalchemy.cutils import _distill_params
from sqlalchemy.engine import Engine, Connection
from sqlalchemy.sql import schema

from .aiocontextvars import patch_asyncio
from .exceptions import MultipleResultsFound, NoResultFound
from .transaction import GinoTransaction

patch_asyncio()


class _BaseDBAPIConnection:
    _reset_agent = None
    gino_conn = None

    def __init__(self, cursor_cls):
        self._cursor_cls = cursor_cls
        self._closed = False

    def commit(self):
        pass

    def cursor(self):
        return self._cursor_cls(self)

    @property


## ... source file abbreviated to get to Connection examples ...



class _ReusingDBAPIConnection(_BaseDBAPIConnection):
    def __init__(self, cursor_cls, root):
        super().__init__(cursor_cls)
        self._root = root

    @property
    def raw_connection(self):
        return self._root.raw_connection

    async def _acquire(self, timeout):
        return await self._root.acquire(timeout=timeout)

    async def _release(self):
        pass


class _bypass_no_param:
    def keys(self):
        return []


_bypass_no_param = _bypass_no_param()


class _SAConnection(Connection):
    def _execute_context(self, dialect, constructor, statement, parameters, *args):
        if parameters == [_bypass_no_param]:
            constructor = getattr(
                self.dialect.execution_ctx_cls,
                constructor.__name__ + "_prepared",
                constructor,
            )
        return super()._execute_context(
            dialect, constructor, statement, parameters, *args
        )

    def _execute_baked_query(self, bq, multiparams, params):
        elem = bq.query
        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_execute:
                _, multiparams, params = fn(self, elem, multiparams, params)

        distilled_params = _distill_params(multiparams, params)

        ret = self._execute_context(
            self.dialect,
            self.dialect.execution_ctx_cls._init_baked_query,
            bq.compiled_sql,
            distilled_params,


## ... source file abbreviated to get to Connection examples ...



    async def one(self, clause, *multiparams, **params):
        async with self.acquire(reuse=True) as conn:
            return await conn.one(clause, *multiparams, **params)

    async def scalar(self, clause, *multiparams, **params):
        async with self.acquire(reuse=True) as conn:
            return await conn.scalar(clause, *multiparams, **params)

    async def status(self, clause, *multiparams, **params):
        async with self.acquire(reuse=True) as conn:
            return await conn.status(clause, *multiparams, **params)

    def compile(self, clause, *multiparams, **params):
        return self._dialect.compile(clause, *multiparams, **params)

    def transaction(self, *args, timeout=None, reuse=True, reusable=True, **kwargs):
        return _TransactionContext(
            self.acquire(timeout=timeout, reuse=reuse, reusable=reusable),
            (args, kwargs),
        )

    def iterate(self, clause, *multiparams, **params):
        connection = self.current_connection
        if connection is None:
            raise ValueError("No Connection in context, please provide one")
        return connection.iterate(clause, *multiparams, **params)

    def update_execution_options(self, **opt):
        self._sa_engine.update_execution_options(**opt)

    async def _run_visitor(self, *args, **kwargs):
        async with self.acquire(reuse=True) as conn:
            await getattr(conn, "_run_visitor")(*args, **kwargs)

    def repr(self, color=False):
        return self._pool.repr(color)

    def __repr__(self):
        return self.repr()



## ... source file continues with no further Connection examples...

Sponsored By

Sentry logo

Software errors are inevitable. Chaos is not. Try Sentry for free.

1. Introduction 2. Development Environments 3. Data 4. Web Development 5. Deployment 6. DevOps Changelog What Full Stack Means About the Author Future Directions Page Statuses SQLAlchemy ExtensionsSQLAlchemy Example CodeSQLAlchemy Modelssqlalchemy.dialects mssqlsqlalchemy.dialects mysqlsqlalchemy.dialects oraclesqlalchemy.dialects postgresqlsqlalchemy.schema DDLsqlalchemy.dialects sqlitesqlalchemy.sql.expression Functionsqlalchemy.dialects.mysql pymysqlsqlalchemy.dialects.postgresql ARRAYsqlalchemy.dialects.postgresql BIGINTsqlalchemy.dialects.postgresql BITsqlalchemy.dialects.postgresql DOUBLE_PRECISIONsqlalchemy.dialects.postgresql ExcludeConstraintsqlalchemy.dialects.postgresql INTEGERsqlalchemy.dialects.postgresql JSONsqlalchemy.dialects.postgresql TSVECTORsqlalchemy.dialects.postgresql pypostgresqlsqlalchemy.dialects.postgresql.base PGTypeCompilersqlalchemy.dialects.postgresql.psycopg2 PGDialect_psycopg2sqlalchemy.dialects.sqlite pysqlitesqlalchemy.engine Connectionsqlalchemy.engine Enginesqlalchemy.engine create_enginesqlalchemy.engine defaultsqlalchemy.engine urlsqlalchemy.engine.default DefaultDialectsqlalchemy.engine.interfaces ExecutionContextsqlalchemy.engine.result ResultMetaDatasqlalchemy.engine.result RowProxysqlalchemy.engine.strategies EngineStrategysqlalchemy.engine.strategies MockEngineStrategysqlalchemy.engine.url make_urlsqlalchemy.events SchemaEventTargetsqlalchemy.exc ArgumentErrorsqlalchemy.exc DataErrorsqlalchemy.exc IntegrityErrorsqlalchemy.exc InvalidRequestErrorsqlalchemy.exc NoInspectionAvailablesqlalchemy.exc OperationalErrorsqlalchemy.exc ProgrammingErrorsqlalchemy.ext compilersqlalchemy.ext.automap automap_basesqlalchemy.ext.compiler compilessqlalchemy.ext.declarative DeclarativeMetasqlalchemy.ext.declarative declarative_basesqlalchemy.ext.hybrid hybrid_propertysqlalchemy.ext.mutable Mutablesqlalchemy.inspection inspectsqlalchemy.orm ColumnPropertysqlalchemy.orm CompositePropertysqlalchemy.orm SynonymPropertysqlalchemy.orm attributessqlalchemy.orm backrefsqlalchemy.orm class_mappersqlalchemy.orm column_propertysqlalchemy.orm compositesqlalchemy.orm interfacessqlalchemy.orm mappersqlalchemy.orm mapperlibsqlalchemy.orm object_mappersqlalchemy.orm object_sessionsqlalchemy.orm relationshipsqlalchemy.orm sessionsqlalchemy.orm sessionmakersqlalchemy.orm strategiessqlalchemy.orm.attributes InstrumentedAttributesqlalchemy.orm.collections InstrumentedListsqlalchemy.orm.exc NoResultFoundsqlalchemy.orm.exc UnmappedClassErrorsqlalchemy.orm.exc UnmappedInstanceErrorsqlalchemy.orm.interfaces MapperPropertysqlalchemy.orm.interfaces PropComparatorsqlalchemy.orm.properties ColumnPropertysqlalchemy.orm.properties RelationshipPropertysqlalchemy.orm.query Querysqlalchemy.orm.query QueryContextsqlalchemy.orm.session Sessionsqlalchemy.orm.session object_sessionsqlalchemy.orm.util AliasedInspsqlalchemy.pool NullPoolsqlalchemy.pool StaticPoolsqlalchemy.schema CheckConstraintsqlalchemy.schema Columnsqlalchemy.schema CreateIndexsqlalchemy.schema DDLElementsqlalchemy.schema ForeignKeysqlalchemy.schema ForeignKeyConstraintsqlalchemy.schema PrimaryKeyConstraintsqlalchemy.schema Tablesqlalchemy.sql ClauseElementsqlalchemy.sql columnsqlalchemy.sql expressionsqlalchemy.sql functionssqlalchemy.sql operatorssqlalchemy.sql schemasqlalchemy.sql selectsqlalchemy.sql sqltypessqlalchemy.sql tablesqlalchemy.sql.elements ColumnElementsqlalchemy.sql.elements Labelsqlalchemy.sql.expression ClauseElementsqlalchemy.sql.expression ColumnClausesqlalchemy.sql.expression ColumnElementsqlalchemy.sql.expression Executablesqlalchemy.sql.expression FunctionElementsqlalchemy.sql.expression UnaryExpressionsqlalchemy.sql.functions FunctionElementsqlalchemy.sql.functions GenericFunctionsqlalchemy.sql.naming convsqlalchemy.sql.schema Columnsqlalchemy.sql.schema SchemaItemsqlalchemy.sql.sqltypes NullTypesqlalchemy.sql.util ClauseAdaptersqlalchemy.sql.visitors traversesqlalchemy.types Booleansqlalchemy.types Enumsqlalchemy.types Integersqlalchemy.types NULLTYPEsqlalchemy.types Stringsqlalchemy.types TypeEnginesqlalchemy.types UserDefinedTypesqlalchemy.types to_instancesqlalchemy.util OrderedDictsqlalchemy.util OrderedSetsqlalchemy.util set_creation_ordersqlalchemy.util topologicalsqlalchemy.util.langhelpers symbol ...or view the full table of contents.

Full Stack Python

Full Stack Python is an open book that explains concepts in plain language and provides helpful resources for those topics.
Updates via Twitter & Facebook.

Matt Makai 2012-2020