根据权限查询时避免角色切换的一种思路

1. 问题背景

权限系统现状

UC权限系统基于角色访问控制技术RBAC(Role Based Access Control) 。具体来说,就是赋予用户某个角色,角色给与角色对应的权限能访问及操作不同范围的资源。

什么是数据权限

代表一个角色对应某个权限所能操作的数据范围,比如gitlab组管理员能看到组下的所有项目代码,我们可以这样配置:

  1. 创建组管理员
  2. 分配给组管理员查看项目代码的权限
  3. 查看项目代码权限设置约束条件,约束为自己组下的项目

实际产生遇到的问题

对绝大多数简单的系统来说一个用户对应一个系统只会有一个角色,一个角色只有一个数据权限范围(即使有多个,也可以合并成一个)。但是随着产品的功能迭代,用户的变更和系统设计的原因,总有一些特殊且重要的用户在同一个系统中拥有多个角色。在多角色和数据权限的组合下,一个用户可以拥有复数的数据权限范围。

考虑到实现复杂性,大多数系统选择使用角色切换的手段简化系统实现,同时对用户暴露出了他们不熟悉的角色这一概念,造成这些用户在系统使用中的各种不便。

本文重点讨论在避免角色切换的前提下,进行多角色数据范围查询的一种思路。

具体需要解决的需求

我们的数据报表后台,不同的角色拥有不同的数据查看范围(不同角色所能看到的员工数据字段也各不相同),例如:

  • 薪酬管理员:查看非高职级员工数据
  • 高级薪酬管理员: 查看高职级员工数据
  • 长期激励管理员:查看有长期激励员工数据
  • 等等

简单来说,拥有长期激励管理员和高级薪酬管理员的用户能否直接看到高职级员工数据和长期激励员工数据?至少在直觉上是可行的。

2.多角色数据范围查询

直觉的做法

单角色单数据范围可以使用一句sql查询出结果,那多角色多数据范围是不是使用多句sql查询出结果合并就可以了?

深入思考 多角色数据范围对行的影响

  1. 查询条件合并还是结果合并? —-结果合并
  2. 如何排序? —–外部排序,或先内部排序,limit,再外部排序
  3. 有重复数据怎么办? —-使用groupby去重
  4. 查询性能有影响吗?—-有

具体体现:

1
2
3
4
5
6
7
select * from (
(select id, sortvalue from table_1 where t_name = 'a' order by sortvalue desc limit 20) -- 先内部排序,limit
union all -- 结果合并
(select id, sortvalue from table_1 where t_name = 'b' order by sortvalue desc limit 20) -- 先内部排序,limit
order by sortvalue desc -- 外部排序
) a group by id -- 使用groupby去重
limit 10, 10

深入思考 多角色数据范围对列的影响

  • 薪酬管理员: 查看员工薪酬字段
  • 长期激励管理员:查看员工长期激励字段

如何解决?方法有很多!

综合思考,给出一种解决方案

1
2
3
graph LR
A(查询行及角色信息) --> B(根据角色查询对应列字段)
B --> C(结果)

步骤:

  1. 查询多角色数据范围下的数据,附带角色信息
1
2
3
4
5
6
7
select id, GROUP_CONCAT(a.role) as roles from (
(select id, 'role_a' as role from table_1 where sortvalue > 10 order by `sortvalue` desc limit 2)
union all
(select id, 'role_b' as role from table_1 where sortvalue > 20 order by `sortvalue` desc limit 2)
order by `sortvalue` desc
) a group by id
limit 0, 2

结果:

id roles
1 薪酬管理员
5 薪酬管理员,长期激励管理员
  1. 根据每一行不同的角色,查询出可见的字段,例如id=1的行只能查看ROLE_B对应字段,而id=5的行可以看到ROLE_A,ROLE_B对应的两个角色的字段

3.总结和延伸

多角色数据范围写操作?

遍历角色直到找到满足条件的权限即可。

收获

自己不行动,等于等着被别人安排哈哈

