Stay Hungry Stay Foolish

Java语言多线程简介

Posted on By Jun Xi Gu

本文是对Java语言多线程的基本知识的汇总和个人理解


多线程主要有两个好处:更快的解决问题,能给某些应用场景提供更好的设计

以下内容中的代码均出自《Java编程思想》

Java基本的多线程应用

在Java的多线程应用中,设计到几个基本的概念需要首先了解

  • 任务:任务是每个线程中所执行的逻辑,想并发执行的任务会在不同的线程上执行
  • 线程:线程是用来驱动任务并发执行的资源,每个任务会绑定到一个线程上
  • 线程池:管理多个线程的生命周期工具,例如像重用线程

Java提供了一些基本的API来使用多线程编程,这些基本API包括

  • 封装任务,每个任务都单独运行在一个线程上
  • 创建和管理线程
  • 对线程进行配置,如优先级,deamon等属性

封装任务

当使用线程时,其实是想在不同的线程上执行不同的任务,所以需要先创建任务,创建任务可以使用两种接口:Runnable和Callable

例如

class LiftOff implements Runnable {
	public void run() {
		// Do things 
	}
}

class TaskWithResult implements Callable<String> {
	public String call() {
		// Do things
		return "";
	}
}

可见,区别就是Callable的接口有返回值,也就是线程执行完以后能返回结果

创建和管理线程

所有的任务都需要线程来驱动,然后不同的任务就可以在不同的线程驱动下并发的运行了

例如

public class BasicThreads {
	public static void main(String[] args) {
		Thread t = new Thread(new Runnable(){
			public void run() {
				// Do things 
			}
		});
		t.start();
		Print.print("Waiting for Lift off");
	}
}

直接创建一个Thread,然后把Runnable放到构造函数里,然后start,就能运行任务了,但Thread是不能用来驱动Callable的,下面会展示驱动Callable的方法

通过手动创建的Thread需要自己管理Thread,例如考虑到Thread资源的重用等问题,这些逻辑会比较复杂,为此,新的Java API提供了多种线程池,这些管理器提供的功能主要是创建线程,管理线程生命周期等,它们都实现了ExecutorService接口,以下可以创建线程池

  • Executors.newFixedThreadPool:创建线程数量固定的线程池
  • Executors.newCachedThreadPool;创建线程池,线程数目不定,按需创建
  • Executors.newSingleThreadExecutor:创建只有一个线程的线程池,所有提交的任务变成序列执行

通过线程池可以驱动Runnable和Callable

例如

public class CachedThreadPool {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			exec.execute(new Runnable(){
				public void run() {
					// Do things 
				}
			});
		}
		exec.shutdown();
	}
}

public class CallableDemo {
	public static void main(String[] args) {
		List<Future<String>> results = new ArrayList<Future<String>>();
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			results.add(exec.submit(new Callable<String>(){
				public String run() {
					// Do things 
					return "";
				}
			}));
		}
		for (Future<String> f : results) {
			try {
				Print.print(f.get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			} finally {
				exec.shutdown();
			}
		}
	}

}

当用线程池驱动Callable时,通过Future来获得返回结果
现在一般都是用线程池来提交任务,管理线程的生命周期

配置线程

可以通过Thread的API来配置线程的一些属性,例如

Thread.currentThread().setName //设置线程名字
Thread.currentThread().setDaemon //设置线程是否为后台线程
Thread.currentThread().setPriority //设置线程优先级
Thread.currentThread().setUncaughtExceptionHandler //设置线程的异常处理器

当使用线程池来创建线程时,需要给线程池提供一个线程Factory,在这个Factory中创建线程并配置线程的属性,例如

class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
	public void uncaughtException(Thread t, Throwable e) {
		Print.print("caught " + e);
	}
}

public class MyThreadFactory implements ThreadFactory {
	public Thread newThread(Runnable r) {
		Thread t = new Thread(r);
		t.setDaemon(true);
		t.setName("Test");
		t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
		return t;
	}
}

