java线程池任务取消注意点

java线程池提交一个任务后任务取消注意点。

在java中使用线程池场景非常多, 只要实现ExecutorService即可实现一个线程池,java中提供了几种实现,通过submit提交一个任务就行了。 有的时候需要在指定的时间点把提交的任务取消掉,线程池框架提供了一个cancel方法,但是在实践中发现有些需要注意点,任务实现接口后需要声明改方法抛出线程中断异常InterruptedException、有Thread.sleep()调用支持线程中断,否则cancel不起作用。

api定义

线程池定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
public interface ExecutorService extends Executor {

/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();

/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();

/**
* Returns {@code true} if this executor has been shut down.
*
* @return {@code true} if this executor has been shut down
*/
boolean isShutdown();

/**
* Returns {@code true} if all tasks have completed following shut down.
* Note that {@code isTerminated} is never {@code true} unless
* either {@code shutdown} or {@code shutdownNow} was called first.
*
* @return {@code true} if all tasks have completed following shut down
*/
boolean isTerminated();

/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);

/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list, each of which has completed
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks or any of its elements are {@code null}
* @throws RejectedExecutionException if any task cannot be
* scheduled for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list. If the operation did not time out,
* each task will have completed. If it did time out, some
* of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
* unit are {@code null}
* @throws RejectedExecutionException if any task cannot be scheduled
* for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks or any element task
* subject to execution is {@code null}
* @throws IllegalArgumentException if tasks is empty
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do before the given timeout elapses.
* Upon normal or exceptional return, tasks that have not
* completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks, or unit, or any element
* task subject to execution is {@code null}
* @throws TimeoutException if the given timeout elapses before
* any task successfully completes
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

任务定义

提交线程池的任务可以实现Runnable或者Callable接口,如果需要获取返回值的话就实现Callable,不需要的话就实现Runnable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

Future定义

需要提供cancel功能的话,需要线程池提交任务后返回一个Future对象,通过Future提供的cancel方法取消任务.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

项目实践

线程池定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public FinesysDataPlayBack finesysDataPlayBack() {
FinesysDataPlayBack finesysDataPlayBack = FinesysDataPlayBack.getInstance();
finesysDataPlayBack.setPlayBackThreadPool(playbackExecutorService());
return finesysDataPlayBack;
}

public ExecutorService playbackExecutorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("playback-pool-%d").build();
return new ThreadPoolExecutor(16,
1024,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(4096),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}

任务定义

AbstractTask.java

1
2
3
4
public interface AbstractTask extends Callable<String> {
void setTaskId(String taskId);
String taskId();
}

DataPlaybackWithSleepTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Slf4j
public class DataPlaybackWithSleepTask implements AbstractTask {
private String taskId;
private IProducer producer;
private IConsumer consumer;
private long sleepTimeout = 30000L;

public DataPlaybackWithSleepTask(IProducer producer, IConsumer consumer) {
this(producer, consumer, 0);
}

public DataPlaybackWithSleepTask(IProducer producer, IConsumer consumer, long sleepTimeout) {
this(consumer.instanceId(), producer, consumer, sleepTimeout);
}

public DataPlaybackWithSleepTask(String taskId, IProducer producer, IConsumer consumer, long sleepTimeout) {
this.taskId = taskId;
this.producer = producer;
this.consumer = consumer;
this.sleepTimeout = sleepTimeout;
this.consumer.producer(producer);
}

public void run() throws Exception {
if (producer == null || consumer == null) {
log.error("[sleep task] producer或consumer为空, 任务未运行退出");
return;
}

if (!producer.init()) {
log.error("[sleep task] producer.init失败, 任务未运行退出");
return;
}

if (!consumer.init()) {
log.error("[sleep task] consumer.init失败, 任务未运行退出");
return;
}

log.info("playback sleep task begin sleep({})", sleepTimeout);
Thread.sleep(sleepTimeout);
log.info("playback sleep task end sleep({})", sleepTimeout);
consumer.beforeProcess();
consumer.process();
consumer.afterProcess();
}

public void setSleepTimeout(long sleepTimeout) {
sleepTimeout = Math.abs(sleepTimeout);
if (sleepTimeout < 10000L) sleepTimeout = 10000L;
if (sleepTimeout > Long.MAX_VALUE) sleepTimeout = 10000L;
this.sleepTimeout = sleepTimeout;
}

protected void sleep(long timeout) {
try {
Thread.sleep(timeout);
} catch (InterruptedException e){
}
}

@Override
public String call() throws Exception {
run();
return taskId();
}

@Override
public void setTaskId(String taskId) {
this.taskId = taskId;
}

@Override
public String taskId() {
return taskId;
}
}

任务逻辑实现定义

IConsumer.java

1
2
3
4
5
6
7
8
9
10
public interface IConsumer<T> {
boolean init();
void beforeProcess();
void afterProcess();
void producer(IProducer<T> producer);
//具体实现方法
void process() throws Exception;
boolean workDone();
String instanceId();
}

