Skip to content

Async/Sync Patterns in Pydapter

Architecture Decision

Pydapter provides separate sync and async implementations without mixing concerns:

  • Sync: Adapter, AdapterRegistry, Adaptable
  • Async: AsyncAdapter, AsyncAdapterRegistry, AsyncAdaptable

Benefits:

  • No async overhead in sync operations
  • Clear separation of concerns
  • Type safety in both contexts
  • Simple, focused interfaces

Core Async Components

AsyncAdapter Protocol

from pydapter.async_core import AsyncAdapter

class MyAsyncAdapter(AsyncAdapter[T]):
    """Skeleton async adapter; ``obj_key`` names the format it handles."""

    obj_key = "my_async_format"

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: Any, /, *, many=False, **kw) -> T | list[T]:
        """Build a ``subj_cls`` instance (or a list when ``many``) from ``obj``.

        Placeholder body: the awaited input work goes here.
        """
        # Async operations (HTTP, database, etc.)
        pass

    @classmethod
    async def to_obj(cls, subj: T | list[T], /, *, many=False, **kw) -> Any:
        """Convert ``subj`` (or a list of them) to the external representation.

        Placeholder body: the awaited output work goes here.
        """
        # Async output operations
        pass

AsyncAdapterRegistry

from pydapter.async_core import AsyncAdapterRegistry

# Create a registry and register the adapter class with it.
async_registry = AsyncAdapterRegistry()
async_registry.register(MyAsyncAdapter)

# adapt_from is awaited, so this line must run inside a coroutine.
# obj_key matches MyAsyncAdapter.obj_key, presumably selecting that adapter.
result = await async_registry.adapt_from(MyModel, data, obj_key="my_async_format")

AsyncAdaptable Mixin

from pydapter.async_core import AsyncAdaptable

class MyModel(BaseModel, AsyncAdaptable):
    """Example model that mixes ``AsyncAdaptable`` into a ``BaseModel``."""
    name: str
    value: int

# Register the adapter on the model class, then await the conversion.
MyModel.register_async_adapter(MyAsyncAdapter)
# obj_key matches MyAsyncAdapter.obj_key ("my_async_format").
instance = await MyModel.adapt_from_async(data, obj_key="my_async_format")

Common Async Patterns

HTTP API Adapter

class RestApiAdapter(AsyncAdapter[T]):
    """Async adapter that loads model data from a JSON REST endpoint.

    ``obj`` supplies ``base_url`` and ``endpoint``; they are joined with
    exactly one ``/`` and fetched with an HTTP GET.
    """

    obj_key = "rest_api"

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Fetch JSON from the configured endpoint and validate it.

        Args:
            subj_cls: Model class whose ``model_validate`` parses each record.
            obj: Mapping with ``base_url`` and ``endpoint`` keys.
            many: When true, return a list of models instead of a single one.
            **kw: Accepted for interface compatibility; not used here.

        Returns:
            One ``subj_cls`` instance, or a list of them when ``many`` is true.

        Raises:
            KeyError: If ``base_url`` or ``endpoint`` is missing from ``obj``.
            aiohttp.ClientResponseError: If the response status is an error
                (raised by ``raise_for_status``).
        """
        # Trim the slashes at the seam so "http://host/" + "/data" does not
        # turn into "http://host//data".
        base_url = str(obj["base_url"]).rstrip("/")
        endpoint = str(obj["endpoint"]).lstrip("/")
        url = f"{base_url}/{endpoint}"

        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                data = await response.json()

        if not many:
            return subj_cls.model_validate(data)

        if isinstance(data, dict):
            # An envelope keeps its records under "items"; a bare object is
            # treated as a single record. Falling back to the dict itself
            # would iterate over its keys instead of over records.
            items = data.get("items", [data])
        else:
            items = data
        return [subj_cls.model_validate(item) for item in items]

Database Adapter

class AsyncPostgresAdapter(AsyncAdapter[T]):
    """Async adapter that runs a query over a short-lived asyncpg connection."""

    obj_key = "async_postgres"

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Run ``obj["query"]`` and validate the resulting row(s) as ``subj_cls``.

        ``obj`` provides ``connection_string``, ``query`` and optional
        ``params``. Returns a list of models when ``many`` is true; otherwise
        a single model, or ``None`` when no row comes back. The connection is
        always closed before returning or raising.
        """
        connection = await asyncpg.connect(obj["connection_string"])
        try:
            if not many:
                record = await connection.fetchrow(obj["query"], *obj.get("params", []))
                if not record:
                    return None
                return subj_cls.model_validate(dict(record))

            records = await connection.fetch(obj["query"], *obj.get("params", []))
            return [subj_cls.model_validate(dict(record)) for record in records]
        finally:
            await connection.close()

