再谈最长公共子串

序言

这次遇到贝壳花名的需求,需要使用最长公共子串对花名做校验。这种算法在面试题中算是必会题,各种四层循环,三层循环,两层循环的代码在我脑中闪过,但是今天就是要带你实现不一样的最长公共子串!

教科书式实现

使用动态规划,两层循环,使用二维数组存储状态,时间复杂度O(n^2^),空间复杂度O(n^2^)或O(2n)

一张图解释原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                  先 横 向 处 理
+--------------------------->

e a b c b c f
+ +---+---+---+---+---+---+---+
| a | 0 | 1 | 0 | 0 | 0 | 0 | 0 |
纵 | +---------------------------+
向 | b | 0 | 0 | 2 | 0 | 1 | 0 | 0 |
累 | +---------------------------+
加 | c | 0 | 0 | 0 | 3 | 0 | 2 | 0 |
| +---------------------------+
| d | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| +---------------------------+
| e | 1 | 0 | 0 | 0 | 0 | 0 | 0 |
v +---+---+---+---+---+---+---+
e a b c b c f

优化空间复杂度至O(1)

从上图可以发现,在纵向累加时实际只需要左上方的计数器即可, O(n^2^)的空间白白被浪费了,最优的空间复杂度应该是O(1)。那么该如何处理呢?

一张图解释原理:

                                   e   a   b   c   b   c   f
    +---+                        +---+---+---+---+---+---+---+
  a | 0 |                            | 1 | 0 | 0 | 0 | 0 | 0 | a
    +-------+                        +-----------------------+
  b | 0 | 0 |                            | 2 | 0 | 1 | 0 | 0 | b
    +-----------+                        --------------------+
  c | 0 | 0 | 0 |                            | 3 | 0 | 2 | 0 | c
    +---------------+                        ----------------+
  d | 0 | 0 | 0 | 0 |                            | 0 | 0 | 0 | d
    +-------------------+                        +-----------+
  e | 1 | 0 | 0 | 0 | 0 |                            | 0 | 0 | e
    +---+---+---+---+---+-------+                    +---+---+
      e   a   b   c   b   c   f

答案就是沿着等长对着线处理。

有意思的代码抽象

大家可以根据上面思路写一下,一般会把算法分成两部分:处理长方形的左下部分和处理长方形的右上部分,两部分都是双层循环,时间复杂度和空间负载度已经变为了O(n^2^) ,O(1)。

肯定有人已经发现自己的代码处理左下角的代码和处理右上角的代码不能复用,一个是从中间向左下角处理,一个是从中间向右上角处理,明明很类似,但是就是没发合并。

那么有没有方法把这两部分处理抽象成公共代码呢?不卖关子了,直接上图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
                                                         +
e |
+--+
e a b c b c f a | 1|
+---+---+---+---+---+---+---+ +-----+
| 1 | 0 | 0 | 0 | 0 | 0 | a b | 0| 2|
+-----------------------+ +--------+
| 2 | 0 | 1 | 0 | 0 | b c | 0| 0| 3|
--------------------+ 翻 折 +-----------+
| 3 | 0 | 2 | 0 | c +------------> b | 0| 1| 0| 0|
----------------+ +--------------+
| 0 | 0 | 0 | d c | 0| 0| 2| 0| 0|
+-----------+ +--------------+
| 0 | 0 | e f | 0| 0| 0| 0| 0|
+---+---+ +--------------+
a b c d e

如果你想使用公共代码同时实现处理左下角和右上角是不可能的了。所以你需要把右上角的三角翻折,然后你就得到了两个三角:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
                                      +
e |
+--+
a | 1|
+---+ +-----+
a | 0 | b | 0| 2|
+-------+ +--------+
b | 0 | 0 | c | 0| 0| 3|
+-----------+ +-----------+
c | 0 | 0 | 0 | b | 0| 1| 0| 0|
+---------------+ +--------------+
d | 0 | 0 | 0 | 0 | c | 0| 0| 2| 0| 0|
+-------------------+ +--------------+
e | 1 | 0 | 0 | 0 | 0 | f | 0| 0| 0| 0| 0|
+---+---+---+---+---+------ +--------------+
e a b c b c f a b c d e

这样就变成了处理两遍左下角了,代码也可以完美复用!!!

