NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 320 (checked in by haines, 14 years ago)

catch-up trunk to production code running on cromwell

  • Property svn:executable set to
Line 
#!/usr/bin/env python
# Last modified:  Time-stamp: <2009-12-05 21:32:18 haines>
"""Process raw data to monthly netCDF data files

This module processes raw ascii- or binary-data from different NCCOOS
sensors (ctd, adcp, waves-adcp, met) based on manual or automated
operation.  If automated processing, add raw data (level0) from all
active sensors to current month's netcdf data files (level1) with the
current configuration setting.  If manual processing, determine which
configurations to use for requested platform, sensor, and month.

:Processing steps:
  0. raw2proc auto or manual for platform, sensor, month
  1. list of files to process
  2. parse data
  3. create, update netcdf

  to-do
  4. qc (measured) data
  5. process derived data (and regrid?)
  6. qc (measured and derived) data flags

"""

__version__ = "v0.1"
__author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31
# Directory holding the <platform>_config_*.py configuration modules.
# The default is the location used when running under cron.  Set the
# RAW2PROC_CONFIG_DIR environment variable to use another location
# without editing this file, e.g.
#   production: /home/haines/nccoos/raw2proc
#   testing:    /home/haines/nccoos/test/r2p
defconfigs = os.environ.get('RAW2PROC_CONFIG_DIR',
                            '/opt/env/haines/dataproc/raw2proc')
39
40 import numpy
41
42 from procutil import *
43 from ncutil import *
44
# Regular-expression fragment for one real number (optional sign; exponent,
# decimal or integer notation), with any surrounding whitespace.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
# Regular-expression fragment for "nan" in any letter case.
NAN_RE_STR = r'[Nn][Aa][Nn]'
47
def load_data(inFile):
    """Read a raw data file and return its lines.

    :Parameters:
       inFile : string
           Path of the raw data file

    :Returns:
       lines : list of str or None
           The file's lines (line endings kept); an empty list for an
           empty file; None if the file does not exist
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None
    # 'with' closes the file even if reading raises
    with open(inFile, 'r') as f:
        lines = f.readlines()
    if not lines:
        print('Empty file: ' + inFile)
    return lines
59
def import_parser(name):
    """Return the attribute called *name* from the ``parsers`` module."""
    return getattr(__import__('parsers'), name)
64
def import_processors(mod_name):
    """Import a processing module and return its processing hooks.

    :Parameters:
       mod_name : string
           Name of an importable module that defines ``parser``,
           ``creator`` and ``updater``

    :Returns:
       (parser, creator, updater) : tuple
           The module's three attributes, in that order
    """
    module = __import__(mod_name)
    return tuple(getattr(module, hook) for hook in ('parser', 'creator', 'updater'))
71    
72
def get_config(name):
    """Import a configuration module and return a (dotted) attribute of it.

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    The first dotted component names the module to import.  Each further
    component is looked up as an attribute of the previous object, so
    nested names resolve.  A name without dots returns the module itself.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk the whole attribute chain; with no further components the
    # imported module is returned instead of failing on an unbound name
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
80
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for <platform>_config_*.py files
           (default '' is the current directory)

    :Returns:
       cns : list of str
           Configuration module names (in sorted file-name order) whose
           config_start_date..config_end_date span overlaps the month.
           If empty [], no configs were found

    A config_start_date or config_end_date that is None or empty is
    taken to be the current UTC time.
    """
    import glob
    # list of config files based on platform
    configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    configs.sort()
    # datetime objects are immutable: replace() returns a new value
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)
    #
    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        # always assign both bounds so no value leaks from a previous config
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])[0]
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])[0]
        else:
            config_end_dt = now_dt
        # the config span overlaps the month iff it starts no later than
        # the month ends and ends no earlier than the month starts
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
126
127
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when its platform_info['config_end_date']
    is None.

    :Parameters:
       config_dir : string
           Directory searched for *_config_*.py files
           (default '' is the current directory)

    :Returns:
       cns : list of str
           Active configuration module names, in sorted file-name order
           If empty [], no active configs were found
    """
    import glob
    # list of all config files, sorted so processing order is reproducible
    configs = sorted(glob.glob(os.path.join(config_dir, '*_config_*.py')))
    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