实现类AbstractPlayBackSendEvent2ApamaConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public abstract class AbstractPlayBackSendEvent2ApamaConsumer implements IConsumer<DepthModel> {
protected PlaybackContext playbackContext = new PlaybackContext();
protected IProducer<DepthModel> producer;

protected AbstractPlayBackSendEvent2ApamaConsumer() {
}

public AbstractPlayBackSendEvent2ApamaConsumer(String serviceId,
String instanceId,
String backReqId,
String channelKey,
int playbackRate) {
this.serviceId = serviceId;
this.instanceId = instanceId;
this.backReqId = backReqId;
this.channelKey = channelKey;
this.playbackRate = Math.abs(playbackRate);
}

@Override
public void producer(IProducer<DepthModel> producer) {
this.producer = producer;
}

@Override
public void process() throws Exception {
final Date beginTime = DateUtils.formatDepthPlaybackDate1(producer.beginTime()),
endTime = DateUtils.formatDepthPlaybackDate1(producer.endTime());

long subDays = DateUtils.subDateTime(beginTime, endTime);
if (subDays == 0) subDays = 1;
subDays += 1;

for (int sub = 0; sub < subDays; sub++) {
Date beginDate = DateUtils.getMarketBegineTime(beginTime, sub);
if (DateUtils.isWorkOutDay(beginDate)) continue;
Date endDate = DateUtils.getMarketEndTime(beginDate);
playbackContext.resetDepthMap();

try {
((EsPlaybackOnceDayProducer) producer).setMarketTime(beginDate, endDate);
} catch (EsReaderException e) {
log.error("查询当日行情数据失败", e);
continue;
}

log.info(String.format("[%s]日待回放行情数据量[%d]", DateUtils.formatSdfDay(beginDate),
producer.totalCount()));
if (producer.totalCount() == 0L) continue;

while (producer.hasNext()) {
List<DepthModel> dataList = null;
try {
dataList = producer.next();
} catch (EsReaderException e) {
log.error("es查询数据失败", e);
}

if (dataList == null || dataList.isEmpty()) {
break;
}

consumerCount.addAndGet(dataList.size());
consumerDailyCount.addAndGet(dataList.size());
//log.info(String.format("开始回放数据,本次回放条数%d", dataList.size()));
for (DepthModel depth : dataList) {
playbackContext.decrementCurrentWaitSendCount();
//logger.info(String.format("当前处理行情数据[%s]", depth.toString()));
currentCreated = depth.getCreated().getTime();
if (lastCreated == 0L) lastCreated = currentCreated;
subTime = currentCreated - lastCreated;
lastCreated = currentCreated;

if (subTime > oneSecond && playbackRate > 0) {
long sleepTimeout = subTime / playbackRate;
log.info(String.format("上次行情时间[%d],本次行情时间[%d],回放速率[%d],本次行情需要延迟[%d]毫秒",
lastCreated, currentCreated, playbackRate, sleepTimeout));
Thread.sleep(sleepTimeout);
}

//发送行情数据
log.info(String.format("当前处理行情数据%s", depth.toString()));
isLastOne = isLastOne(depth);
isTodayLastOne = isTodayLastOne(depth);
send(depth);
update(depth);
resetPlaybackContext();
}
}

log.info(String.format("[%s]日期行情回放完毕,本次共回放[%d]条数据,[%s]",
DateUtils.formatSdfDay(beginDate),
consumerDailyCount.get(), requestInfo()));

consumerDailyCount.set(0L);
Thread.sleep(2000);
//发送日结信息
sendDailySettlementEvent(beginDate);
}
}
}

提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean submitTask(IProducer producer, IConsumer consumer) {
if (playBackThreadPool == null) return false;
if (producer == null) return false;
if (consumer == null) return false;

PLAYBACK_TASK_LOCK.lock();

try {
AbstractTask playbackTask = new DataPlaybackWithSleepTask(producer, consumer, 30000L);
Future<String> futureTask = playBackThreadPool.submit(playbackTask);
PLAYBACK_TASK_MAP.put(playbackTask.taskId(), futureTask);
return true;
} catch (RejectedExecutionException e) {
return false;
} finally {
PLAYBACK_TASK_LOCK.unlock();
}
}

取消任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void cancelTask(String taskId) {
if (taskId == null || taskId.length() == 0) return;
PLAYBACK_TASK_LOCK.lock();
try {
if (!PLAYBACK_TASK_MAP.containsKey(taskId)) {
log.info("取消回放任务,当前没有带取消的taskid任务对象");
return;
}
Future<String> futureTask = PLAYBACK_TASK_MAP.get(taskId);
if (!futureTask.isDone()) {
futureTask.cancel(true);
}
PLAYBACK_TASK_MAP.remove(taskId);
} finally {
PLAYBACK_TASK_LOCK.unlock();
}
}
文章目录
  1. 1. api定义
    1. 1.1. 线程池定义
    2. 1.2. 任务定义
    3. 1.3. Future定义
  2. 2. 项目实践
    1. 2.1. 线程池定义
    2. 2.2. 任务定义
    3. 2.3. 任务逻辑实现定义
    4. 2.4. 提交任务
    5. 2.5. 取消任务