还有疑问?

自己想。还可以点这里

《重构-改善既有代码设计》读书笔记

1.1 一个简单的例子

一个计算顾客租赁影片费用的程序,能容易写成面条式的代码(流水账):顾客类调用影片类和租赁时长计算费用

对机器来言只要能运行正确没有好代码和坏代码之分,但是对(维护的)人来说很难找到修改点,容易引入bug

1.2重构前先写测试保证重构结果

利用单元测试保证重构正确性

1.3以微小的步伐修改程序,保证问题快速发现解决

不要为修改变量名感到羞耻,只有写出人能理解的代码才是好程序员

重构完可能 性能变差,但同时会带来更多的机会来优化

1.4干货-用多态代替switch

switch需要关心具体条件,多态具有switch不具备的优势:不需要关心具体类型

2.1重构的定义

重构:不改变运行结果下 提高理解性 降低修改成本

2.2重构的原因

  • 代码结构的流失是累积性的,越难看懂代码设计意图,越难保护其设计

  • 消除重复代码,方便修改

  • 我们编写代码时很容易忘记读者的感受,造成他人时间的浪费

  • 重构时犯错可以加深对代码意图的理解,可以帮助发现bug

  • 好的结构设计是加速开发的根本

2.3重构的时机

  • 添加功能时重构,在修改过程中把结构理清,也可以更简单的添加功能

  • 修复错误时重构

  • 复审代码时重构

2.4 重构的价值

程序有两面价值,今天可以为你做什么和明天可以为你做什么为了满足明天的需要,你会遇到:

  • 难以阅读,逻辑重复
  • 添加新功能许修改以前代码
  • 复杂的条件逻辑等代码

而你希望看到的是:

  • 容易阅读,所有逻辑在唯一指定地点,
  • 新的修改不会危及现有行为
  • 尽可能简单表达逻辑条件

重构就是把程序转变为这些特征的工具

2.5如何告诉(对付)经理

很多经理都是进度驱动,所以更加需要重构带来的好处,所以不要告诉他们 他们不会理解的

2.5.1引入间接层与重构的关系

间接层优点:

  • 允许逻辑共享

  • 增加解释意图和实现的机会-多了类名和函数名

  • 隔离变化

  • 多态封装条件逻辑

2.5.2何时不应该重构

  • 软件根本不工作
  • 最后期限已近

未完成的重构可以称之为债务,迟早要还

2.5.3重构与预先设计的关系

重构可以节约不必要的时间精力花在预先设计上,让软件设计向简化发展

3代码的坏味道

  • 重复代码

  • 过长函数

  • 过大类,过长参数

  • 修改一处程序的原因过多/一个原因修改过多的程序

  • 数据依赖过多

  • 重复的字段和参数

  • 总是放在一起的字段

  • switch语句

  • 平行继承关系

  • 不是所有分支下都需要的临时变量

  • 过度耦合调用链

  • 不必要的委托

  • 失血数据类

  • 频繁重写父类方法

  • 过多注释

4~12具体如何做

自己看书吧

2018下半年书单

经济金融类:

《斯坦福极简经济学》分微观和宏观经济学 没有教科书式的介绍 比较好读,推荐

《随机漫步的傻瓜》 经验之谈 观点可以接受

《买晾衣杆的小贩为何不会倒》 贴近生活 过于简单 不推荐

《富爸爸 穷爸爸》小白入门首选 推荐

《小岛经济学》 深入浅出 最后映射中国和美国关系 后面有点看不懂 推荐

小说类:

《解忧杂货店》 个人感觉一般 后半部分剧情我都猜到了

《人间失格》 遭受到了社会的毒打 很现实 有点致郁 心里承受能力低的不推荐

《三体》 第一部可以的,后面一部不如一部 但是比起其他网上大众喜欢的爽文好多了

技术类:

《java编程的逻辑》 温故而知新,基础好的不用看了 小白推荐

《kafka权威指南》正在看 但是真的写的不错 推荐

