act-morphia 1.7.2 带来不一样的数据聚合体验

1. 概述

Mongodb 2.2 开始就提供了数据Aggregation Pipeline (聚合管道)用于简单数据分析统计,包括计数(count),求和(sum),均值(average),标准差(stddev) 等. 这个特性相较以前的 Map Reduce 方式提升了很多. 遗憾的是在服务端代码上使用 Aggregation Pipeline 还是需要使用比较繁复的 API, 包括 Spring Data 和 Morphia 提供的 API. 这大多是因为 Aggregation Pipeline 需要兼顾各种情况, 比如嵌入数组的 rewind, 还有对第一次聚合数据进行再聚合等.

在很多常用情况下, 应用只需要简单的分组聚合, 最多对聚合结果数据进行过滤和排序. 这时候我们希望能通过更简单的方式来获得结果. Act-morphia 插件在随 act-starter-1.8.23.2 最新发布的 1.7.2 版本中提供了一组简单易用的 API 来实现常用聚合逻辑.

2. API 简介

Act-Morphia 依托与 Morphia 库 实现了 Act-DB 框架. 下面是 Act-Morphia 的简单介绍. 已经了解 Act-Morphia 的同学可以直接跳到后面的数据聚合章节.

2.1 Entity (实体类)

任何被 org.mongodb.morphia.annotations.Entity 注解的类. 我们推荐应用实体类继承 MorphiaModel 或者 MorphiaAdaptiveRecord.

示例:

@Entity("order")
public class Order extends MorphiaModel<Order> {
    public int price;

    @Property("reg")
    public String region;

    @Property("dep")
    public String department;
}

(注: 因为本文重点在于数据聚合, 所以 Order 的一些必要属性, 比如产品编号之类的信息在该模型中省却了)

2.2 Dao (数据库访问组件)

Act 定义了通用 Dao 接口, 在不同插件实现下提供对 SQL 和 MongoDB 的访问. Act-Morphia 是基于 Morphia 库的实现 MorphiaDao, 为应用提供 MongoDB 数据访问. 使用 Dao 的方式是直接在服务类中注入相关实现. 假设我们有一个访问 Order 数据的 RESTful 服务, 可以这样来是用 Dao:

@UrlContext("orders")
public class OrderService {
	@Inject // 这里注入 MorphiaDao, 其类型参数必须设置为 `Order`
	private MorphiaDao<Order> dao;
	
	@GetAction
	public Iterable<Order> listAll() {
		return dao.q(); // dao.q() 返回一个新建的 MorphiaQuery 实例, 其实现了 Iterable<Order> 接口
	}
	
	@GetAction("{order}")
	public Order fetch(@DbBind Order order) {
		// ActFramework 自动从 URL 路径中拿到 order 的 ID, 并从数据库中加载为 order 实例
		return order;
	}
	
	@PostAction
	public Order create(Order order) {
		// ActFramework 自动从 JSON 或者 Form 数据中
		return dao.save(order);
	}
	
	@PutAction("{order}/price")
	public void updatePrice(@DbBind @NotNull Order order, int newPrice) {
		order.price = newPrice;
		dao.save(order);
	}

	@DeleteAction("{order}")
	public void delete(@DbBind @NotNull Order order) {
		dao.delete(order);
	}
}

2.3 简单数据聚合 API

2.3.1 SimpleAggregation

提供了简单的 Aggregation Pipeline 封装, 用于帮助应用构建并发送 Aggregation Pipeline 到 MongoDB 以获取聚合结果. 应用通常不会直接创建 SimpleAggregation 实例. 基本上都通过一下方式获得 SimpleAggregation:

  1. MorphiaDao.aggregation() 或其简写 MorphiaDao.a(): 获得一个没有 Where 条件的 SimpleAggregation.
  2. MorphiqQuery.aggregation(): 获得一个带 Where 条件的 SimpleAggregation, 条件由 Query 定义

