NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 478 (checked in by haines, 12 years ago)

Add Billy Mitchell config for 15-195 meters altitude.

  • Property svn:executable set to
Line 
1 #!/usr/bin/env python
2 # Last modified:  Time-stamp: <2011-12-16 17:05:08 haines>
3 """Process raw data to monthly netCDF data files
4
5 This module processes raw ascii- or binary-data from different NCCOOS
6 sensors (ctd, adcp, waves-adcp, met) based on manual or automated
7 operation.  If automated processing, add raw data (level0) from all
8 active sensors to current month's netcdf data files (level1) with the
9 current configuration setting.  If manual processing, determine which
10 configurations to use for requested platform, sensor, and month.
11
12 :Processing steps:
13   0. raw2proc auto or manual for platform, sensor, month
14   1. list of files to process
15   2. parse data
16   3. create, update netcdf
17
18   to-do
19   3. qc (measured) data
20   4. process derived data (and regrid?)
21   5. qc (measured and derived) data flags
22
23 """
24
25 __version__ = "v0.1"
26 __author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31 import traceback
32
# Directory holding the platform configuration modules (*_config_*.py).
# Locations that have been used:
#   production : /home/haines/nccoos/raw2proc
#   testing    : /home/haines/nccoos/test/r2p
#   cron       : /opt/env/haines/dataproc/raw2proc   (the default below)
# Set RAW2PROC_CONFIG_DIR in the environment to pick another location
# without editing this file; when it is unset the cron location is used.
defconfigs = os.environ.get('RAW2PROC_CONFIG_DIR',
                            '/opt/env/haines/dataproc/raw2proc')
40
41 import numpy
42
43 from procutil import *
44 from ncutil import *
45
# Regex fragment for one real number, optionally surrounded by whitespace.
# Alternatives are tried in this order: scientific notation, decimal, integer.
REAL_RE_STR = (
    r'\s*('
    r'-?\d(\.\d+|)[Ee][+\-]\d\d?'    # scientific notation, e.g. -1.23E+04
    r'|-?(\d+\.\d*|\d*\.\d+)'        # decimal, e.g. 12. or .5
    r'|-?\d+'                        # plain integer
    r')\s*'
)
# Regex fragment for the text "nan" in any mix of upper/lower case.
NAN_RE_STR = r'[Nn][Aa][Nn]'
48
def load_data(inFile):
    """Read all lines of a raw data file

    :Parameters:
       inFile : string
           Path of the file to read

    :Returns:
       lines : list of str or None
           Lines of the file with their line endings kept;
           [] if the file is empty (a message is printed);
           None if the file does not exist (a message is printed)
    """
    if not os.path.exists(inFile):
        print('File does not exist: '+ inFile)
        return None
    # the context manager closes the file even if readlines() raises
    with open(inFile, 'r') as f:
        lines = f.readlines()
    if not lines:
        print('Empty file: '+ inFile)
    return lines
60
def import_parser(name):
    """Return the attribute called `name` from the `parsers` module.

    The `parsers` module is imported on each call; AttributeError is
    raised if it has no attribute `name`.
    """
    parsers_mod = __import__('parsers')
    return getattr(parsers_mod, name)
65
def import_processors(mod_name):
    """Import a processing module and return its three entry points.

    :Parameters:
       mod_name : string
           Name of the module to import

    :Returns:
       (parser, creator, updater) : tuple
           The module attributes of those names, in that order
    """
    module = __import__(mod_name)
    entry_points = ('parser', 'creator', 'updater')
    return tuple([getattr(module, ep) for ep in entry_points])
