#!/usr/bin/env python
# Last modified: Time-stamp: <2008-05-07 13:03:19 haines>
"""Process raw data to monthly netCDF data files

This module processes raw ascii- or binary-data from different NCCOOS
sensors (ctd, adcp, waves-adcp, met) based on manual or automated
operation.  If automated processing, add raw data (level0) from all
active sensors to current month's netcdf data files (level1) with the
current configuration setting.  If manual processing, determine which
configurations to use for requested platform, sensor, and month.

:Processing steps:
  0. raw2proc auto or manual for platform, sensor, month
  1. list of files to process
  2. parse data
  3. create, update netcdf

  to-do
  3. qc (measured) data
  4. process derived data (and regrid?)
  5. qc (measured and derived) data flags

"""

__version__ = "v0.1"
__author__ = "Sara Haines "

import sys
import os
import re

# define config file location to run under cron
defconfigs='/home/haines/nccoos/test/r2p'

import numpy
# NOTE(review): datetime, timedelta, find_months, filt_datetime, this_month
# and es2dt are presumably supplied by procutil, and nc_get_time, nc_update
# and nc_create by ncutil -- confirm against those modules.
from procutil import *
from ncutil import *

REAL_RE_STR = '\\s*(-?\\d(\\.\\d+|)[Ee][+\\-]\\d\\d?|-?(\\d+\\.\\d*|\\d*\\.\\d+)|-?\\d+)\\s*'


def load_data(inFile):
    """Read all lines of a raw data file

    :Parameters:
        inFile : string
            Path of the file to read

    :Returns:
        lines : list of str or None
            Lines of the file ([] if the file is empty),
            or None if the file does not exist
    """
    lines = None
    if os.path.exists(inFile):
        f = open(inFile, 'r')
        # close the handle even if reading fails
        try:
            lines = f.readlines()
        finally:
            f.close()
        if not lines:
            print('Empty file: ' + inFile)
    else:
        print('File does not exist: ' + inFile)
    return lines


def import_parser(name):
    """Return the attribute called `name` from the module 'parsers'"""
    mod = __import__('parsers')
    return getattr(mod, name)


def import_processors(mod_name):
    """Import a processing module and return its three entry points

    :Returns:
        (parser, creator, updater) : tuple
            The attributes 'parser', 'creator' and 'updater' of the
            module named mod_name
    """
    mod = __import__(mod_name)
    return (getattr(mod, 'parser'),
            getattr(mod, 'creator'),
            getattr(mod, 'updater'))


def get_config(name):
    """Return the object addressed by a dotted 'module.attr' name

    Usage Example
    >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    The first component is imported as a module and each following
    component is looked up on the result of the previous lookup.  A
    name without any dot returns the module itself.
    """
    components = name.split('.')
    obj = __import__(components[0])
    for comp in components[1:]:
        # walk down from the previous result, not from the module each time
        obj = getattr(obj, comp)
    return obj


def _config_window(pi, now_dt):
    """Return (config_start_dt, config_end_dt) for platform info pi

    A start or end date that is unset (None or otherwise empty) is
    replaced by now_dt.
    """
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])[0]
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])[0]
    else:
        config_end_dt = now_dt

    return (config_start_dt, config_end_dt)


def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
       platform : string
           Platform id to process (e.g. 'bogue')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')
       config_dir : string
           Directory searched for <platform>_config_*.py files

    :Returns:
       cns : list of str
           List of configurations that overlap with desired month
           If empty [], no configs were found

    NOTE(review): the list follows glob order, which is not guaranteed
    to be chronological.
    """
    import glob
    # list of config files based on platform
    configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    # datetime objects are immutable, so keep the truncated copy
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    cns = []
    for config in configs:
        # module name is the file name without extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn+'.platform_info')
        (config_start_dt, config_end_dt) = _config_window(pi, now_dt)
        # two intervals overlap when each one starts before the other ends
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns


def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is taken as active when its platform_info has
    config_end_date set to None.

    :Parameters:
       config_dir : string
           Directory searched for *_config_*.py files

    :Returns:
       cns : list of str
           List of active configurations
           If empty [], no configs were found
    """
    import glob
    # list of all config files
    configs = glob.glob(os.path.join(config_dir, '*_config_*.py'))

    cns = []
    for config in configs:
        # module name is the file name without extension
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn+'.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns


