Study/Modern-Java-In-Action

Modern Java Ch16. 안정적 비동기 프로그래밍

hongeeii 2023. 1. 29.
728x90
반응형

Future의 단순 활용

Future 인터페이스는 비동기 계산을 모델링 하는데 쓰이며, 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다.

Future 이용시에는 시간이 오래걸리는 작업을 Callable 객체 내부로 감싼 다음 ExecutorService에 제출하면 된다.

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() { //Callable을 ExecutorService로 제출
  public Double call() {
    return doSomeLongComputation(); //오래걸리는 작업을 비동기 스레드로 실행
   }});
  doSomethingElse(); //비동기 작업동안 다른작업 수행
  try {
      Double result = future.get(1, TimeUnit.SECONDS); //비동기작업의 결과를 가져옴.
      //결과가 준비되지않으면 1초까지 기다리며 호출스레드 블록
  } catch (ExecutionException ee) {
      //계산 중 예외
  } catch (InterruptedException ie) {
      //현재 스레드 대기중 인터럽트 발생
  } catch (TimeoutException te) {
      //Future가 완료되기 전 타임아웃 발생
}

Future의 get 메서드로 결과를 가져올 수 있는데 가져오는 시점에 완료가 되었으면 성공이지만 완료가 되지 않았다면 결과적으로 블로킹이 일어난다.

1초의 시간을 걸어줌으로써 계속 대기하는 것을 방지할 수 있다. (스레드가 대기할 최대 시간을 설정하는 것이 좋다.)

Future 는 FutureTask를 구현체로 가진다. (CompletableFuture와 별개 이야기, 물론 이것도 Future를 구현한다. 한계점으로 지적한 부분들은 CompletableFuture 가 아니다.)

Future 제한

Future 는 기본적으로 isDone, isCanceled 와 같은 기본사항 체크를 할 수 있는 메서드를 제공한다. 하지만 이들로는 충분치 않다. 예를들어, 각기 다른 실행시간을 가지는 Future들을 조합해서 계산을 한다든지 다른 질의의 결과와 같이 계산을 한다든지 하는 복잡한(현실세계의 문제를 해결하는데 꼭 필요한) 로직을 다루기 힘들다.

출처: https://pjh3749.tistory.com/280 [JayTech의 기술 블로그]

CompletableFuture로 비동기 애플리케이션 만들기

동기 API

메서드를 호출한 다음에 메서드가 계산을 완료할 때까지 기다렸다가 메서드가 반환되면 호출자는 반환된 값으로 계속 다른 동작을 수행한다.

호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도 호출자는 피호출자의 동작 완료를 기다렸을 것이다.

이처럼 동기 API를 사용하는 상황을 블록 호출이라고 한다.

비동기 API

메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다.

이와 같은 비동기 API를 사용하는 상황을 비블록 호출이라고 한다.

다른 스레드에 할당된 나머지 계산 결과는 콜백 메서드를 호출해서 전달하거나 호출자가 '계산 결과가 끝날 때까지 기다림' 메서드를 추가로 호출하면서 전달된다.

주로 I/O 시스템 프로그래밍에서 이와 같은 방식으로 동작을 수행한다. 즉, 계산 동작을 수행하는 동안 비동기적으로 디스크 접근을 수행한다.

그리고 더 이상 수행할 동작이 없으면 디스크 블록이 메모리로 로딩될 때까지 기다린다.

public class Shop {
    
    private static final Random random = new Random();
    
    private String name;

    public Shop(String name) {
        this.name = name;
    }
    
    public double getPrice(String product) {
        return calculatePrice(product);
    }
    
    private double calculatePrice(String product) {
        delay();
        return  random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
    
    private static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

상품명을 받아 1초의 작업 후 연산 결과를 반환하는 getPrice 메서드.

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
       double price = calculatePrice(product);
       futurePrice.complete(price);
    }).start();
    
    return futurePrice;
}