拿到 SimpleAggregation 实例之后可以链式调用下面的 API:

  • groupCount(String key1, String ... otherKeys) - 分组计数
  • groupSum(String key1, String ... otherKeys) - 分组求和
  • groupAverage(String key1, String ... otherKeys) - 分组求均值
  • groupStdDev(String key1, String ... otherKeys) - 分组求标准差
  • groupSampleStdDev(String key1, String ... otherKeys) - 分组求抽样标准差
  • atLeast(Number) - 过滤聚合数据, 要求必须大于或等于给定参考值
  • atMost(Number) - 过滤聚合数据, 要求必须小于或等于给定参考值
  • greaterThan(Number) - 过滤聚合数据, 要求必须大于给定参考值
  • lessThan(Number) - 过滤聚合数据, 要求必须小于给定参考值
  • between(minInclusive, maxExclusive) - 过滤聚合数据, 要求必须大于或等于第一个参数, 小于第二个参数
  • sorted() - 将聚合数据按数值大小升序排列
  • sorted(true) - 将聚合数据按数值大小降序排列
  • get() - 返回一个 AggregationResult<Double> 的对象, 数值类型为 Double
  • getAsInt() - 返回一个 AggregationResult<Integer> 的对象, 数值类型为 Integer
  • getAsLong() - 返回一个 AggregationResult<Long> 的对象, 数值类型为 Long
  • getAsMap() - 返回一个 Map<Object, Double> 类型的聚合结果, key 为分组数据, val 为数值
  • getAsIntMap() - 返回一个 Map<Object, Integer> 类型的聚合结果, key 为分组数据, val 为数值
  • getAsLongMap() - 返回一个 Map<Object, Long> 类型的聚合结果, key 为分组数据, val 为数值

2.3.2 AggregationResult

封装 MongoDB 聚合返回的结果. 因为 MongoDB 返回的聚合结果是一个 Cursor(游标), 通过 Cursor 取回的数据是一个 DBObject 的列表, 并不是非常方便应用使用. AggregationResult 类提供了一套简单的 API 供应用访问聚合结果

  • val() 返回聚合结果 - 用于 sum, average, stdDev 不分组的情况访问聚合结果数据
  • val(Object groupValue, Object... groupValues) 按照分组数据返回聚合结果数据, 分组数据的给出顺序应该和 SimpleAggregation.groupXxx 方法中给出的分组顺序一致.
  • val(Map<String, Object> groupValues) - 按照分组数据返回聚合结果, 分组数据和分组名字对应.
  • Map<Object, T> asMap() - 返回所有分组的聚合结果并依照分组数据索引.

3. 实例分析

下面我们将使用上节中的简单例子来介绍 Act-morphia 的简单聚合 API 如何满足常用的数据聚合需求.

@UrlContext("order/aggregations")
public class OrderAggregationService {

	@Inject
	private MorphiaDao<Order> dao;

	/**
	 * 统计一共有多少订单.
	 * 
	 * 这个可以直接用 Dao 上的 count() 方法, 无需使用聚合管道
	 */
	@GetAction("count")
	public long count() {
		return dao.count();
	}
	
	/**
	 * 按照地区统计订单数量
	 * 
	 * 直接构造 MorphiaQuery 对象并使用其 count() 方法
	 * 这个方法也无需聚合管道
	 */ 
	 @GetAction("/regions/{region}/count")
	public long countByRegion(String region) {
		// 使用字段名构造查询
		return dao.q("region", region).count();
		// 也可以使用 MongoDB column 名字来构造查询:
		// return dao.q("reg", region).count();
	}
	
	/**
	 * 按照部门统计订单数量
	 * 
	 * 直接构造 MorphiaQuery 对象并使用其 count() 方法
	 * 这个方法也无需聚合管道
	 */ 
	 @GetAction("/departments/{department}/count")
	public long countByDepartment(String department) {
		// 使用字段名构造查询
		return dao.q("department", department).count();
		// 也可以使用 MongoDB column 名字来构造查询:
		// return dao.q("dep", department).count();
	}
	
	/**
	 * 按照地区以及部门统计订单数量
	 *
	 * 直接构造 MorphiaQuery 对象并使用其 count() 方法
	 * 这个方法也无需聚合管道
	 */
	 @GetAction("/region={region},department={department}/count")
	public long countByRegionAndDepartment(String region, String department) {
		// 使用字段名构造查询
		return dao.q("region,department", region, department).count();
		// 也可以使用 MongoDB column 名字来构造查询:
		// return dao.q("reg,dep", region, department).count();
	}
	
	/**
	 * 一次性获得按地区分组统计订单数量的聚合结果
	 *
	 * 返回的 Map key 为地区, value 为该地区的订单数量
	 */
	 @GetAction("/regions/~group-count~")
	public Map<Object, Long> groupCountByRegion() {
		return dao.a().groupCount("region").getAsLongMap()
	}
	
	/**
	 * 一次性获得按部门分组统计订单数量的聚合结果
	 *
	 * 返回的 Map key 为部门, value 为该部门的订单数量
	 */
	 @GetAction("/departments/~group-count~")
	public Map<Object, Long> groupCountByDepartment() {
		return dao.a().groupCount("department").getAsLongMap()
	}
	
