1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.ning.metrics.goodwill.access;
18
19 import com.ning.http.client.AsyncCompletionHandler;
20 import com.ning.http.client.AsyncHttpClient;
21 import com.ning.http.client.AsyncHttpClientConfig;
22 import com.ning.http.client.Response;
23 import org.codehaus.jackson.map.ObjectMapper;
24 import org.codehaus.jackson.type.TypeReference;
25
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.concurrent.Future;
31
32 public class GoodwillAccessor extends Accessor
33 {
34 protected static final ObjectMapper mapper = new ObjectMapper();
35 protected AsyncHttpClient client;
36
37 public GoodwillAccessor(final String host, final int port)
38 {
39 super(host, port);
40 client = createHttpClient();
41 }
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public Future<GoodwillSchema> getSchema(final String schemaName)
61 {
62 try {
63 return client.prepareGet(String.format("%s/%s", url, schemaName)).addHeader("Accept", "application/json").execute(new AsyncCompletionHandler<GoodwillSchema>()
64 {
65 @Override
66 public GoodwillSchema onCompleted(final Response response) throws Exception
67 {
68 if (response.getStatusCode() != 200) {
69 return null;
70 }
71
72 final InputStream in = response.getResponseBodyAsStream();
73 try {
74 return mapper.readValue(in, GoodwillSchema.class);
75 }
76 finally {
77 closeStream(in);
78 }
79 }
80
81 @Override
82 public void onThrowable(final Throwable t)
83 {
84 log.warn("Got exception looking up the schema", t);
85 }
86 });
87 }
88 catch (IOException e) {
89 log.warn("Got exception looking up the schema", e);
90 return null;
91 }
92 }
93
94
95
96
97
98
99
100
101
102 public Future<List<GoodwillSchema>> getSchemata()
103 {
104 try {
105 return client.prepareGet(url).addHeader("Accept", "application/json").execute(new AsyncCompletionHandler<List<GoodwillSchema>>()
106 {
107 @Override
108 public List<GoodwillSchema> onCompleted(final Response response) throws Exception
109 {
110 if (response.getStatusCode() != 200) {
111 return null;
112 }
113
114 InputStream in = response.getResponseBodyAsStream();
115 try {
116 final HashMap<String, List<GoodwillSchema>> map = mapper.readValue(in,
117 new TypeReference<HashMap<String, List<GoodwillSchema>>>()
118 {
119 });
120 return map.get("types");
121 }
122 finally {
123 closeStream(in);
124 }
125 }
126
127 @Override
128 public void onThrowable(final Throwable t)
129 {
130 log.warn("Got exception looking up the schema list", t);
131 }
132 });
133 }
134 catch (IOException e) {
135 log.warn("Got exception looking up the schema list", e);
136 return null;
137 }
138 }
139
140
141 private static AsyncHttpClient createHttpClient()
142 {
143
144
145 final AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
146 builder.setMaximumConnectionsPerHost(-1);
147 return new AsyncHttpClient(builder.build());
148 }
149
150
151
152
153 public synchronized void close()
154 {
155 client.close();
156 }
157
158 protected final void closeStream(final InputStream in)
159 {
160 if (in != null) {
161 try {
162 in.close();
163 }
164 catch (IOException e) {
165 log.warn("Failed to close http-client - provided InputStream: {}", e.getLocalizedMessage());
166 }
167 }
168 }
169 }