resilience4j-retry源码阅读

resilience4j 源码还是比较清晰简单的,比较适合阅读。

放一张主要类的结构图:

Retry入口

Retry接口是提供重试功能的入口,主要提供了方法模版,具体校验结构,失败后处理由Context子类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Creates a retryable supplier.
*
* @param retry the retry context
* @param supplier the original function
* @param <T> the type of results supplied by this supplier
* @return a retryable function
*/
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
};
}

这里摘抄了一段核心代码,作用是循环直到context.onResult(result)返回true为止,需要留意context.onResult/onRuntimeError/onError可能执行多次, onSuccess只会执行一次,这里每次进入重试都是一个新的context对象。

Retry.ContextImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean onResult(T result) {
if (null != resultPredicate && resultPredicate.test(result)) {
int currentNumOfAttempts = numOfAttempts.incrementAndGet();
if (currentNumOfAttempts >= maxAttempts) {
return false;
} else {
waitIntervalAfterFailure(currentNumOfAttempts, null);
return true;
}
}
return false;
}

public void onRuntimeError(RuntimeException runtimeException) {
if (exceptionPredicate.test(runtimeException)) {
lastRuntimeException.set(runtimeException);
throwOrSleepAfterRuntimeException();
} else {
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), runtimeException));
throw runtimeException;
}
}

先关注onResult,它负责判断是否需要继续重试,如果通过校验或者重试超过此数,会停止重试。

onRuntimeError/onError, 负责把catch的异常存储在lastRuntimeException中。

1
2
3
4
5
6
7
8
9
10
public void onSuccess() {
int currentNumOfAttempts = numOfAttempts.get();
if (currentNumOfAttempts > 0) {
succeededAfterRetryCounter.increment();
Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
} else {
succeededWithoutRetryCounter.increment();
}
}

onSuccess负责统计和发送事件。

总结

总体来说retry比较简单,需要注意的点有一个如果设置了结果校验,如果一直校验不通过,将返回未通过的结果,而不是返回失败。

[片段] Mybatis ResultSetHandler实践-续

这次拦截的方法是handleResultSets(Statement stmt),用来批量解密用@Encrypted注解的String字段。

上次的局限是只能批量解密一个对象的所有加密字段,对批量数据来说稍显不足,这个主要改进了这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package app.pooi.common.encrypt;


import app.pooi.common.encrypt.anno.CipherSpi;
import app.pooi.common.encrypt.anno.Encrypted;
import lombok.Getter;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;

import java.lang.reflect.Field;
import java.sql.Statement;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;


@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}),
})
public class DecryptInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(DecryptInterceptor.class.getName());

private CipherSpi cipherSpi;

public DecryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

final Object proceed = invocation.proceed();

if (proceed == null) {
return proceed;
}

List<?> results = (List<?>) proceed;

if (results.isEmpty()) {
return proceed;
}

final Object first = results.iterator().next();

final Class<?> modelClazz = first.getClass();

final List<String> decryptFields = getDecryptFields(modelClazz);

if (decryptFields.isEmpty()) {
return proceed;
}

final List<List<String>> secret = Flux.fromIterable(results)
.map(SystemMetaObject::forObject)
.flatMapIterable(mo -> decryptFields.stream().map(mo::getValue).collect(Collectors.toList()))
.cast(String.class)
.buffer(1000)
.collectList()
.block();

final Map<String, String> secretMap = secret.stream()
.map(secrets -> {
try {
return cipherSpi.batchDecrypt(secrets);
} catch (Exception e) {
e.printStackTrace();
return Maps.<String, String>newHashMap();
}
}).reduce(Maps.newHashMap(), (m1, m2) -> {
m1.putAll(m2);
return m1;
});

secretMap.put("", "0");

for (Object r : results) {
final MetaObject metaObject = SystemMetaObject.forObject(r);
decryptFields.forEach(f -> metaObject.setValue(f, secretMap.get(metaObject.getValue(f))));
}

return results;
}

