본문 바로가기
  • Where there is a will there is a way.
개발/java

RxJava 란 무엇인가

by 소확행개발자 2018. 9. 21.

RxJava 리엑티브 자바란 무엇인가

  • RxJava 는 자바와 안드로이드를 위한 리엑티브 프로그래밍 구현체
  • 함수형 프로그래밍의 영향을 받았기 때문에 함수 구성을 선호함 ( Ramda 형식 )
  • 전역상태나 부수효과를 피하고 비동기나 이벤트 기반 프로그램을 작성할 때 Stream 방식 ( Ramda Stream ) 으로 생각한다.
  • RxJava 는 생산자/소비자 콜백을 사용한 옵저버 패턴을 시작으로 구성과 변환, 스케줄링, 스로틀링, 오류 처리, 생명주기 관리를 할 수 있는 수많은 연산자를 제공함


RxJava 자바와 안드로이드 뿐만 아니라 서버까지 다루는 오픈소스 라이브러리이다.


리액티브 프로그래밍이란, 데이터나 이벤트 변화와 반응에 초점을 맞춘 프로그래밍을 뜻하는 일반적인 용어이다.


엄밀히 말하면 FRP( fucntional reative programming ) 은 RxJava와 다르다. FRP는 연속적인 시간의 흐름을 포함하고 RxJava는 시간에 대해 불연속적인 이벤트만 다룬다.

 리엑티브 함수형 프로그래밍이란

동시성과 병렬성을 해결하는 프로그래밍으로 리액티브나 비동기 요구사항을 명령형 방식으로 만들었을 때 나타나는 콜백 지옥 문제를 해결하는 것이다.


즉 RxJava 를 이용한 리액티브 프로그래밍은 명령형 방식을 이용하지 않고 선언적 접근을 사용한다.


리액티브 프로그래밍이 필요한 순간

  • 마우스 움직임이나 클릭, 키보드 타이핑, GPS 신호, 자이로스코프 신호, 터치 이벤트 처리
  • 비동기성을 띠는 네트워크 등 지연 I/O 이벤트 응답
  • 이벤트나 앞서 나온 사용자 이벤트, 애플리케이션에서 발생하는 이벤트나 데이터를 다룰때

만약 단 하나의 이벤트 스트림만 처리하는 경우는 콜백 기반의 리액티브 명령형 프로그래밍도 괜찮다.
또한 수많은 이벤트 스트림이 서로 독립적인 경우에도 명령형 프로그래밍은 큰 문제는 아니다.

RxJava 는 함수형 프로그래밍과 데이터플로 프로그래밍에 영향을 받아 리액티브 프로그래밍 원칙들을 구체화한 구현이다.

RxJava는 어떻게 동작하는가

데이터나 이벤트 스트림을 나타내는 Observable 타입으로 push / reactive 방식을 선호

즉시동작이 아닌 지연실행 가능하고 비동기와 동기 방식 모두 사용가능하고 시간에 따라 많은 이벤트를 다룰 수 있다.

RxJava 가 리액티브이기 위한 핵심은 밀어내기 인데 Observable과 Observer 타입 시그니처가 이를 지원한다.

밀어내기를 지원하기 위해서는 Observable / Observer 쌍을 subscribe로 연결한다. 

Observer는 구독을 통해 3가지 유형의 이벤트를 받는다.

interface Observer<T>{
void onNext( T t )
void onError( Throwalbe t )
void onComplete()
}

onComplete 가 호출되면 Observable 스트림은 끝나고 더 이상 이벤트를 보낼 수 없다. 

Observable 이벤트 생성의 중요한 기준은 블로킹/논블로킹 여부이지 동기/비동기 여부가 아니다.

동기 방식 계산

동기 방식을 유지하는 일반적인 이유는 스트림 조합과 연산자를 통한 변환 때문이다. RxJava는 데이터를 조작하거나 결합하고 변환을 위한 map() / filter() take() flatMap() groupBy() 같은 연산자로 구성된 방대한 API이다. 이들 연산자의 대부분은 동기 방식이고, onNext 안에서 이벤트가 지나가는 동안 동기 방식으로 계산을 수행한다. 