最终实现

我的完整思考过程已经分析完毕,这样沿对着线处理还有一个小小的优点:提前结束搜索。这一点大家可以自行思考,这里不做过多解释。

直接干货上场:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Solution {
/**
* @param A: A string
* @param B: A string
* @return: the length of the longest common substring.
*/
public int longestCommonSubstring(String A, String B) {
// write your code here
char[] achars = A.toCharArray(), bchars = B.toCharArray();
return getMaxLength(bchars, achars, getMaxLength(achars, bchars, 0, 0), 1);
}

private static int getMaxLength(char[] s1, char[] s2, int maxLength, int startIndex) {

for (int start = startIndex; start < s1.length; start++) {
int upper = Math.min(s2.length, s1.length - start);
// if (upper <= maxLength) break; //提前结束搜索
for (int currentLineLength = 0, x = 0, y = start; x < upper; x++, y++) {
if (s1[y] == s2[x])
maxLength = Math.max(maxLength, ++currentLineLength);
else {
// if (upper - x - 1 <= maxLength) break; //提前结束搜索
currentLineLength = 0;
};
}
}
return maxLength;
}
}

结尾

怎么样,经历这次优化过程是否感觉自己对最长公共子串的认识又更深了一步呢?虽然不能保证是首创(也可能是首创?),但是这次一步一步真切思考优化直到获得成果让我无比兴奋。

说了这么多,我就是要给我们贝壳招聘开发组打个广告>_>,期待更多爱思考优秀的同学加入!

![](/Users/sage/Desktop/屏幕快照 2018-11-10 13.33.20.png)

根据权限查询时避免角色切换的一种思路

1. 问题背景

权限系统现状

UC权限系统基于角色访问控制技术RBAC(Role Based Access Control) 。具体来说,就是赋予用户某个角色,角色给与角色对应的权限能访问及操作不同范围的资源。

什么是数据权限

代表一个角色对应某个权限所能操作的数据范围,比如gitlab组管理员能看到组下的所有项目代码,我们可以这样配置:

  1. 创建组管理员
  2. 分配给组管理员查看项目代码的权限
  3. 查看项目代码权限设置约束条件,约束为自己组下的项目

实际产生遇到的问题

对绝大多数简单的系统来说一个用户对应一个系统只会有一个角色,一个角色只有一个数据权限范围(即使有多个,也可以合并成一个)。但是随着产品的功能迭代,用户的变更和系统设计的原因,总有一些特殊且重要的用户在同一个系统中拥有多个角色。在多角色和数据权限的组合下,一个用户可以拥有复数的数据权限范围。

考虑到实现复杂性,大多数系统选择使用角色切换的手段简化系统实现,同时对用户暴露出了他们不熟悉的角色这一概念,造成这些用户在系统使用中的各种不便。

本文重点讨论在避免角色切换的前提下,进行多角色数据范围查询的一种思路。

具体需要解决的需求

我们的数据报表后台,不同的角色拥有不同的数据查看范围(不同角色所能看到的员工数据字段也各不相同),例如:

  • 薪酬管理员:查看非高职级员工数据
  • 高级薪酬管理员: 查看高职级员工数据
  • 长期激励管理员:查看有长期激励员工数据
  • 等等

简单来说,拥有长期激励管理员和高级薪酬管理员的用户能否直接看到高职级员工数据和长期激励员工数据?至少在直觉上是可行的。

2.多角色数据范围查询

直觉的做法

单角色单数据范围可以使用一句sql查询出结果,那多角色多数据范围是不是使用多句sql查询出结果合并就可以了?

深入思考 多角色数据范围对行的影响

  1. 查询条件合并还是结果合并? —-结果合并
  2. 如何排序? —–外部排序,或先内部排序,limit,再外部排序
  3. 有重复数据怎么办? —-使用groupby去重
  4. 查询性能有影响吗?—-有

具体体现:

1
2
3
4
5
6
7
select * from (
(select id, sortvalue from table_1 where t_name = 'a' order by sortvalue desc limit 20) -- 先内部排序,limit
union all -- 结果合并
(select id, sortvalue from table_1 where t_name = 'b' order by sortvalue desc limit 20) -- 先内部排序,limit
order by sortvalue desc -- 外部排序
) a group by id -- 使用groupby去重
limit 10, 10

