Sharding-JDBC入门

Sharding-JDBC入门

一、Sharding-JDBC 简介

Sharding-JDBC 最早是当当网内部使用的一款分库分表框架,到2017年的时候才开始对外开源,这几年在大量社区贡献者的不断迭代下,功能也逐渐完善,现已更名为 ShardingSphere,2020年4⽉16⽇正式成为 Apache 软件基⾦会的顶级项⽬。

随着版本的不断更迭 ShardingSphere 的核心功能也变得多元化起来。从最开始 Sharding-JDBC 1.0 版本只有数据分片,到 Sharding-JDBC 2.0 版本开始支持数据库治理(注册中心、配置中心等等),再到 Sharding-JDBC 3.0版本又加分布式事务 (支持 Atomikos、Narayana、Bitronix、Seata),如今已经迭代到了 Sharding-JDBC 4.0 版本。

37cd2d870a2f68a5304997aec17b53d81f7f49.png

现在的 ShardingSphere 不单单是指某个框架而是一个生态圈,这个生态圈 Sharding-JDBC、Sharding-Proxy 和 Sharding-Sidecar 这三款开源的分布式数据库中间件解决方案所构成。

ShardingSphere 的前身就是 Sharding-JDBC,所以它是整个框架中最为经典、成熟的组件,我们先从 Sharding-JDBC 框架入手学习分库分表。

二、核心概念

在开始 Sharding-JDBC分库分表具体实战之前,我们有必要先了解分库分表的一些核心概念。

分片

一般我们在提到分库分表的时候,大多是以水平切分模式(水平分库、分表)为基础来说的,数据分片将原本一张数据量较大的表 t_order 拆分生成数个表结构完全一致的小数据量表 t_order_0、t_order_1、···、t_order_n,每张表只存储原大表中的一部分数据,当执行一条SQL时会通过 分库策略、分片策略 将数据分散到不同的数据库、表内。

357c9a6065cf897f9ac02664575f01f8fd7f8f.png

数据节点

数据节点是分库分表中一个不可再分的最小数据单元(表),它由数据源名称和数据表组成,例如上图中 order_db_1.t_order_0、order_db_2.t_order_1 就表示一个数据节点。

逻辑表

逻辑表是指一组具有相同逻辑和数据结构表的总称。比如我们将订单表t_order 拆分成 t_order_0 ··· t_order_9 等 10张表。此时我们会发现分库分表以后数据库中已不在有 t_order 这张表,取而代之的是 t_order_n,但我们在代码中写 SQL 依然按 t_order 来写。此时 t_order 就是这些拆分表的逻辑表。

真实表

真实表也就是上边提到的 t_order_n 数据库中真实存在的物理表。

分片键

用于分片的数据库字段。我们将 t_order 表分片以后,当执行一条SQL时,通过对字段 order_id 取模的方式来决定,这条数据该在哪个数据库中的哪个表中执行,此时 order_id 字段就是 t_order 表的分片健。

391fef3850fd6eaef447679258468417b4fabf.png

这样以来同一个订单的相关数据就会存在同一个数据库表中,大幅提升数据检索的性能,不仅如此 sharding-jdbc 还支持根据多个字段作为分片健进行分片。

分片算法

上边我们提到可以用分片健取模的规则分片,但这只是比较简单的一种,在实际开发中我们还希望用 >=、<=、>、<、BETWEEN 和 IN 等条件作为分片规则,自定义分片逻辑,这时就需要用到分片策略与分片算法。

从执行 SQL 的角度来看,分库分表可以看作是一种路由机制,把 SQL 语句路由到我们期望的数据库或数据表中并获取数据,分片算法可以理解成一种路由规则。

咱们先捋一下它们之间的关系,分片策略只是抽象出的概念,它是由分片算法和分片健组合而成,分片算法做具体的数据分片逻辑。

分库、分表的分片策略配置是相对独立的,可以各自使用不同的策略与算法,每种策略中可以是多个分片算法的组合,每个分片算法可以对多个分片健做逻辑判断。

17e8c31099548f64886625a65b19ab87893fd8.png

分片算法和分片策略的关系

注意:sharding-jdbc 并没有直接提供分片算法的实现,需要开发者根据业务自行实现。

sharding-jdbc 提供了4种分片算法:

1、精确分片算法

精确分片算法(PreciseShardingAlgorithm)用于单个字段作为分片键,SQL中有 = 与 IN 等条件的分片,需要在标准分片策略(StandardShardingStrategy )下使用。

2、范围分片算法

范围分片算法(RangeShardingAlgorithm)用于单个字段作为分片键,SQL中有 BETWEEN AND、>、<、>=、<= 等条件的分片,需要在标准分片策略(StandardShardingStrategy )下使用。

3、复合分片算法

复合分片算法(ComplexKeysShardingAlgorithm)用于多个字段作为分片键的分片操作,同时获取到多个分片健的值,根据多个字段处理业务逻辑。需要在复合分片策略(ComplexShardingStrategy )下使用。

4、Hint分片算法

Hint分片算法(HintShardingAlgorithm)稍有不同,上边的算法中我们都是解析SQL 语句提取分片键,并设置分片策略进行分片。但有些时候我们并没有使用任何的分片键和分片策略,可还想将 SQL 路由到目标数据库和表,就需要通过手动干预指定SQL的目标数据库和表信息,这也叫强制路由。

分片策略

上边讲分片算法的时候已经说过,分片策略是一种抽象的概念,实际分片操作的是由分片算法和分片健来完成的。

1、标准分片策略

标准分片策略适用于单分片键,此策略支持 PreciseShardingAlgorithm 和 RangeShardingAlgorithm 两个分片算法。

其中 PreciseShardingAlgorithm 是必选的,用于处理 = 和 IN 的分片。RangeShardingAlgorithm 是可选的,用于处理BETWEEN AND, >, <,>=,<= 条件分片,如果不配置RangeShardingAlgorithm,SQL中的条件等将按照全库路由处理。

2、复合分片策略

复合分片策略,同样支持对 SQL语句中的 =,>, <, >=, <=,IN和 BETWEEN AND 的分片操作。不同的是它支持多分片键,具体分配片细节完全由应用开发者实现。

3、行表达式分片策略

行表达式分片策略,支持对 SQL语句中的 = 和 IN 的分片操作,但只支持单分片键。这种策略通常用于简单的分片,不需要自定义分片算法,可以直接在配置文件中接着写规则。

t_order_$->{t_order_id % 4} 代表 t_order 对其字段 t_order_id取模,拆分成4张表,而表名分别是t_order_0 到 t_order_3。

4、Hint分片策略

Hint分片策略,对应上边的Hint分片算法,通过指定分片健而非从 SQL中提取分片健的方式进行分片的策略。

分布式主键

数据分⽚后,不同数据节点⽣成全局唯⼀主键是⾮常棘⼿的问题,同⼀个逻辑表(t_order)内的不同真实表(t_order_n)之间的⾃增键由于⽆法互相感知而产⽣重复主键。

尽管可通过设置⾃增主键 初始值 和 步⻓ 的⽅式避免ID碰撞,但这样会使维护成本加大,乏完整性和可扩展性。如果后去需要增加分片表的数量,要逐一修改分片表的步长,运维成本非常高,所以不建议这种方式。

实现分布式主键⽣成器的方式很多,可以参考我之前写的《9种分布式ID生成方式》

为了让上手更加简单,ApacheShardingSphere 内置了UUID、SNOWFLAKE 两种分布式主键⽣成器,默认使⽤雪花算法(snowflake)⽣成64bit的⻓整型数据。不仅如此它还抽离出分布式主键⽣成器的接口,⽅便我们实现⾃定义的⾃增主键⽣成算法。

广播表

广播表:存在于所有的分片数据源中的表,表结构和表中的数据在每个数据库中均完全一致。一般是为字典表或者配置表 t_config,某个表一旦被配置为广播表,只要修改某个数据库的广播表,所有数据源中广播表的数据都会跟着同步。

绑定表

绑定表:那些分片规则一致的主表和子表。比如:t_order 订单表和 t_order_item 订单服务项目表,都是按 order_id 字段分片,因此两张表互为绑定表关系。

那绑定表存在的意义是啥呢?

通常在我们的业务中都会使用 t_order 和 t_order_item 等表进行多表联合查询,但由于分库分表以后这些表被拆分成N多个子表。如果不配置绑定表关系,会出现笛卡尔积关联查询,将产生如下四条SQL。

1
2
3
4
SELECT * FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id 
SELECT * FROM t_order_0 o JOIN t_order_item_1 i ON o.order_id=i.order_id
SELECT * FROM t_order_1 o JOIN t_order_item_0 i ON o.order_id=i.order_id
SELECT * FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id

476aa9d21749fad3d77018b69ecac4c58f5d4c.png

笛卡尔积查询

而配置绑定表关系后再进行关联查询时,只要对应表分片规则一致产生的数据就会落到同一个库中,那么只需 t_order_0 和 t_order_item_0 表关联即可。

1
2
SELECT * FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id 
SELECT * FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id

420c8da9949389203424118e98818afec717d9.png

绑定表关系

注意:在关联查询时 t_order 它作为整个联合查询的主表。所有相关的路由计算都只使用主表的策略,t_order_item 表的分片相关的计算也会使用 t_order 的条件,所以要保证绑定表之间的分片键要完全相同。

三、和JDBC的猫腻

从名字上不难看出,Sharding-JDBC 和 JDBC有很大关系,我们知道 JDBC 是一种 Java 语言访问关系型数据库的规范,其设计初衷就是要提供一套用于各种数据库的统一标准,不同厂家共同遵守这套标准,并提供各自的实现方案供应用程序调用。

77c415b51b33e9ed0b8844aaaa64fcba718a7d.png

但其实对于开发人员而言,我们只关心如何调用 JDBC API 来访问数据库,只要正确使用 DataSource、Connection、Statement 、ResultSet 等 API 接口,直接操作数据库即可。所以如果想在 JDBC 层面实现数据分片就必须对现有的 API 进行功能拓展,而 Sharding-JDBC 正是基于这种思想,重写了 JDBC 规范并完全兼容了 JDBC 规范。

a5eceb153a77640d6f4504e893dc74ae00507a.png

JDBC流程

对原有的 DataSource、Connection 等接口扩展成 ShardingDataSource、ShardingConnection,而对外暴露的分片操作接口与 JDBC 规范中所提供的接口完全一致,只要你熟悉 JDBC 就可以轻松应用 Sharding-JDBC 来实现分库分表。

c86fa7c9708dfeaf0dc590c13b018b96ec07b9.png

因此它适用于任何基于 JDBC 的 ORM 框架,如:JPA, Hibernate,Mybatis,Spring JDBC Template 或直接使用的 JDBC。完美兼容任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP,Druid, HikariCP 等,几乎对主流关系型数据库都支持。

Sharding-JDBC 又是如何拓展这些接口的呢?想知道答案我们就的从源码入手了,下边我们以 JDBC API 中的 DataSource 为例看看它是如何被重写扩展的。

数据源 DataSource 接口的核心作用就是获取数据库连接对象 Connection,我们看其内部提供了两个获取数据库连接的方法 ,并且继承了 CommonDataSource 和 Wrapper 两个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface DataSource  extends CommonDataSource, Wrapper {

/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
* @return a connection to the data source
*/
Connection getConnection() throws SQLException;

/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
* @param username the database user on whose behalf the connection is
* being made
* @param password the user's password
*/
Connection getConnection(String username, String password)
throws SQLException;
}

其中 CommonDataSource 是定义数据源的根接口这很好理解,而 Wrapper 接口则是拓展 JDBC 分片功能的关键。

由于数据库厂商的不同,他们可能会各自提供一些超越标准 JDBC API 的扩展功能,但这些功能非 JDBC 标准并不能直接使用,而 Wrapper 接口的作用就是把一个由第三方供应商提供的、非 JDBC 标准的接口包装成标准接口,也就是适配器模式。

既然讲到了适配器模式就多啰嗦几句,也方便后边的理解。

适配器模式个种比较常用的设计模式,它的作用是将某个类的接口转换成客户端期望的另一个接口,使原本因接口不匹配(或者不兼容)而无法在一起工作的两个类能够在一起工作。比如用耳机听音乐,我有个圆头的耳机,可手机插孔却是扁口的,如果我想要使用耳机听音乐就必须借助一个转接头才可以,这个转接头就起到了适配作用。举个栗子:假如我们 Target 接口中有 hello() 和 word() 两个方法。

1
2
3
4
5
6
public interface Target {

void hello();

void world();
}1.2.3.4.5.6.

可由于接口版本迭代Target 接口的 word() 方法可能会被废弃掉或不被支持,Adaptee 类的 greet()方法将代替hello() 方法。

1
2
3
4
5
6
7
8
9
public class Adaptee {

public void graeet(){

}
public void world(){

}
}

但此时旧版本仍然有大量 word() 方法被使用中,解决此事最好的办法就是创建一个适配器Adapter,这样就适配了 Target 类,解决了接口升级带来的兼容性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Adapter extends Adaptee implements Target {

@Override
public void world() {

}

@Override
public void hello() {
super.greet();
}

@Override
public void greet() {

}
}

而 Sharding-JDBC 提供的正是非 JDBC 标准的接口,所以它也提供了类似的实现方案,也使用到了 Wrapper 接口做数据分片功能的适配。除了 DataSource 之外,Connection、Statement、ResultSet 等核心对象也都继承了这个接口。

下面我们通过 ShardingDataSource 类源码简单看下实现过程,下图是继承关系流程图。

49b41c48798299718fa1014a36868eea76537d.png

ShardingDataSource实现流程

ShardingDataSource 类它在原 DataSource 基础上做了功能拓展,初始化时注册了分片SQL路由包装器、SQL重写上下文和结果集处理引擎,还对数据源类型做了校验,因为它要同时支持多个不同类型的数据源。到这好像也没看出如何适配,那接着向上看 ShardingDataSource 的继承类 AbstractDataSourceAdapter 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {

private final ShardingRuntimeContext runtimeContext;

/**
* 注册路由、SQl重写上下文、结果集处理引擎
*/
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
NewInstanceServiceLoader.register(ResultProcessEngine.class);
}

