1 |
#!/usr/bin/env python |
---|
2 |
# Last modified: Time-stamp: <2011-02-17 10:57:48 haines> |
---|
3 |
"""Process raw data to monthly netCDF data files |
---|
4 |
|
---|
5 |
This module processes raw ascii- or binary-data from different NCCOOS |
---|
6 |
sensors (ctd, adcp, waves-adcp, met) based on manual or automated |
---|
7 |
operation. If automated processing, add raw data (level0) from all |
---|
8 |
active sensors to current month's netcdf data files (level1) with the |
---|
9 |
current configuration setting. If manual processing, determine which |
---|
10 |
configurations to use for requested platform, sensor, and month. |
---|
11 |
|
---|
12 |
:Processing steps: |
---|
13 |
0. raw2proc auto or manual for platform, sensor, month |
---|
14 |
1. list of files to process |
---|
15 |
2. parse data |
---|
16 |
3. create, update netcdf |
---|
17 |
|
---|
18 |
To-do (not yet implemented): |
---|
19 |
4. qc (measured) data |
---|
20 |
5. process derived data (and regrid?) |
---|
21 |
6. qc (measured and derived) data flags |
---|
22 |
|
---|
23 |
""" |
---|
24 |
|
---|
25 |
__version__ = "v0.1" |
---|
26 |
__author__ = "Sara Haines <sara_haines@unc.edu>" |
---|
27 |
|
---|
28 |
import sys |
---|
29 |
import os |
---|
30 |
import re |
---|
31 |
|
---|
32 |
# Directory searched for the platform configuration modules
# (*_config_*.py).  Values used elsewhere in the past:
#   production: defconfigs='/home/haines/nccoos/raw2proc'
#   testing:    defconfigs='/home/haines/nccoos/test/r2p'
# The active value below is the location used when running under cron.
defconfigs = '/opt/env/haines/dataproc/raw2proc'
---|
39 |
|
---|
40 |
import numpy |
---|
41 |
|
---|
42 |
from procutil import * |
---|
43 |
from ncutil import * |
---|
44 |
|
---|
45 |
# Regular-expression fragment for one real number surrounded by optional
# whitespace: exponent form (e.g. -1.5e+03), decimal form (e.g. 2., .5,
# -3.25) or a plain (optionally negative) integer.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
# Case-insensitive spelling of the literal "NaN".
NAN_RE_STR = '[Nn][Aa][Nn]'
---|
47 |
|
---|
48 |
def load_data(inFile):
    """Read a text file and return its lines.

    :Parameters:
        inFile : string
            Path of the raw data file to read.

    :Returns:
        lines : list of str or None
            Lines of the file (line endings kept), an empty list if the
            file is empty, or None if the file does not exist.  A message
            is printed in the empty and missing cases.
    """
    lines = None
    if os.path.exists(inFile):
        f = open(inFile, 'r')
        try:
            lines = f.readlines()
        finally:
            # close the handle even if reading fails
            f.close()
        if not lines:
            print('Empty file: ' + inFile)
    else:
        print('File does not exist: ' + inFile)
    return lines
---|
59 |
|
---|
60 |
def import_parser(name):
    """Return the attribute called `name` from the ``parsers`` module.

    :Parameters:
        name : string
            Name of an object defined in the ``parsers`` module.

    Raises ImportError if ``parsers`` cannot be imported and
    AttributeError if it has no attribute `name`.
    """
    return getattr(__import__('parsers'), name)
---|
64 |
|
---|
65 |
def import_processors(mod_name):
    """Import a processing module and return its three entry points.

    :Parameters:
        mod_name : string
            Name of the module to import.

    :Returns:
        (parser, creator, updater) : tuple
            The module attributes of those names, in that order.

    Raises ImportError if the module cannot be imported and
    AttributeError if any of the three attributes is missing.
    """
    module = __import__(mod_name)
    return tuple(getattr(module, attr)
                 for attr in ('parser', 'creator', 'updater'))