class DaemonFromFactory implements Runnable {
	public void run() {
		Thread.currentThread.setPriority(Thread.MIN_PRIORITY);
		// Do things
	}
	public static void main(String[] args) throws InterruptedException {
		ExecutorService exec = Executors
				.newCachedThreadPool(new MyThreadFactory());
		for (int i = 0; i < 10; i++) {
			exec.execute(new DaemonFromFactory());
		}
		Print.print("All daemons started");
		TimeUnit.MILLISECONDS.sleep(500);
	}

}

以上代码创建了一个异常处理器MyUncaughtExceptionHandler,然后创建一个线程工厂,在线程工厂里创建线程并配置线程参数,这些线程都是驱动任务的线程

以上都是在使用多线程时用到的概念:任务,线程,线程池和它们用法的简单例子,对于这几个概念所对应的类和接口,Java提供了很多API,希望使用之前先多看看代码例子熟悉API

线程的互斥

多线程的复杂在于不同的线程之间会互相影响,其中一种是不同的线程会共用资源,内存,I/O等;这些资源有时候是只能由一个线程从任务的开始到结束的过程中来独占的,不然在线程任务的中间被其他线程干扰后导致状态改变会引起问题
所以,多线程需要解决的一个重要问题是资源的互斥使用,这个问题其实可表现为两方面:原子性和可见性

原子性:一系列操作从开始到结束不能被分割,中断;例如i++其实中间包括好几条指令,这些指令执行时不能被分割中断

可见性:在不同的线程之间,一个线程经过处理得到的结果时,其他线程读取这个结果时,能得到这个结果;例如一个线程执行了i++,然后其他线程读取i时能取得i自增以后的结果

对除long和double外的基本类型进行读取和赋值时,这两个操作是原子的,但不能确保可见性;当用volatile修饰变量时,对变量进行读取和赋值的操作符合原子性和可见性

volatile只能保证变量的读取和赋值满足原子性和可见性,甚至对于基本类型的变量的加减等操作是不能保证的,这时可以用Java API中的原子类,它们提供一些基本类型的运算操作,这些操作是原子的和可见的

但为了保证事务(一系列操作)的原子性和可见性,则需要通过锁来完成,使用锁的方法包括synchronized,Lock

操作 如何满足原子性 如何满足可见性
除long和double外的基本类型的取值和赋值 自动满足 用volatile修饰
变量的取值和赋值 用volatile修饰 用volatile修饰
一系列操作 用锁 用锁

除了确保线程之间的互斥来解决冲突以外,其实有时可以避免冲突,例如用ThreadLocal或其他方法来使得线程不共享资源,使用如CopyOnWriteArray,CopyOnWriteSet,ConcurrentHashMap等免锁容器,使用Atomic等乐观加锁的类等减少加锁的性能损失

线程的协作

除了保持线程对资源的互斥使用外,还需要解决线程间的协作问题,也就是一个线程需要的输入是另一个线程产生的输出,为了保证这两线程任务的顺序性和资源的互斥使用,Java API在锁之上提供了握手API:wait,notify,notifyAll

wait:一个下游线程在获得一个资源的锁,单独使用资源的时候,发现其上游线程还没处理好时,释放获得的资源的锁,并阻塞自己
notify,notifyAll:一个上游线程在获得一个资源的锁后,处理完以后,释放获得的锁并通知下游线程

例如

class Car {
	private boolean waxOn = false;

	public synchronized void wax() throws InterruptedException {
		while (waxOn) {
			wait();
		}
		waxOn = true;
		Print.printnb("Wax on! ");
		notifyAll();
	}

	public synchronized void buffer() throws InterruptedException {
		while (!waxOn) {
			wait();
		}
		waxOn = false;
		Print.print("Wax off! ");
		notifyAll();
	}

}

class WaxOn implements Runnable {
	private Car car;

	public WaxOn(Car car) {
		this.car = car;
	}