/**
* 初始化时校验数据源类型 并根据数据源 map、分片规则、数据库类型得到一个分片上下文,用来获取数据库连接
*/
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
super(dataSourceMap);
checkDataSourceType(dataSourceMap);
runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}

private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
for (DataSource each : dataSourceMap.values()) {
Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
}
}

/**
* 数据库连接
*/
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}
}

AbstractDataSourceAdapter 抽象类内部主要获取不同类型的数据源对应的数据库连接对象,实现 AutoCloseable 接口是为在使用完资源后可以自动将这些资源关闭(调用 close方法),那再看看继承类 AbstractUnsupportedOperationDataSource 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Getter
public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource implements AutoCloseable {

private final Map<String, DataSource> dataSourceMap;

private final DatabaseType databaseType;

public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
this.dataSourceMap = dataSourceMap;
databaseType = createDatabaseType();
}

public AbstractDataSourceAdapter(final DataSource dataSource) throws SQLException {
dataSourceMap = new HashMap<>(1, 1);
dataSourceMap.put("unique", dataSource);
databaseType = createDatabaseType();
}

private DatabaseType createDatabaseType() throws SQLException {
DatabaseType result = null;
for (DataSource each : dataSourceMap.values()) {
DatabaseType databaseType = createDatabaseType(each);
Preconditions.checkState(null == result || result == databaseType, String.format("Database type inconsistent with '%s' and '%s'", result, databaseType));
result = databaseType;
}
return result;
}

/**
* 不同数据源类型获取数据库连接
*/
private DatabaseType createDatabaseType(final DataSource dataSource) throws SQLException {
if (dataSource instanceof AbstractDataSourceAdapter) {
return ((AbstractDataSourceAdapter) dataSource).databaseType;
}
try (Connection connection = dataSource.getConnection()) {
return DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL());
}
}

@Override
public final Connection getConnection(final String username, final String password) throws SQLException {
return getConnection();
}

@Override
public final void close() throws Exception {
close(dataSourceMap.keySet());
}
}

AbstractUnsupportedOperationDataSource 实现DataSource 接口并继承了 WrapperAdapter 类,它内部并没有什么具体方法只起到桥接的作用,但看着是不是和我们前边讲适配器模式的例子方式有点相似。

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractUnsupportedOperationDataSource extends WrapperAdapter implements DataSource {

@Override
public final int getLoginTimeout() throws SQLException {
throw new SQLFeatureNotSupportedException("unsupported getLoginTimeout()");
}

@Override
public final void setLoginTimeout(final int seconds) throws SQLException {
throw new SQLFeatureNotSupportedException("unsupported setLoginTimeout(int seconds)");
}
}

WrapperAdapter 是一个包装器的适配类,实现了 JDBC 中的 Wrapper 接口,其中有两个核心方法 recordMethodInvocation 用于添加需要执行的方法和参数,而 replayMethodsInvocation 则将添加的这些方法和参数通过反射执行。仔细看不难发现两个方法中都用到了 JdbcMethodInvocation类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class WrapperAdapter implements Wrapper {

private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();

/**
* 添加要执行的方法
*/
@SneakyThrows
public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
}

/**
* 通过反射执行 上边添加的方法
*/
public final void replayMethodsInvocation(final Object target) {
for (JdbcMethodInvocation each : jdbcMethodInvocations) {
each.invoke(target);
}
}
}

JdbcMethodInvocation 类主要应用反射通过传入的 method 方法和 arguments 参数执行对应的方法,这样就可以通过 JDBC API 调用非 JDBC 方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RequiredArgsConstructor
public class JdbcMethodInvocation {

@Getter
private final Method method;

@Getter
private final Object[] arguments;

/**
* Invoke JDBC method.
*
* @param target target object
*/
@SneakyThrows
public void invoke(final Object target) {
method.invoke(target, arguments);
}
}

那 Sharding-JDBC 拓展 JDBC API 接口后,在新增的分片功能里又做了哪些事情呢?

一张表经过分库分表后被拆分成多个子表,并分散到不同的数据库中,在不修改原业务 SQL 的前提下,Sharding-JDBC 就必须对 SQL进行一些改造才能正常执行。

大致的执行流程:SQL 解析 -> 执⾏器优化 -> SQL 路由 -> SQL 改写 -> SQL 执⾏ -> 结果归并 六步组成,一起瞅瞅每个步骤做了点什么。
83520a2514c3691be5b36638329a602e3feef1.png

SQL 解析

SQL解析过程分为词法解析和语法解析两步,比如下边这条查询用户订单的SQL,先用词法解析将SQL拆解成不可再分的原子单元。在根据不同数据库方言所提供的字典,将这些单元归类为关键字,表达式,变量或者操作符等类型。

1
SELECT order_no,price FROM t_order_ where user_id = 10086 and order_status > 0

接着语法解析会将拆分后的SQL转换为抽象语法树,通过对抽象语法树遍历,提炼出分片所需的上下文,上下文包含查询字段信息(Field)、表信息(Table)、查询条件(Condition)、排序信息(Order By)、分组信息(Group By)以及分页信息(Limit)等,并标记出 SQL中有可能需要改写的位置。

e4bc5c402444ed05610427bea57676e831fc5e.png

抽象语法树

执⾏器优化

执⾏器优化对SQL分片条件进行优化,处理像关键字 OR这种影响性能的坏味道。

SQL 路由

SQL 路由通过解析分片上下文,匹配到用户配置的分片策略,并生成路由路径。简单点理解就是可以根据我们配置的分片策略计算出 SQL该在哪个库的哪个表中执行,而SQL路由又根据有无分片健区分出 分片路由 和 广播路由。

984dca3599293c51191234fb2fae220084e8a8.png

官方路由图谱

有分⽚键的路由叫分片路由,细分为直接路由、标准路由和笛卡尔积路由这3种类型。

标准路由

标准路由是最推荐也是最为常⽤的分⽚⽅式,它的适⽤范围是不包含关联查询或仅包含绑定表之间关联查询的SQL。

当 SQL分片健的运算符为 = 时,路由结果将落⼊单库(表),当分⽚运算符是BETWEEN 或IN 等范围时,路由结果则不⼀定落⼊唯⼀的库(表),因此⼀条逻辑SQL最终可能被拆分为多条⽤于执⾏的真实SQL。

1
SELECT * FROM t_order  where t_order_id in (1,2)

SQL路由处理后

1
2
SELECT * FROM t_order_0  where t_order_id in (1,2)
SELECT * FROM t_order_1 where t_order_id in (1,2)
直接路由

直接路由是通过使用 HintAPI 直接将 SQL路由到指定⾄库表的一种分⽚方式,而且直接路由可以⽤于分⽚键不在SQL中的场景,还可以执⾏包括⼦查询、⾃定义函数等复杂情况的任意SQL。

比如根据 t_order_id 字段为条件查询订单,此时希望在不修改SQL的前提下,加上 user_id作为分片条件就可以使用直接路由。

笛卡尔积路由

笛卡尔路由是由⾮绑定表之间的关联查询产生的,查询性能较低尽量避免走此路由模式。

无分⽚键的路由又叫做广播路由,可以划分为全库表路由、全库路由、 全实例路由、单播路由和阻断路由这 5种类型。

全库表路由

全库表路由针对的是数据库 DQL和 DML,以及 DDL等操作,当我们执行一条逻辑表 t_order SQL时,在所有分片库中对应的真实表 t_order_0 ··· t_order_n 内逐一执行。

全库路由

全库路由主要是对数据库层面的操作,比如数据库 SET 类型的数据库管理命令,以及 TCL 这样的事务控制语句。

对逻辑库设置 autocommit 属性后,所有对应的真实库中都执行该命令。

1
SET autocommit=0;
全实例路由

全实例路由是针对数据库实例的 DCL 操作(设置或更改数据库用户或角色权限),比如:创建一个用户 order ,这个命令将在所有的真实库实例中执行,以此确保 order 用户可以正常访问每一个数据库实例。

1
CREATE USER order@127.0.0.1 identified BY '程序员内点事';
单播路由

单播路由用来获取某一真实表信息,比如获得表的描述信息:

1
DESCRIBE t_order; 

t_order 的真实表是 t_order_0 ···· t_order_n,他们的描述结构相完全同,我们只需在任意的真实表执行一次就可以。

阻断路由

⽤来屏蔽SQL对数据库的操作,例如:

1
USE order_db;

这个命令不会在真实数据库中执⾏,因为ShardingSphere 采⽤的是逻辑 Schema(数据库的组织和结构) ⽅式,所以无需将切换数据库的命令发送⾄真实数据库中。

SQL 改写

将基于逻辑表开发的SQL改写成可以在真实数据库中可以正确执行的语句。比如查询 t_order 订单表,我们实际开发中 SQL是按逻辑表 t_order 写的。

1
SELECT * FROM t_order

但分库分表以后真实数据库中 t_order 表就不存在了,而是被拆分成多个子表 t_order_n 分散在不同的数据库内,还按原SQL执行显然是行不通的,这时需要将分表配置中的逻辑表名称改写为路由之后所获取的真实表名称。

1
SELECT * FROM t_order_n

SQL执⾏

将路由和改写后的真实 SQL 安全且高效发送到底层数据源执行。但这个过程并不是简单的将 SQL 通过JDBC 直接发送至数据源执行,而是平衡数据源连接创建以及内存占用所产生的消耗,它会自动化的平衡资源控制与执行效率。

结果归并

将从各个数据节点获取的多数据结果集,合并成一个大的结果集并正确的返回至请求客户端,称为结果归并。而我们SQL中的排序、分组、分页和聚合等语法,均是在归并后的结果集上进行操作的。

四、快速实践

下面我们结合 Springboot + mybatisplus 快速搭建一个分库分表案例。

1、准备工作

先做准备工作,创建两个数据库 ds-0、ds-1,两个库中分别建表 t_order_0、t_order_1、t_order_2 、t_order_item_0、t_order_item_1、t_order_item_2,t_config,方便后边验证广播表、绑定表的场景。

表结构如下:

t_order_0 订单表

