1 |
#!/usr/bin/env python |
---|
2 |
# Last modified: Time-stamp: <2013-11-05 16:37:38 haines> |
---|
3 |
"""Process raw data to monthly netCDF data files |
---|
4 |
|
---|
5 |
This module processes raw ascii- or binary-data from different NCCOOS |
---|
6 |
sensors (ctd, adcp, waves-adcp, met) based on manual or automated |
---|
7 |
operation. If automated processing, add raw data (level0) from all |
---|
8 |
active sensors to current month's netcdf data files (level1) with the |
---|
9 |
current configuration setting. If manual processing, determine which |
---|
10 |
configurations to use for requested platform, sensor, and month. |
---|
11 |
|
---|
12 |
:Processing steps: |
---|
13 |
0. raw2proc auto or manual for platform, sensor, month |
---|
14 |
1. list of files to process |
---|
15 |
2. parse data |
---|
16 |
3. create, update netcdf |
---|
17 |
|
---|
18 |
to-do |
---|
19 |
3. qc (measured) data |
---|
20 |
4. process derived data (and regrid?) |
---|
21 |
5. qc (measured and derived) data flags |
---|
22 |
|
---|
23 |
""" |
---|
24 |
|
---|
25 |
# Module metadata: version tag and maintainer contact.
__version__ = "v0.1"
__author__ = "Sara Haines <sara_haines@unc.edu>"
---|
27 |
|
---|
28 |
import sys |
---|
29 |
import os |
---|
30 |
import re |
---|
31 |
import traceback |
---|
32 |
|
---|
33 |
# for production use: |
---|
34 |
# defconfigs='/home/haines/nccoos/raw2proc' |
---|
35 |
# for testing use: |
---|
36 |
# defconfigs='/home/haines/nccoos/test/r2p' |
---|
37 |
|
---|
38 |
# define config file location to run under cron
# (used in this module as the default value of the config_dir arguments)
defconfigs='/opt/env/haines/dataproc/raw2proc'
---|
40 |
|
---|
41 |
import numpy |
---|
42 |
|
---|
43 |
from procutil import * |
---|
44 |
from ncutil import * |
---|
45 |
|
---|
46 |
# Regex source for one real number with optional surrounding whitespace.
# Alternatives, in order: exponent form (single leading digit), decimal
# form, plain integer.
REAL_RE_STR = r'\s*(-?\d(\.\d+|)[Ee][+\-]\d\d?|-?(\d+\.\d*|\d*\.\d+)|-?\d+)\s*'
# Regex source matching the token "nan" in any letter case.
NAN_RE_STR = r'[Nn][Aa][Nn]'
---|
48 |
|
---|
49 |
def load_data(inFile):
    """Read a raw data file and return its lines.

    :Parameters:
        inFile : string
            Path of the file to read

    :Returns:
        lines : list of str or None
            Lines of the file with line endings kept.
            [] if the file is empty (a message is printed).
            None if the file does not exist (a message is printed).
    """
    if not os.path.exists(inFile):
        print('File does not exist: ' + inFile)
        return None
    # the context manager closes the handle even if readlines() raises
    with open(inFile, 'r') as f:
        lines = f.readlines()
    if not lines:
        print('Empty file: ' + inFile)
    return lines
---|
60 |
|
---|
61 |
def import_parser(name):
    """Return the attribute called `name` from the module `parsers`.

    The module is imported by name, so it must be importable from the
    current import path.
    """
    return getattr(__import__('parsers'), name)
---|
65 |
|
---|
66 |
def import_processors(mod_name):
    """Import a processing module and return its three entry points.

    :Parameters:
        mod_name : string
            Name of the module to import

    :Returns:
        (parser, creator, updater) : tuple
            The module attributes of those names, in that order
    """
    module = __import__(mod_name)
    hooks = [getattr(module, hook) for hook in ('parser', 'creator', 'updater')]
    return tuple(hooks)
