ichuan.net

自信打不死的心态活到老

django-mysql 中的金钱计算事务处理

末尾有更新:20140604 更新

问题

在类银行系统中,涉及金钱计算的地方,不能使用 float 类型,因为:

# python 中
>>> 0.1 + 0.2 - 0.3
5.551115123125783e-17
>>> 0.1 + 0.2 - 0.3 == 0.3
False

// js 中
> 1.13 * 10000
11299.999999999998
> 1.13 * 10000 == 11300
false

所以 python 中提供了 Decimal 类型,用以人类期望的方式处理此类计算。

django 中的 DecimalField

django 中提供了对应 Decimal 类型的 DecimalField 字段:

class DecimalField(max_digits=None, decimal_places=None[, **options])

max_digits 表示最大位数,decimal_places 表示小数点后位数。假如你的系统里最多存 9999 元人民币,人民币小数点后可以精确到 2 位,需要的 max_digits 就是 6,decimal_places 就是 2。

假如使用了 MySQl backend, MySQL 中也支持 DECIMAL 类型,django 自动处理类型转换。

示例

下面以一个 django app 为例演示。

models 定义:

from django.db import models

class MyModel(models.Model):
    price = models.DecimalField(max_digits=16, decimal_places=2, default=0)

先创建一个 object 试试:

from decimal import Decimal
obj = MyModel.objects.create(price=Decimal('123.45'))

然后更新。假设商品降价,我们需要把 price 减去 9.99:

obj.price -= Decimal('9.99')
obj.save()

来看看具体执行的 SQL 是什么:

from django.db import connection
print connection.queries[-1]

输出为:

UPDATE `hello_mymodel` SET `price` = '113.46' WHERE `id` = 1

这种 update 会有个问题。在并发很高时,会遇到类似多线程的问题,因为加减操作都在客户端,某个线程写入 price 时可能之前拿到的已经被别人更新过了,所以需要原子写入。SQL 表述为:

UPDATE `hello_mymodel` SET `price` = 'price' - '9.99' WHERE `id` = 1

这种方式在 django 中可以用 F() 表达式来实现:

obj.price = F('price') - Decimal('9.99')
obj.save(update_fields=['price'])

另外,在调用 .save() 时,可以用 update_fields 传入需要 update 的字段(如上)。不然 django 可能会把所有字段都放在 SQL 中。

貌似很完美。但假如减去的 Decimal 和字段的精确度不一致呢?

obj.price = F('price') - Decimal('9.999')
obj.save(update_fields=['price'])

price 字段精确到小数点后 2 位,给它减去了 3 位的一个数,会报异常:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  ...
  File "/home/yc/envs/hello/local/lib/python2.7/sitein _warning_check
    warn(w[-1], self.Warning, 3)
Warning: Data truncated for column 'price' at row 1

这是 MySQL 报了一个 warning,django 因此报了异常。更新操作未成功。

解决方法很简单,在入库前,我们手动转换下精确度:

def to_decimal(s, precision=8):
    '''
    to_decimal('1.2345', 2) => Decimal('1.23')
    to_decimal(1.2345, 2) => Decimal('1.23')
    to_decimal(Decimal('1.2345'), 2) => Decimal('1.23')
    '''
    r = pow(10, precision)
    v = s if type(s) is Decimal else Decimal(str(s))
    try:
        return Decimal(int(v * r)) / r
    except:
        return Decimal(s)

obj.price = F('price') - to_decimal('9.999', 2)
obj.save(update_fields=['price'])

OK. 但对于只有 MySQL 在计算时才知道精确度多少的呢?比如乘积:

obj.price = F('price') * Decimal('9.99')
obj.save(update_fields=['price'])

这种还是会报上面的异常,我们没法在 django 层面对结果做类型转换。这种类型怎么办?只有上 raw sql 了。

另外,事务处理在 django 中可以用 transaction.atomic() 来做。事务的意思是代码块中的数据库操作要么都成功执行,没有异常;要么都不执行。实际操作中,用事务+原子操作配合可实现正确的金钱操作逻辑。

raw sql 加上事务,上面的例子改为:

from django.db import transaction, connection
try:
    with transaction.atomic():
        cursor = connection.cursor()
        cursor.execute(
            'UPDATE `hello_mymodel` SET `price` = CAST((`price` * %s) AS DECIMAL(16, 2)) WHERE `id` = %s',
            [Decimal('9.999'), obj.id]
        )
except:
    print 'save failed'

我们在 MySQL 层面用 CAST(%s AS DECIMAL(16, 2)) 来把结果转为和 price 字段同样格式的 Decimal 类型。