위 코드는 getPriceAsync는 가격 계산이 끝나기 전에 return을 받는다.

나중에 클라이언트가 특별히 할 일이 없으면 Future의 get 메서드를 호출한다.

이때 Future가 결과값을 가지고 있다면 Future에 포함된 값을 읽거나 아니면 값이 계산될 때까지 블록한다.

블록하지 않고 Future의 작업이 끝났을 때만 이를 통지받으면서 람다 표현식이나 메서드 참조로 정의된 콜백 메서드를 실행하는 방법은 밑에.

에러 처리 방법

비동기로 처리시에는 별도의 쓰레드에서 동작하기에 에러 처리가 매우 까다롭다.

하지만, CompletableFuture 의 completeExceptionally 메서드를 사용하면 매우 간단해진다.

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);     
        } catch (Exception e) {
            futurePrice.completeExceptionally(e);
        }
    }).start();

    return futurePrice;
}

예외가 발생하게 되면, 클라이언트는 ExecutionException을 받게 된다.

팩토리 메서드 supplyAsync로 CompletableFuture 만들기

위 CompletableFuture를 만드는 코드가 매우 긴게 가독성이 좋지 않다.

이를 위해, CompletableFuture는 팩터리 메서드로 supplyAsync를 제공한다.

public Future<Double> getPriceAsync(String product) {
   return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

비블록 코드

public class ShopTest {

    private static List<Shop> shopList = Arrays.asList(
            new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShp["),
            new Shop("BuyItAll")
    );

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(findPrices("myPhone27S"));
        long duration = (System.nanoTime() - start) / 1000000;
        System.out.println("Done in " + duration + "msecs");
    }

    public static List<String> findPrices(String product) {
        return shopList.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(Collectors.toList());
    }
}

각 shop마다 delay가 있어 최소 4초 이상의 시간이 걸린다.

병렬 스트림으로 요청 병렬화하기

public static List<String> findPrices(String product) {
    return shopList.parallelStream()
        .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
        .collect(Collectors.toList());
}

병렬 스트림으로 성능이 개선되었지만 CompletableFuture 기능을 활용해 findPrices 메서드의 동기 호출을 비동기 호출로 성능을 더욱 올릴 수 있다.

CompletableFuture로 비동기 호출 구현하기

병렬스트림으로 처리하더라도, 위와 같은 경우 한 쓰레드가 shop을 2개 이상 가지게 된다면 블로킹은 여전히 존재한다.

public static List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shopList.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))
            ))
            .collect(Collectors.toList());
    return priceFutures.stream().map(CompletableFuture::join)
            .collect(Collectors.toList());
}

커스텀 Executor 사용하기

위 병렬스트림과 CompletableFuture 를 적용한 성능은 내부적으로 Runtime.getRuntime().availableProcessors() 가 반환하는 스레드 수로 동작한다.

따라서, 쓰레드 풀을 직접 생성하여 동작시킨다면 CompletableFuture를 이용한 비동기 프로그래밍은 더욱 유연해지고 성능 향상이 일어나게 된다.

public static List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shopList.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))
			, executor
            ))
            .collect(Collectors.toList());
    return priceFutures.stream().map(CompletableFuture::join)
            .collect(Collectors.toList());
}

Executor를 커스텀하고 비동기 처리에 쓸 Executor를 두 번째 인자로 추가만 해주면 된다. (프로그램 종료를 방해하지 않는 데몬 스레드를 사용한다.)

비동기 작업 파이프라인 만들기

아래는 getPrice 메서드를 shop 이름, 가격, DisCount 정보를 가진 문자열로 반환하도록 변경한 코드다.

public class Discount {
    
    public enum Code {
        NONE(0),
        SILVER(5),
        GOLD(10),
        PLATINUM(15),
        DIAMOND(20)
        ;
        
        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }
}

///////////////

public String getPrice(String product) {
    double price = calculatePrice(product);
    Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
    return String.format("%s:%.2f:%s", name, price, code);
}