深入思考 多角色数据范围对列的影响

  • 薪酬管理员: 查看员工薪酬字段
  • 长期激励管理员:查看员工长期激励字段

如何解决?方法有很多!

综合思考,给出一种解决方案

1
2
3
graph LR
A(查询行及角色信息) --> B(根据角色查询对应列字段)
B --> C(结果)

步骤:

  1. 查询多角色数据范围下的数据,附带角色信息
1
2
3
4
5
6
7
select id, GROUP_CONCAT(a.role) as roles from (
(select id, 'role_a' as role from table_1 where sortvalue > 10 order by `sortvalue` desc limit 2)
union all
(select id, 'role_b' as role from table_1 where sortvalue > 20 order by `sortvalue` desc limit 2)
order by `sortvalue` desc
) a group by id
limit 0, 2

结果:

id roles
1 薪酬管理员
5 薪酬管理员,长期激励管理员
  1. 根据每一行不同的角色,查询出可见的字段,例如id=1的行只能查看ROLE_B对应字段,而id=5的行可以看到ROLE_A,ROLE_B对应的两个角色的字段

3.总结和延伸

多角色数据范围写操作?

遍历角色直到找到满足条件的权限即可。

收获

自己不行动,等于等着被别人安排哈哈

还有疑问?

自己想。还可以点这里

AbstractQueuedSynchronizer解析

AbstractQueuedSynchronizer 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 	/** 
     * Head of the wait queue , lazily initialized . Except for 
     * initialization , it is modified only via method setHead .   Note : 
     * If head exists , its waitStatus is guaranteed not to be 
     * CANCELLED . 
     */ 
    private transient volatile Node head; 

    /** 
     * Tail of the wait queue , lazily initialized . Modified only via 
     * method enq to add new wait node . 
     */ 
    private transient volatile Node tail; 

    /** 
     * The synchronization state . 
     */ 
    private volatile int state;

稍微注意下在线程争用锁是才会初始化链表

AbstractQueuedSynchronizer.Node 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/** 
* Status field , taking on only the values : 
*    SIGNAL :      The successor of this node is ( or will soon be ) 
*                blocked ( via park ), so the current node must 
*                unpark its successor when it releases or 
*                cancels . To avoid races , acquire methods must 
*                first indicate they need a signal , 
*                then retry the atomic acquire , and then , 
*                on failure , block . 
*    CANCELLED :   This node is cancelled due to timeout or interrupt . 
*                Nodes never leave this state . In particular , 
*                a thread with cancelled node never again blocks . 
*    CONDITION :   This node is currently on a condition queue . 
*                It will not be used as a sync queue node 
*                until transferred , at which time the status 
*                will be set to 0. ( Use of this value here has 
*                nothing to do with the other uses of the 
*                field , but simplifies mechanics .) 
*    PROPAGATE :   A releaseShared should be propagated to other 
*                nodes . This is set ( for head node only ) in 
*                doReleaseShared to ensure propagation 
*                continues , even if other operations have 
*                since intervened . 
*    0:           None of the above 

* The values are arranged numerically to simplify use . 
* Non - negative values mean that a node doesn ' t need to 
* signal . So , most code doesn ' t need to check for particular 
* values , just for sign . 

* The field is initialized to 0 for normal sync nodes , and 
* CONDITION for condition nodes .   It is modified using CAS 
* ( or when possible , unconditional volatile writes ). 
*/ 
volatile int waitStatus; 

/** 
* Link to predecessor node that current node / thread relies on 
* for checking waitStatus . Assigned during enqueuing , and nulled 
* out ( for sake of GC ) only upon dequeuing .   Also , upon 
* cancellation of a predecessor , we short - circuit while 
* finding a non - cancelled one , which will always exist 
* because the head node is never cancelled : A node becomes 
* head only as a result of successful acquire . A 
* cancelled thread never succeeds in acquiring , and a thread only 
* cancels itself , not any other node . 
*/ 
volatile Node prev; 