1
2
3
4
5
6
7
CREATE TABLE `t_order_0` (
`order_id` bigint(200) NOT NULL,
`order_no` varchar(100) DEFAULT NULL,
`create_name` varchar(50) DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

t_order_0 与 t_order_item_0 互为关联表

1
2
3
4
5
6
7
CREATE TABLE `t_order_item_0` (
`item_id` bigint(100) NOT NULL,
`order_no` varchar(200) NOT NULL,
`item_name` varchar(50) DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL,
PRIMARY KEY (`item_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

广播表 t_config

1
2
3
4
5
6
  `id` bigint(30) NOT NULL,
`remark` varchar(50) CHARACTER SET utf8 DEFAULT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`last_modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

ShardingSphere 提供了4种分片配置方式:

  • Java 代码配置

  • Yaml 、properties 配置

  • Spring 命名空间配置

  • Spring Boot配置

为让代码看上去更简洁和直观,后边统一使用 properties 配置的方式,引入 shardingsphere 对应的 sharding-jdbc-spring-boot-starter 和 sharding-core-common 包,版本统一用的 4.0.0-RC1。

2、分片配置

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.0.0-RC1</version>
</dependency>

<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-core-common</artifactId>
<version>4.0.0-RC1</version>
</dependency>

准备工作做完( mybatis 搭建就不赘述了),接下来我们逐一解读分片配置信息。

我们首先定义两个数据源 ds-0、ds-1,并分别加上数据源的基础信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 定义两个全局数据源
spring.shardingsphere.datasource.names=ds-0,ds-1

# 配置数据源 ds-0
spring.shardingsphere.datasource.ds-0.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds-0.driverClassName=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds-0.url=jdbc:mysql://127.0.0.1:3306/ds-0?useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&useSSL=false&serverTimezone=GMT
spring.shardingsphere.datasource.ds-0.username=root
spring.shardingsphere.datasource.ds-0.password=root

# 配置数据源 ds-1
spring.shardingsphere.datasource.ds-1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds-1.driverClassName=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds-1.url=jdbc:mysql://127.0.0.1:3306/ds-1?useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&useSSL=false&serverTimezone=GMT
spring.shardingsphere.datasource.ds-1.username=root
spring.shardingsphere.datasource.ds-1.password=root

配置完数据源接下来为表添加分库和分表策略,使用 sharding-jdbc 做分库分表需要我们为每一个表单独设置分片规则。

1
2
3
# 配置分片表 t_order
# 指定真实数据节点
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds-$->{0..1}.t_order_$->{0..2}

actual-data-nodes 属性指定分片的真实数据节点,$是一个占位符,{0..1}表示实际拆分的数据库表数量。

ds-$->{0..1}.t_order_$->{0..2} 表达式相当于 6个数据节点

  • ds-0.t_order_0
  • ds-0.t_order_1
  • ds-0.t_order_2
  • ds-1.t_order_0
  • ds-1.t_order_1
  • ds-1.t_order_2
1
2
3
4
5
### 分库策略
# 分库分片健
spring.shardingsphere.sharding.tables.t_order.database-strategy.inline.sharding-column=order_id
# 分库分片算法
spring.shardingsphere.sharding.tables.t_order.database-strategy.inline.algorithm-expression=ds-$->{order_id % 2}

为表设置分库策略,上边讲了 sharding-jdbc 它提供了四种分片策略,为快速搭建我们先以最简单的行内表达式分片策略来实现,在下一篇会介绍四种分片策略的详细用法和使用场景。

database-strategy.inline.sharding-column 属性中 database-strategy 为分库策略,inline 为具体的分片策略,sharding-column 代表分片健。

database-strategy.inline.algorithm-expression 是当前策略下具体的分片算法,ds-$->{order_id % 2} 表达式意思是 对 order_id字段进行取模分库,2 代表分片库的个数,不同的策略对应不同的算法,这里也可以是我们自定义的分片算法类。

1
2
3
4
5
6
7
8
9
# 分表策略
# 分表分片健
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
# 分表算法
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order_$->{order_id % 3}
# 自增主键字段
spring.shardingsphere.sharding.tables.t_order.key-generator.column=order_id
# 自增主键ID 生成方案
spring.shardingsphere.sharding.tables.t_order.key-generator.type=SNOWFLAKE

分表策略 和 分库策略 的配置比较相似,不同的是分表可以通过 key-generator.column 和 key-generator.type 设置自增主键以及指定自增主键的生成方案,目前内置了SNOWFLAKE 和 UUID 两种方式,还能自定义的主键生成算法类,后续会详细的讲解。

1
2
# 绑定表关系
spring.shardingsphere.sharding.binding-tables= t_order,t_order_item

必须按相同分片健进行分片的表才能互为成绑定表,在联合查询时就能避免出现笛卡尔积查询。

1
2
# 配置广播表
spring.shardingsphere.sharding.broadcast-tables=t_config

广播表,开启 SQL解析日志,能清晰的看到 SQL分片解析的过程

1
2
# 是否开启 SQL解析日志
spring.shardingsphere.props.sql.show=true

3、验证分片

分片配置完以后我们无需在修改业务代码了,直接执行业务逻辑的增、删、改、查即可,接下来验证一下分片的效果。

我们同时向 t_order、t_order_item 表插入 5条订单记录,并不给定主键 order_id ,item_id 字段值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public String insertOrder() {

for (int i = 0; i < 4; i++) {
TOrder order = new TOrder();
order.setOrderNo("A000" + i);
order.setCreateName("订单 " + i);
order.setPrice(new BigDecimal("" + i));
orderRepository.insert(order);

TOrderItem orderItem = new TOrderItem();
orderItem.setOrderId(order.getOrderId());
orderItem.setOrderNo("A000" + i);
orderItem.setItemName("服务项目" + i);
orderItem.setPrice(new BigDecimal("" + i));
orderItemRepository.insert(orderItem);
}
return "success";
}

看到订单记录被成功分散到了不同的库表中, order_id 字段也自动生成了主键ID,基础的分片功能就完成了。

基础分片

那向广播表 t_config 中插入一条数据会是什么效果呢?

1
2
3
4
5
6
7
8
9
public String config() {

TConfig tConfig = new TConfig();
tConfig.setRemark("我是广播表");
tConfig.setCreateTime(new Date());
tConfig.setLastModifyTime(new Date());
configRepository.insert(tConfig);
return "success";
}

发现所有库中 t_config 表都执行了这条SQL,广播表和 MQ广播订阅的模式很相似,所有订阅的客户端都会收到同一条消息。

广播表

简单SQL操作验证没问通,接下来在试试复杂一点的联合查询,前边我们已经把 t_order 、t_order_item 表设为绑定表,直接联表查询执行一下。

关联查询

通过控制台日志发现,逻辑表SQL 经过解析以后,只对 t_order_0 和 t_order_item_0 表进行了关联产生一条SQL。

绑定表SQL

那如果不互为绑定表又会是什么情况呢?去掉 spring.shardingsphere.sharding.binding-tables试一下。

发现控制台解析出了 3条真实表SQL,而去掉 order_id 作为查询条件再次执行后,结果解析出了 9条SQL,进行了笛卡尔积查询。所以相比之下绑定表的优点就不言而喻了。

笛卡尔积查询

五、总结

以上对分库分表中间件 sharding-jdbc 的基础概念做了简单梳理,快速的搭建了一个分库分表案例,但这只是实践分库分表的第一步,下一篇我们会详细的介绍四种分片策略的具体用法和使用场景(必知必会),后边将陆续讲解自定义分布式主键、分布式数据库事务、分布式服务治理,数据脱敏等。

消息队列基础补充
消息队列基础概念

消息队列基础概念

消息队列提供的功能

  • 异步处理
  • 流量控制
  • 消息解耦

队列和主题的区别

最初的消息队列,就是一个严格意义上的队列。在计算机领域,“队列(Queue)”是一种数据结构,有完整而严格的定义。在维基百科中,队列的定义是这样的:

队列是先进先出(FIFO, First-In-First-Out)的线性表(Linear List)。在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作。

早期的消息队列,就是按照“队列”的数据结构来设计的。我们一起看下这个图,生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。

这就是最初的一种消息模型:队列模型。
queue.jpeg

如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。

为了解决这个问题,演化出了另外一种消息模型:“发布 - 订阅模型(Publish-Subscribe Pattern)”。

queue2.jpeg

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

保障消息队列不丢失

其实,现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

检测消息丢失的方法
  • 分布式链路跟踪系统
  • 根据生产者标示+分区号+递增消息号判断
  • 生产者保存需要核对是否丢失的数据,消费者消费完之后需要与生产者核对数据

像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

如何确保消息不丢失

一条消息从生产到消费完成这个过程,可以划分三个阶段:

queue1.jpeg
  1. 生产阶段

    在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

    只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

    你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

  2. 存储阶段

    在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

    如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

    对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

    如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

  3. 消费阶段

    消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

    你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

对于kafka的相关使用可以参考之前的一篇文章【消息队列(一)-如何解决消息丢失】

解决消息重复问题

消息重复的情况必然存在

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

这个服务质量标准不仅适用于 MQTT,对所有的消息队列都是适用的。我们现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

利用幂等性解决重复消息问题
  1. 利用数据库的唯一约束实现幂等

    不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

  2. 为更新的数据设置前置条件

    另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
    但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

  3. 记录并检查操作
    如果上面提到的两种实现幂等方法都不能适用于你的场景,还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。

对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。

对于kafka的相关使用可以参考之前的一篇文章【消息队列(二)-消息幂等】

处理消息积压

优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。

对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。

高并发系统-数据库关键点梳理
奇怪的知识又增加了- BLNJ导致索引有序性失效

奇怪的知识又增加了- BLNJ导致索引有序性失效

先来看表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE a (
`id`bigint AUTO_INCREMENT ,
`a` int,
`b` int,
PRIMARY KEY (`id`),
KEY `idx_a_b` (`a`,`b`)
);

CREATE TABLE b (
`id`bigint AUTO_INCREMENT ,
`b` int,
`c` int,
PRIMARY KEY (`id`)
)

看一下join语句,因为b上没有索引,所以mysql用的BLNJ:

1
2
3
4
explain select * from a 
join b using(b)
where a = 1
order by a, b;
id select_type table partitions type possible_keys key key_len ref rows filtered extra
1 SIMPLE a null ref idx_a_b idx_a_b 4 const 5206 100.00 Using temporary; Using filesort
1 SIMPLE b null ALL null null null Null 1000 100.00 Using where; Using join buffer (Block Nested Loop)

如果b表有索引的话:

1
2
3
4
5
6
7
CREATE TABLE b (
`id`bigint AUTO_INCREMENT ,
`b` int,
`c` int,
PRIMARY KEY (`id`),
KEY `idx_b` (`b`)
)
id select_type table partitions type possible_keys key key_len ref rows filtered extra
1 SIMPLE a null ref idx_a_b idx_a_b 8 Const 5206 100.00 Using index condition
1 SIMPLE b null Ref idx_b Idx_b 4 b.b 50 100.00 null

可以发现a表idx_a_b有序性没有利用上,至于原因,先看一下BNLJ执行的流程图:

BNLJ.jpeg

执行过程为:

  1. 扫描表 t1,顺序读取数据行放入 join_buffer 中,直到 join_buffer 满了,继续第 2 步;
  2. 扫描表 t2,把 t2 中的每一行取出来,跟 join_buffer 中的数据做对比,满足 join 条件的,作为结果集的一部分返回;
  3. 清空 join_buffer;
  4. 继续扫描表 t1,顺序读取之后数据放入 join_buffer 中,继续执行第 2 步,直到所有数据读取完毕。

其中隐含的问题在于第二步:即使t1表的数据是有序读取到join_buffer中的,由于是先扫描t2表再关联join_buffer数据,导致join_buffer中的有序性失效。

如果表b有索引idx_b,那么使用BKA算法第二步的关联顺序与BNLJ相反,是先扫描join_buffer后通过索引关联t2,则可以利用join_buffer中的有序数据。

为什么引入间隙锁

为什么引入间隙锁

为了便于说明问题,我们先使用一个小一点儿的表,建表和初始化语句如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE `t` (
`id` int(11) NOT NULL,
`c` int(11) DEFAULT NULL,
`d` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `c` (`c`)
) ENGINE=InnoDB;

insert into t values
(0,0,0),
(5,5,5),
(10,10,10),
(15,15,15),
(20,20,20),
(25,25,25);

这个表除了主键 id 外,还有一个索引 c,初始化语句在表中插入了 6 行数据。

下面的语句序列,是怎么加锁的,加的锁又是什么时候释放的呢?

1
select * from t where d = 5 for update;

比较好理解的是,这个语句会命中 d = 5 的这一行,对应的主键 id = 5,因此在 select 语句执行完成后,会在id = 5 这一行主键上加一个写锁,而且由于两阶段锁协议,这个写锁会在执行 commit 语句的时候释放。

由于字段 d 上没有索引,因此这条查询语句会做全表扫描。那么,其他被扫描到的,但是不满足条件的 5 行记录上,会不会被加锁呢?

我们知道,InnoDB 的默认事务隔离级别是可重复读,所以本文接下来没有特殊说明的部分,都是设定在可重复读隔离级别下。

幻读是什么?

现在,我们就来分析一下,假设只在 id = 5 这一行加锁,而其他行的不加锁的话,会怎么样。

下面先来看一下这个场景(这个结果是建立在前面假设之上,实际上是错误的):

img

假设只在 id = 5 这一行加行锁,可以看到,session A 里执行了三次查询,分别是 Q1、Q2 和 Q3。它们的 SQL 语句相同,都是 select * fom t where d=5 for update。我们来看一下这三条 SQL 语句,分别会返回什么结果。

  1. Q1 只返回 id = 5 这一行;
  2. 在 T2 时刻,session B 把 id = 0 这一行的 d 值改成了 5,因此 T3 时刻 Q2 查出来的是 id = 0 id = 5 这两行;
  3. 在 T4 时刻,session C 又插入一行(1,1,5),因此 T5 时刻 Q3 查出来的是 id = 0id = 1id = 5 的这三行。

其中,Q3 读到 id = 1 这一行的现象,被称为“幻读”。也就是说,幻读指的是一个事务在前后两次查询同一个范围的时候,后一次查询看到了前一次查询没有看到的行。

这里,我需要对“幻读”做一个说明:

  1. 在可重复读隔离级别下,普通的查询是快照读,是不会看到别的事务插入的数据的。因此,幻读在当前读下才会出现。
  2. 上面 session B 的修改结果,被 session A 之后的 select 语句用当前读看到,不能称为幻读。幻读仅专指新插入的行。

如果只从我们学到的事务可见性规则来分析的话,上面这三条 SQL 语句的返回结果都没有问题。

因为这三个查询都是加了 for update,都是当前读。而当前读的规则,就是要能读到所有已经提交的记录的最新值。并且,session B 和 sessionC 的两条语句,执行后就会提交,所以 Q2 和 Q3 就是应该看到这两个事务的操作效果,而且也看到了,这跟事务的可见性规则并不矛盾。

幻读有什么问题?

首先是语义上的。session A 在 T1 时刻就声明了,“我要把所有 d=5 的行锁住,不准别的事务进行读写操作”。所以我们假设只锁了id = 5这一行的语义与select * from t where d = 5 for update 不同。

其次,是数据一致性的问题。 这个数据不一致到底是怎么引入的?肯定是前面的假设有问题。

我们把扫描过程中碰到的行,也都加上写锁,再来看看执行效果。

img

由于 session A 把所有的行都加了写锁,所以 session B 在执行第一个 update 语句的时候就被锁住了。需要等到 T6 时刻 session A 提交以后,session B 才能继续执行。

这样对于 id = 0 这一行,在数据库里的最终结果还是 (0,5,5)。在 binlog 里面,执行序列是这样的:

1
2
3
4
5
6
7
insert into t values(1,1,5); /*(1,1,5)*/
update t set c=5 where id=1; /*(1,5,5)*/

update t set d=100 where d=5;/* 所有 d=5 的行,d 改成 100*/

update t set d=5 where id=0; /*(0,0,5)*/
update t set c=5 where id=0; /*(0,5,5)*/

可以看到,按照日志顺序执行,id = 0 这一行的最终结果也是 (0,5,5)。所以,id = 0 这一行的问题解决了。

但同时你也可以看到,id = 1 这一行,在数据库里面的结果是 (1,5,5),而根据 binlog 的执行结果是 (1,5,100),也就是说幻读的问题还是没有解决。为什么我们已经这么“凶残”地,把所有的记录都上了锁,还是阻止不了 id = 1 这一行的插入和更新呢?

原因很简单。在 T3 时刻,我们给所有行加锁的时候,id = 1 这一行还不存在,不存在也就加不上锁。

也就是说,即使把所有的记录都加上锁,还是阻止不了新插入的记录,这也是为什么“幻读”会被单独拿出来解决的原因。

如何解决幻读?

现在你知道了,产生幻读的原因是,行锁只能锁住行,但是新插入记录这个动作,要更新的是记录之间的“间隙”。因此,为了解决幻读问题,InnoDB 只好引入新的锁,也就是间隙锁 (Gap Lock)。

顾名思义,间隙锁,锁的就是两个值之间的空隙。比如文章开头的表 t,初始化插入了 6 个记录,这就产生了 7 个间隙。

img

这样,当你执行 select * from t where d=5 for update 的时候,就不止是给数据库中已有的 6 个记录加上了行锁,还同时加了 7 个间隙锁。这样就确保了无法再插入新的记录。

也就是说这时候,在一行行扫描的过程中,不仅将给行加上了行锁,还给行两边的空隙,也加上了间隙锁。

现在你知道了,数据行是可以加上锁的实体,数据行之间的间隙,也是可以加上锁的实体。但是间隙锁跟我们之前碰到过的锁都不太一样。

比如行锁,分成读锁和写锁。下图就是这两种类型行锁的冲突关系。

img

也就是说,跟行锁有冲突关系的是“另外一个行锁”。

但是间隙锁不一样,跟间隙锁存在冲突关系的,是“往这个间隙中插入一个记录”这个操作。间隙锁之间都不存在冲突关系。

这句话不太好理解,我给你举个例子:

img

这里 session B 并不会被堵住。因为表 t 里并没有 c = 7 这个记录,因此 session A 加的是间隙锁 (5,10)。而 session B 也是在这个间隙加的间隙锁。它们有共同的目标,即:保护这个间隙,不允许插入值。但,它们之间是不冲突的。

间隙锁和行锁合称 next-key lock,每个 next-key lock 是前开后闭区间。也就是说,我们的表 t 初始化以后,如果用 select * from t for update 要把整个表所有记录锁起来,就形成了 7 个 next-key lock,分别是 (-∞,0]、(0,5]、(5,10]、(10,15]、(15,20]、(20, 25]、(25, +supremum]。

备注:这篇文章中,如果没有特别说明,我们把间隙锁记为开区间,把 next-key lock 记为前开后闭区间。

你可能会问说,这个 supremum 从哪儿来的呢?

这是因为 +∞是开区间。实现上,InnoDB 给每个索引加了一个不存在的最大值 supremum,这样才符合我们前面说的“都是前开后闭区间”。

间隙锁和 next-key lock 的引入,帮我们解决了幻读的问题,但同时也带来了一些“困扰”。

对应到我们这个例子的表来说,业务逻辑这样的:任意锁住一行,如果这一行不存在的话就插入,如果存在这一行就更新它的数据,代码如下:

1
2
3
4
5
6
7
8
9
begin;
select * from t where id=N for update;

/* 如果行不存在 */
insert into t values(N,N,N);
/* 如果行存在 */
update t set d=N set id=N;

commit;

这个逻辑一旦有并发,就会碰到死锁。你一定也觉得奇怪,这个逻辑每次操作前用 for update 锁起来,已经是最严格的模式了,怎么还会有死锁呢?

这里,我用两个 session 来模拟并发,并假设 N=9。

img

图 8 间隙锁导致的死锁

你看到了,其实都不需要用到后面的 update 语句,就已经形成死锁了。我们按语句执行顺序来分析一下:

  1. session A 执行 select … for update 语句,由于 id = 9 这一行并不存在,因此会加上间隙锁 (5,10);
  2. session B 执行 select … for update 语句,同样会加上间隙锁 (5,10),间隙锁之间不会冲突,因此这个语句可以执行成功;
  3. session B 试图插入一行 (9,9,9),被 session A 的间隙锁挡住了,只好进入等待;
  4. session A 试图插入一行 (9,9,9),被 session B 的间隙锁挡住了。

至此,两个 session 进入互相等待状态,形成死锁。当然,InnoDB 的死锁检测马上就发现了这对死锁关系,让 session A 的 insert 语句报错返回了。

你现在知道了,间隙锁的引入,可能会导致同样的语句锁住更大的范围,这其实是影响了并发度的

你可能会说,为了解决幻读的问题,我们引入了这么一大串内容,有没有更简单一点的处理方法呢。

我在文章一开始就说过,如果没有特别说明,今天和你分析的问题都是在可重复读隔离级别下的,间隙锁是在可重复读隔离级别下才会生效的。所以,你如果把隔离级别设置为读提交的话,就没有间隙锁了。但同时,你要解决可能出现的数据和日志不一致问题,需要把 binlog 格式设置为 row。这,也是现在不少公司使用的配置组合。

高性能MySql 4至6章读书笔记

高性能MySql 4至6章读书笔记

第四章 Schema设计

选择优化的数据类型

更小的通常更好,但是要确保没有低估需要存储的值的范围,因为在schema中的多个地方增加数据类型范围是一个十分耗时的操作。

简单就好

简单的数据类型的操作通常需要更少的cpu时间。

尽量避免NULL

可为NULL的列使得索引、索引比统计和值比较都更为复杂。

当然也有例外,InnoDB使用单独的位存储NULL值, 所以对于稀疏数据有很好的空间效率。

选择标识符

一旦选定一种类型,要确保在所有的关联表中都使用同样的类型。类型之间需要精确匹配,包括像UNSIGNED这样的属性。尽量只用整型定义标识符。

注意可变长字符串

其在临时表和排序时可能导致悲观的按最大长度分配内存

范式与反范式

范式是好的,但是反范式有时也是必须的,并且能带来好处。

第五章 创建高性能索引

B-Tree 索引的查询类型
  • 全值匹配: 指的是和索引的所有列进行匹配
  • 匹配最左前缀: 查找索引前几列进行匹配
  • 匹配列前缀: 只匹配某一列的值的开头部分
  • 匹配范围值: 查找索引某一范围的值
  • 精确匹配某一列并范围匹配另外一列
  • 只访问索引的查询:覆盖索引
B-Tree 索引的限制
  • 如果不是按照索引的最左列开始查找,则无法使用索引
  • 不能跳过索引中的列
  • 如果查询中有某个列的范围查询,则其右边的所有列都无法使用索引:例如查询 索引为key(last_name, fisrt_name, dob)
    1
    where last_name = 'a' and first_name like 'J%' and dob = '1877-12-23'
索引的优点
  1. 大大减少服务器需要扫描的数据量
  2. 帮助服务器避免排序和临时表
  3. 将随机I/O变为顺序I/O
高性能索引-独立的列

如果查询中列不是独立的,则mysql不会使用索引

1
2
select actor_id from sakila.actor where actor_id + 1 = 5;

高性能索引-前缀索引和索引选择性

有时候需要索引很长的字符列,通常可以索引开始部分的字符,同时也会降低索引的选择性。

索引的选择性指的是,不重复的索引值和数据表的记录总数的比值。索引的选择性越高表示查询效率越高,因为选择性高的索引可以过滤掉更多的行。

前缀索引是一种能使索引更小,更快的有效方法,但是也有其缺点:前缀索引无法做order by 和 group by,也无法使用前缀索引做覆盖索引。

高性能索引-多列索引

最容易遇到的困惑是多列索引的顺序,正确的顺序依赖于使用索引的查询,同时需要考虑如何更好的满足排序和分组需要。对于如何选择索引的列顺序有一个经验法则:将选择性最高的列放在索引的最前列。只有不需要考虑排序和分组时,将选择性跟高的列放在最前面通常是最好的,但是考虑问题需要更全面,避免随机I/O和排序更加重要。

高性能索引-覆盖索引

如果一个索引包含所需要查询的字段的值,我们就可以称之为“覆盖索引”

覆盖索引的好处:

  • 索引条目通常远小于数据行大小,所以如果只需要读取索引,那mysql就会极大的减少数据访问量。
  • 因为索引是按照列值顺序存储的,所以对于I/O密集型范围查询回避随机从磁盘读取每一行数据的I/O要小的多
  • 由于Innodb的聚簇索引,覆盖索引对Innodb表特别有用,可以避免对主键索引的二次查询。

覆盖索引的陷阱:

1
select * from products where actor = 'SEAN CARREY' and title like '%APOLLO%';

  • 没有索引能够覆盖这个查询,因为查询从表中选择了所有的列
  • mysql不能再索引中执行like操作,只能做最左前缀匹配

高性能索引-使用索引扫描来做排序

mysql有两种方式可以生成有序的结果:通过排序操作;或者使用索引顺序扫描。mysql可以使用同一个索引既满足排序,有用于查找行。

只有当索引的列顺序和ORDER BY子句的顺序完全一致,并且所有列的排序方向都一样时,mysql才能使用索引来对结果做排序。如果查询需要关联多张表,则只有当ORDER BY子句引用的字段全部为第一个表时,才能用索引做排序。ORDER BY子句和查询的限制是一样的:需要满足索引的最左前缀的要求。

有一种情况下ORDER BY子句可以不满足索引的最左前缀要求:

1
2
select rental_id, staff_id from sakila.rental where rental_date = '2005-05-25'
order by inventory_id, customer_id;

索引为key(rental_date, inventory_id, customer_id),前导列为常量的时候,如果where子句或者join子句中对这些列指定了常量,就可以弥补ORDER BY的不足。

1
2
where rental_date > '2005-12-25' order by inventory_id, customer_id;
where rental_date = '2005-12-25' and inventory_id in (1, 2) order by cusomter_id;

对于索引上是范围查询,mysql无法使用之后的索引列

高性能索引-使用索引扫描减少锁

索引可以让查询锁定更少的行,如果你的查询从不访问那些不需要的行,那么就会锁定更少的行。但这只有当innoDB在存储引擎层能够过滤掉所有不需要的行是才有效。如果索引无法过滤掉无效的行,那么innoDB检索到数据并返回给服务器层后,innoDB已经锁定这些行了(mysql 5.6后没有这个问题)。

高性能索引-避免多个范围条件

下面的查询:

1
where last_online > date_sub(now(), interval 7 day) and age bwtween 18 and 25

这个查询有一个问题:它有两个范围条件,last_online和age列,mysql可以使用last_online的索引或者是age列的索引,但是无法同时使用它们。

高性能索引-延迟关联优化分页

如果一个查询匹配结果有上百万行的话会怎样?

1
select * from profiles where sex = 'm' order by rating limit 10;

即使有索引,如果用户界面需要翻页,并且翻页到比较靠后的地方也会非常慢,如:

1
select * from profiles where sex = 'm' order by rating limit 1000000, 10;

无论如何创建索引,这种查询都是个严重的问题,mysql需要花费大量时间来扫描需要丢弃的数据。其中一个解决的办法是限制能够翻页的数量。

优化这类索引的另一个比较好的策略是使用延迟关联,通过使用覆盖索引返回需要的主建,再根据这些主建回主表获得需要的行,这样可以减少mysql扫描需要丢弃的行数。

1
2
3
4
5
6
select * from profiles innner join 
(
select id fomr profiles p where p.sex = 'm' order by rating limit 1000000, 10

) as t using (id);

第六章 慢查询优化

优化数据访问

查询性能低下最基本的原因是访问数据太多。某些查询可能不可避免的需要筛选大量数据,单这并不常见。大部分性能低下的查询都可以通过减少访问的数据量的方式进行优化。对于低效的查询,我们发现通过下面几个步骤来分析总是很有效:

  1. 确认应用程序时候检索大量超过需要的数据。
  2. 确认mysql服务层是否在分许大量超过需要的数据行。

第一种情况可以使用limit和选择需要的列来解决。在确定查询只返回需要的数据之后,接下来应该看看查询为了返回结果是否扫描了过多的数据,对于mysql有三个衡量查询开销的指标:

  • 响应时间
  • 扫描的行数
  • 返回的行数

没有那个指标能完美地衡量查询的开销,但它们大致反映了mysql内部查询时需要访问多少数据,并可以大概推算出查询运行的时间。

在评估查询开销的时候,需要考虑一下从表中找到某一行数据的成本。mysql有好几种访问方式可以查找并返回一行结果。有些访问方式可能需要扫描很多行才能返回一行结果,也有些访问方式可能无需扫描就能返回结果。

在explain语句中的type列反应了访问类型。访问类型有很多种,从全表扫描到索引扫描、范围扫描、唯一索引查询、常数引用等。这里列的这些,速度是从慢到快,扫描行数也是从大到小。如果查询没有办法找到合适是访问类型,那么解决的最好办法通常是增加一个合适的索引。

一般mysql能够使用如下三种方式应用where条件,从好到坏依次为:

  • 从索引中使用where条件吗来过滤不匹配的记录,这是在存储引擎层完成的。
  • 使用索引覆盖扫描来返回记录(extra出现using index),直接从索引中过滤不需要的数据并返回命中结果。这是在mysql服务层完成的,但无需再回表查询记录。
  • 从数据表中返回数据,然后过滤不满足条件的记录(extra出现using where)。这是在mysql服务器层完成,mysql需要从数据表中读出记录然后过滤。

如果发现查询需要扫描大量的数据但只返回少数的行,那么可以使用下面的技巧去优化它:

  • 使用覆盖索引,把需要用的列都放到索引中
  • 改变库表结构。例如使用单独的汇总表
  • 重写复杂的查询, 让mysql优化器能够以更加高效的方式执行这个查询

重构查询方式-切分查询

有时候对于一个大查询我们需要分而治之,将大查询分成小查询。删除旧数据是一个很好的例子,如果用一个大的语句一次性完成,则可能一次需要锁住很多数据、占满整个事务日志、耗尽系统资源、阻塞恨到重要的查询。

重构查询方式-分解关联查询

例如下面这个查询:

1
2
3
4
select * from tag
join tag_post on tag_post.tag_id = tag.id
join post on tag_post.post_id = post.id
where tag.tag = 'mysql';

可以分解成下面这个查询来代替:

1
2
3
4
5
select * from tag where tag = 'mysql';

select * from tag_post where tag_id = 1234;

select * from post where post.id in (123,456,567);

用分解关联查询的方式重构查询有如下的优势:

  • 让缓存效率更高。许多应用程序可以方便地缓存单表查询对应的结果对象。
  • 将查询分解后,执行单个查询可以减少锁的竞争。
  • 在应用层做关联,可以更容易对数据库进行拆分,更容易做到高性能和高扩展。
  • 查询本身效率可能提升,使用in代替关联查询,可能比随机关联更高效。
  • 减少冗余记录的查询
  • 相当于在应用层实现了哈希关联

重构查询方式-优化关联查询

  • 确保on或者using子句上的列上有索引。
  • 确保任何group by和 order by 的表达式中只涉及到一个表中的列,这样mysql才有可能使用索引来优化这个过程
消息队列(二)-消息幂等

消息队列(二)-消息幂等

什么是幂等

幂等是一个数学上的概念,它的含义是多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

如果我们消费一条消息的时候,要给现有的库存数量减 1,那么如果消费两条相同的消息就会给库存数量减 2,这就不是幂等的。而如果消费一条消息后,处理逻辑是将库存的数量设置为 0,或者是如果当前库存数量是 10 时则减 1,这样在消费多条消息时,所得到的结果就是相同的,这就是幂等的。

说白了,你可以这么理解“幂等”:一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。

在生产、消费过程中增加消息幂等性的保证

消息在生产和消费的过程中都可能会产生重复,所以你要做的是,在生产过程和消费过程中增加消息幂等性的保证,这样就可以认为从最终结果上来看,消息实际上是只被消费了一次的。

在消息生产过程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份。

它的做法是给每一个生产者一个唯一的 ID,并且为生产的每一条消息赋予一个唯一 ID,消息队列的服务端会存储 < 生产者 ID,最后一条消息 ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息 ID 是否与存储的最后一条 ID 一致,如果一致,就认为是重复的消息,服务端会自动丢弃。

loss4.jpg

而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。

在通用层面,你可以在消息被生产的时候,使用发号器给它生成一个全局唯一的消息 ID,消息被处理之后,把这个 ID 存储在数据库中,在处理下一条消息之前,先从数据库里面查询这个全局 ID 是否被消费过,如果被消费过就放弃消费。

你可以看到,无论是生产端的幂等性保证方式,还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个 ID 是否已经存在,如果存在,则认为消息已经被使用过。所以这种方式是一种标准的实现幂等的方式,你在项目之中可以拿来直接使用,它在逻辑上的伪代码就像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean isIDExisted = selectByID(ID); // 判断 ID 是否存在

if(isIDExisted) {

return; // 存在则直接返回

} else {

process(message); // 不存在,则处理消息

saveID(ID); // 存储 ID

}

不过这样会有一个问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑,这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败,但是这样消息处理的成本就更高了,所以,如果对于消息重复没有特别严格的要求,可以直接使用这种通用的方案,而不考虑引入事务。

在业务层面怎么处理呢?这里有很多种处理方式,其中有一种是增加乐观锁的方式。比如,你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。

具体的操作方式是这样的:你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号,类似于执行:

1
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;

你看,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。

消息队列(一)-如何解决消息丢失

消息队列(一)-如何解决消息丢失

消息会丢失的环节

消息从被写入到消息队列,到被消费者消费完成,这个链路上会有哪些地方存在丢失消息的可能呢?其实,主要存在三个场景:

  • 消息从生产者写入到消息队列的过程。

  • 消息在消息队列中的存储场景。

  • 消息被消费者消费的过程。

lost1.jpg

1. 在消息生产的过程中丢失消息

消息的生产者一般是我们的业务服务器,消息队列是独立部署在单独的服务器上的。两者之间的网络虽然是内网,但是也会存在抖动的可能,而一旦发生抖动,消息就有可能因为网络的错误而丢失。

针对这种情况,我建议你采用的方案是消息重传:也就是当你发现发送超时后你就将消息重新发一次,但是你也不能无限制地重传消息。一般来说,如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。

不过,这种方案可能会造成消息的重复,从而导致在消费的时候会重复消费同样的消息。比方说,消息生产时由于消息队列处理慢或者网络的抖动,导致虽然最终写入消息队列成功,但在生产端却超时了,生产者重传这条消息就会形成重复的消息。

2. 在消息队列中丢失消息

拿 Kafka 举例,消息在 Kafka 中是存储在本地磁盘上的,而为了减少消息存储时对磁盘的随机 I/O,我们一般会将消息先写入到操作系统的 Page Cache 中,然后再找合适的时机刷新到磁盘上。

比如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是所说的异步刷盘。

不过,如果发生机器掉电或者机器异常重启,那么 Page Cache 中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?

你可能会把刷盘的间隔设置很短,或者设置累积一条消息就就刷盘,但这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高,所以我不建议你这样做。

loss2.jpg

如果你的系统对消息丢失的容忍度很低,那么你可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。

那么它是怎么实现的呢?

Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。

由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。

loss3.jpg

从上面这张图来看,当设置“acks=all”时,需要同步执行 1,3,4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。我给你的建议是:

  1. 如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。

  2. 如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。

  3. 我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。

3. 在消费的过程中存在消息丢失的可能

我还是以 Kafka 为例来说明。一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。

这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如说,消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,那么这条失败的消息就永远不会被处理了,也可以认为是丢失了。

所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。

4. 最佳实践

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
缓存专题(五) 如何解决缓存穿透

缓存专题(五) 如何解决缓存穿透

什么是缓存穿透

缓存穿透其实是指从缓存中没有查到数据,而不得不从后端系统(比如数据库)中查询的情况。你可以把数据库比喻为手机,它是经受不了太多的划痕和磕碰的,所以你需要给它贴个膜再套个保护壳,就能对手机起到一定的保护作用了。

不过,少量的缓存穿透不可避免,对系统也是没有损害的,主要有几点原因:

  • 一方面,互联网系统通常会面临极大数据量的考验,而缓存系统在容量上是有限的,不可能存储系统所有的数据,那么在查询未缓存数据的时候就会发生缓存穿透。
  • 另一方面,互联网系统的数据访问模型一般会遵从“80/20 原则”。“80/20 原则”又称为帕累托法则,是意大利经济学家帕累托提出的一个经济学的理论。它是指在一组事物中,最重要的事物通常只占 20%,而剩余的 80% 的事物确实不重要的。把它应用到数据访问的领域,就是我们会经常访问 20% 的热点数据,而另外的 80% 的数据则不会被经常访问。比如你买了很多衣服,很多书,但是其实经常穿的,经常看的,可能也就是其中很小的一部分。

既然缓存的容量有限,并且大部分的访问只会请求 20% 的热点数据,那么理论上说,我们只需要在有限的缓存空间里存储 20% 的热点数据就可以有效地保护脆弱的后端系统了,也就可以放弃缓存另外 80% 的非热点数据了。所以,这种少量的缓存穿透是不可避免的,但是对系统是没有损害的。

那么什么样的缓存穿透对系统有害呢?答案是大量的穿透请求超过了后端系统的承受范围,造成了后端系统的崩溃。如果把少量的请求比作毛毛细雨,那么一旦变成倾盆大雨,引发洪水,冲倒房屋,肯定就不行了。

产生这种大量穿透请求的场景有很多,接下来,我就带你解析这几种场景以及相应的解决方案。

缓存穿透的解决方案

先来考虑这样一种场景:在你的电商系统的用户表中,我们需要通过用户 ID 查询用户的信息,缓存的读写策略采用 Cache Aside 策略。

那么,如果要读取一个用户表中未注册的用户,会发生什么情况呢?按照这个策略,我们会先读缓存,再穿透读数据库。由于用户并不存在,所以缓存和数据库中都没有查询到数据,因此也就不会向缓存中回种数据(也就是向缓存中设置值的意思),这样当再次请求这个用户数据的时候还是会再次穿透到数据库。在这种场景下,缓存并不能有效地阻挡请求穿透到数据库上,它的作用就微乎其微了。

那如何解决缓存穿透呢?一般来说我们会有两种解决方案:回种空值以及使用布隆过滤器。

我们先来看看第一种方案。

回种空值

回顾上面提到的场景,你会发现最大的问题在于数据库中并不存在用户的数据,这就造成无论查询多少次,数据库中永远都不会存在这个用户的数据,穿透永远都会发生。

类似的场景还有一些:比如由于代码的 bug 导致查询数据库的时候抛出了异常,这样可以认为从数据库查询出来的数据为空,同样不会回种缓存。

那么,当我们从数据库中查询到空值或者发生异常时,我们可以向缓存中回种一个空值。但是因为空值并不是准确的业务数据,并且会占用缓存的空间,所以我们会给这个空值加一个比较短的过期时间,让空值在短时间之内能够快速过期淘汰。下面是这个流程的伪代码:

1
2
3
4
5
6
7
8
9
10
11
Object nullValue = new Object();
try {
Object valueFromDB = getFromDB(uid); // 从数据库中查询数据
if (valueFromDB == null) {
cache.set(uid, nullValue, 10); // 如果从数据库中查询到空值,就把空值写入缓存,设置较短的超时时间
} else {
cache.set(uid, valueFromDB, 1000);
}
} catch(Exception e) {
cache.set(uid, nullValue, 10);
}

回种空值虽然能够阻挡大量穿透的请求,但如果有大量获取未注册用户信息的请求,缓存内就会有有大量的空值缓存,也就会浪费缓存的存储空间,如果缓存空间被占满了,还会剔除掉一些已经被缓存的用户信息反而会造成缓存命中率的下降。

所以这个方案,我建议你在使用的时候应该评估一下缓存容量是否能够支撑。如果需要大量的缓存节点来支持,那么就无法通过通过回种空值的方式来解决,这时你可以考虑使用布隆过滤器。

使用布隆过滤器

1970 年布隆提出了一种布隆过滤器的算法,用来判断一个元素是否在一个集合中。这种算法由一个二进制数组和一个 Hash 算法组成。它的基本思路如下:

我们把集合中的每一个值按照提供的 Hash 算法算出对应的 Hash 值,然后将 Hash 值对数组长度取模后得到需要计入数组的索引值,并且将数组这个位置的值从 0 改成 1。在判断一个元素是否存在于这个集合中时,你只需要将这个元素按照相同的算法计算出索引值,如果这个位置的值为 1 就认为这个元素在集合中,否则则认为不在集合中。

下图是布隆过滤器示意图,我来带你分析一下图内的信息。

nullcache1.jpg

A、B、C 等元素组成了一个集合,元素 D 计算出的 Hash 值所对应的的数组中值是 1,所以可以认为 D 也在集合中。而 F 在数组中的值是 0,所以 F 不在数组中。

那么我们如何使用布隆过滤器来解决缓存穿透的问题呢?

还是以存储用户信息的表为例进行讲解。首先,我们初始化一个很大的数组,比方说长度为 20 亿的数组,接下来我们选择一个 Hash 算法,然后我们将目前现有的所有用户的 ID 计算出 Hash 值并且映射到这个大数组中,映射位置的值设置为 1,其它值设置为 0。

新注册的用户除了需要写入到数据库中之外,它也需要依照同样的算法更新布隆过滤器的数组中,相应位置的值。那么当我们需要查询某一个用户的信息时,我们首先查询这个 ID 在布隆过滤器中是否存在,如果不存在就直接返回空值,而不需要继续查询数据库和缓存,这样就可以极大地减少异常查询带来的缓存穿透。

nullcache2.jpg

布隆过滤器拥有极高的性能,无论是写入操作还是读取操作,时间复杂度都是 O(1),是常量值。在空间上,相对于其他数据结构它也有很大的优势,比如,20 亿的数组需要 2000000000/8/1024/1024 = 238M 的空间,而如果使用数组来存储,假设每个用户 ID 占用 4 个字节的空间,那么存储 20 亿用户需要 2000000000 * 4 / 1024 / 1024 = 7600M 的空间,是布隆过滤器的 32 倍。

不过,任何事物都有两面性,布隆过滤器也不例外,它主要有两个缺陷:

  1. 它在判断元素是否在集合中时是有一定错误几率的,比如它会把不是集合中的元素判断为处在集合中;

  2. 不支持删除元素。

关于第一个缺陷,主要是 Hash 算法的问题。因为布隆过滤器是由一个二进制数组和一个 Hash 算法组成的,Hash 算法存在着一定的碰撞几率。Hash 碰撞的含义是不同的输入值经过 Hash 运算后得到了相同的 Hash 结果。

本来,Hash 的含义是不同的输入,依据不同的算法映射成独一无二的固定长度的值,也就是我输入字符串“1”,根据 CRC32 算法,值是 2212294583。但是现实中 Hash 算法的输入值是无限的,输出值的值空间却是固定的,比如 16 位的 Hash 值的值空间是 65535,那么它的碰撞几率就是 1/65535,即如果输入值的个数超过 65535 就一定会发生碰撞。

那么你可能会问为什么不映射成更长的 Hash 值呢?

因为更长的 Hash 值会带来更高的存储成本和计算成本。即使使用 32 位的 Hash 算法,它的值空间长度是 2 的 32 次幂减一,约等于 42 亿,用来映射 20 亿的用户数据,碰撞几率依然有接近 50%。

Hash 的碰撞就造成了两个用户 ID ,A 和 B 会计算出相同的 Hash 值,那么如果 A 是注册的用户,它的 Hash 值对应的数组中的值是 1,那么 B 即使不是注册用户,它在数组中的位置和 A 是相同的,对应的值也是 1,这就产生了误判。

布隆过滤器的误判有一个特点,就是它只会出现“false positive”的情况。这是什么意思呢?当布隆过滤器判断元素在集合中时,这个元素可能不在集合中。但是一旦布隆过滤器判断这个元素不在集合中时,它一定不在集合中。这一点非常适合解决缓存穿透的问题。为什么呢?

你想,如果布隆过滤器会将集合中的元素判定为不在集合中,那么我们就不确定,被布隆过滤器判定为不在集合中的元素,是不是在集合中。假设在刚才的场景中,如果有大量查询未注册的用户信息的请求存在,那么这些请求到达布隆过滤器之后,即使布隆过滤器判断为不是注册用户,那么我们也不确定它是不是真的不是注册用户,那么就还是需要去数据库和缓存中查询,这就使布隆过滤器失去了价值。

所以你看,布隆过滤器虽然存在误判的情况,但是还是会减少缓存穿透的情况发生,只是我们需要尽量减少误判的几率,这样布隆过滤器的判断正确的几率更高,对缓存的穿透也更少。一个解决方案是:

使用多个 Hash 算法为元素计算出多个 Hash 值,只有所有 Hash 值对应的数组中的值都为 1 时,才会认为这个元素在集合中。

布隆过滤器不支持删除元素的缺陷也和 Hash 碰撞有关。给你举一个例子,假如两个元素 A 和 B 都是集合中的元素,它们有相同的 Hash 值,它们就会映射到数组的同一个位置。这时我们删除了 A,数组中对应位置的值也从 1 变成 0,那么在判断 B 的时候发现值是 0,也会判断 B 是不在集合中的元素,就会得到错误的结论。

那么我是怎么解决这个问题的呢?我会让数组中不再只有 0 和 1 两个值,而是存储一个计数。比如如果 A 和 B 同时命中了一个数组的索引,那么这个位置的值就是 2,如果 A 被删除了就把这个值从 2 改为 1。这个方案中的数组不再存储 bit 位,而是存储数值,也就会增加空间的消耗。所以,你要依据业务场景来选择是否能够使用布隆过滤器,比如像是注册用户的场景下,因为用户删除的情况基本不存在,所以还是可以使用布隆过滤器来解决缓存穿透的问题的。

讲了这么多,关于布隆过滤器的使用上,我也给你几个建议:

  1. 选择多个 Hash 函数计算多个 Hash 值,这样可以减少误判的几率;

  2. 布隆过滤器会消耗一定的内存空间,所以在使用时需要评估你的业务场景下需要多大的内存,存储的成本是否可以接受。

总的来说,回种空值和布隆过滤器是解决缓存穿透问题的两种最主要的解决方案,但是它们也有各自的适用场景,并不能解决所有问题。比方说当有一个极热点的缓存项,它一旦失效会有大量请求穿透到数据库,这会对数据库造成瞬时极大的压力,我们把这个场景叫做“dog-pile effect”(狗桩效应),

这是典型的缓存并发穿透的问题,那么,我们如何来解决这个问题呢?解决狗桩效应的思路是尽量地减少缓存穿透后的并发,方案也比较简单:

  1. 在代码中,控制在某一个热点缓存项失效之后启动一个后台线程,穿透到数据库,将数据加载到缓存中,在缓存未加载之前,所有访问这个缓存的请求都不再穿透而直接返回。

  2. 通过在 Memcached 或者 Redis 中设置分布式锁,只有获取到锁的请求才能够穿透到数据库。

分布式锁的方式也比较简单,比方说 ID 为 1 的用户是一个热点用户,当他的用户信息缓存失效后,我们需要从数据库中重新加载数据时,先向 Memcached 中写入一个 Key 为”lock.1”的缓存项,然后去数据库里面加载数据,当数据加载完成后再把这个 Key 删掉。这时,如果另外一个线程也要请求这个用户的数据,它发现缓存中有 Key 为“lock.1”的缓存,就认为目前已经有线程在加载数据库中的值到缓存中了,它就可以重新去缓存中查询数据,不再穿透数据库了。

缓存专题(四) 分布式缓存高可用

缓存专题(四) 分布式缓存高可用

分布式缓存的高可用方案主要选择的方案有客户端方案、中间代理层方案和服务端方案三大类:

  • 客户端方案就是在客户端配置多个缓存的节点,通过缓存写入和读取算法策略来实现分布式,从而提高缓存的可用性。
  • 中间代理层方案是在应用代码和缓存节点之间增加代理层,客户端所有的写入和读取的请求都通过代理层,而代理层中会内置高可用策略,帮助提升缓存系统的高可用。
  • 服务端方案就是 Redis 2.4 版本后提出的 Redis Sentinel 方案。

掌握这些方案可以帮助你,抵御部分缓存节点故障导致的,缓存命中率下降的影响,增强你的系统的鲁棒性。

客户端方案

在客户端方案中,你需要关注缓存的写和读两个方面:

  • 写入数据时,需要把被写入缓存的数据分散到多个节点中,即进行数据分片;
  • 读数据时,可以利用多组的缓存来做容错,提升缓存系统的可用性。关于读数据,这里可以使用主从和多副本两种策略,两种策略是为了解决不同的问题而提出的。

下面我就带你一起详细地看一下到底要怎么做。

1. 缓存数据如何分片

单一的缓存节点受到机器内存、网卡带宽和单节点请求量的限制,不能承担比较高的并发,因此我们考虑将数据分片,依照分片算法将数据打散到多个不同的节点上,每个节点上存储部分数据。

这样在某个节点故障的情况下,其他节点也可以提供服务,保证了一定的可用性。这就好比不要把鸡蛋放在同一个篮子里,这样一旦一个篮子掉在地上,摔碎了,别的篮子里还有没摔碎的鸡蛋,不至于一个不剩。

一般来讲,分片算法常见的就是 Hash 分片算法和一致性 Hash 分片算法两种。

Hash 分片的算法就是对缓存的 Key 做哈希计算,然后对总的缓存节点个数取余。你可以这么理解:

比如说,我们部署了三个缓存节点组成一个缓存的集群,当有新的数据要写入时,我们先对这个缓存的 Key 做比如 crc32 等 Hash 算法生成 Hash 值,然后对 Hash 值模 3,得出的结果就是要存入缓存节点的序号。

cache1.jpg

这个算法最大的优点就是简单易理解,缺点是当增加或者减少缓存节点时,缓存总的节点个数变化造成计算出来的节点发生变化,从而造成缓存失效不可用。所以我建议你,如果采用这种方法,最好建立在你对于这组缓存命中率下降不敏感,比如下面还有另外一层缓存来兜底的情况下。

当然了,用一致性 Hash 算法可以很好地解决增加和删减节点时,命中率下降的问题。在这个算法中,我们将整个 Hash 值空间组织成一个虚拟的圆环,然后将缓存节点的 IP 地址或者主机名做 Hash 取值后,放置在这个圆环上。当我们需要确定某一个 Key 需要存取到哪个节点上的时候,先对这个 Key 做同样的 Hash 取值,确定在环上的位置,然后按照顺时针方向在环上“行走”,遇到的第一个缓存节点就是要访问的节点。比方说下面这张图里面,Key 1 和 Key 2 会落入到 Node 1 中,Key 3、Key 4 会落入到 Node 2 中,Key 5 落入到 Node 3 中,Key 6 落入到 Node 4 中。

cache2.jpg

这时如果在 Node 1 和 Node 2 之间增加一个 Node 5,你可以看到原本命中 Node 2 的 Key 3 现在命中到 Node 5,而其它的 Key 都没有变化;同样的道理,如果我们把 Node 3 从集群中移除,那么只会影响到 Key 5 。所以你看,在增加和删除节点时,只有少量的 Key 会“漂移”到其它节点上,而大部分的 Key 命中的节点还是会保持不变,从而可以保证命中率不会大幅下降。

cache3.jpg

不过,事物总有两面性。虽然这个算法对命中率的影响比较小,但它还是存在问题:

  • 缓存节点在圆环上分布不平均,会造成部分缓存节点的压力较大;当某个节点故障时,这个节点所要承担的所有访问都会被顺移到另一个节点上,会对后面这个节点造成压力。
  • 一致性 Hash 算法的脏数据问题。

极端情况下,比如一个有三个节点 A、B、C 承担整体的访问,每个节点的访问量平均,A 故障后,B 将承担双倍的压力(A 和 B 的全部请求),当 B 承担不了流量 Crash 后,C 也将因为要承担原先三倍的流量而 Crash,这就造成了整体缓存系统的雪崩。

说到这儿,你可能觉得很可怕,但也不要太担心,我们程序员就是要能够创造性地解决各种问题,所以你可以在一致性 Hash 算法中引入虚拟节点的概念。

它将一个缓存节点计算多个 Hash 值分散到圆环的不同位置,这样既实现了数据的平均,而且当某一个节点故障或者退出的时候,它原先承担的 Key 将以更加平均的方式分配到其他节点上,从而避免雪崩的发生。

其次,就是一致性 Hash 算法的脏数据问题。为什么会产生脏数据呢?比方说,在集群中有两个节点 A 和 B,客户端初始写入一个 Key 为 k,值为 3 的缓存数据到 Cache A 中。这时如果要更新 k 的值为 4,但是缓存 A 恰好和客户端连接出现了问题,那这次写入请求会写入到 Cache B 中。接下来缓存 A 和客户端的连接恢复,当客户端要获取 k 的值时,就会获取到存在 Cache A 中的脏数据 3,而不是 Cache B 中的 4。

所以,在使用一致性 Hash 算法时一定要设置缓存的过期时间,这样当发生漂移时,之前存储的脏数据可能已经过期,就可以减少存在脏数据的几率。

cache4.jpg很显然,数据分片最大的优势就是缓解缓存节点的存储和访问压力,但同时它也让缓存的使用更加复杂。在 MultiGet(批量获取)场景下,单个节点的访问量并没有减少,同时节点数太多会造成缓存访问的 SLA(即“服务等级协议”,SLA 代表了网站服务可用性)得不到很好的保证,因为根据木桶原则,SLA 取决于最慢、最坏的节点的情况,节点数过多也会增加出问题的概率,因此我推荐 4 到 6 个节点为佳。

中间代理层方案

虽然客户端方案已经能解决大部分的问题,但是只能在单一语言系统之间复用。例如微博使用 Java 语言实现了这么一套逻辑,我使用 PHP 就难以复用,需要重新写一套,很麻烦。而中间代理层的方案就可以解决这个问题。你可以将客户端解决方案的经验移植到代理层中,通过通用的协议(如 Redis 协议)来实现在其他语言中的复用。

如果你来自研缓存代理层,你就可以将客户端方案中的高可用逻辑封装在代理层代码里面,这样用户在使用你的代理层的时候就不需要关心缓存的高可用是如何做的,只需要依赖你的代理层就好了。

除此以外,业界也有很多中间代理层方案,比如 Facebook 的Mcrouter,Twitter 的Twemproxy,豌豆荚的Codis。它们的原理基本上可以由一张图来概括:

cache5.jpg

看这张图你有什么发现吗? 所有缓存的读写请求都是经过代理层完成的。代理层是无状态的,主要负责读写请求的路由功能,并且在其中内置了一些高可用的逻辑,不同的开源中间代理层方案中使用的高可用策略各有不同。比如在 Twemproxy 中,Proxy 保证在某一个 Redis 节点挂掉之后会把它从集群中移除,后续的请求将由其他节点来完成;而 Codis 的实现略复杂,它提供了一个叫 Codis Ha 的工具来实现自动从节点提主节点,在 3.2 版本之后换做了 Redis Sentinel 方式,从而实现 Redis 节点的高可用。

服务端方案

Redis 在 2.4 版本中提出了 Redis Sentinel 模式来解决主从 Redis 部署时的高可用问题,它可以在主节点挂了以后自动将从节点提升为主节点,保证整体集群的可用性,整体的架构如下图所示:

cache6.jpg

Redis Sentinel 也是集群部署的,这样可以避免 Sentinel 节点挂掉造成无法自动故障恢复的问题,每一个 Sentinel 节点都是无状态的。在 Sentinel 中会配置 Master 的地址,Sentinel 会时刻监控 Master 的状态,当发现 Master 在配置的时间间隔内无响应,就认为 Master 已经挂了,Sentinel 会从从节点中选取一个提升为主节点,并且把所有其他的从节点作为新主的从节点。Sentinel 集群内部在仲裁的时候,会根据配置的值来决定当有几个 Sentinel 节点认为主挂掉可以做主从切换的操作,也就是集群内部需要对缓存节点的状态达成一致才行。

Redis Sentinel 不属于代理层模式,因为对于缓存的写入和读取请求不会经过 Sentinel 节点。Sentinel 节点在架构上和主从是平级的,是作为管理者存在的,所以可以认为是在服务端提供的一种高可用方案。

NoSql相对于关系型数据库的优势

NoSql相对于关系型数据库的优势

使用 NoSQL 提升写入性能

数据库系统大多使用的是传统的机械磁盘,对于机械磁盘的访问方式有两种:一种是随机 IO;另一种是顺序 IO。随机 IO 就需要花费时间做昂贵的磁盘寻道,一般来说,它的读写效率要比顺序 IO 小两到三个数量级,所以我们想要提升写入的性能就要尽量减少随机 IO。

以 MySQL 的 InnoDB 存储引擎来说,更新 binlog、redolog、undolog 都是在做顺序 IO,而更新 datafile 和索引文件则是在做随机 IO,而为了减少随机 IO 的发生,关系数据库已经做了很多的优化,比如说写入时先写入内存,然后批量刷新到磁盘上,但是随机 IO 还是会发生。

索引在 InnoDB 引擎中是以 B+ 树方式来组织的,而 MySQL 主键是聚簇索引(一种索引类型,数据与索引数据放在一起),既然数据和索引数据放在一起,那么在数据插入或者更新的时候,我们需要找到要插入的位置,再把数据写到特定的位置上,这就产生了随机的 IO。而且一旦发生了页分裂,就不可避免会做数据的移动,也会极大地损耗写入性能。

NoSQL 数据库是怎么解决这个问题的呢?

它们有多种的解决方式,这里我给你讲一种最常见的方案,就是很多 NoSQL 数据库都在使用的基于 LSM 树的存储引擎,这种算法使用最多,所以在这里着重剖析一下。

LSM 树(Log-Structured Merge Tree)牺牲了一定的读性能来换取写入数据的高性能,Hbase、Cassandra、LevelDB 都是用这种算法作为存储的引擎。

它的思想很简单,数据首先会写入到一个叫做 MemTable 的内存结构中,在 MemTable 中数据是按照写入的 Key 来排序的。为了防止 MemTable 里面的数据因为机器掉电或者重启而丢失,一般会通过写 Write Ahead Log 的方式将数据备份在磁盘上。

MemTable 在累积到一定规模时,它会被刷新生成一个新的文件,我们把这个文件叫做 SSTable(Sorted String Table)。当 SSTable 达到一定数量时,我们会将这些 SSTable 合并,减少文件的数量,因为 SSTable 都是有序的,所以合并的速度也很快。

当从 LSM 树里面读数据时,我们首先从 MemTable 中查找数据,如果数据没有找到,再从 SSTable 中查找数据。因为存储的数据都是有序的,所以查找的效率是很高的,只是因为数据被拆分成多个 SSTable,所以读取的效率会低于 B+ 树索引。

sstable.jpg

和 LSM 树类似的算法有很多,比如说 TokuDB 使用的名为 Fractal tree 的索引结构,它们的核心思想就是将随机 IO 变成顺序的 IO,从而提升写入的性能。

提升扩展性

另外,在扩展性方面,很多 NoSQL 数据库也有着先天的优势。还是以你的垂直电商系统为例,你已经为你的电商系统增加了评论系统,开始你的评估比较乐观,觉得电商系统的评论量级不会增长很快,所以就为它分了 8 个库,每个库拆分成 16 张表。

但是评论系统上线之后,存储量级增长的异常迅猛,你不得不将数据库拆分成更多的库表,而数据也要重新迁移到新的库表中,过程非常痛苦,而且数据迁移的过程也非常容易出错。

这时,你考虑是否可以考虑使用 NoSQL 数据库来彻底解决扩展性的问题,经过调研你发现它们在设计之初就考虑到了分布式和大数据存储的场景,比如像 MongoDB 就有三个扩展性方面的特性。

  • 其一是 Replica,也叫做副本集,你可以理解为主从分离,也就是通过将数据拷贝成多份来保证当主挂掉后数据不会丢失。同时呢,Replica 还可以分担读请求。Replica 中有主节点来承担写请求,并且把对数据变动记录到 oplog 里(类似于 binlog);从节点接收到 oplog 后就会修改自身的数据以保持和主节点的一致。一旦主节点挂掉,MongoDB 会从从节点中选取一个节点成为主节点,可以继续提供写数据服务。
  • 其二是 Shard,也叫做分片,你可以理解为分库分表,即将数据按照某种规则拆分成多份,存储在不同的机器上。MongoDB 的 Sharding 特性一般需要三个角色来支持,一个是 Shard Server,它是实际存储数据的节点,是一个独立的 Mongod 进程;二是 Config Server,也是一组 Mongod 进程,主要存储一些元信息,比如说哪些分片存储了哪些数据等;最后是 Route Server,它不实际存储数据,仅仅作为路由使用,它从 Config Server 中获取元信息后,将请求路由到正确的 Shard Server 中。

mongodb.jpg

  • 其三是负载均衡,就是当 MongoDB 发现 Shard 之间数据分布不均匀,会启动 Balancer 进程对数据做重新的分配,最终让不同 Shard Server 的数据可以尽量的均衡。当我们的 Shard Server 存储空间不足需要扩容时,数据会自动被移动到新的 Shard Server 上,减少了数据迁移和验证的成本。

你可以看到,NoSQL 数据库中内置的扩展性方面的特性可以让我们不再需要对数据库做分库分表和主从分离,也是对传统数据库一个良好的补充。

缓存专题(三) Write Through(读穿 写穿)策略

缓存专题(三) Write Through(读穿 写穿)策略

这个策略的核心原则是用户只与缓存打交道,由缓存和数据库通信,写入或者读取数据。这就好比你在汇报工作的时候只对你的直接上级汇报,再由你的直接上级汇报给他的上级,你是不能越级汇报的。

Write Through 的策略是这样的:先查询要写入的数据在缓存中是否已经存在,如果已经存在,则更新缓存中的数据,并且由缓存组件同步更新到数据库中,如果缓存中数据不存在,我们把这种情况叫做“Write Miss(写失效)”。

一般来说,我们可以选择两种“Write Miss”方式:一个是“Write Allocate(按写分配)”,做法是写入缓存相应位置,再由缓存组件同步更新到数据库中;另一个是“No-write allocate(不按写分配)”,做法是不写入缓存中,而是直接更新到数据库中。

在 Write Through 策略中,我们一般选择“No-write allocate”方式,原因是无论采用哪种“Write Miss”方式,我们都需要同步将数据更新到数据库中,而“No-write allocate”方式相比“Write Allocate”还减少了一次缓存的写入,能够提升写入的性能。

Read Through 策略就简单一些,它的步骤是这样的:先查询缓存中数据是否存在,如果存在则直接返回,如果不存在,则由缓存组件负责从数据库中同步加载数据。

下面是 Read Through/Write Through 策略的示意图:

cache_through1.jpg

Read Through/Write Through 策略的特点是由缓存节点而非用户来和数据库打交道,在我们开发过程中相比 Cache Aside 策略要少见一些,原因是我们经常使用的分布式缓存组件,无论是 Memcached 还是 Redis 都不提供写入数据库,或者自动加载数据库中的数据的功能。而我们在使用本地缓存的时候可以考虑使用这种策略,比如说在上一节中提到的本地缓存 Guava Cache 中的 Loading Cache 就有 Read Through 策略的影子。

我们看到 Write Through 策略中写数据库是同步的,这对于性能来说会有比较大的影响,因为相比于写缓存,同步写数据库的延迟就要高很多了。那么我们可否异步地更新数据库?这就是我们接下来要提到的“Write Back”策略。

Write Back(写回)策略

这个策略的核心思想是在写入数据时只写入缓存,并且把缓存块儿标记为“脏”的。而脏块儿只有被再次使用时才会将其中的数据写入到后端存储中。

需要注意的是,在“Write Miss”的情况下,我们采用的是“Write Allocate”的方式,也就是在写入后端存储的同时要写入缓存,这样我们在之后的写请求中都只需要更新缓存即可,而无需更新后端存储了,我将 Write back 策略的示意图放在了下面:

cache_through3jpg.jpg

发现了吗?其实这种策略不能被应用到我们常用的数据库和缓存的场景中,它是计算机体系结构中的设计,比如我们在向磁盘中写数据时采用的就是这种策略。无论是操作系统层面的 Page Cache,还是日志的异步刷盘,亦或是消息队列中消息的异步写入磁盘,大多采用了这种策略。因为这个策略在性能上的优势毋庸置疑,它避免了直接写磁盘造成的随机写问题,毕竟写内存和写磁盘的随机 I/O 的延迟相差了几个数量级呢。

但因为缓存一般使用内存,而内存是非持久化的,所以一旦缓存机器掉电,就会造成原本缓存中的脏块儿数据丢失。所以你会发现系统在掉电之后,之前写入的文件会有部分丢失,就是因为 Page Cache 还没有来得及刷盘造成的。

当然,你依然可以在一些场景下使用这个策略,在使用时,我想给你的落地建议是:你在向低速设备写入数据的时候,可以在内存里先暂存一段时间的数据,甚至做一些统计汇总,然后定时地刷新到低速设备上。比如说,你在统计你的接口响应时间的时候,需要将每次请求的响应时间打印到日志中,然后监控系统收集日志后再做统计。但是如果每次请求都打印日志无疑会增加磁盘 I/O,那么不如把一段时间的响应时间暂存起来,经过简单的统计平均耗时,每个耗时区间的请求数量等等,然后定时地,批量地打印到日志中。

缓存专题(二) Cache Aside(旁路缓存)策略

缓存专题(二) Cache Aside(旁路缓存)策略

我们来考虑一种最简单的业务场景,比方说在你的电商系统中有一个用户表,表中只有 ID 和年龄两个字段,缓存中我们以 ID 为 Key 存储用户的年龄信息。那么当我们要把 ID 为 1 的用户的年龄从 19 变更为 20,要如何做呢?

你可能会产生这样的思路:先更新数据库中 ID 为 1 的记录,再更新缓存中 Key 为 1 的数据。

cache_aside1.jpg

这个思路会造成缓存和数据库中的数据不一致。比如,A 请求将数据库中 ID 为 1 的用户年龄从 19 变更为 20,与此同时,请求 B 也开始更新 ID 为 1 的用户数据,它把数据库中记录的年龄变更为 21,然后变更缓存中的用户年龄为 21。紧接着,A 请求开始更新缓存数据,它会把缓存中的年龄变更为 20。此时,数据库中用户年龄是 21,而缓存中的用户年龄却是 20。

cache_aside2.jpg

为什么产生这个问题呢?因为变更数据库和变更缓存是两个独立的操作,而我们并没有对操作做任何的并发控制。那么当两个线程并发更新它们的时候,就会因为写入顺序的不同造成数据的不一致。

另外,直接更新缓存还存在另外一个问题就是丢失更新。还是以我们的电商系统为例,假如电商系统中的账户表有三个字段:ID、户名和金额,这个时候缓存中存储的就不只是金额信息,而是完整的账户信息了。当更新缓存中账户金额时,你需要从缓存中查询完整的账户数据,把金额变更后再写入到缓存中。

这个过程中也会有并发的问题,比如说原有金额是 20,A 请求从缓存中读到数据,并且把金额加 1,变更成 21,在未写入缓存之前又有请求 B 也读到缓存的数据后把金额也加 1,也变更成 21,两个请求同时把金额写回缓存,这时缓存里面的金额是 21,但是我们实际上预期是金额数加 2,这也是一个比较大的问题。

那我们要如何解决这个问题呢?其实,我们可以在更新数据时不更新缓存,而是删除缓存中的数据,在读取数据时,发现缓存中没了数据之后,再从数据库中读取数据,更新到缓存中。

cache_aside3.jpg

这个策略就是我们使用缓存最常见的策略,Cache Aside 策略(也叫旁路缓存策略),这个策略数据以数据库中的数据为准,缓存中的数据是按需加载的。它可以分为读策略和写策略,其中读策略的步骤是:

  • 从缓存中读取数据;
  • 如果缓存命中,则直接返回数据;
  • 如果缓存不命中,则从数据库中查询数据;
  • 查询到数据后,将数据写入到缓存中,并且返回给用户。

写策略的步骤是:

  • 更新数据库中的记录;
  • 删除缓存记录。

你也许会问了,在写策略中,能否先删除缓存,后更新数据库呢?答案是不行的,因为这样也有可能出现缓存数据不一致的问题,我以用户表的场景为例解释一下。

假设某个用户的年龄是 20,请求 A 要更新用户年龄为 21,所以它会删除缓存中的内容。这时,另一个请求 B 要读取这个用户的年龄,它查询缓存发现未命中后,会从数据库中读取到年龄为 20,并且写入到缓存中,然后请求 A 继续更改数据库,将用户的年龄更新为 21,这就造成了缓存和数据库的不一致。

cache_aside4.jpg

那么像 Cache Aside 策略这样先更新数据库,后删除缓存就没有问题了吗?其实在理论上还是有缺陷的。假如某个用户数据在缓存中不存在,请求 A 读取数据时从数据库中查询到年龄为 20,在未写入缓存中时另一个请求 B 更新数据。它更新数据库中的年龄为 21,并且清空缓存。这时请求 A 把从数据库中读到的年龄为 20 的数据写入到缓存中,造成缓存和数据库数据不一致。

cache_aside5.jpg

不过这种问题出现的几率并不高,原因是缓存的写入通常远远快于数据库的写入,所以在实际中很难出现请求 B 已经更新了数据库并且清空了缓存,请求 A 才更新完缓存的情况。而一旦请求 A 早于请求 B 清空缓存之前更新了缓存,那么接下来的请求就会因为缓存为空而从数据库中重新加载数据,所以不会出现这种不一致的情况。

Cache Aside 策略是我们日常开发中最经常使用的缓存策略,不过我们在使用时也要学会依情况而变。比如说当新注册一个用户,按照这个更新策略,你要写数据库,然后清理缓存(当然缓存中没有数据给你清理)。可当我注册用户后立即读取用户信息,并且数据库主从分离时,会出现因为主从延迟所以读不到用户信息的情况。

而解决这个问题的办法恰恰是在插入新数据到数据库之后写入缓存,这样后续的读请求就会从缓存中读到数据了。并且因为是新注册的用户,所以不会出现并发更新用户信息的情况。

Cache Aside 存在的最大的问题是当写入比较频繁时,缓存中的数据会被频繁地清理,这样会对缓存的命中率有一些影响。如果你的业务对缓存命中率有严格的要求,那么可以考虑两种解决方案:

  1. 一种做法是在更新数据时也更新缓存,只是在更新缓存前先加一个分布式锁,因为这样在同一时间只允许一个线程更新缓存,就不会产生并发问题了。当然这么做对于写入的性能会有一些影响;

  2. 另一种做法同样也是在更新数据时更新缓存,只是给缓存加一个较短的过期时间,这样即使出现缓存不一致的情况,缓存的数据也会很快地过期,对业务的影响也是可以接受。

缓存专题(一) 缓存概述

缓存专题(一) 缓存概述

缓存分类

在我们日常开发中,常见的缓存主要就是静态缓存、分布式缓存和热点本地缓存这三种。

静态缓存在 Web 1.0 时期是非常著名的,它一般通过生成 Velocity 模板或者静态 HTML 文件来实现静态缓存,在 Nginx 上部署静态缓存可以减少对于后台应用服务器的压力。例如,我们在做一些内容管理系统的时候,后台会录入很多的文章,前台在网站上展示文章内容,就像新浪,网易这种门户网站一样。

当然,我们也可以把文章录入到数据库里面,然后前端展示的时候穿透查询数据库来获取数据,但是这样会对数据库造成很大的压力。即使我们使用分布式缓存来挡读请求,但是对于像日均 PV 几十亿的大型门户网站来说,基于成本考虑仍然是不划算的。

所以我们的解决思路是每篇文章在录入的时候渲染成静态页面,放置在所有的前端 Nginx 或者 Squid 等 Web 服务器上,这样用户在访问的时候会优先访问 Web 服务器上的静态页面,在对旧的文章执行一定的清理策略后,依然可以保证 99% 以上的缓存命中率。

这种缓存只能针对静态数据来缓存,对于动态请求就无能为力了。那么我们如何针对动态请求做缓存呢?这时你就需要分布式缓存了。

分布式缓存的大名可谓是如雷贯耳了,我们平时耳熟能详的 Memcached、Redis 就是分布式缓存的典型例子。它们性能强劲,通过一些分布式的方案组成集群可以突破单机的限制。所以在整体架构中,分布式缓存承担着非常重要的角色。

对于静态的资源的缓存你可以选择静态缓存,对于动态的请求你可以选择分布式缓存,那么什么时候要考虑热点本地缓存呢?

答案是当我们遇到极端的热点数据查询的时候。热点本地缓存主要部署在应用服务器的代码中,用于阻挡热点查询对于分布式缓存节点或者数据库的压力。

比如某一位明星在微博上有了热点话题,“吃瓜群众”会到他 (她) 的微博首页围观,这就会引发这个用户信息的热点查询。这些查询通常会命中某一个缓存节点或者某一个数据库分区,短时间内会形成极高的热点查询。

那么我们会在代码中使用一些本地缓存方案,如 HashMap,Guava Cache 或者是 Ehcache 等,它们和应用程序部署在同一个进程中,优势是不需要跨网络调度,速度极快,所以可以来阻挡短时间内的热点查询。来看个例子。

比方说你的垂直电商系统的首页有一些推荐的商品,这些商品信息是由编辑在后台录入和变更。你分析编辑录入新的商品或者变更某个商品的信息后,在页面的展示是允许有一些延迟的,比如说 30 秒的延迟,并且首页请求量最大,即使使用分布式缓存也很难抗住,所以你决定使用 Guava Cache 来将所有的推荐商品的信息缓存起来,并且设置每隔 30 秒重新从数据库中加载最新的所有商品。

首先,我们初始化 Guava 的 Loading Cache:

1
2
3
4
5
6
7
8
9
CacheBuilder<String, List<Product>> cacheBuilder = CacheBuilder.newBuilder().maximumSize(maxSize).recordStats(); // 设置缓存最大值
cacheBuilder = cacheBuilder.refreshAfterWrite(30, TimeUnit.Seconds); // 设置刷新间隔

LoadingCache<String, List<Product>> cache = cacheBuilder.build(new CacheLoader<String, List<Product>>() {
@Override
public List<Product> load(String k) throws Exception {
return productService.loadAll(); // 获取所有商品
}
});

这样,你在获取所有商品信息的时候可以调用 Loading Cache 的 get 方法,就可以优先从本地缓存中获取商品信息,如果本地缓存不存在,会使用 CacheLoader 中的逻辑从数据库中加载所有的商品。

由于本地缓存是部署在应用服务器中,而我们应用服务器通常会部署多台,当数据更新时,我们不能确定哪台服务器本地中了缓存,更新或者删除所有服务器的缓存不是一个好的选择,所以我们通常会等待缓存过期。因此,这种缓存的有效期很短,通常为分钟或者秒级别,以避免返回前端脏数据。

缓存的不足

通过了解上面的内容,你不难发现,缓存的主要作用是提升访问速度,从而能够抗住更高的并发。那么,缓存是不是能够解决一切问题?显然不是。事物都是具有两面性的,缓存也不例外,我们要了解它的优势的同时也需要了解它有哪些不足,从而扬长避短,将它的作用发挥到最大。

首先,缓存比较适合于读多写少的业务场景,并且数据最好带有一定的热点属性。这是因为缓存毕竟会受限于存储介质不可能缓存所有数据,那么当数据有热点属性的时候才能保证一定的缓存命中率。比如说类似微博、朋友圈这种 20% 的内容会占到 80% 的流量。所以,一旦当业务场景读少写多时或者没有明显热点时,比如在搜索的场景下,每个人搜索的词都会不同,没有明显的热点,那么这时缓存的作用就不明显了。

其次,缓存会给整体系统带来复杂度,并且会有数据不一致的风险。当更新数据库成功,更新缓存失败的场景下,缓存中就会存在脏数据。对于这种场景,我们可以考虑使用较短的过期时间或者手动清理的方式来解决。

再次,之前提到缓存通常使用内存作为存储介质,但是内存并不是无限的。因此,我们在使用缓存的时候要做数据存储量级的评估,对于可预见的需要消耗极大存储成本的数据,要慎用缓存方案。同时,缓存一定要设置过期时间,这样可以保证缓存中的会是热点数据。

最后,缓存会给运维也带来一定的成本,运维需要对缓存组件有一定的了解,在排查问题的时候也多了一个组件需要考虑在内。

虽然有这么多的不足,但是缓存对于性能的提升是毋庸置疑的,我们在做架构设计的时候也需要把它考虑在内,只是在做具体方案的时候需要对缓存的设计有更细致的思考,才能最大化的发挥缓存的优势。

resilience4j-retry源码阅读

resilience4j 源码还是比较清晰简单的,比较适合阅读。

放一张主要类的结构图:

Retry入口

Retry接口是提供重试功能的入口,主要提供了方法模版,具体校验结构,失败后处理由Context子类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Creates a retryable supplier.
*
* @param retry the retry context
* @param supplier the original function
* @param <T> the type of results supplied by this supplier
* @return a retryable function
*/
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
};
}

这里摘抄了一段核心代码,作用是循环直到context.onResult(result)返回true为止,需要留意context.onResult/onRuntimeError/onError可能执行多次, onSuccess只会执行一次,这里每次进入重试都是一个新的context对象。

Retry.ContextImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean onResult(T result) {
if (null != resultPredicate && resultPredicate.test(result)) {
int currentNumOfAttempts = numOfAttempts.incrementAndGet();
if (currentNumOfAttempts >= maxAttempts) {
return false;
} else {
waitIntervalAfterFailure(currentNumOfAttempts, null);
return true;
}
}
return false;
}

public void onRuntimeError(RuntimeException runtimeException) {
if (exceptionPredicate.test(runtimeException)) {
lastRuntimeException.set(runtimeException);
throwOrSleepAfterRuntimeException();
} else {
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), runtimeException));
throw runtimeException;
}
}

