[Fragment] Collecting Method Arguments

Some older code that collects all arguments of the current method into a map for easy lookup.

import com.google.common.collect.ImmutableMap;
import lombok.Data;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

@Aspect
@Component
public class ArgumentsCollector {

private static final ThreadLocal<Map<String, Object>> ARGUMENTS = ThreadLocal.withInitial(ImmutableMap::of);

static Map<String, Object> getArgs() {
return ARGUMENTS.get();
}

private Object[] args(Object[] args, int exceptLength) {
if (exceptLength == args.length) {
return args;
}

return Arrays.copyOf(args, exceptLength);
}

@Pointcut("@annotation(CollectArguments)")
void collectArgumentsAnnotationPointCut() {
}

@Before("collectArgumentsAnnotationPointCut()")
public void doAccessCheck(JoinPoint joinPoint) {
final String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
final Object[] args = args(joinPoint.getArgs(), parameterNames.length);

ARGUMENTS.set(Collections.unmodifiableMap((IntStream.range(0, parameterNames.length)
.mapToObj(idx -> Tuple2.of(parameterNames[idx], args[idx]))
.collect(HashMap::new, (m, t) -> m.put(t.getT1(), t.getT2()), HashMap::putAll))));
}

@After("collectArgumentsAnnotationPointCut()")
public void remove() {
ARGUMENTS.remove();
}

@Data
private static class Tuple2<T1, T2> {

private T1 t1;
private T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

public static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}
}
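
The @CollectArguments annotation referenced by the pointcut is defined in the companion post later in this collection; repeated here for completeness:

import java.lang.annotation.*;

// Marker annotation matched by the pointcut above (same definition as in the companion post below)
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CollectArguments {
}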

A bonus snippet that converts the collected arguments into a bean:

import org.springframework.beans.BeanUtils;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.validation.DataBinder;

public class BinderUtil {

BinderUtil() {
}

@SuppressWarnings("unchecked")
public static <T> T getTarget(Class<T> beanClazz) {
final DataBinder binder = new DataBinder(BeanUtils.instantiate(beanClazz));
binder.bind(new MutablePropertyValues(ArgumentsCollector.getArgs()));
return (T) binder.getTarget();
}
}

Usage example:

@Override
@CollectArguments
public List<PsJobSequenceVO> findJobSequence(
String jobSeqGroupId,
String jobSeqId,
Integer state,
Date endDate
) {
return jobSequenceHandler.findJobSequence(BinderUtil.getTarget(PsJobSequenceFindRO.class)).getData();
}
[Fragment] MyBatis ParameterHandler in Practice

Used to batch-encrypt String fields annotated with @Decrypted; there may still be some pitfalls.
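
The @Decrypted marker annotation scanned by the interceptor is not shown in the post; a minimal sketch of the assumed definition:

import java.lang.annotation.*;

// Hypothetical definition of the marker annotation scanned by the interceptor below
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Decrypted {
}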


import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.ke.zhaopin.manage.server.config.mybatis.interceptor.anno.Decrypted;
import com.lianjia.ctt.kinko.spi.CipherSpi;
import com.sun.istack.internal.NotNull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.binding.MapperMethod;
import org.apache.ibatis.executor.parameter.ParameterHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.scripting.defaults.DefaultParameterHandler;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.defaults.DefaultSqlSession;
import org.joor.Reflect;
import reactor.core.publisher.Flux;

import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;


@Intercepts({
@Signature(type = ParameterHandler.class, method = "setParameters", args = {PreparedStatement.class}),
})
@Slf4j
public class EncryptInterceptor implements Interceptor {

private static final String COLLECTION_KEY = "collection";
private static final String ARRAY_KEY = "array";

private final LoadingCache<Class, List<String>> decryptFieldCaches = CacheBuilder.newBuilder()
.maximumSize(200)
.expireAfterAccess(10L, TimeUnit.MINUTES)
.build(new CacheLoader<Class, List<String>>() {
@Override
public List<String> load(Class key) {
return Arrays.stream(key.getDeclaredFields())
.filter(f -> f.getAnnotation(Decrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
log.warn(f.getName() + " is not String, actual type is " + f.getType().getSimpleName() + ", ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());
}
}
);

private CipherSpi cipherSpi;

public EncryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

Flux<CryptContext> contextFlux = Flux.empty();

do {
if (!(invocation.getTarget() instanceof DefaultParameterHandler)) break;

final Reflect parameterHandler = Reflect.on(invocation.getTarget());
final Object parameterObject = parameterHandler.get("parameterObject");
final Configuration configuration = parameterHandler.get("configuration");

if (parameterObject instanceof DefaultSqlSession.StrictMap) {
// a single Collection/Map/Array parameter
DefaultSqlSession.StrictMap<?> paramMap = (DefaultSqlSession.StrictMap<?>) parameterObject;

Collection<?> collection = null;
Class<?> componentType = null;
if (paramMap.containsKey(COLLECTION_KEY)) {
collection = (Collection<?>) paramMap.get(COLLECTION_KEY);
componentType = collection.iterator().next().getClass();
} else if (paramMap.containsKey(ARRAY_KEY)) {
Object[] array = (Object[]) paramMap.get(ARRAY_KEY);
componentType = array.getClass().getComponentType();
collection = Arrays.asList(array);
}

if (!isUserDefinedClass(componentType)) break;

contextFlux = collection(configuration, collection, componentType);

} else if (parameterObject instanceof MapperMethod.ParamMap) {
// multiple parameters
MapperMethod.ParamMap<?> paramMap = (MapperMethod.ParamMap<?>) parameterObject;

final List<?> params = paramMap.values().stream().filter(Objects::nonNull).distinct().collect(Collectors.toList());

for (Object parameter : params) {
if (parameter instanceof Collection) {
Collection<?> collection = (Collection<?>) parameter;
if (collection.isEmpty()) {
continue;
}

Class<?> componentType = collection.iterator().next().getClass();
if (!isUserDefinedClass(componentType)) {
continue;
}
final Flux<CryptContext> collectionFlux = collection(configuration, collection, componentType);
contextFlux = contextFlux.concatWith(collectionFlux);

} else if (parameter.getClass().isArray()) {
if (Array.getLength(parameter) == 0) continue;
final Class<?> componentType = parameter.getClass().getComponentType();
if (!isUserDefinedClass(componentType)) {
continue;
}
Collection<?> collection = Arrays.asList((Object[]) parameter);

final Flux<CryptContext> collectionFlux = collection(configuration, collection, componentType);
contextFlux = contextFlux.concatWith(collectionFlux);

} else if (isUserDefinedClass(parameter.getClass())) {
final Flux<CryptContext> singleFlux = collection(configuration, Collections.singletonList(parameter), parameter.getClass());
contextFlux = contextFlux.concatWith(singleFlux);
}
}

} else if (isUserDefinedClass(parameterObject.getClass())) {
// a single parameter that is not a Collection/Map/Array
contextFlux = collection(configuration, Collections.singletonList(parameterObject), parameterObject.getClass());
} else {
// other parameter shapes (e.g. calls not made through a mapper interface); nothing to handle
}


} while (false);

final List<CryptContext> cryptContexts = encrypt(contextFlux);

invocation.proceed();

restore(cryptContexts);

return null;
}

private void restore(List<CryptContext> cryptContexts) {
for (CryptContext cryptContext : cryptContexts) {
cryptContext.metaObject.setValue(cryptContext.fieldName, cryptContext.value);
}
}

private Flux<CryptContext> collection(Configuration configuration, Collection<?> collection, Class<?> componentType) throws ExecutionException {
final List<String> fieldNames = this.getDecryptFields(componentType);

return Flux.fromIterable(collection)
.map(configuration::newMetaObject)
.flatMapIterable(metaObject -> fieldNames.stream().map(fieldName -> new CryptContext(metaObject, fieldName)).collect(Collectors.toList()));
}

private List<CryptContext> encrypt(Flux<CryptContext> contextFlux) {
return contextFlux
.filter(context -> StringUtils.isNotBlank(context.value))
.buffer(1000)
.doOnNext(contexts -> {
Map<String, String> secretMap = Collections.emptyMap();
try {
secretMap = cipherSpi.batchEncrypt(contexts.stream().map(CryptContext::getValue).distinct().collect(Collectors.toList()));
} catch (Exception e) {
log.error("batch encrypt failed, falling back to original values", e);
}
for (CryptContext context : contexts) {
context.secret = secretMap.get(context.value);
}
})
.flatMapIterable(Function.identity())
.doOnNext(context -> context.metaObject.setValue(context.fieldName, context.secret))
.collectList()
.block();
}

@NotNull
private List<String> getDecryptFields(Class<?> modelClazz) throws ExecutionException {
return this.decryptFieldCaches.get(modelClazz);
}

private boolean isUserDefinedClass(Class<?> clazz) {
return !clazz.isPrimitive() && !clazz.getPackage().getName().startsWith("java");
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class CryptContext {

CryptContext(MetaObject metaObject, String fieldName) {
this.metaObject = metaObject;
this.fieldName = fieldName;
this.value = (String) metaObject.getValue(fieldName);
if (StringUtils.isBlank(value)) {
this.secret = StringUtils.EMPTY;
}
}

final MetaObject metaObject;

final String fieldName;

final String value;

String secret;
}
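
To actually wire the interceptor in, one option is to expose it as a Spring bean so that a setup like the Spring Boot MyBatis configuration later in this collection (which collects all Interceptor beans) picks it up; a hypothetical sketch:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical wiring: the MyBatis config below gathers Interceptor[] beans and registers them as plugins
@Configuration
class EncryptInterceptorConfiguration {

    @Bean
    public EncryptInterceptor encryptInterceptor(CipherSpi cipherSpi) {
        return new EncryptInterceptor(cipherSpi);
    }
}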

[Project] One Implementation of Multi-Role Permission Data Display

What can you do when, under multi-role permissions, different roles are allowed to see different columns?

  • Row-by-row reads

The simplest solution and easy to implement, but in a microservice setup it makes far too many interface calls and performs poorly.

  • Batch reads

More complex to implement, but much faster; the approach is outlined below.

Batch reads

Take paged reads as an example:

  1. Read the first page of data, including each row's id and the permissions it was matched under (possibly several).

Why do we need the matched-permissions field? Because whether you can see a row is decided by the union of all permissions you hold, while which columns you can see is decided by the permissions that matched the row.

How do we get the permissions a row was matched under? My approach is to query per permission and combine the results with UNION.

  2. Save the first page's original order, then group the rows by the permissions they carry.

The original order is recorded because grouping will shuffle it. Why group? Because grouping aggregates identical queries together, which simplifies the code.

  3. For each permission group, query the fields it needs, then merge the results.

I used GraphQL here to select the fields to query.

  4. Finally, restore the original order.

Guava's Ordering utility makes it easy to build the Comparator; see the sketch below.
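
A sketch of step 4 under assumptions: a hypothetical Row type with getId(), and the first page's ids remembered in their original order. Ordering.explicit builds a Comparator that ranks rows by the position of their id in that list:

import com.google.common.collect.Ordering;

import java.util.Comparator;
import java.util.List;

class OrderRestorer {
    // originalIds: the ids of the first page, in the order the page was returned
    static void restoreOriginalOrder(List<String> originalIds, List<Row> merged) {
        Comparator<Row> byOriginalOrder = Ordering.explicit(originalIds).onResultOf(Row::getId);
        merged.sort(byOriginalOrder);
    }
}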


IO Model

Operating-system I/O models and Java I/O

Java's I/O models are closely tied to the operating system's. I could never keep blocking/non-blocking and synchronous/asynchronous straight, so it is well worth learning which interfaces the operating system (Linux) provides for I/O. A conceptual understanding is enough for now; for usage details, consult any Java I/O tutorial.

The basics

Taking a read as the example, the operating system generally performs I/O in two separate phases:

  1. Wait for the data to be ready: copied from disk into kernel space, or received by the NIC and copied into kernel space.
  2. Copy the data from kernel space into the requesting process. For Java, it may additionally be copied from the process into the JVM heap.

Blocking I/O Model

The most common model; as the name says, both phases of the I/O block the program, and the call returns only once data has been read or an error occurs.

(Figure: blocking I/O model)

Concretely, the program calls the recvfrom system call, which returns only when an error occurs or the data has been copied into the process. Afterwards the program merely has to handle the error or the data.

The blocking model corresponds to the vast majority of I/O in Java, such as the networking APIs and the I/O stream APIs. Its strength is simplicity and clarity; its weakness is that long blocking makes it hard to support large numbers of concurrent I/O requests.

Nonblocking I/O Model

This model has no counterpart in Java, so only a brief introduction here.

(Figure: nonblocking I/O model)

The program polls recvfrom; the call keeps returning an error until phase one completes, after which it blocks until phase two completes.

The model is somewhat underwhelming: its one feature is that phase one is non-blocking (the process is not descheduled), while the code is more complex than the blocking model's.

I/O Multiplexing Model

A very famous model that can support massive concurrent I/O. The program blocks in select or poll rather than in the actual I/O call. Blocking in select during phase one differs from Blocking I/O: Blocking I/O blocks on phase one of the current I/O operation, whereas with multiplexing you can register many I/Os with select, and select returns as soon as any one of them becomes ready; it blocks only while all registered I/Os are still in phase one.

(Figure: I/O multiplexing model)

Compared with the blocking and nonblocking models, the multiplexing model can clearly handle far more I/O in the same amount of time. Multithreading plus Blocking I/O can achieve a similar effect, but may burn through thread resources.

The I/O multiplexing model corresponds to Java NIO's Selector and related APIs; a minimal sketch follows.
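
A minimal multiplexing sketch with the Selector API, assuming an arbitrary port; it only shows where the blocking happens:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class SelectorSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select(); // blocks until at least one registered channel is ready (phase one, for many I/Os at once)
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    ((SocketChannel) key.channel()).read(buf); // phase two: the data is already in kernel space
                }
            }
        }
    }
}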

Signal-Driven I/O Model

This model has no counterpart in Java, so only a brief introduction here.

(Figure: signal-driven I/O model)

Its hallmark is that the sigaction call returns immediately without blocking during phase one; when phase one completes, the kernel sends the SIGIO signal to the process, and phase two is then handled inside the signal handler. It can be seen as an improvement over the nonblocking model.

Asynchronous I/O Model

The difference from the signal-driven model lies in when the notification arrives: the asynchronous model signals the process only after both phase one and phase two have completed.

(Figure: asynchronous I/O model)

The asynchronous model corresponds to Java's AsynchronousSocketChannel, AsynchronousServerSocketChannel, AsynchronousFileChannel and related APIs; a small sketch follows.
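
A minimal sketch of the asynchronous model with AsynchronousFileChannel; the file path is an arbitrary assumption, and the completion handler runs only after both phases are done:

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class AioSketch {
    public static void main(String[] args) throws Exception {
        AsynchronousFileChannel channel =
                AsynchronousFileChannel.open(Paths.get("/tmp/data.txt"), StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                System.out.println("read " + bytesRead + " bytes"); // data has already been copied into the buffer
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
        Thread.sleep(1000); // keep the JVM alive long enough for the callback in this demo
    }
}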

Comparing the models

[Project] Pitfalls of Permission-Scoped Queries Without Role Switching

Recap

1. Background

When querying a list with multiple roles, differences show up along two dimensions:

  • Rows: the user sees the union of rows visible to each role, so the SQL must query once per role and union the results, then deduplicate, page, and sort.
  • Columns: if roles see different columns, compute per row the union of columns the matching roles can see.

An example:

Suppose a logged-in employee holds two roles:

  1. Long-term-incentive (LTI) owner: can see people who hold long-term incentives (rows), and can see basic info and LTI info (columns).
  2. Compensation owner: can see low-ranked people (rows), and can see basic info and compensation info (columns).

In the list, he can then see the following (x marks what is hidden):

|  | Basic info | Compensation info | Long-term-incentive info |
| --- | --- | --- | --- |
| Low rank / no LTI | ✓ | ✓ | x |
| Low rank / LTI | ✓ | ✓ | ✓ |
| High rank / no LTI | x | x | x |
| High rank / LTI | ✓ | x | ✓ |

2. Problems Met in Practice (trouble everywhere)

The basic idea was covered in the recap above. Having practiced it for a while now, I have dug two deep holes and am still climbing out of them.

Performance (solved)

In the first implementation rows were read one at a time. Compensation fields are also encrypted, with decryption provided by a third-party microservice, so many fields to read plus many fields to decrypt meant that, at a hundred rows per page, the interface kept flirting with its timeout...

Solution:

  • Merge the query SQL and read the data in batches.
  • Merge the decryption requests and call the decryption microservice in batches.

Because we had earlier used a MyBatis TypeHandler for implicit per-field encryption and decryption, our current approach keeps the TypeHandler for single-row reads, while for batch reads we wrote a second set of entities and use a MyBatis interceptor to batch-decrypt the query results. For details see my other article: “【片段】 Mybatis ResultSetHandler 实践-续”.

Problems introduced by batch reads

A batch query returns rows that all expose the same columns, while our requirement is that different rows show different columns; returning the batch result as-is is wrong, and this oversight caused a production incident.

So the current approach is to batch-prefetch the data first, and then post-process the columns, hiding the fields that must not be seen.

3. Summary

I never expected so many obstacles when I first set out to avoid role switching in permission-scoped queries. The idea was right; the execution was hard at every step. The consolation is that the initial direction was sound, and the problems and lessons met along the way were all written down; it is that layered accumulation that got things to where they are now.

[1]: https://blog.yamato.moe/2018/11/06/2018-11-06-biz/ "根据权限查询时避免角色切换的一种思路"
[2]: https://blog.yamato.moe/2019/04/04/Mybatis%20ResultSetHandler_2019-04-04%20%E7%BB%AD/ "【片段】 Mybatis ResultSetHandler 实践-续"
[3]: https://blog.yamato.moe/2019/01/09/Mybatis%20ResultSetHandler_2019-01-09/ "【片段】 Mybatis ResultSetHandler 实践"

Reading the resilience4j-retry Source

The resilience4j source is fairly clear and simple, well suited to reading.

(Figure: structure of the main classes)

The Retry entry point

The Retry interface is the entry point to the retry feature. It mainly provides the method template; the concrete result validation and failure handling are implemented by the Context subclasses.

/**
* Creates a retryable supplier.
*
* @param retry the retry context
* @param supplier the original function
* @param <T> the type of results supplied by this supplier
* @return a retryable function
*/
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
};
}

This excerpt is the core: it loops until context.onResult(result) returns false. Note that context.onResult/onRuntimeError/onError may run several times, while onSuccess runs exactly once, and every decorated call starts with a fresh context object.

Retry.ContextImpl

public boolean onResult(T result) {
if (null != resultPredicate && resultPredicate.test(result)) {
int currentNumOfAttempts = numOfAttempts.incrementAndGet();
if (currentNumOfAttempts >= maxAttempts) {
return false;
} else {
waitIntervalAfterFailure(currentNumOfAttempts, null);
return true;
}
}
return false;
}

public void onRuntimeError(RuntimeException runtimeException) {
if (exceptionPredicate.test(runtimeException)) {
lastRuntimeException.set(runtimeException);
throwOrSleepAfterRuntimeException();
} else {
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), runtimeException));
throw runtimeException;
}
}

Look at onResult first: it decides whether to keep retrying, and stops once the result passes validation or the attempt count reaches maxAttempts.

onRuntimeError/onError stash the caught exception into lastRuntimeException.

public void onSuccess() {
int currentNumOfAttempts = numOfAttempts.get();
if (currentNumOfAttempts > 0) {
succeededAfterRetryCounter.increment();
Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
} else {
succeededWithoutRetryCounter.increment();
}
}

onSuccess handles the counters and publishes events.

Summary

Overall, retry is fairly simple. One thing to watch: if a result predicate is configured and the result never passes validation, the failing result itself is returned rather than a failure.
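
A minimal usage sketch illustrating exactly that behavior; fetchFromRemote is hypothetical, and retryOnResult assumes a resilience4j version exposing the result predicate seen in ContextImpl above:

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;

import java.time.Duration;
import java.util.function.Supplier;

public class RetrySketch {
    public static void main(String[] args) {
        RetryConfig config = RetryConfig.<String>custom()
                .maxAttempts(3)
                .waitDuration(Duration.ofMillis(100))
                .retryOnResult(String::isEmpty) // keep retrying while the result fails validation
                .build();
        Retry retry = Retry.of("demo", config);

        Supplier<String> decorated = Retry.decorateSupplier(retry, RetrySketch::fetchFromRemote);
        // If all 3 attempts return "", the empty string itself is returned; no exception is thrown.
        System.out.println(decorated.get());
    }

    private static String fetchFromRemote() { // hypothetical remote call
        return "";
    }
}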

[Fragment] MyBatis ResultSetHandler in Practice, Continued

This time the intercepted method is handleResultSets(Statement stmt), used to batch-decrypt String fields annotated with @Encrypted.

The previous version could only batch-decrypt the encrypted fields of one object at a time, which falls short for batch data; this version mainly improves that.

@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}
package app.pooi.common.encrypt;


import app.pooi.common.encrypt.anno.CipherSpi;
import app.pooi.common.encrypt.anno.Decrypted;
import com.google.common.collect.Maps;
import com.sun.istack.internal.NotNull;
import lombok.Getter;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import reactor.core.publisher.Flux;

import java.lang.reflect.Field;
import java.sql.Statement;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;


@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}),
})
public class DecryptInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(DecryptInterceptor.class.getName());

private CipherSpi cipherSpi;

public DecryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

final Object proceed = invocation.proceed();

if (proceed == null) {
return proceed;
}

List<?> results = (List<?>) proceed;

if (results.isEmpty()) {
return proceed;
}

final Object first = results.iterator().next();

final Class<?> modelClazz = first.getClass();

final List<String> decryptFields = getDecryptFields(modelClazz);

if (decryptFields.isEmpty()) {
return proceed;
}

final List<List<String>> secret = Flux.fromIterable(results)
.map(SystemMetaObject::forObject)
.flatMapIterable(mo -> decryptFields.stream().map(mo::getValue).collect(Collectors.toList()))
.cast(String.class)
.buffer(1000)
.collectList()
.block();

final Map<String, String> secretMap = secret.stream()
.map(secrets -> {
try {
return cipherSpi.batchDecrypt(secrets);
} catch (Exception e) {
e.printStackTrace();
return Maps.<String, String>newHashMap();
}
}).reduce(Maps.newHashMap(), (m1, m2) -> {
m1.putAll(m2);
return m1;
});

secretMap.put("", "0");

for (Object r : results) {
final MetaObject metaObject = SystemMetaObject.forObject(r);
decryptFields.forEach(f -> metaObject.setValue(f, secretMap.get(metaObject.getValue(f))));
}

return results;
}

@NotNull
private List<String> getDecryptFields(Class<?> modelClazz) {
return Arrays.stream(modelClazz.getDeclaredFields())
.filter(f -> f.getAnnotation(Decrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
logger.warning(f.getName() + " is not String, actual type is " + f.getType().getSimpleName() + ", ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class Tuple2<T1, T2> {

private final T1 t1;

private final T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}

[Fragment] Spring Boot MyBatis Configuration

Pure notes, kept for my own reference 🤣.

private final MybatisProperties properties;

private final Interceptor[] interceptors;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

public DataSourceConfig(MybatisProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
}


/**
* The plain (primary) data source. It must be configured: Spring performs data
* initialization at startup (whether you really need it or not) and looks up a
* data source of the DataSource class type.
*
* @return {@link DataSource}
*/
@Primary
@Bean(name = BEANNAME_DATASOURCE_COMMON)
@ConfigurationProperties(prefix = "com.lianjia.confucius.bridge.boot.datasource.common")
public DataSource createDataSourceCommon() {
return DataSourceBuilder.create().build();
}

/**
* The read-only data source.
*
* @return {@link DataSource}
*/
@Bean(name = BEANNAME_DATASOURCE_READONLY)
@ConfigurationProperties(prefix = "com.lianjia.confucius.bridge.boot.datasource.readonly")
public DataSource createDataSourceReadonly() {
return DataSourceBuilder.create().build();
}

private SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
org.apache.ibatis.session.Configuration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new org.apache.ibatis.session.Configuration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}

return factory.getObject();
}

public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

@Bean
@Primary
public SqlSessionFactory primarySqlSessionFactory() throws Exception {
return this.sqlSessionFactory(this.createDataSourceCommon());
}

@Bean
public SqlSessionFactory secondarySqlSessionFactory() throws Exception {
return this.sqlSessionFactory(this.createDataSourceReadonly());
}

/**
* Creates the plain SqlSession.
*
* @return SqlSession
* @throws Exception when any exception occurs
*/
@Bean(name = BEANNAME_SQLSESSION_COMMON)
public SqlSession initSqlSessionCommon() throws Exception {
return this.sqlSessionTemplate(this.primarySqlSessionFactory());
}

/**
* Creates the read-only SqlSession.
*
* @return SqlSession
* @throws Exception when any exception occurs
*/
@Bean(name = BEANNAME_SQLSESSION_READONLY)
public SqlSession initSqlSessionReadonly() throws Exception {
return this.sqlSessionTemplate(this.secondarySqlSessionFactory());
}


@MapperScan(annotationClass = PrimaryMapper.class,
sqlSessionTemplateRef = BEANNAME_SQLSESSION_COMMON,
basePackageClasses = ITalentApplicationSpringBootStart.class)
static class PrimaryMapperConfiguration {
}

@MapperScan(annotationClass = SecondaryMapper.class,
sqlSessionTemplateRef = BEANNAME_SQLSESSION_READONLY,
basePackageClasses = ITalentApplicationSpringBootStart.class)
static class SecondaryMapperConfiguration {
}
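
The annotationClass filters above rely on two marker annotations that the post does not show; a minimal sketch of the assumed definitions:

import java.lang.annotation.*;

// Hypothetical marker annotations matched by the @MapperScan configs above
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface PrimaryMapper {
}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface SecondaryMapper {
}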

[Fragment] A Simple Search Engine on Redis (Core)

Supports AND queries, multi-value selection, and multi-field sort with paging; missing feature: OR conditions.

The core class below still contains some test code, bear with it. It also requires spring-data-redis 2.0 or above.

package app.pooi.redissearch.search;

import app.pooi.redissearch.search.anno.CreateIndex;
import app.pooi.redissearch.search.anno.Field;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.hash.Jackson2HashMapper;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static app.pooi.redissearch.search.SearchCore.Util.*;

@RestController
@Service
public class SearchCore {

private StringRedisTemplate redisTemplate;

private Jackson2HashMapper hashMapper = new Jackson2HashMapper(true);

@Data
private static class Person {
private Long id;
private String name;
private Integer age;
private Long ctime;
}

@PostMapping("/person")
@CreateIndex(
index = "person",
documentId = "#p0.id",
fields = {
@Field(propertyName = "name", value = "#p0.name"),
@Field(propertyName = "age", value = "#p0.age", sort = true),
@Field(propertyName = "ctime", value = "#p0.ctime", sort = true)
})
Person addPerson(Person person) {
return person;
}

public SearchCore(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}


public void indexMeta(String index, Map<String, FieldMeta> fieldMeta) {
this.redisTemplate.opsForHash().putAll(genIdxMetaName(index), hashMapper.toHash(fieldMeta));
}

@PostMapping("/index")
public int indexDocument(
final String index,
final String field,
final String documentId,
final String document) {
return this.indexDocument(index, field, documentId, document, doc -> Lists.newArrayList(doc.split("")));
}

public int indexDocument(
final String index,
final String field,
final String documentId,
final String document,
final Function<String, List<String>> tokenizer) {

final List<String> tokens = tokenizer != null ?
tokenizer.apply(document) :
Collections.singletonList(document);

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

final String[] idxs = tokens.stream()
.map(word -> genIdxName(index, field, word))
.peek(idx -> ((StringRedisTemplate) operations).opsForSet().add(idx, documentId))
.toArray(String[]::new);

template.opsForSet().add(docKey, idxs);
return null;
}
});
return results.size();
}

public int indexSortField(
final String index,
final String field,
final String documentId,
final Double document) {

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;
final String idxName = genSortIdxName(index, field);
template.opsForZSet().add(idxName, documentId, document);
template.opsForSet().add(docKey, idxName);
return null;
}
});
return results.size();
}

@DeleteMapping("/index")
public int deleteDocumentIndex(final String index, final String documentId) {
final String docKey = genDocIdxName(index, documentId);
final Boolean hasKey = redisTemplate.hasKey(docKey);
if (!hasKey) {
return 0;
}

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final Set<String> idx = redisTemplate.opsForSet().members(docKey);
((StringRedisTemplate) operations).delete(idx);
((StringRedisTemplate) operations).delete(docKey);
return null;
}
});
return results.size();
}

@PatchMapping("/index")
public int updateDocumentIndex(final String index, final String field, final String documentId, final String document) {
this.deleteDocumentIndex(index, documentId);
return this.indexDocument(index, field, documentId, document);
}

public int updateSortField(final String index, final String field, final String documentId, final Double document) {
this.deleteDocumentIndex(index, documentId);
return this.indexSortField(index, field, documentId, document);
}

private Consumer<SetOperations<String, String>> operateAndStore(String method, String key, Collection<String> keys, String destKey) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey);
case "differenceAndStore":
return (so) -> so.differenceAndStore(key, keys, destKey);
default:
return so -> {
};
}
}

private Consumer<ZSetOperations<String, String>> zOperateAndStore(String method, String key, Collection<String> keys, String destKey, final RedisZSetCommands.Weights weights) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
default:
return so -> {
};
}
}

private String common(String index, String method, List<String> keys, long ttl) {
final String destKey = Util.genQueryIdxName(index);

redisTemplate.executePipelined(new SessionCallback<String>() {
@Override
public <K, V> String execute(RedisOperations<K, V> operations) throws DataAccessException {
operateAndStore(method,
keys.stream().limit(1L).findFirst().get(),
keys.stream().skip(1L).collect(Collectors.toList()),
destKey)
.accept(((StringRedisTemplate) operations).opsForSet());
((StringRedisTemplate) operations).expire(destKey, ttl, TimeUnit.SECONDS);
return null;
}
});
return destKey;
}

public String intersect(String index, List<String> keys, long ttl) {
return common(index, "intersectAndStore", keys, ttl);
}

public String union(String index, List<String> keys, long ttl) {
return common(index, "unionAndStore", keys, ttl);
}

public String diff(String index, List<String> keys, long ttl) {
return common(index, "differenceAndStore", keys, ttl);
}

private static Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parse(String query) {

final Pattern pattern = Pattern.compile("[+-]?([\\w\\d]+):(\\S+)");

final Matcher matcher = pattern.matcher(query);

Set<Tuple2<String, String>> unwant = Sets.newHashSet();
Set<Tuple2<String, String>> want = Sets.newHashSet();

while (matcher.find()) {
String word = matcher.group();

String prefix = null;
if (word.length() > 1) {
prefix = word.substring(0, 1);
}

final Tuple2<String, String> t = Tuples.of(matcher.group(1), matcher.group(2));
if ("-".equals(prefix)) {
unwant.add(t);
} else {
want.add(t);
}
}
return Tuples.of(want, unwant);
}


public String query(
String index,
String query) {

final Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parseResult = parse(query);
final Set<Tuple2<String, String>> want = parseResult.getT1();
final Set<Tuple2<String, String>> unwant = parseResult.getT2();


if (want.isEmpty()) {
return "";
}

final Map<String, FieldMeta> entries = (Map<String, FieldMeta>) hashMapper.fromHash(redisTemplate.<String, Object>opsForHash().entries(genIdxMetaName(index)));

// union
final List<Tuple2<String, String>> unionFields = want.stream()
.filter(w -> w.getT2().contains(","))
.filter(w -> "true".equals(entries.get(w.getT1()).getSort()))
.collect(Collectors.toList());
final List<String> unionIdx = unionFields.stream()
.flatMap(w -> Arrays.stream(w.getT2().split(",")).map(value -> Tuples.of(w.getT1(), value)))
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

final String unionResultId = unionIdx.isEmpty() ? "" : this.union(index, unionIdx, 30L);

want.removeAll(unionFields);

// intersect
final List<String> intersectIdx = want.stream()
.flatMap(t -> {
if ("true".equals(entries.get(t.getT1()).getSort()))
return Stream.of(t);
return Arrays.stream(t.getT2().split("")).map(value -> Tuples.of(t.getT1(), value));
})
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

if (!unionResultId.isEmpty())
intersectIdx.add(unionResultId);

String intersectResult = this.intersect(index, intersectIdx, 30L);

// diff
return unwant.isEmpty() ?
intersectResult :
this.diff(index, Stream.concat(Stream.of(intersectResult), unwant.stream().map(w -> genIdxName(index, w.getT1(), w.getT2()))).collect(Collectors.toList()), 30L);
}

@GetMapping("/query/{index}")
public Set<String> queryAndSort(
@PathVariable("index") String index,
@RequestParam("param") String query,
@RequestParam("sort") String sort,
Integer start,
Integer stop
) {
final String[] sorts = sort.split(" ");

final Map<String, Integer> map = Arrays.stream(sorts).collect(
Collectors.toMap(f -> {
if (f.startsWith("+") || f.startsWith("-")) {
f = f.substring(1);
}
return genSortIdxName("person", f);
}, field -> field.startsWith("-") ? -1 : 1)
);

final int[] weights = map.values()
.stream()
.mapToInt(Integer::intValue)
.toArray();


// if (!sort.startsWith("+") && !sort.startsWith("-")) {
// sort = "+" + sort;
// }
// boolean desc = sort.startsWith("-");
// sort = sort.substring(1);

String queryId = this.query(index, query);
Long size;
if (queryId.length() == 0 || (size = redisTemplate.opsForSet().size(queryId)) == null || size == 0) {
return Collections.emptySet();
}

final String resultId = genQueryIdxName(index);

// String sortField = sort;

redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

// template.opsForZSet().intersectAndStore(genSortIdxName(index, sortField), queryId, resultId);

SearchCore.this.zOperateAndStore("intersectAndStore",
map.keySet().stream().limit(1L).findFirst().get(),
Stream.concat(map.keySet().stream().skip(1L), Stream.of(queryId)).collect(Collectors.toList()),
resultId, RedisZSetCommands.Weights.of(ArrayUtils.add(weights, 0))).accept(template.opsForZSet());

// template.opsForZSet().size(resultId);
template.expire(resultId, 30L, TimeUnit.SECONDS);

return null;
}
});

// sort
return redisTemplate.opsForZSet().range(resultId, start, stop);

}

static class Util {

private Util() {
}

static String genIdxMetaName(String index) {
return String.format("meta:idx:%s", index);
}

static String genIdxName(String index, String field, String value) {
return String.format("idx:%s:%s:%s", index, field, value);
}

static String genSortIdxName(String index, String field) {
return String.format("idx:%s:%s", index, field);
}

static String genQueryIdxName(String index) {
return String.format("idx:%s:q:%s", index, UUID.randomUUID().toString());
}

static String genDocIdxName(String index, String documentId) {
return String.format("doc:%s:%s", index, documentId);
}
}
}

The helper class:

import lombok.Data;


@Data
public class FieldMeta {

private String sort = "false";

private String splitFun = "";

public FieldMeta() {

}

public FieldMeta(boolean sort) {
this.sort = Boolean.toString(sort);
}
}

Good enough for a lightweight search; a usage sketch follows.
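
A hypothetical end-to-end sketch against the SearchCore above (the data and the injected searchCore instance are made up):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class SearchDemo {
    Set<String> demo(SearchCore searchCore) {
        Map<String, FieldMeta> meta = new HashMap<>();
        meta.put("name", new FieldMeta(false)); // tokenized field
        meta.put("age", new FieldMeta(true));   // sortable field
        searchCore.indexMeta("person", meta);

        searchCore.indexDocument("person", "name", "1", "jack"); // indexed char-by-char by the default tokenizer
        searchCore.indexSortField("person", "age", "1", 28d);

        // AND query on the name tokens, sorted by age descending, rows 0..9
        return searchCore.queryAndSort("person", "name:jack", "-age", 0, 9);
    }
}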

[Fragment] Preserving Generic Type Information at Runtime with TypeToken

Generally you can use getGenericSuperclass to obtain a subclass's generic type information, but when generics nest, recovering the full picture gets complicated; for example, Message<List> carries two pieces of generic information.

Guava's powerful TypeToken can capture complex generic types for you, for example:

ParameterizedTypeReference<Message<T>> responseTypeRef = 
ParameterizedTypeReferenceBuilder.fromTypeToken(
new TypeToken<Message<T>>() {}.where(new TypeParameter<T>() {}, new TypeToken<List<OrgSugVOV1>>() {}));

To use it with the Spring framework, you need an adapter:

public class ParameterizedTypeReferenceBuilder {

public static <T> ParameterizedTypeReference<T> fromTypeToken(TypeToken<T> typeToken) {
return new TypeTokenParameterizedTypeReference<>(typeToken);
}

private static class TypeTokenParameterizedTypeReference<T> extends ParameterizedTypeReference<T> {

private final Type type;

private TypeTokenParameterizedTypeReference(TypeToken<T> typeToken) {
this.type = typeToken.getType();
}

@Override
public Type getType() {
return type;
}

@Override
public boolean equals(Object obj) {
return (this == obj || (obj instanceof ParameterizedTypeReference &&
this.type.equals(((ParameterizedTypeReference<?>) obj).getType())));
}

@Override
public int hashCode() {
return this.type.hashCode();
}

@Override
public String toString() {
return "ParameterizedTypeReference<" + this.type + ">";
}
}
}
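
A minimal call-site sketch with Spring's RestTemplate; Message and OrgSugVOV1 come from the snippet above, while the URL is made up. With fully concrete types the TypeToken can be built directly, without where():

import com.google.common.reflect.TypeToken;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.List;

public class TypeTokenDemo {
    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();
        ParameterizedTypeReference<Message<List<OrgSugVOV1>>> typeRef =
                ParameterizedTypeReferenceBuilder.fromTypeToken(
                        new TypeToken<Message<List<OrgSugVOV1>>>() {});
        ResponseEntity<Message<List<OrgSugVOV1>>> response =
                restTemplate.exchange("http://example.com/orgs", HttpMethod.GET, null, typeRef);
        System.out.println(response.getBody());
    }
}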

I'll spare you my rant about Java generics.

[Fragment] A @CreatedBy / @ModifiedBy Interceptor

The interceptor implementation:

package app.pooi.common.entity;

import app.pooi.common.entity.anno.CreatedBy;
import app.pooi.common.entity.anno.ModifiedBy;
import lombok.Data;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;

import java.util.Arrays;
import java.util.Properties;
import java.util.function.Supplier;

@Data
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
public class EntityInterceptor implements org.apache.ibatis.plugin.Interceptor {

private Supplier<Long> auditorAware;

@Override
public Object intercept(Invocation invocation) throws Throwable {

Executor executor = (Executor) invocation.getTarget();

MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
Object o = invocation.getArgs()[1];

Arrays.stream(o.getClass().getDeclaredFields())
.forEach(field -> {
final CreatedBy createdBy = field.getAnnotation(CreatedBy.class);
final ModifiedBy modifiedBy = field.getAnnotation(ModifiedBy.class);

if (createdBy != null || modifiedBy != null) {
field.setAccessible(true);
try {
field.set(o, auditorAware.get());
} catch (IllegalAccessException ignore) {
}
}
});

return invocation.proceed();
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}
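
The post does not show the two marker annotations; a minimal sketch of the assumed definitions:

import java.lang.annotation.*;

// Hypothetical definitions of the marker annotations imported by the interceptor above
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface CreatedBy {
}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface ModifiedBy {
}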

Configuration:

@Configuration
static class MybatisInterceptorConfig {

@Bean
public Interceptor[] configurationCustomizer(CipherSpi cipherSpi) {
final EntityInterceptor entityInterceptor = new EntityInterceptor();

entityInterceptor.setAuditorAware(() -> {
final String header = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest().getHeader(XHeaders.LOGIN_USER_ID);
return Long.valueOf(header);
});
return new Interceptor[]{new DecryptInterceptor(cipherSpi), entityInterceptor};
}
}

[Fragment] Collecting Java Method Arguments + Spring DataBinder

Collecting the arguments

Currently Spring AOP intercepts the method call and wraps the method arguments into a Map.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CollectArguments {
}

@Aspect
public class ArgumentsCollector {

private static final ThreadLocal<Map<String, Object>> ARGUMENTS = ThreadLocal.withInitial(ImmutableMap::of);

static Map<String, Object> getArgs() {
return ARGUMENTS.get();
}

private Object[] args(Object[] args, int exceptLength) {
if (exceptLength == args.length) {
return args;
}

return Arrays.copyOf(args, exceptLength);
}

@Pointcut("@annotation(CollectArguments)")
void collectArgumentsAnnotationPointCut() {
}

@Before("collectArgumentsAnnotationPointCut()")
public void doAccessCheck(JoinPoint joinPoint) {
final String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
final Object[] args = args(joinPoint.getArgs(), parameterNames.length);

ARGUMENTS.set(Collections.unmodifiableMap((IntStream.range(0, parameterNames.length)
.mapToObj(idx -> Tuple2.of(parameterNames[idx], args[idx]))
.collect(HashMap::new, (m, t) -> m.put(t.getT1(), t.getT2()), HashMap::putAll))));
}

@After("collectArgumentsAnnotationPointCut()")
public void remove() {
ARGUMENTS.remove();
}

@Data
private static class Tuple2<T1, T2> {

private T1 t1;
private T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

public static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}
}

Constructing an object from the Map

public class BinderUtil {

BinderUtil() {
}

@SuppressWarnings("unchecked")
public static <T> T getTarget(Class<T> beanClazz) {
final DataBinder binder = new DataBinder(BeanUtils.instantiate(beanClazz));
binder.bind(new MutablePropertyValues(ArgumentsCollector.getArgs()));
return (T) binder.getTarget();
}
}

[Fragment] MyBatis ResultSetHandler in Practice

This time the intercepted method is handleResultSets(Statement stmt), used to batch-decrypt String fields annotated with @Encrypted; there may still be some pitfalls.

@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}
package app.pooi.common.encrypt;


import app.pooi.common.encrypt.anno.CipherSpi;
import app.pooi.common.encrypt.anno.Encrypted;
import lombok.Getter;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;

import java.lang.reflect.Field;
import java.sql.Statement;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}),
})
public class EncryptInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(EncryptInterceptor.class.getName());

private CipherSpi cipherSpi;

public EncryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

final Object proceed = invocation.proceed();

if (proceed == null) {
return proceed;
}

List<?> results = (List<?>) proceed;

if (results.isEmpty()) {
return proceed;
}

final Object first = results.iterator().next();

final Class<?> modelClazz = first.getClass();

final List<String> fieldsNeedDecrypt = Arrays.stream(modelClazz.getDeclaredFields())
.filter(f -> f.getAnnotation(Encrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
logger.warning(f.getName() + " is not String, actual type is " + f.getType().getSimpleName() + ", ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());

final List<List<String>> partition = partition(fieldsNeedDecrypt, 20);

for (Object r : results) {
final MetaObject metaObject = SystemMetaObject.forObject(r);

for (List<String> fields : partition) {
final Map<String, String> fieldValueMap = fields.stream().collect(Collectors.toMap(Function.identity(), f -> (String) metaObject.getValue(f)));
final ArrayList<String> values = new ArrayList<>(fieldValueMap.values());
Map<String, String> decryptValues = cipherSpi.decrypt(values);

fieldValueMap.entrySet()
.stream()
.map(e -> Tuple2.of(e.getKey(), decryptValues.getOrDefault(e.getValue(), "")))
.forEach(e -> metaObject.setValue(e.getT1(), e.getT2()));
}
}

return results;
}

private <T> List<List<T>> partition(List<T> list, int batchCount) {
if (!(batchCount > 0)) {
throw new IllegalArgumentException("batch count must greater than zero");
}

List<List<T>> partitionList = new ArrayList<>(list.size() / (batchCount + 1));

for (int i = 0; i < list.size(); i += batchCount) {
partitionList.add(list.stream().skip(i).limit(batchCount).collect(Collectors.toList()));

}
return partitionList;
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class Tuple2<T1, T2> {

private final T1 t1;

private final T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}

AbstractQueuedSynchronizer Explained

The AbstractQueuedSynchronizer data structure

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead. Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized. Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

Note in passing that the linked list is initialized only when threads actually contend for the lock.

The AbstractQueuedSynchronizer.Node data structure

/**
 * Status field, taking on only the values:
 *   SIGNAL:     The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 *               Nodes never leave this state. In particular,
 *               a thread with cancelled node never again blocks.
 *   CONDITION:  This node is currently on a condition queue.
 *               It will not be used as a sync queue node
 *               until transferred, at which time the status
 *               will be set to 0. (Use of this value here has
 *               nothing to do with the other uses of the
 *               field, but simplifies mechanics.)
 *   PROPAGATE:  A releaseShared should be propagated to other
 *               nodes. This is set (for head node only) in
 *               doReleaseShared to ensure propagation
 *               continues, even if other operations have
 *               since intervened.
 *   0:          None of the above
 *
 * The values are arranged numerically to simplify use.
 * Non-negative values mean that a node doesn't need to
 * signal. So, most code doesn't need to check for particular
 * values, just for sign.
 *
 * The field is initialized to 0 for normal sync nodes, and
 * CONDITION for condition nodes. It is modified using CAS
 * (or when possible, unconditional volatile writes).
 */
volatile int waitStatus;

/**
 * Link to predecessor node that current node/thread relies on
 * for checking waitStatus. Assigned during enqueuing, and nulled
 * out (for sake of GC) only upon dequeuing. Also, upon
 * cancellation of a predecessor, we short-circuit while
 * finding a non-cancelled one, which will always exist
 * because the head node is never cancelled: A node becomes
 * head only as a result of successful acquire. A
 * cancelled thread never succeeds in acquiring, and a thread only
 * cancels itself, not any other node.
 */
volatile Node prev;

/**
 * Link to the successor node that the current node/thread
 * unparks upon release. Assigned during enqueuing, adjusted
 * when bypassing cancelled predecessors, and nulled out (for
 * sake of GC) when dequeued. The enq operation does not
 * assign next field of a predecessor until after attachment,
 * so seeing a null next field does not necessarily mean that
 * node is at end of queue. However, if a next field appears
 * to be null, we can scan prev's from the tail to
 * double-check. The next field of cancelled nodes is set to
 * point to the node itself instead of null, to make life
 * easier for isOnSyncQueue.
 */
volatile Node next;

/**
 * The thread that enqueued this node. Initialized on
 * construction and nulled out after use.
 */
volatile Thread thread;

/**
 * Link to next node waiting on condition, or the special
 * value SHARED. Because condition queues are accessed only
 * when holding in exclusive mode, we just need a simple
 * linked queue to hold nodes while they are waiting on
 * conditions. They are then transferred to the queue to
 * re-acquire. And because conditions can only be exclusive,
 * we save a field by using special value to indicate shared
 * mode.
 */
Node nextWaiter;

(Figure: the AbstractQueuedSynchronizer data structure; diagram borrowed)

What does AbstractQueuedSynchronizer do?

It maintains the state field and a CLH queue internally, enqueuing threads when the resource is contended and waking queued threads when the resource is released.

Subclasses only need to implement **when acquiring the resource succeeds** and **when releasing the resource succeeds**.

So the simplest CountDownLatch is trivial to implement on AbstractQueuedSynchronizer (a sketch follows this list):

  • Declare the AbstractQueuedSynchronizer state count (say, ten).

  • await tries to acquire the resource; state > 0 means the acquire fails (**when acquiring succeeds**, implemented by CountDownLatch), and the failed thread goes to sleep (handled by AbstractQueuedSynchronizer).

  • countDown decrements state; state == 0 means the resource is released (**when releasing succeeds**, implemented by CountDownLatch), which wakes all queued threads (handled by AbstractQueuedSynchronizer).
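A minimal sketch of such a latch, using a hypothetical SimpleLatch class (ten permits would be new SimpleLatch(10)); it fills in only the two decisions above and leaves all queuing and waking to AQS:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1; // succeed only once the count has reached zero
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                int next = c - 1;
                if (compareAndSetState(c, next))
                    return next == 0; // true wakes all queued threads
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        this.sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }
}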

How does AbstractQueuedSynchronizer do it?

Walking through ReentrantLock's lock and unlock gives us a good summary of how AbstractQueuedSynchronizer works.

A quick recap of ReentrantLock's features: reentrant, interruptible, with timeout support.

The ReentrantLock lock() flow (another borrowed diagram)

Yellow marks what ReentrantLock implements; green marks AbstractQueuedSynchronizer internals.

  1. The lock method entry point directly calls AbstractQueuedSynchronizer.acquire
  2. tryAcquire
  3. addWaiter
  4. acquireQueued

AbstractQueuedSynchronizer.acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The acquire logic: if the fast path succeeds, return; otherwise enqueue and go to sleep (yes, it really is that simple).

Now let's go through the methods one by one.

ReentrantLock.tryAcquire

What I'm pasting here is the unfair lock acquisition. Fair vs. unfair: the fair lock dutifully joins the queue, while the unfair lock first checks whether the resource is free and, if it is, tries to grab the lock immediately, ignoring whatever is in the queue.

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

ReentrantLock.tryAcquire attempts to take the lock when it reads state == 0, and guarantees that the same thread can take it repeatedly; every other case fails. If the acquire succeeds there is nothing more to do, but the interesting part is exactly what happens under contention, isn't it?

AbstractQueuedSynchronizer.addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

Once the lock is contended, the queue is guaranteed to get initialized (a queued thread must be woken by its predecessor, so a dummy predecessor node is created first); the new node then spins until it becomes the tail.

Put simply: whatever fails to get the lock is put into the queue and managed there, to be used again when the lock is released.

One small detail here is well worth borrowing: first set the new node's predecessor, and only after spinning into the tail position set the predecessor's successor.

AbstractQueuedSynchronizer.acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

The previous steps merely maintained the linked-list structure; this part finds a suitable predecessor to be woken by and then puts the thread to sleep.

It is essentially one loop:

  1. Check whether the lock can be acquired; if so, return.
  2. On failure, find the nearest predecessor that has not given up contending, set its waitStatus to SIGNAL (-1), and discard the nodes that have given up.
  3. Check whether the thread may sleep.
  4. Sleep via LockSupport.park (not wait).

ReentrantLock lock() summary

ReentrantLock unlock()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

The unlock code is remarkably simple:

  1. Every unlock decrements state by 1.
  2. When state == 0 the resource is successfully released.
  3. On a successful release, wake the second node in the queue.
  4. If the second node is missing or has given up contending, scan from the tail for a thread that can be woken.