본문 바로가기

IT for developer/Netty

Netty 예제 분석 - Discard


서버 측 프로그램 개발.

프로그램 플로우.

1. ServerBootstrap을 생성한다.
2. ServerBootstrap에 파이프라인을 지정한다.
3. 파이프라인을 다룰 핸들러를 지정한다.
4. 핸들러에서 데이터 처리를 수행한다. 또는 핸들러에서 별도의 데이터 처리를 위해 작성한 컴포넌트를 호출한다.

프로그래밍 플로우.

1. 내가 처리할 데이터가 무엇인지 모델링한다. 주고 받는 포멧(Protocol) 작성
2. 데이터를 받았을 때 무슨 작업을 해야할 지 정의한다. 비즈니스 로직으로 전달
3. 다 처리한 후 또는 예외가 발생했을 때 무엇을 해야할지 결정한다.
4. 네트워크와 직접 관련된 정보들을 설정한다.

Discard 서버 프로그램에 적용해 보기.
1. 별다른 데이터 포멧을 가지고 있지 않는다.
2. 데이터를 받았을 때 아무 응답도 하지 않는다. 
3. 예외가 발생하면 연결을 중지한다.
4. 8080 포트로 바인딩한다.


실제 코드 살펴보기.
1. 없음
2. 데이터를 받는 코드 부분. - Netty는 이벤트 방식 처리로 데이터가 들어왔을 때 호출되는 함수가 있다. 

지정된 핸들러의 messageReceived라는 함수가 호출된다.

59      @Override
60 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
61 // Discard received data silently by doing nothing.
62 transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
63 }

transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes()); 이 코드는 일단 패스
현재 전달 받은 데이터는 메세지 이벤트에서 읽을 수 있다. e.getMessage() , 그러나 지금은 아무것도 하지않는 서버이므로 패스

3. 예외가 발생하면 지정된 핸들러의 exceptionCaught 라는 함수가 호출된다.
66      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
67 // Close the connection when an exception is raised.
68 logger.log(
69 Level.WARNING,
70 "Unexpected exception from downstream.",
71 e.getCause());
72 e.getChannel().close();
73 }

예외가 발생했을 때 정보는 ExceptionEvent에 담아서 전달되며 e.getChannel().close();  호출하여 클라이언트의 연결을 끊는다.

4. 이 핸들러를 사용하는 서버용 프로그램을  설정한다.
http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/discard/DiscardServer.html

ServerBootstrap 클래스를 이용하여 간단하게 서버를 설정할 수 있다. NioServerSocketChannelFactory는 서버 측 non-blocking I/O 모드를 위한 채널을 생성한다. ChannelFactory를 지정할 때, Boss 쓰레드와 Worker 쓰레드의 쓰레드 풀을 지정해 준다.
Boss 쓰레드는 Connection을 담당하고 연결이 되면 Worker 쓰레드에게 Channel을 넘긴다. Worker 쓰레드는 non-blocking 읽고 쓰기를 시도한다.
39          ServerBootstrap bootstrap = new ServerBootstrap(
40 new NioServerSocketChannelFactory(
41 Executors.newCachedThreadPool(),
42 Executors.newCachedThreadPool()));

다음으로, 메세지 처리를 작업을 위한 파이프라인 팩토리를 설정한다. 파이프라인 팩토리 지정시 실제 처리할 파이프라인 핸들러를 지정한다. 여기에서는 DiscardServerHandler를 지정했다. 

45          bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
46 public ChannelPipeline getPipeline() throws Exception {
47 return Channels.pipeline(new DiscardServerHandler());
48 }
49 });

8080 포트로 바인딩 한다.
52          bootstrap.bind(new InetSocketAddress(8080));

프로그램을 실행해 보자.

환경설정하기 - 링크

 
Discard 클라이언트 프로그램에 적용해 보기.
1. 별다른 데이터 포멧을 가지고 있지 않는다.
2. 연결시, 소켓버퍼에 공간이 남아 있을 때마다 패킷을 전송한다. 
3. 예외가 발생하면 연결을 중지한다.
4. 8080포트에 연결하는 클라이언트 프로그램


실제 코드 살펴보기.
1. 없음
2-1. 연결시 패킷 전송
http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/discard/DiscardClientHandler.html

서버와 연결되면 channelConnected() 함수가 호출된다. 여기서는 패킷을 전달하기 위해 커스텀 함수인 genrateTraffic 함수를 호출한다.
74      @Override
75 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
77 generateTraffic(e);
78 }
 
 Channel에 write 함수를 호출하여 패킷을 전송할 수 있다. channel.isWritable()은 현재 I/O 쓰레드가  즉시 write를 수행할 수 있는 상태라면 true를 리턴한다.( 내부 버퍼가 꽉차면 false를 리턴한다.) isWritable()함수로 버퍼상태를 체크하지 않고 계속 write를 쓰면 OutOfMemoryError가 발생된다.
106 private void generateTraffic(ChannelStateEvent e) {
111 Channel channel = e.getChannel();
112 while (channel.isWritable()) {
113 ChannelBuffer m = nextMessage();
114 if (m == null) {
115 break;
116 }
117 channel.write(m);
118 }
119 }