先关注onResult,它负责判断是否需要继续重试,如果通过校验或者重试超过此数,会停止重试。

onRuntimeError/onError, 负责把catch的异常存储在lastRuntimeException中。

1
2
3
4
5
6
7
8
9
10
public void onSuccess() {
int currentNumOfAttempts = numOfAttempts.get();
if (currentNumOfAttempts > 0) {
succeededAfterRetryCounter.increment();
Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
} else {
succeededWithoutRetryCounter.increment();
}
}

onSuccess负责统计和发送事件。

总结

总体来说retry比较简单,需要注意的点有一个如果设置了结果校验,如果一直校验不通过,将返回未通过的结果,而不是返回失败。

[片段] 使用redis创建简易搜索引擎(核心篇)

支持and查询、多选、多字段排序分页,缺少的功能:or 条件

核心类,有一些测试代码,将就一下。另外需要spring-data-redis 2.0版本以上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package app.pooi.redissearch.search;

import app.pooi.redissearch.search.anno.CreateIndex;
import app.pooi.redissearch.search.anno.Field;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.hash.Jackson2HashMapper;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static app.pooi.redissearch.search.SearchCore.Util.*;

@RestController
@Service
public class SearchCore {

private StringRedisTemplate redisTemplate;

private Jackson2HashMapper hashMapper = new Jackson2HashMapper(true);

@Data
private static class Person {
private Long id;
private String name;
private Integer age;
private Long ctime;
}

@PostMapping("/person")
@CreateIndex(
index = "person",
documentId = "#p0.id",
fields = {
@Field(propertyName = "name", value = "#p0.name"),
@Field(propertyName = "age", value = "#p0.age", sort = true),
@Field(propertyName = "ctime", value = "#p0.ctime", sort = true)
})
Person addPerson(Person person) {
return person;
}

public SearchCore(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}


public void indexMeta(String index, Map<String, FieldMeta> fieldMeta) {
this.redisTemplate.opsForHash().putAll(genIdxMetaName(index), hashMapper.toHash(fieldMeta));
}

@PostMapping("/index")
public int indexDocument(
final String index,
final String field,
final String documentId,
final String document) {
return this.indexDocument(index, field, documentId, document, doc -> Lists.newArrayList(doc.split("")));
}

public int indexDocument(
final String index,
final String field,
final String documentId,
final String document,
final Function<String, List<String>> tokenizer) {

final List<String> tokens = tokenizer != null ?
tokenizer.apply(document) :
Collections.singletonList(document);

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

final String[] idxs = tokens.stream()
.map(word -> genIdxName(index, field, word))
.peek(idx -> ((StringRedisTemplate) operations).opsForSet().add(idx, documentId))
.toArray(String[]::new);

template.opsForSet().add(docKey, idxs);
return null;
}
});
return results.size();
}

