SQLAlchemy 扩展
使用Flask-SQLAlchemy管理数据库
在Flask-SQLAlchemy中,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的SQLALCHEMY_DATABASE_URI键中。
Flask的数据库设置:
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
其他设置:
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
名字 |
备注 |
SQLALCHEMY_DATABASE_URI |
用于连接的数据库 URI 。例如:sqlite:////tmp/test.dbmysql://username:password@server/db |
SQLALCHEMY_BINDS |
一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库。 |
SQLALCHEMY_ECHO |
如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句) |
SQLALCHEMY_RECORD_QUERIES |
可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()。 |
SQLALCHEMY_NATIVE_UNICODE |
可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )。 |
SQLALCHEMY_POOL_SIZE |
数据库连接池的大小。默认是引擎默认值(通常 是 5 ) |
SQLALCHEMY_POOL_TIMEOUT |
设定连接池的连接超时时间。默认是 10 。 |
SQLALCHEMY_POOL_RECYCLE |
多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时。 |
SQLALCHEMY_COMMIT_ON_TEARDOWN |
设置每次请求结束后会自动提交数据库的改动 |
常用的SQLAlchemy字段类型
类型名 |
python中类型 |
说明 |
Integer |
int |
普通整数,一般是32位 |
SmallInteger |
int |
取值范围小的整数,一般是16位 |
BigInteger |
int或long |
不限制精度的整数 |
Float |
float |
浮点数 |
Numeric |
decimal.Decimal |
普通整数,一般是32位 |
String |
str |
变长字符串 |
Text |
str |
变长字符串,对较长或不限长度的字符串做了优化 |
Unicode |
unicode |
变长Unicode字符串 |
UnicodeText |
unicode |
变长Unicode字符串,对较长或不限长度的字符串做了优化 |
Boolean |
bool |
布尔值 |
Date |
datetime.date |
时间 |
Time |
datetime.datetime |
日期和时间 |
LargeBinary |
str |
二进制文件 |
常用的SQLAlchemy列选项
选项名 |
说明 |
primary_key |
如果为True,代表表的主键 |
unique |
如果为True,代表这列不允许出现重复的值 |
index |
如果为True,为这列创建索引,提高查询效率 |
nullable |
如果为True,允许有空值,如果为False,不允许有空值 |
default |
为这列定义默认值 |
常用的SQLAlchemy关系选项
选项名 |
说明 |
backref |
在关系的另一模型中添加反向引用 |
primary join |
明确指定两个模型之间使用的联结条件 |
uselist |
如果为False,不使用列表,而使用标量值 |
order_by |
指定关系中记录的排序方式:升序降序(ASC,DESC) |
secondary |
指定多对多中记录的排序方式 |
secondary join |
在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件 |
常用的SQLAlchemy查询过滤器
过滤器 |
说明 |
filter() |
把过滤器添加到原查询上,返回一个新查询 |
filter_by() |
把等值过滤器添加到原查询上,返回一个新查询 |
limit() |
使用指定的值限定原查询返回的结果 |
offset() |
偏移原查询返回的结果,返回一个新查询 |
order_by() |
根据指定条件对原查询结果进行排序,返回一个新查询 |
group_by() |
根据指定条件对原查询结果进行分组,返回一个新查询 |
常用的SQLAlchemy查询执行器
方法 |
说明 |
all() |
以列表形式返回查询的所有结果 |
first() |
返回查询的第一个结果,如果未查到,返回None |
first_or_404() |
返回查询的第一个结果,如果未查到,返回404 |
get() |
返回指定主键对应的行,如不存在,返回None |
get_or_404() |
返回指定主键对应的行,如不存在,返回404 |
count() |
返回查询结果的数量 |
paginate() |
返回一个Paginate对象,它包含指定范围内的结果 |
SQLAlchemy查询相关
User.query.all()
User.query.count()
User.query.first()
User.query.get(4)
User.query.filter(User.id == 4).first()
User.query.filter_by(id=4).first()
User.query.filter(User.name.endswith('g')).all()
User.query.filter(User.name.startswith('g')).all()
User.query.filter(User.name.contains('g')).all()
User.query.filter(not_(Usme == 'wang')).all()
User.query.filter(User.name != 'er.nawang').all()
User.query.filter(and_(User.name.startswith("li"), User.email.startswith("li"))).all()
User.query.filter(User.name.startswith("li"), User.email.startswith("li")).all()
[User: 1 wang, User: 5 tang, User: 8 liu]
User.query.filter(User.id.in_([1,3,5,7,9])).all()
user = User.query.filter(User.name == "liu").first()
Role.query.get(user.role_id)
User.query.order_by(User.email).all()
User.query.order_by(User.email.desc()).all() 倒序
User.query.paginate(2, 3, False).items
student_17 = sess.query(Student).filter(Student.name.like("%i%")).all()
for i in student_17:
print(i.name)
student_17_2 = sess.query(Student).filter(Student.name.like("%i_i")).all()
for i in student_17_2:
print(i.name)
not like
student_70 = sess.query(Student).filter(~Student.name.like("%i_i")).all()
student_71 = sess.query(Student).filter(Student.name.notlike("%i_i")).all()
for i in student_70:
print(i.name)
print(student_70 == student_71)
student_18 = sess.query(Student).filter(Student.name.in_(['zhangsan','lisi','wangwu'])).all()
for i in student_18:
print(i.name)
not in
student_19 = sess.query(Student).filter(~Student.name.in_(['lisi'])).all()
student_19_2 = sess.query(Student).filter(Student.name.notin_(['lisi'])).all()
for i in student_19:
print(i.name)
print(student_19 == student_19_2)
student_33 = sess.query(Student).filter(text("math >= 10")).order_by(~text("math")).all()
student_33_2 = sess.query(Student).filter(text("math >= 10")).order_by(text("math")).all()
student_33_2_1 = sess.query(Student).filter(text("math >= 10")).order_by(desc(text("math"))).all()
student_33_3 = sess.query(Student).order_by(desc(Student.math)).all()
student_33_4 = sess.query(Student).order_by(Student.math.desc()).all()
student_33_5 = sess.query(Student).order_by(~Student.math).all()
student_33_6 = sess.query(Student).order_by(Student.math).all()
print(student_31)
print(student_32)
print("___"* 30)
for i in student_33:
print(i.math)
print("___"* 30)
for i in student_33_2:
print(i.math)
for i in student_33_3:
print(i.math)
for i in student_33_4:
print(i.math)
for i in student_33_5:
print(i.math)
for i in student_33_6:
print(i.math)
student_39 = sess.query(Student).group_by(Student.gender == 1).count()
print(student_39)
student_39_1 = sess.query(Student).group_by(Student.gender == 1).having(Student.math>60).all()
for i in student_39_1:
print(i)
student_38 = sess.query(Student).filter_by(gender=1).count()
student_38_2 = sess.query(func.count(Student.name),Student.name).group_by(Student.name).all()
student_38_3 = sess.query(func.count(Student.name),Student.gender).group_by(Student.gender).all()
print(student_38_2)
print(student_38_3)
student_40 = sess.query(distinct(Student.name)).all()
for i in student_40:
print(i)
student_20 = sess.query(Student).filter(Student.name != None).all()
student_21 = sess.query(Student).filter(~Student.name.is_(None)).all()
student_22 = sess.query(Student).filter(Student.name.isnot(None)).all()
student_21_3 = sess.query(Student).filter(Student.name.is_('zhangsan')).all()
student_21_2 = sess.query(Student).filter(Student.name.is_(1)).all()
for i in student_20:
print(i.name)
print(student_20 == student_21 == student_22)
student_23 = sess.query(Student).filter(Student.math.is_(90)).all()
student_24 = sess.query(Student).filter(Student.math == None).all()
and
student_25 = sess.query(Student).filter(Student.gender == 1, Student.math > 70).all()
student_26 = sess.query(Student).filter(and_(Student.gender == 1, Student.math > 70)).all()
student_27 = sess.query(Student).filter(Student.gender == 1).filter(Student.math > 70).all()
for i in student_25:
print(i.name)
print(student_25 == student_26 == student_27)
or
student_28 = sess.query(Student).filter(or_(Student.gender == 1, Student.math > 80)).all()
for i in student_28:
print(i.name)
student_31 = sess.query(Student).filter(text("name='lisi'")).first()
student_32 = sess.query(Student).filter(text("id=1")).first()
student_31 = sess.query(Student).filter(text("id>1 and math>10")).all()
for i in student_31:
print(i.name)
student_34 =sess.query(Student).filter(text("gender=:sex and math>:score")).params(sex=1, score=1).all()
for i in student_34:
print(i.name)
student_35 = sess.query(Student).from_statement(text("select * from students where id=:id")).params(id=1).one()
student_36 = sess.query(Student).from_statement("select * from students where math>:score").params(score=10).all()
student_36_1 = sess.query(Student).from_statement("select * from students where math>10").all()
print(student_35)
for i in student_36:
print(i.name)
print(student_36 == student_36_1)
student_7 = sess.query(Student.name.label('std_name')).all()
print(student_7)
for i in student_7:
print(i.std_name)
print(i[0])
student_8 = sess.query(Student.name).all()
print(student_8)
for i in student_8:
print(i.name)
print(i[0])
students_1 = sess.query(Student).get(1)
print(students_1
students_3 = sess.query(Student).value(Student.name)
print(students_3)
students_3_2 = sess.query(Student).values(Student.id,Student.name)
print(students_3_2)
for i in students_3_2:
print(i)
students_4 = sess.query(Student.name,Student.gender).all()
print(students_4)
for i in students_4:
print(i)
students_5 = sess.query(Student.name).add_columns(Student.gender).all()
print(students_5)
student_6 = sess.query(Student).filter_by(name="王大麻子").one()
print(student_6)
error_1 = sess.query(Student).filter_by(and_(Student.id == 1, Student.name == 'lisi')).first()
error_2 = sess.query(Student).filter_by(and_(id=1, name='lisi')).first()
error_3 = sess.query(Student).filter(and_(id=1, name='lisi')).first()
student_30 = sess.query(Student).filter(Student.id == 1).one()
print(student_30)
student_31 = sess.query(Student).filter(Student.id == 10).one_or_none()
print(student_31)
student_13 = sess.query(Student).filter_by(id=1).one_or_none()
student_14 = sess.query(Student).filter(Student.id == 1).one_or_none()
print(student_13)
print(student_14)
sqlalchemy——基本操作:https://www.cnblogs.com/yangmingxianshen/p/8411971.html
操作数据库
- 创建表:db.create_all()
- 删除表: db.drop_all()
- 插入行:
db.session.add()
db.session.addAll([])
db.session.commit()
- 查询全部数据: User.query.all()
- 过滤查询: User.query.filter_by(id=id).first()
- join多表查询:User.query.filter_by(env_id=env_id,id=id).join(Environments,Variable.env_id == Environments.id).first_or_404()
- count返回数量: User.query.filter_by(id=id).count()
- 修改数据
#根据条件查询一行数据
admin_role = Role.query.filter_by(role_name = 'Amdmin').first()
#修改数据-
admin_role.role_name = 'Admin'
db.session.add(admin_role)
db.session.commit()
- 删除数据:
db.session.delete(User)
db.session.commit()
- 删除多条数据
variablelists= Variable.query.filter_by(env_id=env_id).all()
for var in variablelists:
db.session.delete(var)
db.session.commit()
sql = u""" update net_internet_ip set removed = current_timestamp() where uuid not in :uuid and removed is null
"""
results = db.session.execute(sql, {"uuid": uuid})
全部代码
全部代码如下,经测试,已跑通。
#!/usr/bin/python
# -*- coding: utf-8 -*-
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:password@127.0.0.1:3306/test'
# 设置每次请求结束后会自动提交数据库的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64))
user = db.relationship('User', backref='role')
def __repr__(self):
return '<Role %r>' % self.name
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, index=True)
email = db.Column(db.String(64), unique=True)
pswd = db.Column(db.String(64))
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
def __repr__(self):
return 'User:%s' % self.name
if __name__ == '__main__':
db.drop_all()
db.create_all()
ro1 = Role(name='admin')
ro2 = Role(name='user')
db.session.add_all([ro1, ro2])
db.session.commit()
us1 = User(name='zhangsan', email='zhangsan@qq.com',pswd='12345a',role_id=ro1.id)
us2 = User(name='lisi', email='lisi@qq.com', pswd='12345a', role_id=ro2.id)
db.session.add_all([us1, us2])
db.session.commit()
app.run(debug=True)