RxJava Observable의 규약에 의하면 onNext onCompleted onError 이벤트는 동시에 방출되지 않는다. 즉 직렬화된 스레드이다. 

Observable은 lazy해서 어딘가에서 subscribe 하지 않으면 아무것도 하지 않음을 뜻한다. 이는 future와 같이 일단 생성되면 즉시 동작하는 조급한 유형과는 다르다.

이런 특성 때문에 Observable의 객체 생성은 어떤 작업을 유발하지 않고 구독했을때 해야 할 작업을 정의한다.

예제

Observable<T> someData = observable.create(s -> {
getDataFromServerWithCallback(args, data -> {
s.onNext(data);
s.onCompleted();
});
})

someData.subscribe( s -> System.out.prinltn("Subscriber 1: " + s ));
someData.onErrorResumeNext(lazyFallback).subscribe(s -> System.out.println(s));

이게 느긋함의 힘이다. 2번의 구독을 실행할 수 있으며 원하는 상황에서 원하는 데이터를 뽑아낼 수 있다. 


observable 의 특징 
" 소비자가 호출하는 next()로 데이터를 끌어오는 대신 생산자가 onNext(T) 를 통해 데이터를 밀어내고, 모든 항목을 순회하는 동안 스레드를 블로킹하는 대신 onComplete() 콜백을 통해 정상 종료 신호를 보내며 호출 스택에 예외를 던지는 대신 onError(Throwable) 콜백을 통해 오류 이벤트를 방출한다. " 



// 생산자가 소비자에게 밀어낸다 ? 


// 생산자 Observable mouseEvent = ...;


// 소비자 mouseEvent.subscribe() 


그렇다면 왜 observable을 사용하나?


반환해야 할 목록이 작으면 성능상 아무런 문제가 없다. 주관적인 선택일 뿐이지만 목록이 크거나 다양한 데이터 소스를 끌어와 목록 요소를 채워야 한다면 Observable 을 사용했을 때 성능이나 반응 시간 측면에서 이점이 있다.


또한 컬렉션 전체가 도착할 대까지 기다리지 않고 항목을 받는 대로 처리할 수 있기 때문이다. 이는 특히 상이한 네트워크 지연이 각 항목별로 영향을 미치는 경우에 그렇다. 


Rx Observable이 다중 값 스트림을 다루기에는 좋지만 API를 설계하거나 사용할 때는 단일 값 표현이 단순해서 좋다. 


다음 예제를 확인하자


public static void Single<String> getDataA() {

return Single.<String> create( o -> {

o.onSuccess("DataA");

}).subscribeOn(Schedulers.io());

}



public static Single<String> getDataB(){

return Single.jush("DataB").subscribeOn(Schedulers.io());

}



반환값이 필요없는 Completable //


Observable<Void> c = writeToDatabase("data");


static Completable writeToDatabase(Object data){


return Completable.create( s -> {

doAsyncWrite(data,

// 성공적인 완료 시 콜백

() -> s.onCompleted(),

// Throwable을 포함하는 실패 시 콜백

error -> s.onError(error));


});


}


하드웨어 측면에서 블로킹 I/O와 논블로킹 I/O

논블로킹 I/O와 이벤트 루프 방식이 요청별로 블로킹 I/O 스레드를 사용하는 방식보다 효율적인지의 비교해보면


결과적으로 지연시간 처리량의 개선과 인프라스트럭처 비용을 낮출 수 있다. 그리고 부하에 대한 탄력성이 더 좋다. 


RxJava의 타입과 연산자는 명령형 콜백 위에 쌓아올린 추상화이다.


이 추상화로 코딩 스타일을 완전히 바꿔놓으며 비동기 혹은 논블로킹 프로그래밍을 위한 강력한 도구를 제공한다.



// 참고문헌 RxJava를 활용한 리액티브 프로그래밍 //


이글은 책의 내용을 정리와 참조용 으로 작성한 것입니다.


댓글