public int indexSortField(
final String index,
final String field,
final String documentId,
final Double document) {

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;
final String idxName = genSortIdxName(index, field);
template.opsForZSet().add(idxName, documentId, document);
template.opsForSet().add(docKey, idxName);
return null;
}
});
return results.size();
}

@DeleteMapping("/index")
public int deleteDocumentIndex(final String index, final String documentId) {
final String docKey = genDocIdxName(index, documentId);
final Boolean hasKey = redisTemplate.hasKey(docKey);
if (!hasKey) {
return 0;
}

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final Set<String> idx = redisTemplate.opsForSet().members(docKey);
((StringRedisTemplate) operations).delete(idx);
((StringRedisTemplate) operations).delete(docKey);
return null;
}
});
return results.size();
}

@PatchMapping("/index")
public int updateDocumentIndex(final String index, final String field, final String documentId, final String document) {
this.deleteDocumentIndex(index, documentId);
return this.indexDocument(index, field, documentId, document);
}

public int updateSortField(final String index, final String field, final String documentId, final Double document) {
this.deleteDocumentIndex(index, documentId);
return this.indexSortField(index, field, documentId, document);
}

private Consumer<SetOperations<String, String>> operateAndStore(String method, String key, Collection<String> keys, String destKey) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey);
case "differenceAndStore":
return (so) -> so.differenceAndStore(key, keys, destKey);
default:
return so -> {
};
}
}