---|
72 |
|
---|
73 |
|
---|
74 |
def get_config(name):
    """Import a module and return an attribute reached by a dotted path.

    Usage Example
    >>> sensor_info = get_config('bogue_config_20060918.sensor_info')

    :Parameters:
        name : string
            '<module>' or '<module>.<attr>[.<attr>...]'

    :Returns:
        The object at the end of the dotted path; the module itself
        when `name` contains no dot.
    """
    components = name.split('.')
    obj = __import__(components[0])
    # walk the path one attribute at a time so nested names resolve
    # against the previous result rather than the top-level module
    for comp in components[1:]:
        obj = getattr(obj, comp)
    return obj
---|
81 |
|
---|
82 |
def get_config_dates(pi):
    """ Get datetime of both start and end setting within config file

    A date that is None (or otherwise empty) is replaced by the current
    UTC time truncated to whole seconds.

    :Parameters:
        pi : dict
            platform_info with keys 'config_start_date' and
            'config_end_date'

    :Returns:
        (config_start_dt, config_end_dt) : tuple of datetime

    Example
    -------
    >>> pi = get_config(cn+'.platform_info')
    >>> (config_start_dt, config_end_dt) = get_config_dates(pi)

    """
    # datetime.replace() returns a new object, so the result must be kept
    now_dt = datetime.utcnow().replace(microsecond=0)
    if pi['config_start_date']:
        config_start_dt = filt_datetime(pi['config_start_date'])
    else:
        config_start_dt = now_dt
    if pi['config_end_date']:
        config_end_dt = filt_datetime(pi['config_end_date'])
    else:
        config_end_dt = now_dt
    return (config_start_dt, config_end_dt)
---|
102 |
|
---|
103 |
def find_configs(platform, yyyy_mm, config_dir=''):
    """Find the configuration files of a platform that overlap a month

    :Parameters:
        platform : string
            Platform id to process (e.g. 'bogue')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')
        config_dir : string
            Directory searched for '<platform>_config_*.py' files

    :Returns:
        cns : list of str
            Config names (file name without '.py'), in sorted file order,
            whose date range overlaps the month.
            If empty [], no configs were found
    """
    import glob
    pattern = os.path.join(config_dir, platform + '_config_*.py')
    config_paths = sorted(glob.glob(pattern))
    # the month runs from its first instant to one second before the next month
    (_prev_dt, month_start_dt, following_dt) = find_months(yyyy_mm)
    month_end_dt = following_dt - timedelta(seconds=1)

    cns = []
    for path in config_paths:
        cn = os.path.splitext(os.path.basename(path))[0]
        # each config is loaded by module name, not by path
        pi = get_config(cn + '.platform_info')
        (config_start_dt, config_end_dt) = get_config_dates(pi)
        if not (config_start_dt <= month_start_dt or config_start_dt <= month_end_dt):
            continue
        if config_end_dt >= month_start_dt or config_end_dt >= month_end_dt:
            cns.append(cn)
    return cns
---|
136 |
|
---|
137 |
|
---|
138 |
def find_active_configs(config_dir=defconfigs):
    """Find which configuration files are active

    A config counts as active when its platform_info['config_end_date']
    is None.

    :Parameters:
        config_dir : string
            Directory searched for '*_config_*.py' files

    :Returns:
        cns : list of str
            Names (file name without '.py') of the active configs, in
            glob order.
            If empty [], no active configs were found
    """
    import glob
    active = []
    for path in glob.glob(os.path.join(config_dir, '*_config_*.py')):
        stem, _ext = os.path.splitext(os.path.basename(path))
        platform_info = get_config(stem + '.platform_info')
        if platform_info['config_end_date'] == None:
            active.append(stem)
    return active
---|
157 |
|
---|
158 |
|
---|
159 |
def uniqify(seq):
    """Return the items of seq with duplicates dropped, first occurrence kept.

    Items must be hashable.
    """
    already = set()
    unique = []
    for element in seq:
        if element not in already:
            already.add(element)
            unique.append(element)
    return unique
---|
170 |
|
---|
171 |
|
---|
172 |
def get_all_platforms(config_dir=defconfigs):
    """Get all platform ids

    Collects platform_info['id'] from every '*_config_*.py' file in
    config_dir, skipping empty ids.

    :Returns:
        pids : list of str
            Sorted list of all the platforms, without duplicates
    """
    import glob
    config_paths = glob.glob(os.path.join(config_dir, '*_config_*.py'))
    config_paths.sort()
    collected = []
    for path in config_paths:
        cn = os.path.splitext(os.path.basename(path))[0]
        platform_id = get_config(cn + '.platform_info')['id']
        if platform_id:
            collected.append(platform_id)
    return sorted(uniqify(collected))