《mybatis从入门到精通》 一般般,只有入门吧

《netty权威指南》 一般般

《redis设计与实现》 真设计与实现 推荐

《深入理解java虚拟机》多读几遍也不为过 推荐

《SpringBoot实战》 读的比较早没影响了 不推荐

《算法图解》 程序=数据结构+算法 小白推荐(我就是小白)

心理类:

《乌合之众》 人在小范围为私利行动 在民族范畴会抛开私利做出些过于崇高或粗鲁的行为 推荐

《原生家庭》 一个人受童年家庭的影响是最大的,这本书能看懂是一回事,能做好是另一回事

AbstractQueuedSynchronizer解析

AbstractQueuedSynchronizer 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 	/** 
     * Head of the wait queue , lazily initialized . Except for 
     * initialization , it is modified only via method setHead .   Note : 
     * If head exists , its waitStatus is guaranteed not to be 
     * CANCELLED . 
     */ 
    private transient volatile Node head; 

    /** 
     * Tail of the wait queue , lazily initialized . Modified only via 
     * method enq to add new wait node . 
     */ 
    private transient volatile Node tail; 

    /** 
     * The synchronization state . 
     */ 
    private volatile int state;

稍微注意下在线程争用锁是才会初始化链表

AbstractQueuedSynchronizer.Node 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/** 
* Status field , taking on only the values : 
*    SIGNAL :      The successor of this node is ( or will soon be ) 
*                blocked ( via park ), so the current node must 
*                unpark its successor when it releases or 
*                cancels . To avoid races , acquire methods must 
*                first indicate they need a signal , 
*                then retry the atomic acquire , and then , 
*                on failure , block . 
*    CANCELLED :   This node is cancelled due to timeout or interrupt . 
*                Nodes never leave this state . In particular , 
*                a thread with cancelled node never again blocks . 
*    CONDITION :   This node is currently on a condition queue . 
*                It will not be used as a sync queue node 
*                until transferred , at which time the status 
*                will be set to 0. ( Use of this value here has 
*                nothing to do with the other uses of the 
*                field , but simplifies mechanics .) 
*    PROPAGATE :   A releaseShared should be propagated to other 
*                nodes . This is set ( for head node only ) in 
*                doReleaseShared to ensure propagation 
*                continues , even if other operations have 
*                since intervened . 
*    0:           None of the above 

* The values are arranged numerically to simplify use . 
* Non - negative values mean that a node doesn ' t need to 
* signal . So , most code doesn ' t need to check for particular 
* values , just for sign . 

* The field is initialized to 0 for normal sync nodes , and 
* CONDITION for condition nodes .   It is modified using CAS 
* ( or when possible , unconditional volatile writes ). 
*/ 
volatile int waitStatus; 

/** 
* Link to predecessor node that current node / thread relies on 
* for checking waitStatus . Assigned during enqueuing , and nulled 
* out ( for sake of GC ) only upon dequeuing .   Also , upon 
* cancellation of a predecessor , we short - circuit while 
* finding a non - cancelled one , which will always exist 
* because the head node is never cancelled : A node becomes 
* head only as a result of successful acquire . A 
* cancelled thread never succeeds in acquiring , and a thread only 
* cancels itself , not any other node . 
*/ 
volatile Node prev; 

/** 
* Link to the successor node that the current node / thread 
* unparks upon release . Assigned during enqueuing , adjusted 
* when bypassing cancelled predecessors , and nulled out ( for 
* sake of GC ) when dequeued .   The enq operation does not 
* assign next field of a predecessor until after attachment , 
* so seeing a null next field does not necessarily mean that 
* node is at end of queue . However , if a next field appears 
* to be null , we can scan prev ' s from the tail to 
* double - check .   The next field of cancelled nodes is set to 
* point to the node itself instead of null , to make life 
* easier for isOnSyncQueue . 
*/ 
volatile Node next; 

/** 
* The thread that enqueued this node . Initialized on 
* construction and nulled out after use . 
*/ 
volatile Thread thread; 

