### **列方法**
~~~
# 類屬性
@hybrid_property
def fullname(self):
return self.firstname + ' ' + self.lastname # 這樣就可以用user.fullname訪問該屬性
# 驗證列
@validates('email')
def validate_email(self, key, address):
assert '@' in address
return address
~~~
### **CRUD**
* 像`join`自身類似的需求,可以使用別名`user_model1 = aliased(UserModel)`
* 目前沒有找到合適的方法去返回影響的行數,但是在`UPDATE/DELETE`方法中可以使用`result.rowcount`來返回SQL中where語句匹配到的行數,折衷方案是可以多加一個where條件去返回實際的影響行數。
<br />
* **執行原生語句,返回的是`ResultProxy`對象:**
~~~
result = conn.execute("INSERT INTO user (name) VALUES ('haofly')")
result = conn.execute("INSERT INTO user (name) VALUES ('haofly') RETURNING id") # 插入并拿到插入的id
result.fetchall()
~~~
* **執行原生語句的時候,防止SQL注入:**
~~~
bind_sql = 'SELECT * FROM xxx WHERE field = :value'
session.execute(bind_sql, {'value': 'value1'})
// 或者用下面的方式插入一個字典或者列表
session.execute(MyModel.__table__.insert(), modelDict)
session.execute(MyModel.__table__.insert(), modelDicts)
~~~
<br />
### **查詢**
>[success]* `filter_by`只能用`=`,而`filter`可以用`==,!=`等多種取值方式,且必須帶表名
~~~
# 查詢表
query = session.query(User)
print(query) // 得到sql語句
query.statement // 同上
query.count() // COUNT操作
query.get(2) // 根據主鍵獲取的簡便寫法
query.first() // 只獲取第一條
query.all() // 獲取所有數據
session.query(User.id).distinct().all() // DISTINCT操作
query.limit(2).offset(2).all() // limit offset要注意如果page相乘的時候page-1
// 篩選
query.filter(
getattr(User, 'icon_id') == 3, // 通過字段名的字符串形式獲取屬性
User.id==2,
User.age>10, // 大于、小于、等于直接寫
User.deleted_at == None, // IS NULL用None代替
User.name.in_(['hao', 'fly']) // IN操作
).first().name
query.filter('id = 2').first() // 復雜的filter
query.filter_by(deleted_at == None) // flask-sqlalchemy的查詢方式
query.order_by('user_name').all() // 排序
query.order_by(desc('name')).all() // 倒序排序,from sqlalchemy import desc
// 使用功能函數
query(func.count('*')).all()
query(func.json_contains(User.age, '{"A":"B"}')).all() // 使用JSON_CONTAINS
// 查詢列
session.query(User.name) // 去除指定列
session.query(User.id, User.name) // 去除指定列
session.query.with_entities(User.id, User.name) // 獲取指定列
// 拼接
query2.filter(or_(User.id == 1)) // or操作,or ...
query2.filter(or_(User.id == 1, User.name.like(''))) // or操作,or (xxx AND xxx)
// 關聯查詢
query(User).join(Post, User.id == Post.user_id).all() // join查詢
query(User).join(Post, and_(User.id == Post.user_id, User.deleted_at==None)) // JOIN ... ON (xxx AND xxx),join的and操作
// 關聯查詢外鍵
query.filter(Post.user == user)
query.filter(Post.user == None)
query.filter(User.posts.contains(post))
query.filter(User.posts.any(title='hao'))
query.filter(Post.user.has(name='haofly'))
from sqlalchemy.sql import exists
stmt = exists().where(Post.user_id==User.id)
for name, in session.query(User.name).filter(stmt): // 查詢存在Post的user
print(name)
// LIKE查詢
query.filter(User.name.like('%王%'))
MyModel.query.filter(User.name.like('%hao%'))
~~~
<br />
## **其他新增方式**
>[warning] 注意在連接數據庫時`autoflush`參數默認為`True`,但是并不是`add`之后就自動將語句`flush`到數據庫,
而是指每次查詢前回自動`flush`,所以無論`autoflush`是否為`True`,`add`之后都需要手動`session.flush()`
~~~
session.add(User(name='haofly')) # 直接插入一條數據
session.flush() # 必須手動flush
// 批量插入ORM版
session.bulk_save_objects([User(name="wang") for i in xrange(1000)])
// 批量插入非ORM版
result = session.execute(
User.__table__.insert(),
[{'name': 'wang', 'age': 10}, {}]
)
session.commit()
result.lastrowid // 獲取上一次插入的主鍵id
modelobj.id // 如果是ORM,那么直接在add后獲取主鍵id值就行了
~~~
<br />
### **修改**
~~~
query.filter(...).update({User.age: 10})
session.flush()
user.name = 'new'
session.commit()
~~~
### **刪除**
~~~
session.delete(user)
session.flush()
~~~
### **自定義SQL構造**
~~~
// 在所有的Insert語句前加上指定的前綴/后綴,例如加上ON DUPLICATE KEY UPDATE。
// 例如下面這個例子,當傳入append_string參數時會將指定的字符串添加到后面
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def prefix_inserts(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if 'append_string' in insert.kwargs:
return s + " " + insert.kwargs['append_string']
return s
session.execute(MyModel.__table__.insert(append_string = 'ON DUPLICATE KEY UPDATE fieldname="abc"'), objects)
~~~