SQLAlchemy 扩展

使用Flask-SQLAlchemy管理数据库

在Flask-SQLAlchemy中,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的SQLALCHEMY_DATABASE_URI键中。

Flask的数据库设置:

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'

其他设置:

# 动态追踪修改设置,如未设置只会提示警告, 不建议开启
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
名字 备注
SQLALCHEMY_DATABASE_URI 用于连接的数据库 URI 。例如:sqlite:////tmp/test.dbmysql://username:password@server/db
SQLALCHEMY_BINDS 一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库
SQLALCHEMY_ECHO 如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句)
SQLALCHEMY_RECORD_QUERIES 可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()。
SQLALCHEMY_NATIVE_UNICODE 可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )。
SQLALCHEMY_POOL_SIZE 数据库连接池的大小。默认是引擎默认值(通常 是 5 )
SQLALCHEMY_POOL_TIMEOUT 设定连接池的连接超时时间。默认是 10 。
SQLALCHEMY_POOL_RECYCLE 多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时。
SQLALCHEMY_COMMIT_ON_TEARDOWN 设置每次请求结束后会自动提交数据库的改动

常用的SQLAlchemy字段类型

类型名 python中类型 说明
Integer int 普通整数,一般是32位
SmallInteger int 取值范围小的整数,一般是16位
BigInteger int或long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 普通整数,一般是32位
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串做了优化
Unicode unicode 变长Unicode字符串
UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化
Boolean bool 布尔值
Date datetime.date 时间
Time datetime.datetime 日期和时间
LargeBinary str 二进制文件

常用的SQLAlchemy列选项

选项名 说明
primary_key 如果为True,代表表的主键
unique 如果为True,代表这列不允许出现重复的值
index 如果为True,为这列创建索引,提高查询效率
nullable 如果为True,允许有空值,如果为False,不允许有空值
default 为这列定义默认值

常用的SQLAlchemy关系选项

选项名 说明
backref 在关系的另一模型中添加反向引用
primary join 明确指定两个模型之间使用的联结条件
uselist 如果为False,不使用列表,而使用标量值
order_by 指定关系中记录的排序方式:升序降序(ASC,DESC)
secondary 指定多对多中记录的排序方式
secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

常用的SQLAlchemy查询过滤器

过滤器 说明
filter() 把过滤器添加到原查询上,返回一个新查询
filter_by() 把等值过滤器添加到原查询上,返回一个新查询
limit() 使用指定的值限定原查询返回的结果
offset() 偏移原查询返回的结果,返回一个新查询
order_by() 根据指定条件对原查询结果进行排序,返回一个新查询
group_by() 根据指定条件对原查询结果进行分组,返回一个新查询

常用的SQLAlchemy查询执行器

方法 说明
all() 以列表形式返回查询的所有结果
first() 返回查询的第一个结果,如果未查到,返回None
first_or_404() 返回查询的第一个结果,如果未查到,返回404
get() 返回指定主键对应的行,如不存在,返回None
get_or_404() 返回指定主键对应的行,如不存在,返回404
count() 返回查询结果的数量
paginate() 返回一个Paginate对象,它包含指定范围内的结果

SQLAlchemy查询相关

# 查询所有用户数据
User.query.all()
# 查询有多少个用户
User.query.count()
# 查询第1个用户
User.query.first()
# 查询id为4的用户[3种方式]
User.query.get(4)
User.query.filter(User.id == 4).first()
User.query.filter_by(id=4).first()
# 查询名字结尾字符为g的所有数据[开始/包含]
User.query.filter(User.name.endswith('g')).all()
User.query.filter(User.name.startswith('g')).all()
User.query.filter(User.name.contains('g')).all()
# 查询名字不等于wang的所有数据[2种方式]
User.query.filter(not_(Usme == 'wang')).all()
User.query.filter(User.name != 'er.nawang').all()
# 查询名字和邮箱都以 li 开头的所有数据[2种方式]
User.query.filter(and_(User.name.startswith("li"), User.email.startswith("li"))).all()
User.query.filter(User.name.startswith("li"), User.email.startswith("li")).all()
# 查询password是 `123456` 或者 `email` 以 `itheima.com` 结尾的所有数据
[User: 1 wang, User: 5 tang, User: 8 liu]
# 查询id为 [1, 3, 5, 7, 9] 的用户列表
User.query.filter(User.id.in_([1,3,5,7,9])).all()
# 查询name为liu的角色数据
user = User.query.filter(User.name == "liu").first()
Role.query.get(user.role_id)
# 查询所有用户数据,并以邮箱排序
User.query.order_by(User.email).all()
User.query.order_by(User.email.desc()).all()  倒序
# 每页3个,查询第2页的数据
User.query.paginate(2, 3, False).items

