Querying
Building a statement with select()
All clauses return the same Select object, so you can chain them:
stmt = (
select(User)
.where(User.age >= 18)
.order_by(-User.age) # descending
.limit(10)
.offset(0)
)
.where(*conditions)
Combine multiple conditions with repeated .where() calls — they are ANDed together.
| Expression | MongoDB equivalent |
|---|---|
User.age == 30 |
{"age": {"$eq": 30}} |
User.age != 30 |
{"age": {"$ne": 30}} |
User.age > 18 |
{"age": {"$gt": 18}} |
User.age >= 18 |
{"age": {"$gte": 18}} |
User.age < 65 |
{"age": {"$lt": 65}} |
User.age <= 65 |
{"age": {"$lte": 65}} |
User.role.in_(["admin", "mod"]) |
{"role": {"$in": [...]}} |
User.role.not_in(["banned"]) |
{"role": {"$nin": [...]}} |
.order_by(*fields)
Pass a ModelField for ascending, or negate it (-User.field) for descending:
# ascending by name
stmt = select(User).order_by(User.name)
# descending by age, then ascending by name
stmt = select(User).order_by(-User.age, User.name)
.limit(n) and .offset(n)
Logical combinators
Use or_(), and_(), and not_() to compose complex filter conditions.
from mongotic import select, or_, and_, not_
# OR — users who are admin or moderator
stmt = select(User).where(
or_(User.role == "admin", User.role == "moderator")
)
# NOT — exclude banned users
stmt = select(User).where(
not_(User.status == "banned")
)
# Nested composition
stmt = select(User).where(
and_(
User.age >= 18,
or_(User.role == "admin", User.verified == True),
)
)
# not_(or_(...)) maps to MongoDB $nor
stmt = select(User).where(
not_(or_(User.role == "guest", User.role == "anonymous"))
)
| Combinator | MongoDB operator |
|---|---|
or_(A, B) |
{"$or": [A, B]} |
and_(A, B) |
{"$and": [A, B]} |
not_(field_op) |
{"field": {"$not": expr}} |
not_(or_(A, B)) |
{"$nor": [A, B]} |
Null checks
Use .is_() and .is_not() to test for None (null / missing fields).
# Find users with no email set
stmt = select(User).where(User.email.is_(None))
# Find users that have an email
stmt = select(User).where(User.email.is_not(None))
Note
In MongoDB, {"field": None} matches documents where the field is null or where the field does not exist at all.
String operators
| Method | SQL equivalent | MongoDB |
|---|---|---|
.like("Al%") |
LIKE 'Al%' |
{"$regex": "^Al.*$"} |
.ilike("al%") |
ILIKE 'al%' |
{"$regex": "^al.*$", "$options": "i"} |
.contains("gmail") |
LIKE '%gmail%' |
{"$regex": "gmail"} |
.startswith("Al") |
LIKE 'Al%' |
{"$regex": "^Al"} |
.endswith("son") |
LIKE '%son' |
{"$regex": "son$"} |
stmt = select(User).where(User.name.startswith("Al"))
stmt = select(User).where(User.email.contains("@gmail.com"))
stmt = select(User).where(User.name.ilike("alice"))
Index usage
MongoDB can use an index for regex queries only when the pattern is anchored with ^. .startswith() and .like("prefix%") benefit from indexes; .contains() and .endswith() do not.
Range operator
.between(low, high) is inclusive on both ends — equivalent to field >= low AND field <= high.
stmt = select(User).where(User.age.between(18, 65))
# Works with dates
from datetime import datetime
stmt = select(Order).where(
Order.created_at.between(datetime(2024, 1, 1), datetime(2024, 12, 31))
)
Column projection
Pass individual ModelField attributes to select() to fetch only a subset of fields. session.execute() returns a SelectResult whose items are Row objects.
from mongotic.result import Row, SelectResult
result = session.execute(
select(User.name, User.email).where(User.age >= 18)
)
assert isinstance(result, SelectResult)
for row in result.all():
# attribute, index, and key access all work
print(row.name, row.email)
print(row[0], row["name"])
For a single-column projection, session.scalars() unwraps each row to a plain value:
names = session.scalars(
select(User.name).where(User.company == "Acme")
).all()
# → ["Alice", "Bob", "Carol"]
Note
Passing multiple columns to session.scalars() raises TypeError. Use session.execute() for multi-column projections.
Distinct values
.distinct(field) returns a list of unique values for a field, optionally filtered by .where().
# All unique roles
roles = session.scalars(select(User).distinct(User.role)).all()
# → ["admin", "member", "guest"]
# Distinct roles among active users
active_roles = session.scalars(
select(User).where(User.active == True).distinct(User.role)
).all()
Note
.distinct() uses MongoDB's collection.distinct() command, which returns plain values (not model instances). .order_by() and .limit() have no effect on distinct queries.
Executing with session.scalars()
ScalarResult is lazy — no MongoDB query is issued until you call a terminal method.
Terminal methods
| Method | Returns | Behaviour |
|---|---|---|
.all() |
list[Model] |
All matching documents |
.first() |
Model \| None |
First match, or None |
.one() |
Model |
Exactly one result; raises NotFound or MultipleResultsFound |
.one_or_none() |
Model \| None |
One or None; raises MultipleResultsFound if 2+ |
.count() |
int |
Number of matching documents |
.exists() |
bool |
True if at least one document matches |
Iteration
ScalarResult is iterable — you can use it directly in for loops, list comprehensions, and unpacking:
result = session.scalars(select(User).where(User.active == True))
# for loop
for user in result:
print(user.name)
# list comprehension
names = [u.name for u in session.scalars(stmt)]
# unpacking
first, second, *rest = session.scalars(stmt)
Each iteration creates a fresh cursor, so the same ScalarResult can be iterated multiple times.
Strict single-result fetch
from mongotic import NotFound, MultipleResultsFound
try:
user = session.scalars(
select(User).where(User.email == "alice@example.com")
).one()
except NotFound:
... # 0 results
except MultipleResultsFound:
... # 2+ results
# Returns None on 0 results; raises on 2+
user = session.scalars(
select(User).where(User.email == "alice@example.com")
).one_or_none()
session.scalar() shortcut
session.scalar(stmt) is a convenience method that returns the first unwrapped value, or None if there are no results. It is equivalent to session.scalars(stmt).first().
age = session.scalar(
select(User.age).where(User.email == "alice@example.com")
)
# → 30, or None if not found
Works with full-model selects too:
yield_per(n)
yield_per(n) is accepted on Select for API compatibility with SQLAlchemy v2 patterns. It is a chainable no-op.
stmt = select(User).where(User.active == True).yield_per(100)
# Executes identically to select(User).where(User.active == True)
Note
PyMongo cursors are already lazy and memory-efficient. yield_per has no effect on cursor behaviour.
Primary key lookup with session.get()
Bulk DML
session.execute() runs immediately — no staging, no commit() needed. It returns a Result with .rowcount and .inserted_ids.
Bulk insert
from mongotic import insert
from mongotic.result import Result
r = session.execute(
insert(User).values([
{"name": "Alice", "email": "alice@example.com", "age": 30},
{"name": "Bob", "email": "bob@example.com", "age": 25},
])
)
assert isinstance(r, Result)
print(r.rowcount) # 2
print(r.inserted_ids) # ["<id1>", "<id2>"]
Note
insert() bypasses the session staging area — inserted documents do not appear in session.new.
Bulk update
from mongotic import update
r = session.execute(
update(User).where(User.role == "guest").values(role="member")
)
print(r.rowcount) # number of modified documents