Concurrent Operations

class ConcurrentAdapter(AsyncAdapter[T]):
    """Async adapter that fetches several sources with bounded concurrency."""

    obj_key = "concurrent"

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Fetch every entry of ``obj["sources"]`` concurrently.

        Args:
            subj_cls: Model class passed through to the per-source adapter.
            obj: Mapping whose ``sources`` entry lists the individual sources.
            many: When true, return all successful results as a list;
                otherwise return the first successful result or ``None``.
            **kw: ``max_concurrent`` (default 5) caps the number of fetches
                running at the same time.

        Returns:
            The successful results (failed sources are dropped), shaped
            according to ``many``.
        """
        sources = obj["sources"]
        max_concurrent = kw.get("max_concurrent", 5)

        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_one(source):
            async with semaphore:
                # Fetch from individual source
                return await SomeAdapter.from_obj(subj_cls, source)

        # return_exceptions=True hands failures back as values instead of
        # raising, so one bad source does not abort the others.
        results = await asyncio.gather(
            *(fetch_one(source) for source in sources),
            return_exceptions=True,
        )

        # Filter on BaseException rather than Exception: asyncio.CancelledError
        # derives from BaseException, so a cancelled fetch would otherwise be
        # mistaken for a successful result.
        successful = [r for r in results if not isinstance(r, BaseException)]
        return successful if many else (successful[0] if successful else None)

Sharing Logic Between Sync/Async

Use mixins for shared transformation logic:

class DataTransformationMixin:
    """Transformation helpers shared by the sync and async adapters."""

    @staticmethod
    def normalize_data(data: dict) -> dict:
        """Map a raw record onto the canonical ``id`` / ``name`` / ``active`` keys.

        ``id`` prefers a truthy ``identifier`` and falls back to ``id``;
        ``name`` is stripped of surrounding whitespace (missing or falsy
        values become ``""``); ``active`` is true only when ``status`` equals
        ``"active"``.
        """
        record_id = data.get("identifier") or data.get("id")
        raw_name = data.get("name") or ""
        is_active = data.get("status") == "active"
        return {"id": record_id, "name": raw_name.strip(), "active": is_active}

# Sync adapter
class MySyncAdapter(Adapter[T], DataTransformationMixin):
    """Synchronous adapter that validates a normalized form of ``obj``."""

    @classmethod
    def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Normalize ``obj`` via the shared mixin, then validate it as ``subj_cls``."""
        return subj_cls.model_validate(cls.normalize_data(obj))

# Async adapter using same logic
class MyAsyncAdapter(AsyncAdapter[T], DataTransformationMixin):
    """Async adapter that fetches a payload, then reuses the shared normalizer."""

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Await the payload for ``obj["url"]``, normalize it, and validate it."""
        payload = await cls._fetch_async_data(obj["url"])
        return subj_cls.model_validate(cls.normalize_data(payload))

Error Handling Patterns

Timeout Management

class TimeoutAwareAdapter(AsyncAdapter[T]):
    """Async adapter that bounds the fetch with a timeout."""

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Fetch data, failing with ``ParseError`` if it takes too long.

        Args:
            subj_cls: Model class handed to ``_fetch_data``.
            obj: Source description handed to ``_fetch_data``.
            many: Accepted for interface compatibility; not used here.
            **kw: ``timeout`` (seconds, default 30) bounds the whole fetch.

        Raises:
            ParseError: If the fetch does not finish within the timeout; the
                original timeout error is attached as ``__cause__``.
        """
        timeout_seconds = kw.get("timeout", 30)

        try:
            # NOTE: asyncio.timeout() is available from Python 3.11 onward.
            async with asyncio.timeout(timeout_seconds):
                return await cls._fetch_data(obj, subj_cls)
        except asyncio.TimeoutError as exc:
            # Chain explicitly so the traceback shows the timeout as the
            # cause instead of "during handling ... another exception occurred".
            raise ParseError(f"Operation timed out after {timeout_seconds}s") from exc