/** 
* Link to next node waiting on condition , or the special 
* value SHARED . Because condition queues are accessed only 
* when holding in exclusive mode , we just need a simple 
* linked queue to hold nodes while they are waiting on 
* conditions . They are then transferred to the queue to 
* re - acquire . And because conditions can only be exclusive , 
* we save a field by using special value to indicate shared 
* mode . 
*/ 
Node nextWaiter;

AbstractQueuedSynchronizer** 的数据结构(盗用的图)

AbstractQueuedSynchronizer 做了什么 ?

内部维护state和CLH队列,负责在资源争用时线程入队,资源释放时唤醒队列中线程。

而实现类只需要实现 什么条件获取资源成功什么条件释放资源 成功就可以了

所以,最简单的CountDownLatch使用AbstractQueuedSynchronizer实现非常简单:

  •          申明AbstractQueuedSynchronizer的state数量(比如十个)
    
  •          await方法尝试获取资源,如果state>0表示获取失败( **什么条件获取资源成功** ,CountDownLatch实现),获取失败线程休眠(AbstractQueuedSynchronizer负责)
    
  •         countDown方法state-1,如果state==0表示资源释放成功( **什么条件释放资源成功** ,CountDownLatch实现),唤醒队列中所有线程(AbstractQueuedSynchronizer负责)
    

AbstractQueuedSynchronizer 怎么做的?

顺着ReentrantLock lock、unlock看一遍我们就大致总结出AbstractQueuedSynchronizer工作原理了

先简单介绍下ReentrantLock特性:可重入,中断,有超时机制。

ReentrantLock lock() 流程 ( 再盗图 )

黄色表示ReentrantLock实现,绿色表示AbstractQueuedSynchronizer内部实现

  1. lock方法入口 直接调用 AbstractQueuedSynchronizer.acquire方法
  2. tryAcquire
  3. addWaiter
  4. acquireQueued
AbstractQueuedSynchronizer.acquire
1
2
3
4
5
**public** final void acquire ( int arg) { 
**if** (! tryAcquire (arg) &&
acquireQueued ( addWaiter ( Node . EXCLUSIVE ), arg))
selfInterrupt ();
}

获取的锁的逻辑:直接获取成功则返回,如果没有获取成功入队休眠(对就是这么简单)

下面我们仔细一个一个方法看

ReentrantLock.tryAcquire

我这里贴的时非公平的所获取,公平和不公平的区别在于公平锁老老实实的会进入队列排队,非公平锁会先检查资源是否可用,如果可用不管队列中的情况直接尝试获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire ( int acquires) { 
final Thread current = Thread . currentThread ();
int c = getState ();
if (c == 0 ) {
if ( compareAndSetState ( 0 , acquires)) {
setExclusiveOwnerThread (current);
return true ;
}
}
else if (current == getExclusiveOwnerThread ()) {
int nextc = c + acquires;
if (nextc < 0 ) // overflow
throw new Error ( "Maximum lock count exceeded" );
setState (nextc);
return true ;
}
return false ;
}

ReentrantLock.tryAcquire读取到state==0时尝试占用锁,并保证同一线程可以重复占用。其他情况下获取资源失败。如果获取成功就没啥事了,不过关键不就是锁争用的时候是如何处理的吗?

AbstractQueuedSynchronizer.addWaiter(Node.EXCLUSIVE)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter ( Node mode) { 
Node node = new Node (mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null ) {
node. setPrevRelaxed (oldTail);
if ( compareAndSetTail (oldTail, node)) {
oldTail. next = node;
return node;
}
} else {
initializeSyncQueue ();
}
}
}

一旦锁争用,一定会初始化队列(因为排队的线程需要前驱节点唤醒,所以要初始化一个前驱节点),之后自旋成为队列尾节点。

简单来说就是获取不到锁就放进队列里维护起来,等锁释放的时候再用。

这里还有一个 很具有参考性的小细节 :先设置新节点的前驱结点,自旋成为尾节点后设置前驱的后驱

