ichuan.net

自信打不死的心态活到老

一次项目研发记录(mongodb, nodejs)

介绍

最近在做一个项目,过程中遇到了很多坎坷,最终经过一番斗争,大都得以解决。所以把过程分享出来,希望能帮助到别人。

这个项目的需求简单介绍就是,需要整合一大批数据,通过分析得出一个结果集可供查询。因为数据是源源不断的,所以需要增量分析。数据有主动提交过来的,也有被动的需要我们自己处理的。

最初架构

因为需要一个简单查询界面,要做个 UI;主动提交过来的数据也需要 UI 上做个 API 接口;被动的数据要写一些脚本来导入,可以使用之前实现的 API 接口;数据存储我选择了最熟悉的 mongodb,它自带 mapreduce 可做数据分析。所以最初架构如下:

  1. mongodb(单机;数据存储及查询)
  2. express(UI 展示及 API 接口)
  3. forever(nodejs 应用部署)
  4. python 导入脚本(多进程)

最初把 UI 和 mongodb 放在了一台服务器(8G 内存,8 核 cpu)上,并且在 API 入库接口被调用时,插入数据到数据表的同时,读取结果表并作更新(统计分析)。这样实际上入库时实时分析生成了结果表,mongodb 只用来存放历史数据了。

前期编码大部分时间花在了 API 的实现上:

  1. 首先定义好 API 接口(URI、参数、功能),然后用 markdown 写文档(API 接口输入输出数据格式、使用举例)。用 marked 生成文档的 html 版本供开发人员使用
  2. 使用 uuid 生成 api key 写入 keys.json,UI 在启动时载入。并且通过 fs.watchkeys.json有变化时(增删)让 UI 重新载入新 api key 列表
  3. 使用 NODE_ENV 环境变量区分开发和产品环境,然后使用不同数据库等配置
  4. 对 API 接口传过来的数据格式,使用 json schema 校验。这样可以把接口用 json 文件来定义(数据即代码)
  5. 使用 mocha 写单元测试,包括基本的启动无异常测试及各 API 接口状态测试(使用 json schema 校验输入输出格式)

入库脚本上,使用了 python 的 multiprocessing 写了一个多进程调用 API 录入接口的程序。想着可以利用多核资源。

最初遇到的问题及解决

一切搭好后,开始使用入库脚本测试入库。

首先遇到的一个问题是数据库的结果表中出现了很多重复数据。排查后发现是 UI 上实时分析操作(读取-修改-保存)在高并发时导致的,原因很简单,就是没有加锁。但加锁是不应该的,会导致入库效率降低。

为了实现不加锁,我看上了 mongodb 自带的 update operator。然后花了很大功夫把所有修改操作都写成了 update operator,这样 UI 的实时分析操作就变成了一个原子的 upsert 操作。

在接下来的测试中,发现了 mongodb 日志中经常有什么 set too large,这才发现修改后有时 upsert 操作会很大,导致无法成功执行。

最终决定 UI 只负责入库,统计分析功能用一个 mongo js 脚本来实现,离线、增量分析。

这个时候已经意识到要做性能测试了,这样才好对比优化程度。由于想测试所有 API,最初自己写了个 python 脚本来跑,后来发现一个支持自定义 lua 脚本的测试工具:wrk,改成用 wrk 跑 lua 脚本,12 线程 400 并发时可以跑到 1.3k/s 以上的入库速度,是我脚本的好几倍。

最终发现的问题是入库速度还是不理想,这时开始考虑使用 mongodb 集群。

mongodb 集群

单纯录入数据,然后离线跑脚本分析,其实还有一个选择:DSE。不用它是因为:

  1. 列数据库不好满足我 json 文档的存储需求
  2. 分析要用 hadoop,我还是对 mongodb 更熟悉些
  3. 我对 java 也不熟悉,真正调试调优时肯定要花费很大时间和精力
  4. mongodb 文档相对更全面些

mongodb 的话,要提高入库速度,需要 sharding;要提高并发查询能力及容灾能力,需要 replication。按照官方文档中的教程,sharding 节点最好是一个 replica set(RS),这样一台机器当掉了不会影响数据库正常查询。

根据现在资源上限,我计划做两个 shards,每个是个 RS,每个 RS 是两台 mongodb,这样集群需要 4 台机器。官方文档中说必须要有 3 台独立的 config server,我照做了,后来发现 config server 不占资源,于是把它们分布到了集群那 4 台机器上去。入口节点 mongos 我放在了和 UI 同台的服务器上。

后台运行了大半天,我一个失误把一台 primary 节点上 mongodb 数据文件覆盖掉了。想着有 secondary 恢复应该没事,结果半天还不恢复,一查日志发现一直在报错。最终发现,原来一个 RS 中要至少 3 台机器才能实现 failover,而我的只有 primary 和 secondary 两台。按照官方文档,其实可以再加个 Arbiter 节点,不真实存储数据,只用作故障时的仲裁节点,就可以使 primary 故障时让 secondary 成为 primary。

关于 sharding,选择时间等自增字段为 sharding key 时,由于数据分布是按范围定的,会导致大片数据写入一个节点。一般都是先把 _id 做 hashed 索引,然后用这个索引来 sharding,就可以实现数据均匀分布。

其它

关于增量统计分析,原理基本是这样:数据表有个时间字段标识改记录创建时间,我的脚本每次运行时查出当前最晚时间和上次统计分析时统计到的最晚时间,然后对这个区间的数据做分析。这就需要把时间字段做索引。但当数据量很大时,例如 5000 万,单时间字段就需要 6G 多内存占用。在集群中,_id 字段不能用作自增的字段,所以我用 redis 的 inc 指令配合 UI 实现了 _id 为自增整数,然后统计脚本根据 _id 来增量统计,不需要时间字段做索引了。

关于入库脚本,因为之前 wrk 的经验,把 python 脚本由多进程修改为多进程+多线程方式,性能有提高。

偶然遇到了 pm2,因为它可以无缝重新载入代码、内置监控、利用多核,遂把 forever 替换成了它。

经过一系列的优化,最终实际入库时,速度稳定在 900/s。

待续。。。

Comments