---|
71 |
|
---|
72 |
|
---|
73 |
def get_config(name):
    """Return an object from a configuration module given a dotted name.

    Usage Example >>>sensor_info = get_config('bogue_config_20060918.sensor_info')

    The first component is imported as a module.  Each later component is
    looked up as an attribute of the object found so far, so a nested name
    such as 'mod.obj.attr' resolves to mod.obj.attr.  A name without any
    dot returns the imported module itself.

    :Parameters:
        name : string
            Dotted name, 'module[.attr[.attr...]]'.

    Raises ImportError if the module cannot be imported and
    AttributeError if a component is missing.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk down the attribute chain; each lookup starts from the previous result
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
---|
80 |
|
---|
81 |
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
        platform : string
            Platform id to process (e.g. 'bogue')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')
        config_dir : string
            Directory searched for '<platform>_config_*.py' files
            (default '' i.e. the current directory)

    :Returns:
        cns : list of str
            Configuration module names (file names without '.py'), in
            sorted file-name order, whose configuration period overlaps
            the desired month.  A missing (false) config_start_date or
            config_end_date is taken to mean "now".
            If empty [], no configs were found
    """
    import glob
    # list of config files based on platform
    configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    configs.sort()
    # datetime objects are immutable: replace() returns a new value, so
    # the truncated result must be assigned
    now_dt = datetime.utcnow().replace(microsecond=0)
    # determine when month starts and ends
    (prev_month_dt, month_start_dt, next_month_dt) = find_months(yyyy_mm)
    month_end_dt = next_month_dt - timedelta(seconds=1)

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        # Use 'else' (not 'elif ... == None') so both dates are assigned on
        # every iteration and no value leaks over from a previous config.
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])
        else:
            config_end_dt = now_dt
        # interval overlap: [config_start, config_end] vs [month_start, month_end]
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
---|
126 |
|
---|
127 |
|
---|
128 |
def find_active_configs(config_dir=''):
    """Find which configuration files are active

    A configuration is active when its platform_info['config_end_date']
    is None, i.e. the configuration has no end date set.

    :Parameters:
        config_dir : string
            Directory searched for '*_config_*.py' files
            (default '' i.e. the current directory)

    :Returns:
        cns : list of str
            Active configuration module names (file names without '.py'),
            in sorted file-name order.
            If empty [], no active configs were found
    """
    import glob
    # list of all config files; sorted so processing order is deterministic
    # (glob order is arbitrary) and matches find_configs()
    configs = glob.glob(os.path.join(config_dir, '*_config_*.py'))
    configs.sort()

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
---|
151 |
|
---|
152 |
|
---|
153 |
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    If si['raw_dir'] ends in YYYY_MM, only that directory is searched and
    every file with a parseable date is kept.  Otherwise the YYYY_MM
    sub-directories for the previous, current and next month are searched
    and files are kept when their file-name date falls inside the
    processing window widened by one day on each side (31 days back for
    monthly-granularity file names).

    :Parameters:
        si : dict
            Sensor info; reads 'raw_dir', 'raw_file_glob', 'proc_start_dt'
            and 'proc_end_dt'.
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')

    :Returns:
        (raw_files, raw_dts) : (list of str, list of datetime)
            Selected raw file paths (sorted) and the datetime parsed from
            each file name, in the same order.
    """
    import glob

    months = find_months(yyyy_mm)
    # raw_dir ending in YYYY_MM means "this directory only, look no further"
    single_dir = re.search(r'\d{4}_\d{2}$', si['raw_dir']) is not None

    all_raw_files = []
    if single_dir:
        gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    else:
        # look for files in prev-month, this-month, and next-month
        for mon in months:
            mstr = mon.strftime('%Y_%m')
            gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
            all_raw_files.extend(glob.glob(gs))
    all_raw_files.sort()

    daily_start = si['proc_start_dt'] - timedelta(days=1)
    # a monthly file name is dated before the data it holds, so look back
    # a full month for such files
    monthly_start = si['proc_start_dt'] - timedelta(days=31)
    dt_end = si['proc_end_dt'] + timedelta(days=1)

    raw_files = []; raw_dts = []
    for fn in all_raw_files:
        (fndt, granularity) = filt_datetime(os.path.basename(fn), gran=True)
        if not fndt:
            continue
        # granularity 4 from filt_datetime() is treated as a monthly file
        # name.  Choose the window per file so one monthly name does not
        # widen the window for every file that sorts after it.
        if granularity == 4:
            dt_start = monthly_start
        else:
            dt_start = daily_start
        if single_dir or dt_start <= fndt <= dt_end:
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
---|
195 |
|
---|
196 |
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
        pi : dict
            Platform info; reads 'config_start_date' and 'config_end_date'.
            A false value (e.g. None) for either date means "now".
        raw_files : list of str
            Candidate raw file paths.
        dts : list of datetime
            Datetime of each entry of raw_files, in the same order.

    :Returns:
        new_list : list of str
            Files whose datetime lies within [config start, config end].
            If no file does, falls back to every file dated on or before
            the configuration end (this includes files dated before the
            configuration start).
    """
    # datetime objects are immutable: keep the result of replace()
    now_dt = datetime.utcnow().replace(microsecond=0)
    # 'else' guarantees both dates are always assigned
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])
    else:
        config_start_dt = now_dt
    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])
    else:
        config_end_dt = now_dt

    new_list = [fn for (fn, dt) in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]
    if not new_list:
        new_list = [fn for (fn, dt) in zip(raw_files, dts)
                    if dt <= config_end_dt]
    return new_list
