java并发编程(模式)
保护性暂停
用一个线程等待另一个线程的执行结果
保护性暂停是同步的

案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| @Slf4j(topic = "c.text4") public class Test4 { public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { log.debug("等待结果"); String str = (String)guardedObject.get(); log.debug("结果" + str); }, "t1").start();
new Thread(() -> { try { Thread.sleep(3000); log.debug("产生结果"); guardedObject.complete("aaaaa"); } catch (InterruptedException e) { e.printStackTrace(); } },"t2").start(); } }
class GuardedObject{
private Object response;
public Object get() { synchronized (this) { try { while (response == null) this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
return response; }
public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; try { while (response == null) { long waitTime = timeout - passedTime; if (waitTime <= 0) { break; } this.wait(waitTime); passedTime = System.currentTimeMillis() - begin; } } catch (InterruptedException e) { e.printStackTrace(); } }
return response; } public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } }
}
|
保护性暂停例子
邮递员送信,居民收信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| @Slf4j(topic = "c.text4") public class Test4 { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { People people = new People(); people.start(); } Thread.sleep(500); for (Integer id : Mailboxes.getIds()) { new Postman(id, "内容" + id).start(); } } }
@Slf4j(topic = "c.People") class People extends Thread{ @Override public void run() { GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("{}收到信:{}", guardedObject.getId(), mail); } }
@Slf4j(topic = "c.Postman") class Postman extends Thread{ private int id; private String mail;
@Override public void run() { GuardedObject go = Mailboxes.getGuardedObject(this.id); go.complete(mail); }
public Postman(int id, String mail) { this.id = id; this.mail = mail; } }
class Mailboxes{ private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
private static synchronized int generateId() { return id++; }
public static synchronized GuardedObject createGuardedObject() { GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(), go); return go; }
public static Set<Integer> getIds() { return boxes.keySet(); }
public static GuardedObject getGuardedObject(int id) { return boxes.remove(id); } }
class GuardedObject{
private int id;
private Object response;
public GuardedObject() { }
public GuardedObject(int id) { this.id = id; }
public int getId() { return id; }
public Object get() { synchronized (this) { try { while (response == null) this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
return response; }
public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; try { while (response == null) { long waitTime = timeout - passedTime; if (waitTime <= 0) { break; } this.wait(waitTime); passedTime = System.currentTimeMillis() - begin; } } catch (InterruptedException e) { e.printStackTrace(); } }
return response; }
public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } }
}
|
生产者/消费者
通过消息队列实现
是异步的
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| class MessageQueue { private LinkedList<Message> list; private int capcity;
public MessageQueue(int capcity) { list = new LinkedList<>(); this.capcity = capcity; }
public Message take() { synchronized (this) { while (list.isEmpty()) { try { log.debug("队列为空"); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message msg = list.removeFirst(); this.notifyAll(); log.debug("已消费" + msg); return msg; } }
public void put(Message msg) { synchronized (this) { while (list.size() == capcity) { try { log.debug("队列已满"); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(msg); log.debug("已生产" + msg); this.notifyAll(); } } }
final class Message { private int id; private Object value;
public Message(int id, Object value) { this.id = id; this.value = value; }
public int getId() { return id; }
public Object getValue() { return value; } }
|
顺序控制
必须按照顺序执行代码
同步的
案例
顺序交替打印
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class Test8 { private static ReentrantLock lock = new ReentrantLock();
private static int flag = 1;
public static void main(String[] args) throws InterruptedException { AwaitSignal as = new AwaitSignal(5); Condition aWaitSet = as.newCondition(); Condition bWaitSet = as.newCondition(); Condition cWaitSet = as.newCondition();
new Thread(() -> { as.print("a", aWaitSet, bWaitSet); }).start(); new Thread(() -> { as.print("b", bWaitSet, cWaitSet); }).start(); new Thread(() -> { as.print("c", cWaitSet, aWaitSet); }).start();
Thread.sleep(1000); as.start(aWaitSet); } }
@Slf4j(topic = "c.AwaitSignal") class AwaitSignal extends ReentrantLock { private int loopNum;
public AwaitSignal(int loopNum) { this.loopNum = loopNum; }
public void start(Condition first) { this.lock(); try { log.debug("start"); first.signal(); } finally { this.unlock(); } }
public void print(String str, Condition current, Condition next) { for (int i = 0; i < loopNum; i++) { this.lock(); try { current.await(); log.debug(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { this.unlock(); } } } }
|
犹豫模式
在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回
实例1(只启动一个监控线程)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class MonitorService { private volatile boolean starting; public void start() { log.info("尝试启动监控线程..."); synchronized (this) { if (starting) { return; } starting = true; }
} }
|
实例2(单例模式)
1 2 3 4 5 6 7 8 9 10 11 12
| public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; public static synchronized Singleton getInstance() { if (INSTANCE != null) { return INSTANCE; } INSTANCE = new Singleton(); return INSTANCE; } }
|
享元模式
java中,包装类Boolean,Byte,Short,Integer,Long,Character等使用到了享元模式。调用valueOf方法时,如果值在其缓存的数组内,则直接从缓存中读取
Long的valueOf方法实现
1 2 3 4 5 6 7
| public static Long valueOf(long l) { final int offset = 128; if (l >= -128 && l <= 127) { return LongCache.cache[(int)l + offset]; } return new Long(l); }
|
各个包装类的缓存范围
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变
- 但最大值可以通过调整虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变
- Boolean 缓存了 TRUE 和 FALSE
线程池小demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| public class Test20 { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(() -> { Connection conn = pool.borrow(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(conn); }).start(); } } }
@Slf4j(topic = "c.Pool") class Pool { private final int poolSize;
private Connection[] connections;
private AtomicIntegerArray states;
public Pool(int poolSize) { this.poolSize = poolSize; this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + i); } }
public Connection borrow() { while (true) { for (int i = 0; i < poolSize; i++) { if (states.get(i) == 0) { states.compareAndSet(i, 0, 1); log.debug("获取连接{}", connections[i].toString()); return connections[i]; } }
synchronized (this) { try { log.debug("等待获取连接"); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public void free(Connection connection) { for (int i = 0; i < poolSize; i++) { if (connections[i] == connection) { states.set(i, 0); synchronized (this) { log.debug("归还连接{}", connections[i].toString()); this.notifyAll(); } break; } } }
}
|
分工模式
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。
饥饿问题
如果一个线程池的线程什么都做,有可能会产生饥饿。
如一个线程调用线程池中的其他线程来完成任务,如果所有线程都处于调用其他线程的状态,而没有执行任务的线程,就会发生饥饿
解决饥饿方式
不同的任务类型,采用不同的线程池
创建多少线程池合适
线程不是越多越好,太多线程会占用大量内存,线程的上下文切换也会消耗资源
任务分为运算密集型和I/O密集型,当运算密集型任务多的时候,应线程数较少;而I/O密集型任务多时,应多创建线程
经验公式:线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间