# 模糊查询
# like 操作符  % 代表任意多个字符  _ 代表一个字符
student_17 = sess.query(Student).filter(Student.name.like("%i%")).all()
for i in student_17:
    print(i.name)
student_17_2 = sess.query(Student).filter(Student.name.like("%i_i")).all()
for i in student_17_2:
    print(i.name)

not like
student_70 = sess.query(Student).filter(~Student.name.like("%i_i")).all()
student_71 = sess.query(Student).filter(Student.name.notlike("%i_i")).all()
for i in student_70:
    print(i.name)
print(student_70 == student_71)

# 区间查询
# in 操作符
student_18 = sess.query(Student).filter(Student.name.in_(['zhangsan','lisi','wangwu'])).all()
for i in student_18:
    print(i.name)
not in
student_19 = sess.query(Student).filter(~Student.name.in_(['lisi'])).all()
student_19_2 = sess.query(Student).filter(Student.name.notin_(['lisi'])).all()
for i in student_19:
    print(i.name)
print(student_19 == student_19_2)


# 排序
# order_by 排序
student_33 = sess.query(Student).filter(text("math >= 10")).order_by(~text("math")).all()  # 逆序
student_33_2 = sess.query(Student).filter(text("math >= 10")).order_by(text("math")).all()  # 顺序
student_33_2_1 = sess.query(Student).filter(text("math >= 10")).order_by(desc(text("math"))).all()  # 逆序
student_33_3 = sess.query(Student).order_by(desc(Student.math)).all()  # 逆序
student_33_4 = sess.query(Student).order_by(Student.math.desc()).all()  # 逆序
student_33_5 = sess.query(Student).order_by(~Student.math).all()  # 逆序
student_33_6 = sess.query(Student).order_by(Student.math).all()  # 顺序
print(student_31)
print(student_32)
print("___"* 30)
for i in student_33:
    print(i.math)
print("___"* 30)
for i in student_33_2:
    print(i.math)
for i in student_33_3:
    print(i.math)
for i in student_33_4:
    print(i.math)
for i in student_33_5:
    print(i.math)
for i in student_33_6:
    print(i.math)

# group_by 分组
student_39 = sess.query(Student).group_by(Student.gender == 1).count()
print(student_39)

# having # 分组之后条件过滤
student_39_1 = sess.query(Student).group_by(Student.gender == 1).having(Student.math>60).all()
for i in student_39_1:
    print(i)

# 计数
# count
student_38 = sess.query(Student).filter_by(gender=1).count()
student_38_2 = sess.query(func.count(Student.name),Student.name).group_by(Student.name).all()
student_38_3 = sess.query(func.count(Student.name),Student.gender).group_by(Student.gender).all()
print(student_38_2)
print(student_38_3)

# 去重
# distinct
student_40 = sess.query(distinct(Student.name)).all()
for i in student_40:
    print(i)

# 空值
# 是否为null
student_20 = sess.query(Student).filter(Student.name != None).all()
student_21 = sess.query(Student).filter(~Student.name.is_(None)).all()
student_22 = sess.query(Student).filter(Student.name.isnot(None)).all()
student_21_3 = sess.query(Student).filter(Student.name.is_('zhangsan')).all()  # error
student_21_2 = sess.query(Student).filter(Student.name.is_(1)).all()
for i in student_20:
    print(i.name)
print(student_20 == student_21 == student_22)

student_23 = sess.query(Student).filter(Student.math.is_(90)).all()
student_24 = sess.query(Student).filter(Student.math == None).all()

# 逻辑操作
and
student_25 = sess.query(Student).filter(Student.gender == 1, Student.math > 70).all()
student_26 = sess.query(Student).filter(and_(Student.gender == 1, Student.math > 70)).all()  # 别忘了引入该方法
student_27 = sess.query(Student).filter(Student.gender == 1).filter(Student.math > 70).all()
for i in student_25:
    print(i.name)
print(student_25 == student_26 == student_27)

or
student_28 = sess.query(Student).filter(or_(Student.gender == 1, Student.math > 80)).all()
for i in student_28:
    print(i.name)

# text  原生sql条件语句
student_31 = sess.query(Student).filter(text("name='lisi'")).first()  # 注意值为字符串时的写法
student_32 = sess.query(Student).filter(text("id=1")).first()  # 注意值为 数字 的写法
student_31 = sess.query(Student).filter(text("id>1 and math>10")).all()
for i in student_31:
    print(i.name)

# text 插入变量
student_34 =sess.query(Student).filter(text("gender=:sex and math>:score")).params(sex=1, score=1).all()
for i in student_34:
    print(i.name)

