NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 167 (checked in by haines, 16 years ago)

read and process directional wave data from 2-d spectra

  • Property svn:executable set to
Line 
#!/usr/bin/env python
# Last modified:  Time-stamp: <2008-05-07 13:03:19 haines>
"""Process raw data to monthly netCDF data files

This module processes raw ascii- or binary-data from different NCCOOS
sensors (ctd, adcp, waves-adcp, met) in either automated or manual
operation.  In automated processing, raw data (level0) from all
active sensors is added to the current month's netCDF data files
(level1) using the current configuration settings.  In manual
processing, the configurations to use are determined from the
requested platform, sensor, and month.

:Processing steps:
  0. raw2proc auto or manual for platform, sensor, month
  1. list of files to process
  2. parse data
  3. create, update netcdf

  to-do (not yet implemented)
  4. qc (measured) data
  5. process derived data (and regrid?)
  6. qc (measured and derived) data flags

"""

__version__ = "v0.1"
__author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31
# define config file location to run under cron
# This directory is passed as config_dir to find_active_configs() (from
# auto()) and to find_configs() (from manual()), which glob it for
# '*_config_*.py' files.
defconfigs='/home/haines/nccoos/test/r2p'
34
35 import numpy
36
37 from procutil import *
38 from ncutil import *
39
# Regex source for one real number with optional surrounding whitespace.
# Group 1 captures the number; the alternatives are tried in this order:
# exponent notation (e.g. 1.5e+03), decimal (e.g. 3.14 or .5), then integer.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
41
def load_data(inFile):
    """Read all lines of a raw data file

    :Parameters:
       inFile : string
           Path of the file to read

    :Returns:
       lines : list of str or None
           Lines of the file as returned by readlines();
           [] if the file is empty (a message is printed);
           None if the file does not exist (a message is printed)
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None

    f = open(inFile, 'r')
    # try/finally so the handle is released even when reading fails
    try:
        lines = f.readlines()
    finally:
        f.close()

    if not lines:
        print('Empty file: ' + inFile)
    return lines
53
def import_parser(name):
    """Return the attribute called `name` from the 'parsers' module

    :Parameters:
       name : string
           Attribute (parser) name to look up in module 'parsers'

    :Returns:
       parser : object
           Whatever 'parsers' exposes under that name; AttributeError
           is raised if there is no such attribute
    """
    return getattr(__import__('parsers'), name)
58
def import_processors(mod_name):
    """Import a processing module and return its three entry points

    :Parameters:
       mod_name : string
           Name of the module to import

    :Returns:
       (parser, creator, updater) : tuple
           The module attributes 'parser', 'creator' and 'updater',
           in that order
    """
    module = __import__(mod_name)
    roles = ('parser', 'creator', 'updater')
    return tuple([getattr(module, role) for role in roles])
65    
66
def get_config(name):
    """Import a config module and return an object inside it

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    :Parameters:
       name : string
           Dotted name '<module>.<attr>[.<attr>...]'; the first
           component is imported, the rest are attribute lookups

    :Returns:
       obj : object
           The object reached by following every component in turn.
           A name without any dot returns the imported module itself.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk down from the previous result, not from the module each time,
    # so 'mod.a.b' resolves to mod.a.b rather than mod.b
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
74
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory globbed for '<platform>_config_*.py' files
           NOTE(review): each config is then imported by module name
           via get_config(), so this directory presumably has to be
           importable (on sys.path) -- confirm.

    :Returns:
       cns : list of str
           List of configurations that overlap with desired month,
           sorted by name.
           If empty [], no configs were found
    """
    import glob
    # list of config files based on platform; sorted because glob order is
    # arbitrary and the names end in a date (e.g. bogue_config_20060918)
    configs = sorted(glob.glob(os.path.join(config_dir, platform + '_config_*.py')))
    # datetime.replace() returns a new object, so the result must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)
    #
    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn+'.platform_info')
        # an unset (None or otherwise empty) date falls back to "now";
        # always assigning avoids reusing the value from the previous config
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])[0]
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])[0]
        else:
            config_end_dt = now_dt
        # the two intervals overlap when each starts before the other ends
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
119
120
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when the 'config_end_date' entry of its
    platform_info is None.

    :Parameters:
       config_dir : string
           Directory globbed for '*_config_*.py' files

    :Returns:
       cns : list of str
           List of active configuration (module) names, sorted by name.
           If empty [], no configs were found
    """
    import glob
    # list of all config files; sorted because glob order is arbitrary
    configs = sorted(glob.glob(os.path.join(config_dir, '*_config_*.py')))
    cns = []
    for config in configs:
        # module name is the file name without directory and extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn+'.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