---|
193 |
|
---|
194 |
def get_all_packages(platform, config_dir=defconfigs):
    """Get all package ids listed in platform_info['packages'] of every
    config file of the platform

    :Parameters:
        platform : string
            Platform id; files '<platform>_config_*.py' are read
        config_dir : string
            Directory searched for the config files

    :Returns:
        sids : list of str
            Sorted list of all the package ids, without duplicates
    """
    import glob
    config_paths = sorted(glob.glob(os.path.join(config_dir, platform + '_config_*.py')))
    collected = []
    for path in config_paths:
        cn = os.path.splitext(os.path.basename(path))[0]
        collected.extend(list(get_config(cn + '.platform_info')['packages']))
    return sorted(uniqify(collected))
---|
214 |
|
---|
215 |
def get_all_platform_configs(platform, config_dir=defconfigs):
    """Get all the config files for a platform

    :Parameters:
        platform : string
            Platform id; files '<platform>_config_*.py' are listed
        config_dir : string
            Directory searched for the config files

    :Returns:
        cns : list of config names
            File names without directory and '.py', in sorted path order
    """
    import glob
    config_paths = glob.glob(os.path.join(config_dir, platform + '_config_*.py'))
    config_paths.sort()
    return [os.path.splitext(os.path.basename(path))[0] for path in config_paths]
---|
232 |
|
---|
233 |
def get_config_packages(cn):
    """ Get packages set in platform_info{} from specific config file

    :Parameters:
        cn : string
            Config name (module name of the config file)

    :Returns:
        sids : list of str
            Copy of platform_info['packages'] as a list, in the order
            the config gives them (no sorting is applied here)
    """
    return list(get_config(cn + '.platform_info')['packages'])
---|
244 |
|
---|
245 |
def list_months(dts, dte):
    """ list of 'YYYY_MM' strings for all months inclusively within given date range

    :Parameters:
        dts, dte : datetime
            Start and end of the range; both must be exactly of type
            datetime and dts <= dte, otherwise a message is printed

    :Returns:
        lom : list of str
            One 'YYYY_MM' entry per month from dts's month through
            dte's month; [] when the inputs are rejected
    """
    lom = []
    if not (type(dts) is datetime and type(dte) is datetime and dts <= dte):
        print("list_months requires two inputs type datetime.datetime and dts<dte")
        return lom
    for year in range(dts.year, dte.year + 1):
        # first and last year are partial; years in between are complete
        first_month = dts.month if year == dts.year else 1
        last_month = dte.month if year == dte.year else 12
        for month in range(first_month, last_month + 1):
            lom.append(datetime(year, month, 1).strftime('%Y_%m'))
    return lom
---|
267 |
|
---|
268 |
|
---|
269 |
def _spin_months(platform, package, config_dir, dt1=None, dt2=None):
    """Months ('YYYY_MM') of every config of platform that lists package,
    clipped to dt1..dt2 when a range is given."""
    months = []
    for cn in get_all_platform_configs(platform, config_dir):
        pi = get_config(cn+'.platform_info')
        (dts, dte) = get_config_dates(pi)
        if dt1 is not None:
            # config entirely outside the requested range
            if dte < dt1 or dt2 < dts:
                continue
            # intersection of the config range and the requested range
            dts = max(dts, dt1)
            dte = min(dte, dt2)
        # list only months that are in configs which carry the package
        if package in pi['packages']:
            months.extend(list_months(dts, dte))
    return months


