Eureka service discovery mechanism

[Figure: 1568279922476.png]

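Register: after starting up, a service provider sends a registration request to Eureka-Server. By default it takes about 30 seconds for the registration to take effect.

Renew: the service provider sends a heartbeat to Eureka-Server at a fixed interval (every 30 seconds by default) to refresh the expiry time of its registration. If a provider fails to renew, Eureka-Server removes that instance from the registry (by default after three heartbeat periods, 90s).

Cancel: when a service provider shuts down, it sends a cancel request to Eureka-Server.

Call: the service consumer periodically pulls the registry from Eureka-Server and refreshes its local cache, every 30 seconds by default. The consumer's load balancer (e.g. Ribbon) obtains the provider list from Eureka-Client and can then send HTTP requests to a provider.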
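The four operations above can be pictured as a tiny lease table. The following is only a toy sketch: `LeaseRegistry` and its method names are made up for illustration and are not Eureka's actual classes, and timestamps are passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy in-memory registry (hypothetical, not Eureka code). */
class LeaseRegistry {
    private final long leaseDurationMs;
    // instanceId -> timestamp (ms) at which the lease expires
    private final Map<String, Long> leases = new ConcurrentHashMap<>();

    LeaseRegistry(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    /** Register: create a lease that expires leaseDurationMs from now. */
    void register(String instanceId, long nowMs) {
        leases.put(instanceId, nowMs + leaseDurationMs);
    }

    /** Renew (heartbeat): push the expiry forward; false for unknown instances. */
    boolean renew(String instanceId, long nowMs) {
        return leases.computeIfPresent(instanceId, (id, expiry) -> nowMs + leaseDurationMs) != null;
    }

    /** Cancel: the provider deregisters itself on shutdown. */
    void cancel(String instanceId) {
        leases.remove(instanceId);
    }

    /** Evict: drop every lease whose expiry has passed; returns the evicted ids. */
    List<String> evict(long nowMs) {
        List<String> evicted = new ArrayList<>();
        leases.forEach((id, expiry) -> {
            if (expiry <= nowMs && leases.remove(id, expiry)) {
                evicted.add(id);
            }
        });
        return evicted;
    }

    /** Fetch: what a consumer would pull into its local cache (sorted for readability). */
    List<String> fetch() {
        List<String> ids = new ArrayList<>(leases.keySet());
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        LeaseRegistry registry = new LeaseRegistry(90_000);
        registry.register("a", 0);
        registry.register("b", 0);
        registry.renew("a", 30_000);                 // "a" keeps sending heartbeats
        System.out.println(registry.evict(95_000));  // "b" never renewed
        System.out.println(registry.fetch());
    }
}
```

In this sketch instance "b" disappears only when `evict` runs, which is why a provider that dies without cancelling stays visible for a while.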

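Eureka caching mechanism

[Figure: 1568280499563.png]

The read-write cache periodically flushes its contents to the read-only cache. Refresh interval: eureka.responseCacheUpdateIntervalMs, 30 seconds by default.

Whether the read-only cache is used is configurable: eureka.shouldUseReadOnlyResponseCache, enabled by default.

The Client periodically pulls the registry from Eureka-Server and refreshes its local cache. Fetch interval: eureka.client.registry-fetch-interval-seconds, 30 seconds by default.

The LoadBalancer periodically syncs the server list held by the Client. Sync interval: ribbon.ServerListRefreshInterval, 30 seconds by default. This can be tuned to refresh in real time.

If a service goes offline without notifying the server, removal depends on the eviction task that clears expired leases. Related parameters: renewal interval eureka.instance.lease-renewal-interval-in-seconds, 30 seconds by default; lease validity eureka.instance.lease-expiration-duration-in-seconds, 90 seconds by default; eviction interval eureka.server.eviction-interval-timer-in-ms, 60 seconds by default.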

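Problems caused by the Eureka caching mechanism

Until a Client has synced the fact that a provider went offline, traffic is still routed to the offline node. The Client then reports errors, which hurts the stability of upstream services.

Sync delay introduced by the multi-level cache:

eureka.responseCacheUpdateIntervalMs + eureka.client.registry-fetch-interval-seconds + ribbon.ServerListRefreshInterval

With the defaults this adds up to 90s.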
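As a quick check of that sum, here is a small worked example. The class name `PropagationDelay` is made up for illustration, and the "tuned" values in the second call are arbitrary assumptions, not recommendations.

```java
import java.time.Duration;

class PropagationDelay {
    /** Worst case: every cache layer refreshed just before the change reached it. */
    static Duration worstCase(Duration... refreshIntervals) {
        Duration total = Duration.ZERO;
        for (Duration interval : refreshIntervals) {
            total = total.plus(interval);
        }
        return total;
    }

    public static void main(String[] args) {
        // Defaults quoted in the text
        Duration responseCacheUpdate = Duration.ofSeconds(30); // eureka.responseCacheUpdateIntervalMs
        Duration registryFetch = Duration.ofSeconds(30);       // eureka.client.registry-fetch-interval-seconds
        Duration ribbonRefresh = Duration.ofSeconds(30);       // ribbon.ServerListRefreshInterval
        System.out.println(worstCase(responseCacheUpdate, registryFetch, ribbonRefresh).getSeconds() + "s");

        // Hypothetical tuning: read-only cache disabled, both client-side intervals cut to 5s
        System.out.println(worstCase(Duration.ZERO, Duration.ofSeconds(5), Duration.ofSeconds(5)).getSeconds() + "s");
    }
}
```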

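Solutions

  1. Server side: disable eureka.shouldUseReadOnlyResponseCache or shorten eureka.responseCacheUpdateIntervalMs
  2. Client side: shorten eureka.client.registry-fetch-interval-seconds
  3. Ribbon configuration: fetch from the Client in real time
  4. Delay service shutdown until Clients that have not yet synced have caught up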
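
The delayed-shutdown idea in item 4 can be sketched as an ordering of three steps. The `Hooks` interface below is hypothetical; in a real service its methods would call the Eureka client and the web server, which this sketch deliberately leaves out.

```java
import java.util.ArrayList;
import java.util.List;

class DelayedShutdown {
    /** Hypothetical hooks standing in for the registry client and the web server. */
    interface Hooks {
        void deregister();
        void pause(long millis);
        void stopServing();
    }

    static void shutdown(Hooks hooks, long propagationDelayMs) {
        hooks.deregister();               // 1. tell the registry we are going away
        hooks.pause(propagationDelayMs);  // 2. keep serving while stale caches catch up
        hooks.stopServing();              // 3. only now stop accepting requests
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        shutdown(new Hooks() {
            public void deregister() { log.add("deregister"); }
            public void pause(long millis) { log.add("pause " + millis); }
            public void stopServing() { log.add("stop"); }
        }, 90_000);
        System.out.println(log);
    }
}
```

The pause would be chosen to cover the sync delay of the cache layers, so it shrinks together with the intervals in items 1 to 3.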

[Snippet] Collecting method arguments

Older code that collects all arguments of the current method into a map so they are easy to retrieve.

import com.google.common.collect.ImmutableMap;
import lombok.Data;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

@Aspect
@Component
public class ArgumentsCollector {

private static final ThreadLocal<Map<String, Object>> ARGUMENTS = ThreadLocal.withInitial(ImmutableMap::of);

static Map<String, Object> getArgs() {
return ARGUMENTS.get();
}

private Object[] args(Object[] args, int exceptLength) {
if (exceptLength == args.length) {
return args;
}

return Arrays.copyOf(args, exceptLength);
}

@Pointcut("@annotation(CollectArguments)")
void collectArgumentsAnnotationPointCut() {
}

@Before("collectArgumentsAnnotationPointCut()")
public void doAccessCheck(JoinPoint joinPoint) {
final String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
final Object[] args = args(joinPoint.getArgs(), parameterNames.length);

ARGUMENTS.set(Collections.unmodifiableMap((IntStream.range(0, parameterNames.length)
.mapToObj(idx -> Tuple2.of(parameterNames[idx], args[idx]))
.collect(HashMap::new, (m, t) -> m.put(t.getT1(), t.getT2()), HashMap::putAll))));
}

@After("collectArgumentsAnnotationPointCut()")
public void remove() {
ARGUMENTS.remove();
}

@Data
private static class Tuple2<T1, T2> {

private T1 t1;
private T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

public static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}
}
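
The core of the aspect is pairing parameter names with argument values. Seen in isolation, that step might look like the sketch below; `ArgumentMaps` is a made-up name, and a plain loop is used here instead of the stream collector.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class ArgumentMaps {
    /** Pairs names[i] with values[i]; extra entries on either side are ignored. */
    static Map<String, Object> toMap(String[] names, Object[] values) {
        int count = Math.min(names.length, values.length);
        Map<String, Object> map = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            map.put(names[i], values[i]); // null argument values are kept
        }
        return Collections.unmodifiableMap(map);
    }

    public static void main(String[] args) {
        String[] names = {"jobSeqGroupId", "jobSeqId", "state"};
        Object[] values = {"g1", null, 1};
        System.out.println(toMap(names, values));
    }
}
```

A `LinkedHashMap` keeps the declaration order of the parameters, which makes the map easier to read in logs.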

As a bonus, a piece of code that turns the collected arguments into a bean:

import org.springframework.beans.BeanUtils;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.validation.DataBinder;

public class BinderUtil {

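private BinderUtil() {
// utility class, not meant to be instantiated
}

@SuppressWarnings("unchecked")
public static <T> T getTarget(Class<T> beanClazz) {
// BeanUtils.instantiate is deprecated; instantiateClass is its replacement
final DataBinder binder = new DataBinder(BeanUtils.instantiateClass(beanClazz));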
binder.bind(new MutablePropertyValues(ArgumentsCollector.getArgs()));
return (T) binder.getTarget();
}
}

Usage example:

@Override
@CollectArguments
public List<PsJobSequenceVO> findJobSequence(
String jobSeqGroupId,
String jobSeqId,
Integer state,
Date endDate
) {
return jobSequenceHandler.findJobSequence(BinderUtil.getTarget(PsJobSequenceFindRO.class)).getData();
}

[Snippet] MyBatis ParameterHandler in practice

Used to batch-encrypt String fields annotated with @Decrypted. There may still be some pitfalls.
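
The pattern behind this interceptor is "swap in the ciphertext, run the write, put the plaintext back". Stripped of MyBatis, it might be sketched as follows. All names here are made up for illustration, and string reversal stands in for a real cipher.

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

class EncryptAroundWrite {
    static class User {
        String phone;

        User(String phone) {
            this.phone = phone;
        }
    }

    /** Encrypts the field for the duration of the write, then restores the plaintext. */
    static void writeEncrypted(User user, UnaryOperator<String> encrypt, Consumer<User> write) {
        String plain = user.phone;
        user.phone = encrypt.apply(plain);  // what the statement gets to see
        try {
            write.accept(user);
        } finally {
            user.phone = plain;             // the caller keeps working with plaintext
        }
    }

    public static void main(String[] args) {
        User user = new User("13800000000");
        StringBuilder written = new StringBuilder();
        // Stand-in "cipher": reverse the string
        writeEncrypted(user, s -> new StringBuilder(s).reverse().toString(), u -> written.append(u.phone));
        System.out.println("written=" + written + ", in memory=" + user.phone);
    }
}
```

Restoring in a `finally` block means the object is not left holding ciphertext when the write throws.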


import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.ke.zhaopin.manage.server.config.mybatis.interceptor.anno.Decrypted;
import com.lianjia.ctt.kinko.spi.CipherSpi;
import com.sun.istack.internal.NotNull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.binding.MapperMethod;
import org.apache.ibatis.executor.parameter.ParameterHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.scripting.defaults.DefaultParameterHandler;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.defaults.DefaultSqlSession;
import org.joor.Reflect;
import reactor.core.publisher.Flux;

import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;


@Intercepts({
@Signature(type = ParameterHandler.class, method = "setParameters", args = {PreparedStatement.class}),
})
@Slf4j
public class EncryptInterceptor implements Interceptor {

private static final String COLLECTION_KEY = "collection";
private static final String ARRAY_KEY = "array";

private final LoadingCache<Class, List<String>> decryptFieldCaches = CacheBuilder.newBuilder()
.maximumSize(200)
.expireAfterAccess(10L, TimeUnit.MINUTES)
.build(new CacheLoader<Class, List<String>>() {
@Override
public List<String> load(Class key) {
return Arrays.stream(key.getDeclaredFields())
.filter(f -> f.getAnnotation(Decrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
log.warn(f.getName() + " is not String, actual type is " + f.getType().getSimpleName() + ", ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());
}
}
);

private CipherSpi cipherSpi;

public EncryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

Flux<CryptContext> contextFlux = Flux.empty();

do {
if (!(invocation.getTarget() instanceof DefaultParameterHandler)) break;

final Reflect parameterHandler = Reflect.on(invocation.getTarget());
final Object parameterObject = parameterHandler.get("parameterObject");
final Configuration configuration = parameterHandler.get("configuration");

if (parameterObject instanceof DefaultSqlSession.StrictMap) {
// a single Collection/Map/Array parameter
DefaultSqlSession.StrictMap<?> paramMap = (DefaultSqlSession.StrictMap<?>) parameterObject;

Collection<?> collection = null;
Class<?> componentType = null;
if (paramMap.containsKey(COLLECTION_KEY)) {
collection = (Collection<?>) paramMap.get(COLLECTION_KEY);
// an empty collection has no element to inspect
if (collection.isEmpty()) break;
componentType = collection.iterator().next().getClass();
} else if (paramMap.containsKey(ARRAY_KEY)) {
Object array = paramMap.get(ARRAY_KEY);
componentType = array.getClass().getComponentType();
// primitive arrays cannot be cast to Object[] and hold nothing to encrypt
if (componentType.isPrimitive()) break;
collection = Arrays.asList((Object[]) array);
}

// neither key present (e.g. a single Map parameter), or a JDK element type
if (componentType == null || !isUserDefinedClass(componentType)) break;

contextFlux = collection(configuration, collection, componentType);

} else if (parameterObject instanceof MapperMethod.ParamMap) {
// multiple parameters
MapperMethod.ParamMap<?> paramMap = (MapperMethod.ParamMap<?>) parameterObject;

final List<?> params = paramMap.values().stream().filter(Objects::nonNull).distinct().collect(Collectors.toList());

for (Object parameter : params) {
if (parameter instanceof Collection) {
Collection<?> collection = (Collection<?>) parameter;
if (collection.isEmpty()) {
continue;
}

Class<?> componentType = collection.iterator().next().getClass();
if (!isUserDefinedClass(componentType)) {
continue;
}
final Flux<CryptContext> collectionFlux = collection(configuration, collection, componentType);
contextFlux = contextFlux.concatWith(collectionFlux);

} else if (parameter.getClass().isArray()) {
if (Array.getLength(parameter) == 0) continue;
final Class<?> componentType = parameter.getClass().getComponentType();
if (!isUserDefinedClass(componentType)) {
continue;
}
Collection<?> collection = Arrays.asList((Object[]) parameter);

final Flux<CryptContext> collectionFlux = collection(configuration, collection, componentType);
contextFlux = contextFlux.concatWith(collectionFlux);

} else if (isUserDefinedClass(parameter.getClass())) {
final Flux<CryptContext> singleFlux = collection(configuration, Collections.singletonList(parameter), parameter.getClass());
contextFlux = contextFlux.concatWith(singleFlux);
}
}

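} else if (parameterObject != null && isUserDefinedClass(parameterObject.getClass())) {
// a single parameter that is not a Collection/Map/Array
contextFlux = collection(configuration, Collections.singletonList(parameterObject), parameterObject.getClass());
} else {
// mapper not called through an interface, or no parameter at all: nothing to encrypt
}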


} while (false);

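final List<CryptContext> cryptContexts = encrypt(contextFlux);

try {
// setParameters returns void, so there is no result to pass on
invocation.proceed();
} finally {
// put the plaintext back even if binding the statement fails
restore(cryptContexts);
}

return null;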
}

private void restore(List<CryptContext> cryptContexts) {
for (CryptContext cryptContext : cryptContexts) {
cryptContext.metaObject.setValue(cryptContext.fieldName, cryptContext.value);
}
}

private Flux<CryptContext> collection(Configuration configuration, Collection<?> collection, Class<?> componentType) throws ExecutionException {
final List<String> fieldNames = this.getDecryptFields(componentType);

return Flux.fromIterable(collection)
.map(configuration::newMetaObject)
.flatMapIterable(metaObject -> fieldNames.stream().map(fieldName -> new CryptContext(metaObject, fieldName)).collect(Collectors.toList()));
}

private List<CryptContext> encrypt(Flux<CryptContext> contextFlux) {
return contextFlux
.filter(context -> StringUtils.isNotBlank(context.value))
.buffer(1000)
.doOnNext(contexts -> {
Map<String, String> secretMap = Collections.emptyMap();
try {
secretMap = cipherSpi.batchEncrypt(contexts.stream().map(CryptContext::getValue).distinct().collect(Collectors.toList()));
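} catch (Exception e) {
// do not swallow silently: without a secret map the fields of this batch are set to null
log.error("batch encrypt failed", e);
}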
for (CryptContext context : contexts) {
context.secret = secretMap.get(context.value);
}
})
.flatMapIterable(Function.identity())
.doOnNext(context -> context.metaObject.setValue(context.fieldName, context.secret))
.collectList()
.block();
}

@NotNull
private List<String> getDecryptFields(Class<?> modelClazz) throws ExecutionException {
return this.decryptFieldCaches.get(modelClazz);
}

private boolean isUserDefinedClass(Class<?> clazz) {
// getPackage() can be null (arrays, default package), so test the class name instead
return !clazz.isPrimitive() && !clazz.isArray() && !clazz.getName().startsWith("java");
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class CryptContext {

CryptContext(MetaObject metaObject, String fieldName) {
this.metaObject = metaObject;
this.fieldName = fieldName;
this.value = (String) metaObject.getValue(fieldName);
if (StringUtils.isBlank(value)) {
this.secret = StringUtils.EMPTY;
}
}

final MetaObject metaObject;

final String fieldName;

final String value;

String secret;
}

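[Snippet] MyBatis ResultSetHandler in practice, continued

The intercepted method is again handleResultSets(Statement stmt); the goal is to batch-decrypt String fields annotated with @Encrypted.

The limitation last time was that decryption was batched only across the encrypted fields of a single object, which falls short for bulk data. This version mainly improves on that point.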
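
The batching idea, independent of MyBatis and Reactor, might be sketched like this: gather the encrypted values of all rows, hand them to the cipher service in fixed-size chunks, and merge the answers into one lookup map. `BatchDecrypt` and the `batchDecrypt` function parameter are assumptions for illustration; the real CipherSpi signature is not shown in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BatchDecrypt {
    /** Decrypts all values in chunks of batchSize and merges the per-chunk results. */
    static Map<String, String> decryptAll(List<String> secrets, int batchSize,
                                          Function<List<String>, Map<String, String>> batchDecrypt) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be greater than zero");
        }
        Map<String, String> plainBySecret = new HashMap<>();
        for (int from = 0; from < secrets.size(); from += batchSize) {
            int to = Math.min(from + batchSize, secrets.size());
            plainBySecret.putAll(batchDecrypt.apply(secrets.subList(from, to)));
        }
        return plainBySecret;
    }

    public static void main(String[] args) {
        List<Integer> callSizes = new ArrayList<>();
        // Stand-in cipher service: "decrypting" just lower-cases the value
        Function<List<String>, Map<String, String>> fake = batch -> {
            callSizes.add(batch.size());
            Map<String, String> result = new HashMap<>();
            for (String secret : batch) {
                result.put(secret, secret.toLowerCase());
            }
            return result;
        };
        Map<String, String> plain = decryptAll(Arrays.asList("A", "B", "C", "D", "E"), 2, fake);
        System.out.println("calls=" + callSizes + ", E -> " + plain.get("E"));
    }
}
```

With N rows and F encrypted fields this needs roughly N*F/batchSize remote calls instead of one call per row.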

@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}
package app.pooi.common.encrypt;


import app.pooi.common.encrypt.anno.CipherSpi;
import app.pooi.common.encrypt.anno.Encrypted;
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import reactor.core.publisher.Flux;

import java.lang.reflect.Field;
import java.sql.Statement;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;


@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}),
})
public class DecryptInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(DecryptInterceptor.class.getName());

private CipherSpi cipherSpi;

public DecryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

final Object proceed = invocation.proceed();

if (proceed == null) {
return proceed;
}

List<?> results = (List<?>) proceed;

if (results.isEmpty()) {
return proceed;
}

final Object first = results.iterator().next();

final Class<?> modelClazz = first.getClass();

final List<String> decryptFields = getDecryptFields(modelClazz);

if (decryptFields.isEmpty()) {
return proceed;
}

final List<List<String>> secret = Flux.fromIterable(results)
.map(SystemMetaObject::forObject)
// Reactor rejects null elements, so skip fields that hold no value
.flatMapIterable(mo -> decryptFields.stream().map(mo::getValue).filter(Objects::nonNull).collect(Collectors.toList()))
.cast(String.class)
.buffer(1000)
.collectList()
.block();

final Map<String, String> secretMap = secret.stream()
.map(secrets -> {
try {
return cipherSpi.batchDecrypt(secrets);
} catch (Exception e) {
logger.log(java.util.logging.Level.WARNING, "batch decrypt failed", e);
return Maps.<String, String>newHashMap();
}
}).reduce(Maps.newHashMap(), (m1, m2) -> {
m1.putAll(m2);
return m1;
});

secretMap.put("", "0");

for (Object r : results) {
final MetaObject metaObject = SystemMetaObject.forObject(r);
decryptFields.forEach(f -> metaObject.setValue(f, secretMap.get(metaObject.getValue(f))));
}

return results;
}

private List<String> getDecryptFields(Class<?> modelClazz) {
return Arrays.stream(modelClazz.getDeclaredFields())
// the imports and the description refer to @Encrypted; @Decrypted is not imported in this file
.filter(f -> f.getAnnotation(Encrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
logger.warning(f.getName() + " is not String, actual type is " + f.getType().getSimpleName() + ", ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class Tuple2<T1, T2> {

private final T1 t1;

private final T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}

[Snippet] Spring Boot MyBatis configuration

Just a record for my own reference 🤣.

private final MybatisProperties properties;

private final Interceptor[] interceptors;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

public DataSourceConfig(MybatisProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
}


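/**
* Common data source.
* The primary data source; it must be configured. On startup Spring runs data initialization against it
* (whether or not that is actually needed) and looks it up by the DataSource class type.
*
* @return {@link DataSource}
*/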
@Primary
@Bean(name = BEANNAME_DATASOURCE_COMMON)
@ConfigurationProperties(prefix = "com.lianjia.confucius.bridge.boot.datasource.common")
public DataSource createDataSourceCommon() {
return DataSourceBuilder.create().build();
}

/**
* Read-only data source.
*
* @return {@link DataSource}
*/
@Bean(name = BEANNAME_DATASOURCE_READONLY)
@ConfigurationProperties(prefix = "com.lianjia.confucius.bridge.boot.datasource.readonly")
public DataSource createDataSourceReadonly() {
return DataSourceBuilder.create().build();
}

private SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
org.apache.ibatis.session.Configuration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new org.apache.ibatis.session.Configuration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}

return factory.getObject();
}

public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

@Bean
@Primary
public SqlSessionFactory primarySqlSessionFactory() throws Exception {
return this.sqlSessionFactory(this.createDataSourceCommon());
}

@Bean
public SqlSessionFactory secondarySqlSessionFactory() throws Exception {
return this.sqlSessionFactory(this.createDataSourceReadonly());
}

/**
* Creates the common sqlSession.
*
* @return SqlSession
* @throws Exception when any exception occurred
*/
@Bean(name = BEANNAME_SQLSESSION_COMMON)
public SqlSession initSqlSessionCommon() throws Exception {
return this.sqlSessionTemplate(this.primarySqlSessionFactory());
}

/**
* Creates the read-only sqlSession.
*
* @return SqlSession
* @throws Exception when any exception occurred
*/
@Bean(name = BEANNAME_SQLSESSION_READONLY)
public SqlSession initSqlSessionReadonly() throws Exception {
return this.sqlSessionTemplate(this.secondarySqlSessionFactory());
}


@MapperScan(annotationClass = PrimaryMapper.class,
sqlSessionTemplateRef = BEANNAME_SQLSESSION_COMMON,
basePackageClasses = ITalentApplicationSpringBootStart.class)
static class PrimaryMapperConfiguration {
}

@MapperScan(annotationClass = SecondaryMapper.class,
sqlSessionTemplateRef = BEANNAME_SQLSESSION_READONLY,
basePackageClasses = ITalentApplicationSpringBootStart.class)
static class SecondaryMapperConfiguration {
}

[Snippet] @CreatedBy / @ModifiedBy interceptor implementation

Interceptor implementation:

package app.pooi.common.entity;

import app.pooi.common.entity.anno.CreatedBy;
import app.pooi.common.entity.anno.ModifiedBy;
import lombok.Data;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;

import java.util.Arrays;
import java.util.Properties;
import java.util.function.Supplier;

@Data
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
public class EntityInterceptor implements org.apache.ibatis.plugin.Interceptor {

private Supplier<Long> auditorAware;

@Override
public Object intercept(Invocation invocation) throws Throwable {

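MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
Object o = invocation.getArgs()[1];

// @CreatedBy is filled on INSERT only, so a later UPDATE does not overwrite the creator
final boolean insert = ms.getSqlCommandType() == org.apache.ibatis.mapping.SqlCommandType.INSERT;

// statements without a parameter object have nothing to fill in
if (o != null) {
Arrays.stream(o.getClass().getDeclaredFields())
.forEach(field -> {
final CreatedBy createdBy = field.getAnnotation(CreatedBy.class);
final ModifiedBy modifiedBy = field.getAnnotation(ModifiedBy.class);

if ((insert && createdBy != null) || modifiedBy != null) {
field.setAccessible(true);
try {
field.set(o, auditorAware.get());
} catch (IllegalAccessException ignore) {
// not expected after setAccessible(true)
}
}
});
}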

return invocation.proceed();
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}
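
The reflection step can be tried out without MyBatis. The sketch below declares its own stand-in annotations and entity, since the real `CreatedBy` / `ModifiedBy` definitions are not shown in this post. It also distinguishes inserts from updates so that a creator is written only once; that distinction is an assumption about the intended semantics.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.function.Supplier;

class AuditFill {
    // Stand-in annotations (hypothetical); they must be retained at runtime to be visible to reflection
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface CreatedBy {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface ModifiedBy {}

    static class Article {
        @CreatedBy Long creator;
        @ModifiedBy Long modifier;
        String title;
    }

    /** Writes the current user id into annotated fields; creator only when inserting. */
    static void fill(Object entity, boolean inserting, Supplier<Long> currentUser) {
        for (Field field : entity.getClass().getDeclaredFields()) {
            boolean created = inserting && field.isAnnotationPresent(CreatedBy.class);
            boolean modified = field.isAnnotationPresent(ModifiedBy.class);
            if (created || modified) {
                field.setAccessible(true);
                try {
                    field.set(entity, currentUser.get());
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("cannot set " + field.getName(), e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Article article = new Article();
        fill(article, true, () -> 7L);   // insert by user 7
        fill(article, false, () -> 9L);  // later update by user 9
        System.out.println("creator=" + article.creator + ", modifier=" + article.modifier);
    }
}
```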

Configuration:

@Configuration
static class MybatisInterceptorConfig {

@Bean
public Interceptor[] configurationCustomizer(CipherSpi cipherSpi) {
final EntityInterceptor entityInterceptor = new EntityInterceptor();

entityInterceptor.setAuditorAware(() -> {
final String header = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest().getHeader(XHeaders.LOGIN_USER_ID);
return Long.valueOf(header);
});
return new Interceptor[]{new DecryptInterceptor(cipherSpi), entityInterceptor};
}
}

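[Snippet] Collecting Java method arguments + Spring DataBinder

Collecting the arguments

For now Spring AOP is used to intercept the method call and wrap the method arguments into a Map.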

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CollectArguments {
}

@Aspect
public class ArgumentsCollector {

private static final ThreadLocal<Map<String, Object>> ARGUMENTS = ThreadLocal.withInitial(ImmutableMap::of);

static Map<String, Object> getArgs() {
return ARGUMENTS.get();
}

private Object[] args(Object[] args, int exceptLength) {
if (exceptLength == args.length) {
return args;
}

return Arrays.copyOf(args, exceptLength);
}

@Pointcut("@annotation(CollectArguments)")
void collectArgumentsAnnotationPointCut() {
}

@Before("collectArgumentsAnnotationPointCut()")
public void doAccessCheck(JoinPoint joinPoint) {
final String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
final Object[] args = args(joinPoint.getArgs(), parameterNames.length);

// IntStream.range already excludes the upper bound; "length - 1" would drop the last parameter
ARGUMENTS.set(Collections.unmodifiableMap((IntStream.range(0, parameterNames.length)
.mapToObj(idx -> Tuple2.of(parameterNames[idx], args[idx]))
.collect(HashMap::new, (m, t) -> m.put(t.getT1(), t.getT2()), HashMap::putAll))));
}

@After("collectArgumentsAnnotationPointCut()")
public void remove() {
ARGUMENTS.remove();
}

@Data
private static class Tuple2<T1, T2> {

private T1 t1;
private T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

public static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}
}

Building an object from the Map

public class BinderUtil {

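private BinderUtil() {
// utility class, not meant to be instantiated
}

@SuppressWarnings("unchecked")
public static <T> T getTarget(Class<T> beanClazz) {
// BeanUtils.instantiate is deprecated; instantiateClass is its replacement
final DataBinder binder = new DataBinder(BeanUtils.instantiateClass(beanClazz));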
binder.bind(new MutablePropertyValues(ArgumentsCollector.getArgs()));
return (T) binder.getTarget();
}
}
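
To see what the binding step amounts to without pulling in Spring, here is a deliberately simplified stand-in. It is not how DataBinder works internally: it sets fields directly by name, does no type conversion, and `MapBinder` / `FindRequest` are made-up names.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

class MapBinder {
    static class FindRequest {
        String jobSeqId;
        Integer state;
    }

    /** Creates a T and copies map entries into same-named fields; unknown keys are ignored. */
    static <T> T bind(Class<T> type, Map<String, ?> values) {
        try {
            Constructor<T> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            T target = constructor.newInstance();
            for (Field field : type.getDeclaredFields()) {
                if (values.containsKey(field.getName())) {
                    field.setAccessible(true);
                    field.set(target, values.get(field.getName()));
                }
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot bind " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> collected = new HashMap<>();
        collected.put("jobSeqId", "42");
        collected.put("state", 1);
        collected.put("endDate", "not a field of FindRequest");
        FindRequest request = bind(FindRequest.class, collected);
        System.out.println(request.jobSeqId + " / " + request.state);
    }
}
```

The appeal of the approach is that the request object only has to share property names with the method parameters; nothing needs to be copied by hand.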

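[Snippet] MyBatis ResultSetHandler in practice

This time the intercepted method is handleResultSets(Statement stmt), used to batch-decrypt String fields annotated with @Encrypted. There may still be some pitfalls.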

@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}
package app.pooi.common.encrypt;


import app.pooi.common.encrypt.anno.CipherSpi;
import app.pooi.common.encrypt.anno.Encrypted;
import lombok.Getter;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;

import java.lang.reflect.Field;
import java.sql.Statement;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}),
})
public class EncryptInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(EncryptInterceptor.class.getName());

private CipherSpi cipherSpi;

public EncryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

final Object proceed = invocation.proceed();

if (proceed == null) {
return proceed;
}

List<?> results = (List<?>) proceed;

if (results.isEmpty()) {
return proceed;
}

final Object first = results.iterator().next();

final Class<?> modelClazz = first.getClass();

final List<String> fieldsNeedDecrypt = Arrays.stream(modelClazz.getDeclaredFields())
.filter(f -> f.getAnnotation(Encrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
logger.warning(f.getName() + " is not String, actual type is " + f.getType().getSimpleName() + ", ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());

final List<List<String>> partition = partition(fieldsNeedDecrypt, 20);

for (Object r : results) {
final MetaObject metaObject = SystemMetaObject.forObject(r);

for (List<String> fields : partition) {
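// Collectors.toMap throws on null values, so fields without a value are skipped
final Map<String, String> fieldValueMap = fields.stream().filter(f -> metaObject.getValue(f) != null).collect(Collectors.toMap(Function.identity(), f -> (String) metaObject.getValue(f)));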
final ArrayList<String> values = new ArrayList<>(fieldValueMap.values());
Map<String, String> decryptValues = cipherSpi.decrypt(values);

fieldValueMap.entrySet()
.stream()
.map(e -> Tuple2.of(e.getKey(), decryptValues.getOrDefault(e.getValue(), "")))
.forEach(e -> metaObject.setValue(e.getT1(), e.getT2()));
}
}

return results;
}

private <T> List<List<T>> partition(List<T> list, int batchCount) {
if (!(batchCount > 0)) {
throw new IllegalArgumentException("batch count must be greater than zero");
}

List<List<T>> partitionList = new ArrayList<>(list.size() / (batchCount + 1));

for (int i = 0; i < list.size(); i += batchCount) {
partitionList.add(list.stream().skip(i).limit(batchCount).collect(Collectors.toList()));

}
return partitionList;
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class Tuple2<T1, T2> {

private final T1 t1;

private final T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}