	/**
	 * 一次性获得按部门分组统计订单数量的聚合结果
	 *
	 * 返回的 Map key 为地区与部门列表, value 为该地区,部门对应的订单数量
	 */
	 @GetAction("/~region-department~/~group-count~")
	public Map<Object, Long> groupCountByRegionAndDepartment() {
		return dao.a().groupCount("region,department").getAsLongMap();
	}

	/**
	 * 一次性获得部门分组统计小订单数量的聚合结果
	 *
	 * @param priceTarget - 订单价格, 小于该价格的订单为小订单
	 * @return 返回的 Map key 为部门, value 为该部门的订单数量
	 */
	 @GetAction("/departments/~group-count~/~small-orders~/")
	public Map<Object, Long> groupCountByDepartmentForSmallOrders(int priceTarget) {
		return dao.q("price <", priceTarget).groupBy("department").count().getAsLongMap();
	}
	
	/**
	 * 返回所有订单的价格汇总
	 */
	 @GetAction("sum")
	public Long sum() {
		return dao.q().longSum("price");
		// 或者这样:
		// return dao.a().groupSum("price", null).getAsLong().val();
	}
	
	/**
	 * 返回所有小订单的价格汇总
	 *
	 * @param priceTarget - 订单价格, 小于该价格的订单为小订单
	 */
	@GetAction("sum/~small-orders~")
	public Long sumSmallOrders(@NotNull Integer priceTarget) {
		return dao.q("price <", priceTarget).longSum("price");
	}
	
	/**
	 * 返回部门订单价格汇总
	 *
	 * @param department - 指定部门
	 */
	 @GetAction("/departments/{department}/sum")
	public Long sumByDepartment(String department) {
		return dao.q("department", department).longSum("price");
	}

	
	/**
	 * 返回地区及部门订单价格汇总
	 *
	 * @param region - 指定地区
	 * @param department - 指定部门
	 */
	@GetAction("region={region},department={department}/sum")
	public Long sumByDepartment(String region, String department) {
		return dao.q("region,department", region, department).longSum("price");
	}
	
	/**
	 * 返回按地区分组价格汇总
	 *
	 * @return Map 的 key 为地区, value 为该地区价格汇总
	 */
	@GetAction("regions/~group-sum~")
	public Map<Object, Long> groupSumByRegion() {
		return dao.a().groupSum("price", "region").getAsLongMap();
		// 或者这样
		// return dao.q().groupBy("region").sum("price").getAsLongMap();
	}
	
	/**
	 * 返回按地区分组价格汇总, 过滤掉业绩达标的记录.
	 *
	 * 注意, 我们这次不能直接在查询中构造过滤条件, 而是使用 SimpleAggregation 的 lessThan 方法, 
	 * 这是因为查询条件是过滤数据库原始数据的, 而该方法需要过滤汇总后的数据. 简单的说, 查询条件
	 * 相当于 SQL 中的 where 子句, 而我们需要的 SQL 的 having 子句.
	 * 
	 * @param priceTarget - 达标业绩
	 * @return Map 的 key 为地区, value 为该地区价格汇总
	 */
	@GetAction("departments/~group-sum~/~bad-performance~")
	public Map<Object, Long> groupSumByRegionBadPerformance(int priceTarget) {
		return dao.q().groupBy("region").sum("price").lessThan(priceTarget).getAsLongMap();
	}
	
	/**
	 * 返回按地区分组平均价格
	 *
	 * @return Map 的 key 为地区, value 为该地区平均价格
	 */
	@GetAction("regions/~group-average~")
	public Map<Object, Double> groupAverageByRegion() {
		return dao.q().groupBy("region").average("price").getAsMap();
	}
	
	/**
	 * 返回按地区分组价格标准差
	 *
	 * @return Map 的 key 为地区, value 为该地区标准差
	 */
	@GetAction("regions/~group-std-dev~")
	public Map<Object, Double> groupStdDevByRegion() {
		return dao.q().groupBy("region").stdDev("price").getAsMap();
	}

	/**
	 * 返回按地区分组价格抽样标准差.
	 *
	 * 当数据量特别大的时候可以使用抽样统计标准差
	 *
	 * @param samples 样本数量
	 * @return Map 的 key 为地区, value 为该地区抽样标准差
	 */
	@GetAction("regions/~group-sample-std-dev~")
	public Map<Object, Double> groupStdDevByRegion(int samples) {
		return dao.q().groupBy("region").sampleStdDev("price", samples).getAsMap();
	}

}

4. 总结

本文简要介绍了 act-morphia 1.7.2 版本带来的新的聚合 API 以及使用方式, 希望能帮助到使用 act 操作 mongodb 数据库的同学. 如果大家对此有意见或者建议, 请在下面提出您宝贵的意见

赞(52) 打赏
未经允许不得转载:优客志 » 数据库
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