[Java] Thread#7, 스레드풀 작업 완료 통보 방식(블로킹, 콜백, 리턴유무)
- 카테고리 없음
- 2019. 9. 11. 02:32
블로킹 방식의 작업완료 통보
- submit() 메소드 이용. 매개값으로 받은 작업객체(Runnable, Callable)를 스레드풀에 저장하고 즉시 Future객체를 반환.
- Future 객체는 작업결과가 아니라 작업이 완료되기까지 기다렸다가(지연되었다가 == 블로킹되다) 최종결과를 얻었을 때 사용된다.
- 따라서 Future 객체를 지연객체라고도 부른다.
- Future 객체의 get() 메소드는 작업이 완료될때까지 블로킹되었다가 처리결과를 리턴한다.
- get()메소드의 리턴타입은 submit() 메소드의 두번째 인자의 타입과 같다.
리턴값이 없는 작업 완료 통보
- Runnable 객체로 생성.
- 작업처리가 정상적으로 완료되면 Future 객체의 get() 메소드는 null을 반환.
- 작업도중 interrupt되면 InterruptedExecption이 발생하고, 작업처리도중 예외가 발생하면 ExecutionException을 발생시키므로 따로 예외처리를 해두어야한다.
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Runnable runnable = new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i =0; i<=10; i++) {
sum+= i;
//sum = i/0;
}
System.out.println("[처리 결과] "+sum);
}
};
Future future = executorService.submit(runnable);
try {
System.out.println("여기실행");
future.get();//작업이 잘 완료되면 예외 안남. 작업이 실패하면 예외 뱉음.
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생함]" + e.getMessage());
}
executorService.shutdown();
}
리턴값이 있는 작업 완료 통보
- 스레드풀의 스레드가 작업완료 후 처리결과를 얻어야 한다면 Callable 객체로 생성하면된다.
- 작업처리가 정상적으로 완료되면 Future 객체의 get() 메소드는 리턴타입 객체를 반환.
- 작업도중 interrupt되면 InterruptedExecption이 발생하고, 작업처리도중 예외가 발생하면 ExecutionException을 발생시키므로 따로 예외처리를 해두어야한다.
- 1부터 10까지의 합을 리턴하는 작업을 Callable 객체로 생성하고 스레드풀의 스레드가 처리한 예제이다.!
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i =0; i<=10; i++) {
sum+=i;
sum = i/0;
}
return sum;
}
};
Future<Integer> future = executorService.submit(task);
try {
System.out.println("여기실행");
int sum = future.get();
System.out.println("[처리 결과] : "+sum);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생함] "+e.getMessage());
}
executorService.shutdown();
}
작업 처리 결과를 외부 객체에 저장
- 상황에 따라 스레드의 작업결과를 외부 객체에 저장해야하는 경우가 있음.
- 외부 객체는 공유객체가 되어 두개이상의 스레드 작업을 취합할 목적으로 이용된다.
- ExecutorService의 submit(Runnable task, V result) 메소드를 사용하면되는데 result가 바로 공유객체가 된다.
- Future의 get() 메소드는 result 타입의 객체를 리턴하게되는데 이때의 객체는 공유객체 result와 같은 객체이다.
- 작업객체(스레드)는 Runnable 구현 클래스로 생성하는데, 생성자를 통해 Result 객체를 주입받아야 한다.
-예제 : 1부터 10까지의 합을 계산하는 두개의 작업을 스레드풀에 처링청하고 Result 객체에 누적하도록 하였다.
public class ResultByRunnableExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
class Task implements Runnable{
Result result;
Task(Result result){
this.result = result;
}
@Override
public void run() {
int sum = 0;
for(int i =1; i<=10; i++) {
sum+=i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
result = future2.get();
System.out.println("[처리 결과] "+result.accumValue);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
e.printStackTrace();
System.out.println("[실행 예외 발생함]"+e.getMessage());
}
executorService.shutdown();
}
}
class Result{
int accumValue;
synchronized void addValue(int value) {
accumValue += value;
}
}
작업 완료 순으로 통보
- 작업의 양과 스레드 스케쥴링에 따라 먼저 시작한 작업이라도 나중에 끝나는 경우가 있음.
- CompletionService는 스레드풀에서 작업처리가 완료된 것만 통보받는 방법.
- 처리 완료된 작업을 가져오는 poll()과 take() 메소드 제공
- 예제 : 3개의 Callable 작업을 처리하고 처리가 완료된 순으로 작업의 결과물을 출력
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
CompletionService<Integer> completionService =
new ExecutorCompletionService<Integer>(executorService);
System.out.println("[작업 처리 요청]");
for(int i =0; i<3; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum =0;
for(int i =1; i <= 10; i++) {
sum += i;
}
return sum;
}
});
}
System.out.println("[처리 완료된 작업 확인]");
executorService.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
//작업 완료 순 통보. 완료된 애부터 take로 뱉어냄.
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] "+value);
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
try { Thread.sleep(3000); }
catch (Exception e) {}
executorService.shutdownNow();
}
콜백 방식의 작업 완료 통보
- 콜백 : 완료 후 특정 메소드 실행.
- 블로킹 방식 : 요청 후 작업이 완료될 때까지 블로킹.
- ExecutorService는 콜백을 위한 별도의 기능을 제공하지는 않음.
- Runnable 구현클래스를 작성할 때 콜백기능을 구현할 수 있음.
- 비동기 통신에서 콜백객체를 만들때 사용되는 CompletionHandler 객체를 이용한다.
CompletionHandler<V, A> callback = new CompletionHandler<V, A>(){
@Override
public void completed(V result, A attachment){ } // 성공시 호출
@Override
public void failde(V result, A attachment){ } // 실패시 호출
};
- 예제 : 두개의 문자열을 정수화해서 더하는 작업을 처리하고 결과를 콜백방식으로 통보하는 예제.
첫번째 작업은 "3", "3" 을 주었고 두번쨰 작업은 "3","삼"을 주었다.
첫번째 호출은 정상으로 처리될 것이고(completed() 실행) , 두번째 작업은 NumberFormatExecption의 발생으로 failed()메소드가 호출된다.
public class CallbackExample {
private ExecutorService executorService;
public CallbackExample() {
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
}
private CompletionHandler<Integer,Void> callback = new CompletionHandler<Integer,Void>(){
@Override
public void completed(Integer result, Void attachment) {
System.out.println("completed() 실행 : "+result);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("failed() 실행 : " + exc.toString());
}
};
public void doWork(final String x, final String y) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
int intX = Integer.parseInt(x);
int intY = Integer.parseInt(y);
int result = intX + intY;
//일련의 작업을 무사히 마치고!
callback.completed(result, null);
} catch (Exception e) {
//예외가 발생했을 때 호출!
callback.failed(e, null);
}
}
};
executorService.submit(task);
}
public void finish() {
executorService.shutdown();
}
public static void main(String[] args) {
CallbackExample example = new CallbackExample();
example.doWork("3", "3");
example.doWork("3", "삼");
example.finish();
}
}