144
145
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of lists
           File names, in sorted order, whose file-name datetime lies
           inside the processing window, and those datetimes
    """
    import glob
    # list all the raw files in prev-month, this-month, and next-month
    all_raw_files = []
    for mon in find_months(yyyy_mm):
        mstr = mon.strftime('%Y_%m')
        gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))

    all_raw_files.sort()

    # ****** ((SMH) NOTE: Will need to override looking in specific
    # subdirs of months if all data is contained in one file for long
    # deployment, such as with adcp binary data.

    # window is padded by a day on both sides
    dt_end = si['proc_end_dt']+timedelta(days=1)
    daily_start = si['proc_start_dt']-timedelta(days=1)
    # JC changes: file names dated at "granularity" 4 get a 31-day lead
    # NOTE(review): 4 presumably means a monthly file name -- confirm
    # against filt_datetime()
    monthly_start = si['proc_start_dt']-timedelta(days=31)

    raw_files = []; raw_dts = []
    # compute datetime for each file
    for fn in all_raw_files:
        fndt_tuple = filt_datetime(os.path.basename(fn))
        fndt = fndt_tuple[0]
        if not fndt:
            continue
        # choose the start per file, so one monthly file does not widen
        # the window for every file that follows it
        if fndt_tuple[1] == 4:
            dt_start = monthly_start
        else:
            dt_start = daily_start
        if dt_start <= fndt <= dt_end:
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
187
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and
           'config_end_date'
       raw_files : list of str
           Candidate raw file names
       dts : list of datetime
           Datetime of each file, parallel to raw_files

    :Returns:
       new_list : list of str
           Files whose datetime lies between the configuration start
           and end (inclusive)
    """
    # datetime.replace() returns a new object, so the result must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)

    # an unset (None or otherwise empty) date falls back to "now"
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])[0]
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])[0]
    else:
        config_end_dt = now_dt

    return [fn for (fn, dt) in zip(raw_files, dts)
            if config_start_dt <= dt <= config_end_dt]
