NoSql相对于关系型数据库的优势

NoSql相对于关系型数据库的优势

使用 NoSQL 提升写入性能

数据库系统大多使用的是传统的机械磁盘,对于机械磁盘的访问方式有两种:一种是随机 IO;另一种是顺序 IO。随机 IO 就需要花费时间做昂贵的磁盘寻道,一般来说,它的读写效率要比顺序 IO 小两到三个数量级,所以我们想要提升写入的性能就要尽量减少随机 IO。

以 MySQL 的 InnoDB 存储引擎来说,更新 binlog、redolog、undolog 都是在做顺序 IO,而更新 datafile 和索引文件则是在做随机 IO,而为了减少随机 IO 的发生,关系数据库已经做了很多的优化,比如说写入时先写入内存,然后批量刷新到磁盘上,但是随机 IO 还是会发生。

索引在 InnoDB 引擎中是以 B+ 树方式来组织的,而 MySQL 主键是聚簇索引(一种索引类型,数据与索引数据放在一起),既然数据和索引数据放在一起,那么在数据插入或者更新的时候,我们需要找到要插入的位置,再把数据写到特定的位置上,这就产生了随机的 IO。而且一旦发生了页分裂,就不可避免会做数据的移动,也会极大地损耗写入性能。

NoSQL 数据库是怎么解决这个问题的呢?

它们有多种的解决方式,这里我给你讲一种最常见的方案,就是很多 NoSQL 数据库都在使用的**基于 LSM 树的存储引擎,**这种算法使用最多,所以在这里着重剖析一下。

LSM 树(Log-Structured Merge Tree)牺牲了一定的读性能来换取写入数据的高性能,Hbase、Cassandra、LevelDB 都是用这种算法作为存储的引擎。

它的思想很简单,数据首先会写入到一个叫做 MemTable 的内存结构中,在 MemTable 中数据是按照写入的 Key 来排序的。为了防止 MemTable 里面的数据因为机器掉电或者重启而丢失,一般会通过写 Write Ahead Log 的方式将数据备份在磁盘上。

MemTable 在累积到一定规模时,它会被刷新生成一个新的文件,我们把这个文件叫做 SSTable(Sorted String Table)。当 SSTable 达到一定数量时,我们会将这些 SSTable 合并,减少文件的数量,因为 SSTable 都是有序的,所以合并的速度也很快。

当从 LSM 树里面读数据时,我们首先从 MemTable 中查找数据,如果数据没有找到,再从 SSTable 中查找数据。因为存储的数据都是有序的,所以查找的效率是很高的,只是因为数据被拆分成多个 SSTable,所以读取的效率会低于 B+ 树索引。

sstable.jpg

和 LSM 树类似的算法有很多,比如说 TokuDB 使用的名为 Fractal tree 的索引结构,它们的核心思想就是将随机 IO 变成顺序的 IO,从而提升写入的性能。

提升扩展性

另外,在扩展性方面,很多 NoSQL 数据库也有着先天的优势。还是以你的垂直电商系统为例,你已经为你的电商系统增加了评论系统,开始你的评估比较乐观,觉得电商系统的评论量级不会增长很快,所以就为它分了 8 个库,每个库拆分成 16 张表。

但是评论系统上线之后,存储量级增长的异常迅猛,你不得不将数据库拆分成更多的库表,而数据也要重新迁移到新的库表中,过程非常痛苦,而且数据迁移的过程也非常容易出错。

这时,你考虑是否可以考虑使用 NoSQL 数据库来彻底解决扩展性的问题,经过调研你发现它们在设计之初就考虑到了分布式和大数据存储的场景,比如像 MongoDB 就有三个扩展性方面的特性。

  • 其一是 Replica,也叫做副本集,你可以理解为主从分离,也就是通过将数据拷贝成多份来保证当主挂掉后数据不会丢失。同时呢,Replica 还可以分担读请求。Replica 中有主节点来承担写请求,并且把对数据变动记录到 oplog 里(类似于 binlog);从节点接收到 oplog 后就会修改自身的数据以保持和主节点的一致。一旦主节点挂掉,MongoDB 会从从节点中选取一个节点成为主节点,可以继续提供写数据服务。
  • 其二是 Shard,也叫做分片,你可以理解为分库分表,即将数据按照某种规则拆分成多份,存储在不同的机器上。MongoDB 的 Sharding 特性一般需要三个角色来支持,一个是 Shard Server,它是实际存储数据的节点,是一个独立的 Mongod 进程;二是 Config Server,也是一组 Mongod 进程,主要存储一些元信息,比如说哪些分片存储了哪些数据等;最后是 Route Server,它不实际存储数据,仅仅作为路由使用,它从 Config Server 中获取元信息后,将请求路由到正确的 Shard Server 中。

