한쪽으로 치우친 ZooKeepr 와 ZeroMQ 를 이용한 분산 Job 처리
0. 결론부터
이 비교는 몇가지 이유로 매우 공정하지 못한 비교입니다. 수박과 참외를 비교하는 것과 비슷하달까.. 가장 큰 목적은 ZeroMQ 를 이용해서 분산 프로그램을 비교적 쉽게 작성할 수 있다는 것을 소개하기 위한 것입니다. 하지만 망치를 들었다고 모든게 못으로 보이면 안되듯이, ZeroMQ 가 모든 상황에서 들어맞는 것은 아닙니다.
ZeroMQ 는 AMQP 를 고안하고 OpenAMQ를 만든 iMatix 에서 좀더 쉽고 빠른 메시징 방법은 없을까 해서 만든 오픈소스 입니다. MQ 라는 이름이 붙어 있지만 Zero 역시 붙어 있기 때문에, MQ 서버가 짱짱한 서버에서 돌아가거나 하진 않습니다. 그냥 고성능(?) Message 라이브러리라고 보시면 됩니다. 따라서 설치도 library 만 설치하시면 되고, 대부분의 프로그래밍 언어를 지원합니다.
1. 테스트 방법
1-10 까지의 workload 를 가진 1000개의 work 를 1~100 개의 work process 에 LRU(Least Recently Used)방법으로 나누어 수행을 해본 것입니다. 각 worker 는 예를들어 3이라는 값을 받으면 0.3 초동안 sleep 을 한후 다음 job 을 받아오는 방식입니다. ZooKeepr 의 경우는 예제로 제공되는 Recipe 의 Queue 코드를 그대로 사용하였습니다.
구조는
JobProducer -> ZooKeeper (SingleNode) <-> JobConsumer
JobProducer -> ZeroMQ broker <-> JobConsumer
두 경우 모드 JobConsumer 를 미리 띄운 후, JobProducer 가 순차적으로 1000개의 work 를 밀어넣도록 하였습니다.
2. 불공정한 비교
ZooKeeper 에 사용된 queue recipe는 아래 테스트 결과에서도 볼 수 있듯이 herd 문제가 있습니다. 즉 다음 Job 을 가져가기 위해서 경쟁하는 구조라서 효율적인 방법은 아닙니다. job distribution 은 lock recipe가 차라리 더 나은 성능이 나올 수도 있을것 같습니다.
무엇보다 일단 ZooKeeper 와 ZeroMQ broker 는 reliability 에서 상당한 차이가 납니다. ZooKeeper 는 cluster 로 구성할 수 있을 뿐만 아니라, 서버가 죽었다 살아나도 데이터가 사라지지 않습니다. 하지만 0MQ 의 broker 는 broker 로 데이터가 전달된 후 죽으면 기본적으로 메시지가 사라집니다. 이를 보완하는 disk 에 메시지를 저장하는 등의 알려진 방법이 있지만 추가적인 코딩이 필요합니다.
당연히 ZooKeeper 가 높은 reliability 를 보장하다 보니 오버헤드가 있을 것입니다.
또한 JobConsumer (worker) 의 reliability 에서도 약간(?)의 차이가 납니다. 0MQ 의 worker 는 주기적으로 heartbeat 을 100ms 마다 broker 에게 보내도록 프로그래밍 되어 있습니다(이 역시 필수는 아니지만 좀더 높은 reliability 를 위해 추가된 코드입니다). 사실 ZooKeeper 의 client 들도 주기적으로(상당히 짧은 간격으로) ping 을 보내니까 네트웍 사용에 있어서 크게 차이는 없고, 다만 0MQ 는 소켓 close 를 일부러 처리하지 않으므로 dead worker 의 감지은 상대적으로 느릴 수 있습니다. 가능성의 차이는 있지만 ZooKeeper 또한 worker 가 job 을 할당받았지만 처리하지 못하고 먹고 죽을 가능성은 있으므로 만일 이를 보장하려면 ZooKeepr 또한 worker 가 처리 중간에 죽었는지 점검하는 부분이 어떻게든 필요하므로 이것은 큰 차이가 있다고 보지인 않습니다.
테스트에 사용된 0MQ broker 는 restart 에 대한 reliability 는 없고 worker 의 heartbeat 을 처리하지만 145 line 밖에 되지 않습니다. worker 의 코드도 80줄 내외 입니다.
0MQ 는 성능이 좋다.. 코딩대비 아주 쓸만하다.. 개발이 쉽다.. 버그를 일으킬 확률이 낮다.. 뭐 이런말이 하고 싶은 것이죠..
3. 테스트 결과
첫번째 Job 이 할당되고, 마지막 Job 이 할당될때 까지 걸린 시간(sec) 입니다.
| Worker 수 | 1 | 2 | 5 | 10 | 15 | 20 | 25 | 30 | 35 | 40 | 45 | 50 | 100 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ZooKeepr | 578.620 | 286.002 | 113.366 | 56.820 | 38.104 | 29.017 | 24.283 | 21.181 | 24.031 | 31.260 | 38.919 | 45.694 | 88.331 |
| ZeroMQ | 554.011 | 276.743 | 110.485 | 55.008 | 36.559 | 27.365 | 21.817 | 18.398 | 15.468 | 13.640 | 12.017 | 11.230 | 5.748 |
ZooKeepr 에서는 Herd 가 분명히 보입니다. 반면 ZeroMQ 는 worker 가 늘어나도 거의 overhead 가 없습니다.
Job 의 개수를 1만개, 10만개, 100만개로 늘려보면, ZooKeepr 는 점점 느려지다가 Client 가 뻗어버립니다. 반면 ZeroMQ 는 Job 개수에 영향을 받지 않습니다. ZooKeepr 의 경우는 getChildren 의 데이터가 커져서 그런게 아닌가 싶습니다.
5. 그다음 테스트
- lock recipe 를 응용하며 herd 를 제거한 후 비교
- 0MQ broker 의 reliability 를 높인, JobProducer -> ZooKeeper -> broker <-> JobConsumer 구조의 비교
소스코드
ZeroMQ broker
1 import java.util.LinkedHashMap;
2 import java.util.Arrays;
3 import java.util.Iterator;
4
5 import org.zeromq.ZMQ;
6 import org.zeromq.ZMQ.Context;
7 import org.zeromq.ZMQ.Poller;
8 import org.zeromq.ZMQ.Socket;
9
10 public class ZMQRouter {
11
12 public static final long HEARTBEAT_INTERVAL = 100 ; //ms
13 public static final byte[] MSG_HEARTBEAT = { '\002' } ;
14
15 static class Worker {
16 String address ;
17 long expiry ;
18 int liveness ;
19
20 Worker( String address ) {
21 this.address = address ;
22 heartBeat();
23 }
24
25 void heartBeat()
26 {
27 liveness = 3;
28 expiry = System.currentTimeMillis() + HEARTBEAT_INTERVAL;
29 }
30
31 boolean isAlive(long curTime)
32 {
33 if ( expiry < curTime && --liveness == 0 ) return false;
34 return true;
35 }
36
37 }
38
39 public static void main(String[] argv) throws Exception
40 {
41
42 ZMQ.Context context = ZMQ.context(1);
43
44
45 int requestCount = 0;
46
47 if ( argv.length < 1 )
48 {
49 System.out.println("java ZMQRouter front_host_port backend_host_port");
50 System.exit(0);
51 }
52
53 String frontend_host = argv[0];
54 String backend_host = argv[1];
55 System.out.println( frontend_host );
56 System.out.println( backend_host );
57
58 ZMQ.Socket frontend = context.socket(ZMQ.XREP);
59 ZMQ.Socket backend = context.socket(ZMQ.XREP);
60 backend.bind( backend_host);
61 frontend.bind( frontend_host);
62
63 LinkedHashMap<String,Worker> workers = new LinkedHashMap<String,Worker>();
64
65 while (!Thread.currentThread().isInterrupted()) {
66
67 Poller items = context.poller(2);
68
69 items.register(backend, Poller.POLLIN );
70 if( workers.size() > 0 )
71 items.register( frontend, Poller.POLLIN );
72
73 long ret = items.poll(HEARTBEAT_INTERVAL*1000);
74 if( ret < 0 ) break;
75 if (items.pollin(0)) {
76 String workAddr = new String(backend.recv(0)) ;
77 if ( workAddr.length() == 0 )
78 {
79 continue;
80 }
81
82 byte[] message = backend.recv(0) ;
83 if( Arrays.equals( message, MSG_HEARTBEAT) )
84 {
85 Worker w = workers.get( workAddr );
86 if ( w == null )
87 {
88 workers.put( workAddr, new Worker(workAddr) );
89 System.out.println(String.format("new %s", workAddr ));
90 } else {
91 w.heartBeat();
92 }
93 } else {
94
95 String clientAddr = new String( message );
96 String reply = new String(backend.recv(0));
97 frontend.send( clientAddr.getBytes(), ZMQ.SNDMORE );
98 frontend.send( reply.getBytes(), 0 );
99 }
100
101 }
102 if (items.pollin(1)) {
103
104 String clientAddr = new String(frontend.recv(0));
105 //String empty = new String(backend.recv(0));
106 //assert empty.length() == 0 | true;
107 //System.out.println(String.format("client addr %d: %s" , requestCount, clientAddr ));
108
109 String request = new String(frontend.recv(0));
110
111 requestCount ++;
112 System.out.println(String.format("request %d: %s" , requestCount,request ));
113 Iterator<Worker> it = workers.values().iterator();
114 Worker w = it.next();
115 it.remove();
116
117 backend.send( w.address.getBytes(), ZMQ.SNDMORE );
118 backend.send( clientAddr.getBytes(), ZMQ.SNDMORE );
119 backend.send( request.getBytes(), 0 );
120 }
121
122 // purge dead workers
123 if (ret == 0 )
124 {
125 long curTime = System.currentTimeMillis() ;
126 for(Iterator<Worker> it = workers.values().iterator();it.hasNext(); )
127 {
128 Worker w = it.next();
129 if ( !w.isAlive(curTime) )
130 {
131 it.remove();
132 System.out.println(String.format("removing %s %s:%d", w.address, w.expiry, curTime ));
133 }
134 }
135 }
136
137 }
138
139 frontend.close();
140 backend.close();
141
142 context.term();
143
144 }
145 }

0 개의 댓글:
댓글 쓰기 |