@NotNull
private List<String> getDecryptFields(Class<?> modelClazz) {
return Arrays.stream(modelClazz.getDeclaredFields())
.filter(f -> f.getAnnotation(Decrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
logger.warning(f.getName() + "is not String, actual type is " + f.getType().getSimpleName() + " ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class Tuple2<T1, T2> {

private final T1 t1;

private final T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}

[片段] SpringBoot Mybatis配置

纯记录,供自己参考🤣。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
private final MybatisProperties properties;

private final Interceptor[] interceptors;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

public DataSourceConfig(MybatisProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
}


/**
* 普通数据源
* 主数据源,必须配置,spring启动时会执行初始化数据操作(无论是否真的需要),选择查找DataSource class类型的数据源
*
* @return {@link DataSource}
*/
@Primary
@Bean(name = BEANNAME_DATASOURCE_COMMON)
@ConfigurationProperties(prefix = "com.lianjia.confucius.bridge.boot.datasource.common")
public DataSource createDataSourceCommon() {
return DataSourceBuilder.create().build();
}

/**
* 只读数据源
*
* @return {@link DataSource}
*/
@Bean(name = BEANNAME_DATASOURCE_READONLY)
@ConfigurationProperties(prefix = "com.lianjia.confucius.bridge.boot.datasource.readonly")
public DataSource createDataSourceReadonly() {
return DataSourceBuilder.create().build();
}

private SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
org.apache.ibatis.session.Configuration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new org.apache.ibatis.session.Configuration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}

return factory.getObject();
}

public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

@Bean
@Primary
public SqlSessionFactory primarySqlSessionFactory() throws Exception {
return this.sqlSessionFactory(this.createDataSourceCommon());
}

@Bean
public SqlSessionFactory secondarySqlSessionFactory() throws Exception {
return this.sqlSessionFactory(this.createDataSourceReadonly());
}

/**
* 实例普通的 sqlSession
*
* @return SqlSession
* @throws Exception when any exception occured
*/
@Bean(name = BEANNAME_SQLSESSION_COMMON)
public SqlSession initSqlSessionCommon() throws Exception {
return this.sqlSessionTemplate(this.primarySqlSessionFactory());
}

/**
* 实例只读的 sqlSession
*
* @return SqlSession
* @throws Exception when any exception occured
*/
@Bean(name = BEANNAME_SQLSESSION_READONLY)
public SqlSession initSqlSessionReadonly() throws Exception {
return this.sqlSessionTemplate(this.secondarySqlSessionFactory());
}


@MapperScan(annotationClass = PrimaryMapper.class,
sqlSessionTemplateRef = BEANNAME_SQLSESSION_COMMON,
basePackageClasses = ITalentApplicationSpringBootStart.class)
static class PrimaryMapperConfiguration {
}

@MapperScan(annotationClass = SecondaryMapper.class,
sqlSessionTemplateRef = BEANNAME_SQLSESSION_READONLY,
basePackageClasses = ITalentApplicationSpringBootStart.class)
static class SecondaryMapperConfiguration {
}

[片段] 使用redis创建简易搜索引擎(核心篇)

支持and查询、多选、多字段排序分页,缺少的功能:or 条件

核心类,有一些测试代码,将就一下。另外需要spring-data-redis 2.0版本以上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package app.pooi.redissearch.search;

import app.pooi.redissearch.search.anno.CreateIndex;
import app.pooi.redissearch.search.anno.Field;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.hash.Jackson2HashMapper;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static app.pooi.redissearch.search.SearchCore.Util.*;

@RestController
@Service
public class SearchCore {

private StringRedisTemplate redisTemplate;

private Jackson2HashMapper hashMapper = new Jackson2HashMapper(true);

@Data
private static class Person {
private Long id;
private String name;
private Integer age;
private Long ctime;
}

@PostMapping("/person")
@CreateIndex(
index = "person",
documentId = "#p0.id",
fields = {
@Field(propertyName = "name", value = "#p0.name"),
@Field(propertyName = "age", value = "#p0.age", sort = true),
@Field(propertyName = "ctime", value = "#p0.ctime", sort = true)
})
Person addPerson(Person person) {
return person;
}

public SearchCore(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}


public void indexMeta(String index, Map<String, FieldMeta> fieldMeta) {
this.redisTemplate.opsForHash().putAll(genIdxMetaName(index), hashMapper.toHash(fieldMeta));
}

@PostMapping("/index")
public int indexDocument(
final String index,
final String field,
final String documentId,
final String document) {
return this.indexDocument(index, field, documentId, document, doc -> Lists.newArrayList(doc.split("")));
}

public int indexDocument(
final String index,
final String field,
final String documentId,
final String document,
final Function<String, List<String>> tokenizer) {

final List<String> tokens = tokenizer != null ?
tokenizer.apply(document) :
Collections.singletonList(document);

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

final String[] idxs = tokens.stream()
.map(word -> genIdxName(index, field, word))
.peek(idx -> ((StringRedisTemplate) operations).opsForSet().add(idx, documentId))
.toArray(String[]::new);

template.opsForSet().add(docKey, idxs);
return null;
}
});
return results.size();
}