mongodb.jpg

  • 其三是负载均衡,就是当 MongoDB 发现 Shard 之间数据分布不均匀,会启动 Balancer 进程对数据做重新的分配,最终让不同 Shard Server 的数据可以尽量的均衡。当我们的 Shard Server 存储空间不足需要扩容时,数据会自动被移动到新的 Shard Server 上,减少了数据迁移和验证的成本。

你可以看到,NoSQL 数据库中内置的扩展性方面的特性可以让我们不再需要对数据库做分库分表和主从分离,也是对传统数据库一个良好的补充。

缓存专题(三) Write Through(读穿 写穿)策略

缓存专题(三) Write Through(读穿 写穿)策略

这个策略的核心原则是用户只与缓存打交道,由缓存和数据库通信,写入或者读取数据。这就好比你在汇报工作的时候只对你的直接上级汇报,再由你的直接上级汇报给他的上级,你是不能越级汇报的。

Write Through 的策略是这样的:先查询要写入的数据在缓存中是否已经存在,如果已经存在,则更新缓存中的数据,并且由缓存组件同步更新到数据库中,如果缓存中数据不存在,我们把这种情况叫做“Write Miss(写失效)”。

一般来说,我们可以选择两种“Write Miss”方式:一个是“Write Allocate(按写分配)”,做法是写入缓存相应位置,再由缓存组件同步更新到数据库中;另一个是“No-write allocate(不按写分配)”,做法是不写入缓存中,而是直接更新到数据库中。

在 Write Through 策略中,我们一般选择“No-write allocate”方式,原因是无论采用哪种“Write Miss”方式,我们都需要同步将数据更新到数据库中,而“No-write allocate”方式相比“Write Allocate”还减少了一次缓存的写入,能够提升写入的性能。

Read Through 策略就简单一些,它的步骤是这样的:先查询缓存中数据是否存在,如果存在则直接返回,如果不存在,则由缓存组件负责从数据库中同步加载数据。

下面是 Read Through/Write Through 策略的示意图:

cache_through1.jpg

Read Through/Write Through 策略的特点是由缓存节点而非用户来和数据库打交道,在我们开发过程中相比 Cache Aside 策略要少见一些,原因是我们经常使用的分布式缓存组件,无论是 Memcached 还是 Redis 都不提供写入数据库,或者自动加载数据库中的数据的功能。而我们在使用本地缓存的时候可以考虑使用这种策略,比如说在上一节中提到的本地缓存 Guava Cache 中的 Loading Cache 就有 Read Through 策略的影子。

我们看到 Write Through 策略中写数据库是同步的,这对于性能来说会有比较大的影响,因为相比于写缓存,同步写数据库的延迟就要高很多了。那么我们可否异步地更新数据库?这就是我们接下来要提到的“Write Back”策略。

Write Back(写回)策略

这个策略的核心思想是在写入数据时只写入缓存,并且把缓存块儿标记为“脏”的。而脏块儿只有被再次使用时才会将其中的数据写入到后端存储中。

**需要注意的是,**在“Write Miss”的情况下,我们采用的是“Write Allocate”的方式,也就是在写入后端存储的同时要写入缓存,这样我们在之后的写请求中都只需要更新缓存即可,而无需更新后端存储了,我将 Write back 策略的示意图放在了下面:

cache_through3jpg.jpg

