NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 488 (checked in by haines, 12 years ago)

removed test_ and scr_ files not necessary for SVN

  • Property svn:executable set to
Line 
1 #!/usr/bin/env python
2 # Last modified:  Time-stamp: <2012-04-17 13:22:05 haines>
3 """Process raw data to monthly netCDF data files
4
5 This module processes raw ascii- or binary-data from different NCCOOS
6 sensors (ctd, adcp, waves-adcp, met) based on manual or automated
7 operation.  If automated processing, add raw data (level0) from all
8 active sensors to current month's netcdf data files (level1) with the
9 current configuration setting.  If manual processing, determine which
10 configurations to use for requested platform, sensor, and month.
11
12 :Processing steps:
13   0. raw2proc auto or manual for platform, sensor, month
14   1. list of files to process
15   2. parse data
16   3. create, update netcdf
17
18   to-do
19   3. qc (measured) data
20   4. process derived data (and regrid?)
21   5. qc (measured and derived) data flags
22
23 """
24
25 __version__ = "v0.1"
26 __author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31 import traceback
32
# for production use:
# defconfigs='/home/haines/nccoos/raw2proc'
# for testing use:
# defconfigs='/home/haines/nccoos/test/r2p'

# define config file location to run under cron
# (this directory is passed as config_dir to find_configs() and
# find_active_configs() by manual() and auto() below)
defconfigs='/opt/env/haines/dataproc/raw2proc'
40
41 import numpy
42
43 from procutil import *
44 from ncutil import *
45
# Regular-expression source strings (not compiled patterns).
# REAL_RE_STR matches one real number with optional surrounding whitespace:
# exponent notation (e.g. 1.5e+10), a decimal (e.g. -3.14, .5, 2.), or an integer.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
# NAN_RE_STR matches the text "nan" in any mix of upper and lower case.
NAN_RE_STR = r'[Nn][Aa][Nn]'
48
def load_data(inFile):
    """Read all lines of a text file.

    :Parameters:
       inFile : string
           Path of the file to read

    :Returns:
       lines : list of str or None
           Lines of the file (line endings kept, as returned by
           readlines()).  An empty list if the file is empty, None if
           the file does not exist.  A message is printed in both of
           those cases.
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None
    # the with-block closes the file even if readlines() raises
    with open(inFile, 'r') as f:
        lines = f.readlines()
    if not lines:
        print('Empty file: ' + inFile)
    return lines
60
def import_parser(name):
    """Return the attribute called `name` from the ``parsers`` module.

    The ``parsers`` module is imported by name at call time, so it must
    be importable from the current module search path.
    """
    return getattr(__import__('parsers'), name)
65
def import_processors(mod_name):
    """Import a processing module and return its three entry points.

    :Parameters:
       mod_name : string
           Name of the module to import; it must define the attributes
           ``parser``, ``creator`` and ``updater``

    :Returns:
       (parser, creator, updater) : tuple
           The three attributes of the module, in that order
    """
    module = __import__(mod_name)
    return tuple(getattr(module, attr) for attr in ('parser', 'creator', 'updater'))
72    
73
def get_config(name):
    """Import a module and return an attribute of it, given a dotted name.

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    :Parameters:
       name : string
           Dotted name 'module.attr[.attr...]'.  The first component is
           imported as a module; each following component is looked up
           as an attribute of the previous result.

    :Returns:
       The object the dotted name resolves to.  If `name` has no dot,
       the imported module itself is returned.

    :Raises:
       ImportError if the module cannot be imported, AttributeError if
       an attribute along the path does not exist.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk the attribute chain one level at a time, starting at the module
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
81
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for '<platform>_config_*.py' files

    :Returns:
       cns : list of str
           List of configurations (module names, i.e. file names without
           '.py', sorted by file path) that overlap with desired month
           If empty [], no configs were found

    A config whose start or end date is not set (None or empty) is
    treated as starting/ending at the current UTC time.
    """
    import glob
    # list of config files based on platform
    configs = sorted(glob.glob(os.path.join(config_dir, platform + '_config_*.py')))
    # datetime objects are immutable, so the result of replace() must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends; find_months() is unpacked by
    # the callers in this file as (prev_month, this_month, next_month)
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)
    #
    cns = []
    for config in configs:
        # config module name is the file name without its extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        # always (re)bind both dates so a value from a previous config
        # can never leak into this iteration
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])
        else:
            config_end_dt = now_dt
        # the config period overlaps the month when it starts no later
        # than the month ends and ends no earlier than the month starts
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
127
128
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when the 'config_end_date' entry of its
    platform_info is None.

    :Parameters:
       config_dir : string
           Directory searched for '*_config_*.py' files

    :Returns:
       cns : list of str
           List of active configurations (module names, i.e. file names
           without '.py'), in the order returned by glob
           If empty [], no active configs were found
    """
    import glob
    # list of all config files
    configs = glob.glob(os.path.join(config_dir, '*_config_*.py'))
    #
    cns = []
    for config in configs:
        # config module name is the file name without its extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