private Consumer<ZSetOperations<String, String>> zOperateAndStore(String method, String key, Collection<String> keys, String destKey, final RedisZSetCommands.Weights weights) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
default:
return so -> {
};
}
}

private String common(String index, String method, List<String> keys, long ttl) {
final String destKey = Util.genQueryIdxName(index);

redisTemplate.executePipelined(new SessionCallback<String>() {
@Override
public <K, V> String execute(RedisOperations<K, V> operations) throws DataAccessException {
operateAndStore(method,
keys.stream().limit(1L).findFirst().get(),
keys.stream().skip(1L).collect(Collectors.toList()),
destKey)
.accept(((StringRedisTemplate) operations).opsForSet());
((StringRedisTemplate) operations).expire(destKey, ttl, TimeUnit.SECONDS);
return null;
}
});
return destKey;
}

public String intersect(String index, List<String> keys, long ttl) {
return common(index, "intersectAndStore", keys, ttl);
}

public String union(String index, List<String> keys, long ttl) {
return common(index, "unionAndStore", keys, ttl);
}

public String diff(String index, List<String> keys, long ttl) {
return common(index, "differenceAndStore", keys, ttl);
}

private static Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parse(String query) {

final Pattern pattern = Pattern.compile("[+-]?([\\w\\d]+):(\\S+)");

final Matcher matcher = pattern.matcher(query);

Set<Tuple2<String, String>> unwant = Sets.newHashSet();
Set<Tuple2<String, String>> want = Sets.newHashSet();

while (matcher.find()) {
String word = matcher.group();

String prefix = null;
if (word.length() > 1) {
prefix = word.substring(0, 1);
}

final Tuple2<String, String> t = Tuples.of(matcher.group(1), matcher.group(2));
if ("-".equals(prefix)) {
unwant.add(t);
} else {
want.add(t);
}
}
return Tuples.of(want, unwant);
}