public int indexSortField(
final String index,
final String field,
final String documentId,
final Double document) {

final String docKey = genDocIdxName(index, documentId);

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;
final String idxName = genSortIdxName(index, field);
template.opsForZSet().add(idxName, documentId, document);
template.opsForSet().add(docKey, idxName);
return null;
}
});
return results.size();
}

@DeleteMapping("/index")
public int deleteDocumentIndex(final String index, final String documentId) {
final String docKey = genDocIdxName(index, documentId);
final Boolean hasKey = redisTemplate.hasKey(docKey);
if (!hasKey) {
return 0;
}

final List<Object> results = redisTemplate.executePipelined(new SessionCallback<Integer>() {
@Override
public Integer execute(RedisOperations operations) throws DataAccessException {
final Set<String> idx = redisTemplate.opsForSet().members(docKey);
((StringRedisTemplate) operations).delete(idx);
((StringRedisTemplate) operations).delete(docKey);
return null;
}
});
return results.size();
}

@PatchMapping("/index")
public int updateDocumentIndex(final String index, final String field, final String documentId, final String document) {
this.deleteDocumentIndex(index, documentId);
return this.indexDocument(index, field, documentId, document);
}

public int updateSortField(final String index, final String field, final String documentId, final Double document) {
this.deleteDocumentIndex(index, documentId);
return this.indexSortField(index, field, documentId, document);
}

private Consumer<SetOperations<String, String>> operateAndStore(String method, String key, Collection<String> keys, String destKey) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey);
case "differenceAndStore":
return (so) -> so.differenceAndStore(key, keys, destKey);
default:
return so -> {
};
}
}

private Consumer<ZSetOperations<String, String>> zOperateAndStore(String method, String key, Collection<String> keys, String destKey, final RedisZSetCommands.Weights weights) {
switch (method) {
case "intersectAndStore":
return (so) -> so.intersectAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
case "unionAndStore":
return (so) -> so.unionAndStore(key, keys, destKey, RedisZSetCommands.Aggregate.SUM, weights);
default:
return so -> {
};
}
}

private String common(String index, String method, List<String> keys, long ttl) {
final String destKey = Util.genQueryIdxName(index);

redisTemplate.executePipelined(new SessionCallback<String>() {
@Override
public <K, V> String execute(RedisOperations<K, V> operations) throws DataAccessException {
operateAndStore(method,
keys.stream().limit(1L).findFirst().get(),
keys.stream().skip(1L).collect(Collectors.toList()),
destKey)
.accept(((StringRedisTemplate) operations).opsForSet());
((StringRedisTemplate) operations).expire(destKey, ttl, TimeUnit.SECONDS);
return null;
}
});
return destKey;
}

public String intersect(String index, List<String> keys, long ttl) {
return common(index, "intersectAndStore", keys, ttl);
}

public String union(String index, List<String> keys, long ttl) {
return common(index, "unionAndStore", keys, ttl);
}

public String diff(String index, List<String> keys, long ttl) {
return common(index, "differenceAndStore", keys, ttl);
}

