[Java] Thread#7, 스레드풀 작업 완료 통보 방식(블로킹, 콜백, 리턴유무)

블로킹 방식의 작업완료 통보

- submit() 메소드 이용. 매개값으로 받은 작업객체(Runnable, Callable)를 스레드풀에 저장하고 즉시 Future객체를 반환.

- Future 객체는 작업결과가 아니라 작업이 완료되기까지 기다렸다가(지연되었다가 == 블로킹되다) 최종결과를 얻었을 때 사용된다.

- 따라서 Future 객체를 지연객체라고도 부른다.

- Future 객체의 get() 메소드는 작업이 완료될때까지 블로킹되었다가 처리결과를 리턴한다.

- get()메소드의 리턴타입은 submit() 메소드의 두번째 인자의 타입과 같다.

 

리턴값이 없는 작업 완료 통보

- Runnable 객체로 생성.

- 작업처리가 정상적으로 완료되면 Future 객체의 get() 메소드는 null을 반환.

- 작업도중 interrupt되면 InterruptedExecption이 발생하고, 작업처리도중 예외가 발생하면 ExecutionException을 발생시키므로 따로 예외처리를 해두어야한다.

public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		
		System.out.println("[작업 처리 요청]");
		
		Runnable runnable = new Runnable() {

			@Override
			public void run() {
				int sum = 0;
				for(int i =0; i<=10; i++) {
					sum+= i;
					//sum = i/0;
				}
				System.out.println("[처리 결과] "+sum);
			}
		};
		Future future = executorService.submit(runnable);
		
		try {
			System.out.println("여기실행");
			future.get();//작업이 잘 완료되면 예외 안남. 작업이 실패하면 예외 뱉음.
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {
			System.out.println("[실행 예외 발생함]" + e.getMessage());
		}
		executorService.shutdown();
	}

 

리턴값이 있는 작업 완료 통보

- 스레드풀의 스레드가 작업완료 후 처리결과를 얻어야 한다면 Callable 객체로 생성하면된다.

- 작업처리가 정상적으로 완료되면 Future 객체의 get() 메소드는 리턴타입 객체를 반환.

- 작업도중 interrupt되면 InterruptedExecption이 발생하고, 작업처리도중 예외가 발생하면 ExecutionException을 발생시키므로 따로 예외처리를 해두어야한다.

- 1부터 10까지의 합을 리턴하는 작업을 Callable 객체로 생성하고 스레드풀의 스레드가 처리한 예제이다.!

public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		
		System.out.println("[작업 처리 요청]");
		
		Callable<Integer> task = new Callable<Integer>() {

			@Override
			public Integer call() throws Exception {
				int sum = 0;
				for(int i =0; i<=10; i++) {
					sum+=i;
					sum = i/0;
				}
				return sum;
			}
		};
		Future<Integer> future = executorService.submit(task);
		
		try {
			System.out.println("여기실행");
			int sum = future.get(); 
			System.out.println("[처리 결과] : "+sum);
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {
			System.out.println("[실행 예외 발생함] "+e.getMessage());
		}
		executorService.shutdown();
	}

 

작업 처리 결과를 외부 객체에 저장

- 상황에 따라 스레드의 작업결과를 외부 객체에 저장해야하는 경우가 있음.

- 외부 객체는 공유객체가 되어 두개이상의 스레드 작업을 취합할 목적으로 이용된다.

- ExecutorService의 submit(Runnable task, V result) 메소드를 사용하면되는데 result가 바로 공유객체가 된다.

- Future의 get() 메소드는 result 타입의 객체를 리턴하게되는데 이때의 객체는 공유객체 result와 같은 객체이다.

- 작업객체(스레드)는 Runnable 구현 클래스로 생성하는데, 생성자를 통해 Result 객체를 주입받아야 한다.

 

-예제 : 1부터 10까지의 합을 계산하는 두개의 작업을 스레드풀에 처링청하고 Result 객체에 누적하도록 하였다.

public class ResultByRunnableExample {
	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		
		System.out.println("[작업 처리 요청]");
		class Task implements Runnable{
			Result result;
			
			Task(Result result){
				this.result = result;
			}
			
			@Override
			public void run() {
				int sum = 0;
				for(int i =1; i<=10; i++) {
					sum+=i;
				}
				result.addValue(sum);
			}
		}
		
		Result result = new Result();
		Runnable task1 = new Task(result);
		Runnable task2 = new Task(result);
		
