Skip to content

Migrations API Reference

This page provides detailed API documentation for the pydapter.migrations module.

Installation

The migrations module is installed through optional dependency extras:

# Core migrations functionality
pip install "pydapter[migrations-core]"

# SQL migrations with Alembic support
pip install "pydapter[migrations-sql]"

# All migrations components
pip install "pydapter[migrations]"

Module Overview

The migrations module provides a framework for managing database schema changes, following the adapter pattern:

MigrationProtocol / AsyncMigrationProtocol   (interfaces implemented by the Sync/Async adapters)

BaseMigrationAdapter
       ├─────────────────────┐
       │                     │
       ▼                     ▼
SyncMigrationAdapter    AsyncMigrationAdapter
       │                     │
       ▼                     ▼
 AlembicAdapter        AsyncAlembicAdapter

Protocols

MigrationProtocol

pydapter.migrations.protocols.MigrationProtocol

Bases: Protocol[T]

Protocol defining synchronous migration operations.

Source code in src/pydapter/migrations/protocols.py
@runtime_checkable
class MigrationProtocol(Protocol[T]):
    """Protocol defining synchronous migration operations.

    Every operation is declared as a classmethod whose body is ``...``; this
    class only describes the interface and carries no behavior of its own.
    Because it is decorated with ``runtime_checkable`` it can be used with
    ``isinstance()``, which verifies that the members are present but not
    that their signatures match.
    """

    # NOTE(review): the type parameter ``T`` is not referenced by any member
    # declared in this protocol.

    # String key identifying the adapter (for example "alembic").
    migration_key: ClassVar[str]

    @classmethod
    def init_migrations(cls, directory: str, **kwargs: Any) -> None:
        """
        Initialize migration environment in the specified directory.

        Args:
            directory: Path to the directory where migrations will be stored
            **kwargs: Additional adapter-specific arguments
        """
        ...

    @classmethod
    def create_migration(
        cls, message: str, autogenerate: bool = True, **kwargs: Any
    ) -> str:
        """
        Create a new migration.

        Args:
            message: Description of the migration
            autogenerate: Whether to auto-generate the migration based on model changes
            **kwargs: Additional adapter-specific arguments

        Returns:
            The revision identifier of the created migration
        """
        ...

    @classmethod
    def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
        """
        Upgrade to the specified revision.

        Args:
            revision: The target revision to upgrade to (default: "head")
            **kwargs: Additional adapter-specific arguments
        """
        ...

    @classmethod
    def downgrade(cls, revision: str, **kwargs: Any) -> None:
        """
        Downgrade to the specified revision.

        Args:
            revision: The target revision to downgrade to
            **kwargs: Additional adapter-specific arguments
        """
        ...

    @classmethod
    def get_current_revision(cls, **kwargs: Any) -> str | None:
        """
        Get the current migration revision.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            The current revision identifier, or None if no migrations have been applied
        """
        ...

    @classmethod
    def get_migration_history(cls, **kwargs: Any) -> list[dict]:
        """
        Get the migration history.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            A list of dictionaries containing migration information
        """
        ...

Functions

create_migration(message, autogenerate=True, **kwargs) classmethod

Create a new migration.

Parameters:

Name Type Description Default
message str

Description of the migration

required
autogenerate bool

Whether to auto-generate the migration based on model changes

True
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str

The revision identifier of the created migration

Source code in src/pydapter/migrations/protocols.py
@classmethod
def create_migration(
    cls, message: str, autogenerate: bool = True, **kwargs: Any
) -> str:
    """
    Create a new migration.

    Interface declaration only: the body is ``...`` and the behavior is
    supplied by the classes that implement the protocol.

    Args:
        message: Description of the migration
        autogenerate: Whether to auto-generate the migration based on model changes
        **kwargs: Additional adapter-specific arguments

    Returns:
        The revision identifier of the created migration
    """
    ...

downgrade(revision, **kwargs) classmethod

Downgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to downgrade to

required
**kwargs Any

Additional adapter-specific arguments

{}
Source code in src/pydapter/migrations/protocols.py
@classmethod
def downgrade(cls, revision: str, **kwargs: Any) -> None:
    """
    Downgrade to the specified revision.

    Interface declaration only: the body is ``...`` and the behavior is
    supplied by the classes that implement the protocol.  Unlike ``upgrade``,
    ``revision`` has no default and must always be given.

    Args:
        revision: The target revision to downgrade to
        **kwargs: Additional adapter-specific arguments
    """
    ...

get_current_revision(**kwargs) classmethod

Get the current migration revision.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str | None

The current revision identifier, or None if no migrations have been applied

Source code in src/pydapter/migrations/protocols.py
@classmethod
def get_current_revision(cls, **kwargs: Any) -> str | None:
    """
    Get the current migration revision.

    Interface declaration only: the body is ``...`` and the behavior is
    supplied by the classes that implement the protocol.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        The current revision identifier, or None if no migrations have been applied
    """
    ...

get_migration_history(**kwargs) classmethod

Get the migration history.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
list[dict]

A list of dictionaries containing migration information

Source code in src/pydapter/migrations/protocols.py
@classmethod
def get_migration_history(cls, **kwargs: Any) -> list[dict]:
    """
    Get the migration history.

    Interface declaration only: the body is ``...`` and the behavior is
    supplied by the classes that implement the protocol.  The keys of the
    returned dictionaries are not specified at this level.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        A list of dictionaries containing migration information
    """
    ...

init_migrations(directory, **kwargs) classmethod

Initialize migration environment in the specified directory.

Parameters:

Name Type Description Default
directory str

Path to the directory where migrations will be stored

required
**kwargs Any

Additional adapter-specific arguments

{}
Source code in src/pydapter/migrations/protocols.py
@classmethod
def init_migrations(cls, directory: str, **kwargs: Any) -> None:
    """
    Initialize migration environment in the specified directory.

    Interface declaration only: the body is ``...`` and the behavior is
    supplied by the classes that implement the protocol.

    Args:
        directory: Path to the directory where migrations will be stored
        **kwargs: Additional adapter-specific arguments
    """
    ...

upgrade(revision='head', **kwargs) classmethod

Upgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to upgrade to (default: "head")

'head'
**kwargs Any

Additional adapter-specific arguments

{}
Source code in src/pydapter/migrations/protocols.py
@classmethod
def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
    """
    Upgrade to the specified revision.

    Interface declaration only: the body is ``...`` and the behavior is
    supplied by the classes that implement the protocol.

    Args:
        revision: The target revision to upgrade to (default: "head")
        **kwargs: Additional adapter-specific arguments
    """
    ...

AsyncMigrationProtocol

pydapter.migrations.protocols.AsyncMigrationProtocol

Bases: Protocol[T]

Protocol defining asynchronous migration operations.

Source code in src/pydapter/migrations/protocols.py
@runtime_checkable
class AsyncMigrationProtocol(Protocol[T]):
    """Protocol defining asynchronous migration operations.

    Every operation is declared as an ``async`` classmethod whose body is
    ``...``; this class only describes the interface and carries no behavior
    of its own.  Because it is decorated with ``runtime_checkable`` it can be
    used with ``isinstance()``, which verifies that the members are present
    but not that their signatures match.
    """

    # NOTE(review): the type parameter ``T`` is not referenced by any member
    # declared in this protocol.

    # String key identifying the adapter.
    migration_key: ClassVar[str]

    @classmethod
    async def init_migrations(cls, directory: str, **kwargs: Any) -> None:
        """
        Initialize migration environment in the specified directory.

        Args:
            directory: Path to the directory where migrations will be stored
            **kwargs: Additional adapter-specific arguments
        """
        ...

    @classmethod
    async def create_migration(
        cls, message: str, autogenerate: bool = True, **kwargs: Any
    ) -> str:
        """
        Create a new migration.

        Args:
            message: Description of the migration
            autogenerate: Whether to auto-generate the migration based on model changes
            **kwargs: Additional adapter-specific arguments

        Returns:
            The revision identifier of the created migration
        """
        ...

    @classmethod
    async def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
        """
        Upgrade to the specified revision.

        Args:
            revision: The target revision to upgrade to (default: "head")
            **kwargs: Additional adapter-specific arguments
        """
        ...

    @classmethod
    async def downgrade(cls, revision: str, **kwargs: Any) -> None:
        """
        Downgrade to the specified revision.

        Args:
            revision: The target revision to downgrade to
            **kwargs: Additional adapter-specific arguments
        """
        ...

    @classmethod
    async def get_current_revision(cls, **kwargs: Any) -> str | None:
        """
        Get the current migration revision.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            The current revision identifier, or None if no migrations have been applied
        """
        ...

    @classmethod
    async def get_migration_history(cls, **kwargs: Any) -> list[dict]:
        """
        Get the migration history.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            A list of dictionaries containing migration information
        """
        ...

Functions

create_migration(message, autogenerate=True, **kwargs) async classmethod

Create a new migration.

Parameters:

Name Type Description Default
message str

Description of the migration

required
autogenerate bool

Whether to auto-generate the migration based on model changes

True
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str

The revision identifier of the created migration

Source code in src/pydapter/migrations/protocols.py
@classmethod
async def create_migration(
    cls, message: str, autogenerate: bool = True, **kwargs: Any
) -> str:
    """
    Create a new migration.

    Interface declaration only: the body is ``...``.  The method is a
    coroutine function, so its result must be awaited.

    Args:
        message: Description of the migration
        autogenerate: Whether to auto-generate the migration based on model changes
        **kwargs: Additional adapter-specific arguments

    Returns:
        The revision identifier of the created migration
    """
    ...

downgrade(revision, **kwargs) async classmethod

Downgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to downgrade to

required
**kwargs Any

Additional adapter-specific arguments

{}
Source code in src/pydapter/migrations/protocols.py
@classmethod
async def downgrade(cls, revision: str, **kwargs: Any) -> None:
    """
    Downgrade to the specified revision.

    Interface declaration only: the body is ``...``.  The method is a
    coroutine function, so its result must be awaited.  Unlike ``upgrade``,
    ``revision`` has no default and must always be given.

    Args:
        revision: The target revision to downgrade to
        **kwargs: Additional adapter-specific arguments
    """
    ...

get_current_revision(**kwargs) async classmethod

Get the current migration revision.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str | None

The current revision identifier, or None if no migrations have been applied

Source code in src/pydapter/migrations/protocols.py
@classmethod
async def get_current_revision(cls, **kwargs: Any) -> str | None:
    """
    Get the current migration revision.

    Interface declaration only: the body is ``...``.  The method is a
    coroutine function, so its result must be awaited.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        The current revision identifier, or None if no migrations have been applied
    """
    ...

get_migration_history(**kwargs) async classmethod

Get the migration history.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
list[dict]

A list of dictionaries containing migration information

Source code in src/pydapter/migrations/protocols.py
@classmethod
async def get_migration_history(cls, **kwargs: Any) -> list[dict]:
    """
    Get the migration history.

    Interface declaration only: the body is ``...``.  The method is a
    coroutine function, so its result must be awaited.  The keys of the
    returned dictionaries are not specified at this level.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        A list of dictionaries containing migration information
    """
    ...

init_migrations(directory, **kwargs) async classmethod

Initialize migration environment in the specified directory.

Parameters:

Name Type Description Default
directory str

Path to the directory where migrations will be stored

required
**kwargs Any

Additional adapter-specific arguments

{}
Source code in src/pydapter/migrations/protocols.py
@classmethod
async def init_migrations(cls, directory: str, **kwargs: Any) -> None:
    """
    Initialize migration environment in the specified directory.

    Interface declaration only: the body is ``...``.  The method is a
    coroutine function, so its result must be awaited.

    Args:
        directory: Path to the directory where migrations will be stored
        **kwargs: Additional adapter-specific arguments
    """
    ...

upgrade(revision='head', **kwargs) async classmethod

Upgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to upgrade to (default: "head")

'head'
**kwargs Any

Additional adapter-specific arguments

{}
Source code in src/pydapter/migrations/protocols.py
@classmethod
async def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
    """
    Upgrade to the specified revision.

    Interface declaration only: the body is ``...``.  The method is a
    coroutine function, so its result must be awaited.

    Args:
        revision: The target revision to upgrade to (default: "head")
        **kwargs: Additional adapter-specific arguments
    """
    ...

Base Classes

BaseMigrationAdapter

pydapter.migrations.base.BaseMigrationAdapter

Bases: ABC

Base class for migration adapters.

Source code in src/pydapter/migrations/base.py
class BaseMigrationAdapter(ABC):
    """Base class for migration adapters.

    Stores the configuration and lifecycle state shared by concrete adapters
    and provides two small helpers for them: ``_ensure_directory`` and
    ``_check_initialized``.
    """

    # Adapter identifier; passed as ``adapter=`` when MigrationError is raised.
    migration_key: ClassVar[str]

    def __init__(self, connection_string: str, models_module: Any = None) -> None:
        """
        Initialize the migration adapter.

        Args:
            connection_string: Database connection string
            models_module: Optional module containing model definitions
        """
        self.connection_string = connection_string
        self.models_module = models_module
        # Read by _check_initialized(); nothing in this class sets it to True.
        self._initialized = False
        # Not read in this class -- presumably filled in by subclasses once
        # the migrations directory is known (TODO confirm).
        self._migrations_dir = None

    def _ensure_directory(self, directory: str) -> None:
        """
        Ensure the directory exists, creating missing parent directories too.

        Args:
            directory: Directory path

        Raises:
            FileExistsError: If ``directory`` exists but is not a directory
            OSError: If the directory cannot be created
        """
        # A single call with exist_ok=True avoids the check-then-create race
        # in which another process creates the directory between an
        # os.path.exists() test and os.makedirs().
        os.makedirs(directory, exist_ok=True)

    def _check_initialized(self) -> None:
        """
        Check if migrations have been initialized.

        Raises:
            MigrationError: If migrations have not been initialized
        """
        if not self._initialized:
            # NOTE(review): migration_key is only annotated on this class, so
            # a subclass that never assigns it fails here with AttributeError
            # instead of MigrationError.
            raise MigrationError(
                "Migrations have not been initialized. Call init_migrations first.",
                adapter=self.__class__.migration_key,
            )

Functions

__init__(connection_string, models_module=None)

Initialize the migration adapter.

Parameters:

Name Type Description Default
connection_string str

Database connection string

required
models_module Any

Optional module containing model definitions

None
Source code in src/pydapter/migrations/base.py
def __init__(self, connection_string: str, models_module: Any = None):
    """
    Set up the state shared by all migration adapters.

    Args:
        connection_string: Database connection string
        models_module: Optional module containing model definitions
    """
    # Caller-supplied configuration.
    self.connection_string, self.models_module = connection_string, models_module
    # Lifecycle state: not initialized yet, and no migrations directory set.
    self._initialized, self._migrations_dir = False, None

SyncMigrationAdapter

pydapter.migrations.base.SyncMigrationAdapter

Bases: BaseMigrationAdapter, MigrationProtocol

Base class for synchronous migration adapters.

Source code in src/pydapter/migrations/base.py
class SyncMigrationAdapter(BaseMigrationAdapter, MigrationProtocol):
    """Base class for synchronous migration adapters.

    Every operation defined here is a placeholder that raises
    ``NotImplementedError``; concrete adapters are expected to override all
    of them.
    """

    @classmethod
    def init_migrations(cls, directory: str, **kwargs: Any) -> None:
        """
        Set up the migration environment inside ``directory``.

        Args:
            directory: Path to the directory where migrations will be stored
            **kwargs: Additional adapter-specific arguments

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationInitError: If initialization fails (concrete implementations only)
        """
        reason = "Subclasses must implement init_migrations"
        raise NotImplementedError(reason)

    @classmethod
    def create_migration(
        cls, message: str, autogenerate: bool = True, **kwargs: Any
    ) -> str:
        """
        Generate a new migration.

        Args:
            message: Description of the migration
            autogenerate: Whether to auto-generate the migration based on model changes
            **kwargs: Additional adapter-specific arguments

        Returns:
            The revision identifier of the created migration

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationCreationError: If creation fails (concrete implementations only)
        """
        reason = "Subclasses must implement create_migration"
        raise NotImplementedError(reason)

    @classmethod
    def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
        """
        Move the schema forward to ``revision``.

        Args:
            revision: The target revision to upgrade to (default: "head")
            **kwargs: Additional adapter-specific arguments

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationUpgradeError: If upgrade fails (concrete implementations only)
        """
        reason = "Subclasses must implement upgrade"
        raise NotImplementedError(reason)

    @classmethod
    def downgrade(cls, revision: str, **kwargs: Any) -> None:
        """
        Move the schema back to ``revision``.

        Args:
            revision: The target revision to downgrade to
            **kwargs: Additional adapter-specific arguments

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationDowngradeError: If downgrade fails (concrete implementations only)
        """
        reason = "Subclasses must implement downgrade"
        raise NotImplementedError(reason)

    @classmethod
    def get_current_revision(cls, **kwargs: Any) -> Optional[str]:
        """
        Report the revision currently applied.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            The current revision identifier, or None if no migrations have been applied

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationError: If getting the current revision fails
                (concrete implementations only)
        """
        reason = "Subclasses must implement get_current_revision"
        raise NotImplementedError(reason)

    @classmethod
    def get_migration_history(cls, **kwargs: Any) -> list[dict[str, Any]]:
        """
        List the known migrations.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            A list of dictionaries containing migration information

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationError: If getting the migration history fails
                (concrete implementations only)
        """
        reason = "Subclasses must implement get_migration_history"
        raise NotImplementedError(reason)

Functions

create_migration(message, autogenerate=True, **kwargs) classmethod

Create a new migration.

Parameters:

Name Type Description Default
message str

Description of the migration

required
autogenerate bool

Whether to auto-generate the migration based on model changes

True
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str

The revision identifier of the created migration

Raises:

Type Description
MigrationCreationError

If creation fails

Source code in src/pydapter/migrations/base.py
@classmethod
def create_migration(
    cls, message: str, autogenerate: bool = True, **kwargs: Any
) -> str:
    """
    Generate a new migration.

    Args:
        message: Description of the migration
        autogenerate: Whether to auto-generate the migration based on model changes
        **kwargs: Additional adapter-specific arguments

    Returns:
        The revision identifier of the created migration

    Raises:
        NotImplementedError: Always; this base implementation must be overridden
        MigrationCreationError: If creation fails (concrete implementations only)
    """
    reason = "Subclasses must implement create_migration"
    raise NotImplementedError(reason)

downgrade(revision, **kwargs) classmethod

Downgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to downgrade to

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationDowngradeError

If downgrade fails

Source code in src/pydapter/migrations/base.py
@classmethod
def downgrade(cls, revision: str, **kwargs: Any) -> None:
    """
    Move the schema back to ``revision``.

    Args:
        revision: The target revision to downgrade to
        **kwargs: Additional adapter-specific arguments

    Raises:
        NotImplementedError: Always; this base implementation must be overridden
        MigrationDowngradeError: If downgrade fails (concrete implementations only)
    """
    reason = "Subclasses must implement downgrade"
    raise NotImplementedError(reason)

get_current_revision(**kwargs) classmethod

Get the current migration revision.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
Optional[str]

The current revision identifier, or None if no migrations have been applied

Raises:

Type Description
MigrationError

If getting the current revision fails

Source code in src/pydapter/migrations/base.py
@classmethod
def get_current_revision(cls, **kwargs: Any) -> Optional[str]:
    """
    Report the revision currently applied.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        The current revision identifier, or None if no migrations have been applied

    Raises:
        NotImplementedError: Always; this base implementation must be overridden
        MigrationError: If getting the current revision fails
            (concrete implementations only)
    """
    reason = "Subclasses must implement get_current_revision"
    raise NotImplementedError(reason)

get_migration_history(**kwargs) classmethod

Get the migration history.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
list[dict[str, Any]]

A list of dictionaries containing migration information

Raises:

Type Description
MigrationError

If getting the migration history fails

Source code in src/pydapter/migrations/base.py
@classmethod
def get_migration_history(cls, **kwargs: Any) -> list[dict[str, Any]]:
    """
    List the known migrations.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        A list of dictionaries containing migration information

    Raises:
        NotImplementedError: Always; this base implementation must be overridden
        MigrationError: If getting the migration history fails
            (concrete implementations only)
    """
    reason = "Subclasses must implement get_migration_history"
    raise NotImplementedError(reason)

init_migrations(directory, **kwargs) classmethod

Initialize migration environment in the specified directory.

Parameters:

Name Type Description Default
directory str

Path to the directory where migrations will be stored

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationInitError

If initialization fails

Source code in src/pydapter/migrations/base.py
@classmethod
def init_migrations(cls, directory: str, **kwargs: Any) -> None:
    """
    Set up the migration environment inside ``directory``.

    Args:
        directory: Path to the directory where migrations will be stored
        **kwargs: Additional adapter-specific arguments

    Raises:
        NotImplementedError: Always; this base implementation must be overridden
        MigrationInitError: If initialization fails (concrete implementations only)
    """
    reason = "Subclasses must implement init_migrations"
    raise NotImplementedError(reason)

upgrade(revision='head', **kwargs) classmethod

Upgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to upgrade to (default: "head")

'head'
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationUpgradeError

If upgrade fails

Source code in src/pydapter/migrations/base.py
@classmethod
def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
    """
    Move the schema forward to ``revision``.

    Args:
        revision: The target revision to upgrade to (default: "head")
        **kwargs: Additional adapter-specific arguments

    Raises:
        NotImplementedError: Always; this base implementation must be overridden
        MigrationUpgradeError: If upgrade fails (concrete implementations only)
    """
    reason = "Subclasses must implement upgrade"
    raise NotImplementedError(reason)

AsyncMigrationAdapter

pydapter.migrations.base.AsyncMigrationAdapter

Bases: BaseMigrationAdapter, AsyncMigrationProtocol

Base class for asynchronous migration adapters.

Source code in src/pydapter/migrations/base.py
class AsyncMigrationAdapter(BaseMigrationAdapter, AsyncMigrationProtocol):
    """Base class for asynchronous migration adapters.

    Every operation defined here is a placeholder coroutine that raises
    ``NotImplementedError`` when awaited; concrete adapters are expected to
    override all of them.
    """

    @classmethod
    async def init_migrations(cls, directory: str, **kwargs: Any) -> None:
        """
        Set up the migration environment inside ``directory``.

        Args:
            directory: Path to the directory where migrations will be stored
            **kwargs: Additional adapter-specific arguments

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationInitError: If initialization fails (concrete implementations only)
        """
        reason = "Subclasses must implement init_migrations"
        raise NotImplementedError(reason)

    @classmethod
    async def create_migration(
        cls, message: str, autogenerate: bool = True, **kwargs: Any
    ) -> str:
        """
        Generate a new migration.

        Args:
            message: Description of the migration
            autogenerate: Whether to auto-generate the migration based on model changes
            **kwargs: Additional adapter-specific arguments

        Returns:
            The revision identifier of the created migration

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationCreationError: If creation fails (concrete implementations only)
        """
        reason = "Subclasses must implement create_migration"
        raise NotImplementedError(reason)

    @classmethod
    async def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
        """
        Move the schema forward to ``revision``.

        Args:
            revision: The target revision to upgrade to (default: "head")
            **kwargs: Additional adapter-specific arguments

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationUpgradeError: If upgrade fails (concrete implementations only)
        """
        reason = "Subclasses must implement upgrade"
        raise NotImplementedError(reason)

    @classmethod
    async def downgrade(cls, revision: str, **kwargs: Any) -> None:
        """
        Move the schema back to ``revision``.

        Args:
            revision: The target revision to downgrade to
            **kwargs: Additional adapter-specific arguments

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationDowngradeError: If downgrade fails (concrete implementations only)
        """
        reason = "Subclasses must implement downgrade"
        raise NotImplementedError(reason)

    @classmethod
    async def get_current_revision(cls, **kwargs: Any) -> Optional[str]:
        """
        Report the revision currently applied.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            The current revision identifier, or None if no migrations have been applied

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationError: If getting the current revision fails
                (concrete implementations only)
        """
        reason = "Subclasses must implement get_current_revision"
        raise NotImplementedError(reason)

    @classmethod
    async def get_migration_history(cls, **kwargs: Any) -> list[dict[str, Any]]:
        """
        List the known migrations.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            A list of dictionaries containing migration information

        Raises:
            NotImplementedError: Always; this base implementation must be overridden
            MigrationError: If getting the migration history fails
                (concrete implementations only)
        """
        reason = "Subclasses must implement get_migration_history"
        raise NotImplementedError(reason)

Functions

create_migration(message, autogenerate=True, **kwargs) async classmethod

Create a new migration.

Parameters:

Name Type Description Default
message str

Description of the migration

required
autogenerate bool

Whether to auto-generate the migration based on model changes

True
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str

The revision identifier of the created migration

Raises:

Type Description
MigrationCreationError

If creation fails

Source code in src/pydapter/migrations/base.py
@classmethod
async def create_migration(
    cls, message: str, autogenerate: bool = True, **kwargs: Any
) -> str:
    """
    Generate a new migration.

    Args:
        message: Description of the migration
        autogenerate: Whether to auto-generate the migration based on model changes
        **kwargs: Additional adapter-specific arguments

    Returns:
        The revision identifier of the created migration

    Raises:
        NotImplementedError: Always, once awaited; this base implementation
            must be overridden
        MigrationCreationError: If creation fails (concrete implementations only)
    """
    reason = "Subclasses must implement create_migration"
    raise NotImplementedError(reason)

downgrade(revision, **kwargs) async classmethod

Downgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to downgrade to

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationDowngradeError

If downgrade fails

Source code in src/pydapter/migrations/base.py
@classmethod
async def downgrade(cls, revision: str, **kwargs: Any) -> None:
    """
    Move the schema back to ``revision``.

    Args:
        revision: The target revision to downgrade to
        **kwargs: Additional adapter-specific arguments

    Raises:
        NotImplementedError: Always, once awaited; this base implementation
            must be overridden
        MigrationDowngradeError: If downgrade fails (concrete implementations only)
    """
    reason = "Subclasses must implement downgrade"
    raise NotImplementedError(reason)

get_current_revision(**kwargs) async classmethod

Get the current migration revision.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
Optional[str]

The current revision identifier, or None if no migrations have been applied

Raises:

Type Description
MigrationError

If getting the current revision fails

Source code in src/pydapter/migrations/base.py
@classmethod
async def get_current_revision(cls, **kwargs: Any) -> Optional[str]:
    """
    Report the revision currently applied.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        The current revision identifier, or None if no migrations have been applied

    Raises:
        NotImplementedError: Always, once awaited; this base implementation
            must be overridden
        MigrationError: If getting the current revision fails
            (concrete implementations only)
    """
    reason = "Subclasses must implement get_current_revision"
    raise NotImplementedError(reason)

get_migration_history(**kwargs) async classmethod

Get the migration history.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
list[dict[str, Any]]

A list of dictionaries containing migration information

Raises:

Type Description
MigrationError

If getting the migration history fails

Source code in src/pydapter/migrations/base.py
@classmethod
async def get_migration_history(cls, **kwargs: Any) -> list[dict[str, Any]]:
    """
    List the known migrations.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        A list of dictionaries containing migration information

    Raises:
        NotImplementedError: Always, once awaited; this base implementation
            must be overridden
        MigrationError: If getting the migration history fails
            (concrete implementations only)
    """
    reason = "Subclasses must implement get_migration_history"
    raise NotImplementedError(reason)

init_migrations(directory, **kwargs) async classmethod

Initialize migration environment in the specified directory.

Parameters:

Name Type Description Default
directory str

Path to the directory where migrations will be stored

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationInitError

If initialization fails

Source code in src/pydapter/migrations/base.py
@classmethod
async def init_migrations(cls, directory: str, **kwargs: Any) -> None:
    """
    Set up the migration environment inside ``directory``.

    Args:
        directory: Path to the directory where migrations will be stored
        **kwargs: Additional adapter-specific arguments

    Raises:
        NotImplementedError: Always, once awaited; this base implementation
            must be overridden
        MigrationInitError: If initialization fails (concrete implementations only)
    """
    reason = "Subclasses must implement init_migrations"
    raise NotImplementedError(reason)

upgrade(revision='head', **kwargs) async classmethod

Upgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

The target revision to upgrade to (default: "head")

'head'
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationUpgradeError

If upgrade fails

Source code in src/pydapter/migrations/base.py
@classmethod
async def upgrade(cls, revision: str = "head", **kwargs: Any) -> None:
    """
    Move the schema forward to ``revision``.

    This base implementation is only a placeholder: it always raises, and
    concrete async adapters are expected to override it.

    Args:
        revision: The target revision to upgrade to (default: "head")
        **kwargs: Additional adapter-specific arguments

    Raises:
        MigrationUpgradeError: If upgrade fails
        NotImplementedError: Always, when called on this base implementation
    """
    reason = "Subclasses must implement upgrade"
    raise NotImplementedError(reason)

SQL Adapters

AlembicAdapter

pydapter.migrations.sql.alembic_adapter.AlembicAdapter

Bases: SyncMigrationAdapter

Alembic implementation of the MigrationAdapter interface.

Source code in src/pydapter/migrations/sql/alembic_adapter.py
class AlembicAdapter(SyncMigrationAdapter):
    """
    Alembic implementation of the MigrationAdapter interface.

    An instance holds a SQLAlchemy engine created from the connection string
    and an Alembic ``Config`` (``alembic_cfg``) that stays ``None`` until
    ``init_migrations`` has populated it.
    """

    migration_key: ClassVar[str] = "alembic"

    def __init__(self, connection_string: str, models_module: Any = None):
        """
        Initialize the Alembic migration adapter.

        Args:
            connection_string: Database connection string
            models_module: Optional module containing SQLAlchemy models
        """
        super().__init__(connection_string, models_module)
        self.engine = sa.create_engine(connection_string)
        # Filled in by init_migrations once the Alembic environment exists
        self.alembic_cfg = None

    @classmethod
    def init_migrations(cls, directory: str, **kwargs: Any) -> None:
        """
        Initialize migration environment in the specified directory.

        Args:
            directory: Path to the directory where migrations will be stored
            **kwargs: Additional adapter-specific arguments
                connection_string: Database connection string (required)
                models_module: Optional module containing SQLAlchemy models
                template: Optional template to use for migration environment
                    (defaults to "generic")
                force_clean: If true and the directory already exists and is
                    not empty, delete and recreate it before initializing
                    (defaults to False)

        Returns:
            The configured adapter instance. NOTE: the signature is annotated
            ``None`` even though the adapter is returned.

        Raises:
            MigrationInitError: If the connection string is missing or any
                step of the initialization fails
        """
        try:
            # Create a new instance with the provided connection string
            connection_string = kwargs.get("connection_string")
            if not connection_string:
                raise MigrationInitError(
                    "Connection string is required for Alembic initialization",
                    directory=directory,
                )

            adapter = cls(connection_string, kwargs.get("models_module"))

            # Check if the directory exists and is not empty
            force_clean = kwargs.get("force_clean", False)
            if os.path.exists(directory) and os.listdir(directory):
                if force_clean:
                    # If force_clean is specified, remove the directory and recreate it
                    shutil.rmtree(directory)
                    os.makedirs(directory)
                # Without force_clean the existing content is left in place
            else:
                # Reached when the directory is missing or empty
                adapter._ensure_directory(directory)

            # Initialize Alembic directory structure
            template = kwargs.get("template", "generic")

            # Write the Alembic config file into the migrations directory
            ini_path = os.path.join(directory, "alembic.ini")
            with open(ini_path, "w") as f:
                f.write(
                    f"""
[alembic]
script_location = {directory}
sqlalchemy.url = {connection_string}

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
"""
                )

            # Initialize Alembic in the specified directory
            adapter.alembic_cfg = config.Config(ini_path)
            adapter.alembic_cfg.set_main_option("script_location", directory)
            adapter.alembic_cfg.set_main_option("sqlalchemy.url", connection_string)

            # Initialize Alembic directory structure
            try:
                command.init(adapter.alembic_cfg, directory, template=template)
            except Exception as e:
                if "already exists and is not empty" in str(e) and force_clean:
                    # If the directory exists and is not empty, and force_clean is True,
                    # try to clean it up again and retry
                    shutil.rmtree(directory)
                    os.makedirs(directory)
                    command.init(adapter.alembic_cfg, directory, template=template)
                else:
                    raise

            # Update env.py to use the models_module for autogeneration if provided
            if adapter.models_module:
                env_path = os.path.join(directory, "env.py")
                adapter._update_env_py(env_path)

            adapter._migrations_dir = directory
            adapter._initialized = True

            # Return the adapter instance
            return adapter

        except Exception as exc:
            # Domain errors pass through; everything else is wrapped
            if isinstance(exc, MigrationError):
                raise
            raise MigrationInitError(
                f"Failed to initialize Alembic migrations: {str(exc)}",
                directory=directory,
                original_error=str(exc),
            ) from exc

    def _update_env_py(self, env_path: str) -> None:
        """
        Update the env.py file to use the models_module for autogeneration.

        Does nothing when the file is missing or does not contain the
        ``target_metadata = None`` placeholder. The generated import expects
        the models module to expose a name called ``Base``.

        Args:
            env_path: Path to the env.py file
        """
        if not os.path.exists(env_path):
            return

        # Read the env.py file
        with open(env_path) as f:
            env_content = f.read()

        # Update the target_metadata
        if "target_metadata = None" in env_content:
            # Import the models module
            import_statement = f"from {self.models_module.__name__} import Base\n"
            env_content = env_content.replace(
                "from alembic import context",
                "from alembic import context\n" + import_statement,
            )

            # Update the target_metadata
            env_content = env_content.replace(
                "target_metadata = None", "target_metadata = Base.metadata"
            )

            # Write the updated env.py file
            with open(env_path, "w") as f:
                f.write(env_content)

    def create_migration(
        self, message: str, autogenerate: bool = True, **kwargs: Any
    ) -> str:
        """
        Create a new migration.

        Args:
            message: Description of the migration
            autogenerate: Whether to auto-generate the migration based on model changes
            **kwargs: Additional adapter-specific arguments

        Returns:
            The revision identifier of the created migration

        Raises:
            MigrationCreationError: If the adapter is not initialized or
                creation fails
        """
        try:
            if not self._initialized:
                raise MigrationCreationError(
                    "Migrations have not been initialized. Call init_migrations first."
                )

            # Create the migration
            command.revision(
                self.alembic_cfg,
                message=message,
                autogenerate=autogenerate,
            )

            # Get the revision ID from the latest revision
            from alembic.script import ScriptDirectory

            script = ScriptDirectory.from_config(self.alembic_cfg)
            return script.get_current_head()
        except Exception as exc:
            if isinstance(exc, MigrationError):
                raise
            raise MigrationCreationError(
                f"Failed to create migration: {str(exc)}",
                autogenerate=autogenerate,
                original_error=str(exc),
            ) from exc

    def upgrade(self, revision: str = "head", **kwargs: Any) -> None:
        """
        Upgrade to the specified revision.

        Args:
            revision: Revision to upgrade to (default: "head" for latest)
            **kwargs: Additional adapter-specific arguments

        Raises:
            MigrationUpgradeError: If the adapter is not initialized or the
                upgrade fails
        """
        try:
            if not self._initialized:
                raise MigrationUpgradeError(
                    "Migrations have not been initialized. Call init_migrations first."
                )

            # Upgrade to the specified revision
            command.upgrade(self.alembic_cfg, revision)
        except Exception as exc:
            if isinstance(exc, MigrationError):
                raise
            raise MigrationUpgradeError(
                f"Failed to upgrade: {str(exc)}",
                revision=revision,
                original_error=str(exc),
            ) from exc

    def downgrade(self, revision: str, **kwargs: Any) -> None:
        """
        Downgrade to the specified revision.

        Args:
            revision: Revision to downgrade to
            **kwargs: Additional adapter-specific arguments

        Raises:
            MigrationDowngradeError: If the adapter is not initialized or the
                downgrade fails
        """
        try:
            if not self._initialized:
                raise MigrationDowngradeError(
                    "Migrations have not been initialized. Call init_migrations first."
                )

            # Downgrade to the specified revision
            command.downgrade(self.alembic_cfg, revision)
        except Exception as exc:
            if isinstance(exc, MigrationError):
                raise
            raise MigrationDowngradeError(
                f"Failed to downgrade: {str(exc)}",
                revision=revision,
                original_error=str(exc),
            ) from exc

    def get_current_revision(self, **kwargs: Any) -> Optional[str]:
        """
        Get the current revision.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            The current revision, or None if no migrations have been applied

        Raises:
            MigrationError: If the adapter is not initialized or getting the
                current revision fails
        """
        try:
            if not self._initialized:
                raise MigrationError(
                    "Migrations have not been initialized. Call init_migrations first."
                )

            # Get the current revision
            from alembic.migration import MigrationContext

            # The context manager closes the connection even when configuring
            # the migration context or reading the revision raises.
            with self.engine.connect() as connection:
                migration_context = MigrationContext.configure(connection)
                return migration_context.get_current_revision()
        except Exception as exc:
            if isinstance(exc, MigrationError):
                raise
            raise MigrationError(
                f"Failed to get current revision: {str(exc)}",
                original_error=exc,
            ) from exc

    def get_migration_history(self, **kwargs: Any) -> list[dict]:
        """
        Get the migration history.

        Args:
            **kwargs: Additional adapter-specific arguments

        Returns:
            A list of dictionaries containing migration information, each
            with the keys "revision", "message" and "created" (always None)

        Raises:
            MigrationError: If the adapter is not initialized or getting the
                migration history fails
        """
        try:
            if not self._initialized:
                raise MigrationError(
                    "Migrations have not been initialized. Call init_migrations first."
                )

            # Get the migration history
            from alembic.script import ScriptDirectory

            script = ScriptDirectory.from_config(self.alembic_cfg)

            # Get all revisions
            return [
                {
                    "revision": revision.revision,
                    "message": revision.doc,
                    "created": None,  # Alembic doesn't store creation dates
                }
                for revision in script.walk_revisions()
            ]
        except Exception as exc:
            if isinstance(exc, MigrationError):
                raise
            raise MigrationError(
                f"Failed to get migration history: {str(exc)}",
                original_error=exc,
            ) from exc

Functions

__init__(connection_string, models_module=None)

Initialize the Alembic migration adapter.

Parameters:

Name Type Description Default
connection_string str

Database connection string

required
models_module Any

Optional module containing SQLAlchemy models

None
Source code in src/pydapter/migrations/sql/alembic_adapter.py
def __init__(self, connection_string: str, models_module: Any = None):
    """
    Set up an Alembic migration adapter for a synchronous engine.

    Args:
        connection_string: Database connection string
        models_module: Optional module containing SQLAlchemy models
    """
    # Let the base adapter record the connection string and models module
    super().__init__(connection_string, models_module)
    # Engine built from the same connection string
    self.engine = sa.create_engine(connection_string)
    # No Alembic configuration is attached at construction time
    self.alembic_cfg = None

create_migration(message, autogenerate=True, **kwargs)

Create a new migration.

Parameters:

Name Type Description Default
message str

Description of the migration

required
autogenerate bool

Whether to auto-generate the migration based on model changes

True
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str

The revision identifier of the created migration

Raises:

Type Description
MigrationCreationError

If creation fails

Source code in src/pydapter/migrations/sql/alembic_adapter.py
def create_migration(
    self, message: str, autogenerate: bool = True, **kwargs: Any
) -> str:
    """
    Generate a new migration script and report its revision.

    Args:
        message: Description of the migration
        autogenerate: Whether to auto-generate the migration based on model changes
        **kwargs: Additional adapter-specific arguments

    Returns:
        The revision identifier of the created migration

    Raises:
        MigrationCreationError: If the adapter is not initialized or
            creation fails
    """
    try:
        if not self._initialized:
            raise MigrationCreationError(
                "Migrations have not been initialized. Call init_migrations first."
            )

        # Ask Alembic to write the revision file
        command.revision(
            self.alembic_cfg, message=message, autogenerate=autogenerate
        )

        # The freshly written revision is looked up as the current head
        from alembic.script import ScriptDirectory

        return ScriptDirectory.from_config(self.alembic_cfg).get_current_head()
    except MigrationError:
        # Already a domain error; propagate unchanged
        raise
    except Exception as failure:
        raise MigrationCreationError(
            f"Failed to create migration: {str(failure)}",
            autogenerate=autogenerate,
            original_error=str(failure),
        ) from failure

downgrade(revision, **kwargs)

Downgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

Revision to downgrade to

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationDowngradeError

If downgrade fails

Source code in src/pydapter/migrations/sql/alembic_adapter.py
def downgrade(self, revision: str, **kwargs: Any) -> None:
    """
    Roll the schema back to ``revision``.

    Args:
        revision: Revision to downgrade to
        **kwargs: Additional adapter-specific arguments

    Raises:
        MigrationDowngradeError: If the adapter is not initialized or the
            downgrade fails
    """
    try:
        if self._initialized:
            # Delegate the rollback to Alembic
            command.downgrade(self.alembic_cfg, revision)
            return
        raise MigrationDowngradeError(
            "Migrations have not been initialized. Call init_migrations first."
        )
    except MigrationError:
        # Already a domain error; propagate unchanged
        raise
    except Exception as failure:
        raise MigrationDowngradeError(
            f"Failed to downgrade: {str(failure)}",
            revision=revision,
            original_error=str(failure),
        ) from failure

get_current_revision(**kwargs)

Get the current revision.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
Optional[str]

The current revision, or None if no migrations have been applied

Raises:

Type Description
MigrationError

If getting the current revision fails

Source code in src/pydapter/migrations/sql/alembic_adapter.py
def get_current_revision(self, **kwargs: Any) -> Optional[str]:
    """
    Get the current revision.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        The current revision, or None if no migrations have been applied

    Raises:
        MigrationError: If the adapter is not initialized or getting the
            current revision fails
    """
    try:
        if not self._initialized:
            raise MigrationError(
                "Migrations have not been initialized. Call init_migrations first."
            )

        # Get the current revision
        from alembic.migration import MigrationContext

        # The context manager closes the connection even when configuring the
        # migration context or reading the revision raises; previously the
        # connection was only closed on the success path.
        with self.engine.connect() as connection:
            migration_context = MigrationContext.configure(connection)
            return migration_context.get_current_revision()
    except Exception as exc:
        # Domain errors pass through; everything else is wrapped
        if isinstance(exc, MigrationError):
            raise
        raise MigrationError(
            f"Failed to get current revision: {str(exc)}",
            original_error=exc,
        ) from exc

get_migration_history(**kwargs)

Get the migration history.

Parameters:

Name Type Description Default
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
list[dict]

A list of dictionaries containing migration information

Raises:

Type Description
MigrationError

If getting the migration history fails

Source code in src/pydapter/migrations/sql/alembic_adapter.py
def get_migration_history(self, **kwargs: Any) -> list[dict]:
    """
    Collect one record per revision known to the Alembic script directory.

    Args:
        **kwargs: Additional adapter-specific arguments

    Returns:
        A list of dictionaries containing migration information, each with
        the keys "revision", "message" and "created" (always None)

    Raises:
        MigrationError: If the adapter is not initialized or getting the
            migration history fails
    """
    try:
        if not self._initialized:
            raise MigrationError(
                "Migrations have not been initialized. Call init_migrations first."
            )

        from alembic.script import ScriptDirectory

        script_dir = ScriptDirectory.from_config(self.alembic_cfg)

        return [
            {
                "revision": entry.revision,
                "message": entry.doc,
                "created": None,  # Alembic doesn't store creation dates
            }
            for entry in script_dir.walk_revisions()
        ]
    except MigrationError:
        # Already a domain error; propagate unchanged
        raise
    except Exception as failure:
        raise MigrationError(
            f"Failed to get migration history: {str(failure)}",
            original_error=failure,
        ) from failure

init_migrations(directory, **kwargs) classmethod

Initialize migration environment in the specified directory.

Parameters:

Name Type Description Default
directory str

Path to the directory where migrations will be stored

required
**kwargs Any

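Additional adapter-specific arguments: connection_string (database connection string, required), models_module (optional module containing SQLAlchemy models), template (optional template to use for the migration environment), force_clean (optional; when true, an existing non-empty directory is removed and recreated)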

{}
Source code in src/pydapter/migrations/sql/alembic_adapter.py
    @classmethod
    def init_migrations(cls, directory: str, **kwargs: Any) -> None:
        """
        Initialize migration environment in the specified directory.

        Builds an adapter from the given connection string, writes an
        ``alembic.ini`` into ``directory``, runs Alembic's ``init`` command
        and, when a models module was supplied, patches the generated
        ``env.py`` to use it.

        Args:
            directory: Path to the directory where migrations will be stored
            **kwargs: Additional adapter-specific arguments
                connection_string: Database connection string (required)
                models_module: Optional module containing SQLAlchemy models
                template: Optional template to use for migration environment
                    (defaults to "generic")
                force_clean: If true and the directory already exists and is
                    not empty, delete and recreate it before initializing
                    (defaults to False)

        Returns:
            The configured adapter instance. NOTE: the signature is annotated
            ``None`` even though the adapter is returned.

        Raises:
            MigrationInitError: If the connection string is missing or any
                step of the initialization fails
        """
        try:
            # Create a new instance with the provided connection string
            connection_string = kwargs.get("connection_string")
            if not connection_string:
                raise MigrationInitError(
                    "Connection string is required for Alembic initialization",
                    directory=directory,
                )

            adapter = cls(connection_string, kwargs.get("models_module"))

            # Check if the directory exists and is not empty
            force_clean = kwargs.get("force_clean", False)
            if os.path.exists(directory) and os.listdir(directory):
                # Without force_clean the existing content is left in place
                if force_clean:
                    # If force_clean is specified, remove the directory and recreate it
                    shutil.rmtree(directory)
                    os.makedirs(directory)
            else:
                # Reached when the directory is missing or empty
                adapter._ensure_directory(directory)

            # Initialize Alembic directory structure
            template = kwargs.get("template", "generic")

            # Write the Alembic config file into the migrations directory
            # NOTE(review): this file exists before command.init runs below;
            # confirm Alembic's "not empty" directory check tolerates it.
            ini_path = os.path.join(directory, "alembic.ini")
            with open(ini_path, "w") as f:
                f.write(
                    f"""
[alembic]
script_location = {directory}
sqlalchemy.url = {connection_string}

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
"""
                )

            # Initialize Alembic in the specified directory
            adapter.alembic_cfg = config.Config(ini_path)
            adapter.alembic_cfg.set_main_option("script_location", directory)
            adapter.alembic_cfg.set_main_option("sqlalchemy.url", connection_string)

            # Initialize Alembic directory structure
            try:
                command.init(adapter.alembic_cfg, directory, template=template)
            except Exception as e:
                if "already exists and is not empty" in str(e) and force_clean:
                    # If the directory exists and is not empty, and force_clean is True,
                    # try to clean it up again and retry
                    shutil.rmtree(directory)
                    os.makedirs(directory)
                    command.init(adapter.alembic_cfg, directory, template=template)
                else:
                    raise

            # Update env.py to use the models_module for autogeneration if provided
            if adapter.models_module:
                env_path = os.path.join(directory, "env.py")
                adapter._update_env_py(env_path)

            # Mark the adapter as ready so the instance methods accept calls
            adapter._migrations_dir = directory
            adapter._initialized = True

            # Return the adapter instance
            return adapter

        except Exception as exc:
            # Domain errors pass through; everything else is wrapped
            if isinstance(exc, MigrationError):
                raise
            raise MigrationInitError(
                f"Failed to initialize Alembic migrations: {str(exc)}",
                directory=directory,
                original_error=str(exc),
            ) from exc

upgrade(revision='head', **kwargs)

Upgrade to the specified revision.

Parameters:

Name Type Description Default
revision str

Revision to upgrade to (default: "head" for latest)

'head'
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationUpgradeError

If upgrade fails

Source code in src/pydapter/migrations/sql/alembic_adapter.py
def upgrade(self, revision: str = "head", **kwargs: Any) -> None:
    """
    Move the schema forward to ``revision``.

    Args:
        revision: Revision to upgrade to (default: "head" for latest)
        **kwargs: Additional adapter-specific arguments

    Raises:
        MigrationUpgradeError: If the adapter is not initialized or the
            upgrade fails
    """
    try:
        if self._initialized:
            # Delegate the schema change to Alembic
            command.upgrade(self.alembic_cfg, revision)
            return
        raise MigrationUpgradeError(
            "Migrations have not been initialized. Call init_migrations first."
        )
    except MigrationError:
        # Already a domain error; propagate unchanged
        raise
    except Exception as failure:
        raise MigrationUpgradeError(
            f"Failed to upgrade: {str(failure)}",
            revision=revision,
            original_error=str(failure),
        ) from failure

AsyncAlembicAdapter

pydapter.migrations.sql.alembic_adapter.AsyncAlembicAdapter

Bases: AsyncMigrationAdapter

Async implementation of the Alembic migration adapter.

Source code in src/pydapter/migrations/sql/alembic_adapter.py
class AsyncAlembicAdapter(AsyncMigrationAdapter):
    """Alembic-backed migration adapter built on an async SQLAlchemy engine."""

    migration_key: ClassVar[str] = "async_alembic"

    def __init__(self, connection_string: str, models_module: Any = None):
        """
        Set up an Alembic migration adapter for an asynchronous engine.

        Args:
            connection_string: Database connection string
            models_module: Optional module containing SQLAlchemy models
        """
        super().__init__(connection_string, models_module)
        # Async engine built from the same connection string
        self.engine = create_async_engine(connection_string)
        # No Alembic configuration is attached at construction time
        self.alembic_cfg = None

    async def _update_env_py_async(self, env_path: str) -> None:
        """
        Point a generated env.py at the models module's metadata.

        Does nothing when the file is missing or does not contain the
        ``target_metadata = None`` placeholder. The inserted import expects
        the models module to expose a name called ``Base``.

        Args:
            env_path: Path to the env.py file
        """
        if not os.path.exists(env_path):
            return

        with open(env_path) as src:
            original_text = src.read()

        placeholder = "target_metadata = None"
        if placeholder not in original_text:
            # Nothing to patch; leave the file untouched
            return

        context_import = "from alembic import context"
        models_import = f"from {self.models_module.__name__} import Base\n"

        # First add the models import after the context import, then swap
        # the placeholder for the real metadata object.
        patched_text = original_text.replace(
            context_import, context_import + "\n" + models_import
        ).replace(placeholder, "target_metadata = Base.metadata")

        with open(env_path, "w") as dst:
            dst.write(patched_text)

Functions

__init__(connection_string, models_module=None)

Initialize the async Alembic migration adapter.

Parameters:

Name Type Description Default
connection_string str

Database connection string

required
models_module Any

Optional module containing SQLAlchemy models

None
Source code in src/pydapter/migrations/sql/alembic_adapter.py
def __init__(self, connection_string: str, models_module: Any = None):
    """
    Set up an Alembic migration adapter for an asynchronous engine.

    Args:
        connection_string: Database connection string
        models_module: Optional module containing SQLAlchemy models
    """
    # Let the base adapter record the connection string and models module
    super().__init__(connection_string, models_module)
    # Async engine built from the same connection string
    self.engine = create_async_engine(connection_string)
    # No Alembic configuration is attached at construction time
    self.alembic_cfg = None

Registry

pydapter.migrations.registry.MigrationRegistry

Registry for migration adapters.

Source code in src/pydapter/migrations/registry.py
class MigrationRegistry:
    """
    Lookup table from ``migration_key`` strings to migration adapter classes.

    Besides ``register`` and ``get``, the registry offers one convenience
    wrapper per migration operation. Each wrapper resolves the adapter for
    the given key, calls the same-named method on the adapter class, lets
    ``MigrationError`` instances propagate untouched and wraps every other
    exception in a migration-specific error chained to the original.
    """

    def __init__(self) -> None:
        # migration_key -> adapter class
        self._reg: dict[str, type[MigrationProtocol]] = {}

    def register(self, adapter_cls: type[MigrationProtocol]) -> None:
        """
        Add an adapter class to the registry under its ``migration_key``.

        An adapter registered under an already-used key replaces the
        previous entry.

        Args:
            adapter_cls: The adapter class to register

        Raises:
            ConfigurationError: If the adapter has no (or a falsy) migration_key
        """
        migration_key = getattr(adapter_cls, "migration_key", None)
        if migration_key:
            self._reg[migration_key] = adapter_cls
            return
        raise ConfigurationError(
            "Migration adapter must define 'migration_key'",
            adapter_cls=adapter_cls.__name__,
        )

    def get(self, migration_key: str) -> type[MigrationProtocol]:
        """
        Look up the adapter class registered under ``migration_key``.

        Args:
            migration_key: The key of the adapter to retrieve

        Returns:
            The adapter class

        Raises:
            AdapterNotFoundError: If no adapter is registered for the given key
        """
        try:
            adapter_cls = self._reg[migration_key]
        except KeyError as missing:
            raise AdapterNotFoundError(
                f"No migration adapter registered for '{migration_key}'",
                obj_key=migration_key,
            ) from missing
        return adapter_cls

    # Convenience methods for migration operations

    def init_migrations(
        self, migration_key: str, directory: str, **kwargs: Any
    ) -> None:
        """
        Set up the migration environment through the selected adapter.

        Args:
            migration_key: The key of the adapter to use
            directory: The directory to initialize migrations in
            **kwargs: Additional adapter-specific arguments

        Raises:
            MigrationInitError: If initialization fails
        """
        try:
            self.get(migration_key).init_migrations(directory, **kwargs)
        except MigrationError:
            # Already a migration error: pass it through unchanged.
            raise
        except Exception as err:
            raise MigrationInitError(
                f"Failed to initialize migrations for '{migration_key}'",
                directory=directory,
                adapter=migration_key,
                original_error=str(err),
            ) from err

    def create_migration(
        self, migration_key: str, message: str, autogenerate: bool = True, **kwargs: Any
    ) -> str:
        """
        Create a new migration through the selected adapter.

        Args:
            migration_key: The key of the adapter to use
            message: The migration message
            autogenerate: Whether to auto-generate the migration
            **kwargs: Additional adapter-specific arguments

        Returns:
            The revision identifier of the created migration

        Raises:
            MigrationCreationError: If creation fails
        """
        try:
            adapter = self.get(migration_key)
            revision_id = adapter.create_migration(message, autogenerate, **kwargs)
            return revision_id
        except MigrationError:
            # Already a migration error: pass it through unchanged.
            raise
        except Exception as err:
            raise MigrationCreationError(
                f"Failed to create migration for '{migration_key}'",
                message_text=message,
                autogenerate=autogenerate,
                adapter=migration_key,
                original_error=str(err),
            ) from err

    def upgrade(
        self, migration_key: str, revision: str = "head", **kwargs: Any
    ) -> None:
        """
        Apply migrations up to ``revision`` through the selected adapter.

        Args:
            migration_key: The key of the adapter to use
            revision: The target revision to upgrade to
            **kwargs: Additional adapter-specific arguments

        Raises:
            MigrationUpgradeError: If upgrade fails
        """
        try:
            self.get(migration_key).upgrade(revision, **kwargs)
        except MigrationError:
            # Already a migration error: pass it through unchanged.
            raise
        except Exception as err:
            raise MigrationUpgradeError(
                f"Failed to upgrade migrations for '{migration_key}'",
                revision=revision,
                adapter=migration_key,
                original_error=str(err),
            ) from err

    def downgrade(self, migration_key: str, revision: str, **kwargs: Any) -> None:
        """
        Revert migrations down to ``revision`` through the selected adapter.

        Args:
            migration_key: The key of the adapter to use
            revision: The target revision to downgrade to
            **kwargs: Additional adapter-specific arguments

        Raises:
            MigrationDowngradeError: If downgrade fails
        """
        try:
            self.get(migration_key).downgrade(revision, **kwargs)
        except MigrationError:
            # Already a migration error: pass it through unchanged.
            raise
        except Exception as err:
            raise MigrationDowngradeError(
                f"Failed to downgrade migrations for '{migration_key}'",
                revision=revision,
                adapter=migration_key,
                original_error=str(err),
            ) from err

    def get_current_revision(self, migration_key: str, **kwargs: Any) -> Optional[str]:
        """
        Ask the selected adapter for the currently applied revision.

        Args:
            migration_key: The key of the adapter to use
            **kwargs: Additional adapter-specific arguments

        Returns:
            The current revision identifier, or None if no migrations have been applied

        Raises:
            MigrationError: If getting the current revision fails
        """
        try:
            adapter = self.get(migration_key)
            current = adapter.get_current_revision(**kwargs)
            return current
        except MigrationError:
            # Already a migration error: pass it through unchanged.
            raise
        except Exception as err:
            # The exception object itself (not its string form, as in the
            # init/create/upgrade/downgrade wrappers) is attached here.
            raise MigrationError(
                f"Failed to get current revision for '{migration_key}'",
                adapter=migration_key,
                original_error=err,
            ) from err

    def get_migration_history(self, migration_key: str, **kwargs: Any) -> list[dict]:
        """
        Ask the selected adapter for its migration history.

        Args:
            migration_key: The key of the adapter to use
            **kwargs: Additional adapter-specific arguments

        Returns:
            A list of dictionaries containing migration information

        Raises:
            MigrationError: If getting the migration history fails
        """
        try:
            adapter = self.get(migration_key)
            history = adapter.get_migration_history(**kwargs)
            return history
        except MigrationError:
            # Already a migration error: pass it through unchanged.
            raise
        except Exception as err:
            # The exception object itself (not its string form, as in the
            # init/create/upgrade/downgrade wrappers) is attached here.
            raise MigrationError(
                f"Failed to get migration history for '{migration_key}'",
                adapter=migration_key,
                original_error=err,
            ) from err

Functions

create_migration(migration_key, message, autogenerate=True, **kwargs)

Create a migration for the specified adapter.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to use

required
message str

The migration message

required
autogenerate bool

Whether to auto-generate the migration

True
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
str

The revision identifier of the created migration

Raises:

Type Description
MigrationCreationError

If creation fails

Source code in src/pydapter/migrations/registry.py
def create_migration(
    self, migration_key: str, message: str, autogenerate: bool = True, **kwargs: Any
) -> str:
    """
    Create a new migration through the adapter registered under ``migration_key``.

    ``MigrationError`` instances propagate unchanged; any other exception is
    wrapped in ``MigrationCreationError`` and chained to the original.

    Args:
        migration_key: The key of the adapter to use
        message: The migration message
        autogenerate: Whether to auto-generate the migration
        **kwargs: Additional adapter-specific arguments

    Returns:
        The revision identifier of the created migration

    Raises:
        MigrationCreationError: If creation fails
    """
    try:
        adapter = self.get(migration_key)
        revision_id = adapter.create_migration(message, autogenerate, **kwargs)
        return revision_id
    except MigrationError:
        # Already a migration error: pass it through unchanged.
        raise
    except Exception as err:
        raise MigrationCreationError(
            f"Failed to create migration for '{migration_key}'",
            message_text=message,
            autogenerate=autogenerate,
            adapter=migration_key,
            original_error=str(err),
        ) from err

downgrade(migration_key, revision, **kwargs)

Downgrade migrations for the specified adapter.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to use

required
revision str

The target revision to downgrade to

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationDowngradeError

If downgrade fails

Source code in src/pydapter/migrations/registry.py
def downgrade(self, migration_key: str, revision: str, **kwargs: Any) -> None:
    """
    Revert migrations down to ``revision`` through the selected adapter.

    ``MigrationError`` instances propagate unchanged; any other exception is
    wrapped in ``MigrationDowngradeError`` and chained to the original.

    Args:
        migration_key: The key of the adapter to use
        revision: The target revision to downgrade to
        **kwargs: Additional adapter-specific arguments

    Raises:
        MigrationDowngradeError: If downgrade fails
    """
    try:
        self.get(migration_key).downgrade(revision, **kwargs)
    except MigrationError:
        # Already a migration error: pass it through unchanged.
        raise
    except Exception as err:
        raise MigrationDowngradeError(
            f"Failed to downgrade migrations for '{migration_key}'",
            revision=revision,
            adapter=migration_key,
            original_error=str(err),
        ) from err

get(migration_key)

Get a migration adapter by key.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to retrieve

required

Returns:

Type Description
type[MigrationProtocol]

The adapter class

Raises:

Type Description
AdapterNotFoundError

If no adapter is registered for the given key

Source code in src/pydapter/migrations/registry.py
def get(self, migration_key: str) -> type[MigrationProtocol]:
    """
    Look up the adapter class registered under ``migration_key``.

    Args:
        migration_key: The key of the adapter to retrieve

    Returns:
        The adapter class

    Raises:
        AdapterNotFoundError: If no adapter is registered for the given key
    """
    try:
        adapter_cls = self._reg[migration_key]
    except KeyError as missing:
        # Translate the raw KeyError into the registry's own error type,
        # keeping the KeyError as the cause.
        raise AdapterNotFoundError(
            f"No migration adapter registered for '{migration_key}'",
            obj_key=migration_key,
        ) from missing
    return adapter_cls

get_current_revision(migration_key, **kwargs)

Get the current revision for the specified adapter.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to use

required
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
Optional[str]

The current revision identifier, or None if no migrations have been applied

Raises:

Type Description
MigrationError

If getting the current revision fails

Source code in src/pydapter/migrations/registry.py
def get_current_revision(self, migration_key: str, **kwargs: Any) -> Optional[str]:
    """
    Ask the selected adapter for the currently applied revision.

    ``MigrationError`` instances propagate unchanged; any other exception is
    wrapped in a plain ``MigrationError`` that carries the caught exception
    object as ``original_error``.

    Args:
        migration_key: The key of the adapter to use
        **kwargs: Additional adapter-specific arguments

    Returns:
        The current revision identifier, or None if no migrations have been applied

    Raises:
        MigrationError: If getting the current revision fails
    """
    try:
        adapter = self.get(migration_key)
        current = adapter.get_current_revision(**kwargs)
        return current
    except MigrationError:
        # Already a migration error: pass it through unchanged.
        raise
    except Exception as err:
        raise MigrationError(
            f"Failed to get current revision for '{migration_key}'",
            adapter=migration_key,
            original_error=err,
        ) from err

get_migration_history(migration_key, **kwargs)

Get the migration history for the specified adapter.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to use

required
**kwargs Any

Additional adapter-specific arguments

{}

Returns:

Type Description
list[dict]

A list of dictionaries containing migration information

Raises:

Type Description
MigrationError

If getting the migration history fails

Source code in src/pydapter/migrations/registry.py
def get_migration_history(self, migration_key: str, **kwargs: Any) -> list[dict]:
    """
    Ask the selected adapter for its migration history.

    ``MigrationError`` instances propagate unchanged; any other exception is
    wrapped in a plain ``MigrationError`` that carries the caught exception
    object as ``original_error``.

    Args:
        migration_key: The key of the adapter to use
        **kwargs: Additional adapter-specific arguments

    Returns:
        A list of dictionaries containing migration information

    Raises:
        MigrationError: If getting the migration history fails
    """
    try:
        adapter = self.get(migration_key)
        history = adapter.get_migration_history(**kwargs)
        return history
    except MigrationError:
        # Already a migration error: pass it through unchanged.
        raise
    except Exception as err:
        raise MigrationError(
            f"Failed to get migration history for '{migration_key}'",
            adapter=migration_key,
            original_error=err,
        ) from err

init_migrations(migration_key, directory, **kwargs)

Initialize migrations for the specified adapter.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to use

required
directory str

The directory to initialize migrations in

required
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationInitError

If initialization fails

Source code in src/pydapter/migrations/registry.py
def init_migrations(
    self, migration_key: str, directory: str, **kwargs: Any
) -> None:
    """
    Set up the migration environment through the selected adapter.

    ``MigrationError`` instances propagate unchanged; any other exception is
    wrapped in ``MigrationInitError`` and chained to the original.

    Args:
        migration_key: The key of the adapter to use
        directory: The directory to initialize migrations in
        **kwargs: Additional adapter-specific arguments

    Raises:
        MigrationInitError: If initialization fails
    """
    try:
        self.get(migration_key).init_migrations(directory, **kwargs)
    except MigrationError:
        # Already a migration error: pass it through unchanged.
        raise
    except Exception as err:
        raise MigrationInitError(
            f"Failed to initialize migrations for '{migration_key}'",
            directory=directory,
            adapter=migration_key,
            original_error=str(err),
        ) from err

register(adapter_cls)

Register a migration adapter.

Parameters:

Name Type Description Default
adapter_cls type[MigrationProtocol]

The adapter class to register

required

Raises:

Type Description
ConfigurationError

If the adapter does not define a migration_key

Source code in src/pydapter/migrations/registry.py
def register(self, adapter_cls: type[MigrationProtocol]) -> None:
    """
    Add an adapter class to the registry under its ``migration_key``.

    An adapter registered under an already-used key replaces the previous
    entry.

    Args:
        adapter_cls: The adapter class to register

    Raises:
        ConfigurationError: If the adapter has no (or a falsy) migration_key
    """
    migration_key = getattr(adapter_cls, "migration_key", None)
    if migration_key:
        self._reg[migration_key] = adapter_cls
        return
    # Missing, None or empty key: the adapter cannot be looked up later.
    raise ConfigurationError(
        "Migration adapter must define 'migration_key'",
        adapter_cls=adapter_cls.__name__,
    )

upgrade(migration_key, revision='head', **kwargs)

Upgrade migrations for the specified adapter.

Parameters:

Name Type Description Default
migration_key str

The key of the adapter to use

required
revision str

The target revision to upgrade to

'head'
**kwargs Any

Additional adapter-specific arguments

{}

Raises:

Type Description
MigrationUpgradeError

If upgrade fails

Source code in src/pydapter/migrations/registry.py
def upgrade(
    self, migration_key: str, revision: str = "head", **kwargs: Any
) -> None:
    """
    Apply migrations up to ``revision`` through the selected adapter.

    ``MigrationError`` instances propagate unchanged; any other exception is
    wrapped in ``MigrationUpgradeError`` and chained to the original.

    Args:
        migration_key: The key of the adapter to use
        revision: The target revision to upgrade to
        **kwargs: Additional adapter-specific arguments

    Raises:
        MigrationUpgradeError: If upgrade fails
    """
    try:
        self.get(migration_key).upgrade(revision, **kwargs)
    except MigrationError:
        # Already a migration error: pass it through unchanged.
        raise
    except Exception as err:
        raise MigrationUpgradeError(
            f"Failed to upgrade migrations for '{migration_key}'",
            revision=revision,
            adapter=migration_key,
            original_error=str(err),
        ) from err

Exceptions

MigrationError

pydapter.migrations.exceptions.MigrationError

Bases: AdapterError

Base exception for all migration-related errors.

Source code in src/pydapter/migrations/exceptions.py
class MigrationError(AdapterError):
    """
    Root of the migration exception hierarchy.

    Stores the underlying error and the adapter identifier on the instance
    and appends the underlying error to the string representation.
    """

    def __init__(
        self,
        message: str,
        original_error: Optional[Exception] = None,
        adapter: Optional[str] = None,
        **context: Any,
    ):
        """
        Build the error.

        Args:
            message: Description of the failure, passed to the parent class
            original_error: The underlying error, if any
            adapter: Identifier of the adapter involved, if known
            **context: Extra keyword context forwarded to the parent class
        """
        # The parent is initialized first; attributes are attached afterwards.
        super().__init__(message, **context)
        self.original_error, self.adapter = original_error, adapter

    def __str__(self) -> str:
        """Return the parent's text, plus the original error when one is set."""
        text = super().__str__()
        cause = getattr(self, "original_error", None)
        if cause is None:
            return text
        return text + f" (original_error='{cause}')"

Functions

__str__()

Return a string representation of the error.

Source code in src/pydapter/migrations/exceptions.py
def __str__(self) -> str:
    """Return the parent's text, plus the original error when one is set."""
    text = super().__str__()
    # The attribute may be absent or None; both mean there is nothing to append.
    cause = getattr(self, "original_error", None)
    if cause is None:
        return text
    return text + f" (original_error='{cause}')"

MigrationInitError

pydapter.migrations.exceptions.MigrationInitError

Bases: MigrationError

Exception raised when migration initialization fails.

Source code in src/pydapter/migrations/exceptions.py
class MigrationInitError(MigrationError):
    """Raised when setting up a migration environment fails."""

    def __init__(
        self,
        message: str,
        directory: Optional[str] = None,
        adapter: Optional[str] = None,
        **context: Any,
    ):
        """
        Build the error.

        Args:
            message: Description of the failure
            directory: The migrations directory involved; also forwarded to
                the parent as keyword context
            adapter: Identifier of the adapter involved, if known
            **context: Extra keyword context forwarded to the parent; an
                ``original_error`` entry is also stored on the instance
        """
        super().__init__(message, directory=directory, adapter=adapter, **context)
        self.directory, self.adapter = directory, adapter
        # Re-apply original_error explicitly so it is set on the instance
        # regardless of how the parent initializer handled it.
        if "original_error" in context:
            self.original_error = context["original_error"]

MigrationCreationError

pydapter.migrations.exceptions.MigrationCreationError

Bases: MigrationError

Exception raised when migration creation fails.

Source code in src/pydapter/migrations/exceptions.py
class MigrationCreationError(MigrationError):
    """Raised when creating a new migration fails."""

    def __init__(
        self,
        message: str,
        message_text: Optional[str] = None,
        autogenerate: Optional[bool] = None,
        adapter: Optional[str] = None,
        **context: Any,
    ):
        """
        Build the error.

        Args:
            message: Description of the failure
            message_text: The migration message that was being used; also
                forwarded to the parent as keyword context
            autogenerate: The autogenerate flag that was being used; also
                forwarded to the parent as keyword context
            adapter: Identifier of the adapter involved, if known
            **context: Extra keyword context forwarded to the parent; an
                ``original_error`` entry is also stored on the instance
        """
        super().__init__(
            message,
            message_text=message_text,
            autogenerate=autogenerate,
            adapter=adapter,
            **context,
        )
        self.message_text, self.autogenerate = message_text, autogenerate
        self.adapter = adapter
        # Re-apply original_error explicitly so it is set on the instance
        # regardless of how the parent initializer handled it.
        if "original_error" in context:
            self.original_error = context["original_error"]

MigrationUpgradeError

pydapter.migrations.exceptions.MigrationUpgradeError

Bases: MigrationError

Exception raised when migration upgrade fails.

Source code in src/pydapter/migrations/exceptions.py
class MigrationUpgradeError(MigrationError):
    """Raised when upgrading to a revision fails."""

    def __init__(
        self,
        message: str,
        revision: Optional[str] = None,
        adapter: Optional[str] = None,
        **context: Any,
    ):
        """
        Build the error.

        Args:
            message: Description of the failure
            revision: The revision involved; also forwarded to the parent as
                keyword context
            adapter: Identifier of the adapter involved, if known
            **context: Extra keyword context forwarded to the parent; an
                ``original_error`` entry is also stored on the instance
        """
        super().__init__(message, revision=revision, adapter=adapter, **context)
        self.revision, self.adapter = revision, adapter
        # Re-apply original_error explicitly so it is set on the instance
        # regardless of how the parent initializer handled it.
        if "original_error" in context:
            self.original_error = context["original_error"]

MigrationDowngradeError

pydapter.migrations.exceptions.MigrationDowngradeError

Bases: MigrationError

Exception raised when migration downgrade fails.

Source code in src/pydapter/migrations/exceptions.py
class MigrationDowngradeError(MigrationError):
    """Raised when downgrading to a revision fails."""

    def __init__(
        self,
        message: str,
        revision: Optional[str] = None,
        adapter: Optional[str] = None,
        **context: Any,
    ):
        """
        Build the error.

        Args:
            message: Description of the failure
            revision: The revision involved; also forwarded to the parent as
                keyword context
            adapter: Identifier of the adapter involved, if known
            **context: Extra keyword context forwarded to the parent; an
                ``original_error`` entry is also stored on the instance
        """
        super().__init__(message, revision=revision, adapter=adapter, **context)
        self.revision, self.adapter = revision, adapter
        # Re-apply original_error explicitly so it is set on the instance
        # regardless of how the parent initializer handled it.
        if "original_error" in context:
            self.original_error = context["original_error"]

MigrationNotFoundError

pydapter.migrations.exceptions.MigrationNotFoundError

Bases: MigrationError

Exception raised when a migration is not found.

Source code in src/pydapter/migrations/exceptions.py
class MigrationNotFoundError(MigrationError):
    """Raised when a requested migration cannot be found."""

    def __init__(
        self,
        message: str,
        revision: Optional[str] = None,
        adapter: Optional[str] = None,
        **context: Any,
    ):
        """
        Build the error.

        Args:
            message: Description of the failure
            revision: The revision involved; also forwarded to the parent as
                keyword context
            adapter: Identifier of the adapter involved, if known
            **context: Extra keyword context forwarded to the parent; an
                ``original_error`` entry is also stored on the instance
        """
        super().__init__(message, revision=revision, adapter=adapter, **context)
        self.revision, self.adapter = revision, adapter
        # Re-apply original_error explicitly so it is set on the instance
        # regardless of how the parent initializer handled it.
        if "original_error" in context:
            self.original_error = context["original_error"]