1 |
|
---|
2 |
|
---|
3 |
"""Process raw data to monthly netCDF data files |
---|
4 |
|
---|
5 |
This module processes raw ASCII or binary data from different NCCOOS |
---|
6 |
sensors (ctd, adcp, waves-adcp, met) based on manual or automated |
---|
7 |
operation. For automated processing, add raw data (level0) from all |
---|
8 |
active sensors to current month's netcdf data files (level1) with the |
---|
9 |
current configuration settings. For manual processing, determine which |
---|
10 |
configurations to use for requested platform, sensor, and month. |
---|
11 |
|
---|
12 |
:Processing steps: |
---|
13 |
0. raw2proc auto or manual for platform, sensor, month |
---|
14 |
1. list of files to process |
---|
15 |
2. parse data |
---|
16 |
3. create, update netcdf |
---|
17 |
|
---|
18 |
to-do |
---|
19 |
3. qc (measured) data |
---|
20 |
4. process derived data (and regrid?) |
---|
21 |
5. qc (measured and derived) data flags |
---|
22 |
|
---|
23 |
""" |
---|
24 |
|
---|
25 |
# Module version string and author contact.
__version__ = "v0.1"
__author__ = "Sara Haines <sara_haines@unc.edu>"
---|
27 |
|
---|
28 |
import sys |
---|
29 |
import os |
---|
30 |
import re |
---|
31 |
|
---|
32 |
|
---|
33 |
|
---|
34 |
|
---|
35 |
|
---|
36 |
|
---|
37 |
|
---|
38 |
# Directory searched (by find_configs / find_active_configs) for the
# ``*_config_*.py`` platform configuration modules.  The historical,
# site-specific location is kept as the default; set the
# RAW2PROC_CONFIG_DIR environment variable to use a different directory.
defconfigs = os.environ.get('RAW2PROC_CONFIG_DIR', '/home/haines/nccoos/raw2proc')
---|
39 |
|
---|
40 |
import numpy |
---|
41 |
|
---|
42 |
from procutil import * |
---|
43 |
from ncutil import * |
---|
44 |
|
---|
45 |
# Regex fragment for one real number with optional surrounding whitespace.
# Alternatives, in order:
#   - scientific notation with one leading digit and a signed 1-2 digit
#     exponent (e.g. -1.5E+03)
#   - decimal (e.g. 12., .5, -3.25)
#   - plain integer
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
---|
46 |
|
---|
47 |
def load_data(inFile):
    """Read a raw text data file and return its lines.

    :Parameters:
        inFile : string
            Path to the raw data file

    :Returns:
        lines : list of str or None
            Lines as returned by ``readlines()`` (line endings kept).
            ``[]`` if the file is empty (a note is printed),
            ``None`` if the file does not exist (a note is printed).
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None

    # ``with`` closes the handle even if reading raises part-way through.
    with open(inFile, 'r') as f:
        lines = f.readlines()

    if not lines:
        print('Empty file: ' + inFile)
    return lines
---|
58 |
|
---|
59 |
def import_parser(name):
    """Return the attribute called *name* from the top-level ``parsers`` module.

    :Parameters:
        name : string
            Attribute (e.g. a parser function) to fetch from ``parsers``

    :Returns:
        The requested attribute; ImportError / AttributeError propagate.
    """
    parsers_module = __import__('parsers')
    return getattr(parsers_module, name)
---|
63 |
|
---|
64 |
def import_processors(mod_name):
    """Import *mod_name* and return its processing hooks.

    :Parameters:
        mod_name : string
            Name of a top-level module defining ``parser``, ``creator``
            and ``updater``

    :Returns:
        (parser, creator, updater) : tuple
            The three attributes, fetched in that order.
    """
    module = __import__(mod_name)
    return tuple(getattr(module, hook) for hook in ('parser', 'creator', 'updater'))
---|
70 |
|
---|
71 |
|
---|
72 |
def get_config(name):
    """Return an object from a configuration module given a dotted name.

    Usage example::

        sensor_info = get_config('bogue_config_20060918.sensor_info')

    The first component of *name* is imported as a top-level module, so the
    directory holding it must already be importable (on ``sys.path``).
    Each remaining component is looked up as an attribute of the object
    resolved so far.

    :Parameters:
        name : string
            Dotted name ``<module>[.<attr>[.<attr>...]]``

    :Returns:
        The resolved object, or the module itself when *name* has no dots.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # Walk the attribute chain.  (Previously every component was looked up
    # on the module itself, and a dot-free name raised UnboundLocalError.)
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
---|
79 |
|
---|
80 |
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find which configuration files for specified platform and month

    :Parameters:
        platform : string
            Platform id to process (e.g. 'bogue')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')
        config_dir : string
            Directory searched for ``<platform>_config_*.py`` files

    :Returns:
        cns : list of str
            Module names (file basename without ``.py``) of the
            configurations whose [config_start_date, config_end_date]
            period overlaps the requested month, in sorted filename order.
            If empty [], no configs were found

    A ``config_start_date`` or ``config_end_date`` that is None (or any other
    false value) is replaced by the current UTC time.
    """
    import glob

    configs = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    configs.sort()
    # datetime.replace() returns a new object; keep the result.
    now_dt = datetime.utcnow().replace(microsecond=0)

    (_prev_month, month_start_dt, next_month) = find_months(yyyy_mm)
    month_end_dt = next_month - timedelta(seconds=1)

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')

        # Use ``else`` rather than ``elif == None`` so that other false values
        # (e.g. '') can no longer leave these names unbound or carry over a
        # value from the previous config.
        if pi['config_start_date']:
            config_start_dt = filt_datetime(pi['config_start_date'])[0]
        else:
            config_start_dt = now_dt
        if pi['config_end_date']:
            config_end_dt = filt_datetime(pi['config_end_date'])[0]
        else:
            config_end_dt = now_dt

        # Interval-overlap test.  Since month_start_dt <= month_end_dt this is
        # equivalent to the former four-way comparison.
        if config_start_dt <= month_end_dt and config_end_dt >= month_start_dt:
            cns.append(cn)
    return cns
---|
125 |
|
---|
126 |
|
---|
127 |
def find_active_configs(config_dir=''):
    """Find which configuration files are currently active

    A configuration is considered active when its
    ``platform_info['config_end_date']`` is None.

    :Parameters:
        config_dir : string
            Directory searched for ``*_config_*.py`` files

    :Returns:
        cns : list of str
            Module names (file basename without ``.py``) of the active
            configurations, in sorted filename order.
            If empty [], no active configs were found
    """
    import glob

    # Sort for a deterministic processing order (glob order is arbitrary),
    # consistent with find_configs.
    configs = sorted(glob.glob(os.path.join(config_dir, '*_config_*.py')))

    cns = []
    for config in configs:
        cn = os.path.splitext(os.path.basename(config))[0]
        pi = get_config(cn + '.platform_info')
        if pi['config_end_date'] is None:
            cns.append(cn)
    return cns
---|
150 |
|
---|
151 |
|
---|
152 |
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
        si : dict
            Sensor info; reads ``raw_dir``, ``raw_file_glob``,
            ``proc_start_dt`` and ``proc_end_dt``
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')

    :Returns:
        (raw_files, raw_dts) : tuple of lists
            Matching file paths (sorted by path) and the datetime that
            ``filt_datetime`` parsed from each file name.  Files whose name
            yields no datetime are dropped.

    If ``raw_dir`` itself ends in ``YYYY_MM``, every dated file matching
    ``raw_file_glob`` there is returned.  Otherwise the ``YYYY_MM``
    subdirectory of each month from ``find_months`` is searched.  A file is
    then kept only when its file-name date lies in the processing window
    padded by one day on each side, or 31 days before the start for files
    with granularity 4.
    """
    import glob

    months = find_months(yyyy_mm)

    all_raw_files = []
    m = re.search(r'\d{4}_\d{2}$', si['raw_dir'])
    if m:
        # raw_dir already names a single month: glob it directly
        gs = os.path.join(si['raw_dir'], si['raw_file_glob'])
        all_raw_files.extend(glob.glob(gs))
    else:
        # raw files are kept in per-month YYYY_MM subdirectories
        for mon in months:
            mstr = mon.strftime('%Y_%m')
            gs = os.path.join(si['raw_dir'], mstr, si['raw_file_glob'])
            all_raw_files.extend(glob.glob(gs))

    all_raw_files.sort()

    # Window bounds are computed once.  The start is chosen per file, so a
    # single granularity-4 file no longer widens the window for every file
    # examined after it.
    day_pad_start = si['proc_start_dt'] - timedelta(days=1)
    month_pad_start = si['proc_start_dt'] - timedelta(days=31)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    raw_files = []
    raw_dts = []

    for fn in all_raw_files:
        fndt_tuple = filt_datetime(os.path.basename(fn))
        fndt = fndt_tuple[0]
        granularity = fndt_tuple[1]
        # NOTE(review): granularity 4 presumably means the file name carries
        # only a month-level date -- confirm in procutil.filt_datetime.
        if granularity == 4:
            dt_start = month_pad_start
        else:
            dt_start = day_pad_start

        if fndt:
            if dt_start <= fndt <= dt_end or m:
                raw_files.append(fn)
                raw_dts.append(fndt)
    return (raw_files, raw_dts)
