Skip to content

Protocols API Reference

This page provides API documentation for the pydapter.protocols module.

Installation

pip install pydapter

Overview

The protocols module provides composable interfaces for specialized model behavior:

┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│   Identifiable  │  │    Temporal     │  │   Embeddable    │
│   (id: UUID)    │  │ (timestamps)    │  │ (content +      │
│                 │  │                 │  │  embedding)     │
└─────────────────┘  └─────────────────┘  └─────────────────┘

┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│    Invokable    │  │ Cryptographical │  │   Auditable     │
│ (execution)     │  │ (hashing)       │  │ (tracking)      │
└─────────────────┘  └─────────────────┘  └─────────────────┘

┌─────────────────┐
│ SoftDeletable   │
│ (soft delete)   │
└─────────────────┘

Quick Start

# Behavior mixins from pydapter plus pydantic's model base class.
from pydapter.protocols import IdentifiableMixin, TemporalMixin
from pydantic import BaseModel

# NOTE(review): the mixin listings on this page declare `id`, `created_at`
# and `updated_at` only under TYPE_CHECKING — confirm that these fields are
# actually defined on `User` at runtime before relying on the calls below.
class User(BaseModel, IdentifiableMixin, TemporalMixin):
    name: str
    email: str

user = User(name="John", email="john@example.com")
user.update_timestamp()  # Temporal behavior: sets updated_at to the current UTC time
print(user.id)           # UUID from Identifiable

Event System

from pydapter.protocols.event import as_event

# Wrap the coroutine so that calling it returns an Event describing the call.
@as_event(event_type="api_call")
async def process_request(data: dict) -> dict:
    return {"result": "processed", "input": data}

# NOTE: a bare top-level `await` only works in an async REPL / notebook;
# inside a script, run this from a coroutine driven by asyncio.run().
event = await process_request({"user_id": 123})
print(event.event_type)  # "api_call"

Protocol Factory

from pydapter.fields import FieldTemplate
from pydapter.protocols.factory import create_protocol_model_class
from pydapter.protocols.constants import IDENTIFIABLE, TEMPORAL

# Build a model class from the two protocols plus two custom string fields.
User = create_protocol_model_class(
    "User",
    IDENTIFIABLE,
    TEMPORAL,
    name=FieldTemplate(base_type=str),
    email=FieldTemplate(base_type=str)
)

API Reference

Core Protocols

pydapter.protocols.identifiable

Classes

IdentifiableMixin

Base class for objects with a unique identifier

Source code in src/pydapter/protocols/identifiable.py
class IdentifiableMixin:
    """Mixin for objects that are identified by a unique ``id``.

    ``id`` is annotated under ``TYPE_CHECKING`` only, so this class itself
    defines no ``id`` attribute at runtime.
    """

    if TYPE_CHECKING:
        id: UUID

    def __hash__(self) -> int:
        """Hash the instance by its ``id`` attribute."""
        identifier = self.id
        return hash(identifier)

    @field_serializer("id")
    def _serialize_ids(self, v: UUID) -> str:
        # Serialize the identifier as its string form.
        text = str(v)
        return text
Functions
__hash__()

Returns the hash of the object.

Source code in src/pydapter/protocols/identifiable.py
def __hash__(self) -> int:
    """Return the hash of the object, computed from its ``id`` attribute."""
    return hash(self.id)

pydapter.protocols.temporal

Classes

TemporalMixin

Source code in src/pydapter/protocols/temporal.py
class TemporalMixin:
    """Mixin that refreshes ``updated_at`` and serializes both timestamps.

    ``created_at`` and ``updated_at`` are annotated under ``TYPE_CHECKING``
    only, so this class itself defines neither attribute at runtime.
    """

    if TYPE_CHECKING:
        created_at: datetime
        updated_at: datetime

    @field_serializer("updated_at", "created_at")
    def _serialize_datetime(self, v: datetime) -> str:
        # Timestamps are emitted via datetime.isoformat().
        iso_text = v.isoformat()
        return iso_text

    def update_timestamp(self) -> None:
        """Set ``updated_at`` to the current timezone-aware UTC time."""
        now_utc = datetime.now(tz=timezone.utc)
        self.updated_at = now_utc
Functions
update_timestamp()

Update the last updated timestamp to the current time.

Source code in src/pydapter/protocols/temporal.py
def update_timestamp(self) -> None:
    """Set ``updated_at`` to the current timezone-aware UTC time."""
    now_utc = datetime.now(tz=timezone.utc)
    self.updated_at = now_utc

pydapter.protocols.embeddable

Classes

EmbeddableMixin

Mixin class for embedding functionality.

