본문 바로가기
  • Where there is a will there is a way.
개발/java

RxJava Observerable

by 소확행개발자 2018. 9. 21.

RxJava Observable 이란 무엇인가

Rxjava는 비동기나 이벤트 기반 프로그래밍의 고통을 덜어내기 위해 만들었다.



rx.Observable은 값이 흐르는 순서를 나타내는 추상화이다. 


observable은 본질적으로 push 방싱을 취하기 때문에 언제 값을 생성할지 스스로 정한다. 즉 특정 시점에서 클라이언트가 Observable을 구독할 수 있으며 Observable이 값을 방출하면 알림을 받지만 구체적으로 언제 받을지는 알 수 없다. 


Observable<Tweet> tweets


tweets 는 상태 갱신은 확실히 이벤트 스트림으로 받는데 Iterator와는 달리 필요할 때 값을 끌어오는게 아니라 Observable은 값이 들어오는대로 밀어내야 한다.


Observable<Double> Temperature


어떤 기기에서 온도값을 생성하여 구독자에게 밀어낸다. tweets와 temperature 모두 미래 사건(이벤트)의 무한 스트림 예제이다.


Observable<Customer> customers


대부분의 경우 아마도 데이터베이스 질의를 통한 고객 목록을 반환할 것이며, 결과 개수가 어떻든 간에 느긋하게 수행된다. 


Observable<HttpResponse> response


반면 위에는 종료 시까지 단 하나의 이벤트만 넘긴다. 이 값은 언젠가 나타나서 클라이언트 코드로 가고 해당 응답을 받으려면 구독해야 한다.


Observable 알림 구독

" 사용자가 트위터를 올렸고 업데이트를 할때 이벤트가 발생하고 알림이 발생하여 들어온 정보를 push 할테니까 구독자들 은 받아봐 ~"

누군가 실제로 관심을 갖고 받기를 원하지 않는 한 이벤트를 방출하지 않는다. Observable 관찰을 시작하려면 subscribe()계통의 메서드를 사용해야 한다.

Observable<Tweet> tweets = //..

tweets.subscribe((Tweet tweet) -> 
System.out.println(tweet));

tweets 스트림이 다운스트림에 이벤트를 방출할 때맏 해당 콜백이 실행된다. RxJava 규약은 심지어 이벤트가 여러 스레드에서 방출된다 해도 콜백이 한 번에 두 개 이상의 스레드에서 작동하지 않음을 보장한다. 

subscribe는 예외를 던지지 않으므로 try-catch 구문을 사용하지 않으며, 대신 따로 콜백을 제공한다.
tweets.subscribe(
(Tweet tweet) -> {System.out.println(tweet); },
(Throwable t ) -> {t.printStackrTrace(); }

참고로 자바 8에서는 가독성을 높이기 위해 종종 람다식 대신 메서드 레퍼런스를 사용하기도 한다.

tweets.subscribe(
 System.out::println,
Throwable::printStackTrace,



뜨거운 Observable 차가운 Observable

 차가운 observable 은 단순히 정적인 구조로 일반적으로 Observable.create() 를 사용해서 만드는데, 누군가가 수신하지 않는 한 어떤 작업도 시작하면 안되고 실행을 연기한다.


반면에 뜨거운 Observable은 소비자로부터 독립적이어서 Observable은 subscribe동작에 영향을 미치지 않으며 서로 완전히 분리되고 독립적이다. 


뜨거운 Observable은 보통 이벤트 소스를 통제할 수 없는 경우에 발생한다. 대표적인 예가 바로 마우스의 움직임이나 키보드 입력이 있다. 


느슨한 Observable 을 사용하게 되면 observe() 를 시작할때 단 한 번 호출한 뒤 싱글턴 방식을 유지해야 한다. 즉 다수의 구독자들이 재사용 할 수 있어야 한다.


그 방법을 알아보자


수동으로 Subscriber 관리하기


set <Subscriber<Status>> 로 모든 구독자를 추적하겠다는 발상인데, 이 집합이 비어 있는지 여부에 따라 외부 시스템 연결 시작/종료를 결정한다.




애플리케이션 시작시 몇몇 컴포넌트 ( 예 스프링 빈 ) 에서 이 Observable을 구독하고 듣기 시작한다. 


뜨거운 observable 이벤트를 방출하기 시작하여 모든 subscriber가 전전으로 일관된 세계관을 유지하기 바란다면 


해당 observable을 게시하고 대상 시스템의 모든 컴포넌트에 구독( subscribe ) 기능을 부여해야 한다.  


댓글