def create_spin_list(plats, packs, dates, config_dir=defconfigs):
    """ create list of params needed to run manual() multiple ways

    :Parameters:
        plats : 'ALL', a platform id, or list of platform ids
        packs : 'ALL', a package id, or list of package ids
        dates : 'ALL', a 'YYYY_MM' string, a list of 'YYYY_MM' strings,
            or a list of two datetimes [start, end]
        config_dir : string
            Directory with the config files; passed on to the config
            lookups

    :Returns:
        spin_list : list of three-tuple each tuple with form (platform, package, yyyy_mm)

    Notes
    -----

    1. plats -- 'ALL' or ['b1', 'b2']
    2. packs -- 'ALL' or ['ctd1', 'ctd2']
    3. dates -- 'ALL' or ['2011_11', '2011_12'] or [datetime(2006,1,1), datetime.utcnow()]

    For each platform determine packages for given dates
    also a good way to get listing platforms and packages for specified dates

    """
    result = []
    if isinstance(plats, str):
        if plats.upper() == 'ALL':
            platforms = get_all_platforms(config_dir)
        else:
            platforms = [plats] # make one platform iterable
    else:
        platforms = plats

    print(' Expanded lists for creating spin_list:')
    print(' ... platform ids : %s' % platforms)

    for platform in platforms:
        if len(platforms)>1:
            print('------------------------------------')
            print(' ... ... platform : %s ' % platform)
        if isinstance(packs, str):
            if packs.upper() == 'ALL':
                packages = get_all_packages(platform, config_dir)
            else:
                packages = [packs] # make one package iterable
        else:
            packages = packs

        print(' ... ... packages : %s' % packages)
        for package in packages:
            months = []
            if isinstance(dates, str):
                # dates is a string 'ALL' or format 'YYYY_MM'
                if dates.upper() == 'ALL':
                    months = _spin_months(platform, package, config_dir)
                else:
                    months = [dates] # make one date iterable
            elif isinstance(dates, list) and dates:
                # a datetime range needs exactly two datetimes; checking the
                # length first avoids indexing past a one-element list
                is_range = len(dates) == 2 and \
                    type(dates[0]) == type(dates[1]) == type(datetime.utcnow())
                if is_range:
                    dt1, dt2 = dates
                    months = _spin_months(platform, package, config_dir, dt1, dt2)
                # else if string in list
                elif isinstance(dates[0], str):
                    months = dates
            print(' ... ... months : %s' % months)
            for month in months:
                result.append((platform, package, month))

    return result
---|
356 |
|
---|
357 |
def find_raw(si, yyyy_mm):
    """Determine which list of raw files to process for month

    :Parameters:
        si : dict
            sensor_info with keys 'raw_dir', 'raw_file_glob',
            'proc_start_dt' and 'proc_end_dt'
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')

    :Returns:
        (raw_files, raw_dts) : tuple of lists
            Selected file paths and the datetime parsed from each file
            name, in sorted path order
    """
    import glob

    months = find_months(yyyy_mm)
    # does raw_dir itself end in YYYY_MM (optionally followed by slashes)?
    m = re.search(r'\d{4}_\d{2}\/*$', si['raw_dir'])
    if m:
        # look only in that specific directory, no further
        search_globs = [os.path.join(si['raw_dir'], si['raw_file_glob'])]
    else:
        # otherwise look in the YYYY_MM sub-directories of every month
        # returned by find_months
        search_globs = [os.path.join(si['raw_dir'], mon.strftime('%Y_%m'), si['raw_file_glob'])
                        for mon in months]
    all_raw_files = []
    for gs in search_globs:
        all_raw_files.extend(glob.glob(gs))
    all_raw_files.sort()

    # accept file dates one day either side of the processing window
    dt_start = si['proc_start_dt'] - timedelta(days=1)
    dt_end = si['proc_end_dt'] + timedelta(days=1)
    raw_files = []
    raw_dts = []
    for fn in all_raw_files:
        (fndt, granularity) = filt_datetime(os.path.basename(fn), gran=True)
        if granularity == 4:
            # presumably a file name carrying only year and month: widen the
            # lower bound to 31 days.
            # NOTE(review): the widened bound stays in effect for all files
            # after this one.
            dt_start = si['proc_start_dt'] - timedelta(days=31)
        # files with no parsable date are dropped; with a YYYY_MM raw_dir
        # every dated file is kept regardless of the window
        if fndt and (dt_start <= fndt <= dt_end or m):
            raw_files.append(fn)
            raw_dts.append(fndt)
    return (raw_files, raw_dts)