---|
219 |
|
---|
220 |
|
---|
221 |
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """Run raw-to-netCDF processing in auto or manual mode.

    Auto mode processes the newest data for every sensor package of every
    active platform configuration (see auto()).  Manual mode processes one
    platform, sensor package and month (see manual()).  Any other
    proctype, or manual mode without all three selectors, only prints a
    message.

    :Parameters:
        proctype : string
            'auto' or 'manual'
        platform : string
            Platform id to process (e.g. 'bogue'); manual mode only
        package : string
            Sensor package id to process (e.g. 'adcp'); manual mode only
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07'); manual mode only

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')
    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype != 'manual':
        print('raw2proc: requires either auto or manual operation')
        return

    # manual mode: every selector is required
    if not (platform and package and yyyy_mm):
        print('raw2proc: Manual operation requires platform, package, and month')
        print(" >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ... platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ... month : %s' % yyyy_mm)
    print(' ... starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
---|
264 |
|
---|
265 |
|
---|
266 |
def auto():
    """Process all platforms, all packages, latest data

    For the current month:

    1. find the active configurations (config_end_date is None);
    2. for each configuration and each sensor package in its sensor_info,
       resume after the newest record already stored in this month's
       netCDF file (if present and dated inside the month), select the
       raw files for the month and process them;
    3. refresh the 'latest' and 'csv' products when the sensor info has
       'latest_dir' / 'csv_dir' entries.
    """
    yyyy_mm = this_month()
    (prev_month_dt, month_start_dt, next_month_dt) = find_months(yyyy_mm)
    month_end_dt = next_month_dt - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... NOTE: No active platforms')
        return

    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        platform = pi['id']

        for package in asi:
            print(' ... package name : %s' % package)
            si = asi[package]
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            out_path = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = month_start_dt
            si['proc_end_dt'] = month_end_dt

            if os.path.exists(out_path):
                # resume from the newest stored record when it lies inside
                # this month, so only newer data is processed
                (times, _units) = nc_get_time(out_path)
                newest_dt = es2dt(times[-1])
                if newest_dt >= month_start_dt:
                    si['proc_start_dt'] = newest_dt

            (raw_files, raw_dts) = find_raw(si, yyyy_mm)
            raw_files = which_raw(pi, raw_files, raw_dts)
            if not raw_files:
                print(' ... ... NOTE: no new raw files found')
            else:
                process(pi, si, raw_files, yyyy_mm)

            # update latest data for SECOORA commons
            if 'latest_dir' in si:
                proc2latest(pi, si, yyyy_mm)
            if 'csv_dir' in si:
                proc2csv(pi, si, yyyy_mm)
---|
330 |
|
---|
331 |
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    1. find the configurations of `platform` that overlap the month;
    2. for each of them that lists `package` in platform_info['packages'],
       select the raw files for the month and append them to the monthly
       netCDF file.

    The monthly file (platform_package_yyyy_mm.nc) is rebuilt: any copy
    left over from an earlier run is deleted the first time its path is
    reached, and later configurations for the same month append to it.

    :Parameters:
        platform : string
            Platform id (e.g. 'bogue')
        package : string
            Sensor package id (e.g. 'adcp')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')
    """
    # determine when month starts and ends
    (prev_month_dt, month_start_dt, next_month_dt) = find_months(yyyy_mm)
    month_end_dt = next_month_dt - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... NOTE: %s not operational for %s' % (platform, yyyy_mm))
        return

    # Output paths already cleared in this run.  Tracking paths (instead of
    # only checking the first config) ensures an old file is removed even
    # when the first config does not include this package.
    cleared = set()
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        if package not in pi['packages']:
            print(' ... ... NOTE: %s not operational on %s for %s' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)

        # remove any previous netcdf file (platform_package_yyyy_mm.nc)
        if ofn not in cleared:
            if os.path.exists(ofn):
                os.remove(ofn)
            cleared.add(ofn)

        # A file written by an earlier config this run: resume from its
        # newest record (when inside the month) in case data is repeated
        # across raw files.
        if os.path.exists(ofn):
            (es, units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            if last_dt >= month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm))
