SQLModel-基本使用

SQLModel 官方文档
SQLModel 官方中文文档

介绍

SQLModel 是一个基于 SQLAlchemy (Python SQL 工具包以及 Object-Relational Mapping 库) 和 Pydantic (基于 Python type hints 的数据验证和设置库) 的 Python 库, 其提供了一种用于处理关系型数据库的优雅和高效的方式.

环境准备

1
2
3
4
5
cd
mkdir code
cd code
mkdir sqlmodel-tutorial
cd sqlmodel-tutorial

确保有 python 环境:

1
python3 --version

安装 SQLModel:

1
pip install sqlmodel

SQLite 相关

SQLite 是一个 simple database in a single file:

其不需要额外的 database server, 只需要一个文件.

这里需要安装 sqlite browser, 用于图形化查看数据库的内容, 在 Archlinux 上安装为:

1
sudo pacman -S sqlitebrowser

语法

创建一个 table - CREATE

一个用于表示数据的 class 称一个 model (这也是为什么这个包叫 SQLModel 的原因)

步骤:

  • 定义表 (一个类)
  • 指定数据库名和 URL
  • 通过 URL 创建数据库 engine
  • 创建数据库和表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    from sqlmodel import Field, SQLModel, create_engine  


    class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    secret_name: str
    age: int | None = None


    sqlite_file_name = "database.db"
    sqlite_url = f"sqlite:///{sqlite_file_name}"

    engine = create_engine(sqlite_url, echo=True)


    def create_db_and_tables():
    SQLModel.metadata.create_all(engine)


    if __name__ == "__main__":
    create_db_and_tables()
  • Hero 表示表名, table=True 表明创建 table model
  • 其下的每一个成员为列名
  • 用该类创建的每一个实例都为表中的行

注意这里 id 虽然为 primary key 但为可选的, 因为数据库会自动生成.

Engine 是一个处理 Python 与数据库间通信的对象, 其用 create_engine() 来创建:

  • echo=True 表明, 每执行一个 SQL 语句就将其打印出 (便于 debugging)
  • SQLModel.metadata.create_all(engine) 会创建对应的 database.db 数据库文件以及 table

一般情况下只需要一个 engine 对象.

也可以使用在内存中创建的特殊 database, 其高速, 但会在程序结束后删除, 用 sqlite:// 来指定 (也就是说不指定文件名).

当一个类继承自 SQLModel 时, 也同时继承了 metadata (是 MetaData 类) 这个成员, 而添加了 table=True 之后, 就会将其注册到 metadata 成员中.

MetaData 类的 create_all 方法, 会根据 engine 创建数据库文件以及所有注册的 table.

创建 Rows - INSERT

步骤:

  • 创建实例
  • 创建 Session
  • 将实例添加到 Session
  • Commit Session 的改变
  • Close Session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Code above omitted 👆
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)

session = Session(engine)

session.add(hero_1)
session.add(hero_2)
session.add(hero_3)

# 此时还没有将修改保存到数据库

session.commit()

session.close()

# More code here later 👇

Session

其借助 engine 完成对数据库的一组操作. (因此需要 engine 才能创建)

一般在一个 application 中用一个 engine, 而在每一次 request 中用新的 session.

add 和 commit 类似 git addgit commit.

session.commit() 之后, hero_1, hero_2, hero_3 这些变量会被 expired, 可以通过再次访问其成员来更新, 或者显示调用 session.refresh(object).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Code above omitted 👆

with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)

print("After adding to the session")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)

session.commit()

print("After committing the session")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)

print("After committing the session, show IDs")
print("Hero 1 ID:", hero_1.id)
print("Hero 2 ID:", hero_2.id)
print("Hero 3 ID:", hero_3.id)

print("After committing the session, show names")
print("Hero 1 name:", hero_1.name)
print("Hero 2 name:", hero_2.name)
print("Hero 3 name:", hero_3.name)

session.refresh(hero_1)
session.refresh(hero_2)
session.refresh(hero_3)

print("After refreshing the heroes")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)

# Code below omitted 👇

读取数据 - SELECT

步骤:

  • 创建 Session (利用 engine)
  • 创建一个 select statement (利用 table, 也就是一个类), 这一步相当于是写了一串 SQL
  • 用 session 运行 select statement (也就是执行 SQL) 并得到结果
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # Code above omitted 👆

    def select_heroes():
    with Session(engine) as session:
    statement = select(Hero)
    results = session.exec(statement)
    for hero in results:
    print(hero)

    def main():
    create_db_and_tables()
    create_heroes()
    select_heroes()

    # More code here later 👇

这里返回的 results 是一个 iterable object, 可以用 results.all() 得到一个列表.

过滤数据 - WHERE

调用 select statement 的 where() 方法:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Deadpond")
results = session.exec(statement)
for hero in results:
print(hero)

# Code below omitted 👇

注意这里的 == (也可以用 !=, > 等等). Hero.name == "Deadpond 返回一个 expression 对象而非 True or False, 之后 .where 利用这些 expression 对象构建 SQL statement.

必须要注意:

1
2
3
4
5
>>> Hero.name == "Deadpond"
<sqlalchemy.sql.elements.BinaryExpression object at 0x7f4aec0d6c90>

>>> hero.name == "Deadpond"
True

若多次调用 .where(), 如:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.age >= 35).where(Hero.age < 40)
results = session.exec(statement)
for hero in results:
print(hero)

# Code below omitted 👇

其等价于 AND:

1
2
3
SELECT id, name, secret_name, age
FROM hero
WHERE age >= 35 AND age < 40

若用 OR:

1
2
3
4
5
6
7
8
9
10
11
12
from sqlmodel import Field, Session, SQLModel, create_engine, or_, select

# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(or_(Hero.age <= 35, Hero.age > 90))
results = session.exec(statement)
for hero in results:
print(hero)

# Code below omitted 👇

由于有些 colomn 的设置是可选的, 用于 comparison 时可能会提示错误, 此时可以通过:

1
2
3
4
5
6
7
8
9
10
11
12
from sqlmodel import Field, Session, SQLModel, col, create_engine, select

# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(col(Hero.age) >= 35)
results = session.exec(statement)
for hero in results:
print(hero)

# Code below omitted 👇

col() 包裹解决.

创建 index

Index 通过排序, 能够提高表中数据的查找速度.

一般在创建 index 之后, 每当数据更新, 数据库都会自动更新 index.

Index 的 costs 体现于, 每次更新数据, 都会执行更多操作. 比如, 在没有 index 时, 插入一个数据, 需执行 1 operation; 若有 index, 则会 perform the same 1 operation 外加 5 or 10 operation for index updating.

也就是用 write 多耗点时间和空间换取 read 的高速.

创建 index 的SQL 语法为:

1
2
CREATE INDEX ix_hero_name
ON hero (name)

在 SQLModel 中声明 index

设置 Field(index=True):

1
2
3
4
5
6
7
8
9
10
from sqlmodel import Field, Session, SQLModel, create_engine, select


class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)

# Code below omitted 👇

注意数据库会为 primary key 自动生成 index, 因此不需要单独设置.

仅读取一行

first() 方法, 但其不会区分 None:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.age <= 35)
results = session.exec(statement)
hero = results.first()
print("Hero:", hero)

# Code below omitted 👇

one() 方法用于获取第一个非 None, 但前提是 select 确保返回一行数据, 不然会有 MultipleResultsFoundNoResultFound 报错:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Deadpond")
results = session.exec(statement)
hero = results.one()
print("Hero:", hero)

# Code below omitted 👇

借助 primary key

用 session 的 get() 方法:

1
2
3
4
5
6
7
8
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
hero = session.get(Hero, 1)
print("Hero:", hero)

# Code below omitted 👇

读取多行

限制读取的个数 - LIMIT

用 select statement 的 limit() 方法:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)

# Code below omitted 👇

设置 OFFSET

用 select statement 的 offset 方法:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(3).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)

# Code below omitted 👇

其等价于:

1
2
3
SELECT id, name, secret_name, age
FROM hero
LIMIT 3 OFFSET 3

更新数据 - UPDATE

步骤:

  • 读取数据
  • 直接修改成员来更新数据
  • Add 到 session
  • commit session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Code above omitted 👆

def update_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Boy")
results = session.exec(statement)
hero = results.one()
print("Hero:", hero)

hero.age = 16
session.add(hero)
session.commit()
session.refresh(hero)
print("Updated hero:", hero)

# Code below omitted 👇

等价的 SQL 语句为:

1
2
3
UPDATE hero
SET age=16
WHERE name = "Spider-Boy"

注意这里都是 =.

删除数据 - DELETE

步骤:

  • 用 select statement 获取数据
  • 调用 sessiondelete() 方法 (与 add() 方法对应):
  • commit session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Code above omitted 👆

def delete_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Youngster")
results = session.exec(statement)
hero = results.one()
print("Hero: ", hero)

session.delete(hero)
session.commit()

print("Deleted hero:", hero)

statement = select(Hero).where(Hero.name == "Spider-Youngster")
results = session.exec(statement)
hero = results.first()

if hero is None:
print("There's no hero named Spider-Youngster")

# Code below omitted 👇

等价的 SQL 语句为:

1
2
3
DELETE
FROM hero
WHERE name = "Spider-Youngster"

创建 Connected table

Connected table, 指一个表中的某一列对应另一个表中的一列. 这通过 foreign_key="table.col" 来指定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from sqlmodel import Field, SQLModel, create_engine

class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str

class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)

team_id: int | None = Field(default=None, foreign_key="team.id")


sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

engine = create_engine(sqlite_url, echo=True)

def create_db_and_tables():
SQLModel.metadata.create_all(engine)

def main():
create_db_and_tables()

if __name__ == "__main__":
main()

其等价于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE team (
id INTEGER,
name TEXT NOT NULL,
headquarters TEXT NOT NULL,
PRIMARY KEY (id)
)