private static Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parse(String query) {

final Pattern pattern = Pattern.compile("[+-]?([\\w\\d]+):(\\S+)");

final Matcher matcher = pattern.matcher(query);

Set<Tuple2<String, String>> unwant = Sets.newHashSet();
Set<Tuple2<String, String>> want = Sets.newHashSet();

while (matcher.find()) {
String word = matcher.group();

String prefix = null;
if (word.length() > 1) {
prefix = word.substring(0, 1);
}

final Tuple2<String, String> t = Tuples.of(matcher.group(1), matcher.group(2));
if ("-".equals(prefix)) {
unwant.add(t);
} else {
want.add(t);
}
}
return Tuples.of(want, unwant);
}


public String query(
String index,
String query) {

final Tuple2<Set<Tuple2<String, String>>, Set<Tuple2<String, String>>> parseResult = parse(query);
final Set<Tuple2<String, String>> want = parseResult.getT1();
final Set<Tuple2<String, String>> unwant = parseResult.getT2();


if (want.isEmpty()) {
return "";
}

final Map<String, FieldMeta> entries = (Map<String, FieldMeta>) hashMapper.fromHash(redisTemplate.<String, Object>opsForHash().entries(genIdxMetaName(index)));

// union
final List<Tuple2<String, String>> unionFields = want.stream()
.filter(w -> w.getT2().contains(","))
.filter(w -> "true".equals(entries.get(w.getT1()).getSort()))
.collect(Collectors.toList());
final List<String> unionIdx = unionFields.stream()
.flatMap(w -> Arrays.stream(w.getT2().split(",")).map(value -> Tuples.of(w.getT1(), value)))
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

final String unionResultId = unionIdx.isEmpty() ? "" : this.union(index, unionIdx, 30L);

want.removeAll(unionFields);

// intersect
final List<String> intersectIdx = want.stream()
.flatMap(t -> {
if ("true".equals(entries.get(t.getT1()).getSort()))
return Stream.of(t);
return Arrays.stream(t.getT2().split("")).map(value -> Tuples.of(t.getT1(), value));
})
.map(w -> genIdxName(index, w.getT1(), w.getT2()))
.collect(Collectors.toList());

if (!unionResultId.isEmpty())
intersectIdx.add(unionResultId);

String intersectResult = this.intersect(index, intersectIdx, 30L);

// diff
return unwant.isEmpty() ?
intersectResult :
this.diff(index, Stream.concat(Stream.of(intersectResult), unwant.stream().map(w -> genIdxName(index, w.getT1(), w.getT2()))).collect(Collectors.toList()), 30L);
}

@GetMapping("/query/{index}")
public Set<String> queryAndSort(
@PathVariable("index") String index,
@RequestParam("param") String query,
@RequestParam("sort") String sort,
Integer start,
Integer stop
) {
final String[] sorts = sort.split(" ");

final Map<String, Integer> map = Arrays.stream(sorts).collect(
Collectors.toMap(f -> {
if (f.startsWith("+") || f.startsWith("-")) {
f = f.substring(1);
}
return genSortIdxName("person", f);
}, field -> field.startsWith("-") ? -1 : 1)
);

final int[] weights = map.values()
.stream()
.mapToInt(Integer::intValue)
.toArray();


// if (!sort.startsWith("+") && !sort.startsWith("-")) {
// sort = "+" + sort;
// }
// boolean desc = sort.startsWith("-");
// sort = sort.substring(1);

String queryId = this.query(index, query);
Long size;
if (queryId.length() == 0 || (size = redisTemplate.opsForSet().size(queryId)) == null || size == 0) {
return Collections.emptySet();
}

final String resultId = genQueryIdxName(index);

// String sortField = sort;

redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
final StringRedisTemplate template = (StringRedisTemplate) operations;

// template.opsForZSet().intersectAndStore(genSortIdxName(index, sortField), queryId, resultId);

SearchCore.this.zOperateAndStore("intersectAndStore",
map.keySet().stream().limit(1L).findFirst().get(),
Stream.concat(map.keySet().stream().skip(1L), Stream.of(queryId)).collect(Collectors.toList()),
resultId, RedisZSetCommands.Weights.of(ArrayUtils.add(weights, 0))).accept(template.opsForZSet());

// template.opsForZSet().size(resultId);
template.expire(resultId, 30L, TimeUnit.SECONDS);

return null;
}
});