151
152
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of lists
           Raw file paths (sorted) and the datetime parsed from each
           file name
    """
    import glob

    months = find_months(yyyy_mm)
    all_raw_files = []
    m = re.search(r'\d{4}_\d{2}$', si['raw_dir'])
    if m:
        # look for raw_file_glob in specific directory ending in YYYY_MM
        # but look no further.
        gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    else:
        # no YYYY_MM at end of raw_dir then look for files
        # in prev-month, this-month, and next-month
        for mon in months:
            mstr = mon.strftime('%Y_%m')
            gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
            all_raw_files.extend(glob.glob(gs))
    all_raw_files.sort()

    # keep files dated within one day either side of the processing window
    dt_start = si['proc_start_dt'] - timedelta(days=1)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    # files named only to month granularity (filt_datetime level 4) carry a
    # date before the data they hold, so their window starts 31 days earlier
    dt_start_monthly = si['proc_start_dt'] - timedelta(days=31)

    raw_files = []; raw_dts = []
    for fn in all_raw_files:
        fndt_tuple = filt_datetime(os.path.basename(fn))
        fndt = fndt_tuple[0]
        # level of granularity filt_datetime() used to parse the name;
        # chosen per file so one monthly name cannot widen the window
        # for the files after it
        granularity = fndt_tuple[1]
        if granularity == 4:
            file_start = dt_start_monthly
        else:
            file_start = dt_start
        if fndt:
            # a raw_dir ending in YYYY_MM takes every file found there
            if m or file_start <= fndt <= dt_end:
                raw_files.append(fn)
                raw_dts.append(fndt)
    return (raw_files, raw_dts)
198
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and 'config_end_date'.
           A None or empty value means the current UTC time.
       raw_files : list of str
           Candidate raw file names
       dts : list of datetime
           Datetime of each entry of raw_files, in the same order

    :Returns:
       new_list : list of str
           Files dated within the config timeframe.  If there are none,
           every file dated at or before the config end.
    """
    # datetime objects are immutable: replace() returns a new value
    now_dt = datetime.utcnow().replace(microsecond=0)
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])[0]
    else:
        config_start_dt = now_dt
    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])[0]
    else:
        config_end_dt = now_dt

    pairs = list(zip(raw_files, dts))
    new_list = [fn for (fn, dt) in pairs if config_start_dt <= dt <= config_end_dt]
    if not new_list:
        # NOTE(review): presumably this keeps a file named before the config
        # start that still holds data inside it -- TODO confirm intent
        new_list = [fn for (fn, dt) in pairs if dt <= config_end_dt]
    return new_list