할인 서비스 구현

위 getPrice의 문자열을 파싱하여 사용하는 클래스는 아래 Quote로 정의

추가로, Quote 객체를 Discount에 넘겨 할인이 적용된 값을 반환하는 메서드를 추가

public class Quote {

    private final String shopName;
    private final double price;
    private final Discount.Code code;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.code = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code code = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, code);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getCode() {
        return code;
    }
}

/////////////////

public class Discount {

    public enum Code {
        NONE(0),
        SILVER(5),
        GOLD(10),
        PLATINUM(15),
        DIAMOND(20)
        ;

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }
    
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + "price is " + 
                Discount.apply(quote.getPrice(), quote.getCode());
    }
    
    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.percentage) / 100;
    } 
}

할인 서비스 사용

위 추가된 내용을 사용하기에 가장 쉬운 방법은 stream으로 처리하는 것이다.

public static List<String> findPrices(String product) {
    return shopList.stream()
        .map(shop -> shop.getPrice(product))
        .map(Quote::parse)
        .map(Discount::applyDiscount)
        .collect(Collectors.toList());
}

위 코드는 처리만 할 뿐 최적화와는 거리가 멀다.

동기 작업과 비동기 작업 조합하기

public static List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = shopList.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor)
            )
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(
                    quote -> CompletableFuture.supplyAsync(
                            () -> Discount.applyDiscount(quote), executor
                    )
                )
            )
            .collect(Collectors.toList());
    
    return priceFutures.stream().map(CompletableFuture::join)
            .collect(Collectors.toList());
}

위 코드에서 thenApply 는 CompletableFuture가 끝날때까지 블록하지 않는다.

따라서 블로킹 없이 CompletableFuture<String> 에서 CompletableFuture<Quote>로 변환된다.

추가로, thenCompose 메서드는 첫번째 연산의 결과를 두번째 연산으로 전달하는 메서드다. 위에서는 quote의 결과를 받으면 Discount.applyDiscount 메서드를 한번 더 비동기로 수행시키는 코드다.

독립 CompletableFuture 와 비독립 CompletableFuture 합치기

thenCombine 메서드를 활용

Future<Double> futurePriceInUSD = 
        CompletableFuture.supplyAsync(() -> shop.getPrice(prduct))
        .thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate);

타임아웃 효과적으로 사용하기

Future의 단점으로는 계산이 길어지는 경우 무한정 대기할 수도 있다는 부분이다.

CompletableFuture에서는 이를 위해 orTimeout 메서드를 제공한다.

Future<Double> futurePriceInUSD = 
        CompletableFuture.supplyAsync(() -> shop.getPrice(prduct))
        .thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate)
        .orTimeout(3, TimeUnit.SECONDS);

CompletableFuture의 종료에 대응하는 방법

CompletableFuture는 thenAccept라는 메서드를 제공한다.

이 메서드는 연산 결과를 소비하는 Consumer를 인수로 받아 사용.

CompletableFuture[] futures = shopList.stream()
    .map(shop -> CompletableFuture.supplyAsync(
            () -> shop.getPrice(product), executor)
    )
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(
            quote -> CompletableFuture.supplyAsync(
                    () -> Discount.applyDiscount(quote), executor
            )
        )
    )
    .map(f -> f.thenAccept(System.out::println))
    .toArray(size -> new CompletableFuture[size]);

supplyAsync(), runAsync()

CompletableFuture는 supplyAsync()와 runAsync()를 제공하여 직접 쓰레드를 생성하지 않고 작업을 async로 처리하도록 할 수 있다. supplyAsync()는 Lambda로 인자를 전달할 수 있다. 인자로 전달된 Lambda는 다른 쓰레드에서 처리 된다.

supplyAsync()는 리턴 값이 있는 반면에 runAsync()는 리턴 값이 없다.

Exception Handling