def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
       si : dict
           Sensor info; reads 'raw_dir', 'raw_file_glob',
           'proc_start_dt' and 'proc_end_dt'
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    :Returns:
       (raw_files, raw_dts) : tuple of lists
           File names and the datetime derived from each file name
    """
    import glob

    # list all the raw files in prev-month, this-month, and next-month
    months = find_months(yyyy_mm)
    all_raw_files = []
    for mon in months:
        mstr = mon.strftime('%Y_%m')
        gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    all_raw_files.sort()

    # ****** ((SMH) NOTE: Will need to override looking in specific
    # subdirs of months if all data is contained in one file for long
    # deployment, such as with adcp binary data.

    # accept files dated within one day either side of the processing window
    dt_start = si['proc_start_dt']-timedelta(days=1)
    dt_end = si['proc_end_dt']+timedelta(days=1)
    raw_files = []
    raw_dts = []
    # compute datetime for each file
    for fn in all_raw_files:
        # JC changes
        fndt_tuple = filt_datetime(os.path.basename(fn))
        fndt = fndt_tuple[0]
        # "ind" var from filt_datetime() - what level of granularity was used
        granularity = fndt_tuple[1]
        if granularity == 4:
            # change dt_start to before monthly filename filt_datetime() date;
            # the widened start stays in effect for the remaining files
            dt_start = si['proc_start_dt']-timedelta(days=31)
            print(dt_start)
        # end JC changes
        if fndt:
            if dt_start <= fndt <= dt_end:
                raw_files.append(fn)
                raw_dts.append(fndt)
    return (raw_files, raw_dts)


def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
       pi : dict
           Platform info; reads 'config_start_date' and 'config_end_date'
       raw_files : list of str
           Candidate raw file names
       dts : list
           Datetime of each entry in raw_files (same order)

    :Returns:
       new_list : list of str
           The raw files dated within the configuration timeframe
    """
    # datetime objects are immutable, so keep the truncated copy
    now_dt = datetime.utcnow().replace(microsecond=0)
    (config_start_dt, config_end_dt) = _config_window(pi, now_dt)
    new_list = [fn for (fn, dt) in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]
    return new_list