152
153
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of two lists
           Sorted raw file paths and, in parallel, the datetime that
           filt_datetime() derived from each file name.  Files whose
           name yields no datetime are left out.
    """
    import glob

    months = find_months(yyyy_mm)
    # list all the raw files in prev-month, this-month, and next-month
    all_raw_files = []
    in_month_dir = re.search(r'\d{4}_\d{2}$', si['raw_dir'])
    if in_month_dir:
        # look for raw_file_glob in specific directory ending in YYYY_MM
        # but look no further.
        gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    else:
        # no YYYY_MM at end of raw_dir then look for files
        # in prev-month, this-month, and next-month
        for mon in months:
            mstr = mon.strftime('%Y_%m')
            gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
            all_raw_files.extend(glob.glob(gs))

    all_raw_files.sort()

    # accept files dated within one day either side of the processing period
    dt_start = si['proc_start_dt'] - timedelta(days=1)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    # for filenames with just YYYY_MM or YYYYMM (granularity 4) look back
    # 31 days instead.  It won't hurt to add names to files parsed.
    # The wider window is chosen per file, so the result does not depend
    # on the order in which monthly and finer-grained names are visited.
    dt_start_monthly = si['proc_start_dt'] - timedelta(days=31)

    raw_files = []
    raw_dts = []
    # compute datetime for each file
    for fn in all_raw_files:
        (fndt, granularity) = filt_datetime(os.path.basename(fn), gran=True)
        if not fndt:
            continue
        if granularity == 4:
            start = dt_start_monthly
        else:
            start = dt_start
        # every dated file is kept when raw_dir itself names the month
        if in_month_dir or start <= fndt <= dt_end:
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
196
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and 'config_end_date'.
           A date that is not set (None or empty) is replaced by the
           current UTC time.
       raw_files : list of str
           Candidate raw file paths
       dts : list of datetime
           Datetime of each file, parallel to raw_files (not modified)

    :Returns:
       new_list : list of str
           Files whose datetime lies within the configuration period.
           If there are none, every file dated on or before the
           configuration end is returned instead.
    """
    # datetime objects are immutable, so the result of replace() must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])
    else:
        config_end_dt = now_dt

    # work on a copy so the caller's list is left untouched
    dts = list(dts)
    for idx, fn in enumerate(raw_files):
        (fndt, granularity) = filt_datetime(os.path.basename(fn), gran=True)
        if granularity == 4:
            # file names with granularity 4 (see find_raw: just YYYY_MM or
            # YYYYMM) are clamped into the config period so that they are
            # not filtered out below
            if fndt < config_start_dt:
                dts[idx] = config_start_dt
            if fndt > config_end_dt:
                dts[idx] = config_end_dt

    new_list = [fn for fn, dt in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]

    if not new_list:
        # nothing inside the period: fall back to everything up to its end
        new_list = [fn for fn, dt in zip(raw_files, dts)
                    if dt <= config_end_dt]

    return new_list
228        
229
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Entry point: run the processing in auto-mode or manual-mode

    In auto-mode the newest data for all platforms and all sensor
    packages are processed (see auto()).  In manual-mode the data for
    one platform, one sensor package and one month are processed (see
    manual()); all three must then be given.

    :Parameters:
       proctype : string
           'auto' or 'manual'

       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    # start_dt is the module-level timestamp taken when the module was loaded
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype != 'manual':
        print('raw2proc: requires either auto or manual operation')
        return

    have_all_args = platform and package and yyyy_mm
    if not have_all_args:
        print('raw2proc: Manual operation requires platform, package, and month')
        print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    banner = (
        'Processing in manually ...',
        ' ...  platform id : %s' % platform,
        ' ... package name : %s' % package,
        ' ...        month : %s' % yyyy_mm,
        ' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"),
    )
    for text in banner:
        print(text)
    manual(platform, package, yyyy_mm)