206        
207
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Entry point: run processing in auto-mode or manual-mode

    Auto-mode hands over to auto() (all platforms, all packages,
    latest data).  Manual-mode hands over to manual() and needs all of
    platform, package and yyyy_mm; if any is missing a usage message is
    printed instead.  Any other proctype only prints a message.

    :Parameters:
       proctype : string
           'auto' or 'manual'

       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    # print(<one expression>) gives the same output as a print statement
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype != 'manual':
        print('raw2proc: requires either auto or manual operation')
        return

    have_all_args = platform and package and yyyy_mm
    if not have_all_args:
        print('raw2proc: Manual operation requires platform, package, and month')
        print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ...  platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ...        month : %s' % yyyy_mm)
    print(' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
251
252
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. active configurations come from find_active_configs() on the
       default config directory (config_end_date is None)
    2. for each active configuration
         read its platform_info and sensor_info
         for each package in sensor_info
           yyyy_mm is the current month
           output file is <platform>_<package>_<yyyy_mm>.nc in proc_dir
           processing window is the current month, except that when the
           output file already exists and its last time falls within or
           after the month start, the window starts at that last time
           find the raw files, limit them to the config timeframe,
           and hand them to process()

    """
    yyyy_mm = this_month()
    (month_start_dt, next_month_dt) = find_months(yyyy_mm)[1:3]
    month_end_dt = next_month_dt - timedelta(seconds=1)

    active = find_active_configs(config_dir=defconfigs)
    if not active:
        print(' ... ... ... \nNOTE: No active platforms\n')
        return

    # for each configuration
    for cn in active:
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        asi = get_config(cn+'.sensor_info')
        platform = pi['id']
        # for each sensor package
        for (package, si) in asi.items():
            print(' ... package name : %s' % package)
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            ofn = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = month_start_dt
            si['proc_end_dt'] = month_end_dt
            if os.path.exists(ofn):
                # last time already stored in the current month file
                (es, units) = nc_get_time(ofn)
                last_dt = es2dt(es[-1])
                # resume from it so only newer data is processed
                if last_dt >= month_start_dt:
                    si['proc_start_dt'] = last_dt

            (found_files, found_dts) = find_raw(si, yyyy_mm)
            process(pi, si, which_raw(pi, found_files, found_dts), yyyy_mm)
296                     # if older than month_start_dt use it instead to only process newest data
297                     if last_dt>=month_start_dt:
298                         si['proc_start_dt'] = last_dt
299
300                 (raw_files, raw_dts) = find_raw(si, yyyy_mm)
301                 raw_files = which_raw(pi, raw_files, raw_dts)
302                 process(pi, si, raw_files, yyyy_mm)
303     #
304     else:
305         print ' ... ... ... \nNOTE: No active platforms\n'
306
307 def manual(platform, package, yyyy_mm):
308     """Process data for specified platform, sensor package, and month
309
310     Notes
311     -----
312    
313     1. determine which configs
314     2. for each config for specific platform
315            if have package in config
316                which raw files
317     """
318      # determine when month starts and ends
319     months = find_months(yyyy_mm)
320     month_start_dt = months[1]
321     month_end_dt = months[2] - timedelta(seconds=1)
322    
323     configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
324
325     if configs:
326         # for each configuration
327         for index in range(len(configs)):
328             cn = configs[index]
329             print ' ... config file : %s' % cn
330             pi = get_config(cn+'.platform_info')
331             # month start and end dt to pi info
332             asi = get_config(cn+'.sensor_info')
333             if package in pi['packages']:
334                 si = asi[package]
335                 if si['utc_offset']:
336                     print ' ... ... utc_offset : %g (hours)' % si['utc_offset']
337                 si['proc_start_dt'] = month_start_dt
338                 si['proc_end_dt'] = month_end_dt
339                 si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
340                 ofn = os.path.join(si['proc_dir'], si['proc_filename'])
341                 (raw_files, raw_dts) = find_raw(si, yyyy_mm)
342                 raw_files = which_raw(pi, raw_files, raw_dts)
343                 # remove any previous netcdf file (platform_package_yyyy_mm.nc)
344                 if index==0  and os.path.exists(ofn):
345                     os.remove(ofn)
346                 #
347                 process(pi, si, raw_files, yyyy_mm)
348             else:
349                 print ' ... ... \nNOTE: %s not operational on %s for %s\n' % (package, platform, yyyy_mm)               
350     else:
351         print ' ... ... ... \nNOTE: %s not operational for %s\n' % (platform, yyyy_mm)
352    
353 def process(pi, si, raw_files, yyyy_mm):
354     # tailored data processing for different input file formats and control over output
355     (parse, create, update) = import_processors(si['process_module'])
356     for fn in raw_files:
357         # sys.stdout.write('... %s ... ' % fn)
358         # attach file name to sensor info so parser can use it, if needed
359         si['fn'] = fn
360         lines = load_data(fn)
361         if lines:
362             data = parse(pi, si, lines)
363             # determine which index of data is within the specified timeframe (usually the month)
364             n = len(data['dt'])
365             data['in'] = numpy.array([False for i in range(n)])
366             for index, val in enumerate(data['dt']):
367                 if val>si['proc_start_dt'] and val<=si['proc_end_dt']:
368                     data['in'][index] = True
369                    
370             # if any records are in the month then write to netcdf
371             if data['in'].any():
372                 sys.stdout.write('... %s ... ' % fn)
373                 sys.stdout.write('%d\n' % len(data['in']))
374                 ofn = os.path.join(si['proc_dir'], si['proc_filename'])
375                 # update or create netcdf
376                 if os.path.exists(ofn):
377                     ut = update(pi,si,data)
378                     nc_update(ofn, ut)
379                 else:
380                     ct = create(pi,si,data)
381                     nc_create(ofn, ct)
382         else:
383             # if no lines, file was empty
384             print " ... skipping file %s" % (fn,)
385
386    
# globals
# UTC time at which the module was loaded, truncated to whole seconds.
# datetime.replace() returns a new object, so its result has to be bound.
start_dt = datetime.utcnow().replace(microsecond=0)
390
if __name__ == "__main__":
    # Run as a script: process everything in auto-mode.
    # NOTE(review): optparse is imported but not used in the visible code --
    # presumably a placeholder for command-line options; confirm before removing.
    import optparse
    raw2proc('auto')

    # for testing
    # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
    # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
Note: See TracBrowser for help on using the browser.