**发现了吗?**其实这种策略不能被应用到我们常用的数据库和缓存的场景中,它是计算机体系结构中的设计,比如我们在向磁盘中写数据时采用的就是这种策略。无论是操作系统层面的 Page Cache,还是日志的异步刷盘,亦或是消息队列中消息的异步写入磁盘,大多采用了这种策略。因为这个策略在性能上的优势毋庸置疑,它避免了直接写磁盘造成的随机写问题,毕竟写内存和写磁盘的随机 I/O 的延迟相差了几个数量级呢。

但因为缓存一般使用内存,而内存是非持久化的,所以一旦缓存机器掉电,就会造成原本缓存中的脏块儿数据丢失。所以你会发现系统在掉电之后,之前写入的文件会有部分丢失,就是因为 Page Cache 还没有来得及刷盘造成的。

**当然,你依然可以在一些场景下使用这个策略,在使用时,我想给你的落地建议是:**你在向低速设备写入数据的时候,可以在内存里先暂存一段时间的数据,甚至做一些统计汇总,然后定时地刷新到低速设备上。比如说,你在统计你的接口响应时间的时候,需要将每次请求的响应时间打印到日志中,然后监控系统收集日志后再做统计。但是如果每次请求都打印日志无疑会增加磁盘 I/O,那么不如把一段时间的响应时间暂存起来,经过简单的统计平均耗时,每个耗时区间的请求数量等等,然后定时地,批量地打印到日志中。

缓存专题(二) Cache Aside(旁路缓存)策略

缓存专题(二) Cache Aside(旁路缓存)策略

我们来考虑一种最简单的业务场景,比方说在你的电商系统中有一个用户表,表中只有 ID 和年龄两个字段,缓存中我们以 ID 为 Key 存储用户的年龄信息。那么当我们要把 ID 为 1 的用户的年龄从 19 变更为 20,要如何做呢?

**你可能会产生这样的思路:**先更新数据库中 ID 为 1 的记录,再更新缓存中 Key 为 1 的数据。

cache_aside1.jpg

**这个思路会造成缓存和数据库中的数据不一致。**比如,A 请求将数据库中 ID 为 1 的用户年龄从 19 变更为 20,与此同时,请求 B 也开始更新 ID 为 1 的用户数据,它把数据库中记录的年龄变更为 21,然后变更缓存中的用户年龄为 21。紧接着,A 请求开始更新缓存数据,它会把缓存中的年龄变更为 20。此时,数据库中用户年龄是 21,而缓存中的用户年龄却是 20。

cache_aside2.jpg

**为什么产生这个问题呢?**因为变更数据库和变更缓存是两个独立的操作,而我们并没有对操作做任何的并发控制。那么当两个线程并发更新它们的时候,就会因为写入顺序的不同造成数据的不一致。

另外,直接更新缓存还存在另外一个问题就是丢失更新。还是以我们的电商系统为例,假如电商系统中的账户表有三个字段:ID、户名和金额,这个时候缓存中存储的就不只是金额信息,而是完整的账户信息了。当更新缓存中账户金额时,你需要从缓存中查询完整的账户数据,把金额变更后再写入到缓存中。

这个过程中也会有并发的问题,比如说原有金额是 20,A 请求从缓存中读到数据,并且把金额加 1,变更成 21,在未写入缓存之前又有请求 B 也读到缓存的数据后把金额也加 1,也变更成 21,两个请求同时把金额写回缓存,这时缓存里面的金额是 21,但是我们实际上预期是金额数加 2,这也是一个比较大的问题。

**那我们要如何解决这个问题呢?**其实,我们可以在更新数据时不更新缓存,而是删除缓存中的数据,在读取数据时,发现缓存中没了数据之后,再从数据库中读取数据,更新到缓存中。

cache_aside3.jpg

这个策略就是我们使用缓存最常见的策略,Cache Aside 策略(也叫旁路缓存策略),这个策略数据以数据库中的数据为准,缓存中的数据是按需加载的。它可以分为读策略和写策略,其中读策略的步骤是:

  • 从缓存中读取数据;
  • 如果缓存命中,则直接返回数据;
  • 如果缓存不命中,则从数据库中查询数据;
  • 查询到数据后,将数据写入到缓存中,并且返回给用户。

写策略的步骤是:

  • 更新数据库中的记录;
  • 删除缓存记录。