	public void run() {
		try {
			while (!Thread.interrupted()) {
				car.wax();
				TimeUnit.MILLISECONDS.sleep(200);
			}
		} catch (InterruptedException e) {
			Print.print("Existing via interrupt");
		}
		Print.print("Ending wax on task");
	}

}

class WaxOff implements Runnable {
	private Car car;

	public WaxOff(Car car) {
		this.car = car;
	}

	public void run() {
		try {
			while (!Thread.interrupted()) {
				car.buffer();
				TimeUnit.MILLISECONDS.sleep(200);
			}
		} catch (InterruptedException e) {
			Print.print("Existing via interrupt");
		}
		Print.print("Ending wax off task");
	}
}

public class WaxMatic {

	public static void main(String[] args) throws InterruptedException {
		ExecutorService exec = Executors.newCachedThreadPool();
		Car car = new Car();
		exec.execute(new WaxOn(car));
		exec.execute(new WaxOff(car));
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
	}

}

一个值得的注意的地方是wait,notify等API都是在锁对象执行的,所以必须要先通过sychronized获得同一个对象的锁,然后才能使用握手API

另外Lock也提供了握手的接口,要通过Lock获得Condition对象,然后使用Condition对象完成握手
condition.await
condition.signal
condition.signalAll

握手API是非常底层的线程协作API,Java API提供了一些高层的组件,这些组件的操作自动完成了握手的功能,例如BlockingQueue,CountDownLatch,CyclicBarrier,DelayQueue,PriorityBlockingQueue,ScheduleExecutor,Semaphore,Exchanger等,它们都更方便的使用

线程的死锁

当多线程满足以下4个条件时能出现死锁:

  1. 存在互斥使用的资源
  2. 至少一个任务持有一个资源并等待另外一个资源
  3. 非抢占式的使用资源
  4. 循环等待

为了防止死锁,只需要破坏以上其中一个条件即可

任务的终结

当任务被终结时,线程会被回收
终结任务的方式就是通过标记位来通知任务需要终结自己

但任务在其执行过程中使用一些操作会被阻塞时,如何终结会阻塞的任务就变得稍微复杂
一个任务进入阻塞状态的原因包括

  • sleep
  • I/O操作
  • wait
  • 尝试获取锁

为了终结可能进入阻塞的任务,则需要中断阻塞才行,可惜不是每种阻塞都能被中断

终结sleep,wait阻塞

通过Thread.interrupt,Future.cancel,ExecutorService.shutdownNow等来终端线程的sleep和wait,对于这种中断处理需要按照一种模式来安全地编码,释放资源

class NeedsCleanup {
	private final int id;

	public NeedsCleanup(int id) {
		this.id = id;
	}

	public void cleanup() {
		Print.print("Cleaning up " + id);
	}
}

class Blocked3 implements Runnable {
	private volatile double d = 0.0;

	public void run() {
		try {
			while (!Thread.interrupted()) {
				NeedsCleanup n1 = new NeedsCleanup(1);
				try {
					Print.print("Sleeping");
					TimeUnit.SECONDS.sleep(1);

					NeedsCleanup n2 = new NeedsCleanup(2);
					try {
						Print.print("Calculating");
						for (int i = 0; i < 2500000; i++) {
							d = d + (Math.PI + Math.E) / d;
						}
						Print.print("Finish time-consuming operation.");
					} finally {
						n2.cleanup();
					}
				} finally {
					n1.cleanup();
				}
			}
			Print.print("Existing via while() test");
		} catch (InterruptedException e) {
			Print.print("Existing via InterruptedException");
		}
	}

}

public class InterruptingIdiom {

	public static void main(String[] args) throws InterruptedException {
		if (args.length != 1) {
			Print.print("usage: java InterruptingIdiom delay-in-mS");
			System.exit(1);
		}
		Thread t = new Thread(new Blocked3());
		t.start();
		TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
		t.interrupt();
	}
}

在任务中使用嵌套的try catch来完成资源的释放,通过while条件判断来获取中断的信号

