【Java并发】Lock之ReentrantReadWriteLock

读写锁的使用

/**
 * Small demo of {@link ReentrantReadWriteLock} (non-fair by default).
 *
 * <p>The read lock and the write lock are two views of ONE shared
 * {@code ReentrantReadWriteLock}. Taking them from two separate instances
 * would produce two unrelated locks that never exclude each other.
 *
 * <p>Every {@code lock()} is paired with an {@code unlock()} in a
 * {@code finally} block so the lock is released even if the guarded code throws.
 */
public class ReadWriteLockTest {
    // The single lock instance both views below are derived from.
    private static final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    // Shared (read) view of rwLock.
    private static final Lock readLock = rwLock.readLock();
    // Exclusive (write) view of rwLock.
    private static final Lock writeLock = rwLock.writeLock();

    /**
     * Starts two threads that both end up in {@link #method(String)}, which is
     * guarded by the write lock, so their output blocks do not interleave.
     *
     * @param args unused
     * @throws Exception declared for compatibility; nothing is thrown here
     */
    public static void main(String[] args) throws Exception {
        Thread thread = new Thread() {
            @Override
            public void run() {
                // Read section. The read lock MUST be released before the write
                // lock is requested: ReentrantReadWriteLock does not support
                // upgrading, so read -> write on the same lock blocks forever.
                readLock.lock();
                try {
                    // shared, read-only work would go here
                } finally {
                    readLock.unlock();
                }

                // Write section. method("a") acquires the write lock again,
                // which is allowed because the write lock is reentrant.
                writeLock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " AAAAAAAAAA: " + writeLock);
                    method("a");
                } finally {
                    writeLock.unlock();
                }
            }
        };
        Thread thread2 = new Thread() {
            @Override
            public void run() {
                // Printed without holding any lock, so it may show the lock
                // as either unlocked or locked by the other thread.
                System.out.println(Thread.currentThread().getName() + " BBBBBBBBBB: " + writeLock);
                method("b");
            }
        };
        thread.start();
        thread2.start();
    }

    /**
     * Prints {@code a}, the write lock's state and the numbers 0..9 while
     * holding the write lock.
     *
     * @param a label printed first
     */
    public static void method(String a) {
        writeLock.lock();
        try {
            System.out.println(a);
            System.out.println(Thread.currentThread().getName() + " : " + writeLock);
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " : " + i);
            }
        } finally {
            // Always release, even if printing throws.
            writeLock.unlock();
        }
    }
}

读锁ReadLock

readLock.lock()

/**
 * Acquires the read lock.
 *
 * <p>Acquires the read lock if the write lock is not held by
 * another thread and returns immediately.
 *
 * <p>If the write lock is held by another thread then
 * the current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the read lock has been acquired.
 */
public void lock() {
    // Shared-mode acquire on the synchronizer. The argument is only a
    // placeholder: tryAcquireShared names its parameter "unused".
    sync.acquireShared(1);
}

官方注释描述得很清晰:加读锁时先尝试获取读锁。如果写锁没有被其他线程持有,则获取读锁成功并立即返回;如果写锁被其他线程持有,则获取读锁失败,当前线程被阻塞,直到获取到读锁为止。

继续 sync.acquireShared(1);

// Shared-mode acquire: make one immediate attempt with tryAcquireShared;
// a negative result means the attempt failed, in which case
// doAcquireShared takes over and keeps retrying from the wait queue.
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

先看tryAcquireShared(arg):

// Single attempt to take the read lock.
// Returns -1 when the write lock is held by another thread, 1 when the
// fast path succeeds, otherwise whatever fullTryAcquireShared returns.
protected final int tryAcquireShared(int unused) {
    // The calling thread.
    Thread current = Thread.currentThread();
    // Local snapshot of the synchronizer state; the checks and the CAS
    // below all work against this value.
    int c = getState();
    // The write lock is held (exclusive count != 0) by a different thread: fail with -1.
    // The current write-lock owner is allowed through.
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // Shared count taken from the snapshot. The read lock is NOT acquired
    // yet; that only happens if the CAS below succeeds.
    int r = sharedCount(c);
    // Fast path: policy allows reading, the count has room, and the state
    // is bumped by one SHARED_UNIT atomically.
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // No shared holds in the snapshot: record this thread as the first reader.
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // The first reader acquiring again: just bump its counter.
            firstReaderHoldCount++;
        } else {
            // Any other thread: update its HoldCounter, using the cached one
            // when it belongs to this thread.
            // NOTE(review): readHolds is presumably a per-thread store - confirm.
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // Fast path not taken (readerShouldBlock() was true, the count reached
    // MAX_COUNT, or the CAS failed): delegate to the full version.
    return fullTryAcquireShared(current);
}

回看 doAcquireShared(arg):