# from_statement 原生sql语句
student_35 = sess.query(Student).from_statement(text("select * from students where id=:id")).params(id=1).one()
student_36 = sess.query(Student).from_statement("select * from students where math>:score").params(score=10).all()
student_36_1 = sess.query(Student).from_statement("select * from students where math>10").all()
print(student_35)
for i in student_36:
    print(i.name)
print(student_36 == student_36_1)


# 使用别名
student_7 = sess.query(Student.name.label('std_name')).all()
print(student_7)
for i in student_7:
    print(i.std_name)
    print(i[0])  # 第二种取值方式
student_8 = sess.query(Student.name).all()
print(student_8)
for i in student_8:
    print(i.name)
    print(i[0])
# 取值
# 根据主键获取对象
students_1 = sess.query(Student).get(1)
print(students_1

# value指定要获取的字段 返回的是生成器
students_3  = sess.query(Student).value(Student.name)
print(students_3)

# values 指定多个字段 返回的是生成器
students_3_2  = sess.query(Student).values(Student.id,Student.name)
print(students_3_2)
for i in students_3_2:
    print(i)


# 一次获取多个字段值
students_4 = sess.query(Student.name,Student.gender).all()
print(students_4)
for i in students_4:
    print(i)

# 后续添加的方式选择要得到的字段结果
students_5 = sess.query(Student.name).add_columns(Student.gender).all()
print(students_5)
student_6 = sess.query(Student).filter_by(name="王大麻子").one()
print(student_6)

# 错误用法
error_1 = sess.query(Student).filter_by(and_(Student.id == 1, Student.name == 'lisi')).first()
error_2 = sess.query(Student).filter_by(and_(id=1, name='lisi')).first()
error_3 = sess.query(Student).filter(and_(id=1, name='lisi')).first()

# one 有且只有一个,否则报错
student_30 = sess.query(Student).filter(Student.id == 1).one()
print(student_30)
# 可以只有一个或者没有,不能为多个结果,否则报错
student_31 = sess.query(Student).filter(Student.id == 10).one_or_none()
print(student_31)

student_13 = sess.query(Student).filter_by(id=1).one_or_none()
student_14 = sess.query(Student).filter(Student.id == 1).one_or_none()
print(student_13)
print(student_14)

sqlalchemy——基本操作:https://www.cnblogs.com/yangmingxianshen/p/8411971.html

操作数据库

  • 创建表:db.create_all()
  • 删除表: db.drop_all()
  • 插入行: db.session.add() db.session.addAll([]) db.session.commit()
  • 查询全部数据: User.query.all()
  • 过滤查询: User.query.filter_by(id=id).first()
  • join多表查询:User.query.filter_by(env_id=env_id,id=id).join(Environments,Variable.env_id == Environments.id).first_or_404()
  • count返回数量: User.query.filter_by(id=id).count()
  • 修改数据
#根据条件查询一行数据
admin_role = Role.query.filter_by(role_name = 'Amdmin').first()
#修改数据-
admin_role.role_name = 'Admin'
db.session.add(admin_role)
db.session.commit()
  • 删除数据: db.session.delete(User) db.session.commit()
  • 删除多条数据
variablelists= Variable.query.filter_by(env_id=env_id).all()

for var in variablelists:

    db.session.delete(var)

db.session.commit()
  • 直接执行sql语句
sql = u""" update net_internet_ip set removed = current_timestamp() where uuid not in :uuid and removed is null
                    """
results = db.session.execute(sql, {"uuid": uuid})

全部代码

全部代码如下,经测试,已跑通。

#!/usr/bin/python
# -*- coding: utf-8 -*-

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:password@127.0.0.1:3306/test'

# 设置每次请求结束后会自动提交数据库的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# 查询时显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)


class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64))
    user = db.relationship('User', backref='role')

    def __repr__(self):
        return '<Role %r>' % self.name


class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, index=True)
    email = db.Column(db.String(64), unique=True)
    pswd = db.Column(db.String(64))
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

    def __repr__(self):
        return 'User:%s' % self.name


if __name__ == '__main__':
    db.drop_all()
    db.create_all()
    ro1 = Role(name='admin')
    ro2 = Role(name='user')
    db.session.add_all([ro1, ro2])
    db.session.commit()
    us1 = User(name='zhangsan', email='zhangsan@qq.com',pswd='12345a',role_id=ro1.id)
    us2 = User(name='lisi', email='lisi@qq.com', pswd='12345a', role_id=ro2.id)
    db.session.add_all([us1, us2])
    db.session.commit()
    app.run(debug=True)

results matching ""

    No results matching ""