PostgreSQL Adapter Tutorial for Pydapter¶
This tutorial will show you how to use the PostgreSQL adapters in pydapter to seamlessly convert between Pydantic models and PostgreSQL databases. We'll cover both synchronous and asynchronous adapters.
Prerequisites¶
1. Install Dependencies¶
# Create a virtual environment if you haven't already
python -m venv pydapter-demo
source pydapter-demo/bin/activate # On Windows: pydapter-demo\Scripts\activate
# Install dependencies
pip install pydantic sqlalchemy psycopg # For synchronous adapter
pip install asyncpg # For asynchronous adapter
# Install pydapter (if you haven't done so already)
# Either from PyPI when available:
# pip install pydapter
# Or from the repository:
git clone https://github.com/ohdearquant/pydapter.git
cd pydapter
pip install -e .
2. Set Up PostgreSQL¶
Make sure you have PostgreSQL installed and running. You can use a local installation or a Docker container:
# Using Docker to run PostgreSQL
docker run --name pydapter-postgres -e POSTGRES_PASSWORD=password \
-e POSTGRES_USER=pydapter -e POSTGRES_DB=pydapter_demo \
-p 5432:5432 -d postgres:14
# Alternatively, install PostgreSQL locally and create a database
# createuser -s pydapter
# createdb -O pydapter pydapter_demo
3. Create a Test Table¶
Connect to your PostgreSQL instance and create a test table:
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Optional: Add some test data
INSERT INTO users (name, email) VALUES
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com'),
('Charlie', 'charlie@example.com');
Synchronous PostgreSQL Adapter¶
Let's start with the synchronous PostgreSQL adapter:
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List
from pydapter.extras.postgres_ import PostgresAdapter
# Define a Pydantic model that maps to our database table
class User(BaseModel):
id: Optional[int] = None
name: str
email: str
active: bool = True
created_at: Optional[datetime] = None
# Connection details
db_config = {
"engine_url": "postgresql+psycopg://pydapter:password@localhost/pydapter_demo"
# You can also use:
# "engine_url": "postgresql://pydapter:password@localhost/pydapter_demo"
# Pydapter will convert it to the correct format
}
# Read data from the database
def read_users():
# Query all users
users = PostgresAdapter.from_obj(
User,
{
**db_config,
"table": "users",
"selectors": {} # Empty selectors means "select all"
},
many=True # Return a list of users
)
print(f"Found {len(users)} users:")
for user in users:
print(f" - {user.name} ({user.email}): Active={user.active}")
return users
# Query a specific user
def get_user_by_email(email):
try:
user = PostgresAdapter.from_obj(
User,
{
**db_config,
"table": "users",
"selectors": {"email": email}
},
many=False # Return a single user
)
print(f"Found user: {user.name} ({user.email})")
return user
except Exception as e:
print(f"Error finding user: {e}")
return None
# Create a new user
def create_user(name, email):
user = User(name=name, email=email)
result = PostgresAdapter.to_obj(
user,
**db_config,
table="users",
many=False
)
print(f"Created user: {result}")
return user
# Main function to demo the adapter
def main():
print("Reading all users:")
users = read_users()
print("\nFinding user by email:")
alice = get_user_by_email("alice@example.com")
print("\nCreating a new user:")
new_user = create_user("Dave", "dave@example.com")
print("\nVerifying new user was added:")
read_users()
if __name__ == "__main__":
main()
Asynchronous PostgreSQL Adapter¶
Now let's use the asynchronous version with asyncpg:
import asyncio
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List
from pydapter.extras.async_postgres_ import AsyncPostgresAdapter
# Define a Pydantic model that maps to our database table
class User(BaseModel):
id: Optional[int] = None
name: str
email: str
active: bool = True
created_at: Optional[datetime] = None
# Connection details
db_config = {
"engine_url": "postgresql+asyncpg://pydapter:password@localhost/pydapter_demo"
# You can also use:
# "engine_url": "postgresql://pydapter:password@localhost/pydapter_demo"
# Pydapter will convert it to the correct format with asyncpg
}
# Read data from the database asynchronously
async def read_users():
# Query all users
users = await AsyncPostgresAdapter.from_obj(
User,
{
**db_config,
"table": "users",
"selectors": {} # Empty selectors means "select all"
},
many=True # Return a list of users
)
print(f"Found {len(users)} users:")
for user in users:
print(f" - {user.name} ({user.email}): Active={user.active}")
return users
# Query a specific user asynchronously
async def get_user_by_email(email):
try:
user = await AsyncPostgresAdapter.from_obj(
User,
{
**db_config,
"table": "users",
"selectors": {"email": email}
},
many=False # Return a single user
)
print(f"Found user: {user.name} ({user.email})")
return user
except Exception as e:
print(f"Error finding user: {e}")
return None
# Create a new user asynchronously
async def create_user(name, email):
user = User(name=name, email=email)
result = await AsyncPostgresAdapter.to_obj(
user,
**db_config,
table="users",
many=False
)
print(f"Created user: {result}")
return user
# Main function to demo the async adapter
async def main():
print("Reading all users:")
users = await read_users()
print("\nFinding user by email:")
alice = await get_user_by_email("alice@example.com")
print("\nCreating a new user:")
new_user = await create_user("Eve", "eve@example.com")
print("\nVerifying new user was added:")
await read_users()
# Run the async main function
if __name__ == "__main__":
asyncio.run(main())
Advanced Usage: Using Adaptable Mixin¶
For a more ergonomic API, you can use the AsyncAdaptable
mixin:
import asyncio
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List
from pydapter.async_core import AsyncAdaptable
from pydapter.extras.async_postgres_ import AsyncPostgresAdapter
# Define a model with the AsyncAdaptable mixin
class User(BaseModel, AsyncAdaptable):
id: Optional[int] = None
name: str
email: str
active: bool = True
created_at: Optional[datetime] = None
# Register the async PostgreSQL adapter
User.register_async_adapter(AsyncPostgresAdapter)
# Main async function
async def main():
# Connection configuration
db_config = {
"engine_url": "postgresql+asyncpg://pydapter:password@localhost/pydapter_demo",
"table": "users",
"selectors": {}
}
# Read users using the mixin methods
users = await User.adapt_from_async(db_config, obj_key="async_pg", many=True)
print(f"Found {len(users)} users:")
for user in users:
print(f" - {user.name} ({user.email})")
# Create a new user
new_user = User(name="Frank", email="frank@example.com")
# Save to database
result = await new_user.adapt_to_async(
obj_key="async_pg",
engine_url="postgresql+asyncpg://pydapter:password@localhost/pydapter_demo",
table="users"
)
print(f"\nCreated new user: {result}")
# Verify the user was added
updated_users = await User.adapt_from_async(db_config, obj_key="async_pg", many=True)
print(f"\nUpdated user count: {len(updated_users)}")
# Run the async main function
if __name__ == "__main__":
asyncio.run(main())
Error Handling¶
Let's demonstrate proper error handling for common PostgreSQL errors:
from pydapter.exceptions import ConnectionError, QueryError, ResourceError
from pydapter.extras.postgres_ import PostgresAdapter
from pydantic import BaseModel
class User(BaseModel):
id: Optional[int] = None
name: str
email: str
def handle_postgres_errors():
# 1. Connection error - wrong password
try:
PostgresAdapter.from_obj(
User,
{
"engine_url": "postgresql://pydapter:wrong_password@localhost/pydapter_demo",
"table": "users"
}
)
except ConnectionError as e:
print(f"Authentication error handled: {e}")
# 2. Connection error - wrong host
try:
PostgresAdapter.from_obj(
User,
{
"engine_url": "postgresql://pydapter:password@nonexistent_host/pydapter_demo",
"table": "users"
}
)
except ConnectionError as e:
print(f"Host connection error handled: {e}")
# 3. Resource error - table doesn't exist
try:
PostgresAdapter.from_obj(
User,
{
"engine_url": "postgresql://pydapter:password@localhost/pydapter_demo",
"table": "nonexistent_table"
}
)
except ResourceError as e:
print(f"Table resource error handled: {e}")
# 4. Query error - SQL syntax error
try:
# This would normally be handled internally, but for demonstration
# you might encounter this when using raw SQL
pass
except QueryError as e:
print(f"Query error handled: {e}")
# Run the error handling examples
handle_postgres_errors()
Practical Example: A Task Management System¶
Let's create a more complete example of a task management system:
import asyncio
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
from pydapter.async_core import AsyncAdaptable
from pydapter.extras.async_postgres_ import AsyncPostgresAdapter
# Database setup - Run this SQL in your PostgreSQL database first
"""
CREATE TABLE IF NOT EXISTS projects (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS tasks (
id SERIAL PRIMARY KEY,
project_id INTEGER REFERENCES projects(id),
title VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(20) DEFAULT 'pending',
due_date TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
# Define models with AsyncAdaptable
class Project(BaseModel, AsyncAdaptable):
id: Optional[int] = None
name: str
description: Optional[str] = None
created_at: Optional[datetime] = None
class Task(BaseModel, AsyncAdaptable):
id: Optional[int] = None
project_id: int
title: str
description: Optional[str] = None
status: str = "pending"
due_date: Optional[datetime] = None
created_at: Optional[datetime] = None
# Register adapters
Project.register_async_adapter(AsyncPostgresAdapter)
Task.register_async_adapter(AsyncPostgresAdapter)
# Database configuration
DB_CONFIG = {
"engine_url": "postgresql+asyncpg://pydapter:password@localhost/pydapter_demo"
}
# Task management system class
class TaskManager:
def __init__(self, db_config):
self.db_config = db_config
async def create_project(self, name, description=None):
project = Project(name=name, description=description)
result = await project.adapt_to_async(
obj_key="async_pg",
**self.db_config,
table="projects"
)
# Get the new project with its ID
projects = await Project.adapt_from_async(
{
**self.db_config,
"table": "projects",
"selectors": {"name": name}
},
obj_key="async_pg",
many=True
)
if projects:
return projects[0]
return None
async def get_projects(self):
return await Project.adapt_from_async(
{
**self.db_config,
"table": "projects"
},
obj_key="async_pg",
many=True
)
async def create_task(self, project_id, title, description=None, due_date=None):
task = Task(
project_id=project_id,
title=title,
description=description,
due_date=due_date
)
result = await task.adapt_to_async(
obj_key="async_pg",
**self.db_config,
table="tasks"
)
return task
async def get_tasks_for_project(self, project_id):
return await Task.adapt_from_async(
{
**self.db_config,
"table": "tasks",
"selectors": {"project_id": project_id}
},
obj_key="async_pg",
many=True
)
async def update_task_status(self, task_id, new_status):
# First, get the task
task = await Task.adapt_from_async(
{
**self.db_config,
"table": "tasks",
"selectors": {"id": task_id}
},
obj_key="async_pg",
many=False
)
# Update the status
task.status = new_status
# Save back to database
result = await task.adapt_to_async(
obj_key="async_pg",
**self.db_config,
table="tasks"
)
return task
# Main function to demo the task manager
async def main():
manager = TaskManager(DB_CONFIG)
# Create a new project
print("Creating a new project...")
project = await manager.create_project(
"Website Redesign",
"Redesign the company website with modern UI/UX"
)
print(f"Project created: {project.id} - {project.name}")
# Add tasks to the project
print("\nAdding tasks to the project...")
tasks = [
await manager.create_task(
project.id,
"Design mockups",
"Create initial design mockups for homepage",
datetime.now().replace(day=datetime.now().day + 7)
),
await manager.create_task(
project.id,
"Frontend implementation",
"Implement the design in React",
datetime.now().replace(day=datetime.now().day + 14)
),
await manager.create_task(
project.id,
"Backend API",
"Implement the required API endpoints",
datetime.now().replace(day=datetime.now().day + 10)
)
]
# Get all projects
print("\nListing all projects:")
projects = await manager.get_projects()
for proj in projects:
print(f" - {proj.id}: {proj.name}")
# Get tasks for this project
proj_tasks = await manager.get_tasks_for_project(proj.id)
for task in proj_tasks:
print(f" - {task.title} [{task.status}] " +
(f"(Due: {task.due_date.strftime('%Y-%m-%d')})" if task.due_date else ""))
# Update a task status
print("\nUpdating task status...")
updated_task = await manager.update_task_status(tasks[0].id, "in_progress")
print(f"Updated task: {updated_task.title} - Status: {updated_task.status}")
# Final task list
print("\nFinal task list:")
final_tasks = await manager.get_tasks_for_project(project.id)
for task in final_tasks:
print(f" - {task.title} [{task.status}]")
if __name__ == "__main__":
asyncio.run(main())
Handling Advanced PostgreSQL Features¶
PostgreSQL has many advanced features like JSON/JSONB fields, arrays, and full-text search. Here's how to work with some of these using pydapter:
import asyncio
import json
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
from pydapter.async_core import AsyncAdaptable
from pydapter.extras.async_postgres_ import AsyncPostgresAdapter
# Database setup - Run this SQL in your PostgreSQL database first
"""
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
description TEXT,
price NUMERIC(10, 2) NOT NULL,
categories TEXT[] DEFAULT '{}',
metadata JSONB DEFAULT '{}'
);
"""
# Define our model with PostgreSQL-specific types
class Product(BaseModel, AsyncAdaptable):
id: Optional[int] = None
name: str
description: Optional[str] = None
price: float
categories: List[str] = []
metadata: Dict[str, Any] = {}
# Register adapter
Product.register_async_adapter(AsyncPostgresAdapter)
# Database config
DB_CONFIG = {
"engine_url": "postgresql+asyncpg://pydapter:password@localhost/pydapter_demo"
}
async def demo_advanced_postgres_features():
# Create products with array and JSON data
products = [
Product(
name="Smartphone",
description="Latest model with advanced features",
price=999.99,
categories=["electronics", "mobile", "gadgets"],
metadata={
"brand": "TechX",
"model": "TX-2000",
"specs": {
"cpu": "Octa-core",
"ram": "8GB",
"storage": "256GB"
}
}
),
Product(
name="Coffee Maker",
description="Premium coffee machine for home or office",
price=199.99,
categories=["appliances", "kitchen", "coffee"],
metadata={
"brand": "BrewMaster",
"features": ["programmable", "thermal carafe", "auto-clean"],
"dimensions": {
"width": 30,
"height": 40,
"depth": 20
}
}
)
]
# Save products to database
print("Saving products with arrays and JSON data...")
for product in products:
await product.adapt_to_async(
obj_key="async_pg",
**DB_CONFIG,
table="products"
)
# Query all products
print("\nRetrieving products from database:")
db_products = await Product.adapt_from_async(
{
**DB_CONFIG,
"table": "products"
},
obj_key="async_pg",
many=True
)
# Display products with their array and JSON data
for product in db_products:
print(f"\n{product.name} - ${product.price}")
print(f" Description: {product.description}")
print(f" Categories: {', '.join(product.categories)}")
print(f" Metadata: {json.dumps(product.metadata, indent=2)}")
if __name__ == "__main__":
asyncio.run(demo_advanced_postgres_features())
Conclusion¶
In this tutorial, you learned how to use pydapter's PostgreSQL adapters to seamlessly work with both synchronous and asynchronous database operations. These adapters provide a clean interface for converting between Pydantic models and PostgreSQL database records, with specialized error handling for PostgreSQL-specific issues.
The asynchronous adapter is particularly useful for high-performance
applications where you want to avoid blocking I/O operations. By using the
AsyncAdaptable
mixin, you can create a more ergonomic API that makes your code
cleaner and more maintainable.
The key advantages of using pydapter's PostgreSQL adapters include:
- Automatic validation through Pydantic models
- Consistent error handling
- Support for both synchronous and asynchronous operations
- Easy conversion between models and database records
- Support for PostgreSQL-specific data types like arrays and JSONB
Try experimenting with these adapters in your own projects to see how they can simplify your database interactions!