본문 바로가기
  • Where there is a will there is a way.
개발/java

Rxjava 익스텐션

by 소확행개발자 2018. 11. 14.

Rxjava 익스텐션 

Rxjava를 활용한 리액티브 프로그래밍을 참고했습니다.

Observable<T> 는 핵심 API 이므로 무엇을 의미하고 어떻게 작동하는지 확실히 이해해야 한다.


이번 장에서는 간단한 데이터 스트림을 만들어서 매우 흥미로운 방식으로 결합하고 조합하는 방법을 배울 수 있다.




rx.Ovservable 해부하기

rx.Observable<T> 는 값이 흐르는 순서를 나타낸다.


주요 사용되는 사용처


- 사용자 인터페이스 이벤트

- 온라인 상점에서 발생하는 주문

- 소셜 미디어 사이트에 올라오는 글



Observable<T> 와 Iterator<T> 의 유사점


둘다 무한 순열을 나타냄

둘다 더이상 next가 없을 때 자체에서 이를 알릴 수 있다.


차이점


Observable 은 본질적으로 밀어내기 방식을 취하기 때문에 언제 값을 생성할지 스스로 정한다.

반면 Iterator는 누군가 실제로 next() 를 써서 요청하지 않으면 가만히 머물러 있는다.


Observable 의 특징

  • 특정 시점에서 클라이언트가 Observable을 구독할 수 있으며 OBservable이 값을 방출하면 알림을 받지만 구체적으로 언제 받을지는 알 수 없다.
  • Observable 은 임의의 개수만큼 이벤트를 생성할 수 있다.

Observable 은 어디서 사용하나?

 Observable<Tweet> tweets 

 소셜 미디어 웹사이트에서 벌어지는 상태 갱신은 확실히 이벤트 스트림으로 나타낼 수 있다고 바로 납득할 만하다.

 Observable<Double> temperature

 마찬가지로 어떤 기기에서 온도값을 생성하여 구독자에게 밀어낸다. 

 Observable<Customer> customer

 대부분의 경우 데이터베이스 질의를 통한 고객 목록을 반환할 것이며, 결과 개수가 어떻든 간에 느긋하게 수행된다. 

 Observable<HttpResponse> response 

 종료 시까지 단 하나의 이벤트 (값)만 넘긴다. 이 값은 언젠가 나타나서 클라이언 코드로 밀려나갈 텐데, 해당 응답을 받으려면 구독해야 한다. 


느긋하게 수행된다는 것은 


2018/09/21 - [개발/java] - RxJava Observerable 


글에서 확인이 가능하다.



Observable 알림 구독

Observable 은 누군가 실제로 관심을 가지고 받기를 원하지 않는 한 이벤트를 방출하지 않는다. Observable 관찰을 시작하려면 subscribe() 계통의 메서드를 사용해야 한다.



Observable<Tweet> tweets = //..


tweets.subscribe((Tweet tweet) -> System.out.println(tweet));


위의 예제는 어떤 콜백을 등록하는 형태로 tweets Observable을 구독하는데, tweets 스트림이 다운스트림에 이벤트를 방출할 때마다 해당 콜백이 실행된다.


Rxjava 규약은 이벤트가 여러 스레드에서 방출된다 해도 콜백이 한 번에 두 개 이상의 스레드에서 작동하지 않음을 보장한다.


Obervable 은


tweets.subscribe(

(Tweet tweet ) -> {System.out.println(tweet);},

(Throwable t ) -> {t.printStackTrace(); },

{() -> {this.noMore();}

);


위의 예제는 구독과 예외 처리를 하는 방법이고 complete가 났을 때 구독을 종료한다.


하지만 무한 event 가 발생하는 경우엔 마지막 () -> this.noMore(); 을 빼면 된다.


Observer<T> 로 모든 알림 잡아내기

위에서 세 가지 인자를 모두 받는 subscribe()가 상당히 유용함을 확인했고,

이들 세 가지 콜백을 ( next / throwable / complete ) 

감싸놓은 간단한 구현체를 사용하면 편리하다. 바로 이를 위해 Observer<T>를 만들었다. 

Observer<T> 는 이들 세 가지 콜백을 위한 컨테이너로서 Observable<T>에서 발생하는 모든 알림을 받는다.


public interface Observer<T> {


void onSubscribe(@NonNull Disposable d);


void onNext(@NonNull T t);


void onError(@NonNull Throwable e);


void onComplete();

}


tweets.subscribe(observer);


Observer<T> 는 RxJava에서 청취를 위한 핵심 추상화이다. 하지만 보다 강한 제어를 원한다면 Observer 의 추상 구현체인 Subscriber를 사용하면 된다.


Subscription 과 Subscriber<T>로 리스너 제어하기

하나의 Observable은 게시-구독 패턴처럼 게시자 하나가 여러 소비자에 이벤트를 전달할 수 있는 것 처럼 본래 여러 구독자를 보유할 수 있다. 


RxJava에서 Observable<T>는 서버가 작동하는 매우 짧은 순간 동안, 혹은 여러 날에 걸친 긴 시간 동안 존재하는 정형화된 자료 구조일 뿐이다.


구독자에도 똑같이 적용된다. 어떤 Observable을 구독하여 몇몇 이벤트는 소비하고 나머지는 모두 버릴 수 도 있다.


 또는 몇 시간 혹은 며칠에 걸쳐 Observable이 살아 있는 동안 끊임없이 이벤트를 소비할 수도 있다.


구독취소하기

구독 취소하기 방법은 2가지 방법으로 


Subscription과 Subscriber가 있다.


Subscription subscription = tweets.subscribe(System.out.println);


subscription.unsubscribe(); // 다음 메소드를 통해서 구독을 취소할 수 있다. ( 추가로 isUnsubscribed() 를 통해서 구독 상태를 확인할 수 있다. )




뜨거운 Observable 과 차가운 Observable

차가운 Observable은 실제로 구독자가 없으면 이벤트 방출을 하지 않는다. 단순히 정적 자료 구조일 뿐이다. 
즉 별도로 처리하지 않으면 구독자들은 각자 스트림의 복사본을 받는다는 뜻이기도 하다.

차가운 Observable은 .create()를 써서 만들거나 just from range 등이 있다.

뜨거운 Observable은 획득한 순간 Subscriber 여부와 관계 없이 즉시 이벤트를 방출한다. Observable은 심지어 아무도 듣고 있지 않아도 이벤트를 다운스트림으로 밀어내기 땜ㄴ에 이벤트 유실 있을 수 있다.

차가운이는 전적으로 통제가 가능하지만 뜨거운이는 소비자로부터 독립적이다. 

뜨거운이 사용 예제 ( ex 마우스움직임 또는 키보드 입력 ) 

뜨거움과 차가움을 구별하는 중요성은 이벤트 전달에 의존할 때 더욱 두드러진다. 

즉시 혹은 몇 시간 후든 상관없이 차가운 Observable은 언제 구독하더라도 완전하고도 일관된 이벤트 집합을 받는다. 


댓글