Source code in src/pydapter/protocols/embeddable.py
class EmbeddableMixin:
    """Mixin exposing helpers for objects that carry an embedding.

    ``content`` and ``embedding`` are annotated under ``TYPE_CHECKING`` only.
    """

    if TYPE_CHECKING:
        content: str | None
        embedding: Embedding

    @staticmethod
    def parse_embedding_response(
        x: dict | list | tuple | BaseModel,
    ) -> Embedding:
        """Turn an embedding response (OpenAI or other sources) into an embedding.

        Forwards ``x`` unchanged to ``parse_embedding_response`` and returns
        whatever that call produces.
        """
        parsed = parse_embedding_response(x)
        return parsed

    @property
    def n_dim(self) -> int:
        """Number of dimensions, i.e. ``len(self.embedding)``."""
        vector = self.embedding
        return len(vector)
Attributes
n_dim property

Get the number of dimensions of the embedding.

Functions
parse_embedding_response(x) staticmethod

Parse the embedding response from OpenAI or other sources.

Source code in src/pydapter/protocols/embeddable.py
@staticmethod
def parse_embedding_response(
    x: dict | list | tuple | BaseModel,
) -> Embedding:
    """Parse the embedding response from OpenAI or other sources.

    Forwards ``x`` unchanged to ``parse_embedding_response`` and returns
    whatever that call produces.
    """
    # NOTE(review): presumably resolves to a module-level helper of the same
    # name in the real file (this listing is a method excerpt) — confirm.
    return parse_embedding_response(x)

pydapter.protocols.invokable

Classes

Invokable

Bases: Protocol

An object that can be invoked with a request

Source code in src/pydapter/protocols/invokable.py
@runtime_checkable
class Invokable(Protocol):
    """An object that can be invoked with a request.

    Runtime-checkable structural protocol: it only lists the attributes an
    invokable object is expected to expose.
    """

    # Request payload associated with the invocation, if any.
    request: dict | None
    # Execution record for the invocation.
    execution: Execution
    # Callable to run, or None when no handler has been attached.
    _handler: Callable | None
    # Positional arguments for the handler.
    _handler_args: tuple[Any, ...]
    # Keyword arguments for the handler.
    _handler_kwargs: dict[str, Any]

InvokableMixin

An executable can be invoked with a request

Source code in src/pydapter/protocols/invokable.py
class InvokableMixin:
    """Mixin that runs a stored handler and records the outcome on ``execution``.

    The handler and its arguments are kept in the private attributes
    ``_handler``, ``_handler_args`` and ``_handler_kwargs``. ``request`` and
    ``execution`` are annotated under ``TYPE_CHECKING`` only.
    """

    _handler: Callable | None = PrivateAttr(None)
    _handler_args: tuple[Any, ...] = PrivateAttr(())
    _handler_kwargs: dict[str, Any] = PrivateAttr({})

    if TYPE_CHECKING:
        request: dict | None
        execution: Execution

    @property
    def has_invoked(self) -> bool:
        """Whether ``execution.status`` is COMPLETED or FAILED."""
        return self.execution.status in (
            ExecutionStatus.COMPLETED,
            ExecutionStatus.FAILED,
        )

    async def _invoke(self):
        """Call the stored handler, awaiting it when it is a coroutine function.

        Returns:
            Whatever the handler returns.

        Raises:
            ValueError: If no handler has been set.
        """
        if self._handler is None:
            raise ValueError("Event invoke function is not set.")
        if asyncio.iscoroutinefunction(self._handler):
            return await self._handler(*self._handler_args, **self._handler_kwargs)
        return self._handler(*self._handler_args, **self._handler_kwargs)

    async def invoke(self) -> None:
        """Run the handler and store duration, status and result on ``execution``.

        Ordinary exceptions raised by the handler are captured: the execution
        is marked FAILED and the error text is stored, without re-raising.
        ``asyncio.CancelledError`` is recorded the same way and then re-raised.

        Raises:
            asyncio.CancelledError: If the invocation is cancelled.
        """
        # We are inside a coroutine, so a loop is running; get_running_loop()
        # is the documented way to reach it from here.
        loop = asyncio.get_running_loop()
        start = loop.time()
        response = None
        e1 = None

        try:
            response = await self._invoke()

        except asyncio.CancelledError as ce:
            e1 = ce
            logger.warning("invoke() canceled by external request.")
            raise
        except Exception as ex:
            e1 = ex  # type: ignore

        finally:
            # Bookkeeping runs on success, failure and cancellation alike.
            self.execution.duration = loop.time() - start
            if response is None and e1 is not None:
                self.execution.error = str(e1)
                self.execution.status = ExecutionStatus.FAILED
                logger.error(f"invoke() failed for event {str(self.id)[:6]}...")
            else:
                self.execution.response_obj = response
                # NOTE(review): validate_model_to_dict is not shown in this
                # class; if it rejects the response type, the error escapes
                # from this finally block before status is set — confirm.
                self.execution.response = validate_model_to_dict(response)
                self.execution.status = ExecutionStatus.COMPLETED
            self.execution.updated_at = datetime.now(tz=timezone.utc)

Functions

pydapter.protocols.cryptographical

