Skip to content

Latest commit

Β 

History

History
120 lines (75 loc) Β· 5.59 KB

Ch_10.md

File metadata and controls

120 lines (75 loc) Β· 5.59 KB

10μž₯. 리앑터 κ°œμš”

πŸ‘©πŸΌβ€πŸŽ“ λ°°μš°λŠ” λ‚΄μš©

  • λ¦¬μ•‘ν‹°λΈŒ ν”„λ‘œκ·Έλž˜λ° μ΄ν•΄ν•˜κΈ°
  • ν”„λ‘œμ νŠΈ 리앑터
  • λ¦¬μ•‘ν‹°λΈŒ 데이터 μ˜€νΌλ ˆμ΄μ…˜

10.1 λ¦¬μ•‘ν‹°λΈŒ ν”„λ‘œκ·Έλž˜λ° μ΄ν•΄ν•˜κΈ°

μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μ½”λ“œλ₯Ό κ°œλ°œν•  λ•ŒλŠ” λͺ…λ Ήν˜•κ³Ό **λ¦¬μ•‘ν‹°λΈŒ(λ°˜μ‘ν˜•)**으둜 λ‚˜λˆŒ 수 μžˆλ‹€.

  • λͺ…λ Ήν˜•

    순차적으둜 μ—°μ†λœ μž‘μ—…μ„ ν•œ λ²ˆμ— ν•˜λ‚˜μ”© μ‹€ν–‰

  • λ¦¬μ•‘ν‹°λΈŒ

    μž‘μ—…λ“€μ΄ λ³‘λ ¬μ μœΌλ‘œ μ‹€ν–‰
    데이터λ₯Ό λΆ€λΆ„μ μœΌλ‘œ 처리 κ°€λŠ₯ν•˜λ©° 일정 λΆ€λΆ„λ§Œ λ‹€μŒ μž‘μ—…μœΌλ‘œ λ„˜κΉ€, 기쑴에 λ‚¨μ•„μžˆλŠ” λ°μ΄ν„°λŠ” κ³„μ†ν•΄μ„œ μž‘μ—…

μ΄λŸ¬ν•œ λ¦¬μ•‘ν‹°λΈŒ ν”„λ‘œκ·Έλž˜λ°μ€ λͺ…λ Ήν˜• ν”„λ‘œκ·Έλž¨μ˜ λŒ€μ•ˆμ΄ λ˜λŠ” νŒ¨λŸ¬λ‹€μž„μ΄λ‹€.
κΈ°μ‘΄ λͺ…λ Ήν˜• ν”„λ‘œκ·Έλž˜λ°μ˜ 경우, λΈ”λ‘œν‚Ήμ΄ λ˜λŠ” μž‘μ—…(원격 DB 접속, 파일 μž…μΆœλ ₯ λ“±)이 쑴재 ν•  λ•Œ μŠ€λ ˆλ“œ λ‚­λΉ„κ°€ λ°œμƒν•œλ‹€. 또 닀쀑 μŠ€λ ˆλ“œλ₯Ό μ‚¬μš©ν–ˆμ„ λ•Œ λ™μ‹œμ„± λ¬Έμ œκ°€ λ°œμƒν•˜κΈ°μ— κ΄€λ¦¬ν•˜κΈ°λ„ μ–΄λ €μ›Œμ§„λ‹€.
이에 λ°˜ν•΄ λ¦¬μ•‘ν‹°λΈŒ ν”„λ‘œκ·Έλž˜λ°μ€ μž‘μ—…μ˜ 단계λ₯Ό λ‚˜νƒ€λ‚΄λŠ” 것이 μ•„λ‹Œ 데이터가 ν˜λŸ¬κ°€λŠ” νŒŒμ΄ν”„λΌμΈ ν˜Ήμ€ μŠ€νŠΈλ¦Όμ„ ν¬ν•¨ν•œλ‹€. 그리고 데이터 전체λ₯Ό μ‚¬μš©ν•  수 μžˆμ„ λ•ŒκΉŒμ§€ κΈ°λ‹€λ¦¬λŠ” 것이 μ•„λ‹Œ 데이터가 μ‘΄μž¬ν•  λ•Œλ§ˆλ‹€ μ²˜λ¦¬κ°€ λœλ‹€.

λ¦¬μ•‘ν‹°λΈŒ 슀트림 μ •μ˜