你也许会问了,在写策略中,能否先删除缓存,后更新数据库呢?**答案是不行的,**因为这样也有可能出现缓存数据不一致的问题,我以用户表的场景为例解释一下。

假设某个用户的年龄是 20,请求 A 要更新用户年龄为 21,所以它会删除缓存中的内容。这时,另一个请求 B 要读取这个用户的年龄,它查询缓存发现未命中后,会从数据库中读取到年龄为 20,并且写入到缓存中,然后请求 A 继续更改数据库,将用户的年龄更新为 21,这就造成了缓存和数据库的不一致。

cache_aside4.jpg

那么像 Cache Aside 策略这样先更新数据库,后删除缓存就没有问题了吗?其实在理论上还是有缺陷的。假如某个用户数据在缓存中不存在,请求 A 读取数据时从数据库中查询到年龄为 20,在未写入缓存中时另一个请求 B 更新数据。它更新数据库中的年龄为 21,并且清空缓存。这时请求 A 把从数据库中读到的年龄为 20 的数据写入到缓存中,造成缓存和数据库数据不一致。

cache_aside5.jpg

不过这种问题出现的几率并不高,原因是缓存的写入通常远远快于数据库的写入,所以在实际中很难出现请求 B 已经更新了数据库并且清空了缓存,请求 A 才更新完缓存的情况。而一旦请求 A 早于请求 B 清空缓存之前更新了缓存,那么接下来的请求就会因为缓存为空而从数据库中重新加载数据,所以不会出现这种不一致的情况。

**Cache Aside 策略是我们日常开发中最经常使用的缓存策略,不过我们在使用时也要学会依情况而变。**比如说当新注册一个用户,按照这个更新策略,你要写数据库,然后清理缓存(当然缓存中没有数据给你清理)。可当我注册用户后立即读取用户信息,并且数据库主从分离时,会出现因为主从延迟所以读不到用户信息的情况。

而解决这个问题的办法恰恰是在插入新数据到数据库之后写入缓存,这样后续的读请求就会从缓存中读到数据了。并且因为是新注册的用户,所以不会出现并发更新用户信息的情况。

Cache Aside 存在的最大的问题是当写入比较频繁时,缓存中的数据会被频繁地清理,这样会对缓存的命中率有一些影响。如果你的业务对缓存命中率有严格的要求,那么可以考虑两种解决方案:

  1. 一种做法是在更新数据时也更新缓存,只是在更新缓存前先加一个分布式锁,因为这样在同一时间只允许一个线程更新缓存,就不会产生并发问题了。当然这么做对于写入的性能会有一些影响;

  2. 另一种做法同样也是在更新数据时更新缓存,只是给缓存加一个较短的过期时间,这样即使出现缓存不一致的情况,缓存的数据也会很快地过期,对业务的影响也是可以接受。

缓存专题(一) 缓存概述

缓存专题(一) 缓存概述

缓存分类

在我们日常开发中,常见的缓存主要就是静态缓存、分布式缓存和热点本地缓存这三种。

静态缓存在 Web 1.0 时期是非常著名的,它一般通过生成 Velocity 模板或者静态 HTML 文件来实现静态缓存,在 Nginx 上部署静态缓存可以减少对于后台应用服务器的压力。例如,我们在做一些内容管理系统的时候,后台会录入很多的文章,前台在网站上展示文章内容,就像新浪,网易这种门户网站一样。

当然,我们也可以把文章录入到数据库里面,然后前端展示的时候穿透查询数据库来获取数据,但是这样会对数据库造成很大的压力。即使我们使用分布式缓存来挡读请求,但是对于像日均 PV 几十亿的大型门户网站来说,基于成本考虑仍然是不划算的。

所以我们的解决思路是每篇文章在录入的时候渲染成静态页面,放置在所有的前端 Nginx 或者 Squid 等 Web 服务器上,这样用户在访问的时候会优先访问 Web 服务器上的静态页面,在对旧的文章执行一定的清理策略后,依然可以保证 99% 以上的缓存命中率。

这种缓存只能针对静态数据来缓存,对于动态请求就无能为力了。那么我们如何针对动态请求做缓存呢?这时你就需要分布式缓存了。