// sort
return redisTemplate.opsForZSet().range(resultId, start, stop);

}

static class Util {

private Util() {
}

static String genIdxMetaName(String index) {
return String.format("meta:idx:%s", index);
}

static String genIdxName(String index, String field, String value) {
return String.format("idx:%s:%s:%s", index, field, value);
}

static String genSortIdxName(String index, String field) {
return String.format("idx:%s:%s", index, field);
}

static String genQueryIdxName(String index) {
return String.format("idx:%s:q:%s", index, UUID.randomUUID().toString());
}

static String genDocIdxName(String index, String documentId) {
return String.format("doc:%s:%s", index, documentId);
}
}
}

辅助类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.Data;


@Data
public class FieldMeta {

private String sort = "false";

private String splitFun = "";

public FieldMeta() {

}

public FieldMeta(boolean sort) {
this.sort = Boolean.toString(sort);
}
}

做一个轻量级的搜索还是可以的。

[片段] 使用TypeToken在运行期保存泛型信息

一般来说可以使用getGenericSuperclass 获取子类范型信息,但是泛型有嵌套的话想获取完整信息还是有点复杂的。例如:Message<List> 有两个泛型信息。

guava中有强大的TypeToken帮助你保存复杂泛型信息,可以参考:

1
2
3
ParameterizedTypeReference<Message<T>> responseTypeRef = 
ParameterizedTypeReferenceBuilder.fromTypeToken(
new TypeToken<Message<T>>() {}.where(new TypeParameter<T>() {}, new TypeToken<List<OrgSugVOV1>>() {}));

如果需要在spring框架中使用,需要一个适配器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ParameterizedTypeReferenceBuilder {

public static <T> ParameterizedTypeReference<T> fromTypeToken(TypeToken<T> typeToken) {
return new TypeTokenParameterizedTypeReference<>(typeToken);
}

private static class TypeTokenParameterizedTypeReference<T> extends ParameterizedTypeReference<T> {

private final Type type;

private TypeTokenParameterizedTypeReference(TypeToken<T> typeToken) {
this.type = typeToken.getType();
}

@Override
public Type getType() {
return type;
}

@Override
public boolean equals(Object obj) {
return (this == obj || (obj instanceof ParameterizedTypeReference &&
this.type.equals(((ParameterizedTypeReference<?>) obj).getType())));
}

@Override
public int hashCode() {
return this.type.hashCode();
}

@Override
public String toString() {
return "ParameterizedTypeReference<" + this.type + ">";
}
}
}

关于java的泛型我就不多做吐槽了。

[片段] @CreatedBy / @ModifiedBy 拦截器实现

拦截器实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package app.pooi.common.entity;

import app.pooi.common.entity.anno.CreatedBy;
import app.pooi.common.entity.anno.ModifiedBy;
import lombok.Data;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;

import java.util.Arrays;
import java.util.Properties;
import java.util.function.Supplier;

