NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/raw2proc/trunk/raw2proc/raw2proc.py

Revision 448 (checked in by cbc, 13 years ago)

Add new Billy Mitchell configs.

  • Property svn:executable set to
Line 
1 #!/usr/bin/env python
2 # Last modified:  Time-stamp: <2011-02-17 10:57:48 haines>
3 """Process raw data to monthly netCDF data files
4
5 This module processes raw ascii- or binary-data from different NCCOOS
6 sensors (ctd, adcp, waves-adcp, met) based on manual or automated
7 operation.  If automated processing, add raw data (level0) from all
8 active sensors to current month's netcdf data files (level1) with the
9 current configuration setting.  If manual processing, determine which
10 configurations to use for requested platform, sensor, and month.
11
12 :Processing steps:
13   0. raw2proc auto or manual for platform, sensor, month
14   1. list of files to process
15   2. parse data
16   3. create, update netcdf
17
18   to-do
19   3. qc (measured) data
20   4. process derived data (and regrid?)
21   5. qc (measured and derived) data flags
22
23 """
24
25 __version__ = "v0.1"
26 __author__ = "Sara Haines <sara_haines@unc.edu>"
27
28 import sys
29 import os
30 import re
31
32 # for production use:
33 # defconfigs='/home/haines/nccoos/raw2proc'
34 # for testing use:
35 # defconfigs='/home/haines/nccoos/test/r2p'
36
# Directory searched for the platform config modules
# (files named <platform>_config_*.py).  Hard-coded so the location is
# known when the script is run under cron.
defconfigs = '/opt/env/haines/dataproc/raw2proc'
39
40 import numpy
41
42 from procutil import *
43 from ncutil import *
44
# Regex source for one real number with optional surrounding whitespace.
# Alternatives, in order: scientific notation (single leading digit),
# a decimal with digits on at least one side of the point, a plain integer.
REAL_RE_STR = (
    r'\s*('
    r'-?\d(\.\d+|)[Ee][+\-]\d\d?'
    r'|-?(\d+\.\d*|\d*\.\d+)'
    r'|-?\d+'
    r')\s*'
)
# Regex source matching the three letters "nan" in any mix of cases.
NAN_RE_STR = r'[Nn][Aa][Nn]'
47
def load_data(inFile):
    """Read a raw data file and return its lines.

    :Parameters:
       inFile : string
           Path of the file to read

    :Returns:
       lines : list of str or None
           Lines of the file with line endings kept; an empty list
           (after printing a notice) if the file has no content;
           None (after printing a notice) if the file does not exist
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None

    # the context manager closes the handle even when readlines() raises
    with open(inFile, 'r') as f:
        lines = f.readlines()

    if not lines:
        print('Empty file: ' + inFile)
    return lines
59
def import_parser(name):
    """Return the attribute called `name` from the `parsers` module."""
    return getattr(__import__('parsers'), name)
64
def import_processors(mod_name):
    """Load the three processing callables from a process module.

    :Parameters:
       mod_name : string
           Name of the module to import; it must define the
           attributes `parser`, `creator` and `updater`

    :Returns:
       (parser, creator, updater) : tuple
           The three attributes of the module, in that order
    """
    module = __import__(mod_name)
    return tuple(getattr(module, role)
                 for role in ('parser', 'creator', 'updater'))
71    
72
def get_config(name):
    """Import a module and return an attribute reached by a dotted path.

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    :Parameters:
       name : string
           '<module>' or '<module>.<attr>[.<attr>...]'.  The first
           component is imported; each following component is looked
           up on the object found so far.

    :Returns:
       The object at the end of the path (the module itself when
       `name` has no dots).

    :Raises:
       ImportError if the module cannot be imported;
       AttributeError if a component is missing.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk the path one level at a time so 'mod.a.b' yields mod.a.b
    # (looking every component up on the module itself would return mod.b)
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
80
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files apply to a platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for '<platform>_config_*.py' files

    :Returns:
       cns : list of str
           Names (file name without '.py') of the configurations whose
           active period overlaps the desired month, sorted by file name.
           If empty [], no configs were found

    A missing (None or otherwise false) config_start_date or
    config_end_date is replaced by the current UTC time.
    """
    import glob
    # list of config files based on platform
    configs = sorted(glob.glob(os.path.join(config_dir, platform + '_config_*.py')))
    # datetime.replace() returns a new object, so keep its result
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    # NOTE(review): assumes find_months() returns the starts of the
    # previous, requested and next month as datetimes -- confirm in procutil
    (prev_month, month_start_dt, next_month) = find_months(yyyy_mm)
    month_end_dt = next_month - timedelta(seconds=1)

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        # plain else-branches guarantee both bounds are assigned for
        # every config, so a value can never carry over from the
        # previous loop iteration
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])
        else:
            config_end_dt = now_dt
        # interval overlap test; with month_start_dt <= month_end_dt this
        # is equivalent to the longer four-comparison form
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
126
127
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when the `config_end_date` entry of its
    `platform_info` is None.

    :Parameters:
       config_dir : string
           Directory searched for '*_config_*.py' files

    :Returns:
       cns : list of str
           Names (file name without '.py') of the active
           configurations, sorted by file name.
           If empty [], no configs were found
    """
    import glob
    # list of all config files; sorted so the processing order does not
    # depend on the order the file system returns them in
    configs = sorted(glob.glob(os.path.join(config_dir, '*_config_*.py')))

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
151
152
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of lists
           Selected file paths in sorted order and, in parallel, the
           datetime that filt_datetime() extracted from each file name.
           Files whose name yields no datetime are left out.
    """
    import glob

    months = find_months(yyyy_mm)
    all_raw_files = []
    # raw string keeps the backslashes literal for the regex engine
    m = re.search(r'\d{4}_\d{2}$', si['raw_dir'])
    if m:
        # look for raw_file_glob in specific directory ending in YYYY_MM
        # but look no further.
        gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    else:
        # no YYYY_MM at end of raw_dir then look for files
        # in prev-month, this-month, and next-month
        for mon in months:
            mstr = mon.strftime('%Y_%m')
            gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
            all_raw_files.extend(glob.glob(gs))

    all_raw_files.sort()

    # accept files stamped up to a day outside the processing window
    daily_start = si['proc_start_dt'] - timedelta(days=1)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    # granularity 4 marks a monthly file name (per the original comment;
    # the value comes from filt_datetime() -- confirm in procutil), so
    # such a file may be stamped up to a month before the window opens
    monthly_start = si['proc_start_dt'] - timedelta(days=31)

    raw_files = []
    raw_dts = []
    for fn in all_raw_files:
        (fndt, granularity) = filt_datetime(os.path.basename(fn), gran=True)
        if not fndt:
            continue
        # choose the lower bound per file so that one monthly file does
        # not widen the window for every file that follows it
        if granularity == 4:
            dt_start = monthly_start
        else:
            dt_start = daily_start
        # files from a directory that itself ends in YYYY_MM are always kept
        if dt_start <= fndt <= dt_end or m:
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
195
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and 'config_end_date'.
           A missing (None or otherwise false) date is replaced by the
           current UTC time.
       raw_files : list of str
           Candidate raw file names
       dts : list of datetime
           Datetime of each file, parallel to `raw_files`

    :Returns:
       new_list : list of str
           Files dated within [config start, config end].  If that
           selects nothing, every file dated at or before the config
           end is returned instead.
    """
    # datetime.replace() returns a new object, so keep its result
    now_dt = datetime.utcnow().replace(microsecond=0)

    # plain else-branches guarantee both bounds are always assigned
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])
    else:
        config_end_dt = now_dt

    new_list = [fn for fn, dt in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]

    if not new_list:
        # nothing inside the window: fall back to everything up to its end
        new_list = [fn for fn, dt in zip(raw_files, dts)
                    if dt <= config_end_dt]

    return new_list
