NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 179 (checked in by haines, 16 years ago)

jpier met back online; handle month or larger raw files in raw2proc.which_raw()

  • Property svn:executable set to
Line 
#!/usr/bin/env python
# Last modified:  Time-stamp: <2008-07-23 15:43:33 haines>
"""Process raw data to monthly netCDF data files

This module processes raw ascii- or binary-data from different NCCOOS
sensors (ctd, adcp, waves-adcp, met) in either automated or manual
operation.  In automated processing, raw data (level0) from all
active sensors are added to the current month's netCDF data files
(level1) using the current configuration settings.  In manual
processing, the configurations that apply to the requested platform,
sensor package, and month are determined and used to rebuild that
month's netCDF file.

:Processing steps:
  0. raw2proc auto or manual for platform, sensor, month
  1. list of files to process
  2. parse data
  3. create, update netcdf

  to-do
  4. qc (measured) data
  5. process derived data (and regrid?)
  6. qc (measured and derived) data flags

"""

__version__ = "v0.1"
__author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31
# Directory holding the <platform>_config_<date>.py configuration files;
# auto() and manual() pass it to find_active_configs()/find_configs().
# It is hard-coded so the configs are found when the script runs under cron.
# NOTE(review): get_config() loads a config with __import__(name), so this
# directory presumably also has to be importable (on sys.path) -- confirm.
defconfigs='/home/haines/nccoos/raw2proc'
34
35 import numpy
36
37 from procutil import *
38 from ncutil import *
39
# Regular expression (kept as a string) matching one real number with
# optional surrounding whitespace.  Alternatives, in order: exponent form
# with a single leading digit (1.5e+03), decimals (-2., .5, 3.25) and plain
# integers (-42).  Group 1 captures the number without the whitespace.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
41
def load_data(inFile):
    """Read a raw text file and return its lines.

    :Parameters:
       inFile : string
           Path of the raw data file to read

    :Returns:
       lines : list of str or None
           All lines of the file, newlines kept (as from readlines()).
           An empty file gives [] (and a message is printed); a missing
           file gives None (and a message is printed).
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None
    f = open(inFile, 'r')
    try:
        lines = f.readlines()
    finally:
        # close the handle even if reading fails part-way
        f.close()
    if not lines:
        print('Empty file: ' + inFile)
    return lines
53
def import_parser(name):
    """Return the attribute called *name* from the project ``parsers`` module.

    The module is imported (or taken from the import cache) on each call.
    """
    parsers_module = __import__('parsers')
    return getattr(parsers_module, name)
58
def import_processors(mod_name):
    """Import the processing module *mod_name* and return its three hooks.

    :Returns:
       (parser, creator, updater) : tuple
           The module-level ``parser``, ``creator`` and ``updater``
           objects, looked up in that order.
    """
    module = __import__(mod_name)
    hook_names = ('parser', 'creator', 'updater')
    return tuple([getattr(module, hook) for hook in hook_names])
65    
66
def get_config(name):
    """Return an object from a config module given a dotted name.

    The first component names the module to import.  Each remaining
    component is looked up as an attribute of the object found so far, so
    nested attributes resolve correctly.  A name without a dot returns the
    module itself.

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')
    """
    components = name.split('.')
    obj = __import__(components[0])
    for comp in components[1:]:
        # descend one attribute level per dotted component
        obj = getattr(obj, comp)
    return obj
74
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for <platform>_config_*.py files
           (default '' searches the current directory)

    :Returns:
       cns : list of str
           Names (file names without .py) of the configurations whose
           active period overlaps the desired month.  A missing (None or
           empty) config_start_date or config_end_date is taken as now (UTC).
           If empty [], no configs were found
    """
    import glob
    # list of config files based on platform
    configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    # datetime.replace() returns a new object, so keep the truncated value
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    (_prev_month, month_start_dt, next_month_dt) = find_months(yyyy_mm)
    month_end_dt = next_month_dt - timedelta(seconds=1)

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        # resolve both dates for every config so no value leaks from the
        # previous iteration when a date is empty
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])[0]
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])[0]
        else:
            config_end_dt = now_dt
        # two intervals overlap iff each one starts before the other ends
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
119
120
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when its platform_info['config_end_date']
    is None, i.e. the configuration has not been closed off.

    :Parameters:
       config_dir : string
           Directory searched for *_config_*.py files
           (default '' searches the current directory)

    :Returns:
       cns : list of str
           Names (file names without .py) of the active configurations
           If empty [], no active configs were found
    """
    import glob
    # list of all config files
    configs = glob.glob(os.path.join(config_dir, '*_config_*.py'))
    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
144
145
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    Raw files matching si['raw_file_glob'] are collected from the
    <raw_dir>/<YYYY_MM> subdirectories of the months returned by
    find_months(yyyy_mm) (previous, this and next month).  A file is kept
    when the datetime parsed from its name lies within the processing
    window [si['proc_start_dt'], si['proc_end_dt']] padded by one day on
    each side.  Files named only to the month are looked for up to 31 days
    before si['proc_start_dt'] instead.

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of lists
           Matching file paths in sorted order and the datetime parsed
           from each file name
    """
    import glob
    # list all the raw files in prev-month, this-month, and next-month
    all_raw_files = []
    for mon in find_months(yyyy_mm):
        gs = os.path.join(si['raw_dir'], mon.strftime('%Y_%m'), si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    all_raw_files.sort()

    # ****** ((SMH) NOTE: Will need to override looking in specific
    # subdirs of months if all data is contained in one file for long
    # deployment, such as with adcp binary data.

    dt_start = si['proc_start_dt'] - timedelta(days=1)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    # a monthly file's name-date precedes most of its data, so look back
    # a whole month for such files
    monthly_dt_start = si['proc_start_dt'] - timedelta(days=31)

    raw_files = []; raw_dts = []
    for fn in all_raw_files:
        fndt_tuple = filt_datetime(os.path.basename(fn))
        fndt = fndt_tuple[0]
        # second element is the granularity filt_datetime() used to parse
        # the name; 4 is treated as a monthly file name
        # (NOTE(review): confirm against procutil.filt_datetime).
        # The window is chosen per file so one monthly file does not widen
        # the window for every file that sorts after it.
        if fndt_tuple[1] == 4:
            file_dt_start = monthly_dt_start
        else:
            file_dt_start = dt_start
        if fndt and file_dt_start <= fndt <= dt_end:
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
187
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and 'config_end_date'.
           A missing (None or empty) date is taken as now (UTC).
       raw_files : list of str
           Candidate raw file names
       dts : list of datetime
           Datetime of each file in raw_files (same order)

    :Returns:
       new_list : list of str
           Files whose datetime lies within the configuration period.  If
           none do, every file dated on or before the end of the
           configuration is returned instead, so raw files covering a
           month or more that are dated before the configuration start
           are still processed.
    """
    # datetime.replace() returns a new object, so keep the truncated value
    now_dt = datetime.utcnow().replace(microsecond=0)
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])[0]
    else:
        config_start_dt = now_dt
    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])[0]
    else:
        config_end_dt = now_dt

    pairs = list(zip(raw_files, dts))
    new_list = [fn for fn, dt in pairs if config_start_dt <= dt <= config_end_dt]
    if not new_list:
        new_list = [fn for fn, dt in pairs if dt <= config_end_dt]
    return new_list
211        
212
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Process data either in auto-mode or manual-mode

    If auto-mode, process newest data for all platforms, all
    sensors. Otherwise in manual-mode, process data for specified
    platform, sensor package, and month.

    :Parameters:
       proctype : string
           'auto' or 'manual'; any other value only prints an error message
       platform : string
           Platform id to process (e.g. 'bogue'); manual-mode only
       package : string
           Sensor package id to process (e.g. 'adcp'); manual-mode only
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07'); manual-mode only

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
    elif proctype == 'manual':
        if platform and package and yyyy_mm:
            print('Processing manually ...')
            print(' ...  platform id : %s' % platform)
            print(' ... package name : %s' % package)
            print(' ...        month : %s' % yyyy_mm)
            print(' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
            manual(platform, package, yyyy_mm)
        else:
            print('raw2proc: Manual operation requires platform, package, and month')
            print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
    else:
        print('raw2proc: requires either auto or manual operation')
256
257
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine which platforms (all platforms with currently active
       config files, i.e. config_end_date is None)
    2. for each platform
         get latest config
         for each package
           (determine process for 'latest' data) copy to new area when grabbed
           parse recent data
           yyyy_mm is the current month
           load this months netcdf, if new month, create this months netcdf
           update modified date and append new data in netcdf
    3. packages whose sensor info has a 'latest_dir' also get a 'latest'
       netCDF file written by proc2latest()
    """
    yyyy_mm = this_month()
    month_bounds = find_months(yyyy_mm)
    month_start_dt = month_bounds[1]
    month_end_dt = month_bounds[2] - timedelta(seconds=1)

    active = find_active_configs(config_dir=defconfigs)
    if not active:
        print(' ... ... ... \nNOTE: No active platforms\n')
        return

    for cn in active:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        platform = pi['id']
        for package in asi:
            print(' ... package name : %s' % package)
            si = asi[package]
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            ofn = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = month_start_dt
            si['proc_end_dt'] = month_end_dt
            if os.path.exists(ofn):
                # resume after the last record already in this month's file,
                # unless that record predates the month
                es, _units = nc_get_time(ofn)
                last_dt = es2dt(es[-1])
                if last_dt >= month_start_dt:
                    si['proc_start_dt'] = last_dt

            raw_files, raw_dts = find_raw(si, yyyy_mm)
            raw_files = which_raw(pi, raw_files, raw_dts)
            if not raw_files:
                print(' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm))
            else:
                process(pi, si, raw_files, yyyy_mm)

            # update latest data for SECOORA commons
            if 'latest_dir' in si:
                print(' ... ... latest : %s ' % si['latest_dir'])
                proc2latest(pi, si, yyyy_mm)
319
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    Notes
    -----

    1. determine which configs overlap the month
    2. for each config for the specific platform
           if the package is in the config's packages
               find which raw files to process and process them
    3. each output netCDF file (platform_package_yyyy_mm.nc) is removed
       once, before the first config that writes it is processed, so the
       month is rebuilt from scratch rather than appended to
    """
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... \nNOTE: %s not operational for %s\n' % (platform, yyyy_mm))
        return

    # Output files already cleared in this run.  Tracking the paths (rather
    # than clearing only when the first config is processed) ensures the
    # stale file is removed even when the first overlapping config does not
    # include this package, so new data are never appended to an old file.
    cleared = set()
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        if package not in pi['packages']:
            print(' ... ... \nNOTE: %s not operational on %s for %s\n' % (package, platform, yyyy_mm))
            continue
        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        # month start and end dt to sensor info
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)
        # remove any previous netcdf file (platform_package_yyyy_mm.nc) once
        if ofn not in cleared:
            if os.path.exists(ofn):
                os.remove(ofn)
            cleared.add(ofn)
        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm))
370    
def process(pi, si, raw_files, yyyy_mm):
    """Parse raw files and write their in-window records to the netCDF file.

    The parser, creator and updater come from the module named by
    si['process_module'].  For each raw file, data['in'] is set to a
    boolean mask of the parsed records with
    si['proc_start_dt'] < dt <= si['proc_end_dt'].  When any record is in
    the window, <proc_dir>/<proc_filename> is updated if it exists and
    created otherwise.  Missing or empty raw files are skipped.

    NOTE(review): the window excludes its start.  That keeps auto() from
    re-writing the last stored record when it resumes from it.  However,
    when proc_start_dt is the month start (as in manual()), a record stamped
    exactly at that instant appears to fall in neither this month nor the
    previous one (whose end is one second earlier).  Confirm whether such
    records occur.

    yyyy_mm is not used here.
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # if no lines, file was missing or empty
            print(" ... skipping file %s" % (fn,))
            continue
        data = parse(pi, si, lines)
        # boolean mask of records within the specified timeframe (usually the month)
        data['in'] = numpy.array(
            [si['proc_start_dt'] < val <= si['proc_end_dt'] for val in data['dt']],
            dtype=bool)
        # only write to netcdf if any records are in the window
        if not data['in'].any():
            continue
        sys.stdout.write('... %s ... ' % fn)
        sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(ofn):
            ut = update(pi, si, data)
            nc_update(ofn, ut)
        else:
            ct = create(pi, si, data)
            nc_create(ofn, ct)
404
405    
def proc2latest(pi, si, yyyy_mm):
    """Select specific variables and times from current monthly netCDF
    and post as latest data.  TEST MODE.

    For an active config, load the variables listed in si['latest_vars']
    from the NCCOOS monthly netCDF, subset the record variables (as
    reported by nc_find_record_vars) to the last 2 days of data, and create
    a new netCDF file <latest_dir>/nccoos_<platform>_<package>_latest.nc.
    Of the changes needed to conform to the SEACOOS Data Model, only the
    'start_date' global attribute is currently updated.

    Nothing is written (a note is printed instead) when the monthly file
    does not exist or contains no times.

    NOTE: In test mode right now. See auto() function for similar action.
    """
    platform = pi['id']
    package = si['id']
    si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
    ifn = os.path.join(si['proc_dir'], si['proc_filename'])
    if not os.path.exists(ifn):
        print(' ... ... NOTE: no monthly file %s for latest data' % ifn)
        return

    # get dt from current month file
    (es, units) = nc_get_time(ifn)
    dt = [es2dt(e) for e in es]
    if not dt:
        print(' ... ... NOTE: no times in %s for latest data' % ifn)
        return
    last_dt = dt[-1]

    # keep records within the last 2 days (allowing 360 s past the last record)
    window_start = last_dt - timedelta(days=2)
    window_end = last_dt + timedelta(seconds=360)
    idx = numpy.array([window_start < val <= window_end for val in dt], dtype=bool)
    dt = numpy.array(dt)[idx]

    # read in data and unpack tuple
    global_atts, var_atts, dim_inits, var_inits, var_data = nc_load(ifn, si['latest_vars'])
    record_vars = nc_find_record_vars(ifn)
    # subset only the record variables; others are passed through unchanged
    subset = []
    for item in var_data:
        vn, vd = item
        if vn in record_vars:
            item = (vn, vd[idx])
        subset.append(item)
    var_data = tuple(subset)
    global_atts['start_date'] = dt[0].strftime('%Y-%m-%d %H:%M:%S')

    # write latest data
    si['latest_filename'] = 'nccoos_%s_%s_latest.nc' % (platform, package)
    ofn = os.path.join(si['latest_dir'], si['latest_filename'])
    nc_create(ofn, (global_atts, var_atts, dim_inits, var_inits, var_data))
456
457                                                
# globals
# Start time of this run (UTC, truncated to whole seconds), printed by
# raw2proc().  datetime.replace() returns a new object, so its result is
# what gets bound here.
start_dt = datetime.utcnow().replace(microsecond=0)
461
if __name__ == "__main__":
    # With no arguments (e.g. when run from cron) process the latest data
    # for every active configuration.  Otherwise the first four arguments
    # are passed to raw2proc() in order, e.g.
    #   python raw2proc.py manual bogue adcp 2007_07
    args = sys.argv[1:5]
    if args:
        raw2proc(*args)
    else:
        raw2proc('auto')
Note: See TracBrowser for help on using the browser.