View Javadoc

1   package com.ning.metrics.goodwill.sink;
2   
3   import com.google.inject.Inject;
4   import com.ning.metrics.goodwill.access.GoodwillSchema;
5   import com.ning.metrics.goodwill.access.GoodwillSchemaField;
6   import com.ning.metrics.goodwill.binder.config.GoodwillConfig;
7   import org.apache.commons.lang.StringUtils;
8   import org.apache.log4j.Logger;
9   import org.netezza.datasource.NzDatasource;
10  
11  import java.io.IOException;
12  import java.sql.Connection;
13  import java.sql.SQLException;
14  import java.sql.Statement;
15  
16  public class NetezzaSink implements GoodwillSink
17  {
18      private final Logger log = Logger.getLogger(NetezzaSink.class);
19  
20      private final GoodwillConfig config;
21  
22      @Inject
23      public NetezzaSink(
24          GoodwillConfig config
25      )
26      {
27          this.config = config;
28      }
29  
30      /**
31       * Add a new type to the sink
32       * <p/>
33       * For Netezza, this means creating a table where the data can be dumped. The CREATE TABLE statement
34       * is constructed from the SQL information documented in the ThriftFields.
35       *
36       * @param schema GoodwillSchema to add
37       */
38      @Override
39      public boolean addType(GoodwillSchema schema)
40          throws SQLException, IOException, ClassNotFoundException
41      {
42          boolean success = false;
43  
44          try {
45              Connection connection = connectToNetezza(config.getSinkDBFirstHost(), config.getSinkDBFirstPort(), config.getSinkDBFirstSchema(),
46                  config.getSinkDBFirstUsername(), config.getSinkDBFirstPassword());
47  
48              String createTableStatement = getCreateTableStatement(schema);
49              Statement statement = connection.createStatement();
50              statement.addBatch(createTableStatement);
51  
52              statement.executeBatch();
53              log.info(String.format("Added Thrift to Netezza: %s", schema.getName()));
54              connection.commit();
55  
56              connection.close();
57              success = true;
58  
59              if (config.getSinkFirstExtraSQL() != null) {
60                  success = executeExtraSql(config.getSinkDBFirstHost(), config.getSinkDBFirstPort(), config.getSinkDBFirstSchema(),
61                      config.getSinkDBFirstUsername(), config.getSinkDBFirstPassword(), config.getSinkFirstExtraSQL(), schema);
62  
63                  if (success && config.getSinkSecondExtraSQL() != null) {
64                      success = executeExtraSql(config.getSinkDBSecondHost(), config.getSinkDBSecondPort(), config.getSinkDBSecondSchema(),
65                          config.getSinkDBSecondUsername(), config.getSinkDBSecondPassword(), config.getSinkSecondExtraSQL(), schema);
66                  }
67              }
68  
69              return success;
70          }
71          catch (SQLException e) {
72              log.warn(String.format("Unable to add Type to Netezza: %s", e));
73              return success;
74          }
75      }
76  
77      private boolean executeExtraSql(String host, int port, String database, String username,
78                                      String password, String statement, GoodwillSchema schema)
79      {
80          try {
81              Connection connection = connectToNetezza(host, port, database, username, password);
82  
83              String safeSQL = getUnescapedStatement(statement, schema);
84  
85              log.info(String.format("Running extra SQL in Netezza: %s", safeSQL));
86              Statement extraStatement = connection.createStatement();
87              extraStatement.execute(safeSQL);
88              connection.commit();
89  
90              connection.close();
91              return true;
92          }
93          catch (SQLException e) {
94              log.warn(String.format("Unable to run extra SQL in Netezza: %s", e));
95              return false;
96          }
97          catch (ClassNotFoundException e) {
98              log.warn(String.format("Unable to run extra SQL in Netezza: %s", e));
99              return false;
100         }
101     }
102 
103     private String getUnescapedStatement(String statement, GoodwillSchema schema)
104     {
105         // TODO: hack. We do a manual replacement first because escaping can do strange things. More thoughts needed here.
106         // TODO: hack. Maven escapes strangely parameters on the command line, replace manually \* with *.
107         String safeSQL = StringUtils.replace(statement, "\\*", "*");
108         safeSQL = StringUtils.replace(safeSQL, "?", getTableName(schema));
109         return safeSQL;
110     }
111 
112     private String getCreateTableStatement(GoodwillSchema schema)
113     {
114         String tableName = getTableName(schema);
115         String statement = String.format("CREATE TABLE %s (", tableName);
116 
117         for (GoodwillSchemaField field : schema.getSchema()) {
118             statement += String.format("%s %s,", sanitizeThriftName(field.getName()), field.getFullSQLType());
119         }
120         statement = StringUtils.chop(statement); // remove last comma
121         statement += ") DISTRIBUTE ON RANDOM;";
122 
123         return statement;
124     }
125 
126     private String getTableName(GoodwillSchema schema)
127     {
128         return String.format(config.getSinkDBTableNameFormat(), sanitizeThriftName(schema.getName()));
129     }
130 
131     private String sanitizeThriftName(String name)
132     {
133         return StringUtils.lowerCase(StringUtils.deleteWhitespace(name));
134     }
135 
136     /**
137      * Update a type to the sink
138      * <p/>
139      * Updating a table in Netezza can be quite tricky. Don't do it.
140      *
141      * @param schema ThriftType to update
142      * @return true is success, false otherwise
143      */
144     @Override
145     public boolean updateType(GoodwillSchema schema)
146     {
147         return false;
148     }
149 
150     /**
151      * Give information on how to add a Type in the sink
152      *
153      * @param schema ThriftType to add
154      * @return info how to create a Type in the sink
155      */
156     @Override
157     public String addTypeInfo(GoodwillSchema schema)
158     {
159         String info = String.format("%s\n", getCreateTableStatement(schema));
160 
161         if (config.getSinkFirstExtraSQL() != null) {
162             info += getUnescapedStatement(config.getSinkFirstExtraSQL(), schema);
163 
164             if (config.getSinkSecondExtraSQL() != null) {
165                 info += getUnescapedStatement(config.getSinkSecondExtraSQL(), schema);
166             }
167         }
168 
169         return info;
170     }
171 
172     private Connection connectToNetezza(String host, int port, String db, String username, String password) throws SQLException, ClassNotFoundException
173     {
174         NzDatasource datasource = new NzDatasource();
175         datasource.setHost(host);
176         datasource.setPort(port);
177         datasource.setDatabase(db);
178         datasource.setUser(username);
179         datasource.setPassword(password);
180 
181         Connection connection = datasource.getConnection();
182         connection.setAutoCommit(false);
183 
184         return connection;
185     }
186 }