NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 211 (checked in by haines, 16 years ago)

corrected lon to negative for (longitude W) in config files; corrected misspelled field in all proc_*.py

  • Property svn:executable set to
Line 
1 #!/usr/bin/env python
2 # Last modified:  Time-stamp: <2008-09-30 17:40:58 haines>
3 """Process raw data to monthly netCDF data files
4
5 This module processes raw ascii- or binary-data from different NCCOOS
6 sensors (ctd, adcp, waves-adcp, met) based on manual or automated
7 operation.  If automated processing, add raw data (level0) from all
8 active sensors to current month's netcdf data files (level1) with the
9 current configuration setting.  If manual processing, determine which
10 configurations to use for requested platform, sensor, and month.
11
12 :Processing steps:
13   0. raw2proc auto or manual for platform, sensor, month
14   1. list of files to process
15   2. parse data
16   3. create, update netcdf
17
18   to-do
19   3. qc (measured) data
20   4. process derived data (and regrid?)
21   5. qc (measured and derived) data flags
22
23 """
24
25 __version__ = "v0.1"
26 __author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31
32 # for production use:
33 # defconfigs='/home/haines/nccoos/raw2proc'
34 # for testing use:
35 # defconfigs='/home/haines/nccoos/test/r2p'
36
37 # define config file location to run under cron
38 defconfigs='/home/haines/nccoos/raw2proc'
39
40 import numpy
41
42 from procutil import *
43 from ncutil import *
44
# Regular-expression fragment matching one real number with optional
# surrounding whitespace.  Group 1 captures the number, which may be in
# exponent form (1.5e+03), decimal form (12.5, .5, 12.) or a plain integer,
# each with an optional leading minus sign.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
46
def load_data(inFile):
    """Read a raw data file and return its lines.

    :Parameters:
       inFile : string
           Path of the file to read

    :Returns:
       lines : list of str, or None
           All lines of the file (line endings kept).  An empty list if
           the file is empty; None if the file does not exist.  A message
           is printed in both of those cases.
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None

    f = open(inFile, 'r')
    try:
        # try/finally so the handle is released even if the read fails
        lines = f.readlines()
    finally:
        f.close()

    if not lines:
        print('Empty file: ' + inFile)
    return lines
58
def import_parser(name):
    """Return the attribute called `name` from the `parsers` module."""
    return getattr(__import__('parsers'), name)
63
def import_processors(mod_name):
    """Import a processing module and return its three entry points.

    :Parameters:
       mod_name : string
           Name of the module to import

    :Returns:
       (parser, creator, updater) : tuple
           The module attributes of those names, in that order
    """
    module = __import__(mod_name)
    return tuple(getattr(module, entry)
                 for entry in ('parser', 'creator', 'updater'))
70    
71
def get_config(name):
    """Import a config module and return a (possibly nested) attribute of it.

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    :Parameters:
       name : string
           Dotted name; the first component is the module to import and
           each following component is looked up on the previous result.

    :Returns:
       The object reached by walking all components.  If `name` has no
       dot, the imported module itself is returned.
    """
    components = name.split('.')
    # Start from the module and descend one attribute per component, so
    # 'mod.a.b' resolves to mod.a.b rather than mod.b.
    attr = __import__(components[0])
    for comp in components[1:]:
        attr = getattr(attr, comp)
    return attr
79
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files apply to a platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for '<platform>_config_*.py' files

    :Returns:
       cns : list of str
           Names (file basename without extension) of the configurations
           whose time span overlaps the desired month, in sorted
           filename order.  If empty [], no configs were found.
    """
    import glob
    # list of config files based on platform
    configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    configs.sort()
    # datetime.replace() returns a new object, so rebind the result
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends (find_months gives prev, this, next)
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        # a missing (None or otherwise empty) date means "now", so each
        # iteration always assigns fresh start and end values
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])[0]
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])[0]
        else:
            config_end_dt = now_dt
        # two intervals overlap when each one starts before the other ends
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
125
126
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when the 'config_end_date' entry of its
    platform_info is None.

    :Parameters:
       config_dir : string
           Directory searched for '*_config_*.py' files

    :Returns:
       cns : list of str
           Names (file basename without extension) of the active
           configurations, in sorted filename order.
           If empty [], no active configs were found
    """
    import glob
    # list of all config files; sorted so processing order is deterministic
    configs = sorted(glob.glob(os.path.join(config_dir, '*_config_*.py')))
    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
150
151
def find_raw(si, yyyy_mm):
    """Determine which raw files to process for a month

    Globs si['raw_file_glob'] in the per-month subdirectories
    (named 'YYYY_MM') of si['raw_dir'] for each month returned by
    find_months(yyyy_mm), then keeps the files whose filename datetime
    falls between si['proc_start_dt'] minus one day and si['proc_end_dt']
    plus one day.

    :Returns:
       (raw_files, raw_dts) : tuple of two lists
           Selected file paths and their filename datetimes, in sorted
           path order
    """
    import glob
    # list all the raw files in every month subdirectory of interest
    candidates = []
    for month_dt in find_months(yyyy_mm):
        month_dir = month_dt.strftime('%Y_%m')
        pattern = os.path.join(si['raw_dir'], month_dir, si['raw_file_glob'])
        candidates.extend(glob.glob(pattern))
    candidates.sort()

    # ****** ((SMH) NOTE: Will need to override looking in specific
    # subdirs of months if all data is contained in one file for long
    # deployment, such as with adcp binary data.

    dt_start = si['proc_start_dt'] - timedelta(days=1)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    raw_files = []
    raw_dts = []
    for path in candidates:
        stamp = filt_datetime(os.path.basename(path))
        file_dt = stamp[0]
        # stamp[1] is the granularity index reported by filt_datetime();
        # presumably 4 means a monthly filename -- verify in procutil.
        # The start is then moved back 31 days, and it stays widened for
        # all remaining files.
        if stamp[1] == 4:
            dt_start = si['proc_start_dt'] - timedelta(days=31)
        if not file_dt:
            continue
        if dt_start <= file_dt <= dt_end:
            raw_files.append(path)
            raw_dts.append(file_dt)
    return (raw_files, raw_dts)
193
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; 'config_start_date' and 'config_end_date' are
           read.  A missing (None or otherwise empty) date means "now".
       raw_files : list of str
           Candidate raw file names
       dts : list of datetime
           Datetime of each file, parallel to raw_files

    :Returns:
       new_list : list of str
           Files dated within the configuration timeframe.  If there are
           none, every file dated on or before the configuration end is
           returned instead.
    """
    # datetime.replace() returns a new object, so rebind the result
    now_dt = datetime.utcnow().replace(microsecond=0)

    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])[0]
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])[0]
    else:
        config_end_dt = now_dt

    new_list = [fn for fn, dt in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]

    if not new_list:
        # fall back to everything up to the end of the configuration
        new_list = [fn for fn, dt in zip(raw_files, dts)
                    if dt <= config_end_dt]

    return new_list