		Future<Result> future1 = executorService.submit(task1, result);
		Future<Result> future2 = executorService.submit(task2, result);
		
		try {
			result = future1.get();
			result = future2.get();
			System.out.println("[처리 결과] "+result.accumValue);
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("[실행 예외 발생함]"+e.getMessage());
		}
		executorService.shutdown();
	}
}

class Result{
	int accumValue;
	synchronized void addValue(int value) {
		accumValue += value;
	}
}

 

작업 완료 순으로 통보

- 작업의 양과 스레드 스케쥴링에 따라 먼저 시작한 작업이라도 나중에 끝나는 경우가 있음.

- CompletionService는 스레드풀에서 작업처리가 완료된 것만 통보받는 방법.

- 처리 완료된 작업을 가져오는 poll()과 take() 메소드 제공

- 예제 : 3개의 Callable 작업을 처리하고 처리가 완료된 순으로 작업의 결과물을 출력

public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
			Runtime.getRuntime().availableProcessors()
		);
		
		CompletionService<Integer> completionService = 
				new ExecutorCompletionService<Integer>(executorService);
		
		System.out.println("[작업 처리 요청]");
		
		for(int i =0; i<3; i++) {
			completionService.submit(new Callable<Integer>() {

				@Override
				public Integer call() throws Exception {
					int sum =0;
					for(int i =1; i <= 10; i++) {
						sum += i;
					}
					return sum;
				}
			});
		}
		
		System.out.println("[처리 완료된 작업 확인]");
		executorService.submit(new Runnable() {

			@Override
			public void run() {
				while(true) {
					try {
                    	//작업 완료 순 통보. 완료된 애부터 take로 뱉어냄.
						Future<Integer> future = completionService.take(); 
						int value = future.get();
						System.out.println("[처리 결과] "+value);
					} catch (Exception e) {
						e.printStackTrace();
						break;
					}
				}
			}
		});
		
		try { Thread.sleep(3000); } 
		catch (Exception e) {}
		executorService.shutdownNow();
		
	}

 

콜백 방식의 작업 완료 통보

- 콜백 : 완료 후 특정 메소드 실행.

- 블로킹 방식 : 요청 후 작업이 완료될 때까지 블로킹.

- ExecutorService는 콜백을 위한 별도의 기능을 제공하지는 않음.

- Runnable 구현클래스를 작성할 때 콜백기능을 구현할 수 있음.

- 비동기 통신에서 콜백객체를 만들때 사용되는 CompletionHandler 객체를 이용한다.

CompletionHandler<V, A> callback = new CompletionHandler<V, A>(){
	@Override
    public void completed(V result, A attachment){ } // 성공시 호출
    
    @Override
    public void failde(V result, A attachment){ } // 실패시 호출
};

 

- 예제 : 두개의 문자열을 정수화해서 더하는 작업을 처리하고 결과를 콜백방식으로 통보하는 예제.

첫번째 작업은 "3", "3" 을 주었고 두번쨰 작업은 "3","삼"을 주었다.

첫번째 호출은 정상으로 처리될 것이고(completed() 실행) , 두번째 작업은 NumberFormatExecption의 발생으로 failed()메소드가 호출된다.

public class CallbackExample {
	private ExecutorService executorService;
	
	public CallbackExample() {
		executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
	}
	
	private CompletionHandler<Integer,Void> callback = new CompletionHandler<Integer,Void>(){

		@Override
		public void completed(Integer result, Void attachment) {	
			System.out.println("completed() 실행 : "+result);
		}

		@Override
		public void failed(Throwable exc, Void attachment) {
			System.out.println("failed() 실행 : " + exc.toString());
		}
	};
	
	public void doWork(final String x, final String y) {
		Runnable task = new Runnable() {

			@Override
			public void run() {
				try {
					int intX = Integer.parseInt(x);
					int intY = Integer.parseInt(y);
					int result = intX + intY;
                    //일련의 작업을 무사히 마치고!
					callback.completed(result, null);
				} catch (Exception e) {
                	//예외가 발생했을 때 호출!
					callback.failed(e, null);
				}
			}
		};
		executorService.submit(task);
	}
	
	public void finish() {
		executorService.shutdown();
	}
	
	public static void main(String[] args) {
		CallbackExample example = new CallbackExample();
		example.doWork("3", "3");
		example.doWork("3", "삼");
		example.finish();
	}
	
}

댓글

Designed by JB FACTORY