72    
73
def get_config(name):
    """Return the object named by a dotted path rooted at a module

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    :Parameters:
       name : string
           'module.attr[.attr...]'; the first component is imported as a
           module and each following component is looked up on the
           result of the previous one

    :Returns:
       The attribute found at the end of the path, or the module itself
       when `name` has no dot
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk the path one level at a time so 'a.b.c' means (a.b).c
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
81
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for '<platform>_config_*.py' files

    :Returns:
       cns : list of str
           List of configurations that overlap with desired month
           (module names, sorted by file name)
           If empty [], no configs were found
    """
    import glob
    # list of config files based on platform
    configs = sorted(glob.glob(os.path.join(config_dir, platform + '_config_*.py')))
    # datetime is immutable: keep the value returned by replace()
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)
    #
    cns = []
    for config in configs:
        # config (module) name is the file name without extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn+'.platform_info')
        # A missing (None or empty) date is replaced by the current time.
        # Using else (not a second test for None) guarantees both bounds
        # are set for every config and never carried over from the
        # previous one.
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])
        else:
            config_end_dt = now_dt
        # the config period and the month overlap
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
127
128
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when its platform_info['config_end_date']
    is None.

    :Parameters:
       config_dir : string
           Directory searched for '*_config_*.py' files

    :Returns:
       cns : list of str
           List of active configurations (module names, sorted by
           file name)
           If empty [], no configs were found
    """
    import glob
    # list of all config files, sorted so the processing order is repeatable
    configs = sorted(glob.glob(os.path.join(config_dir, '*_config_*.py')))
    #
    cns = []
    for config in configs:
        # config (module) name is the file name without extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn+'.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
152
153
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of lists
           Selected file names (sorted) and the datetime derived from
           each file name by filt_datetime()
    """
    import glob

    months = find_months(yyyy_mm)
    all_raw_files = []
    m = re.search(r'\d{4}_\d{2}$', si['raw_dir'])
    if m:
        # look for raw_file_glob in specific directory ending in YYYY_MM
        # but look no further.
        gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    else:
        # no YYYY_MM at end of raw_dir then look for files
        # in prev-month, this-month, and next-month
        for mon in months:
            mstr = mon.strftime('%Y_%m')
            gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
            all_raw_files.extend(glob.glob(gs))

    all_raw_files.sort()

    # accept files dated from one day before to one day after the
    # processing period
    dt_end = si['proc_end_dt']+timedelta(days=1)
    raw_files = []; raw_dts = []
    # compute datetime for each file
    for fn in all_raw_files:
        (fndt, granularity) = filt_datetime(os.path.basename(fn), gran=True)

        # granularity == 4: file name holds only YYYY_MM or YYYYMM, so
        # its date can lie up to a month before the data it contains.
        # The wider margin is chosen per file so that one monthly file
        # does not loosen the test for the files that follow it.
        if granularity == 4:
            dt_start = si['proc_start_dt']-timedelta(days=31)
        else:
            dt_start = si['proc_start_dt']-timedelta(days=1)

        # a raw_dir that already ends in YYYY_MM takes every dated file
        if fndt and (m or dt_start <= fndt <= dt_end):
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
199
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and 'config_end_date'
           (a missing -- None or empty -- date is replaced by the
           current UTC time)
       raw_files : list of str
           Candidate raw file names
       dts : list of datetime
           Datetime of each entry of raw_files, in the same order

    :Returns:
       new_list : list of str
           Files dated within the configuration period.  If there are
           none, every file dated on or before the configuration end.
    """
    # datetime is immutable: keep the value returned by replace()
    now_dt = datetime.utcnow().replace(microsecond=0)
    # else (not a second test for None) guarantees both bounds are set
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])
    else:
        config_end_dt = now_dt

    new_list = [fn for (fn, dt) in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]

    if not new_list:
        # nothing inside the period: fall back to everything up to its end
        new_list = [fn for (fn, dt) in zip(raw_files, dts)
                    if dt <= config_end_dt]

    return new_list
233        
234
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Entry point: run the processing in auto-mode or manual-mode

    Auto-mode hands over to auto() (all platforms, all packages, latest
    data).  Manual-mode hands over to manual() and needs platform,
    package and month; if any of them is missing only a usage message
    is printed.  Any other proctype prints a message and does nothing.

    :Parameters:
       proctype : string
           'auto' or 'manual'

       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype != 'manual':
        print('raw2proc: requires either auto or manual operation')
        return

    # manual mode from here on
    if not (platform and package and yyyy_mm):
        print('raw2proc: Manual operation requires platform, package, and month')
        print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ...  platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ...        month : %s' % yyyy_mm)
    print(' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
278
279
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine which platforms (all platforms with currently active
       config files i.e. config_end_date is None)
    2. for each active config
         for each package in its sensor_info
           yyyy_mm is the month returned by this_month()
           if this month's netcdf exists, start after its last time
           find and process the raw files
           then refresh the 'latest' and csv products when the package
           defines 'latest_dir' / 'csv_dir'

    A failure in one package prints its traceback and processing moves
    on to the next package.
    """
    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... NOTE: No active platforms')
        return

    # for each configuration
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        asi = get_config(cn+'.sensor_info')
        platform = pi['id']
        # for each sensor package
        for package in asi:
            try: # if package fails, try next package
                print(' ... package name : %s' % package)
                si = asi[package]
                si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
                ofn = os.path.join(si['proc_dir'], si['proc_filename'])
                si['proc_start_dt'] = month_start_dt
                si['proc_end_dt'] = month_end_dt
                if os.path.exists(ofn):
                    # get last dt from current month file
                    (es, units) = nc_get_time(ofn)
                    last_dt = es2dt(es[-1])
                    # if it falls within this month, start from it
                    # so only newer data is processed
                    if last_dt>=month_start_dt:
                        si['proc_start_dt'] = last_dt

                (raw_files, raw_dts) = find_raw(si, yyyy_mm)
                raw_files = which_raw(pi, raw_files, raw_dts)
                if raw_files:
                    process(pi, si, raw_files, yyyy_mm)
                else:
                    print(' ... ... NOTE: no new raw files found')

                # update latest data for SECOORA commons
                if 'latest_dir' in si:
                    proc2latest(pi, si, yyyy_mm)

                if 'csv_dir' in si:
                    proc2csv(pi, si, yyyy_mm)
            except Exception:
                # Exception (not a bare except) so that Ctrl-C and
                # sys.exit() still stop the whole run
                traceback.print_exc()
