Java

[Java] Fork Join Pool

Hyung1 2021. 1. 29. 14:26
728x90
반응형

Fork Join Pool의 상속구조 

java.lang.Object
	java.util.concurrent.AbstractExecutorService
		java.util.concurrent.ForkJoinPool

Fork Join Pool 이란?

ForkJoinPool 은 Java7부터 사용가능한 Java Concurrency 툴이며, 동일한 작업을 여러개의 Sub Task로 분리(Fork)하여 각각 처리하고, 이를 최종적으로 합쳐서(Join) 결과를 만들어내는 방식이다.

 

 

 

위의 그림과 같이 Fork를 통해서 업무를 분담하고 Join을 통해서 업무를 취합한다. 더 자세하게 아래 그림을 보자.

 

하나의 작업 큐를 가지고 있으며, 이러한 작업들을 Fork Join Pool에서 관리하는 여러 스레드에서 접근하여 작업들을 처리한다. 서로 작업을 하려고 큐에서 작업을 가져가며, 각 스레드들은 부모 큐에서 가져간 작업들을 내부 큐( inbound queue)에 담아 관리한다. 스레드들은 서로의 큐에 접근하여 작업들을 가져가 처리한다. 이러한 방법들은 놀고 있는 스레드가 없도록 하기 위해 도입되었다.

 

 

위의 그림 처럼 스레드들이 관리하는 큐는 Dequeue이기 때문에 B스레드에서 A스레드의 작업을 뺏어간다.

부모 작업은 Task를 분할하여 fork함으로써 자식 스레드에서 처리되며 join을 통해 포함된다.

 

만약 100개의 랜덤한 숫자가 있고 이를 합산하는 프로그램을 Fork/Join을 통해 처리한다면 아래 그림과 같을 것이다.

 

 

ForkJoinPool 인터페이스

 

 

 

ForkJoinPool에는 RecursiveActionRecursiveTask 2가지 인터페이스를 제공한다. 

 

RecursiveAction

RecursiveAction는 리턴 값이 없는 Task다.

 

다음은 RecursiveAction로 구현한 Task이다. Task가 실행되면 compute()가 호출된다. Task를 분할하고 싶다면 RecursiveTask를 생성하고 fork()를 호출하여 다른 쓰레드에서 작업이 처리되도록 한다.

public class MyRecursiveAction extends RecursiveAction {

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {
        String threadName = Thread.currentThread().getName();

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Splitting workLoad : " + this.workLoad);
            sleep(1000);
            List<MyRecursiveAction> subtasks =
                    new ArrayList<MyRecursiveAction>();

            subtasks.addAll(createSubtasks());

            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }

        } else {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
                new ArrayList<MyRecursiveAction>();

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

위 코드를 보면

  • 작업에 부하를 주기 위해 sleep() 코드를 추가하였다. 바로 종료되면 동일 쓰레드에서 Task가 연달아 실행될 수 있기 때문에 병렬처리되는 것처럼 보이지 않을 수 있다.
  • Task이름과 시간을 출력하여 어떤 쓰레드에서 어떤 작업이 언제 처리되는지 쉽게 볼 수 있다.
  • compute()에서는 workload가 16보다 클 때 Task를 나누고 16이하면 더 이상 나누지 않고 그 쓰레드에서 처리하도록 정의하였다.

다음과 같이 forkJoinPool.invoke()으로 RecursiveAction을 인자로 전달하고 처리되도록 할 수 있다.

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);

        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(128);
        forkJoinPool.invoke(myRecursiveAction);

//      Just wait until all tasks done
        try {
            forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

실행 결과는 다음과 같다.

 

 

RecursiveTask

RecursiveTask는 리턴 값이 있는 Task다. RecursiveAction과 실행 방법은 동일하지만 처리된 결과를 리턴 받기 위해, Parent는 join()으로 Child의 Task가 완료될 때까지 기다린다.

 

다음은 RecursiveTask로 구현한 Task다.

public class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {
        String threadName = Thread.currentThread().getName();

        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Splitting workLoad : " + this.workLoad);
            sleep(1000);
            List<MyRecursiveTask> subtasks =
                    new ArrayList<MyRecursiveTask>();
            subtasks.addAll(createSubtasks());

            for (MyRecursiveTask subtask : subtasks) {
                subtask.fork();
            }

            long result = 0;
            for (MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
                System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                        + "Received result from subtask");
            }
            return result;

        } else {
            sleep(1000);
            System.out.println("[" + LocalTime.now() + "]["
                    + " Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
                new ArrayList<MyRecursiveTask>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);

        MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
        long mergedResult = forkJoinPool.invoke(myRecursiveTask);
        System.out.println("mergedResult = " + mergedResult);

//      Just wait until all tasks done
        try {
            forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

출력결과

 

 

비동기적으로 Task 처리(Future)

Task를 비동기적으로 처리할 수 있다. submit()에 Task를 인자로 전달할 수 있으며 Future를 리턴받는다. Future를 통해 필요할 때 Result를 기다리거나 읽을 수 있다.

    public static void main(String[] args) {

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
        Future<Long> future = forkJoinPool.submit(myRecursiveTask);

        System.out.println("Do something....");

        try {
            System.out.println("mergedResult = " + future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        try {
            forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

실행 결과

 

 

Fork Join Pool은 언제 쓰는 것이 적절할까?

천만개의 랜덤 숫자 (1 ~ 100)이 있다고 가정해보저. CPU는 4개라고 치고 이 숫자들 중에서 10보다 작은 수가 몇개나 되는지 세는 코딩을 해보자. 

 

방법 1.  그냥 쓰레드 하나로  천만번을 순회하면서 숫자를 센다.  (6초 걸림) 

 

방법2. ForkJoinPool 을 사용해서 리프가 100개 일 때까지 분활(fork)해서 각각의 수치를 위로 합쳐서(join) 계산한다. 쓰레드 4개를 골고루 사용하며 대신 태스크 객체는 분활한 만큼 만들어 지게 된다.(2.5초 걸림)  

 

방법3.  그냥 ThreadPoolExecutor 로 쓰레드 4개를 만든 후에 각각 천만개/4 로 나뉘어진 영역에 대해 순회하면서 숫자를 계산해서 합친다. ( 2초 걸림)  ForkJoinPool 보다 더 빠르네? 그렇다. 쓸 때없는 객체 생성이 없어졌기 때문이다.

 

방법4. 저렇게 쓰레드 4개가 거의 동일한 일을 하게 된다면 ForkJoinPool 이 오히려 독이겠지만 하나의 쓰레드가 굉장히 오래 걸리고 나머지 3개의 쓰레드는 금방 끝이나는 경우는?? 

이 경우는 ForkJoinPool 이 빛을 발하게 됩니다. (ThreadPoolExecutor  4초 , ForkJoinPool 3초)

정리

  • Fork Join Pool은 우리가 원하는 작업을 Divided and Conquer 방식으로 처리할 수 있도록 해준다.
  • 실제 프로그램에서 사용할때에는 외부 API 를 여러번 호출해야하는 상황에서 작업을 분리하여 호출할 수도 있을 것이다. 

 

출처 :

oracle docs

HAMA 블로그

Circlee7

자바 성능 튜닝 이야기

무명 소졸의 웹개발

Uncle Bae's Story

chacha

Java Fork and Join using ForkJoinPool - jenkov.com

728x90
반응형