NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 219 (checked in by haines, 15 years ago)

AVP fixed profiler CDL2; hampton processing added YSI 6600 V1

  • Property svn:executable set to
Line 
1 #!/usr/bin/env python
2 # Last modified:  Time-stamp: <2009-01-08 09:12:00 haines>
3 """Process raw data to monthly netCDF data files
4
5 This module processes raw ascii- or binary-data from different NCCOOS
6 sensors (ctd, adcp, waves-adcp, met) based on manual or automated
7 operation.  If automated processing, add raw data (level0) from all
8 active sensors to current month's netcdf data files (level1) with the
9 current configuration setting.  If manual processing, determine which
10 configurations to use for requested platform, sensor, and month.
11
12 :Processing steps:
13   0. raw2proc auto or manual for platform, sensor, month
14   1. list of files to process
15   2. parse data
16   3. create, update netcdf
17
18   to-do
19   3. qc (measured) data
20   4. process derived data (and regrid?)
21   5. qc (measured and derived) data flags
22
23 """
24
25 __version__ = "v0.1"
26 __author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31
32 # for production use:
33 # defconfigs='/home/haines/nccoos/raw2proc'
34 # for testing use:
35 # defconfigs='/home/haines/nccoos/test/r2p'
36
37 # define config file location to run under cron
38 defconfigs='/home/haines/nccoos/raw2proc'
39
40 import numpy
41
42 from procutil import *
43 from ncutil import *
44
45 REAL_RE_STR = '\\s*(-?\\d(\\.\\d+|)[Ee][+\\-]\\d\\d?|-?(\\d+\\.\\d*|\\d*\\.\\d+)|-?\\d+)\\s*'
46
47 def load_data(inFile):
48     lines=None
49     if os.path.exists(inFile):
50         f = open(inFile, 'r')
51         lines = f.readlines()
52         f.close()
53         if len(lines)<=0:
54             print 'Empty file: '+ inFile           
55     else:
56         print 'File does not exist: '+ inFile
57     return lines
58
59 def import_parser(name):
60     mod = __import__('parsers')
61     parser = getattr(mod, name)
62     return parser
63
64 def import_processors(mod_name):
65     mod = __import__(mod_name)
66     parser = getattr(mod, 'parser')
67     creator = getattr(mod, 'creator')
68     updater = getattr(mod, 'updater')
69     return (parser, creator, updater)
70    
71
72 def get_config(name):
73     """Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')"""
74     components = name.split('.')
75     mod = __import__(components[0])
76     for comp in components[1:]:
77         attr = getattr(mod, comp)
78     return attr
79
80 def find_configs(platform, yyyy_mm, config_dir=''):
81     """Find which configuration files for specified platform and month
82
83     :Parameters:
84        platform : string
85            Platfrom id to process (e.g. 'bogue')
86        yyyy_mm : string
87            Year and month of data to process (e.g. '2007_07')
88
89     :Returns:
90        cns : list of str
91            List of configurations that overlap with desired month
92            If empty [], no configs were found
93     """
94     import glob
95     # list of config files based on platform
96     configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
97     configs.sort()
98     now_dt = datetime.utcnow()
99     now_dt.replace(microsecond=0)
100     # determine when month starts and ends
101     (prev_month, this_month, next_month) = find_months(yyyy_mm)
102     month_start_dt = this_month
103     month_end_dt = next_month - timedelta(seconds=1)
104     # print month_start_dt; print month_end_dt
105     #
106     cns = []
107     for config in configs:
108         # datetime from filename
109         cn = os.path.splitext(os.path.basename(config))[0]
110         cndt = filt_datetime(os.path.basename(config))[0]
111         pi = get_config(cn+'.platform_info')
112         if pi['config_start_date']:
113             config_start_dt = filt_datetime(pi['config_start_date'])[0]
114         elif pi['config_start_date'] == None:
115             config_start_dt = now_dt
116         if pi['config_end_date']:
117             config_end_dt = filt_datetime(pi['config_end_date'])[0]
118         elif pi['config_end_date'] == None:
119             config_end_dt = now_dt
120         #
121         if (config_start_dt <= month_start_dt or config_start_dt <= month_end_dt) and \
122                (config_end_dt >= month_start_dt or config_end_dt >= month_end_dt):
123             cns.append(cn)
124     return cns
125
126
127 def find_active_configs(config_dir=''):
128     """Find which configuration files are active
129
130     :Returns:
131        cns : list of str
132            List of configurations that overlap with desired month
133            If empty [], no configs were found
134     """
135     import glob
136     # list of all config files
137     configs = glob.glob(os.path.join(config_dir, '*_config_*.py'))
138     now_dt = datetime.utcnow()
139     now_dt.replace(microsecond=0)
140     #
141     cns = []
142     for config in configs:
143         # datetime from filename
144         cn = os.path.splitext(os.path.basename(config))[0]
145         cndt = filt_datetime(os.path.basename(config))[0]
146         pi = get_config(cn+'.platform_info')
147         if pi['config_end_date'] == None:
148             cns.append(cn)
149     return cns
150
151
152 def find_raw(si, yyyy_mm):
153     """Determine which list of raw files to process for month """
154     import glob
155
156     months = find_months(yyyy_mm)
157     # list all the raw files in prev-month, this-month, and next-month
158     all_raw_files = []
159     m = re.search('\d{4}_\d{2}$', si['raw_dir'])
160     if m:
161         # look for raw_file_glob in specific directory ending in YYYY_MM
162         # but look no further. 
163         gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
164         all_raw_files.extend(glob.glob(gs))
165     else:
166         # no YYYY_MM at end of raw_dir then look for files
167         # in prev-month, this-month, and next-month
168         for mon in months:
169             mstr = mon.strftime('%Y_%m')
170             gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
171             all_raw_files.extend(glob.glob(gs))
172            
173     all_raw_files.sort()
174        
175     #
176     dt_start = si['proc_start_dt']-timedelta(days=1)
177     dt_end = si['proc_end_dt']+timedelta(days=1)
178     raw_files = []; raw_dts = []
179     # compute datetime for each file
180     for fn in all_raw_files:
181         # JC changes
182         fndt_tuple = filt_datetime(os.path.basename(fn))
183         fndt = fndt_tuple[0]
184
185         # "ind" var from filt_datetime() - what level of granularity was used
186         granularity = fndt_tuple[1]
187         if granularity == 4:
188             # change dt_start to before monthly filename filt_datetime() date
189             dt_start = si['proc_start_dt']-timedelta(days=31)
190             # print dt_start
191         # end JC changes
192         if fndt:
193             if dt_start <= fndt <= dt_end or m:
194                 raw_files.append(fn)
195                 raw_dts.append(fndt)
196     return (raw_files, raw_dts)
197
198 def which_raw(pi, raw_files, dts):
199     """Further limit file names based on configuration file timeframe """
200
201     now_dt = datetime.utcnow()
202     now_dt.replace(microsecond=0)
203     if pi['config_start_date']:
204         config_start_dt = filt_datetime(pi['config_start_date'])[0]
205     elif pi['config_start_date'] == None:
206         config_start_dt = now_dt
207
208     if pi['config_end_date']:
209         config_end_dt = filt_datetime(pi['config_end_date'])[0]
210     elif pi['config_end_date'] == None:
211         config_end_dt = now_dt
212    
213     new_list = [raw_files[i] for i in range(len(raw_files)) \
214                      if config_start_dt <= dts[i] <= config_end_dt]
215
216     if not new_list:
217         new_list = [raw_files[i] for i in range(len(raw_files)) \
218                     if dts[i] <= config_end_dt]
219        
220     return new_list
221        
222
223 def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
224     """
225     Process data either in auto-mode or manual-mode
226
227     If auto-mode, process newest data for all platforms, all
228     sensors. Otherwise in manual-mode, process data for specified
229     platform, sensor package, and month.
230
231     :Parameters:
232        proctype : string
233            'auto' or 'manual'
234
235        platform : string
236            Platfrom id to process (e.g. 'bogue')
237        package : string
238            Sensor package id to process (e.g. 'adcp')
239        yyyy_mm : string
240            Year and month of data to process (e.g. '2007_07')
241
242     Examples
243     --------
244     >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
245     >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')
246           
247     """
248     print '\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC")
249
250     if proctype == 'auto':
251         print 'Processing in auto-mode, all platforms, all packages, latest data'
252         auto()
253     elif proctype == 'manual':
254         if platform and package and yyyy_mm:
255             print 'Processing in manually ...'
256             print ' ...  platform id : %s' % platform
257             print ' ... package name : %s' % package
258             print ' ...        month : %s' % yyyy_mm
259             print ' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC")
260             manual(platform, package, yyyy_mm)
261         else:
262             print 'raw2proc: Manual operation requires platform, package, and month'
263             print "   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')"
264     else:
265         print 'raw2proc: requires either auto or manual operation'
266
267
268 def auto():
269     """Process all platforms, all packages, latest data
270
271     Notes
272     -----
273     
274     1. determine which platforms (all platforms with currently active
275        config files i.e. config_end_date is None
276     2. for each platform
277          get latest config
278          for each package
279            (determine process for 'latest' data) copy to new area when grabbed
280            parse recent data
281            yyyy_mm is the current month
282            load this months netcdf, if new month, create this months netcdf
283            update modified date and append new data in netcdf
284           
285     """
286     yyyy_mm = this_month()
287     months = find_months(yyyy_mm)
288     month_start_dt = months[1]
289     month_end_dt = months[2] - timedelta(seconds=1)
290
291     configs = find_active_configs(config_dir=defconfigs)
292     if configs:
293         # for each configuration
294         for cn in configs:
295             print ' ... config file : %s' % cn
296             pi = get_config(cn+'.platform_info')
297             asi = get_config(cn+'.sensor_info')
298             platform = pi['id']
299             # for each sensor package
300             for package in asi.keys():
301                 print ' ... package name : %s' % package
302                 si = asi[package]
303                 si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
304                 ofn = os.path.join(si['proc_dir'], si['proc_filename'])
305                 si['proc_start_dt'] = month_start_dt
306                 si['proc_end_dt'] = month_end_dt
307                 if os.path.exists(ofn):
308                     # get last dt from current month file
309                     (es, units) = nc_get_time(ofn)
310                     last_dt = es2dt(es[-1])
311                     # if older than month_start_dt use it instead to only process newest data
312                     if last_dt>=month_start_dt:
313                         si['proc_start_dt'] = last_dt
314
315                 (raw_files, raw_dts) = find_raw(si, yyyy_mm)
316                 raw_files = which_raw(pi, raw_files, raw_dts)
317                 if raw_files:
318                     process(pi, si, raw_files, yyyy_mm)
319                 else:
320                     print ' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm)
321
322                 # update latest data for SECOORA commons
323                 if 'latest_dir' in si.keys():
324                     # print ' ... ... latest : %s ' % si['latest_dir']
325                     proc2latest(pi, si, yyyy_mm)
326     #
327     else:
328         print ' ... ... NOTE: No active platforms'
329
330 def manual(platform, package, yyyy_mm):
331     """Process data for specified platform, sensor package, and month
332
333     Notes
334     -----
335     
336     1. determine which configs
337     2. for each config for specific platform
338            if have package in config
339                which raw files
340     """
341      # determine when month starts and ends
342     months = find_months(yyyy_mm)
343     month_start_dt = months[1]
344     month_end_dt = months[2] - timedelta(seconds=1)
345    
346     configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
347
348     if configs:
349         # for each configuration
350         for index in range(len(configs)):
351             cn = configs[index]
352             print ' ... config file : %s' % cn
353             pi = get_config(cn+'.platform_info')
354             # month start and end dt to pi info
355             asi = get_config(cn+'.sensor_info')
356             if package in pi['packages']:
357                 si = asi[package]
358                 if si['utc_offset']:
359                     print ' ... ... utc_offset : %g (hours)' % si['utc_offset']
360                 si['proc_start_dt'] = month_start_dt
361                 si['proc_end_dt'] = month_end_dt
362                 si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
363                 ofn = os.path.join(si['proc_dir'], si['proc_filename'])
364                 (raw_files, raw_dts) = find_raw(si, yyyy_mm)
365                 raw_files = which_raw(pi, raw_files, raw_dts)
366                 # print raw_files
367                 # remove any previous netcdf file (platform_package_yyyy_mm.nc)
368                 if index==0  and os.path.exists(ofn):
369                     os.remove(ofn)
370                 # this added just in case data repeated in data files
371                 if os.path.exists(ofn):
372                     # get last dt from current month file
373                     (es, units) = nc_get_time(ofn)
374                     last_dt = es2dt(es[-1])
375                     # if older than month_start_dt use it instead to only process newest data
376                     if last_dt>=month_start_dt:
377                         si['proc_start_dt'] = last_dt
378
379                 if raw_files:
380                     process(pi, si, raw_files, yyyy_mm)
381                 else:
382                     print ' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm)
383                
384             else:
385                 print ' ... ... \nNOTE: %s not operational on %s for %s\n' % (package, platform, yyyy_mm)               
386     else:
387         print ' ... ... ... \nNOTE: %s not operational for %s\n' % (platform, yyyy_mm)
388    
389 def process(pi, si, raw_files, yyyy_mm):
390     # tailored data processing for different input file formats and control over output
391     (parse, create, update) = import_processors(si['process_module'])
392     for fn in raw_files:
393         # sys.stdout.write('... %s ... ' % fn)
394         # attach file name to sensor info so parser can use it, if needed
395         si['fn'] = fn
396         lines = load_data(fn)
397         if lines:
398             data = parse(pi, si, lines)
399             # determine which index of data is within the specified timeframe (usually the month)
400             n = len(data['dt'])
401             data['in'] = numpy.array([False for i in range(n)])
402
403             for index, val in enumerate(data['dt']):
404                 if val>=si['proc_start_dt'] and val<=si['proc_end_dt']:
405                     data['in'][index] = True
406                    
407             # if any records are in the month then write to netcdf
408             if data['in'].any():
409                 sys.stdout.write('... %s ... ' % fn)
410                 sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
411                 ofn = os.path.join(si['proc_dir'], si['proc_filename'])
412                 # update or create netcdf
413                 if os.path.exists(ofn):
414                     ut = update(pi,si,data)
415                     nc_update(ofn, ut)
416                 else:
417                     ct = create(pi,si,data)
418                     nc_create(ofn, ct)
419         else:
420             # if no lines, file was empty
421             print " ... skipping file %s" % (fn,)
422
423    
424 # globals
425 start_dt = datetime.utcnow()
426 start_dt.replace(microsecond=0)
427
428 if __name__ == "__main__":
429     import optparse
430     raw2proc('auto')
431
432     # for testing
433     # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
434     # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
435
Note: See TracBrowser for help on using the browser.