java并发编程(模式)

java并发编程(模式)

保护性暂停

用一个线程等待另一个线程的执行结果

保护性暂停是同步的

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Slf4j(topic = "c.text4")
public class Test4 {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
// 线程1等待线程2的结果
new Thread(() -> {
log.debug("等待结果");
String str = (String)guardedObject.get();
log.debug("结果" + str);
}, "t1").start();

new Thread(() -> {
try {
Thread.sleep(3000);
log.debug("产生结果");
guardedObject.complete("aaaaa");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}

class GuardedObject{

private Object response;

public Object get() {
synchronized (this) {
try {
// 这里用while是为了解决虚假唤醒问题
while (response == null)
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return response;
}

// 带超时时间
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passedTime = 0;
try {
while (response == null) {
long waitTime = timeout - passedTime;
if (waitTime <= 0) {
break;
}
this.wait(waitTime);
passedTime = System.currentTimeMillis() - begin;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return response;
}

public void complete(Object response) {
synchronized (this) {
this.response = response;
this.notifyAll();
}
}

}

保护性暂停例子

邮递员送信,居民收信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@Slf4j(topic = "c.text4")
public class Test4 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
People people = new People();
people.start();
}
Thread.sleep(500);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}

@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("{}收到信:{}", guardedObject.getId(), mail);
}
}

@Slf4j(topic = "c.Postman")
class Postman extends Thread{
private int id;
private String mail;

@Override
public void run() {
// 送信
GuardedObject go = Mailboxes.getGuardedObject(this.id);
go.complete(mail);
}

public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
}

class Mailboxes{
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();

private static int id = 1;

private static synchronized int generateId() {
return id++;
}

public static synchronized GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}

public static Set<Integer> getIds() {
return boxes.keySet();
}

public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
}


class GuardedObject{

private int id;

private Object response;

public GuardedObject() {
}

public GuardedObject(int id) {
this.id = id;
}

public int getId() {
return id;
}

public Object get() {
synchronized (this) {
try {
// 这里用while是为了解决虚假唤醒问题
while (response == null)
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return response;
}

// 带超时时间
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passedTime = 0;
try {
while (response == null) {
long waitTime = timeout - passedTime;
if (waitTime <= 0) {
break;
}
this.wait(waitTime);
passedTime = System.currentTimeMillis() - begin;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return response;
}

public void complete(Object response) {
synchronized (this) {
this.response = response;
this.notifyAll();
}
}

}

生产者/消费者

通过消息队列实现

异步的

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 消息队列
class MessageQueue {
private LinkedList<Message> list;
// 容量
private int capcity;

public MessageQueue(int capcity) {
list = new LinkedList<>();
this.capcity = capcity;
}

// 获取消息
public Message take() {
synchronized (this) {
// 检查队列是否为空
while (list.isEmpty()) {
try {
log.debug("队列为空");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 返回并删除队列第一个元素
Message msg = list.removeFirst();
this.notifyAll(); // 通知生产者
log.debug("已消费" + msg);
return msg;
}
}

// 放入消息
public void put(Message msg) {
synchronized (this) {
while (list.size() == capcity) {
try {
log.debug("队列已满");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(msg);
log.debug("已生产" + msg);
this.notifyAll(); // 通知消费者
}
}
}


// 消息
final class Message {
private int id;
private Object value;

public Message(int id, Object value) {
this.id = id;
this.value = value;
}

public int getId() {
return id;
}

public Object getValue() {
return value;
}
}

顺序控制

必须按照顺序执行代码

同步的

案例

顺序交替打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Test8 {
private static ReentrantLock lock = new ReentrantLock();

private static int flag = 1;

public static void main(String[] args) throws InterruptedException {
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();

new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();

Thread.sleep(1000);
as.start(aWaitSet);
}
}

@Slf4j(topic = "c.AwaitSignal")
class AwaitSignal extends ReentrantLock {
private int loopNum;

public AwaitSignal(int loopNum) {
this.loopNum = loopNum;
}

// 由主线程将第一个线程唤醒
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}

public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNum; i++) {
this.lock();
try {
current.await(); // 先都到阻塞队列
log.debug(str);
next.signal(); // 叫醒下一个线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
}

犹豫模式

在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回

实例1(只启动一个监控线程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting;
public void start() {
log.info("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}

// 真正启动监控线程...
}
}

实例2(单例模式)

1
2
3
4
5
6
7
8
9
10
11
12
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}

享元模式

java中,包装类Boolean,Byte,Short,Integer,Long,Character等使用到了享元模式。调用valueOf方法时,如果值在其缓存的数组内,则直接从缓存中读取

Long的valueOf方法实现

1
2
3
4
5
6
7
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}

各个包装类的缓存范围

  • Byte, Short, Long 缓存的范围都是 -128~127
  • Character 缓存的范围是 0~127
  • Integer的默认范围是 -128~127
    • 最小值不能变
    • 但最大值可以通过调整虚拟机参数 -Djava.lang.Integer.IntegerCache.high 来改变
  • Boolean 缓存了 TRUE 和 FALSE

线程池小demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class Test20 {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
}
}

@Slf4j(topic = "c.Pool")
class Pool {
// 连接池大小
private final int poolSize;

// 连接对象的数组
private Connection[] connections;

// 连接状态的数组 0 表示空闲,1 表示繁忙
private AtomicIntegerArray states;

// 构造方法
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + i);
}
}

// 获取连接
public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0) {
states.compareAndSet(i, 0, 1);
log.debug("获取连接{}", connections[i].toString());
return connections[i];
}
}

// 没有空闲连接,线程进入等待状态,防止cpu空转
synchronized (this) {
try {
log.debug("等待获取连接");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 归还连接
public void free(Connection connection) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == connection) {
states.set(i, 0);
synchronized (this) {
log.debug("归还连接{}", connections[i].toString());
this.notifyAll();
}
break;
}
}
}

}

分工模式

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。

饥饿问题

如果一个线程池的线程什么都做,有可能会产生饥饿。

如一个线程调用线程池中的其他线程来完成任务,如果所有线程都处于调用其他线程的状态,而没有执行任务的线程,就会发生饥饿

解决饥饿方式

不同的任务类型,采用不同的线程池

创建多少线程池合适

线程不是越多越好,太多线程会占用大量内存,线程的上下文切换也会消耗资源

任务分为运算密集型和I/O密集型,当运算密集型任务多的时候,应线程数较少;而I/O密集型任务多时,应多创建线程

经验公式:线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间


java并发编程(模式)
http://xwww12.github.io/2022/11/24/java/java并发编程(模式)/
作者
xw
发布于
2022年11月24日
许可协议