导读:感觉自己不会并发编程,因此阅读了 java 核心编程第一卷的并发编程章节,结合网上博客文章,做了一些总结。
进程:一个操作系统运行多个任务,比如可以在一个操作系统上同时玩游戏、写文档、上网等。
线程:一个进程上运行多个处理流水线,给人一种并行处理的错觉。比如一个GUI应用程序可以同时处理多个按钮的监听事件,而不是互相干扰,彼此等待。线程是为了提高应用程序的使用率。
Java线程的五个状态:new、runnable、running、blocked、dead。
继承 Thread 类,重写 run 方法。
xxxxxxxxxx
class MyThreadExtended extends Thread {
public void run() {
System.out.println("Running");
}
}
实现 Implemented 接口,重写 run 方法。
xxxxxxxxxx
class MyThread implements Runnable {
public void run() {
System.out.println("Running");
}
}
public class Test {
public static void main(String[] args) {
th1 = new Thread(new MyThread());
th1.start()
}
}
通过 Callable 的实现类,在下文详细讲解。
Synchronized 是自动挡汽车,能满足大部分锁的需求,但如果另有需要,就需要手动 lock 了。
一些概念:
锁(Lock):如果一个物体获得锁,其他线程在访问持有锁的物体时,进入阻塞状态。这可以避免竞争状态(race condition)下的数据损坏(Data Corruption)。
显式锁的例子:
假设有两个线程,第一个线程执行到 lock.lock()
时,发现还没有持有锁,因此它得到锁,并开始执行临界区域的代码。第二个线程也试图执行临界区域代码,但执行到 lock.lock()
时发现 lock
对象已经被锁住了,所以开始排队等待。当第一个线程放弃锁,第二个线程就获得锁,开始执行临界区域代码。
xxxxxxxxxx
private ReentrantLock lock = new ReentrantLock();
public void dataAccess(){
lock.lock();
try {
//临界区域 Critical Area
} finally {
//必须写在这里,不然容易产生死锁
lock.unlock()
}
}
死锁(Dead Lock):所有线程都在等待某线程放弃某个物体的锁,该线程自己进入了等待状态,或者没有放弃物体的锁,造成所有线程无限等待,应用程序无法响应。
把上面代码的解锁部分去掉,就造成了死锁,因为线程2无限等待线程1放弃lock,而线程1已经结束了。
内在锁(Intrinsic Lock):每个Object对象/方法/类 都配备内在的锁,使用 Synchronized 可以对某对象/方法/类 加内在锁。
Synchronized,根据修饰位置和对象的不同,可以分为 “对象锁”, “类锁”, “方法锁”。
此 synchronized 获取的是 this 的内在锁,类别为对象锁,一次只能有一个线程访问同一对象的临界区域:
xxxxxxxxxx
class DataAccessObject {
public void printNumbers() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread() + "|" + i);
}
}
}
}
相当于以下代码(不完全是),这个 synchronized 获得的是方法和对象的内在锁:
xxxxxxxxxx
class DataAccessObject {
public synchronized void printNumbers() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread() + "|" + i);
}
}
}
synchronized 可以把类锁住,此处获得的是类锁,这样一次只能由一个线程访问这个类。
xxxxxxxxxx
class DataAccessObject {
public void printNumbers() {
synchronized(this.class){
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread() + "|" + i);
}
}
}
Synchronized 块内还能套用 Synchronized,典型的三线程循环打印ABC的题目就是这样(需要了解)。
Synchronized 的大量使用会造成很大的开销。
详细参考 [知乎文章] https://zhuanlan.zhihu.com/p/71156910
悲观锁(Pessimistic Lock):不是特指 java 中的某个锁,而是一种应对并发的策略,即认为拿到的数据肯定被其他线程修改过了,所以一拿到数据就上锁。java 中所有的锁都是悲观锁。
乐观锁(Optimistic Lock):没有哪种 java 类直接叫 OptimisticLock,这还是一种应对并发的策略,即认为读取的数据肯定没被修改过,但对数据写操作之前,先检查一段时间内即从上次读取到现在,数据是否被改过了。如果没改过,则赋给它新的值,否则循环等待。乐观锁不是锁,是一种基于CAS的算法。
CAS操作:即 compareAndSwap,是乐观锁实现的基础,java中存在native方法,可以一步完成(原子的)。
Synchronized 的 “自动挂档”: 无锁 -- 偏向锁 --自旋锁(轻量级锁)-- 重量级锁
偏向锁:synchronized 修饰的块,有个线程一开始就抢到了锁,并且没有其他线程参与锁竞争,线程第一次结束临界代码执行,不主动释放锁,如果第二次又到达临界代码块,则由于线程仍持有锁,不必重新加锁,所以性能很高。
自旋锁(轻量级锁):基于CAS,通过 while (true) 判断锁标志位是否发生改变,如果改变了,则修改标志位,并成功抢到锁。自旋锁现象发生在轻度的锁竞争当中,抢不到锁的线程只能不断 while true,还什么都干不了,非常耗资源。
重量级锁:当参与竞争的线程太多,synchronized 就将自身的内在锁升级为重量级锁,让超出轻量级锁最大额度的线程自行挂起,避免空耗资源。
可中断锁:Lock实现类大部分可被中断,synchronized 不是可中断锁。
递归锁:就是 ReenterantLock,可重入锁,即持有锁的临界代码内部还能继续持有该锁。
公平锁(Fair Lock):通过如下方式定义private ReentrantLock lock = new ReentrantLock(true);
如果锁是公平的,则当一个持有锁的物体放弃锁的时候,排队等待时间最长的那个物体优先得到锁。公平锁的开销比一般锁大。
公平锁不一定公平。(为什么?)
读写锁(Read/Write Lock):通过以下方式:
xxxxxxxxxx
ReentrantReadWriteLock rwl = new ReenterantReadWriteLock();
ReadLock readLock = rwl.readLock();
WriteLock writeLock = rwl.writeLock();
共享锁:就是读锁。其他线程可以读(锁计数器+1),但不能写。
互斥锁:就是写锁,持有时,其他线程不论是想要读取还是写入都必须等待。
Thread.sleep()
方法:
同步阻塞当前线程,进入 blocked 状态,sleep 结束后重新进入 runnable 状态。
xxxxxxxxxx
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
join()
方法:
其他线程调用 join
使得当前线程被阻塞,直到调用者线程死亡。
xxxxxxxxxx
try {
System.out.println(Thread.currentThread() + "Thread1 joined");
thread1.join();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread() + "Interrupted");
}
wait()
和 notifyAll()
方法:
是 Object 类的方法,必须在已经得到锁的情况下使用,包括内在锁 (Intrinsic Lock) 和显式锁 ( 主要是 Reentrant Lock )。使用后,获得锁的物体进入阻塞状态,直到等待期结束(如果设置等待时间、否则无限等待)、被 notify
或者 interrupt
。
进行银行取钱模拟实验,建立两个线程,一个线程取钱,当钱不够的时候无限等待,另一个线程存钱。
xxxxxxxxxx
private int allMoney = 1000;
public synchronized void decreaseMoney(int money) {
while (allMoney < money) {
System.out.println("No money, start waiting...");
try {
this.wait(); //重要,是指当前线程在 this 上 wait
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.allMoney -= money;
System.out.println("Money decrease OK, money = " + this.allMoney);
}
public synchronized void increaseMoney(int money) {
this.allMoney += money;
System.out.println("Money increase OK, money = " + this.allMoney);
this.notifyAll(); //重要,是指叫醒所有在 this 物体上 wait 的线程
}
分析:当钱不够的时候进入阻塞状态,另一个线程不断增加钱,每增加一次,调用 notifyAll()
试图唤醒阻塞的线程,当发现钱还是不够时,继续进入阻塞,反复循环,直到钱增加到指定数量时扣除。
当 wait
调用者不是锁的持有者时,抛出 IllegalMonitorState
异常。
和 sleep()
的比较
sleep
能在任何地方调用,wait
必须在临界区域调用。InterruptedException
。
yield()
方法:
调用者线程把状态恢复到 Runnable,但仍有可能在进入 Runnable 后又被立刻选中进入 Running 状态。
原子操作例子:a += 1
,并不是原子操作,而是三个原子操作的集合:
正是因为其操作的非原子性,在CPU时间调度时,很可能在其中一个操作做完后,就切换到另一个线程了,这样就会导致数据损坏,也就有了线程安全一说。
Volatile 关键字的双重语义:
为了 I/O 一个变量设置一堆锁显然费时费力,这时 Volatile 就有用处了。
xxxxxxxxxx
private volatile boolean done;
public boolean isDone() { return done; }
public void setDone() { done = true; }
如上,不必使用 synchronized 或者 lock,数据在一次原子操作后立即更新,对所有线程可见,也就避免了数据损坏。
但是给变量加了 volatile 一定安全吗?假设有两个线程在同时操作 volatile 修饰的 a 变量,设线程 A 取得了 a 变量的值为1,在计划对其 +1 操作时,线程 B 抢先一步把它更新为 3,然后切换回 a 线程,把 1 + 1 = 2 重新写回,结果 a 的值是 2,显然不是线程安全的。
甚至对于 a += 1
的情况也不安全,他不是原子操作;然而,对于 a++
是可行的,这是一步操作。
结论:volatile
仅对赋值、自增等原子操作才有用,否则仍然是线程不安全的。
暂时不讲。
Callable<V>
接口包含 call()
方法,类似 Runnable
的 run()
方法,但它有返回值。
Future<V>
接口的方法:
xxxxxxxxxx
public interface Future<V> {
V get() throws . . .;
V get(long timeout, TimeUnit unit) throws . . .;
void cancel(boolean mayInterrupt);
boolean isCancelled();
boolean isDone();
}
FutureTask<T>
类继承 FutureRunnable
,FutureRunnable
实现 Runnable
和 Future
。
FutureTask<T>
类封装一个 callable
实现类,通过 get()
方法得到 call()
的计算结果。如果计算没有结束,则开始同步阻塞等待。
通过下列封装,可以把 FutureTask
变成 Future
+ Runnable
。
xxxxxxxxxx
private static void testFuture() {
Callable<Integer> myCallable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(myCallable);
Thread th = new Thread(futureTask);
th.start();
try {
System.out.println(futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
xxxxxxxxxx
class MyCallable implements Callable<Integer> {
public Integer call() throws Exception {
return 1+2;
}
}
线程池是一个装载了一堆线程的统一调度工具,好处:
线程池类型有很多,个人感觉其创建属于工厂模式,都是一个名为 ThreadPoolExecutor
的类创建的,只不过每种线程池的参数不同,创建某个类别的线程池相当于创建了一种 ”预设“。
相关源码:
xxxxxxxxxx
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
一般的,每个线程池初始时都有一些等待的线程(Idle Thread),只需把创建好的 Callable/Runnable 对象扔进去(调用 submit方法)就能自动装配成一个完整的线程并开始执行。Callable submit之后,其返回值是 Future,可以从中得到计算结果返回值。
xxxxxxxxxx
private static void threadPoolTest() {
ExecutorService service = Executors.newCachedThreadPool();
Callable<Integer> myCallable = new MyCallable();
ArrayList<Future<Integer>> resList = new ArrayList<Future<Integer>>();
resList.add(service.submit(myCallable));
resList.add(service.submit(myCallable));
resList.add(service.submit(myCallable));
resList.add(service.submit(myCallable));
resList.add(service.submit(myCallable));
try {
for (Future<Integer> item: resList) {
System.out.println(item.get());
}
} catch (Exception e) {
System.out.println(e);
}
service.shutdown();
}
上述代码把 future 加到 resList 是一种获得结果的方法,如果不这么做,也可以:
invokeAll()
同步阻塞等待所有线程执行完毕,并返回一个结果集(List<Future<T>>
)。invokeAny()
随便返回一个执行完毕的线程的计算结果(Future<T>
)。