分布式缓存的大名可谓是如雷贯耳了,我们平时耳熟能详的 Memcached、Redis 就是分布式缓存的典型例子。它们性能强劲,通过一些分布式的方案组成集群可以突破单机的限制。所以在整体架构中,分布式缓存承担着非常重要的角色。

对于静态的资源的缓存你可以选择静态缓存,对于动态的请求你可以选择分布式缓存,那么什么时候要考虑热点本地缓存呢?

**答案是当我们遇到极端的热点数据查询的时候。**热点本地缓存主要部署在应用服务器的代码中,用于阻挡热点查询对于分布式缓存节点或者数据库的压力。

比如某一位明星在微博上有了热点话题,“吃瓜群众”会到他 (她) 的微博首页围观,这就会引发这个用户信息的热点查询。这些查询通常会命中某一个缓存节点或者某一个数据库分区,短时间内会形成极高的热点查询。

那么我们会在代码中使用一些本地缓存方案,如 HashMap,Guava Cache 或者是 Ehcache 等,它们和应用程序部署在同一个进程中,优势是不需要跨网络调度,速度极快,所以可以来阻挡短时间内的热点查询。来看个例子。

比方说你的垂直电商系统的首页有一些推荐的商品,这些商品信息是由编辑在后台录入和变更。你分析编辑录入新的商品或者变更某个商品的信息后,在页面的展示是允许有一些延迟的,比如说 30 秒的延迟,并且首页请求量最大,即使使用分布式缓存也很难抗住,所以你决定使用 Guava Cache 来将所有的推荐商品的信息缓存起来,并且设置每隔 30 秒重新从数据库中加载最新的所有商品。

首先,我们初始化 Guava 的 Loading Cache:

1
2
3
4
5
6
7
8
9
CacheBuilder<String, List<Product>> cacheBuilder = CacheBuilder.newBuilder().maximumSize(maxSize).recordStats(); // 设置缓存最大值
cacheBuilder = cacheBuilder.refreshAfterWrite(30, TimeUnit.Seconds); // 设置刷新间隔

LoadingCache<String, List<Product>> cache = cacheBuilder.build(new CacheLoader<String, List<Product>>() {
@Override
public List<Product> load(String k) throws Exception {
return productService.loadAll(); // 获取所有商品
}
});

这样,你在获取所有商品信息的时候可以调用 Loading Cache 的 get 方法,就可以优先从本地缓存中获取商品信息,如果本地缓存不存在,会使用 CacheLoader 中的逻辑从数据库中加载所有的商品。

由于本地缓存是部署在应用服务器中,而我们应用服务器通常会部署多台,当数据更新时,我们不能确定哪台服务器本地中了缓存,更新或者删除所有服务器的缓存不是一个好的选择,所以我们通常会等待缓存过期。因此,这种缓存的有效期很短,通常为分钟或者秒级别,以避免返回前端脏数据。

缓存的不足

通过了解上面的内容,你不难发现,缓存的主要作用是提升访问速度,从而能够抗住更高的并发。那么,缓存是不是能够解决一切问题?显然不是。事物都是具有两面性的,缓存也不例外,我们要了解它的优势的同时也需要了解它有哪些不足,从而扬长避短,将它的作用发挥到最大。

**首先,缓存比较适合于读多写少的业务场景,并且数据最好带有一定的热点属性。**这是因为缓存毕竟会受限于存储介质不可能缓存所有数据,那么当数据有热点属性的时候才能保证一定的缓存命中率。比如说类似微博、朋友圈这种 20% 的内容会占到 80% 的流量。所以,一旦当业务场景读少写多时或者没有明显热点时,比如在搜索的场景下,每个人搜索的词都会不同,没有明显的热点,那么这时缓存的作用就不明显了。

**其次,缓存会给整体系统带来复杂度,并且会有数据不一致的风险。**当更新数据库成功,更新缓存失败的场景下,缓存中就会存在脏数据。对于这种场景,我们可以考虑使用较短的过期时间或者手动清理的方式来解决。

