1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.ning.metrics.goodwill.store;
18
19 import com.google.inject.Inject;
20 import com.ning.metrics.goodwill.access.GoodwillSchema;
21 import com.ning.metrics.goodwill.access.GoodwillSchemaField;
22 import com.ning.metrics.goodwill.binder.config.GoodwillConfig;
23 import com.ning.metrics.goodwill.dao.DAOAccess;
24 import org.apache.log4j.Logger;
25
26 import java.io.IOException;
27 import java.sql.Connection;
28 import java.sql.PreparedStatement;
29 import java.sql.ResultSet;
30 import java.sql.SQLException;
31 import java.sql.Statement;
32 import java.sql.Types;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.Comparator;
37 import java.util.HashMap;
38
39 import static com.ning.metrics.goodwill.dao.DAOUtil.close;
40
41 public class MySQLStore extends GoodwillStore
42 {
43 private static Logger log = Logger.getLogger(MySQLStore.class);
44
45 private final String TABLE_STRING_DESCRIPTOR =
46 "event_type = ?, " +
47 "field_id = ?, " +
48 "field_type = ?, " +
49 "field_name = ?, " +
50 "sql_type = ?, " +
51 "sql_length = ?, " +
52 "sql_precision = ?, " +
53 "sql_scale = ?, " +
54 "description = ?";
55
56 private final String tableName;
57 private final DAOAccess access;
58
59 @Inject
60 public MySQLStore(
61 GoodwillConfig config,
62 DAOAccess access
63 ) throws IOException
64 {
65 this(config.getStoreDBThriftTableName(), access);
66 }
67
68 public MySQLStore(
69 String DBTableName,
70 DAOAccess access
71 ) throws IOException
72 {
73 tableName = DBTableName;
74 this.access = access;
75
76 buildGoodwillSchemaList();
77 }
78
79 @Override
80 public Collection<GoodwillSchema> getTypes() throws IOException
81 {
82 buildGoodwillSchemaList();
83
84 final ArrayList<GoodwillSchema> thriftTypesList = new ArrayList(goodwillSchemata.values());
85 Collections.sort(thriftTypesList, new Comparator<GoodwillSchema>()
86 {
87 @Override
88 public int compare(GoodwillSchema o, GoodwillSchema o1)
89 {
90 return o.getName().compareTo(o1.getName());
91 }
92 });
93
94 if (sink != null) {
95 for (int i = 0; i < thriftTypesList.size(); i++) {
96 GoodwillSchema schema = thriftTypesList.get(i);
97 schema.setSinkAddInfo(sink.addTypeInfo(schema));
98 thriftTypesList.set(i, schema);
99
100 }
101 }
102
103 return thriftTypesList;
104 }
105
106
107
108
109
110
111 @Override
112 public void addType(GoodwillSchema schema)
113 {
114
115
116 updateType(schema);
117 }
118
119
120
121
122
123
124 @Override
125 public boolean updateType(GoodwillSchema schema)
126 {
127 Connection connection = null;
128 Statement select = null;
129 PreparedStatement inserts = null;
130 PreparedStatement updates = null;
131 ResultSet result = null;
132
133 try {
134 connection = getConnection();
135 select = connection.createStatement();
136 inserts = connection.prepareStatement(sqlInsertField());
137 updates = connection.prepareStatement(sqlUpdateField());
138
139
140 for (GoodwillSchemaField field : schema.getSchema()) {
141
142 result = select.executeQuery(sqlSelectFieldId(schema, field));
143 boolean seen = false;
144
145 while (result.next()) {
146 if (seen) {
147 throw new SQLException(String.format("Duplicated Thiftfield [%s]! add a UNIQUE constraint on (event_type, field_id)", field));
148 }
149 else {
150 seen = true;
151 }
152
153 int key = result.getInt(1);
154
155
156 updates.setInt(10, key);
157 addSQLStatementToBatch(updates, schema, field);
158 }
159
160 if (!seen) {
161 addSQLStatementToBatch(inserts, schema, field);
162 }
163 }
164
165 log.info(String.format("ThriftType updates: %s", updates.executeBatch().toString()));
166 log.info(String.format("ThriftType inserts: %s", inserts.executeBatch().toString()));
167 connection.commit();
168 }
169 catch (SQLException e) {
170 log.error(String.format("Unable to modify type [%s]: %s", schema, e));
171 return false;
172 }
173 finally {
174 close(inserts);
175 close(updates);
176 close(connection, select, result);
177 }
178
179 return true;
180 }
181
182
183
184
185
186
187
188 @Override
189 public boolean deleteType(GoodwillSchema schema)
190 {
191 Connection connection = null;
192 PreparedStatement delete = null;
193
194 try {
195 connection = getConnection();
196 delete = connection.prepareStatement(sqlDeleteSchema());
197 delete.setString(1, schema.getName());
198 delete.addBatch();
199
200 int[] results = delete.executeBatch();
201 if (results.length == 0) {
202 throw new SQLException(String.format("[%s] no DELETE statement submitted", delete.toString()));
203 }
204
205 int resultCode = results[0];
206 if (resultCode == PreparedStatement.EXECUTE_FAILED) {
207 throw new SQLException(String.format("[%s] PreparedStatement.EXECUTE_FAILED", delete.toString()));
208 }
209
210 log.info(String.format("ThriftType deletes: [%s] %d", delete.toString(), resultCode));
211 connection.commit();
212
213 return true;
214 }
215 catch (SQLException e) {
216 log.error(String.format("Unable to delete type [%s]: %s", schema.getName(), e));
217 return false;
218 }
219 finally {
220 close(connection, delete);
221 }
222 }
223
224
225 private void buildGoodwillSchemaList() throws IOException
226 {
227 HashMap<String, GoodwillSchema> schemata = new HashMap<String, GoodwillSchema>();
228 GoodwillSchema currentThriftType = null;
229 String currentThriftTypeName = null;
230
231 Connection connection = null;
232 Statement select = null;
233 ResultSet result = null;
234 try {
235 connection = getConnection();
236 select = connection.createStatement();
237 result = select.executeQuery(sqlSelectSchema());
238
239 while (result.next()) {
240 String thriftType = result.getString(1);
241
242
243 Integer sqlLength = result.getInt(7);
244 if (result.wasNull()) {
245 sqlLength = null;
246 }
247 Integer sqlScale = result.getInt(8);
248 if (result.wasNull()) {
249 sqlScale = null;
250 }
251 Integer sqlPrecision = result.getInt(9);
252 if (result.wasNull()) {
253 sqlPrecision = null;
254 }
255
256 GoodwillSchemaField thriftField;
257 try {
258 thriftField = new GoodwillSchemaField(result.getString(2), result.getString(3), result.getShort(4), result.getString(5), result.getString(6), sqlLength, sqlScale, sqlPrecision);
259 }
260 catch (IllegalArgumentException e) {
261 log.warn(e);
262 continue;
263 }
264
265 if (currentThriftTypeName == null || !thriftType.equals(currentThriftTypeName)) {
266 currentThriftTypeName = thriftType;
267
268
269 if (schemata != null && schemata.get(currentThriftTypeName) != null) {
270 currentThriftType = schemata.get(currentThriftTypeName);
271 }
272 else {
273 currentThriftType = new GoodwillSchema(currentThriftTypeName, new ArrayList<GoodwillSchemaField>());
274 schemata.put(currentThriftTypeName, currentThriftType);
275 log.debug(String.format("Found new ThriftType: %s", currentThriftTypeName));
276 }
277 }
278
279 currentThriftType.addThriftField(thriftField);
280 log.debug(String.format("Added ThriftField to %s: %s", currentThriftTypeName, thriftField.toString()));
281 }
282
283 }
284 catch (SQLException e) {
285 log.warn(String.format("Unable to retrieve schemata: %s", e.getLocalizedMessage()));
286 }
287 finally {
288 close(connection, select, result);
289 }
290
291 this.goodwillSchemata = schemata;
292 }
293
294 private void addSQLStatementToBatch(PreparedStatement statement, GoodwillSchema schema, GoodwillSchemaField field)
295 throws SQLException
296 {
297 statement.setString(1, schema.getName());
298 statement.setInt(2, field.getId());
299 statement.setString(3, field.getType().name());
300 statement.setString(4, field.getName());
301 if (field.getSql().getType() == null) {
302 statement.setNull(5, Types.VARCHAR);
303 }
304 else {
305 statement.setString(5, field.getSql().getType());
306 }
307 if (field.getSql().getLength() == null) {
308 statement.setNull(6, Types.INTEGER);
309 }
310 else {
311 statement.setInt(6, field.getSql().getLength());
312 }
313 if (field.getSql().getPrecision() == null) {
314 statement.setNull(7, Types.INTEGER);
315 }
316 else {
317 statement.setInt(7, field.getSql().getPrecision());
318 }
319 if (field.getSql().getScale() == null) {
320 statement.setNull(8, Types.INTEGER);
321 }
322 else {
323 statement.setInt(8, field.getSql().getScale());
324 }
325 if (field.getDescription() == null) {
326 statement.setNull(9, Types.VARCHAR);
327 }
328 else {
329 statement.setString(9, field.getDescription());
330 }
331 statement.addBatch();
332 }
333
334
335
336
337
338
339
340
341 private String sqlSelectFieldId(GoodwillSchema schema, GoodwillSchemaField field)
342 {
343 return String.format("SELECT id FROM %s WHERE event_type = '%s' AND field_id = %d LIMIT 1", tableName, schema.getName(), field.getId());
344 }
345
346
347
348
349
350
351 private String sqlSelectSchema()
352 {
353 return String.format("SELECT event_type, field_name, field_type, field_id, description, sql_type, sql_length, sql_scale, sql_precision FROM %s ORDER BY field_id ASC", tableName);
354 }
355
356
357
358
359
360
361 private String sqlUpdateField()
362 {
363 return String.format("UPDATE %s SET %s WHERE id = ?", tableName, TABLE_STRING_DESCRIPTOR);
364 }
365
366
367
368
369
370
371 private String sqlInsertField()
372 {
373 return String.format("INSERT INTO %s SET %s", tableName, TABLE_STRING_DESCRIPTOR);
374 }
375
376
377
378
379
380
381 private String sqlDeleteSchema()
382 {
383 return String.format("DELETE FROM %s WHERE event_type = ?", tableName);
384 }
385
386 private Connection getConnection() throws SQLException
387 {
388 return access.getDataSource().getConnection();
389 }
390 }