---|
197 |
|
---|
198 |
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
        pi : dict
            Platform info; reads ``config_start_date`` and ``config_end_date``
        raw_files : list of str
            Candidate raw file paths
        dts : list of datetime
            Date parsed for each entry of *raw_files* (same order)

    :Returns:
        new_list : list of str
            Files dated within [config start, config end].  If none are,
            falls back to every file dated on or before the config end.

    A None (or otherwise false) config date is replaced by the current UTC
    time.
    """
    # datetime.replace() returns a new object; keep the result.
    now_dt = datetime.utcnow().replace(microsecond=0)

    # ``else`` (not ``elif == None``) so other false values can't leave the
    # names unbound.
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])[0]
    else:
        config_start_dt = now_dt

    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])[0]
    else:
        config_end_dt = now_dt

    new_list = [fn for fn, dt in zip(raw_files, dts)
                if config_start_dt <= dt <= config_end_dt]

    if not new_list:
        new_list = [fn for fn, dt in zip(raw_files, dts)
                    if dt <= config_end_dt]

    return new_list
---|
221 |
|
---|
222 |
|
---|
223 |
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Process data either in auto-mode or manual-mode

    In auto-mode, process the newest data for all platforms and all
    sensor packages.  In manual-mode, process data for one platform,
    sensor package, and month.

    :Parameters:
        proctype : string
            'auto' or 'manual'

        platform : string
            Platform id to process (e.g. 'bogue')
        package : string
            Sensor package id to process (e.g. 'adcp')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype not in ('auto', 'manual'):
        print('raw2proc: requires either auto or manual operation')
        return

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    # manual mode: all three selectors are required
    if not (platform and package and yyyy_mm):
        print('raw2proc: Manual operation requires platform, package, and month')
        print(" >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
        return

    print('Processing in manually ...')
    print(' ... platform id : %s' % platform)
    print(' ... package name : %s' % package)
    print(' ... month : %s' % yyyy_mm)
    print(' ... starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
    manual(platform, package, yyyy_mm)
---|
266 |
|
---|
267 |
|
---|
268 |
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. Determine the active platforms: every config whose
       config_end_date is None (see find_active_configs).
    2. For each active config, and for each package in its sensor_info:
       - set the target file ``<platform>_<package>_<yyyy_mm>.nc`` for
         the current month
       - if that file exists and its last time is within the month,
         start processing from that time so only newer data is added
       - find, filter, and process the raw files
       - if the package has ``latest_dir``, call proc2latest
    """
    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    start_of_month = months[1]
    end_of_month = months[2] - timedelta(seconds=1)

    active = find_active_configs(config_dir=defconfigs)
    if not active:
        print(' ... ... ... \nNOTE: No active platforms\n')
        return

    for cn in active:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')
        platform = pi['id']

        # NOTE(review): unlike manual(), this iterates every sensor_info
        # entry rather than only those listed in pi['packages'].
        for package in asi.keys():
            print(' ... package name : %s' % package)
            si = asi[package]
            si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
            out_path = os.path.join(si['proc_dir'], si['proc_filename'])
            si['proc_start_dt'] = start_of_month
            si['proc_end_dt'] = end_of_month

            if os.path.exists(out_path):
                # resume after the newest time already stored this month
                es, _units = nc_get_time(out_path)
                newest_dt = es2dt(es[-1])
                if newest_dt >= start_of_month:
                    si['proc_start_dt'] = newest_dt

            candidates, candidate_dts = find_raw(si, yyyy_mm)
            raw_files = which_raw(pi, candidates, candidate_dts)
            if not raw_files:
                print(' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm))
            else:
                process(pi, si, raw_files, yyyy_mm)

            if 'latest_dir' in si:
                print(' ... ... latest : %s ' % si['latest_dir'])
                proc2latest(pi, si, yyyy_mm)
---|
329 |
|
---|
330 |
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    Notes
    -----

    1. Determine which configs of *platform* overlap the month
       (find_configs).
    2. For each such config that lists *package* in
       ``platform_info['packages']``:
       - set the target file ``<platform>_<package>_<yyyy_mm>.nc``
       - find and filter the raw files for the month and config period
       - the first time a given target path is seen in this run, delete
         any existing file so the month is rebuilt from scratch
       - if the target exists (written by an earlier config in this run)
         and its last time is within the month, resume from that time
       - process the raw files
    """
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)

    if not configs:
        print(' ... ... ... \nNOTE: %s not operational for %s\n' % (platform, yyyy_mm))
        return

    # Output paths already cleared during this run.  Previously the old file
    # was removed only while handling configs[0]; if that config lacked the
    # package, or a later config wrote to a different proc_dir, a stale file
    # survived.  Its last time then moved proc_start_dt forward and silently
    # skipped reprocessing.
    cleared = set()

    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn + '.platform_info')
        asi = get_config(cn + '.sensor_info')

        if package not in pi['packages']:
            print(' ... ... \nNOTE: %s not operational on %s for %s\n' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)

        # Rebuild the month from scratch: drop any pre-existing output the
        # first time this path is encountered in this run.
        if ofn not in cleared:
            cleared.add(ofn)
            if os.path.exists(ofn):
                os.remove(ofn)

        if os.path.exists(ofn):
            # file written by an earlier config in this run: resume after
            # its newest stored time
            (es, units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            if last_dt >= month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... \nNOTE: no raw files found for %s %s for %s\n' % (package, platform, yyyy_mm))
---|
388 |
|
---|
389 |
def process(pi, si, raw_files, yyyy_mm):
    """Parse raw files and write in-window records to the monthly netCDF file.

    The parse/create/update hooks come from the module named by
    ``si['process_module']``.  For each raw file:

    - ``si['fn']`` is set to the file path
    - the file is loaded; empty or missing files are skipped
    - ``data['in']`` is set to a boolean mask of records whose ``data['dt']``
      lies in [``si['proc_start_dt']``, ``si['proc_end_dt']``]
    - if any record is in the window, the file
      ``si['proc_dir']/si['proc_filename']`` is updated when it exists,
      otherwise created

    *yyyy_mm* is accepted for interface compatibility but not used here.
    """
    parse, create, update = import_processors(si['process_module'])

    for fn in raw_files:
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)

        # True where the record time falls inside the processing window
        data['in'] = numpy.array([val >= si['proc_start_dt'] and val <= si['proc_end_dt']
                                  for val in data['dt']])

        n_in = len(data['in'].nonzero()[0])
        if n_in:
            sys.stdout.write('... %s ... ' % fn)
            sys.stdout.write('%d\n' % n_in)
            ofn = os.path.join(si['proc_dir'], si['proc_filename'])
            if os.path.exists(ofn):
                ut = update(pi, si, data)
                nc_update(ofn, ut)
            else:
                ct = create(pi, si, data)
                nc_create(ofn, ct)
---|
422 |
|
---|
423 |
|
---|
424 |
|
---|
425 |
# UTC start time of this run, truncated to whole seconds; printed by raw2proc().
# (datetime.replace() returns a new object, so the result must be assigned.)
start_dt = datetime.utcnow().replace(microsecond=0)

if __name__ == "__main__":
    raw2proc('auto')
---|
431 |
|
---|
432 |
|
---|
433 |
|
---|
434 |
|
---|
435 |
|
---|