/** 
* Link to the successor node that the current node / thread 
* unparks upon release . Assigned during enqueuing , adjusted 
* when bypassing cancelled predecessors , and nulled out ( for 
* sake of GC ) when dequeued .   The enq operation does not 
* assign next field of a predecessor until after attachment , 
* so seeing a null next field does not necessarily mean that 
* node is at end of queue . However , if a next field appears 
* to be null , we can scan prev ' s from the tail to 
* double - check .   The next field of cancelled nodes is set to 
* point to the node itself instead of null , to make life 
* easier for isOnSyncQueue . 
*/ 
volatile Node next; 

/** 
* The thread that enqueued this node . Initialized on 
* construction and nulled out after use . 
*/ 
volatile Thread thread; 

/** 
* Link to next node waiting on condition , or the special 
* value SHARED . Because condition queues are accessed only 
* when holding in exclusive mode , we just need a simple 
* linked queue to hold nodes while they are waiting on 
* conditions . They are then transferred to the queue to 
* re - acquire . And because conditions can only be exclusive , 
* we save a field by using special value to indicate shared 
* mode . 
*/ 
Node nextWaiter;

AbstractQueuedSynchronizer** 的数据结构(盗用的图)

AbstractQueuedSynchronizer 做了什么 ?

内部维护state和CLH队列,负责在资源争用时线程入队,资源释放时唤醒队列中线程。

而实现类只需要实现 什么条件获取资源成功什么条件释放资源 成功就可以了

所以,最简单的CountDownLatch使用AbstractQueuedSynchronizer实现非常简单:

  •          申明AbstractQueuedSynchronizer的state数量(比如十个)
    
  •          await方法尝试获取资源,如果state>0表示获取失败( **什么条件获取资源成功** ,CountDownLatch实现),获取失败线程休眠(AbstractQueuedSynchronizer负责)
    
  •         countDown方法state-1,如果state==0表示资源释放成功( **什么条件释放资源成功** ,CountDownLatch实现),唤醒队列中所有线程(AbstractQueuedSynchronizer负责)
    

AbstractQueuedSynchronizer 怎么做的?

顺着ReentrantLock lock、unlock看一遍我们就大致总结出AbstractQueuedSynchronizer工作原理了

先简单介绍下ReentrantLock特性:可重入,中断,有超时机制。

ReentrantLock lock() 流程 ( 再盗图 )

黄色表示ReentrantLock实现,绿色表示AbstractQueuedSynchronizer内部实现

  1. lock方法入口 直接调用 AbstractQueuedSynchronizer.acquire方法
  2. tryAcquire
  3. addWaiter
  4. acquireQueued
AbstractQueuedSynchronizer.acquire
1
2
3
4
5
**public** final void acquire ( int arg) { 
**if** (! tryAcquire (arg) &&
acquireQueued ( addWaiter ( Node . EXCLUSIVE ), arg))
selfInterrupt ();
}

获取的锁的逻辑:直接获取成功则返回,如果没有获取成功入队休眠(对就是这么简单)

下面我们仔细一个一个方法看

ReentrantLock.tryAcquire

我这里贴的时非公平的所获取,公平和不公平的区别在于公平锁老老实实的会进入队列排队,非公平锁会先检查资源是否可用,如果可用不管队列中的情况直接尝试获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire ( int acquires) { 
final Thread current = Thread . currentThread ();
int c = getState ();
if (c == 0 ) {
if ( compareAndSetState ( 0 , acquires)) {
setExclusiveOwnerThread (current);
return true ;
}
}
else if (current == getExclusiveOwnerThread ()) {
int nextc = c + acquires;
if (nextc < 0 ) // overflow
throw new Error ( "Maximum lock count exceeded" );
setState (nextc);
return true ;
}
return false ;
}

ReentrantLock.tryAcquire读取到state==0时尝试占用锁,并保证同一线程可以重复占用。其他情况下获取资源失败。如果获取成功就没啥事了,不过关键不就是锁争用的时候是如何处理的吗?

AbstractQueuedSynchronizer.addWaiter(Node.EXCLUSIVE)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter ( Node mode) { 
Node node = new Node (mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null ) {
node. setPrevRelaxed (oldTail);
if ( compareAndSetTail (oldTail, node)) {
oldTail. next = node;
return node;
}
} else {
initializeSyncQueue ();
}
}
}

一旦锁争用,一定会初始化队列(因为排队的线程需要前驱节点唤醒,所以要初始化一个前驱节点),之后自旋成为队列尾节点。