public String query(
String index,
String query) {

final Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parseResult = parse(query);
final Set<Tuple2<String, String>> want = parseResult.getT1();
final Set<Tuple2<String, String>> unwant = parseResult.getT2();


if (want.isEmpty()) {
return "";
}

final Map<String, FieldMeta> entries = (Map<String, FieldMeta>) hashMapper.fromHash(redisTemplate.<String, Object>opsForHash().entries(genIdxMetaName(index)));

// union
final List<Tuple2<String, String>> unionFields = want.stream()
.filter(w -> w.getT2().contains(","))
.filter(w -> "true".equals(entries.get(w.getT1()).getSort()))
.collect(Collectors.toList());
final List<String> unionIdx = unionFields.stream()
.flatMap(w -> Arrays.stream(w.getT2().split(",")).map(value -> Tuples.of(w.getT1(), value)))
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

final String unionResultId = unionIdx.isEmpty() ? "" : this.union(index, unionIdx, 30L);

want.removeAll(unionFields);

// intersect
final List<String> intersectIdx = want.stream()
.flatMap(t -> {
if ("true".equals(entries.get(t.getT1()).getSort()))
return Stream.of(t);
return Arrays.stream(t.getT2().split("")).map(value -> Tuples.of(t.getT1(), value));
})
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

if (!unionResultId.isEmpty())
intersectIdx.add(unionResultId);

String intersectResult = this.intersect(index, intersectIdx, 30L);

// diff
return unwant.isEmpty() ?
intersectResult :
this.diff(index, Stream.concat(Stream.of(intersectResult), unwant.stream().map(w -> genIdxName(index, w.getT1(), w.getT2()))).collect(Collectors.toList()), 30L);
}