Retry Logic

class RetryableAdapter(AsyncAdapter[T]):
    """Async adapter that retries transient failures with exponential backoff."""

    @classmethod
    async def from_obj(cls, subj_cls: type[T], obj: dict, /, *, many=False, **kw):
        """Fetch data, retrying on client and timeout errors.

        Args:
            subj_cls: Model class handed to ``_attempt_fetch``.
            obj: Source description handed to ``_attempt_fetch``.
            many: Accepted for interface compatibility; not used here.
            **kw: ``max_retries`` (default 3) is the number of retries after
                the first attempt; negative values are treated as 0.

        Raises:
            ParseError: When every attempt fails; the last underlying error
                is attached as ``__cause__``.
        """
        # A negative max_retries would skip the loop entirely and silently
        # return None; clamp it so the fetch is always tried at least once.
        max_retries = max(kw.get("max_retries", 3), 0)

        for attempt in range(max_retries + 1):
            try:
                return await cls._attempt_fetch(subj_cls, obj)
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt < max_retries:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
                    continue
                # Out of retries: surface a ParseError but keep the root cause.
                raise ParseError(f"Failed after {max_retries + 1} attempts: {e}") from e

Testing Async Adapters

@pytest.mark.asyncio
class TestAsyncAdapters:
    """Example async tests for the adapters shown earlier on this page."""

    async def test_http_adapter(self, respx_mock):
        """RestApiAdapter should turn a mocked JSON response into a model."""
        # NOTE(review): respx intercepts httpx traffic, while RestApiAdapter
        # issues its request through aiohttp.ClientSession — confirm this mock
        # actually intercepts the call (an aiohttp-aware mock may be needed).
        respx_mock.get("http://api.example.com/data").mock(
            return_value=httpx.Response(200, json={"name": "test"})
        )

        result = await RestApiAdapter.from_obj(
            MyModel, {"base_url": "http://api.example.com", "endpoint": "data"}
        )
        assert result.name == "test"

    async def test_timeout_handling(self):
        """A fetch slower than the timeout should surface as ParseError."""
        # NOTE(review): `config` is not defined in this snippet — presumably a
        # fixture or module-level dict describing a slow source; confirm.
        with pytest.raises(ParseError, match="timed out"):
            await TimeoutAwareAdapter.from_obj(MyModel, config, timeout=0.1)

Key Tips for LLM Developers

1. Resource Management

# ✓ Always use context managers: the response and then the session are
#   released when their `async with` blocks exit, even if an error is raised.
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        data = await response.json()

# ✓ Proper cleanup: `finally` closes the connection whether or not the
#   query raises.
conn = await asyncpg.connect(connection_string)
try:
    result = await conn.fetch(query)
finally:
    await conn.close()

2. Concurrency Control

# ✓ Use semaphore for rate limiting: at most `max_concurrent` coroutines can
#   be inside the `async with semaphore` block at the same time.
semaphore = asyncio.Semaphore(max_concurrent)

async def process_item(item):
    """Process one item while holding a slot of the shared semaphore."""
    async with semaphore:
        # Process item without overwhelming external services
        pass

3. Common Async Caveats

  • Context managers: Always use async with for resource cleanup
  • Timeouts: Set reasonable timeouts for external operations
  • Error handling: Distinguish between retryable and non-retryable errors
  • Concurrency limits: Use semaphores to avoid overwhelming external services
  • Testing: Use pytest.mark.asyncio and mock async dependencies

4. Separation of Concerns

# ✓ Keep sync and async adapters separate.
# Give the subclass its own name: calling it `AsyncAdapter` would rebind
# (shadow) the imported AsyncAdapter base class for the rest of the module.
class SyncAdapter(Adapter[T]): pass
class MyAsyncAdapter(AsyncAdapter[T]): pass

# ✗ Avoid mixing sync/async in same class

5. Performance Patterns

  • Use asyncio.gather() for concurrent operations
  • Implement connection pooling for database adapters
  • Add circuit breakers for external service calls
  • Use exponential backoff for retry logic

This dual-API approach keeps synchronous code free of async overhead and maintains a clear separation between synchronous and asynchronous contexts.