Classes

Cryptographical

Bases: Protocol

An object that can be hashed with a cryptographic hash function

Source code in src/pydapter/protocols/cryptographical.py
@runtime_checkable
class Cryptographical(Protocol):
    """An object that can be hashed with a cryptographic hash function"""

    content: JsonValue
    sha256: str | None = None

Functions

sha256_of_obj(obj)

Deterministic SHA-256 hex digest of a string or an arbitrary mapping.

Source code in src/pydapter/protocols/cryptographical.py
def sha256_of_obj(obj: Union[dict, str, JsonValue]) -> str:
    """Return the SHA-256 hex digest of a string or JSON-like object.

    A ``str`` is encoded with ``str.encode()`` and hashed directly; any other
    value is delegated to ``sha256_of_dict`` from the sibling ``utils`` module.
    """
    import hashlib

    if not isinstance(obj, str):
        # Imported lazily, only on the non-string path.
        from .utils import sha256_of_dict

        return sha256_of_dict(obj)

    encoded = obj.encode()
    return hashlib.sha256(encoded).hexdigest()

pydapter.protocols.auditable

Classes

AuditableMixin

Mixin for audit functionality

Source code in src/pydapter/protocols/auditable.py
class AuditableMixin:
    """Mixin that records who last modified an entity."""

    def mark_updated_by(self, user_id: str):
        """Record ``user_id`` as the last updater and bump ``version``.

        If the object also provides ``update_timestamp``, it is called
        afterwards.
        """
        self.updated_by = user_id
        self.version += 1
        try:
            refresh = self.update_timestamp
        except AttributeError:
            # No timestamp support on this object; nothing more to do.
            return
        refresh()
Functions
mark_updated_by(user_id)

Mark entity as updated by user

Source code in src/pydapter/protocols/auditable.py
def mark_updated_by(self, user_id: str):
    """Record ``user_id`` as the last updater and bump ``version``.

    If the object also provides ``update_timestamp``, it is called afterwards.
    """
    self.updated_by = user_id
    self.version += 1
    try:
        refresh = self.update_timestamp
    except AttributeError:
        # No timestamp support on this object; nothing more to do.
        return
    refresh()

pydapter.protocols.soft_deletable

Classes

SoftDeletableMixin

Mixin for soft delete functionality

Source code in src/pydapter/protocols/soft_deletable.py
class SoftDeletableMixin:
    """Mixin that flags an entity as deleted instead of removing it."""

    def soft_delete(self):
        """Stamp ``deleted_at`` with the current UTC time and set ``is_deleted``."""
        deletion_time = datetime.now(tz=timezone.utc)
        self.deleted_at = deletion_time
        self.is_deleted = True

    def restore(self):
        """Undo a soft delete: clear ``deleted_at`` and reset ``is_deleted``."""
        self.deleted_at, self.is_deleted = None, False
Functions
restore()

Restore soft-deleted entity

Source code in src/pydapter/protocols/soft_deletable.py
def restore(self):
    """Undo a soft delete: clear ``deleted_at`` and reset ``is_deleted``."""
    self.deleted_at, self.is_deleted = None, False
soft_delete()

Mark entity as deleted

Source code in src/pydapter/protocols/soft_deletable.py
def soft_delete(self):
    """Stamp ``deleted_at`` with the current UTC time and set ``is_deleted``."""
    deletion_time = datetime.now(tz=timezone.utc)
    self.deleted_at = deletion_time
    self.is_deleted = True

Event System

pydapter.protocols.event

Classes

Event

Bases: _BaseEvent, IdentifiableMixin, InvokableMixin, TemporalMixin, EmbeddableMixin, CryptographicalMixin