μ•žμ„œ λ³Έ λ¦¬μ•‘ν‹°λΈŒ 슀트림의 경우 데이터가 μ‘΄μž¬ν•  λ•Œλ§ˆλ‹€ μ²˜λ¦¬κ°€ λœλ‹€λŠ” νŠΉμ§•μ΄ μžˆμ—ˆλ‹€. λ§Œμ•½, μ²˜λ¦¬ν•  수 μžˆλŠ” 양보닀 μ „λ‹¬λ˜λŠ” 데이터가 λ§Žλ‹€λ©΄ λ‹Ήμ—°νžˆ ν­νŒŒλœλ‹€. λ”°λΌμ„œ λ¦¬μ•‘ν‹°λΈŒ ν”„λ‘œκ·Έλž˜λ°μ—μ„œλŠ” **λ°± ν”„λ ˆμ…”(Backpressure)**λΌλŠ” μ²˜λ¦¬ν•  수 μžˆλŠ” λ°μ΄ν„°μ˜ 양을 μ •ν•΄μ£ΌλŠ” μˆ˜λ‹¨μ„ 톡해 λ°μ΄ν„°μ˜ 폭주λ₯Ό λ§‰λŠ”λ‹€.

λ¦¬μ•‘ν‹°λΈŒ μŠ€νŠΈλ¦Όμ„ μ‚΄νŽ΄λ³΄λ©΄ 4개의 μΈν„°νŽ˜μ΄μŠ€λ‘œ μš”μ•½μ΄ κ°€λŠ₯ν•˜λ‹€.

  • λ°œν–‰μž(Publisher)

    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }

    λ°œν–‰μž μΈν„°νŽ˜μ΄μŠ€μ—λŠ” subscribe() λ©”μ„œλ“œλ₯Ό 톡해 κ΅¬λ…μžκ°€ λ°œν–‰μžλ₯Ό ꡬ독할 수 μžˆλ„λ‘ ν•œλ‹€. ꡬ독이 신청이 λœλ‹€λ©΄ λ°œν–‰μžλ‘œλΆ€ν„° 이벀트λ₯Ό μˆ˜μ‹ ν•  수 있으며, 이 μ΄λ²€νŠΈλ“€μ€ κ΅¬λ…μž μΈν„°νŽ˜μ΄μŠ€μ˜ λ©”μ„œλ“œλ₯Ό 톡해 μ „μ†‘λœλ‹€.

  • κ΅¬λ…μž(Subscriber)

    public interface Subscriber<T> {
        void onSubscribe(Subscription sub);
        void onNext(T item);
        void onError(Throwable ex);
        void onComplete();
    }

    κ΅¬λ…μžκ°€ μˆ˜μ‹ ν•˜λŠ” 첫 번째 μ΄λ²€νŠΈλŠ” onSubscribe() λ©”μ„œλ“œμ˜ ν˜ΈμΆœμ„ 톡해 이루어진닀. 즉, λ°œν–‰μžκ°€ μ € λ©”μ„œλ“œλ₯Ό ν˜ΈμΆœν•˜λ©΄μ„œ ꡬ독을 관리할 수 μžˆλŠ” 객체인 Subscription을 λ„˜κΈ΄λ‹€.

    κ΅¬λ…μžλŠ” ꡬ독 객체의 request() λ©”μ„œλ“œλ₯Ό ν˜ΈμΆœν•˜μ—¬ 데이터λ₯Ό μš”μ²­ν•˜κ±°λ‚˜ cancel() λ©”μ„œλ“œλ₯Ό 톡해 ꡬ독을 μ·¨μ†Œν•  수 μžˆλ‹€. request() λ©”μ„œλ“œ 인자둜 long νƒ€μž…μ˜ 값을 λ„˜κ²¨ λ°›λŠ”λ° 이가 λ°”λ‘œ λ°± ν”„λ ˆμ…”μ΄λ‹€.

    κ΅¬λ…μžμ˜ 데이터 μš”μ²­μ΄ μ™„λ£Œλœλ‹€λ©΄ 데이터가 μŠ€νŠΈλ¦Όμ„ 톡해 μ „λ‹¬λ˜λŠ”λ° μ΄λ•Œ, onNext() λ©”μ„œλ“œκ°€ ν˜ΈμΆœλœλ‹€.(λ°œν–‰μž β†’ κ΅¬λ…μž) λ§Œμ•½, μ—λŸ¬κ°€ λ°œμƒν•œλ‹€λ©΄ onError()κ°€ ν˜ΈμΆœλœλ‹€. 그리고 λͺ¨λ“  데이터가 μ „μ†‘λ˜μ—ˆλ‹€λ©΄ onComplete() λ©”μ„œλ“œκ°€ ν˜ΈμΆœλœλ‹€.

  • ꡬ독(Subscription)

    public interface Subscription {
        void request(long n);
        void cancel();
    }
  • ν”„λ‘œμ„Έμ„œ(Processor)

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

    ν”„λ‘œμ„ΈμŠ€λŠ” λ°œν–‰μžμ™€ κ΅¬λ…μžλ₯Ό κ²°ν•©ν•œ κ²ƒμœΌλ‘œ κ΅¬λ…μž μ—­ν• λ‘œ 데이터λ₯Ό μˆ˜μ‹ κ³Ό μ²˜λ¦¬κ°€ κ°€λŠ₯ν•˜κ³ , λ°œν–‰μž μ—­ν• λ‘œ κ²°κ³Όλ₯Ό κ΅¬λ…μžλ“€μ—κ²Œ λ°œν–‰ν•œλ‹€. ν”„λ‘œμ„Έμ„œλŠ” 보톡 이벀트λ₯Ό λ³€ν™˜ν•˜λŠ”λ° μ‚¬μš©ν•œλ‹€.