@Data
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
public class EntityInterceptor implements org.apache.ibatis.plugin.Interceptor {

private Supplier<Long> auditorAware;

@Override
public Object intercept(Invocation invocation) throws Throwable {

Executor executor = (Executor) invocation.getTarget();

MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
Object o = invocation.getArgs()[1];

Arrays.stream(o.getClass().getDeclaredFields())
.forEach(field -> {
final CreatedBy createdBy = field.getAnnotation(CreatedBy.class);
final ModifiedBy modifiedBy = field.getAnnotation(ModifiedBy.class);

if (createdBy != null || modifiedBy != null) {
field.setAccessible(true);
try {
field.set(o, auditorAware.get());
} catch (IllegalAccessException ignore) {
}
}
});

return invocation.proceed();
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
static class MybatisInterceptorConfig {

@Bean
public Interceptor[] configurationCustomizer(CipherSpi cipherSpi) {
final EntityInterceptor entityInterceptor = new EntityInterceptor();

entityInterceptor.setAuditorAware(() -> {
final String header = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest().getHeader(XHeaders.LOGIN_USER_ID);
return Long.valueOf(header);
});
return new Interceptor[]{new DecryptInterceptor(cipherSpi), entityInterceptor};
}
}

[片段] Java收集方法参数+Spring DataBinder

收集参数

目前是使用了spring aop 来拦截方法调用,把方法参数包装成Map形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CollectArguments {
}

@Aspect
public class ArgumentsCollector {

private static final ThreadLocal<Map<String, Object>> ARGUMENTS = ThreadLocal.withInitial(ImmutableMap::of);

static Map<String, Object> getArgs() {
return ARGUMENTS.get();
}

private Object[] args(Object[] args, int exceptLength) {
if (exceptLength == args.length) {
return args;
}

return Arrays.copyOf(args, exceptLength);
}

@Pointcut("@annotation(CollectArguments)")
void collectArgumentsAnnotationPointCut() {
}

@Before("collectArgumentsAnnotationPointCut()")
public void doAccessCheck(JoinPoint joinPoint) {
final String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
final Object[] args = args(joinPoint.getArgs(), parameterNames.length);

ARGUMENTS.set(Collections.unmodifiableMap((IntStream.range(0, parameterNames.length - 1)
.mapToObj(idx -> Tuple2.of(parameterNames[idx], args[idx]))
.collect(HashMap::new, (m, t) -> m.put(t.getT1(), t.getT2()), HashMap::putAll))));
}

@After("collectArgumentsAnnotationPointCut()")
public void remove() {
ARGUMENTS.remove();
}

@Data
private static class Tuple2<T1, T2> {

private T1 t1;
private T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

public static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}
}

通过Map构造对象

1
2
3
4
5
6
7
8
9
10
11
12
public class BinderUtil {

BinderUtil() {
}

@SuppressWarnings("unchecked")
public static <T> T getTarget(Class<T> beanClazz) {
final DataBinder binder = new DataBinder(BeanUtils.instantiate(beanClazz));
binder.bind(new MutablePropertyValues(ArgumentsCollector.getArgs()));
return (T) binder.getTarget();
}
}

[片段] Mybatis ResultSetHandler实践

这次拦截的方法是handleResultSets(Statement stmt),用来批量解密用@Encrypted注解的String字段,可能还有一些坑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package app.pooi.common.encrypt;


import app.pooi.common.encrypt.anno.CipherSpi;
import app.pooi.common.encrypt.anno.Encrypted;
import lombok.Getter;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;

import java.lang.reflect.Field;
import java.sql.Statement;
import java.util.*;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}),
})
public class EncryptInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(EncryptInterceptor.class.getName());

private CipherSpi cipherSpi;

public EncryptInterceptor(CipherSpi cipherSpi) {
this.cipherSpi = cipherSpi;
}

@Override
public Object intercept(Invocation invocation) throws Throwable {

final Object proceed = invocation.proceed();

if (proceed == null) {
return proceed;
}

List<?> results = (List<?>) proceed;

if (results.isEmpty()) {
return proceed;
}

final Object first = results.iterator().next();

final Class<?> modelClazz = first.getClass();

final List<String> fieldsNeedDecrypt = Arrays.stream(modelClazz.getDeclaredFields())
.filter(f -> f.getAnnotation(Encrypted.class) != null)
.filter(f -> {
boolean isString = f.getType() == String.class;
if (!isString) {
logger.warning(f.getName() + "is not String, actual type is " + f.getType().getSimpleName() + " ignored");
}
return isString;
})
.map(Field::getName)
.collect(Collectors.toList());

final List<List<String>> partition = partition(fieldsNeedDecrypt, 20);

for (Object r : results) {
final MetaObject metaObject = SystemMetaObject.forObject(r);

for (List<String> fields : partition) {
final Map<String, String> fieldValueMap = fields.stream().collect(Collectors.toMap(Function.identity(), f -> (String) metaObject.getValue(f)));
final ArrayList<String> values = new ArrayList<>(fieldValueMap.values());
Map<String, String> decryptValues = cipherSpi.decrypt(values);

fieldValueMap.entrySet()
.stream()
.map(e -> Tuple2.of(e.getKey(), decryptValues.getOrDefault(e.getValue(), "")))
.forEach(e -> metaObject.setValue(e.getT1(), e.getT2()));
}
}

return results;
}

