本文共 7069 字,大约阅读时间需要 23 分钟。
上一篇文章,介绍了原生的pymysql的方式来操作mysql。这篇文章将要学习下SQLAchemy使用mysql的规则
ORM框架有两种形式:
第一种是DB first,就是需要手动创建数据库和表,然后ORM框架,自动生成代码类的方法;第二种是code first,就是手动创建数据库,然后通过写代码类的方法,ORM框架来自动生成表和数据的方法SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
作用:
- 提供简单的规则,2. 自动转换成SQL语句
pip3 install SQLAlchemyoreasy_install SQLAlchemy
SQLAlchemy的结构图,如下:
SQLAlchemy本身是不会连接数据库的,他是通过"DBAPI"这个模块api接口来实现对数据库连接的,而它本身会更加Dialect里面的设置,来确定你是什么数据库,如何转化sql语句的功能。使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
常见的DBAPI在官方的sqlachemy里面有介绍和使用方法,下面列出几个案例:
MySQL-Python mysql+mysqldb://: @ [: ]/ pymysql mysql+pymysql:// : @ / [? ]MySQL-Connector mysql+mysqlconnector:// : @ [: ]/ cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
#导入模块from sqlalchemy import create_engine#创建连接,是通过create_engine来创建连接的engine = create_engine("mysql+pymysql://kk:123@localhost:3306/pysqltest",max_overflow=5)#执行SQLcur = engine.execute( "insert into t1(name) VALUES ('rr')")
结果截图
#!/usr/bin/env python# -*- coding:utf-8 -*-from sqlalchemy import create_engineengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)# 执行SQL# cur = engine.execute(# "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"# )# 新插入行自增ID# cur.lastrowid# 执行SQL# cur = engine.execute(# "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]# )# 执行SQL# cur = engine.execute(# "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",# host='1.1.1.99', color_id=3# )# 执行SQL# cur = engine.execute('select * from hosts')# 获取第一行数据# cur.fetchone()# 获取第n行数据# cur.fetchmany(3)# 获取所有数据# cur.fetchall()
from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column,Integer,String,ForeignKey,UniqueConstraint,Index,CHAR,VARCHARfrom sqlalchemy.orm import sessionmaker,relationshipfrom sqlalchemy import create_engineengine=create_engine("mysql+pymysql://kk:123@localhost:3306/pysqltest?charset=utf8",max_overflow=5)Base=declarative_base()#创建单表class Users(Base): __tablename__ = 'users' id = Column(Integer,primary_key=True) name = Column(String(32)) gender = Column(CHAR(32)) __table_args__ = ( UniqueConstraint('id','name',name='uix_id_name'), Index('ix_id_name','name','gender') # Index 要把名字写在最前面 )def init_db(): Base.metadata.create_all(engine)def drop_db(): Base.metadata.drop_all(engine)init_db()
执行结果:
要想对表中的数据进行操作,需要能拿到一个类似pymysql的游标的功能,在这里它是一个session。
Session = sessionmaker(bind=engine)session = Session()
类 -> 表
对象 -> 行
from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column,Integer,String,ForeignKey,UniqueConstraint,Index,CHAR,VARCHARfrom sqlalchemy.orm import sessionmaker,relationshipfrom sqlalchemy import create_engineengine=create_engine("mysql+pymysql://kk:123@localhost:3306/pysqltest?charset=utf8",max_overflow=5)session=sessionmaker(bind=engine)session=session()Base=declarative_base()#创建单表class Users(Base): __tablename__ = 'users' id = Column(Integer,primary_key=True) name = Column(String(32)) gender = Column(CHAR(32)) __table_args__ = ( UniqueConstraint('id','name',name='uix_id_name'), Index('ix_id_name','name','gender') # Index 要把名字写在最前面 )def init_db(): Base.metadata.create_all(engine)def drop_db(): Base.metadata.drop_all(engine)###增加功能obj1 =Users(name='xx',gender='女')session.add(obj1)#添加过个values的方法#obj2=[ # Users(name='yy',gender='男'), # Users(name='zz',gender='女'), # Users(name='rr',gender='男'),#]#session.add_all(obj2)session.commit()session.close()
####查找user_list=session.query(Users).all()for row in user_list: print(row.id,row.name,row.gender)
# user_type_list = session.query(UserType.id,UserType.title).filter(UserType.id > 2)# for row in user_type_list:# print(row.id,row.title)
session.query(Users).filter(Users.id ==4).delete()
session.query(Users).filter(Users.id ==1).update({'gender':'中性'})#给添加变长字符串长度的用下面这种session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)#把所有数字自动加1用下面这种session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")session.commit()
# 条件ret = session.query(Users).filter_by(name='alex').all()ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all()# 通配符ret = session.query(Users).filter(Users.name.like('e%')).all()ret = session.query(Users).filter(~Users.name.like('e%')).all()# 限制ret = session.query(Users)[1:2]# 排序ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分组from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.extra).all()ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all()ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()# 连表ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()ret = session.query(Person).join(Favor).all()ret = session.query(Person).join(Favor, isouter=True).all()# 组合q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union_all(q2).all()
# 1.# select * from b where id in (select id from tb2)# 2 select * from (select * from tb) as B# q1 = session.query(UserType).filter(UserType.id > 0).subquery()# result = session.query(q1).all()# print(result)# 3# select# id ,# (select * from users where users.user_type_id=usertype.id)# from usertype;# session.query(UserType,session.query(Users).filter(Users.id == 1).subquery())# session.query(UserType,Users)# result = session.query(UserType.id,session.query(Users).as_scalar())# print(result)# result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())# print(result)
转载于:https://blog.51cto.com/sgk2011/2052485