**再次,之前提到缓存通常使用内存作为存储介质,但是内存并不是无限的。**因此,我们在使用缓存的时候要做数据存储量级的评估,对于可预见的需要消耗极大存储成本的数据,要慎用缓存方案。同时,缓存一定要设置过期时间,这样可以保证缓存中的会是热点数据。

**最后,缓存会给运维也带来一定的成本,**运维需要对缓存组件有一定的了解,在排查问题的时候也多了一个组件需要考虑在内。

虽然有这么多的不足,但是缓存对于性能的提升是毋庸置疑的,我们在做架构设计的时候也需要把它考虑在内,只是在做具体方案的时候需要对缓存的设计有更细致的思考,才能最大化的发挥缓存的优势。

resilience4j-retry源码阅读

resilience4j 源码还是比较清晰简单的,比较适合阅读。

放一张主要类的结构图:

Retry入口

Retry接口是提供重试功能的入口,主要提供了方法模版,具体校验结构,失败后处理由Context子类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Creates a retryable supplier.
*
* @param retry the retry context
* @param supplier the original function
* @param <T> the type of results supplied by this supplier
* @return a retryable function
*/
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
};
}

这里摘抄了一段核心代码,作用是循环直到context.onResult(result)返回true为止,需要留意context.onResult/onRuntimeError/onError可能执行多次, onSuccess只会执行一次,这里每次进入重试都是一个新的context对象。

Retry.ContextImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean onResult(T result) {
if (null != resultPredicate && resultPredicate.test(result)) {
int currentNumOfAttempts = numOfAttempts.incrementAndGet();
if (currentNumOfAttempts >= maxAttempts) {
return false;
} else {
waitIntervalAfterFailure(currentNumOfAttempts, null);
return true;
}
}
return false;
}

public void onRuntimeError(RuntimeException runtimeException) {
if (exceptionPredicate.test(runtimeException)) {
lastRuntimeException.set(runtimeException);
throwOrSleepAfterRuntimeException();
} else {
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), runtimeException));
throw runtimeException;
}
}

先关注onResult,它负责判断是否需要继续重试,如果通过校验或者重试超过此数,会停止重试。

onRuntimeError/onError, 负责把catch的异常存储在lastRuntimeException中。

1
2
3
4
5
6
7
8
9
10
public void onSuccess() {
int currentNumOfAttempts = numOfAttempts.get();
if (currentNumOfAttempts > 0) {
succeededAfterRetryCounter.increment();
Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
} else {
succeededWithoutRetryCounter.increment();
}
}

onSuccess负责统计和发送事件。

总结

总体来说retry比较简单,需要注意的点有一个如果设置了结果校验,如果一直校验不通过,将返回未通过的结果,而不是返回失败。

[片段] 使用redis创建简易搜索引擎(核心篇)

支持and查询、多选、多字段排序分页,缺少的功能:or 条件

核心类,有一些测试代码,将就一下。另外需要spring-data-redis 2.0版本以上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package app.pooi.redissearch.search;

import app.pooi.redissearch.search.anno.CreateIndex;
import app.pooi.redissearch.search.anno.Field;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.hash.Jackson2HashMapper;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static app.pooi.redissearch.search.SearchCore.Util.*;

@RestController
@Service
public class SearchCore {

private StringRedisTemplate redisTemplate;

private Jackson2HashMapper hashMapper = new Jackson2HashMapper(true);

@Data
private static class Person {
private Long id;
private String name;
private Integer age;
private Long ctime;
}

@PostMapping("/person")
@CreateIndex(
index = "person",
documentId = "#p0.id",
fields = {
@Field(propertyName = "name", value = "#p0.name"),
@Field(propertyName = "age", value = "#p0.age", sort = true),
@Field(propertyName = "ctime", value = "#p0.ctime", sort = true)
})
Person addPerson(Person person) {
return person;
}

public SearchCore(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}


public void indexMeta(String index, Map<String, FieldMeta> fieldMeta) {
this.redisTemplate.opsForHash().putAll(genIdxMetaName(index), hashMapper.toHash(fieldMeta));
}

@PostMapping("/index")
public int indexDocument(
final String index,
final String field,
final String documentId,
final String document) {
return this.indexDocument(index, field, documentId, document, doc -> Lists.newArrayList(doc.split("")));
}

public int indexDocument(
final String index,
final String field,
final String documentId,
final String document,
final Function<String, List<String>> tokenizer) {

final List<String> tokens = tokenizer != null ?
tokenizer.apply(document) :
Collections.singletonList(document);

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

final String[] idxs = tokens.stream()
.map(word -> genIdxName(index, field, word))
.peek(idx -> ((StringRedisTemplate) operations).opsForSet().add(idx, documentId))
.toArray(String[]::new);

template.opsForSet().add(docKey, idxs);
return null;
}
});
return results.size();
}