终结I/O阻塞

不能通过线程API来中断I/O,但可以通过释放stream等资源来跳出阻塞

class IOBlocked implements Runnable {
	private InputStream in;

	public IOBlocked(InputStream is) {
		in = is;
	}

	public void run() {
		try {
			Print.print("Waiting for read()%: ");
			in.read();
		} catch (IOException e) {
			if (Thread.currentThread().isInterrupted()) {
				Print.print("Interrupted from blocked I/O");
			} else {
				throw new RuntimeException(e);
			}
		}
		Print.print("Exiting IOBlocked.run()");
	}
}

public class CloseResource {
	public static void main(String[] args) throws Exception {
		ExecutorService exec = Executors.newCachedThreadPool();
		InputStream socketInput = new Socket("localhost", 8080)
				.getInputStream();
		exec.execute(new IOBlocked(socketInput));
		exec.execute(new IOBlocked(System.in));
		TimeUnit.MILLISECONDS.sleep(100);
		Print.print("Shutting down all threads");
		exec.shutdownNow();
		TimeUnit.SECONDS.sleep(1);
		Print.print("Closing " + socketInput.getClass().getName());
		socketInput.close(); // Releases blocked thread
		TimeUnit.SECONDS.sleep(1);
		Print.print("C1osing " + System.in.getClass().getName());
		System.in.close(); // Releases blocked thread
		TimeUnit.SECONDS.sleep(1);
	}
}

若使用nio中的I/O API,则有API来中断I/O阻塞

class NIOBlocked implements Runnable {
	private final SocketChannel sc;

	public NIOBlocked(SocketChannel sc) {
		this.sc = sc;
	}

	public void run() {
		try {
			Print.print("Waiting for read() in " + this);
			sc.read(ByteBuffer.allocate(1));
		} catch (ClosedByInterruptException e) {
			Print.print("ClosedBy1nterruptException");
		} catch (AsynchronousCloseException e) {
			Print.print("AsynchronousCloseException");
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		Print.print("Exiting NIOBlocked.run() " + this);
	}
}

public class NIOInterruption {

	public static void main(String[] args) throws Exception {
		ExecutorService exec = Executors.newCachedThreadPool();
		InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
		SocketChannel sc1 = SocketChannel.open(isa);
		SocketChannel sc2 = SocketChannel.open(isa);
		Future<?> f = exec.submit(new NIOBlocked(sc1));
		exec.execute(new NIOBlocked(sc2));
		// or can interrupt all by exec.shutdownNow();
		exec.shutdown();
		TimeUnit.SECONDS.sleep(1);
		// Produce an interrupt via cancel:
		f.cancel(true);
		TimeUnit.SECONDS.sleep(1);
		// Release the block by closing the channel:
		sc2.close();
	}
}

终结锁阻塞

如果进入阻塞时是通过Lock的lockInterruptibly API来获取锁的,则阻塞可以被中断

class BlockedMutex {
	private Lock lock = new ReentrantLock();

	public BlockedMutex() {
		// Acquire it right away. to demonstrate interruption
		// of a task blocked on a ReentrantLock:
		lock.lock();
	}

	public void f() {
		try {
			// This will never be available to a second task
			lock.lockInterruptibly(); // Special call
			Print.print("lock acquired in f()");
		} catch (InterruptedException e) {
			Print.print("1nterrupted from lock acquisition in f()");
		}
	}
}

class Blocked2 implements Runnable {
	BlockedMutex blocked = new BlockedMutex();

	public void run() {
		Print.print("Waiting for f() i n BlockedMutex");
		blocked.f();
		Print.print("Broken out of blocked call ");
	}
}

public class Interrupting2 {

	public static void main(String[] args) throws InterruptedException {
		Thread t = new Thread(new Blocked2());
		t.start();
		TimeUnit.SECONDS.sleep(1);
		Print.print("Issuing t.interrupt()");
		t.interrupt();
	}

}