SQLModel 官方文档 SQLModel 官方中文文档
介绍 SQLModel 是一个基于 SQLAlchemy (Python SQL 工具包以及 Object-Relational Mapping 库) 和 Pydantic (基于 Python type hints 的数据验证和设置库) 的 Python 库, 其提供了一种用于处理关系型数据库的优雅和高效的方式.
环境准备 1 2 3 4 5 cd mkdir codecd codemkdir sqlmodel-tutorialcd sqlmodel-tutorial
确保有 python 环境:
安装 SQLModel:
SQLite 相关 SQLite 是一个 simple database in a single file:
其不需要额外的 database server, 只需要一个文件.
这里需要安装 sqlite browser, 用于图形化查看数据库的内容, 在 Archlinux 上安装为:
1 sudo pacman -S sqlitebrowser
语法 创建一个 table - CREATE 一个用于表示数据的 class 称一个 model (这也是为什么这个包叫 SQLModel 的原因)
步骤:
定义表 (一个类)
指定数据库名和 URL
通过 URL 创建数据库 engine
创建数据库和表1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from sqlmodel import Field, SQLModel, create_engine class Hero (SQLModel, table=True ): id : int | None = Field(default=None , primary_key=True ) name: str secret_name: str age: int | None = None sqlite_file_name = "database.db" sqlite_url = f"sqlite:///{sqlite_file_name} " engine = create_engine(sqlite_url, echo=True ) def create_db_and_tables (): SQLModel.metadata.create_all(engine) if __name__ == "__main__" : create_db_and_tables()
Hero
表示表名, table=True
表明创建 table model
其下的每一个成员为列名
用该类创建的每一个实例都为表中的行
注意这里 id
虽然为 primary key 但为可选的, 因为数据库会自动生成.
Engine 是一个处理 Python 与数据库间通信的对象, 其用 create_engine()
来创建:
echo=True
表明, 每执行一个 SQL 语句就将其打印出 (便于 debugging)
SQLModel.metadata.create_all(engine)
会创建对应的 database.db
数据库文件以及 table
一般情况下只需要一个 engine 对象.
也可以使用在内存中创建的特殊 database, 其高速, 但会在程序结束后删除, 用 sqlite://
来指定 (也就是说不指定文件名).
当一个类继承自 SQLModel
时, 也同时继承了 metadata (是 MetaData
类) 这个成员, 而添加了 table=True
之后, 就会将其注册到 metadata
成员中.
而 MetaData
类的 create_all
方法, 会根据 engine 创建数据库文件以及所有注册的 table.
创建 Rows - INSERT 步骤:
创建实例
创建 Session
将实例添加到 Session
Commit Session 的改变
Close Session
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def create_heroes (): hero_1 = Hero(name="Deadpond" , secret_name="Dive Wilson" ) hero_2 = Hero(name="Spider-Boy" , secret_name="Pedro Parqueador" ) hero_3 = Hero(name="Rusty-Man" , secret_name="Tommy Sharp" , age=48 ) session = Session(engine) session.add(hero_1) session.add(hero_2) session.add(hero_3) session.commit() session.close()
Session
其借助 engine 完成对数据库的一组操作. (因此需要 engine 才能创建)
一般在一个 application 中用一个 engine, 而在每一次 request 中用新的 session.
add 和 commit 类似 git add
和 git commit
.
在 session.commit()
之后, hero_1
, hero_2
, hero_3
这些变量会被 expired, 可以通过再次访问其成员来更新, 或者显示调用 session.refresh(object)
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 with Session(engine) as session: session.add(hero_1) session.add(hero_2) session.add(hero_3) print ("After adding to the session" ) print ("Hero 1:" , hero_1) print ("Hero 2:" , hero_2) print ("Hero 3:" , hero_3) session.commit() print ("After committing the session" ) print ("Hero 1:" , hero_1) print ("Hero 2:" , hero_2) print ("Hero 3:" , hero_3) print ("After committing the session, show IDs" ) print ("Hero 1 ID:" , hero_1.id ) print ("Hero 2 ID:" , hero_2.id ) print ("Hero 3 ID:" , hero_3.id ) print ("After committing the session, show names" ) print ("Hero 1 name:" , hero_1.name) print ("Hero 2 name:" , hero_2.name) print ("Hero 3 name:" , hero_3.name) session.refresh(hero_1) session.refresh(hero_2) session.refresh(hero_3) print ("After refreshing the heroes" ) print ("Hero 1:" , hero_1) print ("Hero 2:" , hero_2) print ("Hero 3:" , hero_3)
读取数据 - SELECT 步骤:
创建 Session (利用 engine)
创建一个 select statement (利用 table, 也就是一个类), 这一步相当于是写了一串 SQL
用 session 运行 select statement (也就是执行 SQL) 并得到结果1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def select_heroes (): with Session(engine) as session: statement = select(Hero) results = session.exec (statement) for hero in results: print (hero)def main (): create_db_and_tables() create_heroes() select_heroes()
这里返回的 results
是一个 iterable object, 可以用 results.all()
得到一个列表.
过滤数据 - WHERE 调用 select statement 的 where()
方法:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.name == "Deadpond" ) results = session.exec (statement) for hero in results: print (hero)
注意这里的 ==
(也可以用 !=
, >
等等). Hero.name == "Deadpond
返回一个 expression 对象而非 True or False, 之后 .where
利用这些 expression 对象构建 SQL statement.
必须要注意:
1 2 3 4 5 >>> Hero.name == "Deadpond" <sqlalchemy.sql.elements.BinaryExpression object at 0x7f4aec0d6c90 >>>> hero.name == "Deadpond" True
若多次调用 .where()
, 如:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.age >= 35 ).where(Hero.age < 40 ) results = session.exec (statement) for hero in results: print (hero)
其等价于 AND
:
1 2 3 SELECT id, name , secret_name, ageFROM heroWHERE age >= 35 AND age < 40
若用 OR
:
1 2 3 4 5 6 7 8 9 10 11 12 from sqlmodel import Field, Session, SQLModel, create_engine, or_, selectdef select_heroes (): with Session(engine) as session: statement = select(Hero).where(or_(Hero.age <= 35 , Hero.age > 90 )) results = session.exec (statement) for hero in results: print (hero)
由于有些 colomn 的设置是可选的, 用于 comparison 时可能会提示错误, 此时可以通过:
1 2 3 4 5 6 7 8 9 10 11 12 from sqlmodel import Field, Session, SQLModel, col, create_engine, selectdef select_heroes (): with Session(engine) as session: statement = select(Hero).where(col(Hero.age) >= 35 ) results = session.exec (statement) for hero in results: print (hero)
用 col()
包裹解决.
创建 index Index 通过排序, 能够提高表中数据的查找速度.
一般在创建 index 之后, 每当数据更新, 数据库都会自动更新 index.
Index 的 costs 体现于, 每次更新数据, 都会执行更多操作. 比如, 在没有 index 时, 插入一个数据, 需执行 1 operation; 若有 index, 则会 perform the same 1 operation 外加 5 or 10 operation for index updating.
也就是用 write 多耗点时间和空间换取 read 的高速.
创建 index 的SQL 语法为:
1 2 CREATE INDEX ix_hero_nameON hero (name)
在 SQLModel 中声明 index 设置 Field(index=True)
:
1 2 3 4 5 6 7 8 9 10 from sqlmodel import Field, Session, SQLModel, create_engine, selectclass Hero (SQLModel, table=True ): id : int | None = Field(default=None , primary_key=True ) name: str = Field(index=True ) secret_name: str age: int | None = Field(default=None , index=True )
注意数据库会为 primary key 自动生成 index, 因此不需要单独设置.
仅读取一行 用 first()
方法, 但其不会区分 None:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.age <= 35 ) results = session.exec (statement) hero = results.first() print ("Hero:" , hero)
one()
方法用于获取第一个非 None, 但前提是 select
确保返回一行数据, 不然会有 MultipleResultsFound
或 NoResultFound
报错:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.name == "Deadpond" ) results = session.exec (statement) hero = results.one() print ("Hero:" , hero)
借助 primary key 用 session 的 get()
方法:
1 2 3 4 5 6 7 8 def select_heroes (): with Session(engine) as session: hero = session.get(Hero, 1 ) print ("Hero:" , hero)
读取多行 限制读取的个数 - LIMIT 用 select statement 的 limit()
方法:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero).limit(3 ) results = session.exec (statement) heroes = results.all () print (heroes)
设置 OFFSET 用 select statement 的 offset
方法:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero).offset(3 ).limit(3 ) results = session.exec (statement) heroes = results.all () print (heroes)
其等价于:
1 2 3 SELECT id, name, secret_name, ageFROM hero LIMIT 3 OFFSET 3
更新数据 - UPDATE 步骤:
读取数据
直接修改成员来更新数据
Add 到 session
commit session
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def update_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.name == "Spider-Boy" ) results = session.exec (statement) hero = results.one() print ("Hero:" , hero) hero.age = 16 session.add(hero) session.commit() session.refresh(hero) print ("Updated hero:" , hero)
等价的 SQL 语句为:
1 2 3 UPDATE heroSET age= 16 WHERE name = "Spider-Boy"
注意这里都是 =
.
删除数据 - DELETE 步骤:
用 select statement 获取数据
调用 session
的 delete()
方法 (与 add()
方法对应):
commit session
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def delete_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.name == "Spider-Youngster" ) results = session.exec (statement) hero = results.one() print ("Hero: " , hero) session.delete(hero) session.commit() print ("Deleted hero:" , hero) statement = select(Hero).where(Hero.name == "Spider-Youngster" ) results = session.exec (statement) hero = results.first() if hero is None : print ("There's no hero named Spider-Youngster" )
等价的 SQL 语句为:
1 2 3 DELETE FROM heroWHERE name = "Spider-Youngster"
创建 Connected table Connected table, 指一个表中的某一列对应另一个表中的一列. 这通过 foreign_key="table.col"
来指定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from sqlmodel import Field, SQLModel, create_engineclass Team (SQLModel, table=True ): id : int | None = Field(default=None , primary_key=True ) name: str = Field(index=True ) headquarters: str class Hero (SQLModel, table=True ): id : int | None = Field(default=None , primary_key=True ) name: str = Field(index=True ) secret_name: str age: int | None = Field(default=None , index=True ) team_id: int | None = Field(default=None , foreign_key="team.id" ) sqlite_file_name = "database.db" sqlite_url = f"sqlite:///{sqlite_file_name} " engine = create_engine(sqlite_url, echo=True )def create_db_and_tables (): SQLModel.metadata.create_all(engine)def main (): create_db_and_tables()if __name__ == "__main__" : main()
其等价于:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE TABLE team ( id INTEGER , name TEXT NOT NULL , headquarters TEXT NOT NULL , PRIMARY KEY (id) )CREATE TABLE hero ( id INTEGER , name TEXT NOT NULL , secret_name TEXT NOT NULL , age INTEGER , team_id INTEGER , PRIMARY KEY (id), FOREIGN KEY(team_id) REFERENCES team (id) )
创建 Connect Rows 即利用一个表中的数据来创建另一个表的行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 def create_heroes (): with Session(engine) as session: team_preventers = Team(name="Preventers" , headquarters="Sharp Tower" ) team_z_force = Team(name="Z-Force" , headquarters="Sister Margaret's Bar" ) session.add(team_preventers) session.add(team_z_force) session.commit() hero_deadpond = Hero( name="Deadpond" , secret_name="Dive Wilson" , team_id=team_z_force.id ) hero_rusty_man = Hero( name="Rusty-Man" , secret_name="Tommy Sharp" , age=48 , team_id=team_preventers.id , ) hero_spider_boy = Hero(name="Spider-Boy" , secret_name="Pedro Parqueador" ) session.add(hero_deadpond) session.add(hero_rusty_man) session.add(hero_spider_boy) session.commit() session.refresh(hero_deadpond) session.refresh(hero_rusty_man) session.refresh(hero_spider_boy) print ("Created hero:" , hero_deadpond) print ("Created hero:" , hero_rusty_man) print ("Created hero:" , hero_spider_boy)
用 SELECT 读取 connected data 1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero, Team).where(Hero.team_id == Team.id ) results = session.exec (statement) for hero, team in results: print ("Hero:" , hero, "Team:" , team)
等价于:
1 2 3 SELECT hero.id, hero.name, team.nameFROM hero, teamWHERE hero.team_id = team.id
联结 - JOIN 用 select statement 的 join
方法:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero, Team).join(Team) results = session.exec (statement) for hero, team in results: print ("Hero:" , hero, "Team:" , team)
等价于:
1 2 3 4 SELECT hero.id, hero.name, team.nameFROM heroJOIN teamON hero.team_id = team.id
select statement 中没有指定 ON hero.team_id = team.id
部分是由于其在创建时由 foreign_key
指定了.
左联结 - LEFT OUTER JOIN 给 join()
方法传递 isouter=True
参数:
1 2 3 4 5 6 7 8 9 10 def select_heroes (): with Session(engine) as session: statement = select(Hero, Team).join(Team, isouter=True ) results = session.exec (statement) for hero, team in results: print ("Hero:" , hero, "Team:" , team)
等价于:
1 2 3 4 SELECT hero.id, hero.name, team.nameFROM heroLEFT OUTER JOIN teamON hero.team_id = team.id
使用聚合函数 需导入 func
, 然后用其内部的方法.
如:
1 2 3 4 5 from sqlmodel import select, func statement = select(func.sum (Tasks.task_score)).join(SubLog).where(SubLog.sub_user_id == 2 ) result = await session.exec (statement) total_score = result.scalar()
Relationship Attributes Relationship Attributes 是一种特殊的成员, 用于定义两个模型之间的关系. 可以由此建立一对一, 一对多或多对多的关系.
创建 Relationship 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from sqlmodel import Field, Relationship, Session, SQLModel, create_engineclass Team (SQLModel, table=True ): id : int | None = Field(default=None , primary_key=True ) name: str = Field(index=True ) headquarters: str heroes: list ["Hero" ] = Relationship(back_populates="team" )class Hero (SQLModel, table=True ): id : int | None = Field(default=None , primary_key=True ) name: str = Field(index=True ) secret_name: str age: int | None = Field(default=None , index=True ) team_id: int | None = Field(default=None , foreign_key="team.id" ) team: Team | None = Relationship(back_populates="heroes" )
back_populates
参数的作用为:
当当前 model 改变时, 也同时改变另一 model 中的指定 attribute.
SQLModel 知道两个模型由 team_id
建立联系, 此时可以用 Team.heroes
访问一个 Hero 列表, 也可以用 Hero.team
访问一个 Team.
其同样可以减少添加行的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def create_heroes (): with Session(engine) as session: team_preventers = Team(name="Preventers" , headquarters="Sharp Tower" ) team_z_force = Team(name="Z-Force" , headquarters="Sister Margaret's Bar" ) hero_deadpond = Hero( name="Deadpond" , secret_name="Dive Wilson" , team=team_z_force ) hero_rusty_man = Hero( name="Rusty-Man" , secret_name="Tommy Sharp" , age=48 , team=team_preventers ) hero_spider_boy = Hero(name="Spider-Boy" , secret_name="Pedro Parqueador" ) session.add(hero_deadpond) session.add(hero_rusty_man) session.add(hero_spider_boy) session.commit() session.refresh(hero_deadpond) session.refresh(hero_rusty_man) session.refresh(hero_spider_boy) print ("Created hero:" , hero_deadpond) print ("Created hero:" , hero_rusty_man) print ("Created hero:" , hero_spider_boy)
这里不需要给 team_id
赋值, 也不需要 add
对应的 team. (自动完成了)
根据 Relationship 读取数据 如:
1 2 3 4 5 6 7 8 9 10 11 12 13 def select_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.name == "Spider-Boy" ) result = session.exec (statement) hero_spider_boy = result.one() print ("Spider-Boy's team again:" , hero_spider_boy.team)
移除 Relationship 通过将 relationship attributes 设置为 None 即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def update_heroes (): with Session(engine) as session: statement = select(Hero).where(Hero.name == "Spider-Boy" ) result = session.exec (statement) hero_spider_boy = result.one() hero_spider_boy.team = None session.add(hero_spider_boy) session.commit() session.refresh(hero_spider_boy) print ("Spider-Boy without team:" , hero_spider_boy)
Many to Many 这部分没看.
配合 FastAPI 在 startup 时创建数据库和表 1 2 3 4 5 6 7 8 9 10 app = FastAPI()@app.on_event("startup" ) def on_startup (): create_db_and_tables()
技巧积累 指定 table name 使用 __tablename__
魔术变量:
1 2 3 4 5 6 from sqlmodel import SQLModel, Fieldclass MyTable (SQLModel, table=True ): __tablename__ = "my_table" id : int = Field(primary_key=True ) name: str
清除一个 table 如:
1 2 3 4 5 6 7 8 def clear_user_table (): with Session(engine) as session: statement = select(User) result = session.exec (statement) results = result.all () for item in results: session.delete(item) session.commit()
也就是 select
所有数据, 然后一个一个 delete
. 这种方法需要注意外键问题, 不然会有如:
1 2 3 4 5 6 Traceback (most recent call last): File "/home/jie/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1936 , in _exec_single_context self.dialect.do_executemany( File "/home/jie/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 938 , in do_executemanycursor .executemany(statement , parameters) sqlite3.IntegrityError: NOT NULL constraint failed: post.user_id
的报错.