本文共 28082 字,大约阅读时间需要 93 分钟。
1.Excutor
源码非常简单,只有一个execute(Runnable command)回调接口
public interface Executor {
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <tt>Executor</tt> implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution. * @throws NullPointerException if command is null */ void execute(Runnable command);}执行已提交的 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
class SerialExecutor implements Executor { final Queuetasks = new LinkedBlockingQueue (); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
2.ExcutorService接口
* @since 1.5 * @author Doug Lea */public interface ExecutorService extends Executor { /** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * @throws SecurityException if a security manager exists and * shutting down this ExecutorService may manipulate * threads that the caller is not permitted to modify * because it does not hold { @link * java.lang.RuntimePermission}("modifyThread"), * or the security manager's checkAccess method * denies access. */ void shutdown(); /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks that were * awaiting execution. * *There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. For example, typical * implementations will cancel via {
@link Thread#interrupt}, so any * task that fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution * @throws SecurityException if a security manager exists and * shutting down this ExecutorService may manipulate * threads that the caller is not permitted to modify * because it does not hold { @link * java.lang.RuntimePermission}("modifyThread"), * or the security manager's checkAccess method * denies access. */ ListshutdownNow(); /** * Returns true if this executor has been shut down. * * @return true if this executor has been shut down */ boolean isShutdown(); /** * Returns true if all tasks have completed following shut down. * Note that isTerminated is never true unless * either shutdown or shutdownNow was called first. * * @return true if all tasks have completed following shut down */ boolean isTerminated(); /** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return true if this executor terminated and * false if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * Submits a value-returning task for execution and returns a * Future representing the pending results of the task. The * Future's get method will return the task's result upon * successful completion. * * * If you would like to immediately block waiting * for a task, you can use constructions of the form * result = exec.submit(aCallable).get(); * *
Note: The {
@link Executors} class includes a set of methods * that can convert some other common closure-like objects, * for example, { @link java.security.PrivilegedAction} to * { @link Callable} form so they can be submitted. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */Future submit(Callable task); /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's get method will * return the given result upon successful completion. * * @param task the task to submit * @param result the result to return * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future submit(Runnable task, T result); /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's get method will * return null upon successful completion. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future submit(Runnable task); /** * Executes the given tasks, returning a list of Futures holding * their status and results when all complete. * { @link Future#isDone} is true for each * element of the returned list. * Note that a completed task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @return A list of Futures representing the tasks, in the same * sequential order as produced by the iterator for the * given task list, each of which has completed. * @throws InterruptedException if interrupted while waiting, in * which case unfinished tasks are cancelled. * @throws NullPointerException if tasks or any of its elements are null * @throws RejectedExecutionException if any task cannot be * scheduled for execution */ List > invokeAll(Collection > tasks) throws InterruptedException; /** * Executes the given tasks, returning a list of Futures holding * their status and results * when all complete or the timeout expires, whichever happens first. * { @link Future#isDone} is true for each * element of the returned list. * Upon return, tasks that have not completed are cancelled. * Note that a completed task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return a list of Futures representing the tasks, in the same * sequential order as produced by the iterator for the * given task list. If the operation did not time out, * each task will have completed. If it did time out, some * of these tasks will not have completed. * @throws InterruptedException if interrupted while waiting, in * which case unfinished tasks are cancelled * @throws NullPointerException if tasks, any of its elements, or * unit are null * @throws RejectedExecutionException if any task cannot be scheduled * for execution */ List > invokeAll(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * Executes the given tasks, returning the result * of one that has completed successfully (i.e., without throwing * an exception), if any do. Upon normal or exceptional return, * tasks that have not completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @return the result returned by one of the tasks * @throws InterruptedException if interrupted while waiting * @throws NullPointerException if tasks or any of its elements * are null * @throws IllegalArgumentException if tasks is empty * @throws ExecutionException if no task successfully completes * @throws RejectedExecutionException if tasks cannot be scheduled * for execution */ T invokeAny(Collection > tasks) throws InterruptedException, ExecutionException; /** * Executes the given tasks, returning the result * of one that has completed successfully (i.e., without throwing * an exception), if any do before the given timeout elapses. * Upon normal or exceptional return, tasks that have not * completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the result returned by one of the tasks. * @throws InterruptedException if interrupted while waiting * @throws NullPointerException if tasks, any of its elements, or * unit are null * @throws TimeoutException if the given timeout elapses before * any task successfully completes * @throws ExecutionException if no task successfully completes * @throws RejectedExecutionException if tasks cannot be scheduled * for execution */ T invokeAny(Collection > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown()方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务的启动并试图停止当前正在执行的任务。在终止后,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService以允许回收其资源。通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService类来编写这些方法的自定义变体)。Executors类为创建ExecutorService提供了便捷的工厂方法。注意1:它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。关于ThreadPoolExecutor的更多内容请参考《》关于ScheduledThreadPoolExecutor的更多内容请参考《》用法示例下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool(int) 工厂方法:
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后等60秒后,任务还没执行完成,就调用 shutdownNow(如有必要)取消所有遗留的任务:
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } }内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作, 后者依次 happen-before 通过 Future.get() 获取的结果。 主要函数: void shutdown() 启动一个关闭命令,不再接受新任务,当所有已提交任务执行完后,就关闭。如果已经关闭,则调用没有其他作用。 抛出: SecurityException - 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。 List<Runnable> shutdownNow() 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。 返回: 从未开始执行的任务的列表 抛出: SecurityException - 如果安全管理器存在并且关闭, 此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission("modifyThread")), 或者安全管理器的 checkAccess 方法拒绝访问。 注意1: 它会返回等待执行的任务列表。 注意2: 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消, 所以任何任务无法响应中断都可能永远无法终止。 boolean isShutdown() 如果此执行程序已关闭,则返回 true。 返回: 如果此执行程序已关闭,则返回 true boolean isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。 返回: 如果关闭后所有任务都已完成,则返回 true boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException 等待(阻塞)直到关闭或最长等待时间或发生中断 参数: timeout - 最长等待时间 unit - timeout 参数的时间单位 返回: 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false 抛出: InterruptedException - 如果等待时发生中断 注意1:如果此执行程序终止(关闭),则返回 true;如果终止前超时期满,则返回 false <T> Future<T> submit(Callable<T> task) 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。 注:Executors 类包括了一组方法,可以转换某些其他常见的类似于闭包的对象, 例如,将 PrivilegedAction 转换为 Callable 形式,这样就可以提交它们了。 参数: task - 要提交的任务 返回: 表示任务等待完成的 Future 抛出: RejectedExecutionException - 如果任务无法安排执行 NullPointerException - 如果该任务为 null 注意:关于submit的使用和Callable可以参阅《 》
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }}关于它的使用请参考《 》
Nested Classes | ||
---|---|---|
class | A handler for rejected tasks that throws a RejectedExecutionException . | |
class | A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded. | |
class | A handler for rejected tasks that discards the oldest unhandled request and then retries execute , unless the executor is shut down, in which case the task is discarded. | |
class | A handler for rejected tasks that silently discards the rejected task. |
abstract <T> <T> | ( task, T result) Submits a Runnable task for execution and returns a Future representing that task. 如果执行成功就返回T result |
abstract <T> <T> | (<T> task) Submits a value-returning task for execution and returns a Future representing the pending results of the task. |
abstract <?> | ( task) Submits a Runnable task for execution and returns a Future representing that task.
|
http://blog.csdn.net/linghu_java/article/details/17123057