μ΄λŸ¬ν•œ λ¦¬μ•‘ν‹°λΈŒ 슀트림 μΈν„°νŽ˜μ΄μŠ€μ˜ 경우, μŠ€νŠΈλ¦Όμ„ κ΅¬μ„±ν•˜λŠ” κΈ°λŠ₯이 μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ”λ‹€. ν•˜μ§€λ§Œ ν”„λ‘œμ νŠΈ 리앑터(λ¦¬μ•‘ν‹°λΈŒ μŠ€νŠΈλ¦Όμ„ κ΅¬μ„±ν•˜λŠ” API 제곡)λ‚˜ RXJava 등을 톡해 μ‰½κ²Œ ꡬ성할 수 μžˆλ‹€.


10.2 리앑터 μ‹œμž‘ν•˜κΈ°

λ¦¬μ•‘ν„°μ—λŠ” 두가지 νƒ€μž…μ΄ μžˆλŠ”λ° λ°”λ‘œ Mono와 Flux이닀. 이 λ‘κ°œ λͺ¨λ‘ λ¦¬μ•‘ν‹°λΈŒ 슀트림의 Publisher μΈν„°νŽ˜μ΄μŠ€λ₯Ό κ΅¬ν˜„ν•œ 것이닀.

  • Mono

    ν•˜λ‚˜μ˜ 데이터 ν•­λͺ©λ§Œ κ°–λŠ” 데이터에 μ΅œμ ν™”

  • Flux

    0, 1 λ˜λŠ” λ‹€μˆ˜μ˜ 데이터λ₯Ό κ°–λŠ” νŒŒμ΄ν”„λΌμΈ

Mono.just("jongnan")
    .map(n -> n.toUpperCase())
	.subscribe(System.out::println)

μœ„ μ˜ˆμ‹œλŠ” "jongnan" μ΄λž€ λ¬Έμžμ—΄μ„ λŒ€λ¬Έμžλ‘œ λ°”κΎΈκ³  이λ₯Ό μ½˜μ†”μ— 좜λ ₯ν•œλ‹€. just() μ˜€νΌλ ˆμ΄μ…˜μ€ 주어진 값을 Mono ν˜•νƒœλ‘œ μƒμ„±ν•˜λ©°, map() 은 제곡된 ν•¨μˆ˜λ₯Ό μ‹€ν–‰ν•˜λŠ” μ˜€νΌλ ˆμ΄μ…˜μ΄λ‹€. subscribe()은 데이터λ₯Ό μˆ˜μ‹  및 주어진 ν•¨μˆ˜λ₯Ό ν˜ΈμΆœν•˜κ³  μ’…λ£Œν•œλ‹€. 각 μ˜€νΌλ ˆμ΄μ…˜μ„ μ§€λ‚˜μΉ  λ•Œλ§ˆλ‹€ μƒˆλ‘œμš΄ Monoκ°€ μƒμ„±λœλ‹€.

μ΄λŸ¬ν•œ 리앑터λ₯Ό μ‚¬μš©ν•˜κΈ° μœ„ν•΄μ„œλŠ” λ‹€μŒκ³Ό 같은 μ˜μ‘΄μ„±μ„ μΆ”κ°€ν•΄μ£Όλ©΄ λœλ‹€.

compile "io.projectreactor:reactor-core:3.4.3"
testCompile "io.projectreactor:reactor-test:3.4.3"

10.3 λ¦¬μ•‘ν‹°λΈŒ μ˜€νΌλ ˆμ΄μ…˜ μ μš©ν•˜κΈ°

Flux와 Monoμ—λŠ” 500개 μ΄μƒμ˜ μ˜€νΌλ ˆμ΄μ…˜μ΄ μ‘΄μž¬ν•˜λ©°, μ΄λŸ¬ν•œ μ˜€νΌλ ˆμ΄μ…˜μ€ 생성, μ‘°ν•©, λ³€ν™˜, 둜직으둜 λΆ„λ₯˜λ  수 μžˆλ‹€.

μœ„μ— Project Reactor의 곡식 λ¬Έμ„œλ₯Ό λ‚¨κ²¨λ‘μ—ˆμœΌλ‹ˆ μžμ„Έν•˜κ²Œ λ³Ό 수 μžˆλ‹€.