private <T> List<List<T>> partition(List<T> list, int batchCount) {
if (!(batchCount > 0)) {
throw new IllegalArgumentException("batch count must greater than zero");
}

List<List<T>> partitionList = new ArrayList<>(list.size() / (batchCount + 1));

for (int i = 0; i < list.size(); i += batchCount) {
partitionList.add(list.stream().skip(i).limit(batchCount).collect(Collectors.toList()));

}
return partitionList;
}

@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

@Override
public void setProperties(Properties properties) {

}
}

@Getter
class Tuple2<T1, T2> {

private final T1 t1;

private final T2 t2;

Tuple2(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

static <T1, T2> Tuple2<T1, T2> of(T1 t1, T2 t2) {
return new Tuple2<>(t1, t2);
}
}

代码外的生存之道-读书笔记-职业篇

职业发展的驱动力应该来自自身,工作属于公司,职业生涯属于自己。

第一要务 拥有正确的心态

大多数人形成的错误的心态: 认为在为公司打工,没有把自己的职业生涯当作生意来看待。铭记在心,开始积极主动的管理自己的职业生涯吧。

像企业一样思考

自己能提供什么:自己的能力就是创造软件
自己需要做什么:

  • 持续不断地改进和完善自己的产品

  • 传达自己的价值,和千万同行的独特之处

一头扎进工作不可能非同凡响,你应该:

  • 专注于你正在提供怎样的服务, 以及如何营销这项服务;
  • 想方设 提升你的服务;
  • 思考你可以专注为哪一 特定类型的客户或行业提供特定的服务;
  • 集中精力成为一位专家,专门为某一特定类型的客户提供专业的整体服务( 记住, 作为一个软件开发 人员, 你 只有真正专注 于一类客户,才能找到非常好的工作)。
  • 更好的宣传自己的产品,更好的找到你的客户

第二要务 设定自己的目标

无论因为何种原因你没有为自己的职业生涯设定目标, 现在都是时候设定目标了。 不是明天, 也不是下周, 就是现在。 没有明确的方向, 你走的每一步都是徒劳的。

如何设定目标?

先在心中树立一个大目标,然后分解成多个小目标

追踪你的目标

定期核对自己的目标,必要时还要调整。

人际交往能力

构建大型支付系统时分布式架构的关键点

SLA

在构建大型系统时,常常会遇到各种错误。在计划构建一个系统时,定义系统的“健康状态”十分重要。

“健康状态”必须是可度量的,一般做法是使用SLAs来度量系统的“健康状态”。最常见的SLA为

  • 可达性

    从时间维度衡量(99.999%可达性,每年下线50分钟)

  • 准确性

    对于数据的丢失或失真是否可以接受?可以达到多少百分比?对于支付系统来说,不接受任何数据的丢失和失真

  • 容量

    系统支持并发

  • 延迟