Source code in src/pydapter/protocols/event.py
class Event(
    _BaseEvent,
    IdentifiableMixin,
    InvokableMixin,
    TemporalMixin,
    EmbeddableMixin,
    CryptographicalMixin,
):
    """Model combining the protocol mixins around a single handler call."""

    def __init__(
        self,
        handler: Callable,
        handler_arg: tuple[Any, ...],
        handler_kwargs: dict[str, Any],
        **data,
    ):
        """Initialise the model from ``data`` and attach the handler to run.

        Args:
            handler: Callable stored as ``_handler``.
            handler_arg: Positional arguments stored as ``_handler_args``.
            handler_kwargs: Keyword arguments stored as ``_handler_kwargs``.
            **data: Forwarded to the parent initialiser.
        """
        super().__init__(**data)
        self._handler = handler
        self._handler_args = handler_arg
        self._handler_kwargs = handler_kwargs

    def update_timestamp(self):
        """Advance ``updated_at`` to the execution's timestamp, or to now.

        Without an execution timestamp, ``updated_at`` becomes the current
        UTC time. Otherwise it is set to the execution timestamp only when it
        is unset or older than that timestamp.
        """
        execution_ts = self.execution.updated_at
        if execution_ts is None:
            self.updated_at = datetime.now(tz=timezone.utc)
            return
        if self.updated_at is None or self.updated_at < execution_ts:
            self.updated_at = execution_ts

    def to_log(self, event_type: str | None = None, hash_content: bool = False) -> Log:
        """Build a ``Log`` entry from this event.

        Args:
            event_type: Overrides the event type; falls back to the event's
                own ``event_type`` and then to the class name.
            hash_content: When true, ``hash_content()`` is called and the
                resulting ``sha256`` attribute is copied into the log.

        Returns:
            The populated ``Log``.
        """
        own_type = getattr(self, "event_type", None)
        log_event_type = event_type or own_type or type(self).__name__

        # Content: reuse what the event has, else derive it from request/response.
        if self.content is None:
            content = json.dumps(
                {
                    "request": self.request,
                    "response": self.execution.response,
                }
            )
            # Stored back on the event so that it can be hashed below.
            self.content = content
        elif isinstance(self.content, str):
            content = self.content
        else:
            content = json.dumps(self.content)

        sha256_value = None
        if hash_content:
            self.hash_content()
            sha256_value = getattr(self, "sha256", None)

        # The status may or may not expose `.value`; normalise it to text once.
        raw_status = self.execution.status
        if hasattr(raw_status, "value"):
            status_text = raw_status.value
        else:
            status_text = str(raw_status)

        metadata = {
            "execution_status": status_text,
            "execution_duration": self.execution.duration,
        }
        if self.execution.error:
            metadata["error"] = self.execution.error

        created = self.created_at.isoformat() if self.created_at else None
        updated = self.updated_at.isoformat() if self.updated_at else None

        return Log(
            id=str(self.id),
            event_type=log_event_type,
            content=content,
            embedding=self.embedding or [],  # falsy embedding becomes an empty list
            metadata=metadata,
            created_at=created,
            updated_at=updated,
            duration=self.execution.duration,
            status=status_text,
            error=self.execution.error,
            sha256=sha256_value,
        )
Functions
to_log(event_type=None, hash_content=False)

Convert the event to a log entry.

Source code in src/pydapter/protocols/event.py
def to_log(self, event_type: str | None = None, hash_content: bool = False) -> Log:
    """Build a ``Log`` entry from this event.

    Args:
        event_type: Overrides the event type; falls back to the event's own
            ``event_type`` and then to the class name.
        hash_content: When true, ``hash_content()`` is called and the
            resulting ``sha256`` attribute is copied into the log.

    Returns:
        The populated ``Log``.
    """
    own_type = getattr(self, "event_type", None)
    log_event_type = event_type or own_type or type(self).__name__

    # Content: reuse what the event has, else derive it from request/response.
    if self.content is None:
        content = json.dumps(
            {
                "request": self.request,
                "response": self.execution.response,
            }
        )
        # Stored back on the event so that it can be hashed below.
        self.content = content
    elif isinstance(self.content, str):
        content = self.content
    else:
        content = json.dumps(self.content)

    sha256_value = None
    if hash_content:
        self.hash_content()
        sha256_value = getattr(self, "sha256", None)

    # The status may or may not expose `.value`; normalise it to text once.
    raw_status = self.execution.status
    if hasattr(raw_status, "value"):
        status_text = raw_status.value
    else:
        status_text = str(raw_status)

    metadata = {
        "execution_status": status_text,
        "execution_duration": self.execution.duration,
    }
    if self.execution.error:
        metadata["error"] = self.execution.error

    created = self.created_at.isoformat() if self.created_at else None
    updated = self.updated_at.isoformat() if self.updated_at else None

    return Log(
        id=str(self.id),
        event_type=log_event_type,
        content=content,
        embedding=self.embedding or [],  # falsy embedding becomes an empty list
        metadata=metadata,
        created_at=created,
        updated_at=updated,
        duration=self.execution.duration,
        status=status_text,
        error=self.execution.error,
        sha256=sha256_value,
    )

Functions

as_event(*, event_type=None, request_arg=None, embed_content=False, embed_function=None, adapt=False, adapter=None, content_parser=None, strict_content=False, **kw)

  • event_type, for example, "api_call", "message", "task", etc.
  • request_arg, the name of the request argument in the function signature (will be passed to the event as part of content if content_parser is not provided)
  • embed_content, if True, the content will be embedded using the embed_function
  • embed_function, a function that takes the content and returns an embedding
  • adapt, if True, the event will be adapted to the specified adapter
  • adapter, the adapter class to adapt the event to
  • content_parser, a function that takes the response_obj and returns the content (if not provided, the content is {"request": request, "response": response}, where response is a dict representation of the response object)
  • strict_content, if True, a failing content_parser marks the event as failed instead of falling back to the default content
  • **kw, additional keyword arguments to pass to the adapter