217        
218
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Process data either in auto-mode or manual-mode

    In auto-mode, auto() is called to process the newest data for all
    platforms and all sensor packages.  In manual-mode, manual() is
    called for the specified platform, sensor package, and month; all
    three must be given.  Any other proctype only prints a usage message.

    :Parameters:
       proctype : string
           'auto' or 'manual'

       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype != 'manual':
        print('raw2proc: requires either auto or manual operation')
        return

    # manual mode needs every selector
    if not (platform and package and yyyy_mm):
        print('raw2proc: Manual operation requires platform, package, and month')
        print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ...  platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ...        month : %s' % yyyy_mm)
    print(' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
262
263
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine the active configurations (find_active_configs on the
       default config directory)
    2. for each configuration, for each sensor package in its sensor_info
         the output file is <platform>_<package>_<yyyy_mm>.nc for the
           current month
         if that file exists and its last time is within the current
           month, only data after that time is processed
         find and process the raw files (process() creates or updates
           the netcdf file)
         if the package defines 'latest_dir', also write the latest-data
           file via proc2latest()

    """
    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... ... \nNOTE: No active platforms\n')
        return

    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        platform = pi['id']
        for package, si in asi.items():
            print(' ... package name : %s' % package)
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            ofn = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = month_start_dt
            si['proc_end_dt'] = month_end_dt
            if os.path.exists(ofn):
                # resume after the last time already stored in this
                # month's file, when that time lies within the month
                (es, _units) = nc_get_time(ofn)
                last_dt = es2dt(es[-1])
                if last_dt >= month_start_dt:
                    si['proc_start_dt'] = last_dt

            (found_files, found_dts) = find_raw(si, yyyy_mm)
            raw_files = which_raw(pi, found_files, found_dts)
            if raw_files:
                process(pi, si, raw_files, yyyy_mm)
            else:
                print(' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm))

            # update latest data for SECOORA commons
            if 'latest_dir' in si:
                print(' ... ... latest : %s ' % si['latest_dir'])
                proc2latest(pi, si, yyyy_mm)
325
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    Notes
    -----

    1. determine which configs overlap the month (find_configs)
    2. for each config of the platform
           if the package is listed in the config
               remove a previous output file (first config only)
               find and process the raw files
    """
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... \nNOTE: %s not operational for %s\n' % (platform, yyyy_mm))
        return

    for index, cn in enumerate(configs):
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        if package not in pi['packages']:
            print(' ... ... \nNOTE: %s not operational on %s for %s\n' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (found_files, found_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, found_files, found_dts)

        # remove any previous netcdf file (platform_package_yyyy_mm.nc)
        # NOTE(review): only done for the first config in the list, even
        # if that config does not include the package -- confirm intended
        if index == 0 and os.path.exists(ofn):
            os.remove(ofn)

        # a file written by an earlier config may exist: continue after
        # its last time so data repeated in raw files is not added twice
        if os.path.exists(ofn):
            (es, _units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            if last_dt >= month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm))
384    
def process(pi, si, raw_files, yyyy_mm):
    """Parse each raw file and write the in-range records to netCDF

    The parser, creator and updater come from the module named by
    si['process_module'], which allows tailored processing per input file
    format and control over the output.  For every file, records with
    si['proc_start_dt'] < dt <= si['proc_end_dt'] are flagged in
    data['in']; if any are flagged, the output file
    (si['proc_dir']/si['proc_filename']) is updated when it exists and
    created otherwise.  Empty or missing files are skipped.
    """
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # no lines: file was empty or could not be found
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)
        # flag the records within the specified timeframe (usually the month)
        data['in'] = numpy.array(
            [bool(val > si['proc_start_dt'] and val <= si['proc_end_dt'])
             for val in data['dt']])

        # nothing to write unless some record is in the timeframe
        if not data['in'].any():
            continue

        sys.stdout.write('... %s ... ' % fn)
        sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(ofn):
            nc_update(ofn, update(pi, si, data))
        else:
            nc_create(ofn, create(pi, si, data))
