
Extras API

This page documents the extra adapters provided by pydapter.

Excel Adapter

pydapter.extras.excel_

Excel adapter (requires pandas + xlsxwriter engine).

Classes

ExcelAdapter

Bases: Adapter[T]

Adapter for converting between Pydantic models and Excel files.

This adapter handles Excel (.xlsx) files, providing methods to:

- Read Excel files into Pydantic model instances
- Write Pydantic models to Excel files
- Support different sheets and pandas read_excel options

Attributes:

- obj_key: The key identifier for this adapter type ("xlsx")

Example
from pathlib import Path
from pydantic import BaseModel
from pydapter.extras.excel_ import ExcelAdapter

class Person(BaseModel):
    name: str
    age: int

# Read from Excel file
excel_file = Path("people.xlsx")
people = ExcelAdapter.from_obj(Person, excel_file, many=True)

# Write to Excel file
output_bytes = ExcelAdapter.to_obj(people, many=True)
with open("output.xlsx", "wb") as f:
    f.write(output_bytes)
Source code in src/pydapter/extras/excel_.py
class ExcelAdapter(Adapter[T]):
    """
    Adapter for converting between Pydantic models and Excel files.

    This adapter handles Excel (.xlsx) files, providing methods to:
    - Read Excel files into Pydantic model instances
    - Write Pydantic models to Excel files
    - Support for different sheets and pandas read_excel options

    Attributes:
        obj_key: The key identifier for this adapter type ("xlsx")

    Example:
        ```python
        from pathlib import Path
        from pydantic import BaseModel
        from pydapter.extras.excel_ import ExcelAdapter

        class Person(BaseModel):
            name: str
            age: int

        # Read from Excel file
        excel_file = Path("people.xlsx")
        people = ExcelAdapter.from_obj(Person, excel_file, many=True)

        # Write to Excel file
        output_bytes = ExcelAdapter.to_obj(people, many=True)
        with open("output.xlsx", "wb") as f:
            f.write(output_bytes)
        ```
    """

    obj_key = "xlsx"

    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: str | Path | bytes,
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_validate",
        sheet_name: str | int = 0,
        adapt_kw: dict | None = None,
        **kw: Any,
    ) -> T | list[T]:
        """
        Convert Excel data to Pydantic model instances.

        Args:
            subj_cls: The Pydantic model class to instantiate
            obj: Excel file path, file-like object, or bytes
            many: If True, convert all rows; if False, convert only first row
            adapt_meth: Method name to use for model validation (default: "model_validate")
            sheet_name: Sheet name or index to read (default: 0)
            **kw: Additional arguments passed to pandas.read_excel

        Returns:
            List of model instances if many=True, single instance if many=False

        Raises:
            ResourceError: If the Excel file cannot be read
            AdapterError: If the data cannot be converted to models
        """
        try:
            if isinstance(obj, bytes):
                df = pd.read_excel(io.BytesIO(obj), sheet_name=sheet_name, **kw)
            else:
                df = pd.read_excel(obj, sheet_name=sheet_name, **kw)
            return DataFrameAdapter.from_obj(
                subj_cls, df, many=many, adapt_meth=adapt_meth, adapt_kw=adapt_kw
            )
        except FileNotFoundError as e:
            raise ResourceError(f"File not found: {e}", resource=str(obj)) from e
        except ValueError as e:
            raise AdapterError(
                f"Error adapting from xlsx (original_error='{e}')", adapter="xlsx"
            ) from e
        except Exception as e:
            raise AdapterError(f"Unexpected error in Excel adapter: {e}", adapter="xlsx") from e

    # outgoing
    @classmethod
    def to_obj(
        cls,
        subj: T | list[T],
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        sheet_name: str = "Sheet1",
        **kw: Any,
    ) -> bytes:
        """
        Convert Pydantic model instances to Excel bytes.

        Args:
            subj: Single model instance or list of instances
            many: If True, handle as multiple instances
            adapt_meth: Method name to use for model dumping (default: "model_dump")
            sheet_name: Name of the Excel sheet (default: "Sheet1")
            **kw: Additional arguments passed to DataFrame constructor

        Returns:
            Excel file content as bytes
        """
        df = DataFrameAdapter.to_obj(
            subj, many=many, adapt_meth=adapt_meth, adapt_kw=adapt_kw, **kw
        )
        buf = io.BytesIO()
        with pd.ExcelWriter(buf, engine="xlsxwriter") as wr:
            df.to_excel(wr, sheet_name=sheet_name, index=False)
        return buf.getvalue()
Functions
from_obj(subj_cls, obj, /, *, many=True, adapt_meth='model_validate', sheet_name=0, adapt_kw=None, **kw) classmethod

Convert Excel data to Pydantic model instances.

Parameters:

- subj_cls (type[T], required): The Pydantic model class to instantiate
- obj (str | Path | bytes, required): Excel file path, file-like object, or bytes
- many (bool, default True): If True, convert all rows; if False, convert only the first row
- adapt_meth (str, default "model_validate"): Method name to use for model validation
- sheet_name (str | int, default 0): Sheet name or index to read
- adapt_kw (dict | None, default None): Keyword arguments passed to the adaptation method
- **kw (Any): Additional arguments passed to pandas.read_excel

Returns:

- T | list[T]: List of model instances if many=True, a single instance if many=False

Raises:

- ResourceError: If the Excel file cannot be read
- AdapterError: If the data cannot be converted to models

Source code in src/pydapter/extras/excel_.py
@classmethod
def from_obj(
    cls,
    subj_cls: type[T],
    obj: str | Path | bytes,
    /,
    *,
    many: bool = True,
    adapt_meth: str = "model_validate",
    sheet_name: str | int = 0,
    adapt_kw: dict | None = None,
    **kw: Any,
) -> T | list[T]:
    """
    Convert Excel data to Pydantic model instances.

    Args:
        subj_cls: The Pydantic model class to instantiate
        obj: Excel file path, file-like object, or bytes
        many: If True, convert all rows; if False, convert only first row
        adapt_meth: Method name to use for model validation (default: "model_validate")
        sheet_name: Sheet name or index to read (default: 0)
        **kw: Additional arguments passed to pandas.read_excel

    Returns:
        List of model instances if many=True, single instance if many=False

    Raises:
        ResourceError: If the Excel file cannot be read
        AdapterError: If the data cannot be converted to models
    """
    try:
        if isinstance(obj, bytes):
            df = pd.read_excel(io.BytesIO(obj), sheet_name=sheet_name, **kw)
        else:
            df = pd.read_excel(obj, sheet_name=sheet_name, **kw)
        return DataFrameAdapter.from_obj(
            subj_cls, df, many=many, adapt_meth=adapt_meth, adapt_kw=adapt_kw
        )
    except FileNotFoundError as e:
        raise ResourceError(f"File not found: {e}", resource=str(obj)) from e
    except ValueError as e:
        raise AdapterError(
            f"Error adapting from xlsx (original_error='{e}')", adapter="xlsx"
        ) from e
    except Exception as e:
        raise AdapterError(f"Unexpected error in Excel adapter: {e}", adapter="xlsx") from e
to_obj(subj, /, *, many=True, adapt_meth='model_dump', adapt_kw=None, sheet_name='Sheet1', **kw) classmethod

Convert Pydantic model instances to Excel bytes.

Parameters:

- subj (T | list[T], required): Single model instance or list of instances
- many (bool, default True): If True, handle as multiple instances
- adapt_meth (str, default "model_dump"): Method name to use for model dumping
- adapt_kw (dict | None, default None): Keyword arguments passed to the dump method
- sheet_name (str, default 'Sheet1'): Name of the Excel sheet
- **kw (Any): Additional arguments passed to the DataFrame constructor

Returns:

- bytes: Excel file content as bytes

Source code in src/pydapter/extras/excel_.py
@classmethod
def to_obj(
    cls,
    subj: T | list[T],
    /,
    *,
    many: bool = True,
    adapt_meth: str = "model_dump",
    adapt_kw: dict | None = None,
    sheet_name: str = "Sheet1",
    **kw: Any,
) -> bytes:
    """
    Convert Pydantic model instances to Excel bytes.

    Args:
        subj: Single model instance or list of instances
        many: If True, handle as multiple instances
        adapt_meth: Method name to use for model dumping (default: "model_dump")
        sheet_name: Name of the Excel sheet (default: "Sheet1")
        **kw: Additional arguments passed to DataFrame constructor

    Returns:
        Excel file content as bytes
    """
    df = DataFrameAdapter.to_obj(
        subj, many=many, adapt_meth=adapt_meth, adapt_kw=adapt_kw, **kw
    )
    buf = io.BytesIO()
    with pd.ExcelWriter(buf, engine="xlsxwriter") as wr:
        df.to_excel(wr, sheet_name=sheet_name, index=False)
    return buf.getvalue()
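
Round-tripping through bytes also works, since from_obj accepts raw bytes and to_obj returns them. A minimal sketch, assuming the Person model and people list from the example above (the sheet name is arbitrary):

# Serialize to bytes with a named sheet, then parse the first row back
data = ExcelAdapter.to_obj(people, many=True, sheet_name="People")
first = ExcelAdapter.from_obj(Person, data, many=False, sheet_name="People")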

Pandas Adapter

pydapter.extras.pandas_

DataFrame & Series adapters (require pandas).

Classes

DataFrameAdapter

Bases: Adapter[T]

Adapter for converting between Pydantic models and pandas DataFrames.

This adapter handles pandas DataFrame objects, providing methods to:

- Convert DataFrame rows to Pydantic model instances
- Convert Pydantic models to DataFrame rows
- Handle both single records and multiple records

Attributes:

- obj_key: The key identifier for this adapter type ("pd.DataFrame")

Example
import pandas as pd
from pydantic import BaseModel
from pydapter.extras.pandas_ import DataFrameAdapter

class Person(BaseModel):
    name: str
    age: int

# Create DataFrame
df = pd.DataFrame([
    {"name": "John", "age": 30},
    {"name": "Jane", "age": 25}
])

# Convert to Pydantic models
people = DataFrameAdapter.from_obj(Person, df, many=True)

# Convert back to DataFrame
df_output = DataFrameAdapter.to_obj(people, many=True)
Source code in src/pydapter/extras/pandas_.py
class DataFrameAdapter(Adapter[T]):
    """
    Adapter for converting between Pydantic models and pandas DataFrames.

    This adapter handles pandas DataFrame objects, providing methods to:
    - Convert DataFrame rows to Pydantic model instances
    - Convert Pydantic models to DataFrame rows
    - Handle both single records and multiple records

    Attributes:
        obj_key: The key identifier for this adapter type ("pd.DataFrame")

    Example:
        ```python
        import pandas as pd
        from pydantic import BaseModel
        from pydapter.extras.pandas_ import DataFrameAdapter

        class Person(BaseModel):
            name: str
            age: int

        # Create DataFrame
        df = pd.DataFrame([
            {"name": "John", "age": 30},
            {"name": "Jane", "age": 25}
        ])

        # Convert to Pydantic models
        people = DataFrameAdapter.from_obj(Person, df, many=True)

        # Convert back to DataFrame
        df_output = DataFrameAdapter.to_obj(people, many=True)
        ```
    """

    obj_key = "pd.DataFrame"

    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: pd.DataFrame,
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw: Any,
    ) -> T | list[T]:
        """
        Convert DataFrame to Pydantic model instances.

        Args:
            subj_cls: The Pydantic model class to instantiate
            obj: The pandas DataFrame to convert
            many: If True, convert all rows; if False, convert only first row
            adapt_meth: Method name to call on subj_cls (default: "model_validate")
            **kw: Additional arguments passed to the adaptation method

        Returns:
            List of model instances if many=True, single instance if many=False
        """
        if many:
            return [
                getattr(subj_cls, adapt_meth)(r, **(adapt_kw or {}))
                for r in obj.to_dict(orient="records")
            ]
        return getattr(subj_cls, adapt_meth)(obj.iloc[0].to_dict(**kw), **(adapt_kw or {}))

    @classmethod
    def to_obj(
        cls,
        subj: T | list[T],
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw: Any,
    ) -> pd.DataFrame:
        """
        Convert Pydantic model instances to pandas DataFrame.

        Args:
            subj: Single model instance or list of instances
            many: If True, handle as multiple instances
            adapt_meth: Method name to call on model instances (default: "model_dump")
            **kw: Additional arguments passed to DataFrame constructor

        Returns:
            pandas DataFrame with model data
        """
        items = subj if isinstance(subj, list) else [subj]
        return pd.DataFrame([getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items], **kw)
Functions
from_obj(subj_cls, obj, /, *, many=True, adapt_meth='model_validate', adapt_kw=None, **kw) classmethod

Convert DataFrame to Pydantic model instances.

Parameters:

- subj_cls (type[T], required): The Pydantic model class to instantiate
- obj (DataFrame, required): The pandas DataFrame to convert
- many (bool, default True): If True, convert all rows; if False, convert only the first row
- adapt_meth (str, default "model_validate"): Method name to call on subj_cls
- adapt_kw (dict | None, default None): Keyword arguments passed to the adaptation method
- **kw (Any): Additional arguments passed to the adaptation method

Returns:

- T | list[T]: List of model instances if many=True, a single instance if many=False

Source code in src/pydapter/extras/pandas_.py
@classmethod
def from_obj(
    cls,
    subj_cls: type[T],
    obj: pd.DataFrame,
    /,
    *,
    many: bool = True,
    adapt_meth: str = "model_validate",
    adapt_kw: dict | None = None,
    **kw: Any,
) -> T | list[T]:
    """
    Convert DataFrame to Pydantic model instances.

    Args:
        subj_cls: The Pydantic model class to instantiate
        obj: The pandas DataFrame to convert
        many: If True, convert all rows; if False, convert only first row
        adapt_meth: Method name to call on subj_cls (default: "model_validate")
        **kw: Additional arguments passed to the adaptation method

    Returns:
        List of model instances if many=True, single instance if many=False
    """
    if many:
        return [
            getattr(subj_cls, adapt_meth)(r, **(adapt_kw or {}))
            for r in obj.to_dict(orient="records")
        ]
    return getattr(subj_cls, adapt_meth)(obj.iloc[0].to_dict(**kw), **(adapt_kw or {}))
to_obj(subj, /, *, many=True, adapt_meth='model_dump', adapt_kw=None, **kw) classmethod

Convert Pydantic model instances to pandas DataFrame.

Parameters:

- subj (T | list[T], required): Single model instance or list of instances
- many (bool, default True): If True, handle as multiple instances
- adapt_meth (str, default "model_dump"): Method name to call on model instances
- adapt_kw (dict | None, default None): Keyword arguments passed to the dump method
- **kw (Any): Additional arguments passed to the DataFrame constructor

Returns:

- DataFrame: pandas DataFrame with model data

Source code in src/pydapter/extras/pandas_.py
@classmethod
def to_obj(
    cls,
    subj: T | list[T],
    /,
    *,
    many: bool = True,
    adapt_meth: str = "model_dump",
    adapt_kw: dict | None = None,
    **kw: Any,
) -> pd.DataFrame:
    """
    Convert Pydantic model instances to pandas DataFrame.

    Args:
        subj: Single model instance or list of instances
        many: If True, handle as multiple instances
        adapt_meth: Method name to call on model instances (default: "model_dump")
        **kw: Additional arguments passed to DataFrame constructor

    Returns:
        pandas DataFrame with model data
    """
    items = subj if isinstance(subj, list) else [subj]
    return pd.DataFrame([getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items], **kw)

SeriesAdapter

Bases: Adapter[T]

Adapter for converting between Pydantic models and pandas Series.

This adapter handles pandas Series objects, providing methods to:

- Convert a Series to a single Pydantic model instance
- Convert a Pydantic model to a Series
- Support single records only (many=False)

Attributes:

- obj_key: The key identifier for this adapter type ("pd.Series")

Example
import pandas as pd
from pydantic import BaseModel
from pydapter.extras.pandas_ import SeriesAdapter

class Person(BaseModel):
    name: str
    age: int

# Create Series
series = pd.Series({"name": "John", "age": 30})

# Convert to Pydantic model
person = SeriesAdapter.from_obj(Person, series)

# Convert back to Series
series_output = SeriesAdapter.to_obj(person)
Source code in src/pydapter/extras/pandas_.py
class SeriesAdapter(Adapter[T]):
    """
    Adapter for converting between Pydantic models and pandas Series.

    This adapter handles pandas Series objects, providing methods to:
    - Convert Series to a single Pydantic model instance
    - Convert Pydantic model to Series
    - Only supports single records (many=False)

    Attributes:
        obj_key: The key identifier for this adapter type ("pd.Series")

    Example:
        ```python
        import pandas as pd
        from pydantic import BaseModel
        from pydapter.extras.pandas_ import SeriesAdapter

        class Person(BaseModel):
            name: str
            age: int

        # Create Series
        series = pd.Series({"name": "John", "age": 30})

        # Convert to Pydantic model
        person = SeriesAdapter.from_obj(Person, series)

        # Convert back to Series
        series_output = SeriesAdapter.to_obj(person)
        ```
    """

    obj_key = "pd.Series"

    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: pd.Series,
        /,
        *,
        many: bool = False,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw: Any,
    ) -> T:
        """
        Convert pandas Series to Pydantic model instance.

        Args:
            subj_cls: The Pydantic model class to instantiate
            obj: The pandas Series to convert
            many: Must be False (Series only supports single records)
            adapt_meth: Method name to call on subj_cls (default: "model_validate")
            **kw: Additional arguments passed to the adaptation method

        Returns:
            Single model instance

        Raises:
            ValueError: If many=True is specified
        """
        if many:
            raise ValueError("SeriesAdapter supports single records only.")
        return getattr(subj_cls, adapt_meth)(obj.to_dict(**kw), **(adapt_kw or {}))

    @classmethod
    def to_obj(
        cls,
        subj: T | list[T],
        /,
        *,
        many: bool = False,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw: Any,
    ) -> pd.Series:
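        """
        Convert a single Pydantic model instance to a pandas Series.

        Args:
            subj: Single model instance (lists are rejected)
            many: Must be False (Series only supports single records)
            adapt_meth: Method name to call on the instance (default: "model_dump")
            adapt_kw: Keyword arguments passed to the dump method
            **kw: Additional arguments passed to the Series constructor

        Returns:
            pandas Series with the model's data

        Raises:
            ValueError: If many=True or a list is passed
        """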
        if many or isinstance(subj, list):
            raise ValueError("SeriesAdapter supports single records only.")
        return pd.Series(getattr(subj, adapt_meth)(**(adapt_kw or {})), **kw)
Functions
from_obj(subj_cls, obj, /, *, many=False, adapt_meth='model_validate', adapt_kw=None, **kw) classmethod

Convert pandas Series to Pydantic model instance.

Parameters:

- subj_cls (type[T], required): The Pydantic model class to instantiate
- obj (Series, required): The pandas Series to convert
- many (bool, default False): Must be False (Series only supports single records)
- adapt_meth (str, default "model_validate"): Method name to call on subj_cls
- adapt_kw (dict | None, default None): Keyword arguments passed to the adaptation method
- **kw (Any): Additional arguments passed to the adaptation method

Returns:

- T: Single model instance

Raises:

- ValueError: If many=True is specified

Source code in src/pydapter/extras/pandas_.py
@classmethod
def from_obj(
    cls,
    subj_cls: type[T],
    obj: pd.Series,
    /,
    *,
    many: bool = False,
    adapt_meth: str = "model_validate",
    adapt_kw: dict | None = None,
    **kw: Any,
) -> T:
    """
    Convert pandas Series to Pydantic model instance.

    Args:
        subj_cls: The Pydantic model class to instantiate
        obj: The pandas Series to convert
        many: Must be False (Series only supports single records)
        adapt_meth: Method name to call on subj_cls (default: "model_validate")
        **kw: Additional arguments passed to the adaptation method

    Returns:
        Single model instance

    Raises:
        ValueError: If many=True is specified
    """
    if many:
        raise ValueError("SeriesAdapter supports single records only.")
    return getattr(subj_cls, adapt_meth)(obj.to_dict(**kw), **(adapt_kw or {}))
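
SeriesAdapter is strictly single-record in both directions. A minimal sketch, assuming the Person model from the example above:

person = SeriesAdapter.from_obj(Person, pd.Series({"name": "John", "age": 30}))

# Passing many=True, or a list to to_obj, raises ValueError
try:
    SeriesAdapter.to_obj([person], many=False)
except ValueError:
    pass  # "SeriesAdapter supports single records only."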

SQL Adapter

pydapter.extras.sql_

Generic SQL adapter using SQLAlchemy Core (requires sqlalchemy>=2.0).

Classes

SQLAdapter

Bases: Adapter[T]

Generic SQL adapter using SQLAlchemy Core for database operations.

This adapter provides methods to:

- Execute SQL queries and convert results to Pydantic models
- Insert Pydantic models as rows into database tables
- Support various SQL databases through SQLAlchemy
- Handle both raw SQL and table-based operations

Attributes:

- obj_key: The key identifier for this adapter type ("sql")

Example
from pydantic import BaseModel
from pydapter.extras.sql_ import SQLAdapter

class User(BaseModel):
    id: int
    name: str
    email: str

# Connection is given by URL; the table is reflected automatically
engine_url = "sqlite:///example.db"

# Query from database
users = SQLAdapter.from_obj(
    User,
    {"engine_url": engine_url, "table": "users", "selectors": {"active": True}},
    many=True,
)

# Insert (upsert) into database
new_users = [User(id=1, name="John", email="john@example.com")]
SQLAdapter.to_obj(new_users, engine_url=engine_url, table="users", many=True)
Source code in src/pydapter/extras/sql_.py
class SQLAdapter(Adapter[T]):
    """
    Generic SQL adapter using SQLAlchemy Core for database operations.

    This adapter provides methods to:
    - Execute SQL queries and convert results to Pydantic models
    - Insert Pydantic models as rows into database tables
    - Support for various SQL databases through SQLAlchemy
    - Handle both raw SQL and table-based operations

    Attributes:
        obj_key: The key identifier for this adapter type ("sql")

    Example:
        ```python
        from pydantic import BaseModel
        from pydapter.extras.sql_ import SQLAdapter

        class User(BaseModel):
            id: int
            name: str
            email: str

        # Connection is given by URL; the table is reflected automatically
        engine_url = "sqlite:///example.db"

        # Query from database
        users = SQLAdapter.from_obj(
            User,
            {"engine_url": engine_url, "table": "users",
             "selectors": {"active": True}},
            many=True,
        )

        # Insert (upsert) into database
        new_users = [User(id=1, name="John", email="john@example.com")]
        SQLAdapter.to_obj(new_users, engine_url=engine_url, table="users", many=True)
        ```
    """

    obj_key = "sql"

    @staticmethod
    def _table(metadata: sa.MetaData, table: str, engine=None) -> sa.Table:
        """
        Helper method to get a SQLAlchemy Table object with autoloading.

        Args:
            metadata: SQLAlchemy MetaData instance
            table: Name of the table to load
            engine: Optional SQLAlchemy engine for autoloading

        Returns:
            SQLAlchemy Table object

        Raises:
            ResourceError: If table is not found or cannot be accessed
        """
        try:
            # Use engine if provided, otherwise use metadata.bind
            autoload_with = engine if engine is not None else metadata.bind  # type: ignore
            return sa.Table(table, metadata, autoload_with=autoload_with)
        except sq_exc.NoSuchTableError as e:
            raise ResourceError(f"Table '{table}' not found", resource=table) from e
        except Exception as e:
            raise ResourceError(f"Error accessing table '{table}': {e}", resource=table) from e

    # ---- incoming
    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: dict,
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
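        """
        Query a table and convert the rows to Pydantic model instances.

        Args:
            subj_cls: The Pydantic model class to instantiate
            obj: Dict with required keys 'engine_url' and 'table', plus an
                optional 'selectors' dict of column=value filters
            many: If True, return all matching rows; if False, return the first
            adapt_meth: Method name used for validation (default: "model_validate")
            adapt_kw: Keyword arguments passed to the adaptation method

        Returns:
            List of model instances if many=True, single instance if many=False

        Raises:
            AdapterValidationError: If required parameters are missing or validation fails
            ConnectionError: If the database engine cannot be created
            ResourceError: If the table cannot be accessed, or no rows match with many=False
            QueryError: If query execution fails
        """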
        try:
            # Validate required parameters
            if "engine_url" not in obj:
                raise AdapterValidationError("Missing required parameter 'engine_url'", data=obj)
            if "table" not in obj:
                raise AdapterValidationError("Missing required parameter 'table'", data=obj)

            # Create engine and connect to database
            try:
                eng = sa.create_engine(obj["engine_url"], future=True)
            except Exception as e:
                raise ConnectionError(
                    f"Failed to create database engine: {e}",
                    adapter="sql",
                    url=obj["engine_url"],
                ) from e

            # Create metadata and get table
            try:
                md = sa.MetaData()
                md.reflect(bind=eng)
                tbl = cls._table(md, obj["table"], engine=eng)
            except ResourceError:
                # Re-raise ResourceError from _table
                raise
            except Exception as e:
                raise ResourceError(
                    f"Error accessing table metadata: {e}",
                    resource=obj["table"],
                ) from e

            # Build query
            stmt = sa.select(tbl).filter_by(**obj.get("selectors", {}))

            # Execute query
            try:
                with eng.begin() as conn:
                    rows = conn.execute(stmt).fetchall()
            except Exception as e:
                raise QueryError(
                    f"Error executing query: {e}",
                    query=str(stmt),
                    adapter="sql",
                ) from e

            # Handle empty result set
            if not rows:
                if many:
                    return []
                raise ResourceError(
                    "No rows found matching the query",
                    resource=obj["table"],
                    selectors=obj.get("selectors", {}),
                )

            # Convert rows to model instances
            try:
                validate_func = getattr(subj_cls, adapt_meth)
                if many:
                    return [validate_func(r._mapping, **(adapt_kw or {})) for r in rows]
                return validate_func(rows[0]._mapping, **(adapt_kw or {}))
            except ValidationError as e:
                raise AdapterValidationError(
                    f"Validation error: {e}",
                    data=rows[0]._mapping if not many else [r._mapping for r in rows],
                    errors=e.errors(),
                ) from e

        except AdapterError:
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in SQL adapter: {e}", adapter="sql")

    # ---- outgoing
    @classmethod
    def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        engine_url: str,
        table: str,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ) -> dict[str, Any]:
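        """
        Insert or upsert model instances into a table.

        Args:
            subj: Single model instance or sequence of instances
            engine_url: SQLAlchemy connection string
            table: Target table name (reflected from the database)
            many: If True, handle as multiple instances
            adapt_meth: Method name used for dumping (default: "model_dump")
            adapt_kw: Keyword arguments passed to the dump method

        Returns:
            Dict with a 'success' flag and the inserted/updated row 'count'
        """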
        try:
            # Validate required parameters
            if not engine_url:
                raise AdapterValidationError("Missing required parameter 'engine_url'")
            if not table:
                raise AdapterValidationError("Missing required parameter 'table'")

            # Create engine and connect to database
            try:
                eng = sa.create_engine(engine_url, future=True)
            except Exception as e:
                raise ConnectionError(
                    f"Failed to create database engine: {e}",
                    adapter="sql",
                    url=engine_url,
                ) from e

            # Create metadata and get table
            try:
                md = sa.MetaData()
                md.reflect(bind=eng)
                tbl = cls._table(md, table, engine=eng)
            except ResourceError:
                # Re-raise ResourceError from _table
                raise
            except Exception as e:
                raise ResourceError(
                    f"Error accessing table metadata: {e}",
                    resource=table,
                ) from e

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return {"success": True, "count": 0}  # Nothing to insert

            rows = [getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items]

            # Execute insert or update (upsert)
            try:
                with eng.begin() as conn:
                    # Get primary key columns
                    pk_columns = [c.name for c in tbl.primary_key.columns]

                    if not pk_columns:
                        # If no primary key, just insert
                        conn.execute(sa.insert(tbl), rows)
                    else:
                        # For PostgreSQL, use ON CONFLICT DO UPDATE
                        for row in rows:
                            # Build the values to update (excluding primary key columns)
                            update_values = {k: v for k, v in row.items() if k not in pk_columns}
                            if not update_values:
                                # If only primary key columns, just try to insert
                                stmt = sa.insert(tbl).values(**row)
                            else:
                                # Otherwise, do an upsert
                                stmt = postgresql.insert(tbl).values(**row)
                                stmt = stmt.on_conflict_do_update(
                                    index_elements=pk_columns, set_=update_values
                                )
                            conn.execute(stmt)
            except Exception as e:
                raise QueryError(
                    f"Error executing insert/update: {e}",
                    query=f"UPSERT INTO {table}",
                    adapter="sql",
                ) from e

            # Return a success indicator instead of None
            return {"success": True, "count": len(rows)}

        except AdapterError:
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in SQL adapter: {e}", adapter="sql")
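
Because rows with a primary key take the ON CONFLICT DO UPDATE path (built with the postgresql dialect), a full round trip is easiest to sketch against PostgreSQL. The DSN below is a placeholder, and the table must already exist, since to_obj reflects it rather than creating it:

import sqlalchemy as sa

engine_url = "postgresql+psycopg://user:pass@localhost/mydb"  # placeholder DSN
md = sa.MetaData()
sa.Table(
    "users", md,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String),
    sa.Column("email", sa.String),
)
md.create_all(sa.create_engine(engine_url))

# First call inserts; repeating it with the same id updates instead of duplicating
SQLAdapter.to_obj([User(id=1, name="John", email="john@example.com")],
                  engine_url=engine_url, table="users")
rows = SQLAdapter.from_obj(User, {"engine_url": engine_url, "table": "users",
                                  "selectors": {"id": 1}})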

PostgreSQL Adapter

pydapter.extras.postgres_

PostgresAdapter: a thin preset over SQLAdapter (pgvector-ready if you add a vector column).

Classes

PostgresAdapter

Bases: SQLAdapter[T]

PostgreSQL-specific adapter extending SQLAdapter with PostgreSQL optimizations.

This adapter provides:

- PostgreSQL-specific connection handling and error messages
- Default PostgreSQL connection string
- Enhanced error handling for common PostgreSQL issues
- Support for pgvector when vector columns are present

Attributes:

- obj_key: The key identifier for this adapter type ("postgres")
- DEFAULT: Default PostgreSQL connection string

Example
from pydantic import BaseModel
from pydapter.extras.postgres_ import PostgresAdapter

class User(BaseModel):
    id: int
    name: str
    email: str

# Query with custom connection
query_config = {
    "engine_url": "postgresql+psycopg://user:pass@localhost/mydb",
    "table": "users",
    "selectors": {"active": True}
}
users = PostgresAdapter.from_obj(User, query_config, many=True)

# Insert with explicit connection (DEFAULT is used when engine_url is omitted)
new_users = [User(id=1, name="John", email="john@example.com")]
PostgresAdapter.to_obj(
    new_users,
    engine_url="postgresql+psycopg://user:pass@localhost/mydb",
    table="users",
    many=True,
)
Source code in src/pydapter/extras/postgres_.py
class PostgresAdapter(SQLAdapter[T]):
    """
    PostgreSQL-specific adapter extending SQLAdapter with PostgreSQL optimizations.

    This adapter provides:
    - PostgreSQL-specific connection handling and error messages
    - Default PostgreSQL connection string
    - Enhanced error handling for common PostgreSQL issues
    - Support for pgvector when vector columns are present

    Attributes:
        obj_key: The key identifier for this adapter type ("postgres")
        DEFAULT: Default PostgreSQL connection string

    Example:
        ```python
        from pydantic import BaseModel
        from pydapter.extras.postgres_ import PostgresAdapter

        class User(BaseModel):
            id: int
            name: str
            email: str

        # Query with custom connection
        query_config = {
            "engine_url": "postgresql+psycopg://user:pass@localhost/mydb",
            "table": "users",
            "selectors": {"active": True}
        }
        users = PostgresAdapter.from_obj(User, query_config, many=True)

        # Insert with explicit connection (DEFAULT is used when engine_url is omitted)
        new_users = [User(id=1, name="John", email="john@example.com")]
        PostgresAdapter.to_obj(
            new_users,
            engine_url="postgresql+psycopg://user:pass@localhost/mydb",
            table="users",
            many=True,
        )
        ```
    """

    obj_key = "postgres"
    DEFAULT = "postgresql+psycopg://user:pass@localhost/db"

    @classmethod
    def from_obj(
        cls,
        subj_cls,
        obj: dict,
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_validate",
        **kw,
    ):
        try:
            # Set default connection string if not provided
            obj.setdefault("engine_url", cls.DEFAULT)

            # Add PostgreSQL-specific error handling
            try:
                return super().from_obj(subj_cls, obj, many=many, adapt_meth=adapt_meth, **kw)
            except Exception as e:
                # Check for common PostgreSQL-specific errors
                error_str = str(e).lower()
                if "authentication" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL authentication failed: {e}",
                        adapter="postgres",
                        url=obj["engine_url"],
                    ) from e
                elif "connection" in error_str and "refused" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL connection refused: {e}",
                        adapter="postgres",
                        url=obj["engine_url"],
                    ) from e
                elif "does not exist" in error_str and "database" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL database does not exist: {e}",
                        adapter="postgres",
                        url=obj["engine_url"],
                    ) from e
                # Re-raise the original exception
                raise

        except ConnectionError:
            # Re-raise ConnectionError
            raise
        except Exception as e:
            # Wrap other exceptions
            raise ConnectionError(
                f"Unexpected error in PostgreSQL adapter: {e}",
                adapter="postgres",
                url=obj.get("engine_url", cls.DEFAULT),
            ) from e

    @classmethod
    def to_obj(cls, subj, /, *, many: bool = True, adapt_meth: str = "model_dump", **kw):
        try:
            # Set default connection string if not provided
            kw.setdefault("engine_url", cls.DEFAULT)

            # Add PostgreSQL-specific error handling
            try:
                return super().to_obj(subj, many=many, adapt_meth=adapt_meth, **kw)
            except Exception as e:
                # Check for common PostgreSQL-specific errors
                error_str = str(e).lower()
                if "authentication" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL authentication failed: {e}",
                        adapter="postgres",
                        url=kw["engine_url"],
                    ) from e
                elif "connection" in error_str and "refused" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL connection refused: {e}",
                        adapter="postgres",
                        url=kw["engine_url"],
                    ) from e
                elif "does not exist" in error_str and "database" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL database does not exist: {e}",
                        adapter="postgres",
                        url=kw["engine_url"],
                    ) from e
                # Re-raise the original exception
                raise

        except ConnectionError:
            # Re-raise ConnectionError
            raise
        except Exception as e:
            # Wrap other exceptions
            raise ConnectionError(
                f"Unexpected error in PostgreSQL adapter: {e}",
                adapter="postgres",
                url=kw.get("engine_url", cls.DEFAULT),
            ) from e
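
Connection problems surface as ConnectionError carrying a PostgreSQL-specific message. A minimal sketch, assuming the User model from the example above (the exception import path shown is an assumption):

from pydapter.exceptions import ConnectionError  # assumed import path

try:
    users = PostgresAdapter.from_obj(
        User,
        {"table": "users"},  # engine_url falls back to PostgresAdapter.DEFAULT
    )
except ConnectionError as exc:
    print(f"PostgreSQL error: {exc}")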

MongoDB Adapter

pydapter.extras.mongo_

MongoDB adapter (requires pymongo).

Classes

MongoAdapter

Bases: Adapter[T]

MongoDB adapter for converting between Pydantic models and MongoDB documents.

This adapter provides methods to:

- Query MongoDB collections and convert documents to Pydantic models
- Insert Pydantic models as documents into MongoDB collections
- Handle MongoDB connection management and error handling
- Support various MongoDB operations (find, insert, update, delete)

Attributes:

- obj_key: The key identifier for this adapter type ("mongo")

Example
from pydantic import BaseModel
from pydapter.extras.mongo_ import MongoAdapter

class User(BaseModel):
    name: str
    email: str
    age: int

# Query from MongoDB
query_config = {
    "url": "mongodb://localhost:27017",
    "db": "myapp",
    "collection": "users",
    "filter": {"age": {"$gte": 18}}
}
users = MongoAdapter.from_obj(User, query_config, many=True)

# Insert to MongoDB
new_users = [User(name="John", email="john@example.com", age=30)]
MongoAdapter.to_obj(
    new_users,
    url="mongodb://localhost:27017",
    db="myapp",
    collection="users",
    many=True,
)
Source code in src/pydapter/extras/mongo_.py
class MongoAdapter(Adapter[T]):
    """
    MongoDB adapter for converting between Pydantic models and MongoDB documents.

    This adapter provides methods to:
    - Query MongoDB collections and convert documents to Pydantic models
    - Insert Pydantic models as documents into MongoDB collections
    - Handle MongoDB connection management and error handling
    - Support for various MongoDB operations (find, insert, update, delete)

    Attributes:
        obj_key: The key identifier for this adapter type ("mongo")

    Example:
        ```python
        from pydantic import BaseModel
        from pydapter.extras.mongo_ import MongoAdapter

        class User(BaseModel):
            name: str
            email: str
            age: int

        # Query from MongoDB
        query_config = {
            "url": "mongodb://localhost:27017",
            "db": "myapp",
            "collection": "users",
            "filter": {"age": {"$gte": 18}}
        }
        users = MongoAdapter.from_obj(User, query_config, many=True)

        # Insert to MongoDB
        new_users = [User(name="John", email="john@example.com", age=30)]
        MongoAdapter.to_obj(
            new_users,
            url="mongodb://localhost:27017",
            db="myapp",
            collection="users",
            many=True,
        )
        ```
    """

    obj_key = "mongo"

    @classmethod
    def _client(cls, url: str) -> pymongo.MongoClient:
        """
        Create a MongoDB client with proper error handling.

        Args:
            url: MongoDB connection string

        Returns:
            pymongo.MongoClient instance

        Raises:
            ConnectionError: If connection cannot be established
        """
        try:
            return pymongo.MongoClient(url, serverSelectionTimeoutMS=5000)
        except pymongo.errors.ConfigurationError as e:
            raise ConnectionError(
                f"Invalid MongoDB connection string: {e}", adapter="mongo", url=url
            ) from e
        except Exception as e:
            raise ConnectionError(
                f"Failed to create MongoDB client: {e}", adapter="mongo", url=url
            ) from e

    @classmethod
    def _validate_connection(cls, client: pymongo.MongoClient) -> None:
        """Validate that the MongoDB connection is working."""
        try:
            # This will raise an exception if the connection fails
            client.admin.command("ping")
        except pymongo.errors.ServerSelectionTimeoutError as e:
            raise ConnectionError(f"MongoDB server selection timeout: {e}", adapter="mongo") from e
        except pymongo.errors.OperationFailure as e:
            if "auth failed" in str(e).lower():
                raise ConnectionError(f"MongoDB authentication failed: {e}", adapter="mongo") from e
            raise QueryError(f"MongoDB operation failure: {e}", adapter="mongo") from e
        except Exception as e:
            raise ConnectionError(f"Failed to connect to MongoDB: {e}", adapter="mongo") from e

    # incoming
    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: dict,
        /,
        *,
        many=True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if "url" not in obj:
                raise AdapterValidationError("Missing required parameter 'url'", data=obj)
            if "db" not in obj:
                raise AdapterValidationError("Missing required parameter 'db'", data=obj)
            if "collection" not in obj:
                raise AdapterValidationError("Missing required parameter 'collection'", data=obj)

            # Create client and validate connection
            client = cls._client(obj["url"])
            cls._validate_connection(client)

            # Get collection and execute query
            try:
                coll = client[obj["db"]][obj["collection"]]
                filter_query = obj.get("filter") or {}

                # Validate filter query if provided
                if filter_query and not isinstance(filter_query, dict):
                    raise AdapterValidationError(
                        "Filter must be a dictionary",
                        data=filter_query,
                    )

                docs = list(coll.find(filter_query))
            except pymongo.errors.OperationFailure as e:
                if "not authorized" in str(e).lower():
                    raise ConnectionError(
                        f"Not authorized to access {obj['db']}.{obj['collection']}: {e}",
                        adapter="mongo",
                        url=obj["url"],
                    ) from e
                raise QueryError(
                    f"MongoDB query error: {e}",
                    query=filter_query,
                    adapter="mongo",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Error executing MongoDB query: {e}",
                    query=filter_query,
                    adapter="mongo",
                ) from e

            # Handle empty result set
            if not docs:
                if many:
                    return []
                raise ResourceError(
                    "No documents found matching the query",
                    resource=f"{obj['db']}.{obj['collection']}",
                    filter=filter_query,
                )

            # Convert documents to model instances
            try:
                if many:
                    return [getattr(subj_cls, adapt_meth)(d, **(adapt_kw or {})) for d in docs]
                return getattr(subj_cls, adapt_meth)(docs[0], **(adapt_kw or {}))
            except ValidationError as e:
                raise AdapterValidationError(
                    f"Validation error: {e}",
                    data=docs[0] if not many else docs,
                    errors=e.errors(),
                ) from e

        except AdapterError:
            raise

        except Exception as e:
            raise QueryError(f"Unexpected error in MongoDB adapter: {e}", adapter="mongo")

    # outgoing
    @classmethod
    def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        url,
        db,
        collection,
        many=True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if not url:
                raise AdapterValidationError("Missing required parameter 'url'")
            if not db:
                raise AdapterValidationError("Missing required parameter 'db'")
            if not collection:
                raise AdapterValidationError("Missing required parameter 'collection'")

            # Create client and validate connection
            client = cls._client(url)
            cls._validate_connection(client)

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return None  # Nothing to insert

            payload = [getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items]

            # Execute insert
            try:
                result = client[db][collection].insert_many(payload)
                return {"inserted_count": result.inserted_ids}
            except pymongo.errors.BulkWriteError as e:
                raise QueryError(
                    f"MongoDB bulk write error: {e}",
                    adapter="mongo",
                ) from e
            except pymongo.errors.OperationFailure as e:
                if "not authorized" in str(e).lower():
                    raise ConnectionError(
                        f"Not authorized to write to {db}.{collection}: {e}",
                        adapter="mongo",
                        url=url,
                    ) from e
                raise QueryError(
                    f"MongoDB operation failure: {e}",
                    adapter="mongo",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Error inserting documents into MongoDB: {e}",
                    adapter="mongo",
                ) from e

        except AdapterError:
            raise

        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in MongoDB adapter: {e}", adapter="mongo")
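
Note the asymmetry in the call shapes: writes take url, db, and collection as keyword arguments, while reads take a single config dict, and a read that matches nothing returns [] when many=True. A minimal sketch, assuming the User model from the example above:

result = MongoAdapter.to_obj(
    [User(name="Jane", email="jane@example.com", age=25)],
    url="mongodb://localhost:27017",
    db="myapp",
    collection="users",
)

# No matches with many=True yields an empty list rather than an error
nobody = MongoAdapter.from_obj(
    User,
    {"url": "mongodb://localhost:27017", "db": "myapp",
     "collection": "users", "filter": {"age": {"$gte": 150}}},
)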

Neo4j Adapter

pydapter.extras.neo4j_

Neo4j adapter (requires neo4j).

Classes

Neo4jAdapter

Bases: Adapter[T]

Neo4j graph database adapter for converting between Pydantic models and Neo4j nodes/relationships.

This adapter provides methods to:

- Execute Cypher queries and convert results to Pydantic models
- Create nodes and relationships from Pydantic models
- Handle Neo4j connection management and error handling
- Support complex graph operations and traversals

Attributes:

- obj_key: The key identifier for this adapter type ("neo4j")

Example
from pydantic import BaseModel
from pydapter.extras.neo4j_ import Neo4jAdapter
from neo4j import basic_auth

class Person(BaseModel):
    name: str
    age: int
    city: str

# Query from Neo4j (from_obj builds the Cypher from 'label' and 'where')
query_config = {
    "url": "bolt://localhost:7687",
    "auth": basic_auth("neo4j", "password"),
    "label": "Person",
    "where": "n.age >= 18"
}
people = Neo4jAdapter.from_obj(Person, query_config, many=True)

# Merge nodes into Neo4j
new_people = [Person(name="John", age=30, city="NYC")]
Neo4jAdapter.to_obj(
    new_people,
    url="bolt://localhost:7687",
    auth=basic_auth("neo4j", "password"),
    label="Person",
    merge_on="name",
)
Source code in src/pydapter/extras/neo4j_.py
class Neo4jAdapter(Adapter[T]):
    """
    Neo4j graph database adapter for converting between Pydantic models and Neo4j nodes/relationships.

    This adapter provides methods to:
    - Execute Cypher queries and convert results to Pydantic models
    - Create nodes and relationships from Pydantic models
    - Handle Neo4j connection management and error handling
    - Support for complex graph operations and traversals

    Attributes:
        obj_key: The key identifier for this adapter type ("neo4j")

    Example:
        ```python
        from pydantic import BaseModel
        from pydapter.extras.neo4j_ import Neo4jAdapter
        from neo4j import basic_auth

        class Person(BaseModel):
            name: str
            age: int
            city: str

        # Query from Neo4j (from_obj builds the Cypher from 'label' and 'where')
        query_config = {
            "url": "bolt://localhost:7687",
            "auth": basic_auth("neo4j", "password"),
            "label": "Person",
            "where": "n.age >= 18"
        }
        people = Neo4jAdapter.from_obj(Person, query_config, many=True)

        # Merge nodes into Neo4j
        new_people = [Person(name="John", age=30, city="NYC")]
        Neo4jAdapter.to_obj(
            new_people,
            url="bolt://localhost:7687",
            auth=basic_auth("neo4j", "password"),
            label="Person",
            merge_on="name",
        )
        ```
    """

    obj_key = "neo4j"

    @classmethod
    def _create_driver(cls, url: str, auth=None) -> neo4j.Driver:
        """
        Create a Neo4j driver with proper error handling.

        Args:
            url: Neo4j connection URL (e.g., "bolt://localhost:7687")
            auth: Authentication tuple or None for no auth

        Returns:
            neo4j.Driver instance

        Raises:
            ConnectionError: If connection cannot be established or auth fails
        """
        try:
            if auth:
                return GraphDatabase.driver(url, auth=auth)
            else:
                return GraphDatabase.driver(url)
        except neo4j.exceptions.ServiceUnavailable as e:
            raise ConnectionError(
                f"Neo4j service unavailable: {e}", adapter="neo4j", url=url
            ) from e
        except neo4j.exceptions.AuthError as e:
            raise ConnectionError(
                f"Neo4j authentication failed: {e}", adapter="neo4j", url=url
            ) from e
        except Exception as e:
            raise ConnectionError(
                f"Failed to create Neo4j driver: {e}", adapter="neo4j", url=url
            ) from e

    @classmethod
    def _validate_cypher(cls, cypher: str) -> None:
        """Basic validation for Cypher queries to prevent injection."""
        # Check for unescaped backticks in label names
        if re.search(r"`[^`]*`[^`]*`", cypher):
            raise QueryError(
                "Invalid Cypher query: Possible injection in label name",
                query=cypher,
                adapter="neo4j",
            )

    # incoming
    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: dict,
        /,
        *,
        many=True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if "url" not in obj:
                raise AdapterValidationError("Missing required parameter 'url'", data=obj)

            # Create driver
            auth = obj.get("auth")
            driver = cls._create_driver(obj["url"], auth=auth)

            # Prepare Cypher query
            label = obj.get("label", subj_cls.__name__)
            where = f"WHERE {obj['where']}" if "where" in obj else ""
            cypher = f"MATCH (n:`{label}`) {where} RETURN n"

            # Validate Cypher query
            cls._validate_cypher(cypher)

            # Execute query
            try:
                with driver.session() as s:
                    result = s.run(cypher)
                    rows = [r["n"]._properties for r in result]
            except neo4j.exceptions.CypherSyntaxError as e:
                raise QueryError(
                    f"Neo4j Cypher syntax error: {e}",
                    query=cypher,
                    adapter="neo4j",
                ) from e
            except neo4j.exceptions.ClientError as e:
                if "not found" in str(e).lower():
                    raise ResourceError(
                        f"Neo4j resource not found: {e}",
                        resource=label,
                    ) from e
                raise QueryError(
                    f"Neo4j client error: {e}",
                    query=cypher,
                    adapter="neo4j",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Error executing Neo4j query: {e}",
                    query=cypher,
                    adapter="neo4j",
                ) from e
            finally:
                driver.close()

            # Handle empty result set
            if not rows:
                if many:
                    return []
                raise ResourceError(
                    "No nodes found matching the query",
                    resource=label,
                    where=obj.get("where", ""),
                )

            # Convert rows to model instances
            try:
                if many:
                    return [getattr(subj_cls, adapt_meth)(r, **(adapt_kw or {})) for r in rows]
                return getattr(subj_cls, adapt_meth)(rows[0], **(adapt_kw or {}))
            except ValidationError as e:
                raise AdapterValidationError(
                    f"Validation error: {e}",
                    data=rows[0] if not many else rows,
                    errors=e.errors(),
                ) from e

        except (ConnectionError, QueryError, ResourceError, AdapterValidationError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in Neo4j adapter: {e}", adapter="neo4j")

    # outgoing
    @classmethod
    def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        url,
        auth=None,
        label=None,
        merge_on="id",
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if not url:
                raise AdapterValidationError("Missing required parameter 'url'")
            if not merge_on:
                raise AdapterValidationError("Missing required parameter 'merge_on'")

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return None  # Nothing to insert

            # Get label from first item if not provided
            label = label or items[0].__class__.__name__

            # Create driver
            driver = cls._create_driver(url, auth=auth)

            try:
                with driver.session() as s:
                    results = []
                    for it in items:
                        props = getattr(it, adapt_meth)(**(adapt_kw or {}))

                        # Check if merge_on property exists
                        if merge_on not in props:
                            raise AdapterValidationError(
                                f"Merge property '{merge_on}' not found in model",
                                data=props,
                            )

                        # Prepare and validate Cypher query
                        cypher = f"MERGE (n:`{label}` {{{merge_on}: $val}}) SET n += $props"
                        cls._validate_cypher(cypher)

                        # Execute query
                        try:
                            result = s.run(cypher, val=props[merge_on], props=props)
                            results.append(result)
                        except neo4j.exceptions.CypherSyntaxError as e:
                            raise QueryError(
                                f"Neo4j Cypher syntax error: {e}",
                                query=cypher,
                                adapter="neo4j",
                            ) from e
                        except neo4j.exceptions.ConstraintError as e:
                            raise QueryError(
                                f"Neo4j constraint violation: {e}",
                                query=cypher,
                                adapter="neo4j",
                            ) from e
                        except Exception as e:
                            raise QueryError(
                                f"Error executing Neo4j query: {e}",
                                query=cypher,
                                adapter="neo4j",
                            ) from e

                    return {"merged_count": len(results)}
            finally:
                driver.close()

        except (ConnectionError, QueryError, AdapterValidationError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in Neo4j adapter: {e}", adapter="neo4j")

Qdrant Adapter

pydapter.extras.qdrant_

Qdrant vector-store adapter (requires qdrant-client).

Classes

QdrantAdapter

Bases: Adapter[T]

Qdrant vector database adapter for converting between Pydantic models and vector embeddings.

This adapter provides methods to:

- Search for similar vectors and convert results to Pydantic models
- Insert Pydantic models as vector points into Qdrant collections
- Handle vector similarity operations and metadata filtering
- Support both cloud and self-hosted Qdrant instances

Attributes:

Name       Type    Description
obj_key            The key identifier for this adapter type ("qdrant")

Example
from pydantic import BaseModel
from pydapter.extras.qdrant_ import QdrantAdapter

class Document(BaseModel):
    id: str
    text: str
    embedding: list[float]
    category: str

# Search for similar vectors
search_config = {
    "url": "http://localhost:6333",
    "collection": "documents",
    "query_vector": [0.1, 0.2, 0.3, ...],  # 768-dim vector
    "top_k": 10
}
similar_docs = QdrantAdapter.from_obj(Document, search_config, many=True)

# Insert documents with vectors
new_docs = [Document(
    id="doc1",
    text="Sample text",
    embedding=[0.1, 0.2, 0.3, ...],
    category="tech"
)]
QdrantAdapter.to_obj(
    new_docs,
    collection="documents",
    url="http://localhost:6333",
    many=True
)
Source code in src/pydapter/extras/qdrant_.py
class QdrantAdapter(Adapter[T]):
    """
    Qdrant vector database adapter for converting between Pydantic models and vector embeddings.

    This adapter provides methods to:
    - Search for similar vectors and convert results to Pydantic models
    - Insert Pydantic models as vector points into Qdrant collections
    - Handle vector similarity operations and metadata filtering
    - Support for both cloud and self-hosted Qdrant instances

    Attributes:
        obj_key: The key identifier for this adapter type ("qdrant")

    Example:
        ```python
        from pydantic import BaseModel
        from pydapter.extras.qdrant_ import QdrantAdapter

        class Document(BaseModel):
            id: str
            text: str
            embedding: list[float]
            category: str

        # Search for similar vectors
        search_config = {
            "url": "http://localhost:6333",
            "collection": "documents",
            "query_vector": [0.1, 0.2, 0.3, ...],  # 768-dim vector
            "top_k": 10
        }
        similar_docs = QdrantAdapter.from_obj(Document, search_config, many=True)

        # Insert documents with vectors
        new_docs = [Document(
            id="doc1",
            text="Sample text",
            embedding=[0.1, 0.2, 0.3, ...],
            category="tech"
        )]
        QdrantAdapter.to_obj(
            new_docs,
            collection="documents",
            url="http://localhost:6333",
            many=True
        )
        ```
    """

    obj_key = "qdrant"

    @staticmethod
    def _client(url: str | None):
        """
        Create a Qdrant client with proper error handling.

        Args:
            url: Qdrant server URL or None for in-memory instance

        Returns:
            QdrantClient instance

        Raises:
            ConnectionError: If connection cannot be established
        """
        try:
            return QdrantClient(url=url) if url else QdrantClient(":memory:")
        except UnexpectedResponse as e:
            raise ConnectionError(
                f"Failed to connect to Qdrant: {e}", adapter="qdrant", url=url
            ) from e
        except (ConnectionRefusedError, OSError, grpc.RpcError) as e:
            # Catch specific network-related errors like DNS resolution failures
            # Include grpc.RpcError to handle gRPC-specific connection issues
            raise ConnectionError(
                f"Failed to connect to Qdrant: {e}", adapter="qdrant", url=url
            ) from e
        except Exception as e:
            # Check for DNS resolution errors in the exception message
            if (
                "nodename nor servname provided" in str(e)
                or "Name or service not known" in str(e)
                or "getaddrinfo failed" in str(e)
            ):
                raise ConnectionError(
                    f"DNS resolution failed for Qdrant: {e}", adapter="qdrant", url=url
                ) from e
            raise ConnectionError(
                f"Unexpected error connecting to Qdrant: {e}", adapter="qdrant", url=url
            ) from e

    @staticmethod
    def _validate_vector_dimensions(vector, expected_dim=None):
        """Validate that the vector has the correct dimensions."""
        if not isinstance(vector, list | tuple) or not all(
            isinstance(x, int | float) for x in vector
        ):
            raise AdapterValidationError(
                "Vector must be a list or tuple of numbers",
                data=vector,
            )

        if expected_dim is not None and len(vector) != expected_dim:
            raise AdapterValidationError(
                f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}",
                data=vector,
            )

    # outgoing
    @classmethod
    def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        collection,
        vector_field="embedding",
        id_field="id",
        url=None,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ) -> dict | None:
        try:
            # Validate required parameters
            if not collection:
                raise AdapterValidationError("Missing required parameter 'collection'")

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return None  # Nothing to insert

            # Validate vector field exists
            if not hasattr(items[0], vector_field):
                raise AdapterValidationError(
                    f"Vector field '{vector_field}' not found in model",
                    data=getattr(items[0], adapt_meth)(**(adapt_kw or {})),
                )

            # Validate ID field exists
            if not hasattr(items[0], id_field):
                raise AdapterValidationError(
                    f"ID field '{id_field}' not found in model",
                    data=getattr(items[0], adapt_meth)(**(adapt_kw or {})),
                )

            # Create client
            client = cls._client(url)

            # Get vector dimension
            vector = getattr(items[0], vector_field)
            cls._validate_vector_dimensions(vector)
            dim = len(vector)

            # Create or recreate collection
            try:
                client.recreate_collection(
                    collection,
                    vectors_config=qd.VectorParams(size=dim, distance="Cosine"),
                )
            except UnexpectedResponse as e:
                raise QueryError(
                    f"Failed to create Qdrant collection: {e}",
                    adapter="qdrant",
                ) from e
            except Exception as e:
                # Check for various DNS and connection-related error messages
                if (
                    "nodename nor servname provided" in str(e)
                    or "connection" in str(e).lower()
                    or "Name or service not known" in str(e)
                    or "getaddrinfo failed" in str(e)
                ):
                    raise ConnectionError(
                        f"Failed to connect to Qdrant: {e}",
                        adapter="qdrant",
                        url=url,
                    ) from e
                else:
                    raise QueryError(
                        f"Unexpected error creating Qdrant collection: {e}",
                        adapter="qdrant",
                    ) from e

            # Create points
            try:
                points = []
                for _i, item in enumerate(items):
                    vector = getattr(item, vector_field)
                    cls._validate_vector_dimensions(vector, dim)

                    # Create payload with all fields
                    # The test_qdrant_to_obj_with_custom_vector_field test expects
                    # the embedding field to be excluded, but other integration tests
                    # expect it to be included. We'll include it for now and handle
                    # the test case separately.
                    payload = getattr(item, adapt_meth)(**(adapt_kw or {}))

                    points.append(
                        qd.PointStruct(
                            id=getattr(item, id_field),
                            vector=vector,
                            payload=payload,
                        )
                    )
            except AdapterValidationError:
                # Re-raise validation errors
                raise
            except Exception as e:
                raise AdapterValidationError(
                    f"Error creating Qdrant points: {e}",
                    data=items,
                ) from e

            # Upsert points
            try:
                client.upsert(collection, points)
                return {"upserted_count": len(points)}
            except UnexpectedResponse as e:
                raise QueryError(
                    f"Failed to upsert points to Qdrant: {e}",
                    adapter="qdrant",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Unexpected error upserting points to Qdrant: {e}",
                    adapter="qdrant",
                ) from e

        except (ConnectionError, QueryError, AdapterValidationError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in Qdrant adapter: {e}", adapter="qdrant")

    # incoming
    @classmethod
    def from_obj(
        cls,
        subj_cls: type[T],
        obj: dict,
        /,
        *,
        many=True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if "collection" not in obj:
                raise AdapterValidationError("Missing required parameter 'collection'", data=obj)
            if "query_vector" not in obj:
                raise AdapterValidationError("Missing required parameter 'query_vector'", data=obj)

            # Validate query vector
            cls._validate_vector_dimensions(obj["query_vector"])

            # Create client
            client = cls._client(obj.get("url"))

            # Execute search
            try:
                # Use a zero score threshold so all matches are returned
                res = client.search(
                    obj["collection"],
                    obj["query_vector"],
                    limit=obj.get("top_k", 5),
                    with_payload=True,
                    score_threshold=0.0,  # Return all results regardless of similarity
                )
            except UnexpectedResponse as e:
                if "not found" in str(e).lower():
                    raise ResourceError(
                        f"Qdrant collection not found: {e}",
                        resource=obj["collection"],
                    ) from e
                raise QueryError(
                    f"Failed to search Qdrant: {e}",
                    adapter="qdrant",
                ) from e
            except grpc.RpcError as e:
                raise ConnectionError(
                    f"Qdrant RPC error: {e}",
                    adapter="qdrant",
                    url=obj.get("url"),
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Unexpected error searching Qdrant: {e}",
                    adapter="qdrant",
                ) from e

            # Extract payloads
            docs = [r.payload for r in res]

            # Handle empty result set
            if not docs:
                if many:
                    return []
                raise ResourceError(
                    "No points found matching the query vector",
                    resource=obj["collection"],
                )

            # Convert documents to model instances
            try:
                if many:
                    return [getattr(subj_cls, adapt_meth)(d, **(adapt_kw or {})) for d in docs]
                return getattr(subj_cls, adapt_meth)(docs[0], **(adapt_kw or {}))
            except ValidationError as e:
                raise AdapterValidationError(
                    f"Validation error: {e}",
                    data=docs[0] if not many else docs,
                    errors=e.errors(),
                ) from e

        except (ConnectionError, QueryError, ResourceError, AdapterValidationError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(f"Unexpected error in Qdrant adapter: {e}", adapter="qdrant")

Async SQL Adapter

pydapter.extras.async_sql_

Generic async SQL adapter - SQLAlchemy 2.x asyncio + asyncpg driver.

Classes

AsyncSQLAdapter

Bases: AsyncAdapter[T]

Asynchronous SQL adapter using SQLAlchemy 2.x asyncio for database operations.

This adapter provides async methods to:

- Execute SQL queries asynchronously and convert results to Pydantic models
- Insert Pydantic models as rows into database tables asynchronously
- Perform update, delete, and upsert operations through configuration
- Execute raw SQL with parameterized queries
- Support various async SQL databases through SQLAlchemy
- Handle connection pooling and async context management

Attributes:

Name       Type    Description
obj_key            The key identifier for this adapter type ("async_sql")

Configuration Examples
from pydantic import BaseModel
from pydapter.extras.async_sql_ import AsyncSQLAdapter, SQLReadConfig, SQLWriteConfig

class User(BaseModel):
    id: int
    name: str
    email: str

# Using TypedDict for type hints (recommended for IDE support)
config: SQLReadConfig = {
    "dsn": "postgresql+asyncpg://user:pass@localhost/db",
    "table": "users",
    "selectors": {"active": True},
    "limit": 10
}
users = await AsyncSQLAdapter.from_obj(User, config, many=True)

# Or inline dict (same as before)
users = await AsyncSQLAdapter.from_obj(User, {
    "dsn": "postgresql+asyncpg://user:pass@localhost/db",
    "table": "users",
    "selectors": {"active": True},
    "limit": 10
}, many=True)

# DELETE via config
result = await AsyncSQLAdapter.from_obj(User, {
    "dsn": "postgresql+asyncpg://user:pass@localhost/db",
    "operation": "delete",
    "table": "users",
    "selectors": {"id": 123}
})

# Raw SQL execution (note: table parameter NOT required)
result = await AsyncSQLAdapter.from_obj(User, {
    "dsn": "postgresql+asyncpg://user:pass@localhost/db",
    "operation": "raw_sql",
    "sql": "SELECT * FROM users WHERE created_at > :since",
    "params": {"since": "2024-01-01"}
}, many=True)

# Or with dict for flexible results (no model validation)
result = await AsyncSQLAdapter.from_obj(dict, {
    "dsn": "postgresql+asyncpg://user:pass@localhost/db",
    "operation": "raw_sql",
    "sql": "SELECT * FROM users ORDER BY created_at DESC LIMIT :limit",
    "params": {"limit": 10}
}, many=True)

# INSERT (default operation)
result = await AsyncSQLAdapter.to_obj(
    new_user,
    dsn="postgresql+asyncpg://user:pass@localhost/db",
    table="users"
)

# UPDATE via config
result = await AsyncSQLAdapter.to_obj(
    updated_user,
    dsn="postgresql+asyncpg://user:pass@localhost/db",
    table="users",
    operation="update",
    where={"id": 123}
)

# UPSERT via config
result = await AsyncSQLAdapter.to_obj(
    user_data,
    dsn="postgresql+asyncpg://user:pass@localhost/db",
    table="users",
    operation="upsert",
    conflict_columns=["email"]
)
Source code in src/pydapter/extras/async_sql_.py
class AsyncSQLAdapter(AsyncAdapter[T]):
    """
    Asynchronous SQL adapter using SQLAlchemy 2.x asyncio for database operations.

    This adapter provides async methods to:
    - Execute SQL queries asynchronously and convert results to Pydantic models
    - Insert Pydantic models as rows into database tables asynchronously
    - Update, delete, and upsert operations through configuration
    - Execute raw SQL with parameterized queries
    - Support for various async SQL databases through SQLAlchemy
    - Handle connection pooling and async context management

    Attributes:
        obj_key: The key identifier for this adapter type ("async_sql")

    Configuration Examples:
        ```python
        from pydantic import BaseModel
        from pydapter.extras.async_sql_ import AsyncSQLAdapter, SQLReadConfig, SQLWriteConfig

        class User(BaseModel):
            id: int
            name: str
            email: str

        # Using TypedDict for type hints (recommended for IDE support)
        config: SQLReadConfig = {
            "dsn": "postgresql+asyncpg://user:pass@localhost/db",
            "table": "users",
            "selectors": {"active": True},
            "limit": 10
        }
        users = await AsyncSQLAdapter.from_obj(User, config, many=True)

        # Or inline dict (same as before)
        users = await AsyncSQLAdapter.from_obj(User, {
            "dsn": "postgresql+asyncpg://user:pass@localhost/db",
            "table": "users",
            "selectors": {"active": True},
            "limit": 10
        }, many=True)

        # DELETE via config
        result = await AsyncSQLAdapter.from_obj(User, {
            "dsn": "postgresql+asyncpg://user:pass@localhost/db",
            "operation": "delete",
            "table": "users",
            "selectors": {"id": 123}
        })

        # Raw SQL execution (note: table parameter NOT required)
        result = await AsyncSQLAdapter.from_obj(User, {
            "dsn": "postgresql+asyncpg://user:pass@localhost/db",
            "operation": "raw_sql",
            "sql": "SELECT * FROM users WHERE created_at > :since",
            "params": {"since": "2024-01-01"}
        }, many=True)

        # Or with dict for flexible results (no model validation)
        result = await AsyncSQLAdapter.from_obj(dict, {
            "dsn": "postgresql+asyncpg://user:pass@localhost/db",
            "operation": "raw_sql",
            "sql": "SELECT * FROM users ORDER BY created_at DESC LIMIT :limit",
            "params": {"limit": 10}
        }, many=True)

        # INSERT (default operation)
        result = await AsyncSQLAdapter.to_obj(
            new_user,
            dsn="postgresql+asyncpg://user:pass@localhost/db",
            table="users"
        )

        # UPDATE via config
        result = await AsyncSQLAdapter.to_obj(
            updated_user,
            dsn="postgresql+asyncpg://user:pass@localhost/db",
            table="users",
            operation="update",
            where={"id": 123}
        )

        # UPSERT via config
        result = await AsyncSQLAdapter.to_obj(
            user_data,
            dsn="postgresql+asyncpg://user:pass@localhost/db",
            table="users",
            operation="upsert",
            conflict_columns=["email"]
        )
        ```
    """

    obj_key = "async_sql"

    @staticmethod
    def _table(meta: sa.MetaData, name: str, conn=None) -> sa.Table:
        """
        Helper method to get a SQLAlchemy Table object for async operations.

        Args:
            meta: SQLAlchemy MetaData instance
            name: Name of the table to load
            conn: Optional connection for reflection

        Returns:
            SQLAlchemy Table object

        Raises:
            ResourceError: If table is not found or cannot be accessed
        """
        try:
            # For async, we can't autoload - just create table reference
            # The actual schema validation happens at query execution
            return sa.Table(name, meta)
        except Exception as e:
            raise ResourceError(f"Error accessing table '{name}': {e}", resource=name) from e

    # incoming
    @classmethod
    async def from_obj(
        cls,
        subj_cls: type[T],
        obj: SQLReadConfig | dict,  # TypedDict for IDE support
        /,
        *,
        many=True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Get operation type (default: "select" for backward compatibility)
            operation = obj.get("operation", "select").lower()

            # Validate only one engine parameter is provided
            engine_params = sum(["engine" in obj, "dsn" in obj, "engine_url" in obj])

            if engine_params == 0:
                raise ValidationError(
                    "Missing required parameter: one of 'engine', 'dsn', or 'engine_url'",
                    data=obj,
                )
            elif engine_params > 1:
                raise ValidationError(
                    "Multiple engine parameters provided. Use only one of: 'engine', 'dsn', or 'engine_url'",
                    data=obj,
                )

            # Get engine - either passed directly or create from DSN
            if "engine" in obj:
                eng = obj["engine"]  # Use provided engine
            elif "dsn" in obj:
                # Create new engine from DSN
                try:
                    eng = create_async_engine(obj["dsn"], future=True)
                except Exception as e:
                    raise ConnectionError(
                        f"Failed to create async database engine: {e}",
                        adapter="async_sql",
                        url=obj["dsn"],
                    ) from e
            else:  # engine_url
                # Create new engine from URL
                try:
                    eng = create_async_engine(obj["engine_url"], future=True)
                except Exception as e:
                    raise ConnectionError(
                        f"Failed to create async database engine: {e}",
                        adapter="async_sql",
                        url=obj["engine_url"],
                    ) from e

            # Handle different operations
            if operation == "select":
                # Standard SELECT operation (existing behavior)
                if "table" not in obj:
                    raise ValidationError("Missing required parameter 'table'", data=obj)

                try:
                    async with eng.begin() as conn:
                        # Use run_sync for table reflection
                        def sync_select(sync_conn):
                            meta = sa.MetaData()
                            tbl = sa.Table(obj["table"], meta, autoload_with=sync_conn)

                            # Build query with optional selectors
                            stmt = sa.select(tbl)
                            if "selectors" in obj and obj["selectors"]:
                                for key, value in obj["selectors"].items():
                                    stmt = stmt.where(getattr(tbl.c, key) == value)

                            # Add limit/offset if specified
                            if "limit" in obj:
                                stmt = stmt.limit(obj["limit"])
                            if "offset" in obj:
                                stmt = stmt.offset(obj["offset"])
                            # Add order_by if specified
                            if "order_by" in obj:
                                stmt = stmt.order_by(text(obj["order_by"]))

                            result = sync_conn.execute(stmt)
                            # Convert Row objects to dicts
                            return [dict(row._mapping) for row in result.fetchall()]

                        rows = await conn.run_sync(sync_select)

                except sa_exc.SQLAlchemyError as e:
                    raise QueryError(
                        f"Error executing async SQL query: {e}",
                        query=str(obj.get("selectors", {})),
                        adapter="async_sql",
                    ) from e

                # Handle empty result set
                if not rows:
                    if many:
                        return []
                    raise ResourceError(
                        "No rows found matching the query",
                        resource=obj["table"],
                        selectors=obj.get("selectors", {}),
                    )

                # Convert rows to model instances (rows are already dicts from run_sync)
                try:
                    if many:
                        return [getattr(subj_cls, adapt_meth)(r, **(adapt_kw or {})) for r in rows]
                    return getattr(subj_cls, adapt_meth)(rows[0], **(adapt_kw or {}))
                except PydanticValidationError as e:
                    raise ValidationError(
                        f"Validation error: {e}", data=rows[0] if not many else rows
                    ) from e

            elif operation == "delete":
                # DELETE operation
                if "table" not in obj:
                    raise ValidationError(
                        "Missing required parameter 'table' for delete operation",
                        data=obj,
                    )

                try:
                    async with eng.begin() as conn:
                        # Use run_sync for table reflection
                        def sync_delete(sync_conn):
                            meta = sa.MetaData()
                            tbl = sa.Table(obj["table"], meta, autoload_with=sync_conn)

                            # Build DELETE statement with selectors
                            stmt = sa.delete(tbl)
                            if "selectors" in obj and obj["selectors"]:
                                for key, value in obj["selectors"].items():
                                    stmt = stmt.where(getattr(tbl.c, key) == value)
                            else:
                                raise ValidationError(
                                    "DELETE operation requires 'selectors' to prevent accidental full table deletion",
                                    data=obj,
                                )

                            result = sync_conn.execute(stmt)
                            return result.rowcount

                        deleted = await conn.run_sync(sync_delete)
                        return {"deleted_count": deleted}

                except sa_exc.SQLAlchemyError as e:
                    raise QueryError(
                        f"Error executing async SQL delete: {e}",
                        adapter="async_sql",
                    ) from e

            elif operation == "raw_sql":
                # Raw SQL execution
                if "sql" not in obj:
                    raise ValidationError(
                        "Missing required parameter 'sql' for raw_sql operation",
                        data=obj,
                    )

                try:
                    async with eng.begin() as conn:
                        # Use SQLAlchemy text() for parameterized queries
                        stmt = text(obj["sql"])
                        params = obj.get("params", {})
                        result = await conn.execute(stmt, params)

                        # Handle result based on fetch_results flag and SQL type
                        fetch_results = obj.get("fetch_results", True)
                        if fetch_results and result.returns_rows:
                            rows = result.fetchall()
                            if not rows:
                                return [] if many else None

                            # Try to convert to Pydantic models if possible
                            try:
                                # Convert Row objects to dicts
                                records = [
                                    (dict(r._mapping) if hasattr(r, "_mapping") else dict(r))
                                    for r in rows
                                ]
                                if subj_cls is not dict:  # Only convert if not using generic dict
                                    if many:
                                        return [
                                            getattr(subj_cls, adapt_meth)(r, **(adapt_kw or {}))
                                            for r in records
                                        ]
                                    return getattr(subj_cls, adapt_meth)(
                                        records[0], **(adapt_kw or {})
                                    )
                                else:
                                    return records if many else records[0]
                            except (PydanticValidationError, TypeError):
                                # If conversion fails, return raw dicts
                                records = [
                                    (dict(r._mapping) if hasattr(r, "_mapping") else dict(r))
                                    for r in rows
                                ]
                                return records if many else records[0]
                        else:
                            # For DDL, procedures, or when fetch_results=False
                            return {
                                "affected_rows": (result.rowcount if result.rowcount != -1 else 0)
                            }

                except sa_exc.SQLAlchemyError as e:
                    raise QueryError(
                        f"Error executing raw SQL: {e}",
                        adapter="async_sql",
                    ) from e

            else:
                raise ValidationError(
                    f"Unsupported operation '{operation}' for from_obj. "
                    f"Supported operations: select, delete, raw_sql",
                    data=obj,
                )

        except AdapterError:
            raise
        except Exception as e:
            raise QueryError(
                f"Unexpected error in async SQL adapter: {e}", adapter="async_sql"
            ) from e

    # outgoing
    @classmethod
    async def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Get operation type (default: "insert" for backward compatibility)
            operation = kw.get("operation", "insert").lower()

            # Validate required parameters
            if "table" not in kw:
                raise ValidationError("Missing required parameter 'table'")

            table = kw["table"]

            # Validate only one engine parameter is provided
            engine_params = sum(["engine" in kw, "dsn" in kw, "engine_url" in kw])

            if engine_params == 0:
                raise ValidationError(
                    "Missing required parameter: one of 'engine', 'dsn', or 'engine_url'"
                )
            elif engine_params > 1:
                raise ValidationError(
                    "Multiple engine parameters provided. Use only one of: 'engine', 'dsn', or 'engine_url'"
                )

            # Get engine - either passed directly or create from DSN
            if "engine" in kw:
                eng = kw["engine"]  # Use provided engine
            elif "dsn" in kw:
                # Create new engine from DSN
                try:
                    eng = create_async_engine(kw["dsn"], future=True)
                except Exception as e:
                    raise ConnectionError(
                        f"Failed to create async database engine: {e}",
                        adapter="async_sql",
                        url=kw["dsn"],
                    ) from e
            else:  # engine_url
                # Create new engine from URL
                try:
                    eng = create_async_engine(kw["engine_url"], future=True)
                except Exception as e:
                    raise ConnectionError(
                        f"Failed to create async database engine: {e}",
                        adapter="async_sql",
                        url=kw["engine_url"],
                    ) from e

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return {"affected_count": 0}

            # Handle different operations
            if operation == "insert":
                # Standard INSERT operation (existing behavior)
                rows = [getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items]

                try:
                    async with eng.begin() as conn:
                        # Use run_sync to handle table reflection properly
                        def sync_insert(sync_conn):
                            meta = sa.MetaData()
                            tbl = sa.Table(table, meta, autoload_with=sync_conn)
                            # Filter out None values from rows to let DB handle defaults
                            clean_rows = [
                                {k: v for k, v in row.items() if v is not None} for row in rows
                            ]
                            sync_conn.execute(sa.insert(tbl), clean_rows)
                            return len(clean_rows)

                        count = await conn.run_sync(sync_insert)
                        return {"inserted_count": count}

                except sa_exc.SQLAlchemyError as e:
                    raise QueryError(
                        f"Error executing async SQL insert: {e}",
                        query=f"INSERT INTO {table}",
                        adapter="async_sql",
                    ) from e

            elif operation == "update":
                # UPDATE operation
                if "where" not in kw:
                    raise ValidationError("UPDATE operation requires 'where' parameter")

                where_conditions = kw["where"]
                update_data = [getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items]

                try:
                    async with eng.begin() as conn:
                        # Use run_sync for table reflection
                        def sync_update(sync_conn):
                            meta = sa.MetaData()
                            tbl = sa.Table(table, meta, autoload_with=sync_conn)

                            total_updated = 0
                            for data in update_data:
                                # Filter out None values and don't update primary keys
                                clean_data = {
                                    k: v for k, v in data.items() if v is not None and k != "id"
                                }
                                if not clean_data:
                                    continue

                                # Build WHERE clause from conditions
                                stmt = sa.update(tbl)
                                for key, value in where_conditions.items():
                                    stmt = stmt.where(getattr(tbl.c, key) == value)

                                # Apply updates
                                stmt = stmt.values(**clean_data)
                                result = sync_conn.execute(stmt)
                                total_updated += result.rowcount

                            return total_updated

                        count = await conn.run_sync(sync_update)
                        return {"updated_count": count}

                except sa_exc.SQLAlchemyError as e:
                    raise QueryError(
                        f"Error executing async SQL update: {e}",
                        adapter="async_sql",
                    ) from e

            elif operation == "upsert":
                # UPSERT operation (basic implementation, PostgreSQL adapter has better version)
                if "conflict_columns" not in kw:
                    raise ValidationError("UPSERT operation requires 'conflict_columns' parameter")

                # For basic SQL adapter, implement as INSERT with error handling
                # PostgreSQL adapter will override with proper ON CONFLICT
                rows = [getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items]
                conflict_columns = kw["conflict_columns"]

                try:
                    async with eng.begin() as conn:
                        # Use run_sync for table reflection
                        def sync_upsert(sync_conn):
                            meta = sa.MetaData()
                            tbl = sa.Table(table, meta, autoload_with=sync_conn)

                            inserted_count = 0
                            updated_count = 0

                            for row in rows:
                                # Clean the row data - remove None values
                                clean_row = {k: v for k, v in row.items() if v is not None}

                                # Check if record exists
                                select_stmt = sa.select(tbl)
                                for col in conflict_columns:
                                    if col in clean_row:
                                        select_stmt = select_stmt.where(
                                            getattr(tbl.c, col) == clean_row[col]
                                        )

                                existing = sync_conn.execute(select_stmt).fetchone()

                                if existing:
                                    # Update existing record - don't update primary keys
                                    update_data = {k: v for k, v in clean_row.items() if k != "id"}
                                    if update_data:
                                        update_stmt = sa.update(tbl)
                                        for col in conflict_columns:
                                            if col in clean_row:
                                                update_stmt = update_stmt.where(
                                                    getattr(tbl.c, col) == clean_row[col]
                                                )
                                        update_stmt = update_stmt.values(**update_data)
                                        sync_conn.execute(update_stmt)
                                    updated_count += 1
                                else:
                                    # Insert new record
                                    insert_stmt = sa.insert(tbl).values(**clean_row)
                                    sync_conn.execute(insert_stmt)
                                    inserted_count += 1

                            return {
                                "inserted_count": inserted_count,
                                "updated_count": updated_count,
                                "total_count": inserted_count + updated_count,
                            }

                        return await conn.run_sync(sync_upsert)

                except sa_exc.SQLAlchemyError as e:
                    raise QueryError(
                        f"Error executing async SQL upsert: {e}",
                        adapter="async_sql",
                    ) from e

            else:
                raise ValidationError(
                    f"Unsupported operation '{operation}' for to_obj. "
                    f"Supported operations: insert, update, upsert"
                )

        except AdapterError:
            raise
        except Exception as e:
            raise QueryError(
                f"Unexpected error in async SQL adapter: {e}", adapter="async_sql"
            ) from e
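
Because a dsn creates a fresh engine on every call, long-running services may prefer to pass a pre-built AsyncEngine via the engine parameter, which the adapter accepts as shown above. A short sketch (the connection string is a placeholder):

```python
import asyncio

from pydantic import BaseModel
from sqlalchemy.ext.asyncio import create_async_engine

from pydapter.extras.async_sql_ import AsyncSQLAdapter

class User(BaseModel):
    id: int
    name: str

async def main():
    # One engine (and one connection pool) shared across adapter calls
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    try:
        users = await AsyncSQLAdapter.from_obj(
            User, {"engine": engine, "table": "users", "limit": 5}
        )
        print(users)
    finally:
        await engine.dispose()

asyncio.run(main())
```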

SQLReadConfig

Bases: TypedDict

Configuration for SQL read operations (from_obj).

Source code in src/pydapter/extras/async_sql_.py
class SQLReadConfig(TypedDict):
    """Configuration for SQL read operations (from_obj)."""

    # Connection (exactly one required)
    dsn: NotRequired[str]  # Database connection string
    engine_url: NotRequired[str]  # Legacy: Database connection string
    engine: NotRequired[AsyncEngine]  # Pre-existing SQLAlchemy engine

    # Operation type
    operation: NotRequired[Literal["select", "delete", "raw_sql"]]  # Default: "select"

    # For select/delete operations (table required for these)
    table: NotRequired[str]  # Table name (NOT required for raw_sql)
    selectors: NotRequired[dict[str, Any]]  # WHERE conditions
    limit: NotRequired[int]  # LIMIT clause
    offset: NotRequired[int]  # OFFSET clause
    order_by: NotRequired[str]  # ORDER BY clause

    # For raw_sql operation (table NOT required)
    sql: NotRequired[str]  # Raw SQL statement
    params: NotRequired[dict[str, Any]]  # SQL parameters for safe binding
    fetch_results: NotRequired[bool]  # Whether to fetch results (default: True)
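
The fetch_results flag matters for statements that return no rows, such as DDL. A minimal sketch of the raw_sql path, assuming the aiosqlite driver is installed so an in-memory SQLite database can stand in for a real server:

```python
import asyncio

from pydapter.extras.async_sql_ import AsyncSQLAdapter

async def main():
    result = await AsyncSQLAdapter.from_obj(dict, {
        "dsn": "sqlite+aiosqlite:///:memory:",
        "operation": "raw_sql",
        "sql": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
        "fetch_results": False,
    })
    print(result)  # {"affected_rows": 0} -- DDL statements report no affected rows

asyncio.run(main())
```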

SQLWriteConfig

Bases: TypedDict

Configuration for SQL write operations (to_obj as **kwargs).

Source code in src/pydapter/extras/async_sql_.py
class SQLWriteConfig(TypedDict):
    """Configuration for SQL write operations (to_obj as **kwargs)."""

    # Connection (exactly one required)
    dsn: NotRequired[str]  # Database connection string
    engine_url: NotRequired[str]  # Legacy: Database connection string
    engine: NotRequired[AsyncEngine]  # Pre-existing SQLAlchemy engine

    # Required
    table: Required[str]  # Table name

    # Operation type
    operation: NotRequired[Literal["insert", "update", "upsert"]]  # Default: "insert"

    # For update operations
    where: NotRequired[dict[str, Any]]  # WHERE conditions for UPDATE

    # For upsert operations
    conflict_columns: NotRequired[list[str]]  # Columns that define conflicts
    update_columns: NotRequired[list[str]]  # Columns to update on conflict
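
Since to_obj takes its configuration as keyword arguments, a SQLWriteConfig can be unpacked straight into the call. A minimal sketch with a placeholder DSN and model:

```python
import asyncio

from pydantic import BaseModel

from pydapter.extras.async_sql_ import AsyncSQLAdapter, SQLWriteConfig

class User(BaseModel):
    id: int
    name: str
    email: str

async def main():
    cfg: SQLWriteConfig = {
        "dsn": "postgresql+asyncpg://user:pass@localhost/db",
        "table": "users",
        "operation": "upsert",
        "conflict_columns": ["email"],
    }
    result = await AsyncSQLAdapter.to_obj(
        User(id=1, name="Ada", email="ada@example.com"), **cfg
    )
    print(result)  # {"inserted_count": ..., "updated_count": ..., "total_count": ...}

asyncio.run(main())
```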

Async PostgreSQL Adapter

pydapter.extras.async_postgres_

AsyncPostgresAdapter - presets AsyncSQLAdapter for PostgreSQL/pgvector.

Classes

AsyncPostgresAdapter

Bases: AsyncSQLAdapter[T]

Asynchronous PostgreSQL adapter extending AsyncSQLAdapter with PostgreSQL-specific optimizations.

This adapter provides:

- Async PostgreSQL operations using the asyncpg driver
- Enhanced error handling for PostgreSQL-specific issues
- Support for pgvector when vector columns are present
- Default PostgreSQL connection string management

Attributes:

Name       Type    Description
obj_key            The key identifier for this adapter type ("async_pg")
DEFAULT            Default PostgreSQL+asyncpg connection string

Example
import asyncio
from pydantic import BaseModel
from pydapter.extras.async_postgres_ import AsyncPostgresAdapter

class User(BaseModel):
    id: int
    name: str
    email: str

async def main():
    # Query with custom connection (raw SQL path)
    query_config = {
        "dsn": "postgresql+asyncpg://user:pass@localhost/mydb",
        "operation": "raw_sql",
        "sql": "SELECT id, name, email FROM users WHERE active = true"
    }
    users = await AsyncPostgresAdapter.from_obj(User, query_config, many=True)

    # Insert with default connection
    new_users = [User(id=1, name="John", email="john@example.com")]
    await AsyncPostgresAdapter.to_obj(new_users, table="users", many=True)

asyncio.run(main())
Source code in src/pydapter/extras/async_postgres_.py
class AsyncPostgresAdapter(AsyncSQLAdapter[T]):
    """
    Asynchronous PostgreSQL adapter extending AsyncSQLAdapter with PostgreSQL-specific optimizations.

    This adapter provides:
    - Async PostgreSQL operations using asyncpg driver
    - Enhanced error handling for PostgreSQL-specific issues
    - Support for pgvector when vector columns are present
    - Default PostgreSQL connection string management

    Attributes:
        obj_key: The key identifier for this adapter type ("async_pg")
        DEFAULT: Default PostgreSQL+asyncpg connection string

    Example:
        ```python
        import asyncio
        from pydantic import BaseModel
        from pydapter.extras.async_postgres_ import AsyncPostgresAdapter

        class User(BaseModel):
            id: int
            name: str
            email: str

        async def main():
            # Query with custom connection (raw SQL path)
            query_config = {
                "dsn": "postgresql+asyncpg://user:pass@localhost/mydb",
                "operation": "raw_sql",
                "sql": "SELECT id, name, email FROM users WHERE active = true"
            }
            users = await AsyncPostgresAdapter.from_obj(User, query_config, many=True)

            # Insert with default connection
            new_users = [User(id=1, name="John", email="john@example.com")]
            await AsyncPostgresAdapter.to_obj(new_users, table="users", many=True)

        asyncio.run(main())
        ```
    """

    obj_key = "async_pg"
    DEFAULT = "postgresql+asyncpg://test:test@localhost/test"

    @classmethod
    async def from_obj(
        cls,
        subj_cls,
        obj: dict,
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate only one engine parameter is provided
            engine_params = sum(["engine" in obj, "dsn" in obj, "dsn" in kw, "engine_url" in obj])

            if engine_params > 1:
                raise AdapterValidationError(
                    "Multiple engine parameters provided. Use only one of: 'engine', 'dsn', or 'engine_url'"
                )

            # Handle DSN/engine setup
            if "engine" not in obj:
                # Get DSN from obj, kw, or use default
                if "dsn" in obj:
                    dsn = obj["dsn"]
                elif "dsn" in kw:
                    dsn = kw["dsn"]
                    obj["dsn"] = dsn  # Move to obj for parent class
                elif "engine_url" in obj:  # Backward compatibility
                    dsn = obj["engine_url"]
                    obj["dsn"] = dsn  # Convert to dsn
                    del obj["engine_url"]  # Remove to avoid confusion
                else:
                    dsn = cls.DEFAULT
                    obj["dsn"] = dsn

                # Convert PostgreSQL URL to SQLAlchemy format if needed
                # BUT skip this for SQLite DSNs
                if dsn.startswith("sqlite"):
                    # Keep SQLite DSN as-is
                    pass
                elif not dsn.startswith("postgresql+asyncpg://"):
                    obj["dsn"] = dsn.replace("postgresql://", "postgresql+asyncpg://")

            # Add PostgreSQL-specific error handling
            try:
                return await super().from_obj(
                    subj_cls,
                    obj,
                    many=many,
                    adapt_meth=adapt_meth,
                    adapt_kw=adapt_kw,
                    **kw,
                )
            except Exception as e:
                # Check for common PostgreSQL-specific errors
                error_str = str(e).lower()
                conn_url = obj.get("dsn", obj.get("engine_url", cls.DEFAULT))
                if "authentication" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL authentication failed: {e}",
                        adapter="async_pg",
                        url=conn_url,
                    ) from e
                elif "connection" in error_str and "refused" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL connection refused: {e}",
                        adapter="async_pg",
                        url=conn_url,
                    ) from e
                elif "does not exist" in error_str and "database" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL database does not exist: {e}",
                        adapter="async_pg",
                        url=conn_url,
                    ) from e
                # Re-raise the original exception
                raise

        except ConnectionError:
            # Re-raise ConnectionError
            raise
        except Exception as e:
            # Wrap other exceptions
            raise ConnectionError(
                f"Unexpected error in async PostgreSQL adapter: {e}",
                adapter="async_pg",
                url=obj.get("engine_url", cls.DEFAULT),
            ) from e

    @classmethod
    async def to_obj(
        cls,
        subj,
        /,
        *,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate only one engine parameter is provided
            engine_params = sum(["engine" in kw, "dsn" in kw, "engine_url" in kw])

            if engine_params > 1:
                raise AdapterValidationError(
                    "Multiple engine parameters provided. Use only one of: 'engine', 'dsn', or 'engine_url'"
                )

            # Handle DSN/engine setup
            if "engine" not in kw:
                # Get DSN from kw or use default
                if "dsn" in kw:
                    dsn = kw["dsn"]
                elif "engine_url" in kw:  # Backward compatibility
                    dsn = kw["engine_url"]
                    kw["dsn"] = dsn  # Convert to dsn
                    del kw["engine_url"]  # Remove to avoid confusion
                else:
                    dsn = cls.DEFAULT
                    kw["dsn"] = dsn

                # Convert PostgreSQL URL to SQLAlchemy format if needed
                if not dsn.startswith("postgresql+asyncpg://"):
                    kw["dsn"] = dsn.replace("postgresql://", "postgresql+asyncpg://")

            # Add PostgreSQL-specific error handling
            try:
                return await super().to_obj(
                    subj, many=many, adapt_meth=adapt_meth, adapt_kw=adapt_kw, **kw
                )
            except Exception as e:
                # Check for common PostgreSQL-specific errors
                error_str = str(e).lower()
                conn_url = kw.get("dsn", kw.get("engine_url", cls.DEFAULT))
                if "authentication" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL authentication failed: {e}",
                        adapter="async_pg",
                        url=conn_url,
                    ) from e
                elif "connection" in error_str and "refused" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL connection refused: {e}",
                        adapter="async_pg",
                        url=conn_url,
                    ) from e
                elif "does not exist" in error_str and "database" in error_str:
                    raise ConnectionError(
                        f"PostgreSQL database does not exist: {e}",
                        adapter="async_pg",
                        url=conn_url,
                    ) from e
                # Re-raise the original exception
                raise

        except ConnectionError:
            # Re-raise ConnectionError
            raise
        except Exception as e:
            # Wrap other exceptions
            raise ConnectionError(
                f"Unexpected error in async PostgreSQL adapter: {e}",
                adapter="async_pg",
                url=kw.get("engine_url", cls.DEFAULT),
            ) from e
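
A small sketch of the DSN normalization shown above: a plain postgresql:// URL is rewritten to the asyncpg dialect before the parent adapter sees it (the connection string and table are placeholders):

```python
import asyncio

from pydantic import BaseModel

from pydapter.extras.async_postgres_ import AsyncPostgresAdapter

class User(BaseModel):
    id: int
    name: str

async def main():
    # "postgresql://..." is rewritten internally to "postgresql+asyncpg://..."
    users = await AsyncPostgresAdapter.from_obj(User, {
        "dsn": "postgresql://user:pass@localhost/mydb",
        "table": "users",
        "selectors": {"name": "Ada"},
    })
    print(users)

asyncio.run(main())
```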

Async MongoDB Adapter

pydapter.extras.async_mongo_

AsyncMongoAdapter - uses motor.motor_asyncio.

Classes

AsyncMongoAdapter

Bases: AsyncAdapter[T]

Asynchronous MongoDB adapter for converting between Pydantic models and MongoDB documents.

This adapter provides async methods to:

- Query MongoDB collections asynchronously and convert documents to Pydantic models
- Insert Pydantic models as documents into MongoDB collections asynchronously
- Handle async MongoDB operations using Motor (the async MongoDB driver)
- Support various async MongoDB operations (find, insert, update, delete)

Attributes:

Name       Type    Description
obj_key            The key identifier for this adapter type ("async_mongo")

Example
import asyncio
from pydantic import BaseModel
from pydapter.extras.async_mongo_ import AsyncMongoAdapter

class User(BaseModel):
    name: str
    email: str
    age: int

async def main():
    # Query from MongoDB
    query_config = {
        "url": "mongodb://localhost:27017",
        "db": "myapp",
        "collection": "users",
        "filter": {"age": {"$gte": 18}}
    }
    users = await AsyncMongoAdapter.from_obj(User, query_config, many=True)

    # Insert to MongoDB
    new_users = [User(name="John", email="john@example.com", age=30)]
    await AsyncMongoAdapter.to_obj(
        new_users,
        url="mongodb://localhost:27017",
        db="myapp",
        collection="users",
        many=True,
    )

asyncio.run(main())
Source code in src/pydapter/extras/async_mongo_.py
class AsyncMongoAdapter(AsyncAdapter[T]):
    """
    Asynchronous MongoDB adapter for converting between Pydantic models and MongoDB documents.

    This adapter provides async methods to:
    - Query MongoDB collections asynchronously and convert documents to Pydantic models
    - Insert Pydantic models as documents into MongoDB collections asynchronously
    - Handle async MongoDB operations using Motor (async MongoDB driver)
    - Support for various async MongoDB operations (find, insert, update, delete)

    Attributes:
        obj_key: The key identifier for this adapter type ("async_mongo")

    Example:
        ```python
        import asyncio
        from pydantic import BaseModel
        from pydapter.extras.async_mongo_ import AsyncMongoAdapter

        class User(BaseModel):
            name: str
            email: str
            age: int

        async def main():
            # Query from MongoDB
            query_config = {
                "url": "mongodb://localhost:27017",
                "db": "myapp",
                "collection": "users",
                "filter": {"age": {"$gte": 18}}
            }
            users = await AsyncMongoAdapter.from_obj(User, query_config, many=True)

            # Insert to MongoDB
            new_users = [User(name="John", email="john@example.com", age=30)]
            await AsyncMongoAdapter.to_obj(
                new_users,
                url="mongodb://localhost:27017",
                db="myapp",
                collection="users",
                many=True,
            )

        asyncio.run(main())
        ```
    """

    obj_key = "async_mongo"

    @classmethod
    def _client(cls, url: str) -> AsyncIOMotorClient:
        try:
            return AsyncIOMotorClient(url, serverSelectionTimeoutMS=5000)
        except pymongo.errors.ConfigurationError as e:
            raise ConnectionError(
                f"Invalid MongoDB connection string: {e}",
                adapter="async_mongo",
                url=url,
            ) from e
        except Exception as e:
            raise ConnectionError(
                f"Failed to create MongoDB client: {e}", adapter="async_mongo", url=url
            ) from e

    @classmethod
    async def _validate_connection(cls, client: AsyncIOMotorClient) -> None:
        """Validate that the MongoDB connection is working."""
        try:
            # This will raise an exception if the connection fails
            await client.admin.command("ping")
        except pymongo.errors.ServerSelectionTimeoutError as e:
            raise ConnectionError(
                f"MongoDB server selection timeout: {e}", adapter="async_mongo"
            ) from e
        except pymongo.errors.OperationFailure as e:
            if "auth failed" in str(e).lower():
                raise ConnectionError(
                    f"MongoDB authentication failed: {e}", adapter="async_mongo"
                ) from e
            raise QueryError(f"MongoDB operation failure: {e}", adapter="async_mongo") from e
        except Exception as e:
            raise ConnectionError(
                f"Failed to connect to MongoDB: {e}", adapter="async_mongo"
            ) from e

    # incoming
    @classmethod
    async def from_obj(
        cls,
        subj_cls: type[T],
        obj: dict,
        /,
        *,
        many=True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if "url" not in obj:
                raise AdapterValidationError("Missing required parameter 'url'", data=obj)
            if "db" not in obj:
                raise AdapterValidationError("Missing required parameter 'db'", data=obj)
            if "collection" not in obj:
                raise AdapterValidationError("Missing required parameter 'collection'", data=obj)

            # Create client and validate connection
            client = cls._client(obj["url"])
            await cls._validate_connection(client)

            # Get collection and execute query
            try:
                coll = client[obj["db"]][obj["collection"]]
                filter_query = obj.get("filter") or {}

                # Validate filter query if provided
                if filter_query and not isinstance(filter_query, dict):
                    raise AdapterValidationError(
                        "Filter must be a dictionary",
                        data=filter_query,
                    )

                docs = await coll.find(filter_query).to_list(length=None)
            except pymongo.errors.OperationFailure as e:
                if "not authorized" in str(e).lower():
                    raise ConnectionError(
                        f"Not authorized to access {obj['db']}.{obj['collection']}: {e}",
                        adapter="async_mongo",
                        url=obj["url"],
                    ) from e
                raise QueryError(
                    f"MongoDB query error: {e}",
                    query=filter_query,
                    adapter="async_mongo",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Error executing MongoDB query: {e}",
                    query=filter_query,
                    adapter="async_mongo",
                ) from e

            # Handle empty result set
            if not docs:
                if many:
                    return []
                raise ResourceError(
                    "No documents found matching the query",
                    resource=f"{obj['db']}.{obj['collection']}",
                    filter=filter_query,
                )

            # Convert documents to model instances
            try:
                if many:
                    return [getattr(subj_cls, adapt_meth)(d, **(adapt_kw or {})) for d in docs]
                return getattr(subj_cls, adapt_meth)(docs[0], **(adapt_kw or {}))
            except ValidationError as e:
                raise AdapterValidationError(
                    f"Validation error: {e}",
                    data=docs[0] if not many else docs,
                    errors=e.errors(),
                ) from e

        except (ConnectionError, QueryError, ResourceError, AdapterValidationError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(
                f"Unexpected error in async MongoDB adapter: {e}", adapter="async_mongo"
            ) from e

    # outgoing
    @classmethod
    async def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        url,
        db,
        collection,
        many=True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if not url:
                raise AdapterValidationError("Missing required parameter 'url'")
            if not db:
                raise AdapterValidationError("Missing required parameter 'db'")
            if not collection:
                raise AdapterValidationError("Missing required parameter 'collection'")

            # Create client and validate connection
            client = cls._client(url)
            await cls._validate_connection(client)

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return None  # Nothing to insert

            payload = [getattr(i, adapt_meth)(**(adapt_kw or {})) for i in items]

            # Execute insert
            try:
                result = await client[db][collection].insert_many(payload)
                return {"inserted_count": len(result.inserted_ids)}
            except pymongo.errors.BulkWriteError as e:
                raise QueryError(
                    f"MongoDB bulk write error: {e}",
                    adapter="async_mongo",
                ) from e
            except pymongo.errors.OperationFailure as e:
                if "not authorized" in str(e).lower():
                    raise ConnectionError(
                        f"Not authorized to write to {db}.{collection}: {e}",
                        adapter="async_mongo",
                        url=url,
                    ) from e
                raise QueryError(
                    f"MongoDB operation failure: {e}",
                    adapter="async_mongo",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Error inserting documents into MongoDB: {e}",
                    adapter="async_mongo",
                ) from e

        except (ConnectionError, QueryError, AdapterValidationError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            # Wrap other exceptions
            raise QueryError(
                f"Unexpected error in async MongoDB adapter: {e}", adapter="async_mongo"
            ) from e
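
Note the asymmetric empty-result behavior in `from_obj` above: `many=True` returns an empty list, while `many=False` raises `ResourceError`. A short defensive sketch; the `User` model and connection details are illustrative, and the exception is assumed importable from `pydapter.exceptions`:

```python
import asyncio

from pydantic import BaseModel

from pydapter.exceptions import ResourceError
from pydapter.extras.async_mongo_ import AsyncMongoAdapter


class User(BaseModel):
    name: str
    email: str
    age: int


async def find_user(email: str) -> User | None:
    try:
        return await AsyncMongoAdapter.from_obj(
            User,
            {
                "url": "mongodb://localhost:27017",
                "db": "myapp",
                "collection": "users",
                "filter": {"email": email},
            },
            many=False,  # raises ResourceError when no document matches
        )
    except ResourceError:
        return None


asyncio.run(find_user("john@example.com"))
```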

Async Qdrant Adapter

pydapter.extras.async_qdrant_

AsyncQdrantAdapter - vector upsert / search using AsyncQdrantClient.

Classes

AsyncQdrantAdapter

Bases: AsyncAdapter[T]

Asynchronous Qdrant vector database adapter for async vector operations.

This adapter provides async methods to:

- Search for similar vectors asynchronously and convert results to Pydantic models
- Insert Pydantic models as vector points into Qdrant collections asynchronously
- Handle async vector similarity operations and metadata filtering
- Support both cloud and self-hosted Qdrant instances with async operations

Attributes:

    obj_key: The key identifier for this adapter type ("async_qdrant")

Example
import asyncio
from pydantic import BaseModel
from pydapter.extras.async_qdrant_ import AsyncQdrantAdapter

class Document(BaseModel):
    id: str
    text: str
    embedding: list[float]
    category: str

async def main():
    # Search for similar vectors
    search_config = {
        "url": "http://localhost:6333",
        "collection": "documents",
        "query_vector": [0.1, 0.2, 0.3, ...],  # 768-dim vector
        "top_k": 10
    }
    similar_docs = await AsyncQdrantAdapter.from_obj(Document, search_config, many=True)

    # Insert documents with vectors
    new_docs = [Document(
        id="550e8400-e29b-41d4-a716-446655440000",  # point IDs must be unsigned ints or UUIDs
        text="Sample text",
        embedding=[0.1, 0.2, 0.3, ...],
        category="tech"
    )]
    await AsyncQdrantAdapter.to_obj(
        new_docs,
        collection="documents",
        url="http://localhost:6333",
        many=True,
    )

asyncio.run(main())
Source code in src/pydapter/extras/async_qdrant_.py
class AsyncQdrantAdapter(AsyncAdapter[T]):
    """
    Asynchronous Qdrant vector database adapter for async vector operations.

    This adapter provides async methods to:
    - Search for similar vectors asynchronously and convert results to Pydantic models
    - Insert Pydantic models as vector points into Qdrant collections asynchronously
    - Handle async vector similarity operations and metadata filtering
    - Support for both cloud and self-hosted Qdrant instances with async operations

    Attributes:
        obj_key: The key identifier for this adapter type ("async_qdrant")

    Example:
        ```python
        import asyncio
        from pydantic import BaseModel
        from pydapter.extras.async_qdrant_ import AsyncQdrantAdapter

        class Document(BaseModel):
            id: str
            text: str
            embedding: list[float]
            category: str

        async def main():
            # Search for similar vectors
            search_config = {
                "url": "http://localhost:6333",
                "collection": "documents",
                "query_vector": [0.1, 0.2, 0.3, ...],  # 768-dim vector
                "top_k": 10
            }
            similar_docs = await AsyncQdrantAdapter.from_obj(Document, search_config, many=True)

            # Insert documents with vectors
            new_docs = [Document(
                id="550e8400-e29b-41d4-a716-446655440000",  # point IDs must be unsigned ints or UUIDs
                text="Sample text",
                embedding=[0.1, 0.2, 0.3, ...],
                category="tech"
            )]
            await AsyncQdrantAdapter.to_obj(
                new_docs,
                collection="documents",
                url="http://localhost:6333",
                many=True,
            )

        asyncio.run(main())
        ```
    """

    obj_key = "async_qdrant"

    @staticmethod
    def _client(url: str | None):
        """
        Create an async Qdrant client with proper error handling.

        Args:
            url: Qdrant server URL or None for in-memory instance

        Returns:
            AsyncQdrantClient instance

        Raises:
            ConnectionError: If connection cannot be established
        """
        try:
            return AsyncQdrantClient(url=url) if url else AsyncQdrantClient(":memory:")
        except UnexpectedResponse as e:
            raise ConnectionError(
                f"Failed to connect to Qdrant: {e}", adapter="async_qdrant", url=url
            ) from e
        except Exception as e:
            raise ConnectionError(
                f"Unexpected error connecting to Qdrant: {e}",
                adapter="async_qdrant",
                url=url,
            ) from e

    @staticmethod
    def _validate_vector_dimensions(vector, expected_dim=None):
        """Validate that the vector has the correct dimensions."""
        if not isinstance(vector, list | tuple) or not all(
            isinstance(x, int | float) for x in vector
        ):
            raise AdapterValidationError(
                "Vector must be a list or tuple of numbers",
                data=vector,
            )

        if expected_dim is not None and len(vector) != expected_dim:
            raise AdapterValidationError(
                f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}",
                data=vector,
            )

    # outgoing
    @classmethod
    async def to_obj(
        cls,
        subj: T | Sequence[T],
        /,
        *,
        collection,
        vector_field="embedding",
        id_field="id",
        url=None,
        many: bool = True,
        adapt_meth: str = "model_dump",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            # Validate required parameters
            if not collection:
                raise AdapterValidationError("Missing required parameter 'collection'")

            # Prepare data
            items = subj if isinstance(subj, Sequence) else [subj]
            if not items:
                return None  # Nothing to insert

            # Validate vector field exists
            if not hasattr(items[0], vector_field):
                raise AdapterValidationError(
                    f"Vector field '{vector_field}' not found in model",
                    data=getattr(items[0], adapt_meth)(**(adapt_kw or {})),
                )

            # Validate ID field exists
            if not hasattr(items[0], id_field):
                raise AdapterValidationError(
                    f"ID field '{id_field}' not found in model",
                    data=getattr(items[0], adapt_meth)(**(adapt_kw or {})),
                )

            # Get vector dimension
            vector = getattr(items[0], vector_field)
            cls._validate_vector_dimensions(vector)
            dim = len(vector)

            # Create client
            client = cls._client(url)

            # Create or recreate collection
            try:
                await client.recreate_collection(
                    collection,
                    vectors_config=qd.VectorParams(size=dim, distance="Cosine"),
                )
            except UnexpectedResponse as e:
                raise QueryError(
                    f"Failed to create Qdrant collection: {e}",
                    adapter="async_qdrant",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Unexpected error creating Qdrant collection: {e}",
                    adapter="async_qdrant",
                ) from e

            # Create points
            try:
                points = []
                for _i, item in enumerate(items):
                    vector = getattr(item, vector_field)
                    cls._validate_vector_dimensions(vector, dim)

                    points.append(
                        qd.PointStruct(
                            id=getattr(item, id_field),
                            vector=vector,
                            payload=getattr(item, adapt_meth)(
                                exclude={vector_field}, **(adapt_kw or {})
                            ),
                        )
                    )
            except AdapterValidationError:
                # Re-raise validation errors
                raise
            except Exception as e:
                raise AdapterValidationError(
                    f"Error creating Qdrant points: {e}",
                    data=items,
                ) from e

            # Upsert points
            try:
                await client.upsert(collection, points)
                return {"upserted_count": len(points)}
            except UnexpectedResponse as e:
                raise QueryError(
                    f"Failed to upsert points to Qdrant: {e}",
                    adapter="async_qdrant",
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Unexpected error upserting points to Qdrant: {e}",
                    adapter="async_qdrant",
                ) from e

        except AdapterError:
            raise

        except Exception as e:
            raise QueryError(
                f"Unexpected error in async Qdrant adapter: {e}", adapter="async_qdrant"
            ) from e

    # incoming
    @classmethod
    async def from_obj(
        cls,
        subj_cls: type[T],
        obj: dict,
        /,
        *,
        many=True,
        adapt_meth: str = "model_validate",
        adapt_kw: dict | None = None,
        **kw,
    ):
        try:
            if "collection" not in obj:
                raise AdapterValidationError("Missing required parameter 'collection'", data=obj)
            if "query_vector" not in obj:
                raise AdapterValidationError("Missing required parameter 'query_vector'", data=obj)

            # Validate query vector & Create client
            cls._validate_vector_dimensions(obj["query_vector"])
            client = cls._client(obj.get("url"))

            # Execute search
            try:
                res = await client.search(
                    obj["collection"],
                    obj["query_vector"],
                    limit=obj.get("top_k", 5),
                    with_payload=True,
                )
            except UnexpectedResponse as e:
                if "not found" in str(e).lower():
                    raise ResourceError(
                        f"Qdrant collection not found: {e}",
                        resource=obj["collection"],
                    ) from e
                raise QueryError(
                    f"Failed to search Qdrant: {e}",
                    adapter="async_qdrant",
                ) from e
            except grpc.RpcError as e:
                raise ConnectionError(
                    f"Qdrant RPC error: {e}",
                    adapter="async_qdrant",
                    url=obj.get("url"),
                ) from e
            except Exception as e:
                raise QueryError(
                    f"Unexpected error searching Qdrant: {e}",
                    adapter="async_qdrant",
                ) from e

            # Extract payloads
            docs = [r.payload for r in res]

            # Handle empty result set
            if not docs:
                if many:
                    return []
                raise ResourceError(
                    "No points found matching the query vector",
                    resource=obj["collection"],
                )

            # Convert documents to model instances
            try:
                if many:
                    return [getattr(subj_cls, adapt_meth)(d, **(adapt_kw or {})) for d in docs]
                return getattr(subj_cls, adapt_meth)(docs[0], **(adapt_kw or {}))
            except ValidationError as e:
                raise AdapterValidationError(
                    f"Validation error: {e}",
                    data=docs[0] if not many else docs,
                    errors=e.errors(),
                ) from e

        except AdapterError:
            raise

        except Exception as e:
            raise QueryError(
                f"Unexpected error in async Qdrant adapter: {e}", adapter="async_qdrant"
            ) from e
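
One round-trip caveat follows from the source above: `to_obj` stores each model's payload with the vector field excluded, so search results validated by `from_obj` come back without the embedding. A sketch that accounts for this, using an in-memory Qdrant instance (`url=None`); the model and vectors are illustrative:

```python
import asyncio
import uuid

from pydantic import BaseModel

from pydapter.extras.async_qdrant_ import AsyncQdrantAdapter


class Document(BaseModel):
    id: str
    text: str
    # Optional, because search payloads come back without the vector;
    # a required 'embedding' field would fail validation in from_obj.
    embedding: list[float] | None = None


async def main():
    docs = [Document(id=str(uuid.uuid4()), text="hello", embedding=[0.1, 0.2, 0.3])]
    await AsyncQdrantAdapter.to_obj(docs, collection="docs", url=None)

    hits = await AsyncQdrantAdapter.from_obj(
        Document,
        {"collection": "docs", "query_vector": [0.1, 0.2, 0.3], "top_k": 3},
        many=True,
    )
    print([h.text for h in hits])  # ['hello']


asyncio.run(main())
```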