    响应延迟,一般衡量95%请求的响应时间和99%请求响应时间

确保新系统比被替代系统“更好”,可以使用上面四个SLA指标来衡量,可达性是最重要的需求。

水平和垂直伸缩

随着新业务的增长,负载也会增加。最常见的伸缩策略是垂直和水平伸缩。

水平伸缩就是增加更多的机器或节点,对于分布式系统来说水平伸缩是最常有的方式。

垂直伸缩基本上就是买更大/好的机器。

一致性

可达性对于任何系统都是很重要的,但是分布式系统一般都构建在低可达性的机器上(比如:服务的可达性要求99.999% 机器的可达性为99.9%)。简单的做法是维护一组机器组成集群,这样服务的可达性不依赖单独的机器。

一致性是在高可用系统中最需要关心的。一个一致性系统在所有的节点上看到和返回的数据在同一时间是相同的。如果使用一组机器来组成集群,它们还需要互相发送消息来保持同步,但是发送消息可能失败,这样一些节点就会因为不一致而不可达。

一致性有多个模型,在分布式系统最常用的是强一致性,弱一致性和最终一致性。一般来说,一致性要求越低,系统可以工作的更快,但是返回的数据不一定是最新的。

系统中的数据需要是一致的,但是到底是怎样的一致?对于一些系统,需要强一致性,比如一次支付必须是强一致的存储下来。对于没那么重要的部分,最终一致性是可以考虑的权衡。比如列出最近的交易。

数据持久性

持久性表示一旦数据成功添加到数据存储,它就永远可以访问到。不同的分布式数据库拥有不同级别的数据持久性。一般使用副本来增加数据持久性。

对于支付系统来说,数据不允许丢失。我们构建的分布式数据存储需要支持集群级别的数据持久型。目前Cassandra, MongoDB, HDFS和Dynamodb 都支持多种级别的数据持久性。

消息保持与持久性

分布式系统中的节点执行计算,存储数据,互相发送消息。发送消息的关键是消息的可靠到达。对于关键系统,经常需要消息零丢失。

对于分布式系统,发送消息一般石油分布式消息服务发送,如RabbitMQ,Kafka。这些消息服务支持不同级别的消息投递可靠性。

消息保持表示当节点处理消息失败时,在错误被解决前消息一直被保持着。消息的持久性一般在消息队列层被使用。如果在消息发送的时候队列或节点下线了,那在它们重新上线是还能接收到消息。

在支付系统中我们需要每一条消息投递一次,在构建系统中保证投递一次和投递至少一次在实现上是有区别的。最后我们使用了kafka来保证投递至少一次。

幂等性

在分布式系统中,很多东西都可能出错,连接会丢包或超时,客户端经常会重试这些请求。一个幂等的系统保证无论多少特定的请求被执行,一个请求实际的操作只会执行一次。比如支付请求,如果客户端请求支付并且请求已经成功,但是客户端超时了,客户端是能够重试相同的请求的。对于一个幂等的系统,一个个人的支付是不能被收取两次的。

对幂等的设计,分布式系统需要某种分布式锁机制。假设我们想要使用乐观锁来实现幂等性,这时系统需要强一致性的锁来执行操作,我们可以使用同一个版本的乐观锁来检查是否有启动了额外的操作。

根据系统的一致性和操作的类型,有很多方式来实现幂等性。在设计分布式系统时,幂等性时最容易忽略的。在支付系统中,幂等操作时最重要的,它避免了双花和双收问题。消息系统已经保证了消息至少消费一次,我们只需要保证所有重复的消息保证幂等性即可。我们选择使用乐观锁,并使用强一致性存储作为乐观锁的数据源。

分片和法定人数

分布式系统经常需要存储大量的数据,远超一台节点的容量。一般的解决方案时使用分片,数据使用某种hash算法被水平分区。尽管很多分布式数据库屏蔽了分片的实现,但是分片还是挺有意思的,特别是关于重新分片。

许多分布式系统在多个拥有数据和统计信息。为保证对数据操作的一致性,使用基于投票的方式是不行的,只有超过一定数量的节点操作成功,这个操作才是成功的,这个叫做法定人数。

Actor模型

描述分布式系统最普遍的做法是使用Actor模型,还有一种方法是CSP。

Actor模型基于actor互相发送消息并作出回应。每一个actor只能做少量的动作,创建其他actors, 发送消息或者决定如何处理下个消息。通过这些简单的规则,复杂的分布式系统可以被准确描述,可以在actor崩溃后自我修复。

使用akka提供了标准的分布式模型,避免我们重复造轮子。

反应式架构

当构建大型分布式系统时,目标常常是它们的弹性,伸缩性,和扩展性。反应式架构是在这个领域最流行和最通用的方案。