Source code in src/pydapter/protocols/event.py
def as_event(
    *,
    event_type: str | None = None,
    request_arg: str | None = None,
    embed_content: bool = False,
    embed_function: Callable[..., Embedding] | None = None,
    adapt: bool = False,
    adapter: type[Adapter | AsyncAdapter] | None = None,
    content_parser: Callable | None = None,
    strict_content: bool = False,
    **kw,
):
    """Decorate a callable so that every call is captured as an ``Event``.

    The decorated function becomes a coroutine function that returns the
    ``Event`` (not the wrapped function's own return value). Failures while
    invoking, parsing content, embedding or adapting are swallowed on a
    best-effort basis.

    Args:
        event_type: For example "api_call", "message", "task", etc.
        request_arg: Name of the keyword argument holding the request. When
            unset (or absent from the call), the first positional argument is
            used; with no positional arguments the request is ``None``.
        embed_content: If True (and ``embed_function`` is given), the content
            is embedded using ``embed_function``.
        embed_function: Sync or async function that takes the content string
            and returns an embedding.
        adapt: If True (and ``adapter`` is given), the event's log entry is
            passed to ``adapter.to_obj``.
        adapter: The adapter class to adapt the event to.
        content_parser: Function that takes the ``response_obj`` and returns
            the content. If it raises and ``strict_content`` is False, the
            content falls back to ``{"request": request, "response": response}``.
        strict_content: If True, a failing ``content_parser`` makes the
            wrapper record the error on the event and return immediately.
        **kw: Additional keyword arguments passed to the adapter.

    Returns:
        A decorator producing the async wrapper described above.
    """

    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Event:
            request_obj = kwargs.get(request_arg) if request_arg else None
            # NOTE(review): every object has `__class__`, so this drops the
            # first positional argument whenever more than two are passed —
            # presumably meant to skip a bound `self`; confirm.
            if len(args) > 2 and hasattr(args[0], "__class__"):
                args = args[1:]
            # Fall back to the first positional argument, if there is one;
            # a keyword-only call must not fail with IndexError here.
            if request_obj is None and args:
                request_obj = args[0]
            event = Event(
                handler=func,
                handler_arg=list(args),
                handler_kwargs=kwargs,
                event_type=event_type,
                request=request_obj,
            )
            try:
                await event.invoke()

                if content_parser is not None:
                    try:
                        event.content = content_parser(event.execution.response_obj)
                    except Exception as e:
                        if strict_content:
                            event.updated_at = datetime.now(tz=timezone.utc)
                            event.content = None
                            event.execution.error = str(e)
                            # NOTE(review): this reads FAILED from the
                            # execution object itself — confirm that the
                            # attribute exists there.
                            event.execution.status = event.execution.FAILED
                            event.execution.response = None
                            return event

                        event.content = {
                            "request": event.request,
                            "response": event.execution.response,
                        }

                if embed_content and embed_function is not None:
                    content = (
                        json.dumps(event.content)
                        if not isinstance(event.content, str)
                        else event.content
                    )
                    # NOTE(review): json.dumps(None) returns the string
                    # "null", so this guard cannot trigger for a missing
                    # content — confirm the intended behaviour.
                    if content is None:
                        # need some logging here
                        return event

                    embed_response = None
                    try:
                        if asyncio.iscoroutinefunction(embed_function):
                            embed_response = await embed_function(content)
                        else:
                            embed_response = embed_function(content)
                    except Exception:
                        # some logging on embedding failure
                        pass

                    # Check if the response is already a valid embedding (list of floats)
                    if isinstance(embed_response, list) and all(
                        isinstance(x, (int, float)) for x in embed_response
                    ):
                        event.embedding = embed_response
                    else:
                        # Try to parse the response
                        try:
                            event.embedding = EmbeddableMixin.parse_embedding_response(
                                embed_response
                            )
                        except Exception:
                            # some logging on embedding parsing failure
                            pass

            except Exception:
                # do some logging on the mighty catch all failure
                pass

            finally:
                # Runs on every exit path, including the early returns above.
                if adapt and adapter is not None:
                    try:
                        await adapter.to_obj(event.to_log(event_type=event_type), **kw)
                    except Exception:
                        # logging here
                        pass

            return event

        return wrapper

    return decorator

Factory and Utilities

pydapter.protocols.factory

Factory functions for creating protocol-compliant models.

Functions

combine_with_mixins(model_class, *protocols, name=None)

Add protocol mixins to an existing model class.

This is useful when you already have a model with the required fields (e.g., from create_protocol_model) and want to add behavioral methods.

Parameters:

Name Type Description Default
model_class type[BaseModel]

The model class to enhance with mixins

required
*protocols Union[ProtocolType, str]

Protocol names whose mixins to add

()
name str

Optional name for the new class (defaults to original name)

None

Returns:

Type Description
type[BaseModel]

A new model class with the added behavioral mixins

Example
from pydapter.fields import FieldTemplate, create_protocol_model
from pydapter.protocols import combine_with_mixins, IDENTIFIABLE, TEMPORAL