// Queued path for a shared acquire: enqueue a SHARED node for the current
// thread, then retry tryAcquireShared whenever this node's predecessor is
// the head, until the acquire succeeds.
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    // Stays true unless the acquire succeeds; checked in the finally block.
    boolean failed = true;
    try {
        // Set when parkAndCheckInterrupt() reports an interrupt during the wait.
        boolean interrupted = false;
        // The immediate attempt failed: loop until the read lock is acquired.
        // NOTE(review): this is not a pure busy-spin - between attempts the
        // thread goes through shouldParkAfterFailedAcquire /
        // parkAndCheckInterrupt, which are not shown here.
        for (;;) {
            final Node p = node.predecessor();
            // Only retry when this node is directly behind the head.
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Success: this node becomes the head (r is passed on to
                    // setHeadAndPropagate) and the old head is unlinked.
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // Re-assert an interrupt that was observed while waiting.
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // failed is still true only if the loop was left without acquiring
        // (i.e. by an exception): cancel this node's acquire attempt.
        if (failed)
            cancelAcquire(node);
    }
}

写锁WriteLock

writeLock.lock();

/**
 * Acquires the write lock.
 *
 * <p>Acquires the write lock if neither the read nor write lock
 * are held by another thread
 * and returns immediately, setting the write lock hold count to
 * one.
 *
 * <p>If the current thread already holds the write lock then the
 * hold count is incremented by one and the method returns
 * immediately.
 *
 * <p>If the lock is held by another thread then the current
 * thread becomes disabled for thread scheduling purposes and
 * lies dormant until the write lock has been acquired, at which
 * time the write lock hold count is set to one.
 */
public void lock() {
    // Exclusive-mode acquire on the synchronizer. The argument 1 is the
    // amount tryAcquire adds to the state for this acquisition.
    sync.acquire(1);
}

获取写锁。如果读锁和写锁都没有被另一个线程持有,获得写锁,并立即返回,设置写锁持有计数为1

如果当前线程已经持有写锁,那么count加1,方法立即返回

如果锁被另一个线程持有,那么当前的线程出于线程调度的目的将被禁用,并且处于休眠状态,直到获得写锁,此时,写锁持有计数设置为1。

sync.acquire(1):

// Exclusive-mode acquire: make one immediate attempt with tryAcquire; if it
// fails, enqueue an EXCLUSIVE node and wait in acquireQueued. acquireQueued
// returns true when an interrupt was observed while waiting, in which case
// the interrupt is re-asserted on the current thread via selfInterrupt().
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg):

// Single attempt to take the write lock.
// Returns true on a fresh or reentrant acquisition, false otherwise;
// throws Error when the exclusive hold count would exceed MAX_COUNT.
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // Snapshot of the whole state, and the exclusive count derived from it.
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            // Either there are read holds (w == 0 while c != 0) or the write
            // lock belongs to a different thread: fail.
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // A plain setState (no CAS) is used on this path, where the current
        // thread is already the exclusive owner.
        setState(c + acquires);
        return true;
    }
    // State is 0 (nothing held): fail if the policy says the writer should
    // block, or if the CAS on the state fails.
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // Write lock acquired: record the owning thread and report success.
    setExclusiveOwnerThread(current);
    return true;
}

addWaiter(Node.EXCLUSIVE):

/**
 * Creates a wait-queue node for the current thread in the given mode and
 * appends it to the tail of the queue.
 *
 * <p>In this walkthrough it is reached after a failed write-lock attempt
 * (acquire passes Node.EXCLUSIVE); doAcquireShared uses the same method
 * with Node.SHARED.
 *
 * @param mode the node mode, Node.EXCLUSIVE or Node.SHARED at the call sites shown
 * @return the newly created node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // Link backwards first, then publish the node as the new tail with a CAS.
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The tail was null or the CAS failed: let enq(node) do the insertion.
    enq(node);
    return node;
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg):

// Queued path for an exclusive acquire: for a node that is already in the
// wait queue, retry tryAcquire whenever the node's predecessor is the head,
// until the acquire succeeds.
// Returns true if an interrupt was observed while waiting, false otherwise.
final boolean acquireQueued(final Node node, int arg) {
    // Stays true unless the acquire succeeds; checked in the finally block.
    boolean failed = true;
    try {
        // Set when parkAndCheckInterrupt() reports an interrupt during the wait.
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Only retry when this node is directly behind the head.
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                // Lock acquired on retry. The return value is the
                // "interrupted" flag, not a constant false: it is true when
                // an interrupt was seen while waiting, so the caller
                // (acquire) can re-assert it.
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // The loop was left without acquiring (i.e. by an exception):
            // cancel this node's acquire attempt.
            cancelAcquire(node);
    }
}

selfInterrupt():

// Interrupts the current thread. Called by acquire and doAcquireShared to
// re-assert an interrupt that was observed while the thread was waiting
// for the lock.
static void selfInterrupt() {
    // Interrupt the calling thread itself.
    Thread.currentThread().interrupt();
}

1

Last modification:May 19th, 2020 at 04:18 pm