public int indexSortField(
final String index,
final String field,
final String documentId,
final Double document) {

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;
final String idxName = genSortIdxName(index, field);
template.opsForZSet().add(idxName, documentId, document);
template.opsForSet().add(docKey, idxName);
return null;
}
});
return results.size();
}

@DeleteMapping("/index")
public int deleteDocumentIndex(final String index, final String documentId) {
final String docKey = genDocIdxName(index, documentId);
final Boolean hasKey = redisTemplate.hasKey(docKey);
if (!hasKey) {
return 0;
}

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final Set<String> idx = redisTemplate.opsForSet().members(docKey);
((StringRedisTemplate) operations).delete(idx);
((StringRedisTemplate) operations).delete(docKey);
return null;
}
});
return results.size();
}

@PatchMapping("/index")
public int updateDocumentIndex(final String index, final String field, final String documentId, final String document) {
this.deleteDocumentIndex(index, documentId);
return this.indexDocument(index, field, documentId, document);
}

public int updateSortField(final String index, final String field, final String documentId, final Double document) {
this.deleteDocumentIndex(index, documentId);
return this.indexSortField(index, field, documentId, document);
}

private Consumer<SetOperations<String, String>> operateAndStore(String method, String key, Collection<String> keys, String destKey) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey);
case "differenceAndStore":
return (so) -> so.differenceAndStore(key, keys, destKey);
default:
return so -> {
};
}
}

private Consumer<ZSetOperations<String, String>> zOperateAndStore(String method, String key, Collection<String> keys, String destKey, final RedisZSetCommands.Weights weights) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
default:
return so -> {
};
}
}

private String common(String index, String method, List<String> keys, long ttl) {
final String destKey = Util.genQueryIdxName(index);

redisTemplate.executePipelined(new SessionCallback<String>() {
@Override
public <K, V> String execute(RedisOperations<K, V> operations) throws DataAccessException {
operateAndStore(method,
keys.stream().limit(1L).findFirst().get(),
keys.stream().skip(1L).collect(Collectors.toList()),
destKey)
.accept(((StringRedisTemplate) operations).opsForSet());
((StringRedisTemplate) operations).expire(destKey, ttl, TimeUnit.SECONDS);
return null;
}
});
return destKey;
}

public String intersect(String index, List<String> keys, long ttl) {
return common(index, "intersectAndStore", keys, ttl);
}

public String union(String index, List<String> keys, long ttl) {
return common(index, "unionAndStore", keys, ttl);
}

public String diff(String index, List<String> keys, long ttl) {
return common(index, "differenceAndStore", keys, ttl);
}

private static Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parse(String query) {

final Pattern pattern = Pattern.compile("[+-]?([\\w\\d]+):(\\S+)");

final Matcher matcher = pattern.matcher(query);

Set<Tuple2<String, String>> unwant = Sets.newHashSet();
Set<Tuple2<String, String>> want = Sets.newHashSet();

while (matcher.find()) {
String word = matcher.group();

String prefix = null;
if (word.length() > 1) {
prefix = word.substring(0, 1);
}

final Tuple2<String, String> t = Tuples.of(matcher.group(1), matcher.group(2));
if ("-".equals(prefix)) {
unwant.add(t);
} else {
want.add(t);
}
}
return Tuples.of(want, unwant);
}