简单来说就是获取不到锁就放进队列里维护起来,等锁释放的时候再用。

这里还有一个 很具有参考性的小细节 :先设置新节点的前驱结点,自旋成为尾节点后设置前驱的后驱

AbstractQueuedSynchronizer.acquireQueued
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final boolean acquireQueued final Node node, int arg)
    boolean interrupted = false ; 
    try { 
        for (;;) { 
            final Node p = node. predecessor (); 
            if (p == head && tryAcquire (arg)) { 
                setHead (node); 
                p. next = null ; // help GC 
                return interrupted; 
            } 
            if ( shouldParkAfterFailedAcquire (p, node)) 
                interrupted |= parkAndCheckInterrupt (); 
        } 
    } catch ( Throwable t) { 
        cancelAcquire (node); 
        if (interrupted) 
            selfInterrupt (); 
        throw t; 
    } 


private static boolean shouldParkAfterFailedAcquire ( Node pred, Node node)
    int ws = pred. waitStatus ; 
    if (ws == Node . SIGNAL ) 
        /* 
             * This node has already set status asking a release 
             * to signal it, so it can safely park. 
             */ 
        return true ; 
    if (ws > 0 ) { 
        /* 
             * Predecessor was cancelled. Skip over predecessors and 
             * indicate retry. 
             */ 
        do { 
            node. prev = pred = pred. prev ; 
        } while (pred. waitStatus > 0 ); 
        pred. next = node; 
    } else { 
        /* 
             * waitStatus must be 0 or PROPAGATE.  Indicate that we 
             * need a signal, but don't park yet.  Caller will need to 
             * retry to make sure it cannot acquire before parking. 
             */ 
        pred. compareAndSetWaitStatus (ws, Node . SIGNAL ); 
    } 
    return false ; 


private final boolean parkAndCheckInterrupt ()
        LockSupport . park ( this ); 
        return Thread . interrupted (); 
    }

前面只是维护下链表数据结构,这里负责找到合适的唤醒前驱,然后让线程休眠。

这里主要是一个循环过程:

  1. 检查是否能获取到锁,获取到则返回
  2. 失败则寻找前面最近的未放弃争用的前驱,把前驱的waitStatus设置为-1,并把放弃争用的节点抛弃
  3. 检查是否能休眠
  4. 使用Usafe.park休眠(不是wait)

ReentrantLock lock 总结

ReentrantLock unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final boolean release int arg)
    if ( tryRelease (arg)) { 
        Node h = head; 
        if (h != null && h. waitStatus != 0 ) 
            unparkSuccessor (h); 
        return true ; 
    } 
    return false ; 


protected final boolean tryRelease int releases)
    int c = getState () - releases; 
    if ( Thread . currentThread () != getExclusiveOwnerThread ()) 
        throw new IllegalMonitorStateException (); 
    boolean free = false ; 
    if (c == 0 ) { 
        free = true ; 
        setExclusiveOwnerThread ( null ); 
    } 
    setState (c); 
    return free; 


private void unparkSuccessor ( Node node)
    /* 
         * If status is negative (i.e., possibly needing signal) try 
         * to clear in anticipation of signalling.  It is OK if this 
         * fails or if status is changed by waiting thread. 
         */ 
    int ws = node. waitStatus ; 
    if (ws < 0 ) 
        node. compareAndSetWaitStatus (ws, 0 ); 

    /* 
         * Thread to unpark is held in successor, which is normally 
         * just the next node.  But if cancelled or apparently null, 
         * traverse backwards from tail to find the actual 
         * non-cancelled successor. 
         */ 
    Node s = node. next ; 
    if (s == null || s. waitStatus > 0 ) { 
        s = null ; 
        for ( Node p = tail; p != node && p != null ; p = p. prev ) 
            if (p. waitStatus <= 0 ) 
                s = p; 
    } 
    if (s != null ) 
        LockSupport . unpark (s. thread ); 
}

unlock的代码特别简单:

  1. 每unlock一次state-1
  2. state == 0 时资源成功释放
  3. 如果释放成功,唤醒第二个节点
  4. 如果第二个节点没引用或者放弃争用,从队尾开始寻找可以唤醒的线程