@GetMapping("/query/{index}")
public Set<String> queryAndSort(
@PathVariable("index") String index,
@RequestParam("param") String query,
@RequestParam("sort") String sort,
Integer start,
Integer stop
) {
final String[] sorts = sort.split(" ");

final Map<String, Integer> map = Arrays.stream(sorts).collect(
Collectors.toMap(f -> {
if (f.startsWith("+") || f.startsWith("-")) {
f = f.substring(1);
}
return genSortIdxName("person", f);
}, field -> field.startsWith("-") ? -1 : 1)
);

final int[] weights = map.values()
.stream()
.mapToInt(Integer::intValue)
.toArray();


// if (!sort.startsWith("+") && !sort.startsWith("-")) {
// sort = "+" + sort;
// }
// boolean desc = sort.startsWith("-");
// sort = sort.substring(1);

String queryId = this.query(index, query);
Long size;
if (queryId.length() == 0 || (size = redisTemplate.opsForSet().size(queryId)) == null || size == 0) {
return Collections.emptySet();
}

final String resultId = genQueryIdxName(index);

// String sortField = sort;

redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

// template.opsForZSet().intersectAndStore(genSortIdxName(index, sortField), queryId, resultId);

SearchCore.this.zOperateAndStore("intersectAndStore",
map.keySet().stream().limit(1L).findFirst().get(),
Stream.concat(map.keySet().stream().skip(1L), Stream.of(queryId)).collect(Collectors.toList()),
resultId, RedisZSetCommands.Weights.of(ArrayUtils.add(weights, 0))).accept(template.opsForZSet());

// template.opsForZSet().size(resultId);
template.expire(resultId, 30L, TimeUnit.SECONDS);

return null;
}
});

// sort
return redisTemplate.opsForZSet().range(resultId, start, stop);

}

static class Util {

private Util() {
}

static String genIdxMetaName(String index) {
return String.format("meta:idx:%s", index);
}

static String genIdxName(String index, String field, String value) {
return String.format("idx:%s:%s:%s", index, field, value);
}

static String genSortIdxName(String index, String field) {
return String.format("idx:%s:%s", index, field);
}

static String genQueryIdxName(String index) {
return String.format("idx:%s:q:%s", index, UUID.randomUUID().toString());
}

static String genDocIdxName(String index, String documentId) {
return String.format("doc:%s:%s", index, documentId);
}
}
}

辅助类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.Data;


@Data
public class FieldMeta {

private String sort = "false";

private String splitFun = "";

public FieldMeta() {

}

public FieldMeta(boolean sort) {
this.sort = Boolean.toString(sort);
}
}

做一个轻量级的搜索还是可以的。

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×