WhatAKitty Daily

A Programmer's Daily Record

自定义消息队列死锁

WhatAKitty   阅读次数loading...

背景

最近在看 AQS 并发框架以及基于它实现的同步器、队列等组件。然后,想借助同步器(ReentrantLock 与 Condition)来实现一个双向链表的阻塞队列。但是,在实现过程中发现一个问题:在高并发的情况下,put 线程和 take 线程会相互阻塞,程序卡住不再继续;而如果在每次 put 之间保持 1ms 的时间间隔,则不会出现上述问题。

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/**
 * Bounded FIFO queue backed by a linked list and guarded by two locks:
 * {@code putLock} (with condition {@code notFull}) for producers and
 * {@code takeLock} (with condition {@code notEmpty}) for consumers.
 *
 * <p>WARNING: this version can deadlock. {@link #put} calls
 * {@code signalNotEmpty()} (which acquires {@code takeLock}) while still holding
 * {@code putLock}, and {@link #take} calls {@code signalNotFull()} (which acquires
 * {@code putLock}) while still holding {@code takeLock}. The two locks are
 * therefore acquired in opposite orders by the two kinds of threads.
 *
 * <p>NOTE(review): {@code enqueue} runs under {@code putLock} and {@code dequeue}
 * under {@code takeLock}, yet both read and write {@code head}; this looks like
 * an additional unsynchronized race - confirm before reusing this code.
 */
public class MyQueue<T> {

// Lock taken by put(); notFull is the condition put() waits on while the queue is full.
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
// Lock taken by take(); notEmpty is the condition take() waits on while the queue is empty.
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

// First and last node of the linked list; both are null until the first enqueue.
private volatile Node<T> head;
private volatile Node<T> tail;

/**
* Current number of elements.
*/
private AtomicInteger count = new AtomicInteger();

/**
* Maximum number of elements allowed.
*/
private final int capacity;

/**
 * Creates a queue with the given capacity. The value is stored as-is; no
 * validation is performed.
 *
 * @param capacity maximum number of elements
 */
public MyQueue(int capacity) {
this.capacity = capacity;
}

/**
 * Appends {@code item}, waiting on {@code notFull} while the queue is full.
 *
 * @param item element to add; only checked by an {@code assert}
 * @return the same {@code item}
 * @throws InterruptedException if interrupted while waiting on {@code notFull}
 */
public T put(T item) throws InterruptedException {
assert item != null;
final ReentrantLock putLock = this.putLock;
putLock.lock();

int c = -1;
try {

// Queue is full: wait on the notFull condition until signalled.
while (count.get() == capacity) {
notFull.await();
}

enqueue(item);
// c is the element count BEFORE this insertion.
c = count.getAndIncrement();
if (c + 1 < capacity) {
// Deadlock hazard: acquires takeLock while putLock is still held.
signalNotEmpty();
}

} finally {
putLock.unlock();
}
return item;
}

/**
 * Removes and returns the first element, waiting on {@code notEmpty} while the
 * queue is empty.
 *
 * @return the removed element, or {@code null} if {@code dequeue()} returned no node
 * @throws InterruptedException if interrupted while waiting on {@code notEmpty}
 */
public T take() throws InterruptedException {
takeLock.lock();

int c = -1;
Node<T> node;
try {

// Queue is empty: wait on the notEmpty condition until signalled.
while (count.get() == 0) {
notEmpty.await();
}

node = dequeue();
// c is the element count BEFORE this removal.
c = count.getAndDecrement();
if (c > 1) {
// Deadlock hazard: acquires putLock while takeLock is still held.
signalNotFull();
}

} finally {
takeLock.unlock();
}
return node == null ? null : node.item;
}

/**
 * Appends a new node holding {@code t} at the tail; if {@code head} is null the
 * new node becomes both head and tail. Called from put() under putLock.
 */
private void enqueue(T t) {
if (head == null) {
head = tail = new Node<>(t);
return;
}

// append to tail
Node<T> node = new Node<>(t);
tail.next = node;
tail = node;
}

/**
 * Unlinks and returns the head node, or returns {@code null} when {@code head}
 * is null. Called from take() under takeLock. {@code tail} is not updated here.
 */
private Node<T> dequeue() {
if (head == null) {
return null;
}

// remove first node from list
Node<T> removed;
Node<T> next = head.next;
head.next = null;

if (next != null) {
next.prev = null;
}

removed = head;
head = next;
return removed;
}

/**
 * Acquires takeLock and signals one thread waiting on notEmpty.
 */
private void signalNotEmpty() {
takeLock.lock();

try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
 * Acquires putLock and signals one thread waiting on notFull.
 */
private void signalNotFull() {
putLock.lock();

try {
notFull.signal();
} finally {
putLock.unlock();
}
}

/**
 * Linked-list node. The type parameter {@code T} declared here shadows the
 * enclosing class's {@code T}.
 */
private class Node<T> {

Node<T> next;
// Never set to a node anywhere in this class; only cleared in dequeue().
Node<T> prev;

private final T item;
Node(T item) {
this.item = item;
}

}

/**
 * Demo: starts a consumer thread that prints every taken item, sleeps one
 * second, then starts a producer thread that puts random numeric strings until
 * its counter exceeds 1,000,000.
 */
public static void main(String[] args) throws InterruptedException {

MyQueue<String> queue = new MyQueue<>(20);

// Consumer: loops forever, printing each item taken from the queue.
new Thread(() -> {
while (true) {
try {
String item = queue.take();
// NOTE: != compares String references here, not contents.
if (item != null && item.trim() != "") {
System.out.println(item);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

Thread.sleep(1000);

// Producer: puts items back-to-back with no delay between puts.
new Thread(() -> {
try {
int i = 0;
int max = 1000000;
while (true) {
i++;
queue.put(String.valueOf(ThreadLocalRandom.current().nextInt(999999999)));
if (i > max) {
System.out.println("结束" + i);
break;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

}

}

问题定位

为了查看是哪里出现了阻塞的情况,我做了dump线程操作
如下是DUMP信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"Thread-1@685" prio=5 tid=0xd nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at com.whatakitty.learn.collections.MyQueue.signalNotEmpty(MyQueue.java:136)
at com.whatakitty.learn.collections.MyQueue.put(MyQueue.java:62)
at com.whatakitty.learn.collections.MyQueue.lambda$main$1(MyQueue.java:193)
at com.whatakitty.learn.collections.MyQueue$$Lambda$2.1577213552.run(Unknown Source:-1)
at java.lang.Thread.run(Thread.java:748)


"Thread-0@681" prio=5 tid=0xc nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at com.whatakitty.learn.collections.MyQueue.signalNotFull(MyQueue.java:146)
at com.whatakitty.learn.collections.MyQueue.take(MyQueue.java:91)
at com.whatakitty.learn.collections.MyQueue.lambda$main$0(MyQueue.java:175)
at com.whatakitty.learn.collections.MyQueue$$Lambda$1.431687835.run(Unknown Source:-1)
at java.lang.Thread.run(Thread.java:748)

可以看到的是,两者阻塞地方是:

1
com.whatakitty.learn.collections.MyQueue.signalNotEmpty(MyQueue.java:136)

以及

1
com.whatakitty.learn.collections.MyQueue.signalNotFull(MyQueue.java:146)

里面的lock操作,那么必定是两个线程相互死锁了。

可以看到,signalNotEmpty 和 signalNotFull 内部都有加锁操作。

我们可以假设如下情景:
put 操作所在线程持有 putLock,执行到 signalNotEmpty(即 notEmpty.signal)时需要先获取 takeLock;而这个时候,take 操作所在线程正持有 takeLock,并且执行到 signalNotFull(即 notFull.signal),正在等待获取 putLock。两个线程各自持有对方需要的锁,谁也无法继续执行,造成相互等待,形成死锁。

解决问题

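问题的解决思路主要是参照了 LinkedBlockingQueue 的设计:跨锁的通知一律放到释放自己的锁之后再做,这样任何线程都不会同时持有 putLock 和 takeLock,也就不会形成死锁。具体来说,在 put 操作的时候,如果加入了元素,且目前队列的大小不超过容量,则在持有 putLock 的情况下做 notFull 的通知(唤醒其他 put 线程);如果这次加入的是队列由空变为非空的第一个元素(c == 0),则在释放 putLock 之后再获取 takeLock 做 notEmpty 唤醒。这么做主要是为了防止 take 操作所在线程 park 等待,而 put 后未通知造成阻塞的情况发生。take 操作与之对称。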

新的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/**
 * A bounded, blocking FIFO queue built on the two-lock algorithm used by
 * {@code java.util.concurrent.LinkedBlockingQueue}.
 *
 * <p>Producers serialize on {@code putLock} and consumers on {@code takeLock},
 * so one {@code put} and one {@code take} may run at the same time. To make
 * that safe the list always contains a sentinel node at {@code head}: producers
 * only ever touch {@code tail}, consumers only ever touch {@code head}, and the
 * atomic {@code count} is the single piece of state shared by both sides (it
 * also publishes newly linked nodes from producers to consumers).
 *
 * <p>No thread ever holds both locks at once: a signal that needs the
 * <em>other</em> lock is sent only after the thread has released its own lock,
 * which rules out the lock-order deadlock.
 *
 * @param <T> element type; {@code null} elements are not permitted
 */
public class MyQueue<T> {

    /** Held by producers; guards {@link #tail}. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Producers wait here while the queue is full. */
    private final Condition notFull = putLock.newCondition();

    /** Held by consumers; guards {@link #head}. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Consumers wait here while the queue is empty. */
    private final Condition notEmpty = takeLock.newCondition();

    /** Sentinel node; {@code head.item} is always null and {@code head.next} is the first element. */
    private Node<T> head;

    /** Last node of the list; equals {@code head} when the queue is empty. */
    private Node<T> tail;

    /** Current number of elements. */
    private final AtomicInteger count = new AtomicInteger();

    /** Maximum number of elements. */
    private final int capacity;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements, must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public MyQueue(int capacity) {
        if (capacity <= 0) {
            // 0 would make every put() block forever; a negative value would never
            // match count and silently make the queue unbounded.
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        head = tail = new Node<>(null);
    }

    /**
     * Appends {@code item} to the tail of the queue, waiting while the queue is full.
     *
     * @param item element to add, must not be {@code null}
     * @return the same {@code item}
     * @throws NullPointerException if {@code item} is {@code null}
     * @throws InterruptedException if interrupted while waiting for space
     */
    public T put(T item) throws InterruptedException {
        if (item == null) {
            // Explicit check: an assert would vanish when assertions are disabled.
            throw new NullPointerException("item must not be null");
        }

        final Node<T> node = new Node<>(item);
        final int c;
        putLock.lock();
        try {
            // Queue is full: wait until a consumer frees a slot.
            while (count.get() == capacity) {
                notFull.await();
            }

            enqueue(node);
            // c is the size BEFORE this insertion.
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                // Still room left: wake another waiting producer (same lock, safe).
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }

        // The queue just went from empty to non-empty: wake a consumer. Done only
        // after putLock is released so both locks are never held together.
        if (c == 0) {
            signalNotEmpty();
        }
        return item;
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while
     * the queue is empty.
     *
     * @return the removed element, never {@code null}
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public T take() throws InterruptedException {
        final T item;
        final int c;
        takeLock.lock();
        try {
            // Queue is empty: wait until a producer adds an element.
            while (count.get() == 0) {
                notEmpty.await();
            }

            item = dequeue();
            // c is the size BEFORE this removal.
            c = count.getAndDecrement();
            if (c > 1) {
                // More elements remain: wake another waiting consumer (same lock, safe).
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }

        // The queue just went from full to not-full: wake a producer. Done only
        // after takeLock is released so both locks are never held together.
        if (c == capacity) {
            signalNotFull();
        }
        return item;
    }

    /** Links {@code node} after the current tail. Caller must hold {@code putLock}. */
    private void enqueue(Node<T> node) {
        tail.next = node;
        tail = node;
    }

    /**
     * Removes the first element. Caller must hold {@code takeLock} and must have
     * observed {@code count > 0}, which guarantees {@code head.next} is non-null.
     * The removed node becomes the new sentinel.
     */
    private T dequeue() {
        final Node<T> oldSentinel = head;
        final Node<T> first = oldSentinel.next;
        oldSentinel.next = null; // unlink the old sentinel so it can be collected
        head = first;
        final T item = first.item;
        first.item = null; // the new sentinel must not keep the element reachable
        return item;
    }

    /** Wakes one waiting consumer. Must not be called while holding {@code putLock}. */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /** Wakes one waiting producer. Must not be called while holding {@code takeLock}. */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Singly linked list node. Static so it holds no reference to the enclosing
     * queue, with its own type parameter so it does not shadow the queue's.
     */
    private static final class Node<E> {

        /** The element, or {@code null} for the sentinel node. */
        E item;

        /** The successor, or {@code null} if this is the last node. */
        Node<E> next;

        Node(E item) {
            this.item = item;
        }
    }

    /**
     * Demo: a producer pushes random numeric strings as fast as it can and, one
     * second later, a consumer starts printing them. The consumer never stops on
     * its own, so the JVM keeps running until it is killed.
     */
    public static void main(String[] args) throws InterruptedException {

        MyQueue<String> queue = new MyQueue<>(20);

        new Thread(() -> {
            try {
                int i = 0;
                int max = 1000000;
                while (true) {
                    i++;
                    queue.put(String.valueOf(ThreadLocalRandom.current().nextInt(999999999)));
                    if (i > max) {
                        System.out.println("结束" + i);
                        break;
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag and let the producer thread end.
                Thread.currentThread().interrupt();
            }
        }).start();

        Thread.sleep(1000);

        new Thread(() -> {
            while (true) {
                try {
                    String item = queue.take();
                    // Compare contents, not references.
                    if (item != null && !item.trim().isEmpty()) {
                        System.out.println(item);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of spinning on a swallowed interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }).start();

    }

}