1 package com.ning.metrics.goodwill.sink;
2
3 import com.google.inject.Inject;
4 import com.ning.metrics.goodwill.access.GoodwillSchema;
5 import com.ning.metrics.goodwill.access.GoodwillSchemaField;
6 import com.ning.metrics.goodwill.binder.config.GoodwillConfig;
7 import org.apache.commons.lang.StringUtils;
8 import org.apache.log4j.Logger;
9 import org.netezza.datasource.NzDatasource;
10
11 import java.io.IOException;
12 import java.sql.Connection;
13 import java.sql.SQLException;
14 import java.sql.Statement;
15
16 public class NetezzaSink implements GoodwillSink
17 {
18 private final Logger log = Logger.getLogger(NetezzaSink.class);
19
20 private final GoodwillConfig config;
21
22 @Inject
23 public NetezzaSink(
24 GoodwillConfig config
25 )
26 {
27 this.config = config;
28 }
29
30
31
32
33
34
35
36
37
38 @Override
39 public boolean addType(GoodwillSchema schema)
40 throws SQLException, IOException, ClassNotFoundException
41 {
42 boolean success = false;
43
44 try {
45 Connection connection = connectToNetezza(config.getSinkDBFirstHost(), config.getSinkDBFirstPort(), config.getSinkDBFirstSchema(),
46 config.getSinkDBFirstUsername(), config.getSinkDBFirstPassword());
47
48 String createTableStatement = getCreateTableStatement(schema);
49 Statement statement = connection.createStatement();
50 statement.addBatch(createTableStatement);
51
52 statement.executeBatch();
53 log.info(String.format("Added Thrift to Netezza: %s", schema.getName()));
54 connection.commit();
55
56 connection.close();
57 success = true;
58
59 if (config.getSinkFirstExtraSQL() != null) {
60 success = executeExtraSql(config.getSinkDBFirstHost(), config.getSinkDBFirstPort(), config.getSinkDBFirstSchema(),
61 config.getSinkDBFirstUsername(), config.getSinkDBFirstPassword(), config.getSinkFirstExtraSQL(), schema);
62
63 if (success && config.getSinkSecondExtraSQL() != null) {
64 success = executeExtraSql(config.getSinkDBSecondHost(), config.getSinkDBSecondPort(), config.getSinkDBSecondSchema(),
65 config.getSinkDBSecondUsername(), config.getSinkDBSecondPassword(), config.getSinkSecondExtraSQL(), schema);
66 }
67 }
68
69 return success;
70 }
71 catch (SQLException e) {
72 log.warn(String.format("Unable to add Type to Netezza: %s", e));
73 return success;
74 }
75 }
76
77 private boolean executeExtraSql(String host, int port, String database, String username,
78 String password, String statement, GoodwillSchema schema)
79 {
80 try {
81 Connection connection = connectToNetezza(host, port, database, username, password);
82
83 String safeSQL = getUnescapedStatement(statement, schema);
84
85 log.info(String.format("Running extra SQL in Netezza: %s", safeSQL));
86 Statement extraStatement = connection.createStatement();
87 extraStatement.execute(safeSQL);
88 connection.commit();
89
90 connection.close();
91 return true;
92 }
93 catch (SQLException e) {
94 log.warn(String.format("Unable to run extra SQL in Netezza: %s", e));
95 return false;
96 }
97 catch (ClassNotFoundException e) {
98 log.warn(String.format("Unable to run extra SQL in Netezza: %s", e));
99 return false;
100 }
101 }
102
103 private String getUnescapedStatement(String statement, GoodwillSchema schema)
104 {
105
106
107 String safeSQL = StringUtils.replace(statement, "\\*", "*");
108 safeSQL = StringUtils.replace(safeSQL, "?", getTableName(schema));
109 return safeSQL;
110 }
111
112 private String getCreateTableStatement(GoodwillSchema schema)
113 {
114 String tableName = getTableName(schema);
115 String statement = String.format("CREATE TABLE %s (", tableName);
116
117 for (GoodwillSchemaField field : schema.getSchema()) {
118 statement += String.format("%s %s,", sanitizeThriftName(field.getName()), field.getFullSQLType());
119 }
120 statement = StringUtils.chop(statement);
121 statement += ") DISTRIBUTE ON RANDOM;";
122
123 return statement;
124 }
125
126 private String getTableName(GoodwillSchema schema)
127 {
128 return String.format(config.getSinkDBTableNameFormat(), sanitizeThriftName(schema.getName()));
129 }
130
131 private String sanitizeThriftName(String name)
132 {
133 return StringUtils.lowerCase(StringUtils.deleteWhitespace(name));
134 }
135
136
137
138
139
140
141
142
143
144 @Override
145 public boolean updateType(GoodwillSchema schema)
146 {
147 return false;
148 }
149
150
151
152
153
154
155
156 @Override
157 public String addTypeInfo(GoodwillSchema schema)
158 {
159 String info = String.format("%s\n", getCreateTableStatement(schema));
160
161 if (config.getSinkFirstExtraSQL() != null) {
162 info += getUnescapedStatement(config.getSinkFirstExtraSQL(), schema);
163
164 if (config.getSinkSecondExtraSQL() != null) {
165 info += getUnescapedStatement(config.getSinkSecondExtraSQL(), schema);
166 }
167 }
168
169 return info;
170 }
171
172 private Connection connectToNetezza(String host, int port, String db, String username, String password) throws SQLException, ClassNotFoundException
173 {
174 NzDatasource datasource = new NzDatasource();
175 datasource.setHost(host);
176 datasource.setPort(port);
177 datasource.setDatabase(db);
178 datasource.setUser(username);
179 datasource.setPassword(password);
180
181 Connection connection = datasource.getConnection();
182 connection.setAutoCommit(false);
183
184 return connection;
185 }
186 }