AbstractQueuedSynchronizer.acquireQueued
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final boolean acquireQueued final Node node, int arg)
    boolean interrupted = false ; 
    try { 
        for (;;) { 
            final Node p = node. predecessor (); 
            if (p == head && tryAcquire (arg)) { 
                setHead (node); 
                p. next = null ; // help GC 
                return interrupted; 
            } 
            if ( shouldParkAfterFailedAcquire (p, node)) 
                interrupted |= parkAndCheckInterrupt (); 
        } 
    } catch ( Throwable t) { 
        cancelAcquire (node); 
        if (interrupted) 
            selfInterrupt (); 
        throw t; 
    } 


private static boolean shouldParkAfterFailedAcquire ( Node pred, Node node)
    int ws = pred. waitStatus ; 
    if (ws == Node . SIGNAL ) 
        /* 
             * This node has already set status asking a release 
             * to signal it, so it can safely park. 
             */ 
        return true ; 
    if (ws > 0 ) { 
        /* 
             * Predecessor was cancelled. Skip over predecessors and 
             * indicate retry. 
             */ 
        do { 
            node. prev = pred = pred. prev ; 
        } while (pred. waitStatus > 0 ); 
        pred. next = node; 
    } else { 
        /* 
             * waitStatus must be 0 or PROPAGATE.  Indicate that we 
             * need a signal, but don't park yet.  Caller will need to 
             * retry to make sure it cannot acquire before parking. 
             */ 
        pred. compareAndSetWaitStatus (ws, Node . SIGNAL ); 
    } 
    return false ; 


private final boolean parkAndCheckInterrupt ()
        LockSupport . park ( this ); 
        return Thread . interrupted (); 
    }

前面只是维护下链表数据结构,这里负责找到合适的唤醒前驱,然后让线程休眠。

这里主要是一个循环过程:

  1. 检查是否能获取到锁,获取到则返回
  2. 失败则寻找前面最近的未放弃争用的前驱,把前驱的waitStatus设置为-1,并把放弃争用的节点抛弃
  3. 检查是否能休眠
  4. 使用Usafe.park休眠(不是wait)

ReentrantLock lock 总结

ReentrantLock unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final boolean release int arg)
    if ( tryRelease (arg)) { 
        Node h = head; 
        if (h != null && h. waitStatus != 0 ) 
            unparkSuccessor (h); 
        return true ; 
    } 
    return false ; 


protected final boolean tryRelease int releases)
    int c = getState () - releases; 
    if ( Thread . currentThread () != getExclusiveOwnerThread ()) 
        throw new IllegalMonitorStateException (); 
    boolean free = false ; 
    if (c == 0 ) { 
        free = true ; 
        setExclusiveOwnerThread ( null ); 
    } 
    setState (c); 
    return free; 


private void unparkSuccessor ( Node node)
    /* 
         * If status is negative (i.e., possibly needing signal) try 
         * to clear in anticipation of signalling.  It is OK if this 
         * fails or if status is changed by waiting thread. 
         */ 
    int ws = node. waitStatus ; 
    if (ws < 0 ) 
        node. compareAndSetWaitStatus (ws, 0 ); 

    /* 
         * Thread to unpark is held in successor, which is normally 
         * just the next node.  But if cancelled or apparently null, 
         * traverse backwards from tail to find the actual 
         * non-cancelled successor. 
         */ 
    Node s = node. next ; 
    if (s == null || s. waitStatus > 0 ) { 
        s = null ; 
        for ( Node p = tail; p != node && p != null ; p = p. prev ) 
            if (p. waitStatus <= 0 ) 
                s = p; 
    } 
    if (s != null ) 
        LockSupport . unpark (s. thread ); 
}

unlock的代码特别简单:

  1. 每unlock一次state-1
  2. state == 0 时资源成功释放
  3. 如果释放成功,唤醒第二个节点
  4. 如果第二个节点没引用或者放弃争用,从队尾开始寻找可以唤醒的线程