121     private ChannelBuffer nextMessage() {
//wrappedBuffer함수는 LitteEndian or BigEndian 설정해줌
122 return ChannelBuffers.wrappedBuffer(content); 123 }
채널 상태에 변화가 일어날 때마다 호출된다. 지금 예제에서는 write 가능상태<--> 불가능 상태 두 상태가 변화가 일어 날 때 마다 이 함수가 호출 된다. 즉, 내부 버퍼가 꽉차서 OP_WRITE 플래그가 0이 였다가 다시 쓸수 있는 상태라서 1이 되는 경우이거나 그 반대의 경우에 이벤트가 발생한다. 
(OP_NONE, OP_READ, OP_WRITE )

81      public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {83          generateTraffic(e);
84

결국 클라이언트가 하는일을 정리해보면,  연결할 때 무한 반복으로 내부 버퍼가 비어있다면 데이터를 계속 write하고 내부 버퍼가 꽉찼다가 공간이 남을 때마다 계속 데이터를 write 한다.

그렇다면 좀전에 패스했던 서버쪽 핸들러에 구현된 messageReceived 코드를 살펴보자.

transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
클라이언트에서 전송한 데이터를 바이트 수를 계산해서 계속 증가시켜준다. 그리고 아무것도 하는일은 없다; 왜냐 Discard 이므로;;

3. 서버쪽과 동일하므로 패스.

4. ClientBootstrap을 이용하여 클라이언트측 네트워크 설정을 수행한다. 서버측과의 차이점은 일반 Socket 프로그램처럼 클라이언트는 별도의 포트 바인드는 없고 연결할 대상 서버를 위한 IP 주소와 포트를 지정한다.
58          ClientBootstrap bootstrap = new ClientBootstrap(
59 new NioClientSocketChannelFactory(
60 Executors.newCachedThreadPool(),
61 Executors.newCachedThreadPool()));
64 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
65 public ChannelPipeline getPipeline() throws Exception {
66 return Channels.pipeline(
67 new DiscardClientHandler(firstMessageSize));
68 }
69 });
72          ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

일반적인 TCP/IP 프로그래밍 처럼 connect 함수를 이용하여 연결을 시도한다. 그러나 Netty는 비동기 I/O 이기 때문에, connection 호출하여 연결이 완료된 후 결과를 리턴하는 것이 아니라. 즉시 I/O 오퍼레이션 결과 또는 상태를 리턴해준다. (ChannelFuture) - 리턴은 받지만 아직 연결은 되지 않은 상태 일 수 있다.

Uncomplete, Completed 상태가 있다. 성공, 실패, 취소 된경우 Completed 상태로 전환된다.
비동기 I/O 형태로 작성하는 경우 (event-driven) ChannelFutureListener를 추가한다.
동기 I/O 처럼 동작하도록 하기 위해서는 future.awaitUninterruptbly(); 를 사용한다. 

75          future.getChannel().getCloseFuture().awaitUninterruptibly();

위에 코드는 해석해보면 결국 연결 종료가 완료될 때 까지 대기하라는 것이다. 결국 연결 종료가 될 때 까지 무한으로 데이터를 전송한다.

마지막으로 할당한 리소스들을 해제한다. 실행중이던 Executor 즉 쓰레드들을 종료한다.
78          bootstrap.releaseExternalResources();

그밖에 핸들러에 구현된 함수들에 대해 알기 위해서는 어떻게 ChannelEvent가 ChannelHandler에 의해 처리되는지 자세히 알아야 한다.

어떻게 ChannelEvent가 ChannelHandler에의해  어떻게 처리되는지 살펴보자.

모든 이벤트는 다운스트림 또는 업스트립 이벤트중에 하나이다. 만약에 이벤트 플로우가 ChannelPipleine에서 처음 핸들러에서 마지막 핸들러까지 전달한다면 이는 업스트림 이벤트이다. 그 반대의 경우, 즉 마지막 핸들러에서 처음 핸들러로 이벤트 흐름이라면 이는 다운스트림 이벤트이다.

 ChannelPipeline p = Channels.pipeline();
 p.addLast("1", new UpstreamHandlerA());
 p.addLast("2", new UpstreamHandlerB());
 p.addLast("3", new DownstreamHandlerA());
 p.addLast("4", new DownstreamHandlerB());
 p.addLast("5", new UpstreamHandlerX());

업스트림일 때 핸들러는 1, 2, 3, 4, 5순으로 수행하고 다운스트림일 때 5, 4, 3, 2, 1 순으로 처리한다.




실제 3, 4는 ChannelUpstreamHandler를 구현하지 않아서 upstream 이벤트의 순서는 1, 2, 5 이다.
1, 2, 5는 ChannelDownstreamHandler를 구현하지 않아서 downstream 이벤트의 순서는 4, 3 이다.
둘다 처리하도록 구현하고 싶으면 SimpleChannelHandler를 이용하자.
만약에 5가 SimpleChannelHandler를 상속받았다면 upstream과 downstream 평가 순서는 각각 125, 543이 된다.

결국 하나이상의 핸들러를 파이프라인에 등록하는 경우, 업스트림과 다운스트림을 잘 활용하면 되겠다.

이 예제에서는 다운스트림 업스트림이 크게 의미 있지는 않은 듯하다. 일단 패스!