/*
 * Copyright 2010 Ning, Inc.
 *
 * Ning licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  
package com.ning.metrics.goodwill.store;

import com.google.inject.Inject;
import com.ning.metrics.goodwill.access.GoodwillSchema;
import com.ning.metrics.goodwill.access.GoodwillSchemaField;
import com.ning.metrics.goodwill.binder.config.GoodwillConfig;
import com.ning.metrics.goodwill.dao.DAOAccess;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;

import static com.ning.metrics.goodwill.dao.DAOUtil.close;
40  
41  public class MySQLStore extends GoodwillStore
42  {
43      private static Logger log = Logger.getLogger(MySQLStore.class);
44  
45      private final String TABLE_STRING_DESCRIPTOR =
46          "event_type = ?, " +
47              "field_id = ?, " +
48              "field_type = ?, " +
49              "field_name = ?, " +
50              "sql_type = ?, " +
51              "sql_length = ?, " +
52              "sql_precision = ?, " +
53              "sql_scale = ?, " +
54              "description = ?";
55  
56      private final String tableName;
57      private final DAOAccess access;
58  
59      @Inject
60      public MySQLStore(
61          GoodwillConfig config,
62          DAOAccess access
63      ) throws IOException
64      {
65          this(config.getStoreDBThriftTableName(), access);
66      }
67  
68      public MySQLStore(
69          String DBTableName,
70          DAOAccess access
71      ) throws IOException
72      {
73          tableName = DBTableName;
74          this.access = access;
75  
76          buildGoodwillSchemaList();
77      }
78  
79      @Override
80      public Collection<GoodwillSchema> getTypes() throws IOException
81      {
82          buildGoodwillSchemaList();
83  
84          final ArrayList<GoodwillSchema> thriftTypesList = new ArrayList(goodwillSchemata.values());
85          Collections.sort(thriftTypesList, new Comparator<GoodwillSchema>()
86          {
87              @Override
88              public int compare(GoodwillSchema o, GoodwillSchema o1)
89              {
90                  return o.getName().compareTo(o1.getName());
91              }
92          });
93  
94          if (sink != null) {
95              for (int i = 0; i < thriftTypesList.size(); i++) {
96                  GoodwillSchema schema = thriftTypesList.get(i);
97                  schema.setSinkAddInfo(sink.addTypeInfo(schema));
98                  thriftTypesList.set(i, schema);
99  
100             }
101         }
102 
103         return thriftTypesList;
104     }
105 
106     /**
107      * Add a new type to the store
108      *
109      * @param schema GoodwillSchema to add
110      */
111     @Override
112     public void addType(GoodwillSchema schema)
113     {
114         // Creating a new Schema is really the same as updating/extending one, since the data model we use define schemas
115         // as a list of fields.
116         updateType(schema);
117     }
118 
119     /**
120      * Update a type to the store
121      *
122      * @param schema GoodwillSchema to update
123      */
124     @Override
125     public boolean updateType(GoodwillSchema schema)
126     {
127         Connection connection = null;
128         Statement select = null;
129         PreparedStatement inserts = null;
130         PreparedStatement updates = null;
131         ResultSet result = null;
132 
133         try {
134             connection = getConnection();
135             select = connection.createStatement();
136             inserts = connection.prepareStatement(sqlInsertField());
137             updates = connection.prepareStatement(sqlUpdateField());
138 
139             // Update all fields
140             for (GoodwillSchemaField field : schema.getSchema()) {
141                 // There needs to be a UNIQUE constraint on (event_type, field_id)
142                 result = select.executeQuery(sqlSelectFieldId(schema, field));
143                 boolean seen = false;
144 
145                 while (result.next()) {
146                     if (seen) {
147                         throw new SQLException(String.format("Duplicated Thiftfield [%s]! add a UNIQUE constraint on (event_type, field_id)", field));
148                     }
149                     else {
150                         seen = true;
151                     }
152 
153                     int key = result.getInt(1);
154 
155                     // Needs to be changed if TABLE_STRING_DESCRIPTOR changes!
156                     updates.setInt(10, key);
157                     addSQLStatementToBatch(updates, schema, field);
158                 }
159 
160                 if (!seen) {
161                     addSQLStatementToBatch(inserts, schema, field);
162                 }
163             }
164 
165             log.info(String.format("ThriftType updates: %s", updates.executeBatch().toString()));
166             log.info(String.format("ThriftType inserts: %s", inserts.executeBatch().toString()));
167             connection.commit();
168         }
169         catch (SQLException e) {
170             log.error(String.format("Unable to modify type [%s]: %s", schema, e));
171             return false;
172         }
173         finally {
174             close(inserts);
175             close(updates);
176             close(connection, select, result);
177         }
178 
179         return true;
180     }
181 
182     /**
183      * Delete a type
184      *
185      * @param schema GoodwillSchema to delete
186      * @return true is success, false otherwise
187      */
188     @Override
189     public boolean deleteType(GoodwillSchema schema)
190     {
191         Connection connection = null;
192         PreparedStatement delete = null;
193 
194         try {
195             connection = getConnection();
196             delete = connection.prepareStatement(sqlDeleteSchema());
197             delete.setString(1, schema.getName());
198             delete.addBatch();
199 
200             int[] results = delete.executeBatch();
201             if (results.length == 0) {
202                 throw new SQLException(String.format("[%s] no DELETE statement submitted", delete.toString()));
203             }
204 
205             int resultCode = results[0];
206             if (resultCode == PreparedStatement.EXECUTE_FAILED) {
207                 throw new SQLException(String.format("[%s] PreparedStatement.EXECUTE_FAILED", delete.toString()));
208             }
209 
210             log.info(String.format("ThriftType deletes: [%s] %d", delete.toString(), resultCode));
211             connection.commit();
212 
213             return true;
214         }
215         catch (SQLException e) {
216             log.error(String.format("Unable to delete type [%s]: %s", schema.getName(), e));
217             return false;
218         }
219         finally {
220             close(connection, delete);
221         }
222     }
223 
224 
225     private void buildGoodwillSchemaList() throws IOException
226     {
227         HashMap<String, GoodwillSchema> schemata = new HashMap<String, GoodwillSchema>();
228         GoodwillSchema currentThriftType = null;
229         String currentThriftTypeName = null;
230 
231         Connection connection = null;
232         Statement select = null;
233         ResultSet result = null;
234         try {
235             connection = getConnection();
236             select = connection.createStatement();
237             result = select.executeQuery(sqlSelectSchema());
238 
239             while (result.next()) {
240                 String thriftType = result.getString(1);
241 
242                 // Don't convert int from NULL to 0
243                 Integer sqlLength = result.getInt(7);
244                 if (result.wasNull()) {
245                     sqlLength = null;
246                 }
247                 Integer sqlScale = result.getInt(8);
248                 if (result.wasNull()) {
249                     sqlScale = null;
250                 }
251                 Integer sqlPrecision = result.getInt(9);
252                 if (result.wasNull()) {
253                     sqlPrecision = null;
254                 }
255 
256                 GoodwillSchemaField thriftField;
257                 try {
258                     thriftField = new GoodwillSchemaField(result.getString(2), result.getString(3), result.getShort(4), result.getString(5), result.getString(6), sqlLength, sqlScale, sqlPrecision);
259                 }
260                 catch (IllegalArgumentException e) {
261                     log.warn(e);
262                     continue;
263                 }
264 
265                 if (currentThriftTypeName == null || !thriftType.equals(currentThriftTypeName)) {
266                     currentThriftTypeName = thriftType;
267 
268                     // Do we have records for this Type already?
269                     if (schemata != null && schemata.get(currentThriftTypeName) != null) {
270                         currentThriftType = schemata.get(currentThriftTypeName);
271                     }
272                     else {
273                         currentThriftType = new GoodwillSchema(currentThriftTypeName, new ArrayList<GoodwillSchemaField>());
274                         schemata.put(currentThriftTypeName, currentThriftType);
275                         log.debug(String.format("Found new ThriftType: %s", currentThriftTypeName));
276                     }
277                 }
278 
279                 currentThriftType.addThriftField(thriftField);
280                 log.debug(String.format("Added ThriftField to %s: %s", currentThriftTypeName, thriftField.toString()));
281             }
282 
283         }
284         catch (SQLException e) {
285             log.warn(String.format("Unable to retrieve schemata: %s", e.getLocalizedMessage()));
286         }
287         finally {
288             close(connection, select, result);
289         }
290 
291         this.goodwillSchemata = schemata;
292     }
293 
294     private void addSQLStatementToBatch(PreparedStatement statement, GoodwillSchema schema, GoodwillSchemaField field)
295         throws SQLException
296     {
297         statement.setString(1, schema.getName());
298         statement.setInt(2, field.getId());
299         statement.setString(3, field.getType().name());
300         statement.setString(4, field.getName());
301         if (field.getSql().getType() == null) {
302             statement.setNull(5, Types.VARCHAR);
303         }
304         else {
305             statement.setString(5, field.getSql().getType());
306         }
307         if (field.getSql().getLength() == null) {
308             statement.setNull(6, Types.INTEGER);
309         }
310         else {
311             statement.setInt(6, field.getSql().getLength());
312         }
313         if (field.getSql().getPrecision() == null) {
314             statement.setNull(7, Types.INTEGER);
315         }
316         else {
317             statement.setInt(7, field.getSql().getPrecision());
318         }
319         if (field.getSql().getScale() == null) {
320             statement.setNull(8, Types.INTEGER);
321         }
322         else {
323             statement.setInt(8, field.getSql().getScale());
324         }
325         if (field.getDescription() == null) {
326             statement.setNull(9, Types.VARCHAR);
327         }
328         else {
329             statement.setString(9, field.getDescription());
330         }
331         statement.addBatch();
332     }
333 
334     /**
335      * Get the select statement to find the row id for a field
336      *
337      * @param schema Schema the field belongs to
338      * @param field  The field to look up
339      * @return The select SQL statement
340      */
341     private String sqlSelectFieldId(GoodwillSchema schema, GoodwillSchemaField field)
342     {
343         return String.format("SELECT id FROM %s WHERE event_type = '%s' AND field_id = %d LIMIT 1", tableName, schema.getName(), field.getId());
344     }
345 
346     /**
347      * Get the select statement to retrieve a full schema
348      *
349      * @return The select SQL statement
350      */
351     private String sqlSelectSchema()
352     {
353         return String.format("SELECT event_type, field_name, field_type, field_id, description, sql_type, sql_length, sql_scale, sql_precision FROM %s ORDER BY field_id ASC", tableName);
354     }
355 
356     /**
357      * Get the update statement for a specific field
358      *
359      * @return The update SQL statement
360      */
361     private String sqlUpdateField()
362     {
363         return String.format("UPDATE %s SET %s WHERE id = ?", tableName, TABLE_STRING_DESCRIPTOR);
364     }
365 
366     /**
367      * Get the insert statement to add a field
368      *
369      * @return The insert SQL statement
370      */
371     private String sqlInsertField()
372     {
373         return String.format("INSERT INTO %s SET %s", tableName, TABLE_STRING_DESCRIPTOR);
374     }
375 
376     /**
377      * Get the delete statement to remove a schema (all fields for the schema)
378      *
379      * @return The delete SQL statement
380      */
381     private String sqlDeleteSchema()
382     {
383         return String.format("DELETE FROM %s WHERE event_type = ?", tableName);
384     }
385 
386     private Connection getConnection() throws SQLException
387     {
388         return access.getDataSource().getConnection();
389     }
390 }