这种做法应该很强健了。最后还有一点,可能会有多个操作同时进行,实际应用中,减法操作我们可能希望在事务中检验 price 被更新后要大于 0, 这个最好也能在 MySQL 层面做,把责任推给它。上面的例子改为:

try:
    with transaction.atomic():
        cursor = connection.cursor()
        ret = cursor.execute(
            'UPDATE `hello_mymodel` SET `price` = `price` - CAST(%s AS DECIMAL(16, 2)) WHERE `id` = %s AND `price` >= CAST(%s AS DECIMAL(16, 2))',
            [Decimal('9999.999'), obj.id, Decimal('9999.999')]
        )
        assert ret
except:
    print 'save failed'

ret 是更新的行数,假如正确更新了,ret 就是 1。

总结

金钱运算用 Decimal 类型;django 中字段间操作用 F();F() 配合 Decimal 计算时,结果类型和字段类型完全一致的没问题,不可预测的用 raw sql。

20140604 更新

Decimal 类型在 MySQL 中运算还是有问题,即便结果类型和字段类型完全一致还是可能出问题(见 http://stackoverflow.com/q/23925271/265989 )。

django 中有 select_for_update(), 所以金钱操作时最好不要用自增自减运算,而是用 select_for_update() 的行级锁来避免冲突。

这样最后一个例子可以改为:

try:
    with transaction.atomic():
        locked_obj = MyModel.objects.select_for_update().get(pk=obj.id)
        locked_obj.price -= to_decimal('9999.999', 2)
        assert locked_obj.price >= 0
        locked_obj.save(update_fields=['price'])
except:
    print 'save failed'

参考:

  1. https://code.djangoproject.com/ticket/13666

一个爬虫项目记录

上周自己做了个小项目,爬某个网站上的数据,存入 mysql。

最开始是这么计划的:从一个入口出发(比如分类页面),多线程抓取网页,然后用 lxml 定位 dom,获取想要的部分,入库。

组件

想法是最好利用成熟的组件,这样自己写的代码少,出的问题也少。

线程池:本来公司有成熟的线程池 lib,因为是个人项目,所以没使用。pypi 上找了几个都觉得对这个项目来说有些复杂。后来自己简单实现了一个(simple_threadpool),经过实践检验,够用。

dom 解析:最开始打算用 xpath 来从 html 中取数据,但是觉得语法很晦涩。后来想起以前看到过类似用 css 选择器来查询 html 的,找了下,是 pyquery。经过实践检验,很强大,绝大部分 css 都支持。

orm:决定了数据存入 mysql,就需要个 orm 来操作数据。希望有像 django 中类似的。我比较排斥 sqlalchemy,太复杂,每次查文档都得查半天。后来找到了一个轻量级的:peewee。好用。

http 请求:直接用的 urllib2,可以很简单地设置 ua 和 超时,而且调试方便。

编码,执行

程序和数据库设计好后,就开始编码了。完毕后执行,看日志,停掉,改 bug,清数据,执行,看日志。。。

中间部分网页用了 bigpipe,数据都在放在了 js 中。我只好字串定位到 js 函数入口,把 json 参数抠出来然后反序列化,之后再做 dom 解析,用 pyquery 取数据。

几次迭代下来,代码比较稳定了,数据也在线性增长。很满意。可好景不长,几小时后日志里大量报错,排查后发现对方网站把我 ip 封了。现在连浏览器也打不开那个网站了。

第二版

被封是个原因,速度慢是另一个原因,有的地方几乎抓好几个网页才能拼凑成一份数据入库。考虑到这些,我开始重新编码。

为求效率,不能再老老实实抓网页了。我首先想到了该网站的手机版。手机版一般来说 dom 相对干净简单,好分析。倒是有手机版网页,但只是一些简单数据,不全。然后想到了 app 版本。看看 app 中用的是哪里的数据,方便的话就用它了。

我下来了它 android 版的 apk,unzip 后打开 classes.dex,用 http、api 等关键字搜果然发现了一些 url。看起来像是 api 服务器地址。用 Genymotion 建了个 android 虚拟机,安装 app,并在系统设置里把网络代理设置成本机用 tinyproxy 搭建的一个小 socks5 代理。然后在虚拟机里打开 app,随机浏览,看到 tinyproxy 日志里出现了一些请求。

用浏览器试了试,这些 api 请求居然都没做客户端验证,太省心了。立马把代码中解析网页部分换成了直接抓 api 的。这样带来了几个好处:

  1. 用 api 可以获取大量数据,比以前的方式少了很多请求,入库速度加快
  2. api 返回 json 数据,不需要 pyquery 和 lxml 解析,代码量骤减
  3. api 返回的数据字段非常全,我估计就是他们数据库里的字段了
  4. 好像为了做负载均衡,app 内置好几个 api 节点。其中某个没有做请求数限制,导致我抓了整整一天都没被封

最终按这个方式,用家里小网络一天时间把全站数据都抓下来了。

经验教训

每次编码都会有收获:

  1. urllib2 默认没有超时时间。需要在 urlopen 时手动指定。没注意到这个导致程序后期卡死,查了半天还查不出来什么原因。后来加了超时,做了重试机制,就再没遇到卡死的问题了。
  2. http 响应可能不全:遇到过几次 IncompleteRead 报错,是由于网络不稳定导致的。之前用浏览器下载文件也遇到过。明明几十M的压缩包下下来看到只有1M,还可以正常打开。把这个报错当作异常,用重试机制可以解决。
  3. 要做静态代码检查:以前只道 js 要用 jshint 检查,并没用 pylint 检查 py 文件的习惯。这次真是血淋淋的教训。跑了好几小时的程序后期忽然报个 "local variable referenced before assignment" 或 "unexpected indent" 低级错误,前功尽弃。
  4. 个人私有代码可以放 bitbucket 上:这样即使是小项目、一次性项目,也可以用 git 托管,当作个备份也划算。屌丝就用免费的 bitbucket,有钱推荐用 github pro。
  5. 能快速搭建环境:用 requirements.txt 和 virtualenv,有个简单的 README,再有个 install.sh 就更好了。方便自己和别人在不同机器上快速搭建运行环境。
  6. 日志:很有必要,最好 stdout 和 stderr 写的是不同类型日志,直接输出,执行时分别重定向到日志文件。

Lock 和 Signal

在 unix 中,正在运行的程序收到 signal 后,会暂停在当前执行的代码处,转而执行注册的 signal handler,之后再返回正常执行流程。但如果程序收到信号时正在一个 lock 中呢?看下面的代码:

import signal
from threading import Condition

cond = Condition()

def func():
  print 'waiting...'
  with cond:
    cond.wait()
  print 'got notified'

def sig_handler(signum, frame):
  print 'notifing'
  with cond:
    cond.notify()

signal.signal(signal.SIGUSR1, sig_handler)
func()

首先给 SIGUSR1 信号注册了一个 handler,然后执行 func 时会输出个 waiting... 然后卡住。等我们执行 kill -SIGUSR1 <pid> 时,想象中应该会执行 sig_handler,解除 cond 的 lock,然后 func 函数中的 wait() 得以返回,输出 got notified

但实际测试发现,程序执行后,输出了 waiting...,然后无论发送什么信号都不会再有输出,只能 Ctrl+\ 结束进程。

这是为什么呢?查阅了 unix 的 wiki 后发现:

When a signal is sent, the operating system interrupts the target process's normal flow of execution to deliver the signal. Execution can be interrupted during any non-atomic instruction

原来只有程序出在非原子操作处时操作系统才能暂停程序的继续执行,转而执行 signal handler。而根据原子操作的描述,信号量、lock 等皆是原子操作。查阅了 python threading 模块代码发现,Condition.wait 内部实际调用的正是 lock:

enter image description here

这就解释了上面的例子为什么没能处理发送给它的信号了。

那如果既要用 Condition 又要用 signal 怎么办?通过查阅源码发现,Condition.wait 接受一个数字参数作为超时时间,内部是用 sleep 实现的,没有用 lock。而 unix 的 sleep 操作是在超时后或者收到信号后(参阅 man 3 sleep)返回。所以上例中代码改成如下:

import signal
from threading import Condition

cond = Condition()
got = False

def func():
  print 'waiting...'
  while not got:
    with cond:
      cond.wait(1)
  print 'got notified'

def sig_handler(signum, frame):
  global got
  print 'notifing'
  with cond:
    got = True
    cond.notify()
  print 'notified'

signal.signal(signal.SIGUSR1, sig_handler)
func()

使用了一个全局变量来记录是否成功收到了信号,然后 func 中每隔 1 秒会检测这个全局变量。这次执行后,发送 SIGUSR1 信号,程序依次输出 waiting...notifingnotifiedgot notified 后退出。和预期效果一致。

python subprocess 模块的 Popen.wait() 死锁问题

今天遇到的一个问题。简单说就是,使用 subprocess 模块的 Popen 调用外部程序,如果 stdoutstderr 参数是 pipe,并且程序输出超过操作系统的 pipe size时,如果使用 Popen.wait() 方式等待程序结束获取返回值,会导致死锁,程序卡在 wait() 调用上。

ulimit -a 看到的 pipe size 是 4KB,那只是每页的大小,查询得知 linux 默认的 pipe size 是 64KB

看例子:

#!/usr/bin/env python
# coding: utf-8
# yc@2013/04/28

import subprocess

def test(size):
    print 'start'

    cmd = 'dd if=/dev/urandom bs=1 count=%d 2>/dev/null' % size
    p = subprocess.Popen(args=cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
    #p.communicate()
    p.wait()

    print 'end'

# 64KB
test(64 * 1024)

# 64KB + 1B
test(64 * 1024 + 1)

首先测试输出为 64KB 大小的情况。使用 dd 产生了正好 64KB 的标准输出,由 subprocess.Popen 调用,然后使用 wait() 等待 dd 调用结束。可以看到正确的 startend 输出;然后测试比 64KB 多的情况,这种情况下只输出了 start,也就是说程序执行卡在了 p.wait() 上,程序死锁。具体输出如下:

start
end
start

那死锁问题如何避免呢?官方文档里推荐使用 Popen.communicate()。这个方法会把输出放在内存,而不是管道里,所以这时候上限就和内存大小有关了,一般不会有问题。而且如果要获得程序返回值,可以在调用 Popen.communicate() 之后取 Popen.returncode 的值。

结论:如果使用 subprocess.Popen,就不使用 Popen.wait(),而使用 Popen.communicate() 来等待外部程序执行结束。

windows下路径长度限制问题

问题

昨日志旭在做一个测试时发现在 windows 下无法删除一个目录,报错如下:

enter image description here

在资源管理器左侧展开查看,发现这是个嵌套很深的目录:

enter image description here

底部红色标注的目录其实还有子目录,但是展示不出来了。在 cmd.exe 下用 del /S /Q e:\test 或者在 powershell 中用 Remove-Item e:\test -recurse 同样无法删除改目录。

原因

查了半天,原来是 windows 的路径长度限制问题导致的。据说 ANSI 版本的 windows API 能操作的最大路径长度是 260 个字符,但我在本机测试时,这个值是 247:

>>> import os
>>> x = 'e:\\'
>>> x += 'e' * (247 - len(x))
>>> len(x)
247
>>> os.mkdir(x)
>>> os.mkdir(x + 'e')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
WindowsError: [Error 206] : 'e:\\eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
>>>

windows 下的 python 文件操作也是调用的系统 API,所以是受限制的。那之前的深层目录是如何创建的呢?原来是一个 java 程序创建的。java 程序是原生支持长路径名的。

解决方法

根据微软官方文档,要操作大于 260 字符的路径名,就需要使用 Unicode 版本的 windows API,那个限制是 32767,近乎无限制了。并且,需要在目录名前面加特殊字母:\\?\

我们先来重现一个无法在资源管理器和普通 python 代码里删除的深层目录。需注意两点:

  1. 使用 ctypes 里导出的 windll.kernel32 直接调用 Unicode 版本的创建目录 API:CreateDirectoryW
  2. 目录名前加特殊标识:\\?\

代码如下:

import os
import ctypes

mkdir = ctypes.windll.kernel32.CreateDirectoryW

cur = u'\\\\?\\e:\\'

for i in xrange(100):
    cur += u'test\\'
    mkdir(cur, None)

执行后会在 E 盘创建一个删不掉的 test 文件夹。如果你把上面的 mkdir 替换成 os.mkdir,会发现报错。

接下来尝试删除刚创建的嵌套目录。除了需要加特殊标识,还有个要注意的是:windows 下没有 API 提供类似 rm -rf 的功能,只有删除单个文件和删除空目录的 API,我们需要自己实现一个 rm -rf

import os, ctypes

rmdir = ctypes.windll.kernel32.RemoveDirectoryW

def rm_rf(dirname):
    for i in os.listdir(dirname):
        j = os.path.join(dirname, i)
        if os.path.isfile(j):
            os.remove(j)
        else:
            rm_rf(j)
    rmdir(dirname)

rm_rf(u'\\\\?\\e:\\test\\')

执行后会发现刚才的顽固目录被删除了。同样的,如果你把上面的 rmdir 换成 os.rmdir,也会发现报错。

你发现没?上述代码里的 os.listdiros.path.isfileos.path.join 都没换成 windows Unicode API,但也没问题,何解?原来部分 os 模块的接口是支持 unicode 文件名的:

>>> os.path.supports_unicode_filenames
True

但可惜的是 os.rmdir 不支持,不然就不用费心转用 windows API 了。

其他

linux 下对目录长度有限制吗?据说也有:linux 下目录长度限制是 4096 字节,文件名长度限制是 255 字节。