274
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine which platforms (all platforms with currently active
       config files i.e. config_end_date is None)
    2. for each platform
         get latest config
         for each package
           (determine process for 'latest' data) copy to new area when grabbed
           parse recent data
           yyyy_mm is the current month
           load this months netcdf, if new month, create this months netcdf
           update modified date and append new data in netcdf

    An error while handling one package is printed as a traceback and
    processing continues with the next package.  KeyboardInterrupt and
    SystemExit are not intercepted.
    """
    # datetime objects are immutable, so the result of replace() must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)

    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... NOTE: No active platforms')
        return

    # for each configuration
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        asi = get_config(cn+'.sensor_info')
        platform = pi['id']
        # a date that is not set means the config period is bounded by now
        if pi['config_start_date']:
            pi['config_start_dt'] = filt_datetime(pi['config_start_date'])
        else:
            pi['config_start_dt'] = now_dt
        if pi['config_end_date']:
            pi['config_end_dt'] = filt_datetime(pi['config_end_date'])
        else:
            pi['config_end_dt'] = now_dt
        # for each sensor package
        for package in asi:
            try: # if a package fails, report it and try next package
                print(' ... package name : %s' % package)
                si = asi[package]
                si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
                ofn = os.path.join(si['proc_dir'], si['proc_filename'])
                si['proc_start_dt'] = month_start_dt
                si['proc_end_dt'] = month_end_dt
                if os.path.exists(ofn):
                    # get last dt from current month file
                    (es, units) = nc_get_time(ofn)
                    last_dt = es2dt(es[-1])
                    # if the file already holds data from this month, start
                    # after its last record to only process newest data
                    if last_dt >= month_start_dt:
                        si['proc_start_dt'] = last_dt

                (raw_files, raw_dts) = find_raw(si, yyyy_mm)
                raw_files = which_raw(pi, raw_files, raw_dts)
                if raw_files:
                    process(pi, si, raw_files, yyyy_mm)
                else:
                    print(' ... ... NOTE: no new raw files found')

                # update latest data for SECOORA commons
                if 'latest_dir' in si:
                    proc2latest(pi, si, yyyy_mm)

                if 'csv_dir' in si:
                    proc2csv(pi, si, yyyy_mm)
            except Exception:
                # best effort: one failing package must not stop the others,
                # but Ctrl-C / sys.exit() still end the run
                traceback.print_exc()
353
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Notes
    -----

    1. determine which configs
    2. for each config for specific platform
           if have package in config
               which raw files

    Any existing netcdf file for the month is removed when the first
    config is handled, so the month is rebuilt from the raw files.
    """
    # datetime objects are immutable, so the result of replace() must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)

    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)

    if not configs:
        print(' ... ... ... NOTE: %s not operational for %s' % (platform, yyyy_mm))
        return

    # for each configuration
    for index, cn in enumerate(configs):
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        # a date that is not set means the config period is bounded by now
        if pi['config_start_date']:
            pi['config_start_dt'] = filt_datetime(pi['config_start_date'])
        else:
            pi['config_start_dt'] = now_dt
        if pi['config_end_date']:
            pi['config_end_dt'] = filt_datetime(pi['config_end_date'])
        else:
            pi['config_end_dt'] = now_dt

        asi = get_config(cn+'.sensor_info')
        if package not in pi['packages']:
            print(' ... ... NOTE: %s not operational on %s for %s' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        # month start and end dt to sensor info
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)
        # remove any previous netcdf file (platform_package_yyyy_mm.nc)
        if index == 0 and os.path.exists(ofn):
            os.remove(ofn)
        # this added just in case data repeated in data files
        if os.path.exists(ofn):
            # get last dt from current month file
            (es, units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            # if the file already holds data from this month, start
            # after its last record to only process newest data
            if last_dt >= month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm))
425    
def process(pi, si, raw_files, yyyy_mm):
    """Parse each raw file and write the records that are in range to netcdf

    The parser/creator/updater functions are taken from the module named
    by si['process_module'].  For every raw file the parsed records are
    flagged in data['in'] when their datetime lies within both the
    configuration period (pi['config_start_dt'], pi['config_end_dt']) and
    the processing period (si['proc_start_dt'], si['proc_end_dt']).  If
    any record is flagged, the netcdf file si['proc_dir']/si['proc_filename']
    is updated when it exists and created otherwise.

    yyyy_mm is accepted but not used here.
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for raw_fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = raw_fn
        lines = load_data(raw_fn)
        if not lines:
            # if no lines, file was empty (or could not be read)
            print(" ... skipping file %s" % (raw_fn,))
            continue

        data = parse(pi, si, lines)
        # determine which index of data is within the specified timeframe (usually the month)
        nrec = len(data['dt'])
        data['in'] = numpy.array([False] * nrec)
        for k, when in enumerate(data['dt']):
            if when>=pi['config_start_dt'] and when>=si['proc_start_dt'] \
                   and when<=si['proc_end_dt'] and when<=pi['config_end_dt']:
                data['in'][k] = True

        # nothing in range for this file: go on to the next one
        if not data['in'].any():
            continue

        # report the file name and how many of its records are used
        sys.stdout.write(' ... %s ... ' % raw_fn)
        sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
        out_path = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(out_path):
            nc_update(out_path, update(pi, si, data))
        else:
            nc_create(out_path, create(pi, si, data))
459
460    
# globals
# time at which the module was loaded; printed by raw2proc().
# datetime objects are immutable, so the result of replace() must be kept
# for the microseconds to actually be dropped.
start_dt = datetime.utcnow().replace(microsecond=0)

if __name__ == "__main__":
    # NOTE(review): optparse is imported but no command-line options are parsed yet
    import optparse
    raw2proc('auto')

    # for testing
    # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
    # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
Note: See TracBrowser for help on using the browser.