219        
220
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Run the processing in auto-mode or manual-mode

    Auto-mode hands over to auto(), which takes no arguments.
    Manual-mode hands over to manual() and needs all three of
    platform, sensor package, and month; if any is missing, or if
    proctype is neither 'auto' nor 'manual', a usage note is printed
    and nothing is processed.

    :Parameters:
       proctype : string
           'auto' or 'manual'

       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype != 'manual':
        print('raw2proc: requires either auto or manual operation')
        return

    if not (platform and package and yyyy_mm):
        print('raw2proc: Manual operation requires platform, package, and month')
        print("   >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ...  platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ...        month : %s' % yyyy_mm)
    print(' ...  starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
264
265
def auto():
    """Process the current month for every active config and package

    Notes
    -----

    1. the active configs are those returned by find_active_configs()
       for the default config directory
    2. for each config, for each package in its sensor_info
         the output file is <platform>_<package>_<yyyy_mm>.nc in the
           package's proc_dir
         the processing window is the current month; when the output
           file already exists and its last time is inside the month,
           the window starts at that time so only newer data is added
         raw files are selected with find_raw() and which_raw() and
           handed to process()
         proc2latest() runs when the package defines 'latest_dir'
         proc2csv() runs when the package defines 'csv_dir'

    """
    yyyy_mm = this_month()
    (prev_month_dt, month_start_dt, next_month_dt) = find_months(yyyy_mm)
    month_end_dt = next_month_dt - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... NOTE: No active platforms')
        return

    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        platform = pi['id']

        for package in asi.keys():
            print(' ... package name : %s' % package)
            si = asi[package]
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            ofn = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = month_start_dt
            si['proc_end_dt'] = month_end_dt

            if os.path.exists(ofn):
                # last time already stored in this month's file
                (es, units) = nc_get_time(ofn)
                last_dt = es2dt(es[-1])
                # resume from it when it lies inside the current month
                if last_dt >= month_start_dt:
                    si['proc_start_dt'] = last_dt

            (candidates, candidate_dts) = find_raw(si, yyyy_mm)
            raw_files = which_raw(pi, candidates, candidate_dts)
            if raw_files:
                process(pi, si, raw_files, yyyy_mm)
            else:
                print(' ... ... NOTE: no new raw files found')

            # update latest data for SECOORA commons
            if 'latest_dir' in si:
                proc2latest(pi, si, yyyy_mm)

            if 'csv_dir' in si:
                proc2csv(pi, si, yyyy_mm)
330
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    Notes
    -----

    1. determine which configs overlap the month (find_configs)
    2. for each config that lists the package in
       platform_info['packages']
           select the raw files (find_raw, which_raw)
           for the first such config, delete any existing output
             file so the month is rebuilt from scratch
           for later configs, resume after the last time already
             written to the output file
           hand the raw files to process()

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
    """
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... NOTE: %s not operational for %s' % (platform, yyyy_mm))
        return

    # The old output must be removed once, by the first config that
    # actually carries the package.  Tying the removal to the first
    # config in the list would skip it whenever that config lacks the
    # package, and the stale file would then limit what gets reprocessed.
    previous_output_cleared = False
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        if package not in pi['packages']:
            print(' ... ... NOTE: %s not operational on %s for %s' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)

        # remove any previous netcdf file (platform_package_yyyy_mm.nc)
        if not previous_output_cleared:
            if os.path.exists(ofn):
                os.remove(ofn)
            previous_output_cleared = True

        # this added just in case data repeated in data files
        if os.path.exists(ofn):
            # last time written by an earlier config in this run
            (es, units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            # resume from it when it lies inside the requested month
            if last_dt >= month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm))
389    
def process(pi, si, raw_files, yyyy_mm):
    """Parse each raw file and write its in-window records to netCDF

    The parser, creator and updater come from the module named by
    si['process_module'].  For every file that yields lines, the parsed
    data gets a boolean mask data['in'] marking the entries of
    data['dt'] that fall within [si['proc_start_dt'], si['proc_end_dt']].
    If any entry is marked, the output file
    si['proc_dir']/si['proc_filename'] is updated when it exists and
    created otherwise.

    :Parameters:
       pi : dict
           Platform info, passed through to the processors
       si : dict
           Sensor info; si['fn'] is set to the file being handled
       raw_files : list of str
           Raw files to handle, in order
       yyyy_mm : string
           Year and month being processed (not used in this function)
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # no lines: file was missing or empty
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)
        # mark which records are within the specified timeframe (usually the month)
        data['in'] = numpy.array(
            [bool(val >= si['proc_start_dt'] and val <= si['proc_end_dt'])
             for val in data['dt']])

        # nothing from this file falls in the window
        if not data['in'].any():
            continue

        sys.stdout.write(' ... %s ... ' % fn)
        sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(ofn):
            nc_update(ofn, update(pi, si, data))
        else:
            nc_create(ofn, create(pi, si, data))
423
424    
# globals
# Start time of this run in UTC, truncated to whole seconds; printed by
# raw2proc().  datetime.replace() returns a new object rather than
# modifying in place, so the truncated value has to be the one stored.
start_dt = datetime.utcnow().replace(microsecond=0)

if __name__ == "__main__":
    # optparse is not used yet (presumably reserved for command-line options)
    import optparse
    raw2proc('auto')
432
433     # for testing
434     # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
435     # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
Note: See TracBrowser for help on using the browser.