---|
389 |
|
---|
390 |
def process(pi, si, raw_files, yyyy_mm):
    """Parse raw files and write the records inside the window to netCDF.

    Uses the parser/creator/updater of module si['process_module'].  For
    each file, the parsed records whose data['dt'] lies in
    [si['proc_start_dt'], si['proc_end_dt']] are flagged in data['in'];
    if any are flagged, the file os.path.join(si['proc_dir'],
    si['proc_filename']) is updated when it exists, otherwise created.
    Empty or missing files are skipped with a message.

    :Parameters:
        pi : dict
            Platform info, passed through to the processing functions.
        si : dict
            Sensor info; si['fn'] is set to the file being processed so
            the parser can use it.
        raw_files : list of str
            Raw files to process, in order.
        yyyy_mm : string
            Month being processed (not used in this function).
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # no lines: file was empty (or missing)
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)

        # flag records that fall inside the processing window (usually the month)
        window_start = si['proc_start_dt']
        window_end = si['proc_end_dt']
        inside = numpy.array([False] * len(data['dt']))
        for pos, when in enumerate(data['dt']):
            if when >= window_start and when <= window_end:
                inside[pos] = True
        data['in'] = inside

        if not inside.any():
            continue

        sys.stdout.write(' ... %s ... ' % fn)
        sys.stdout.write('%d\n' % len(inside.nonzero()[0]))
        out_path = os.path.join(si['proc_dir'], si['proc_filename'])
        # append to an existing netcdf file, otherwise create it
        if os.path.exists(out_path):
            nc_update(out_path, update(pi, si, data))
        else:
            nc_create(out_path, create(pi, si, data))
---|
423 |
|
---|
424 |
|
---|
425 |
# globals
# UTC time at which this module was loaded, printed in the raw2proc run
# banner.  datetime objects are immutable, so the whole-second truncation
# from replace() must be assigned back.
start_dt = datetime.utcnow().replace(microsecond=0)
---|
428 |
|
---|
429 |
if __name__ == "__main__":
    # script (cron) entry point: process the latest data of all active platforms
    raw2proc('auto')

    # for testing, e.g.:
    # raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')
---|