CREATE TABLE hero (
id INTEGER,
name TEXT NOT NULL,
secret_name TEXT NOT NULL,
age INTEGER,
team_id INTEGER,
PRIMARY KEY (id),
FOREIGN KEY(team_id) REFERENCES team (id)
)

创建 Connect Rows

即利用一个表中的数据来创建另一个表的行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Code above omitted 👆

def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margaret's Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()

hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()

session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)

print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)

# Code below omitted 👇

用 SELECT 读取 connected data

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).where(Hero.team_id == Team.id)
results = session.exec(statement)
for hero, team in results:
print("Hero:", hero, "Team:", team)

# Code below omitted 👇

等价于:

1
2
3
SELECT hero.id, hero.name, team.name
FROM hero, team
WHERE hero.team_id = team.id

联结 - JOIN

用 select statement 的 join 方法:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).join(Team)
results = session.exec(statement)
for hero, team in results:
print("Hero:", hero, "Team:", team)

# Code below omitted 👇

等价于:

1
2
3
4
SELECT hero.id, hero.name, team.name
FROM hero
JOIN team
ON hero.team_id = team.id

select statement 中没有指定 ON hero.team_id = team.id 部分是由于其在创建时由 foreign_key 指定了.

左联结 - LEFT OUTER JOIN

join() 方法传递 isouter=True 参数:

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).join(Team, isouter=True)
results = session.exec(statement)
for hero, team in results:
print("Hero:", hero, "Team:", team)

# Code below omitted 👇

等价于:

1
2
3
4
SELECT hero.id, hero.name, team.name
FROM hero
LEFT OUTER JOIN team
ON hero.team_id = team.id

使用聚合函数

需导入 func, 然后用其内部的方法.

如:

1
2
3
4
5
from sqlmodel import select, func

statement = select(func.sum(Tasks.task_score)).join(SubLog).where(SubLog.sub_user_id == 2)
result = await session.exec(statement)
total_score = result.scalar()

Relationship Attributes

Relationship Attributes 是一种特殊的成员, 用于定义两个模型之间的关系. 可以由此建立一对一, 一对多或多对多的关系.

创建 Relationship

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine


class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str

heroes: list["Hero"] = Relationship(back_populates="team")


class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)

team_id: int | None = Field(default=None, foreign_key="team.id")
team: Team | None = Relationship(back_populates="heroes")

# Code below omitted 👇

back_populates 参数的作用为:

当当前 model 改变时, 也同时改变另一 model 中的指定 attribute.

SQLModel 知道两个模型由 team_id 建立联系, 此时可以用 Team.heroes 访问一个 Hero 列表, 也可以用 Hero.team 访问一个 Team.

其同样可以减少添加行的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Code above omitted 👆

def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margaret's Bar")

hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team=team_z_force
)
hero_rusty_man = Hero(
name="Rusty-Man", secret_name="Tommy Sharp", age=48, team=team_preventers
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()

session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)

print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)

# Code below omitted 👇

这里不需要给 team_id 赋值, 也不需要 add 对应的 team. (自动完成了)

根据 Relationship 读取数据

如:

1
2
3
4
5
6
7
8
9
10
11
12
13
# Code above omitted 👆

def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Boy")
result = session.exec(statement)
hero_spider_boy = result.one()

# Code from the previous example omitted 👈

print("Spider-Boy's team again:", hero_spider_boy.team)

# Code below omitted 👇

移除 Relationship

通过将 relationship attributes 设置为 None 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Code above omitted 👆

def update_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Boy")
result = session.exec(statement)
hero_spider_boy = result.one()

hero_spider_boy.team = None
session.add(hero_spider_boy)
session.commit()

session.refresh(hero_spider_boy)
print("Spider-Boy without team:", hero_spider_boy)

# Code below omitted 👇

Many to Many

这部分没看.

配合 FastAPI

在 startup 时创建数据库和表

1
2
3
4
5
6
7
8
9
10
# Code above omitted 👆

app = FastAPI()


@app.on_event("startup")
def on_startup():
create_db_and_tables()

# Code below omitted 👇

技巧积累

指定 table name

使用 __tablename__ 魔术变量:

1
2
3
4
5
6
from sqlmodel import SQLModel, Field

class MyTable(SQLModel, table=True):
__tablename__ = "my_table"
id: int = Field(primary_key=True)
name: str

清除一个 table

如:

1
2
3
4
5
6
7
8
def clear_user_table():
with Session(engine) as session:
statement = select(User)
result = session.exec(statement)
results = result.all()
for item in results:
session.delete(item)
session.commit()

也就是 select 所有数据, 然后一个一个 delete. 这种方法需要注意外键问题, 不然会有如:

1
2
3
4
5
6
Traceback (most recent call last):
File "/home/jie/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1936, in _exec_single_context
self.dialect.do_executemany(
File "/home/jie/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 938, in do_executemany
cursor.executemany(statement, parameters)
sqlite3.IntegrityError: NOT NULL constraint failed: post.user_id

的报错.


SQLModel-基本使用
http://example.com/2024/08/20/SQLModel-基本使用/
作者
Jie
发布于
2024年8月20日
许可协议