# First create structure
UserStructure = create_protocol_model(
    "UserStructure",
    IDENTIFIABLE,
    TEMPORAL,
    username=FieldTemplate(base_type=str)
)

# Then add behaviors
User = combine_with_mixins(UserStructure, IDENTIFIABLE, TEMPORAL)
Source code in src/pydapter/protocols/factory.py
def combine_with_mixins(
    model_class: type[BaseModel],
    *protocols: Union[ProtocolType, str],
    name: Union[str, None] = None,
) -> type[BaseModel]:
    """Add protocol mixins to an existing model class.

    This is useful when you already have a model with the required fields
    (e.g., from create_protocol_model) and want to add behavioral methods.

    Args:
        model_class: The model class to enhance with mixins
        *protocols: Protocol names whose mixins to add. Names are lower-cased
            before lookup; names without a registered mixin are ignored, and
            a mixin is added only once even if it is requested repeatedly.
        name: Optional name for the new class (defaults to original name)

    Returns:
        A new model class with the added behavioral mixins

    Example:
        ```python
        from pydapter.fields import FieldTemplate, create_protocol_model
        from pydapter.protocols import combine_with_mixins, IDENTIFIABLE, TEMPORAL

        # First create structure
        UserStructure = create_protocol_model(
            "UserStructure",
            IDENTIFIABLE,
            TEMPORAL,
            username=FieldTemplate(base_type=str)
        )

        # Then add behaviors
        User = combine_with_mixins(UserStructure, IDENTIFIABLE, TEMPORAL)
        ```
    """
    # Fetch the registry once instead of twice per protocol.
    registry = get_mixin_registry()

    # Collect the mixin classes, skipping repeats: listing the same base
    # twice would make type() raise "duplicate base class".
    mixins = []
    for protocol in protocols:
        protocol_str = str(protocol).lower()
        if protocol_str not in registry:
            continue
        mixin = registry[protocol_str]
        if mixin not in mixins:
            mixins.append(mixin)

    # Determine the new class name
    class_name = name or model_class.__name__

    # Create new class with mixins
    return type(class_name, (model_class, *mixins), {})

create_protocol_model_class(name, *protocols, base_model=BaseModel, **namespace)

Create a model class with both structural fields and behavioral methods.

This is a convenience function that combines create_protocol_model (for fields) with the appropriate protocol mixins (for behavior) to create a fully functional protocol-compliant model class.

Parameters:

Name Type Description Default
name str

Name for the generated model class

required
*protocols Union[ProtocolType, str]

Protocol names to implement (e.g., IDENTIFIABLE, TEMPORAL)

()
base_model type[BaseModel]

Base model class to inherit from (default: BaseModel)

BaseModel
**namespace Any

Additional class attributes/methods to include

{}

Returns:

Type Description
type[BaseModel]

A new model class with both protocol fields and behaviors

Example
from pydapter.protocols import create_protocol_model_class, IDENTIFIABLE, TEMPORAL
from pydapter.fields import FieldTemplate

# Create a model with both fields and behaviors:
# the protocols supply the protocol fields and mixins, the keyword
# arguments add two custom string fields.
User = create_protocol_model_class(
    "User",
    IDENTIFIABLE,
    TEMPORAL,
    username=FieldTemplate(base_type=str),
    email=FieldTemplate(base_type=str)
)

# Now you can use it
user = User(username="john", email="john@example.com")
user.update_timestamp()  # Method from TemporalMixin
Source code in src/pydapter/protocols/factory.py
def create_protocol_model_class(
    name: str,
    *protocols: Union[ProtocolType, str],
    base_model: type[BaseModel] = BaseModel,
    **namespace: Any,
) -> type[BaseModel]:
    """Create a model class with both structural fields and behavioral methods.

    This is a convenience function that combines create_protocol_model (for fields)
    with the appropriate protocol mixins (for behavior) to create a fully functional
    protocol-compliant model class.

    Args:
        name: Name for the generated model class
        *protocols: Protocol names to implement (e.g., IDENTIFIABLE, TEMPORAL).
            Names are lower-cased for the mixin lookup; names without a
            registered mixin contribute no mixin, and each mixin is used once.
        base_model: Base model class to inherit from (default: BaseModel)
        **namespace: Additional class attributes/methods to include. Values
            exposing both ``create_field`` and ``base_type`` are treated as
            field templates and passed to create_protocol_model instead.

    Returns:
        A new model class with both protocol fields and behaviors

    Example:
        ```python
        from pydapter.protocols import create_protocol_model_class, IDENTIFIABLE, TEMPORAL
        from pydapter.fields import FieldTemplate

        # Create a model with both fields and behaviors
        User = create_protocol_model_class(
            "User",
            IDENTIFIABLE,
            TEMPORAL,
            username=FieldTemplate(base_type=str),
            email=FieldTemplate(base_type=str)
        )

        # Now you can use it
        user = User(username="john", email="john@example.com")
        user.update_timestamp()  # Method from TemporalMixin
        ```
    """
    from pydapter.fields import create_protocol_model

    # Split the namespace into field templates and plain class attributes.
    field_templates = {}
    class_attrs = {}

    for key, value in namespace.items():
        # Duck-typed FieldTemplate check (avoid circular import)
        if hasattr(value, "create_field") and hasattr(value, "base_type"):
            field_templates[key] = value
        else:
            class_attrs[key] = value

    # Create the structural model with fields
    structural_model = create_protocol_model(
        f"_{name}Structure", *protocols, **field_templates
    )

    # Fetch the registry once instead of twice per protocol.
    registry = get_mixin_registry()

    # Collect the mixin classes, skipping repeats: listing the same base
    # twice would make type() raise "duplicate base class".
    mixins = []
    for protocol in protocols:
        protocol_str = str(protocol).lower()
        if protocol_str not in registry:
            continue
        mixin = registry[protocol_str]
        if mixin not in mixins:
            mixins.append(mixin)

    # Create the final class with mixins
    # Order: structural_model -> mixins -> base_model
    bases = (structural_model, *mixins, base_model)

    return type(name, bases, class_attrs)