CompletableFuture에서 작업을 처리하는 중에 Exception이 발생하면 handle()로 예외를 처리할 수 있다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String name = null;
    if (name == null) {
        throw new RuntimeException("Computation error!");
    }
    return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

log(future.get());
15:51:35.385 (main) Hello, Stranger!

thenApply() : 리턴 값이 있는 작업 수행

supplyAsync()로 어떤 작업이 처리되면, 그 결과를 가지고 다른 작업도 수행하도록 구현할 수 있다. thenApply() 메소드는 인자와 리턴 값이 있는 Lambda를 수행한다. 여기서 인자는 supplyAsync()에서 리턴되는 값이 된다.

CompletableFuture<String> future1
        = CompletableFuture.supplyAsync(() -> "Future1");

log("future1.get(): " + future1.get());
15:56:26.203 (main) future1.get(): Future1
CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply(s -> s + " + Future2");

log("future.get(): " + future.get());
15:57:49.343 (main) future.get(): Future1 + Future2

thenAccept() : 리턴 값이 없는 작업 수행

thenAccept()도 thenApply()와 비슷하다. 하지만, 인자는 있지만 리턴 값이 없는 Lambda를 처리할 수 있다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Hello");

CompletableFuture<Void> future2 = future1.thenAccept(
        s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null

thenCompose() : 여러 작업을 순차적으로 수행

thenCompose()는 chain처럼 두 개의 CompletableFuture를 하나의 CompletableFuture로 만들어주는 역할을 한다.

첫 번째 CompletableFuture의 결과가 리턴되면 그 결과를 두 번째 CompletableFuture로 전달하며, 순차적으로 작업이 처리된다.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());
16:07:33.196 (main) Hello World

thenCombine() : 여러 작업을 동시에 수행

thenCompose()가 여러 개의 CompletableFuture를 순차적으로 처리되도록 만들었다면, thenCombine()는 여러 CompletableFuture를 병렬로 처리되도록 만든다.

모든 처리가 완료되고 그 결과를 하나로 합칠 수 있다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApply((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));
16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!

결과를 보면, 순차적으로 처리가 된 것처럼 보인다.

그 이유는 thenApply()가 동일한 쓰레드를 사용하기 때문에 대기하는 시간이 있기 때문이다.

thenApply() vs thenApplyAsync()

thenApply() 대신에 thenApplyAsync()를 사용하면 다른 쓰레드에서 동작하도록 만들 수 있다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApplyAsync((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApplyAsync((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

Thread.sleep(5000);
16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!

두 개의 작업이 다른 쓰레드에서 처리되어 2초만에 처리되는 것을 볼 수 있다.

anyOf()

anyOf()는 여러 개의 CompletableFuture중에서 빨리 처리되는 1개의 결과만을 가져오는 메소드다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future1");
            return "Future1";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future2");
            return "Future2";
        });

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future3");
            return "Future3";
        });

CompletableFuture.anyOf(future1, future2, future3)
        .thenAccept(s -> log("Result: " + s));
16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2

3개의 future를 모두 실행시키지만 thenAccept()에 전달되는 것이 1개 뿐이다.

allOf()

allOf()는 모든 future의 결과를 받아서 처리할 수 있다.

anyOf()와 다르게 Stream API를 사용하여 결과를 처리할 수 있다. get()은 null을 리턴한다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2");

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> "Future3");

CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(future1, future2, future3);

log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());

String combined = Stream.of(future1, future2, future3)
        .map(CompletableFuture::join)
        .collect(Collectors.joining(" + "));
log("Combined: " + combined);
16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3

async method

thenApply()와 thenApplyAsync()처럼 뒤에 asycn가 붙은 메소드들이 항상 존재한다.

동일한 쓰레드를 사용하지 않고 다른 쓰레드를 사용하여 처리하고 싶을 때 async가 붙은 메소드를 사용하면 된다.

대부분 async가 붙은 메소드들이 pair로 존재한다.

728x90
반응형

추천 글