def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Process data either in auto-mode or manual-mode

    If auto-mode, process newest data for all platforms, all
    sensors. Otherwise in manual-mode, process data for specified
    platform, sensor package, and month.

    :Parameters:
       proctype : string
           'auto' or 'manual'
       platform : string
           Platform id to process (e.g. 'bogue')
       package : string
           Sensor package id to process (e.g. 'adcp')
       yyyy_mm : string
           Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    # start_dt is the module-level global set when the module is loaded
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
    elif proctype == 'manual':
        if platform and package and yyyy_mm:
            print('Processing in manually ...')
            print(' ... platform id : %s' % platform)
            print(' ... package name : %s' % package)
            print(' ... month : %s' % yyyy_mm)
            print(' ... starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
            manual(platform, package, yyyy_mm)
        else:
            print('raw2proc: Manual operation requires platform, package, and month')
            print(" >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
    else:
        print('raw2proc: requires either auto or manual operation')


def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine which platforms (all platforms with currently active
       config files i.e. config_end_date is None
    2. for each platform
         get latest config
         for each package
           (determine process for 'latest' data) copy to new area when grabbed
           parse recent data
           yyyy_mm is the current month
           load this months netcdf, if new month, create this months netcdf
           update modified date and append new data in netcdf

    """
    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if configs:
        # for each configuration
        for cn in configs:
            print(' ... config file : %s' % cn)
            pi = get_config(cn+'.platform_info')
            asi = get_config(cn+'.sensor_info')
            platform = pi['id']
            # for each sensor package
            for package in asi:
                print(' ... package name : %s' % package)
                si = asi[package]
                si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
                ofn = os.path.join(si['proc_dir'], si['proc_filename'])
                si['proc_start_dt'] = month_start_dt
                si['proc_end_dt'] = month_end_dt
                if os.path.exists(ofn):
                    # get last dt from current month file
                    (es, units) = nc_get_time(ofn)
                    last_dt = es2dt(es[-1])
                    # if the last stored record is not older than
                    # month_start_dt, start from it so that only
                    # newer data is processed
                    if last_dt >= month_start_dt:
                        si['proc_start_dt'] = last_dt

                (raw_files, raw_dts) = find_raw(si, yyyy_mm)
                raw_files = which_raw(pi, raw_files, raw_dts)
                process(pi, si, raw_files, yyyy_mm)
    else:
        print(' ... ... ... \nNOTE: No active platforms\n')


def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    Notes
    -----

    1. determine which configs
    2. for each config for specific platform
           if have package in config
               which raw files

    Any existing netCDF file for the month is removed before the first
    configuration is processed, so the month is rebuilt from scratch.
    """
    # determine when month starts and ends
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if configs:
        # for each configuration
        for index, cn in enumerate(configs):
            print(' ... config file : %s' % cn)
            pi = get_config(cn+'.platform_info')
            asi = get_config(cn+'.sensor_info')
            if package in pi['packages']:
                si = asi[package]
                if si['utc_offset']:
                    print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
                # month start and end dt to sensor info
                si['proc_start_dt'] = month_start_dt
                si['proc_end_dt'] = month_end_dt
                si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
                ofn = os.path.join(si['proc_dir'], si['proc_filename'])
                (raw_files, raw_dts) = find_raw(si, yyyy_mm)
                raw_files = which_raw(pi, raw_files, raw_dts)
                # remove any previous netcdf file (platform_package_yyyy_mm.nc);
                # only on the first config so later configs append to the new file
                if index == 0 and os.path.exists(ofn):
                    os.remove(ofn)

                process(pi, si, raw_files, yyyy_mm)
            else:
                print(' ... ... \nNOTE: %s not operational on %s for %s\n' % (package, platform, yyyy_mm))
    else:
        print(' ... ... ... \nNOTE: %s not operational for %s\n' % (platform, yyyy_mm))


def process(pi, si, raw_files, yyyy_mm):
    """Parse each raw file and create or update the monthly netCDF file

    :Parameters:
       pi : dict
           Platform info, passed through to the parser/creator/updater
       si : dict
           Sensor info; reads 'process_module', 'proc_start_dt',
           'proc_end_dt', 'proc_dir' and 'proc_filename', and gets
           'fn' set to the file currently being processed
       raw_files : list of str
           Raw files to process, in order
       yyyy_mm : string
           Year and month being processed (not used in this function)
    """
    # tailored data processing for different input file formats and
    # control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if lines:
            data = parse(pi, si, lines)
            # determine which index of data is within the specified
            # timeframe (usually the month); start is exclusive, end inclusive
            n = len(data['dt'])
            data['in'] = numpy.zeros(n, dtype=bool)
            for index, val in enumerate(data['dt']):
                if val > si['proc_start_dt'] and val <= si['proc_end_dt']:
                    data['in'][index] = True

            # if any records are in the month then write to netcdf
            if data['in'].any():
                sys.stdout.write('... %s ... ' % fn)
                # reports the number of parsed records in the file
                sys.stdout.write('%d\n' % len(data['in']))
                ofn = os.path.join(si['proc_dir'], si['proc_filename'])
                # update or create netcdf
                if os.path.exists(ofn):
                    ut = update(pi, si, data)
                    nc_update(ofn, ut)
                else:
                    ct = create(pi, si, data)
                    nc_create(ofn, ct)
        else:
            # if no lines, file was empty (or missing)
            print(" ... skipping file %s" % (fn,))


# globals
# datetime objects are immutable, so keep the truncated copy
start_dt = datetime.utcnow().replace(microsecond=0)

if __name__ == "__main__":
    import optparse
    raw2proc('auto')

    # for testing
    # proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07'
    # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')