pydapter.protocols.registry

Classes

Functions

get_mixin_registry()

Get the registry of mixin classes for protocols.

Returns:

Type Description
dict[str, type]

A dictionary mapping protocol names to their corresponding mixin classes.

Source code in src/pydapter/protocols/registry.py
def get_mixin_registry() -> dict[str, type]:
    """
    Get the registry of mixin classes for protocols.

    Returns:
        A dictionary mapping protocol names to their corresponding mixin classes.
        This is the module-level ``_MIXIN_CLASSES`` object itself, not a copy.
    """
    return _MIXIN_CLASSES

register_mixin(protocol_name, mixin_class)

Register a new mixin class for a protocol.

Parameters:

Name Type Description Default
protocol_name str

The name of the protocol (e.g., "identifiable").

required
mixin_class type

The mixin class to register.

required
Source code in src/pydapter/protocols/registry.py
def register_mixin(protocol_name: str, mixin_class: type) -> None:
    """
    Associate a mixin class with a protocol name in the registry.

    The name is lower-cased before it is stored, and an existing entry under
    the same key is replaced.

    Args:
        protocol_name: The name of the protocol (e.g., "identifiable").
        mixin_class: The mixin class to register.
    """
    registry_key = protocol_name.lower()
    _MIXIN_CLASSES[registry_key] = mixin_class

pydapter.protocols.constants

Protocol constants for type-safe protocol selection.

pydapter.protocols.types

Basic types for protocols - maintained for backwards compatibility.

Classes

Log

Bases: BaseModel

Base Log model

Source code in src/pydapter/protocols/types.py
class Log(BaseModel):
    """Base Log model.

    Configured as frozen, with extra fields forbidden. Only ``id`` and
    ``event_type`` are required; every other field defaults to ``None``.
    """

    model_config = ConfigDict(
        extra="forbid",
        frozen=True,
        validate_assignment=True,
        arbitrary_types_allowed=True,
        # Example payload published with the JSON schema.
        json_schema_extra={
            "example": {
                "id": "some-uuid-string",
                "event_type": "example_event",
                "content": "This is an example log entry.",
                "embedding": [0.1, 0.2, 0.3],
                "metadata": {"key": "value"},
                "created_at": "2023-10-01T12:00:00Z",
                "updated_at": "2023-10-01T12:00:00Z",
                "duration": 1.23,
                "status": "success",
                "error": None,
                "sha256": "abc123def456...",
            },
        },
    )

    id: str  # identifier, stored as a string
    event_type: str  # event category label
    content: str | None = None  # log content as text
    embedding: list[float] | None = None  # embedding vector, if any
    metadata: dict[str, Any] | None = None  # free-form extra information
    created_at: str | None = None  # ISO format string
    updated_at: str | None = None  # ISO format string
    duration: float | None = None  # execution duration
    status: str | None = None  # execution status as text
    error: str | None = None  # error text, if any
    sha256: str | None = None  # content hash, if computed

pydapter.protocols.utils

Functions

as_async_fn(fn) cached

forcefully get the async call of a function

Source code in src/pydapter/protocols/utils.py
@cache
def as_async_fn(fn, /):
    """Return an awaitable-producing version of ``fn``.

    Coroutine functions are returned unchanged; anything else is wrapped with
    ``force_async``. Results are memoised per ``fn`` by ``@cache``.
    """
    return fn if is_coroutine_function(fn) else force_async(fn)

force_async(fn)

force a function to be async.

