개발/java

RxJava Observerable

소확행개발자 2018. 9. 21. 20:51

RxJava Observable 이란 무엇인가

Rxjava는 비동기나 이벤트 기반 프로그래밍의 고통을 덜어내기 위해 만들었다.



rx.Observable은 값이 흐르는 순서를 나타내는 추상화이다. 


observable은 본질적으로 push 방싱을 취하기 때문에 언제 값을 생성할지 스스로 정한다. 즉 특정 시점에서 클라이언트가 Observable을 구독할 수 있으며 Observable이 값을 방출하면 알림을 받지만 구체적으로 언제 받을지는 알 수 없다. 


Observable<Tweet> tweets


tweets 는 상태 갱신은 확실히 이벤트 스트림으로 받는데 Iterator와는 달리 필요할 때 값을 끌어오는게 아니라 Observable은 값이 들어오는대로 밀어내야 한다.


Observable<Double> Temperature


어떤 기기에서 온도값을 생성하여 구독자에게 밀어낸다. tweets와 temperature 모두 미래 사건(이벤트)의 무한 스트림 예제이다.


Observable<Customer> customers


대부분의 경우 아마도 데이터베이스 질의를 통한 고객 목록을 반환할 것이며, 결과 개수가 어떻든 간에 느긋하게 수행된다. 


Observable<HttpResponse> response


반면 위에는 종료 시까지 단 하나의 이벤트만 넘긴다. 이 값은 언젠가 나타나서 클라이언트 코드로 가고 해당 응답을 받으려면 구독해야 한다.


Observable 알림 구독

" 사용자가 트위터를 올렸고 업데이트를 할때 이벤트가 발생하고 알림이 발생하여 들어온 정보를 push 할테니까 구독자들 은 받아봐 ~"

누군가 실제로 관심을 갖고 받기를 원하지 않는 한 이벤트를 방출하지 않는다. Observable 관찰을 시작하려면 subscribe()계통의 메서드를 사용해야 한다.

Observable<Tweet> tweets = //..

tweets.subscribe((Tweet tweet) -> 
System.out.println(tweet));

tweets 스트림이 다운스트림에 이벤트를 방출할 때맏 해당 콜백이 실행된다. RxJava 규약은 심지어 이벤트가 여러 스레드에서 방출된다 해도 콜백이 한 번에 두 개 이상의 스레드에서 작동하지 않음을 보장한다. 

subscribe는 예외를 던지지 않으므로 try-catch 구문을 사용하지 않으며, 대신 따로 콜백을 제공한다.
tweets.subscribe(
(Tweet tweet) -> {System.out.println(tweet); },
(Throwable t ) -> {t.printStackrTrace(); }

참고로 자바 8에서는 가독성을 높이기 위해 종종 람다식 대신 메서드 레퍼런스를 사용하기도 한다.

tweets.subscribe(
 System.out::println,
Throwable::printStackTrace,



뜨거운 Observable 차가운 Observable

 차가운 observable 은 단순히 정적인 구조로 일반적으로 Observable.create() 를 사용해서 만드는데, 누군가가 수신하지 않는 한 어떤 작업도 시작하면 안되고 실행을 연기한다.


반면에 뜨거운 Observable은 소비자로부터 독립적이어서 Observable은 subscribe동작에 영향을 미치지 않으며 서로 완전히 분리되고 독립적이다. 


뜨거운 Observable은 보통 이벤트 소스를 통제할 수 없는 경우에 발생한다. 대표적인 예가 바로 마우스의 움직임이나 키보드 입력이 있다. 


느슨한 Observable 을 사용하게 되면 observe() 를 시작할때 단 한 번 호출한 뒤 싱글턴 방식을 유지해야 한다. 즉 다수의 구독자들이 재사용 할 수 있어야 한다.


그 방법을 알아보자


수동으로 Subscriber 관리하기


set <Subscriber<Status>> 로 모든 구독자를 추적하겠다는 발상인데, 이 집합이 비어 있는지 여부에 따라 외부 시스템 연결 시작/종료를 결정한다.




애플리케이션 시작시 몇몇 컴포넌트 ( 예 스프링 빈 ) 에서 이 Observable을 구독하고 듣기 시작한다. 


뜨거운 observable 이벤트를 방출하기 시작하여 모든 subscriber가 전전으로 일관된 세계관을 유지하기 바란다면 


해당 observable을 게시하고 대상 시스템의 모든 컴포넌트에 구독( subscribe ) 기능을 부여해야 한다.