View Javadoc

1   /*
2    * Copyright 2010 Ning, Inc.
3    *
4    * Ning licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.ning.metrics.goodwill.store;
18  
19  import au.com.bytecode.opencsv.CSVReader;
20  import com.google.inject.Inject;
21  import com.google.inject.Singleton;
22  import com.ning.metrics.goodwill.access.GoodwillSchema;
23  import com.ning.metrics.goodwill.access.GoodwillSchemaField;
24  import com.ning.metrics.goodwill.binder.config.GoodwillConfig;
25  import org.apache.commons.lang.StringUtils;
26  import org.apache.log4j.Logger;
27  
28  import java.io.FileReader;
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.List;
36  
37  @Singleton
38  public class CSVFileStore extends GoodwillStore
39  {
40      private final Logger log = Logger.getLogger(CSVFileStore.class);
41  
42      private String fileName;
43  
44      @Inject
45      public CSVFileStore(
46          GoodwillConfig config
47      ) throws IOException
48      {
49          fileName = config.getCSVFilePath();
50          parseFile();
51      }
52  
53      public void parseFile() throws IOException
54      {
55          CSVReader reader = new CSVReader(new FileReader(fileName));
56          log.info(String.format("Read CSV file: %s", fileName));
57          List<String[]> entries = reader.readAll();
58  
59          GoodwillSchema currentSchema = null;
60          String currentSchemaName = null;
61  
62          /**
63           * CSV file format:
64           *
65           * "TermFrequency",1,"i64","app_id",1,2147483647
66           * "TermFrequency",2,"string","subdomain",1,2147483647
67           * "TermFrequency",3,"string","term_freq_json",1,2147483647
68           * "SpamMarkEvent",19,"string","abuse_type",8,2147483647
69           * ...
70           *
71           * TODO: extend file format with extra sql fields
72           */
73          HashMap<String, GoodwillSchema> schemata = new HashMap<String, GoodwillSchema>();
74          for (Object entry : entries) {
75              short position;
76              GoodwillSchemaField thriftField;
77              String[] line = (String[]) entry;
78  
79              try {
80                  position = Short.valueOf(line[1]);
81              }
82              catch (NumberFormatException e) {
83                  log.warn(String.format("Ignoring malformed line: %s", StringUtils.join(line, ",")));
84                  continue;
85              }
86  
87              try {
88                  thriftField = new GoodwillSchemaField(line[3], line[2], position, null, null, null, null, null);
89              }
90              catch (IllegalArgumentException e) {
91                  log.warn(String.format("Ignoring unsupported type <%s>: %s", line[2], StringUtils.join(line, ",")));
92                  continue;
93              }
94  
95              if (currentSchemaName == null || !line[0].equals(currentSchemaName)) {
96                  currentSchemaName = line[0];
97                  currentSchema = new GoodwillSchema(currentSchemaName, new ArrayList<GoodwillSchemaField>());
98                  schemata.put(currentSchemaName, currentSchema);
99                  log.debug(String.format("Found new ThriftType thriftField to: %s", currentSchemaName));
100             }
101 
102             currentSchema.addThriftField(thriftField);
103             log.debug(String.format("Added ThriftField to %s: %s", currentSchemaName, thriftField.toString()));
104         }
105 
106         this.goodwillSchemata = schemata;
107     }
108 
109     @Override
110     public Collection<GoodwillSchema> getTypes() throws IOException
111     {
112         parseFile();
113 
114         final ArrayList<GoodwillSchema> thriftTypesList = new ArrayList(goodwillSchemata.values());
115         Collections.sort(thriftTypesList, new Comparator<GoodwillSchema>()
116         {
117             @Override
118             public int compare(GoodwillSchema o, GoodwillSchema o1)
119             {
120                 return o.getName().compareTo(o1.getName());
121             }
122         });
123 
124         if (sink != null) {
125             for (int i = 0; i < thriftTypesList.size(); i++) {
126                 GoodwillSchema schema = thriftTypesList.get(i);
127                 schema.setSinkAddInfo(sink.addTypeInfo(schema));
128                 thriftTypesList.set(i, schema);
129 
130             }
131         }
132 
133         return thriftTypesList;
134     }
135 
136     /**
137      * Add a new type to the store
138      *
139      * @param schema GoodwillSchema to add
140      */
141     @Override
142     public void addType(GoodwillSchema schema)
143     {
144         goodwillSchemata.put(schema.getName(), schema);
145     }
146 
147     /**
148      * Update a type to the store
149      *
150      * @param schema GoodwillSchema to update
151      */
152     @Override
153     public boolean updateType(GoodwillSchema schema)
154     {
155         // Seek etc. Painful here
156         return false;
157     }
158 
159     /**
160      * Delete a type
161      *
162      * @param schema GoodwillSchema to delete
163      * @return true is success, false otherwise
164      */
165     @Override
166     public boolean deleteType(GoodwillSchema schema)
167     {
168         return false;
169     }
170 }