public String query(
String index,
String query) {

final Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parseResult = parse(query);
final Set<Tuple2<String, String>> want = parseResult.getT1();
final Set<Tuple2<String, String>> unwant = parseResult.getT2();


if (want.isEmpty()) {
return "";
}

final Map<String, FieldMeta> entries = (Map<String, FieldMeta>) hashMapper.fromHash(redisTemplate.<String, Object>opsForHash().entries(genIdxMetaName(index)));

// union
final List<Tuple2<String, String>> unionFields = want.stream()
.filter(w -> w.getT2().contains(","))
.filter(w -> "true".equals(entries.get(w.getT1()).getSort()))
.collect(Collectors.toList());
final List<String> unionIdx = unionFields.stream()
.flatMap(w -> Arrays.stream(w.getT2().split(",")).map(value -> Tuples.of(w.getT1(), value)))
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

final String unionResultId = unionIdx.isEmpty() ? "" : this.union(index, unionIdx, 30L);

want.removeAll(unionFields);

// intersect
final List<String> intersectIdx = want.stream()
.flatMap(t -> {
if ("true".equals(entries.get(t.getT1()).getSort()))
return Stream.of(t);
return Arrays.stream(t.getT2().split("")).map(value -> Tuples.of(t.getT1(), value));
})
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

if (!unionResultId.isEmpty())
intersectIdx.add(unionResultId);

String intersectResult = this.intersect(index, intersectIdx, 30L);

// diff
return unwant.isEmpty() ?
intersectResult :
this.diff(index, Stream.concat(Stream.of(intersectResult), unwant.stream().map(w -> genIdxName(index, w.getT1(), w.getT2()))).collect(Collectors.toList()), 30L);
}

@GetMapping("/query/{index}")
public Set<String> queryAndSort(
@PathVariable("index") String index,
@RequestParam("param") String query,
@RequestParam("sort") String sort,
Integer start,
Integer stop
) {
final String[] sorts = sort.split(" ");

final Map<String, Integer> map = Arrays.stream(sorts).collect(
Collectors.toMap(f -> {
if (f.startsWith("+") || f.startsWith("-")) {
f = f.substring(1);
}
return genSortIdxName("person", f);
}, field -> field.startsWith("-") ? -1 : 1)
);

final int[] weights = map.values()
.stream()
.mapToInt(Integer::intValue)
.toArray();


// if (!sort.startsWith("+") && !sort.startsWith("-")) {
// sort = "+" + sort;
// }
// boolean desc = sort.startsWith("-");
// sort = sort.substring(1);

String queryId = this.query(index, query);
Long size;
if (queryId.length() == 0 || (size = redisTemplate.opsForSet().size(queryId)) == null || size == 0) {
return Collections.emptySet();
}

final String resultId = genQueryIdxName(index);

// String sortField = sort;

redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

// template.opsForZSet().intersectAndStore(genSortIdxName(index, sortField), queryId, resultId);

SearchCore.this.zOperateAndStore("intersectAndStore",
map.keySet().stream().limit(1L).findFirst().get(),
Stream.concat(map.keySet().stream().skip(1L), Stream.of(queryId)).collect(Collectors.toList()),
resultId, RedisZSetCommands.Weights.of(ArrayUtils.add(weights, 0))).accept(template.opsForZSet());

// template.opsForZSet().size(resultId);
template.expire(resultId, 30L, TimeUnit.SECONDS);

return null;
}
});

// sort
return redisTemplate.opsForZSet().range(resultId, start, stop);

}

static class Util {

private Util() {
}

static String genIdxMetaName(String index) {
return String.format("meta:idx:%s", index);
}

static String genIdxName(String index, String field, String value) {
return String.format("idx:%s:%s:%s", index, field, value);
}

static String genSortIdxName(String index, String field) {
return String.format("idx:%s:%s", index, field);
}

static String genQueryIdxName(String index) {
return String.format("idx:%s:q:%s", index, UUID.randomUUID().toString());
}

static String genDocIdxName(String index, String documentId) {
return String.format("doc:%s:%s", index, documentId);
}
}
}

辅助类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.Data;


@Data
public class FieldMeta {

private String sort = "false";

private String splitFun = "";

public FieldMeta() {

}

public FieldMeta(boolean sort) {
this.sort = Boolean.toString(sort);
}
}

做一个轻量级的搜索还是可以的。