347
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Notes
    -----

    1. determine which configs
    2. for each config for specific platform
           if have package in config
               which raw files
               the first such config removes any previous netcdf file
               for the month so the file is rebuilt; later configs
               append to it
    """
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... NOTE: %s not operational for %s' % (platform, yyyy_mm))
        return

    # Becomes True once the old output file has been dealt with.  Tracked
    # separately from the loop position because the first config in the
    # list may not contain the package at all.
    removed_previous = False
    # for each configuration
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        asi = get_config(cn+'.sensor_info')
        if package not in pi['packages']:
            print(' ... ... NOTE: %s not operational on %s for %s' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        # .get(): a package without a 'utc_offset' entry is not an error
        if si.get('utc_offset'):
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        print(raw_files)
        print(raw_dts)
        raw_files = which_raw(pi, raw_files, raw_dts)
        print(raw_files)
        # note: raw_dts is still the list from find_raw (not filtered)
        print(raw_dts)
        # remove any previous netcdf file (platform_package_yyyy_mm.nc)
        if not removed_previous:
            if os.path.exists(ofn):
                os.remove(ofn)
            removed_previous = True
        # this added just in case data repeated in data files
        if os.path.exists(ofn):
            # get last dt from current month file
            (es, units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            # if it falls within this month, start from it so only
            # newer data is processed
            if last_dt>=month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm))
409    
def process(pi, si, raw_files, yyyy_mm):
    """Parse each raw file and write its in-period records to netCDF

    The parser/creator/updater functions come from the module named in
    si['process_module'].  For every file that yields lines, the parsed
    records whose data['dt'] lies within si['proc_start_dt'] ..
    si['proc_end_dt'] (inclusive) are flagged in data['in']; if any are
    flagged the output file si['proc_dir']/si['proc_filename'] is
    updated when it exists and created otherwise.  yyyy_mm is not used
    here.
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # if no lines, file was empty
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)
        # flag which records are within the specified timeframe (usually the month)
        t0 = si['proc_start_dt']
        t1 = si['proc_end_dt']
        data['in'] = numpy.array([bool(dt >= t0 and dt <= t1) for dt in data['dt']])

        # nothing in the timeframe: nothing to write for this file
        if not data['in'].any():
            continue

        count = len(data['in'].nonzero()[0])
        sys.stdout.write(' ... %s ... ' % fn)
        sys.stdout.write('%d\n' % count)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(ofn):
            nc_update(ofn, update(pi,si,data))
        else:
            nc_create(ofn, create(pi,si,data))
443
444    
# globals
# UTC time at which this module was loaded, truncated to whole seconds;
# printed by raw2proc().  datetime objects are immutable, so the value
# returned by replace() has to be the one that is kept.
start_dt = datetime.utcnow().replace(microsecond=0)

if __name__ == "__main__":
    # NOTE(review): optparse is not used by the code below
    import optparse
    raw2proc('auto')

    # for testing
    # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
    # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
Note: See TracBrowser for help on using the browser.