---|
400 |
|
---|
401 |
def which_raw(pi, raw_files, dts):
    """Further limit file names based on configuration file timeframe

    :Parameters:
        pi : dict
            platform_info passed to get_config_dates
        raw_files : list of str
            Candidate raw file paths
        dts : list of datetime
            Datetime for each entry of raw_files; modified in place for
            files whose name has granularity 4

    :Returns:
        new_list : list of str
            Files whose datetime lies within the config range; if there
            are none, the files dated at or before the config end
    """
    (config_start_dt, config_end_dt) = get_config_dates(pi)

    # files with granularity 4 are clamped into the config range so the
    # range test below keeps them
    for position, path in enumerate(raw_files):
        (fndt, granularity) = filt_datetime(os.path.basename(path), gran=True)
        if granularity != 4:
            continue
        if fndt < config_start_dt:
            dts[position] = config_start_dt
        if fndt > config_end_dt:
            dts[position] = config_end_dt

    selected = [path for position, path in enumerate(raw_files)
                if config_start_dt <= dts[position] <= config_end_dt]
    if selected:
        return selected
    # fallback: anything dated at or before the end of the config
    return [path for position, path in enumerate(raw_files)
            if dts[position] <= config_end_dt]
---|
421 |
|
---|
422 |
|
---|
423 |
def raw2proc(proctype, platform=None, package=None, yyyy_mm=None):
    """
    Process data in auto-mode, manual-mode or spin-mode

    Auto-mode runs auto(); manual-mode runs manual() for one platform,
    package and month; spin-mode expands its arguments with
    create_spin_list() and runs spin() on the result.  Manual and spin
    need all three of platform, package and yyyy_mm; otherwise usage
    hints are printed and nothing is processed.

    :Parameters:
        proctype : string
            'auto' or 'manual' or 'spin'

        platform : string
            Platform id to process (e.g. 'bogue')
        package : string
            Sensor package id to process (e.g. 'adcp')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')

    Examples
    --------
    >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_06')
    >>> raw2proc('manual', 'bogue', 'adcp', '2007_06')

    Spin
    ----
    platform can be list of platforms or 'ALL'
    package can be list packages or 'ALL'
    yyyy_mm can be list of months, or datetime range

    >>> raw2proc('spin', ['b1','b2'], ['ctd1', 'ctd2'], ['2011_11'])
    >>> raw2proc('spin', ['b1','b2'], ['ctd1', 'ctd2'], 'ALL')
    >>> raw2proc('spin', ['b1','b2'], ['ctd1', 'ctd2'], [datetime(2011,11,1), datetime(2012,4,1)])
    >>> raw2proc('spin', ['b1','b2'], 'ALL', 'ALL')

    Not a good idea but this will reprocess all the data from level0
    >>> raw2proc('spin', 'ALL', 'ALL', 'ALL')

    """
    print('\nStart time for raw2proc: %s\n' % start_dt.strftime("%Y-%b-%d %H:%M:%S UTC"))

    if proctype == 'auto':
        print('Processing in auto-mode, all platforms, all packages, latest data')
        auto()
        return

    if proctype == 'manual':
        if not (platform and package and yyyy_mm):
            print('raw2proc: Manual operation requires platform, package, and month')
            print(" >>> raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07')")
            return
        print('Processing in manual-mode ...')
        print(' ... platform id : %s' % platform)
        print(' ... package name : %s' % package)
        print(' ... month : %s' % yyyy_mm)
        print(' ... starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
        manual(platform, package, yyyy_mm)
        return

    if proctype == 'spin':
        if not (platform and package and yyyy_mm):
            print("raw2proc: Spin operation requires platform(s), package(s), and month(s)")
            print(" >>> raw2proc(proctype='spin', platform='b1', package='ALL', yyyy_mm='ALL')")
            print(" >>> raw2proc(proctype='spin', platform='ALL', package='met', yyyy_mm='2011_11')")
            print(" >>> raw2proc('spin', ['b1','b2'], ['ctd1', 'ctd2'], [datetime(2011,11,1), datetime(2012,4,1)])")
            return
        print('Processing in spin-mode ...')
        print(' ... platform ids : %s' % platform)
        print(' ... package names : %s' % package)
        print(' ... months : %s' % yyyy_mm)
        print(' ... starting at : %s' % start_dt.strftime("%Y-%m-%d %H:%M:%S UTC"))
        spin(create_spin_list(platform, package, yyyy_mm))
        return

    print('raw2proc: requires either auto or manual operation')
---|
495 |
|
---|
496 |
|
---|
497 |
def auto():
    """Process all platforms, all packages, latest data

    Notes
    -----

    1. determine which platforms (all platforms with currently active
       config files i.e. config_end_date is None)
    2. for each active config
       for each package in its sensor_info
          yyyy_mm is the current month
          if this month's netcdf exists, start from its last time stamp
          find and process the raw files for the package
          run proc2latest / proc2csv when 'latest_dir' / 'csv_dir'
          are set for the package

    An error in one package is printed as a traceback and processing
    continues with the next package.

    """
    yyyy_mm = this_month()
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_active_configs(config_dir=defconfigs)
    if not configs:
        print(' ... ... NOTE: No active platforms')
        return

    # for each configuration
    for cn in configs:
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        asi = get_config(cn+'.sensor_info')
        platform = pi['id']
        (pi['config_start_dt'], pi['config_end_dt']) = get_config_dates(pi)

        # for each sensor package
        for package in asi.keys():
            try: # if a package fails, try next package
                print(' ... package name : %s' % package)
                si = asi[package]
                si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
                ofn = os.path.join(si['proc_dir'], si['proc_filename'])
                si['proc_start_dt'] = month_start_dt
                si['proc_end_dt'] = month_end_dt
                if os.path.exists(ofn):
                    # get last dt from current month file
                    (es, units) = nc_get_time(ofn)
                    last_dt = es2dt(es[-1])
                    # if the file already has data in this month, start
                    # there so only the newest data are processed
                    if last_dt>=month_start_dt:
                        si['proc_start_dt'] = last_dt

                (raw_files, raw_dts) = find_raw(si, yyyy_mm)
                raw_files = which_raw(pi, raw_files, raw_dts)
                if raw_files:
                    process(pi, si, raw_files, yyyy_mm)
                else:
                    print(' ... ... NOTE: no new raw files found')

                # update latest data for SECOORA commons
                if 'latest_dir' in si:
                    proc2latest(pi, si, yyyy_mm)

                if 'csv_dir' in si:
                    proc2csv(pi, si, yyyy_mm)
            except Exception:
                # keep going with the next package, but let
                # KeyboardInterrupt and SystemExit stop the run
                traceback.print_exc()
---|
566 |
|
---|
567 |
def spin(spin_list):
    """ wrapper to run raw2proc in manual mode for every entry of spin_list

    :Parameters:
        spin_list : list of three-tuple
            Each entry has the form (platform, package, yyyy_mm)
    """
    for platform, package, yyyy_mm in spin_list:
        raw2proc('manual', platform, package, yyyy_mm)
---|
572 |
|
---|
573 |
def manual(platform, package, yyyy_mm):
    """Process data for specified platform, sensor package, and month

    :Parameters:
        platform : string
            Platform id to process (e.g. 'bogue')
        package : string
            Sensor package id to process (e.g. 'adcp')
        yyyy_mm : string
            Year and month of data to process (e.g. '2007_07')

    Notes
    -----

    1. determine which configs overlap the month
    2. for each config for specific platform
       if have package in config
          find raw files and process them into
          <platform>_<package>_<yyyy_mm>.nc
    """
    months = find_months(yyyy_mm)
    month_start_dt = months[1]
    month_end_dt = months[2] - timedelta(seconds=1)

    configs = find_configs(platform, yyyy_mm, config_dir=defconfigs)
    if not configs:
        print(' ... ... ... NOTE: %s not operational for %s' % (platform, yyyy_mm))
        return

    for index, cn in enumerate(configs):
        print(' ... config file : %s' % cn)
        pi = get_config(cn+'.platform_info')
        (pi['config_start_dt'], pi['config_end_dt']) = get_config_dates(pi)
        asi = get_config(cn+'.sensor_info')
        if package not in pi['packages']:
            print(' ... ... NOTE: %s not operational on %s for %s' % (package, platform, yyyy_mm))
            continue

        si = asi[package]
        if si['utc_offset']:
            print(' ... ... utc_offset : %g (hours)' % si['utc_offset'])
        # processing window is the whole month
        si['proc_start_dt'] = month_start_dt
        si['proc_end_dt'] = month_end_dt
        si['proc_filename'] = '%s_%s_%s.nc' % (platform, package, yyyy_mm)
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        (raw_files, raw_dts) = find_raw(si, yyyy_mm)
        raw_files = which_raw(pi, raw_files, raw_dts)

        # remove any previous netcdf file (platform_package_yyyy_mm.nc),
        # but only while handling the first config of the list
        if index == 0 and os.path.exists(ofn):
            os.remove(ofn)
        # a file left by an earlier config: continue after its last time
        # stamp in case data are repeated in the raw files
        if os.path.exists(ofn):
            (es, units) = nc_get_time(ofn)
            last_dt = es2dt(es[-1])
            if last_dt >= month_start_dt:
                si['proc_start_dt'] = last_dt

        if raw_files:
            process(pi, si, raw_files, yyyy_mm)
        else:
            print(' ... ... NOTE: no raw files found for %s %s for %s' % (package, platform, yyyy_mm))
---|
634 |
|
---|
635 |
|
---|
636 |
def process(pi, si, raw_files, yyyy_mm):
    """Parse each raw file and write its in-range records to the netcdf file

    The parser, creator and updater come from the module named in
    si['process_module'].  A record is in range when its data['dt']
    value lies within both the config range (pi) and the processing
    window (si).  The output file si['proc_dir']/si['proc_filename'] is
    updated when it exists and created otherwise.

    :Parameters:
        pi : dict
            platform_info with 'config_start_dt' and 'config_end_dt'
        si : dict
            sensor_info; si['fn'] is set to the file being parsed
        raw_files : list of str
            Raw file paths to process, in order
        yyyy_mm : string
            Month being processed (not used in this function)
    """
    # tailored data processing for different input file formats and control over output
    (parse, create, update) = import_processors(si['process_module'])
    for fn in raw_files:
        # attach file name to sensor info so parser can use it, if needed
        si['fn'] = fn
        lines = load_data(fn)
        if not lines:
            # missing or empty file
            print(" ... skipping file %s" % (fn,))
            continue

        data = parse(pi, si, lines)
        # flag which records fall within the specified timeframe (usually the month)
        data['in'] = numpy.array([False] * len(data['dt']))
        for position, stamp in enumerate(data['dt']):
            if stamp >= pi['config_start_dt'] and \
               stamp >= si['proc_start_dt'] and \
               stamp <= si['proc_end_dt'] and \
               stamp <= pi['config_end_dt']:
                data['in'][position] = True

        # nothing to write unless at least one record is in range
        if not data['in'].any():
            continue
        sys.stdout.write(' ... %s ... ' % fn)
        sys.stdout.write('%d\n' % len(data['in'].nonzero()[0]))
        ofn = os.path.join(si['proc_dir'], si['proc_filename'])
        # update or create netcdf
        if os.path.exists(ofn):
            nc_update(ofn, update(pi, si, data))
        else:
            nc_create(ofn, create(pi, si, data))
---|
672 |
|
---|
673 |
|
---|
674 |
# globals
# UTC time at module load, truncated to whole seconds; printed by raw2proc().
# datetime.replace() returns a new object, so its result is the value kept.
start_dt = datetime.utcnow().replace(microsecond=0)
---|
677 |
|
---|
678 |
# When run as a script (e.g. from cron), process in auto-mode.
if __name__ == "__main__":
    # NOTE(review): optparse is imported but no options are parsed here
    import optparse
    raw2proc('auto')
---|
681 |
|
---|
682 |
# for testing |
---|
683 |
# proctype='manual'; platform='bogue'; package='adcp'; yyyy_mm='2007_07' |
---|
684 |
# raw2proc(proctype='manual', platform='bogue', package='adcp', yyyy_mm='2007_07') |
---|