Source code in src/pydapter/protocols/utils.py
def force_async(fn: Callable[..., T], /) -> Callable[..., "asyncio.Future[T]"]:
    """Wrap a synchronous function so that calling it returns an awaitable.

    Each call of the wrapper submits ``fn`` to a thread pool and returns an
    asyncio future for the result.

    Note:
        Every call to ``force_async`` creates its own ``ThreadPoolExecutor``,
        which is never shut down explicitly.

    Args:
        fn: The synchronous callable to run in a worker thread.

    Returns:
        A wrapper with ``fn``'s metadata whose result must be awaited.
    """
    pool = ThreadPoolExecutor()

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        future = pool.submit(fn, *args, **kwargs)
        return asyncio.wrap_future(future)  # Make it awaitable

    return wrapper

get_bins(input_, /, upper)

Organizes indices of items into bins based on a cumulative upper limit length.

Parameters:

Name Type Description Default
input_ list[str]

The list of strings to be binned.

required
upper int

The cumulative length upper limit for each bin.

required

Returns:

Type Description
list[Bin]

list[list[int]]: A list of bins, each bin is a list of indices from the input list.

Source code in src/pydapter/protocols/utils.py
def get_bins(input_: list[HasLen], /, upper: int) -> list[Bin]:
    """Organizes indices of items into bins based on a cumulative upper limit length.

    Items are taken in order. An index joins the current bin while the bin's
    cumulative length plus the item's length stays strictly below ``upper``;
    otherwise the current bin is closed and the index starts a new one. An
    item that alone reaches ``upper`` therefore gets a bin of its own.

    Args:
        input_: The sized items (e.g. strings) to be binned.
        upper: The cumulative length upper limit for each bin.

    Returns:
        A list of non-empty bins, each bin being a list of indices into
        ``input_``. Empty input yields an empty list.
    """
    bins = []
    current_bin = []
    current = 0
    for idx, item in enumerate(input_):
        size = len(item)
        if current + size < upper:
            current_bin.append(idx)
            current += size
            continue
        # Close the running bin — but never emit an empty one, which used to
        # happen when the very first item already reached the limit.
        if current_bin:
            bins.append(current_bin)
        current_bin = [idx]
        current = size
    if current_bin:
        bins.append(current_bin)
    return bins

import_module(package_name, module_name=None, import_name=None)

Import a module by its path.

Source code in src/pydapter/protocols/utils.py
def import_module(
    package_name: str,
    module_name: str | None = None,
    import_name: str | list | None = None,
) -> Imp | list[Imp] | Any:
    """Import a module by its path."""
    try:
        full_import_path = (
            f"{package_name}.{module_name}" if module_name else package_name
        )

        if import_name:
            import_name = (
                [import_name] if not isinstance(import_name, list) else import_name
            )
            a = __import__(
                full_import_path,
                fromlist=import_name,
            )
            if len(import_name) == 1:
                return getattr(a, import_name[0])
            return [getattr(a, name) for name in import_name]
        return __import__(full_import_path)

    except ImportError as e:
        error_msg = f"Failed to import module {full_import_path}: {e}"
        raise ImportError(error_msg) from e

is_coroutine_function(fn) cached

Check if a function is a coroutine function.

Source code in src/pydapter/protocols/utils.py
@cache
def is_coroutine_function(fn, /) -> bool:
    """Check if a function is a coroutine function.

    Delegates to ``asyncio.iscoroutinefunction``. The answer is memoised per
    ``fn`` by ``@cache``, so ``fn`` must be hashable.
    """
    return asyncio.iscoroutinefunction(fn)

sha256_of_dict(obj)

Deterministic SHA-256 of an arbitrary mapping.

Source code in src/pydapter/protocols/utils.py
def sha256_of_dict(obj: dict) -> str:
    """Return a deterministic SHA-256 hex digest of a mapping.

    The mapping is serialised with ``orjson`` using sorted keys, with
    non-string keys allowed, and the resulting bytes are hashed.
    """
    import hashlib

    import orjson

    # Sorted keys give a canonical ordering; non-str keys (int / enum) are allowed.
    dump_flags = orjson.OPT_SORT_KEYS | orjson.OPT_NON_STR_KEYS
    canonical: bytes = orjson.dumps(obj, option=dump_flags)
    digest = hashlib.sha256(canonical)
    return digest.hexdigest()

validate_model_to_dict(v)

Serialize a Pydantic model to a dictionary. A dict is returned unchanged, None becomes an empty dict, and any other value raises ValueError.

Source code in src/pydapter/protocols/utils.py
def validate_model_to_dict(v):
    """Coerce a model-like value into a dictionary.

    ``None`` becomes an empty dict, a ``BaseModel`` is dumped with
    ``model_dump()``, and a ``dict`` is returned as-is.

    Raises:
        ValueError: For any other kind of value.
    """
    if v is None:
        return {}
    if isinstance(v, BaseModel):
        return v.model_dump()
    if not isinstance(v, dict):
        error_msg = "Input value for field <model> should be a `pydantic.BaseModel` object or a `dict`"
        raise ValueError(error_msg)
    return v