418
419    
def proc2latest(pi, si, yyyy_mm):
    """Select specific variables and times from current monthly netCDF
    and post as latest data.  TEST MODE.

    Loads the variables named in si['latest_vars'] from the monthly
    netCDF file, renames dimensions (e.g. ntime -> time), gives the
    unlimited dimension a fixed length, subsets record variables to the
    last 2 days of data, and creates
    si['latest_dir']/nccoos_<platform>_<package>_latest.nc.

    If the monthly file does not exist or holds no times, a note is
    printed and nothing is written.

    Side effects: sets si['proc_filename'] and si['latest_filename'].

    NOTE: In test mode right now. See auto() function for similar action.

    """
    platform = pi['id']
    package = si['id']
    si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
    ifn = os.path.join(si['proc_dir'], si['proc_filename'])
    if not os.path.exists(ifn):
        # without the monthly file there is nothing to subset
        print(' ... ... \nNOTE: no monthly file %s, latest data not updated\n' % ifn)
        return

    # get dt from current month file
    (es, units) = nc_get_time(ifn)
    dt = [es2dt(e) for e in es]
    if not dt:
        print(' ... ... \nNOTE: no times in %s, latest data not updated\n' % ifn)
        return
    last_dt = dt[-1]

    # flag the data within the specified timeframe (last 2 days)
    window_start = last_dt - timedelta(days=2)
    window_end = last_dt + timedelta(seconds=360)
    idx = numpy.array([bool(val > window_start and val <= window_end)
                       for val in dt])
    dt = numpy.array(dt)[idx]

    # read in data and unpack tuple
    d = nc_load(ifn, si['latest_vars'])
    global_atts, var_atts, dim_inits, var_inits, var_data = d
    list_of_record_vars = nc_find_record_vars(ifn)

    # dimension names used in the monthly files -> names for the latest file
    rename = {'ntime': 'time', 'nlat': 'lat', 'nlon': 'lon',
              'nz': 'z', 'ndir': 'dir', 'nfreq': 'freq'}

    # rename dimensions; the unlimited dimension (length zero) becomes
    # 'time' with the length of dt because SECOORA scout does not
    # understand a length of zero (0)
    new_dims = []
    for dim in dim_inits:
        if dim[0] in rename:
            dim = (rename[dim[0]], dim[1])
        if dim[1] == 0:
            dim = ('time', len(dt))
        new_dims.append(dim)
    dim_inits = tuple(new_dims)

    # apply the same dimension renaming inside each variable definition
    var_inits = tuple(
        (v[0], v[1], tuple(rename.get(dname, dname) for dname in v[2]))
        for v in var_inits)

    # subset data: only record variables are indexed by time
    var_data = list(var_data)
    for i in range(len(var_inits)):
        vn, vd = var_data[i]
        if vn in list_of_record_vars:
            var_data[i] = (vn, vd[idx])
    var_data = tuple(var_data)
    global_atts['start_date'] = dt[0].strftime('%Y-%m-%d %H:%M:%S')

    # write latest data
    si['latest_filename'] = 'nccoos_%s_%s_latest.nc' % (platform, package)
    ofn = os.path.join(si['latest_dir'], si['latest_filename'])
    d = (global_atts, var_atts, dim_inits, var_inits, var_data)

    nc_create(ofn, d)
501
502                                                
# globals
# Module start time (UTC, truncated to whole seconds), reported by raw2proc().
# datetime.replace() returns a new object, so the result must be bound.
start_dt = datetime.utcnow().replace(microsecond=0)

if __name__ == "__main__":
    import optparse
    raw2proc('auto')

    # for testing
    # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
    # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
Note: See TracBrowser for help on using the browser.