Skip to content

Quickstart

End-to-end example: define a model, connect to MongoDB, and perform CRUD operations.

1. Install

pip install mongotic

2. Define a model

from typing import Optional, Text

from pydantic import Field

from mongotic.model import MongoBaseModel


class User(MongoBaseModel):
    __databasename__ = "myapp"   # MongoDB database name
    __tablename__    = "users"   # MongoDB collection name

    name:    Text          = Field(..., max_length=50)
    email:   Text          = Field(...)
    company: Optional[Text] = Field(None)
    age:     Optional[int]  = Field(None, ge=0, le=200)

3. Connect

from mongotic import create_engine
from mongotic.orm import sessionmaker

engine  = create_engine("mongodb://localhost:27017")
Session = sessionmaker(bind=engine)

create_engine returns a standard pymongo.MongoClient — any connection string pymongo accepts works here.

4. Write

with Session() as session:
    session.add(User(name="Alice", email="alice@example.com", age=30))
    session.add_all([
        User(name="Bob",   email="bob@example.com",   age=25),
        User(name="Carol", email="carol@example.com", age=28),
    ])
    session.commit()   # writes to MongoDB immediately

After commit(), each instance's _id field is populated with the MongoDB ObjectId string.

5. Query

from mongotic import select

with Session() as session:
    # All users
    users = session.scalars(select(User)).all()

    # Filtered
    adults = session.scalars(select(User).where(User.age >= 18)).all()

    # First match (returns None if not found)
    alice = session.scalars(
        select(User).where(User.email == "alice@example.com")
    ).first()

    # By primary key
    user = session.get(User, alice._id)

6. Update

with Session() as session:
    alice = session.scalars(
        select(User).where(User.email == "alice@example.com")
    ).first()
    alice.company = "Acme"   # change is tracked automatically
    session.commit()

7. Delete

with Session() as session:
    alice = session.scalars(
        select(User).where(User.email == "alice@example.com")
    ).first()
    session.delete(alice)
    session.commit()

Next steps

  • Querying — filters, sort, pagination, count, exists
  • Session — flush vs commit, rollback, context manager

Async quickstart

If your application runs inside an asyncio event loop, use mongotic.asyncio instead. The API mirrors the sync version exactly.

import asyncio
from mongotic.asyncio import create_async_engine, async_sessionmaker
from mongotic import insert, select

async_engine = create_async_engine("mongodb://localhost:27017")
AsyncSession  = async_sessionmaker(bind=async_engine)

async def main():
    async with AsyncSession() as session:
        # Bulk insert
        await session.execute(
            insert(User).values([
                {"name": "Alice", "email": "alice@example.com", "age": 30},
                {"name": "Bob",   "email": "bob@example.com",   "age": 25},
            ])
        )

        # Query
        adults = await session.scalars(select(User).where(User.age >= 18)).all()
        print([u.name for u in adults])

        # Async iteration
        async for user in session.scalars(select(User)):
            print(user.name)

asyncio.run(main())

See Async for the full reference — session lifecycle, projection, indexes, and a sync/async cheat sheet.