222        
223
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Process data either in auto-mode or manual-mode

    In auto-mode the newest data of every platform and sensor package is
    processed (see auto()).  In manual-mode one platform, sensor package
    and month are processed (see manual()); all three must be given.

    :Parameters:
       proctype : string
           'auto' or 'manual'

       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype not in ('auto', 'manual'):
        print('raw2proc: requires either auto or manual operation')
        return

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    # manual-mode: every selector must be supplied
    have_all = platform and package and yyyy_mm
    if not have_all:
        print('raw2proc: Manual operation requires platform, package, and month')
        print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ...  platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ...        month : %s' % yyyy_mm)
    print(' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
267
268
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine which platforms (all platforms with currently active
       config files i.e. config_end_date is None)
    2. for each platform
         get latest config
         for each package
           (determine process for 'latest' data) copy to new area when grabbed
           parse recent data
           yyyy_mm is the current month
           load this months netcdf, if new month, create this months netcdf
           update modified date and append new data in netcdf

    Each package's sensor-info dict is updated in place with
    'proc_filename', 'proc_start_dt' and 'proc_end_dt'.
    """
    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... NOTE: No active platforms')
        return

    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        platform = pi['id']
        for package in asi:
            print(' ... package name : %s' % package)
            si = asi[package]
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            ofn = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = month_start_dt
            si['proc_end_dt'] = month_end_dt
            if os.path.exists(ofn):
                # resume from the last time already in this month's file
                (es, units) = nc_get_time(ofn)
                if len(es) > 0:
                    last_dt = es2dt(es[-1])
                    # only move the start forward within the month, never
                    # back before month_start_dt
                    # NOTE(review): process() keeps records with
                    # dt >= proc_start_dt, so the record at last_dt is
                    # selected again; confirm the updater drops duplicates
                    if last_dt >= month_start_dt:
                        si['proc_start_dt'] = last_dt

            (raw_files, raw_dts) = find_raw(si, yyyy_mm)
            raw_files = which_raw(pi, raw_files, raw_dts)
            if raw_files:
                process(pi, si, raw_files, yyyy_mm)
            else:
                print(' ... ... NOTE: no new raw files found')

            # update latest data for SECOORA commons
            if 'latest_dir' in si:
                proc2latest(pi, si, yyyy_mm)

            if 'csv_dir' in si:
                proc2csv(pi, si, yyyy_mm)
333
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    Notes
    -----

    1. determine which configs
    2. for each config for specific platform
           if have package in config
               which raw files

    Any existing netcdf file for the month (platform_package_yyyy_mm.nc)
    is removed once, before the first config that writes to it, so the
    month is rebuilt from the raw data.
    """
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... NOTE: %s not operational for %s' % (platform, yyyy_mm))
        return

    # output paths already cleared during this run.  Tracking paths (rather
    # than only checking the first config) also clears the old file when the
    # first config does not include the package.
    cleared = set()
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        if package not in pi['packages']:
            print(' ... ... NOTE: %s not operational on %s for %s' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si.get('utc_offset'):
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)

        # remove any previous netcdf file (platform_package_yyyy_mm.nc)
        if ofn not in cleared:
            if os.path.exists(ofn):
                os.remove(ofn)
            cleared.add(ofn)

        # a later config may cover data already written by an earlier one:
        # resume from the last time in the file
        if os.path.exists(ofn):
            (es, units) = nc_get_time(ofn)
            if len(es) > 0:
                last_dt = es2dt(es[-1])
                # only move the start forward, never before month_start_dt
                if last_dt >= month_start_dt:
                    si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm))
392    
def process(pi, si, raw_files, yyyy_mm):
    """Parse raw files and write in-window records to the netcdf file.

    :Parameters:
       pi : dict
           Platform info, passed through to the processing module
       si : dict
           Sensor info.  Reads 'process_module', 'proc_dir',
           'proc_filename', 'proc_start_dt' and 'proc_end_dt'.  Sets
           'fn' to the raw file currently being parsed.
       raw_files : list of str
           Raw files to parse, in order
       yyyy_mm : string
           Year and month being processed (not used in this function)

    For each file, data['in'] is set to a boolean mask of the records
    whose data['dt'] lies within [proc_start_dt, proc_end_dt].  Files
    with at least one such record create the netcdf file, or update it
    if it already exists.
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # load_data gives None for a missing file, [] for an empty one
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)
        # flag records within the specified timeframe (usually the month);
        # dtype=bool keeps the mask boolean even when there are no records
        window_start = si['proc_start_dt']
        window_end = si['proc_end_dt']
        data['in'] = numpy.array(
            [window_start <= dt <= window_end for dt in data['dt']], dtype=bool)

        # only write to netcdf if some records are in the timeframe
        if not data['in'].any():
            continue
        sys.stdout.write(' ... %s ... ' % fn)
        sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(ofn):
            ut = update(pi, si, data)
            nc_update(ofn, ut)
        else:
            ct = create(pi, si, data)
            nc_create(ofn, ct)
426
427    
# globals
# UTC time this run started, truncated to whole seconds (datetime objects
# are immutable, so the result of replace() must be assigned)
start_dt = datetime.utcnow().replace(microsecond=0)
431
if __name__ == "__main__":
    import optparse
    # With no arguments (as run under cron) process in auto-mode.
    # Otherwise the arguments are passed to raw2proc(), e.g. for testing:
    #   raw2proc.py manual bogue adcp 2007_07
    # is equivalent to
    #   raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
    opt_parser = optparse.OptionParser(
        usage='%prog [auto | manual PLATFORM PACKAGE YYYY_MM]')
    (options, args) = opt_parser.parse_args()
    if len(args) > 4:
        opt_parser.error('too many arguments')
    if args:
        raw2proc(*args)
    else:
        raw2proc('auto')
Note: See TracBrowser for help on using the browser.