NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/spongenet/trunk/spongenet/parse.py

Revision 370 (checked in by cbc, 14 years ago)

Subclass Point, Sensor, and Device from dict.

Line 
1 #!/usr/bin/env python
2
3 """
4 Parse combined sponge data XML files.
5
6 Usage:
7
8    > python parse.py path/to/xml/file
9    > python parse.py -t
10    > python parse.py --test
11
12 Test silent import.
13
14 >>> import parse
15 """
16
17 __author__ = "Chris Calloway"
18 __email__ = "cbc@chriscalloway.org"
19 __copyright__ = "Copyright 2010 UNC-CH Department of Marine Science"
20 __license__ = "GPL2"
21
22 import sys
23 import os
24 import re
25 import glob
26 import hashlib
27 import doctest
28 import unittest
29 from StringIO import StringIO
30 import xml.etree.cElementTree as ET
31
# Directory, relative to this module, that the doctests join onto the
# module's own location to find their fixtures.
TEST_PATH = os.path.join("tests", "parse")

# Splits a qualified tag of the form "{namespace}name" into two groups:
# the braced namespace (group 1) and whatever follows it (group 2).
XMLNS_PATTERN = re.compile(r"(\{.*\})(.*)")

# Usage text printed for a bad command line: lines 3 through 7 of the
# module docstring, i.e. its "Usage:" paragraph.
USAGE = "\n".join(__doc__.splitlines()[3:8])
35
36
def _test():
    """
    Gather this module's doctests into a unittest suite and run it.

    The results are reported by a unittest text runner. Nothing is
    returned.

    Test silent import

    >>> from parse import _test
    """

    doctests = doctest.DocTestSuite()
    unittest.TextTestRunner().run(unittest.TestSuite([doctests]))
52
53
def xmldoc_path():
    """
    Return the XML document file path from the command line.

    Exactly one command line argument is expected. "-t" or "--test"
    runs the module doctests and yields None. Any other argument must
    name an existing regular file, which is returned unchanged.

    On a bad command line (wrong argument count, nonexistent path, or
    a path that is not a file) the reason is written to stderr, the
    usage text is printed to stdout, and None is returned, so callers
    never receive a path that cannot be opened.

    Supply too few arguments on command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = []
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True

    Supply too many arguments on the command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = ["", "", "",]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True

    Supply non-file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH)
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply nonexistent file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xxxxx")
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply valid XML document path argument.

    >>> _xmldoc_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(_xmldoc_path)[0]
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path == xmldoc_path()
    True
    """

    try:
        if len(sys.argv) != 2:
            raise IOError("Incorrect number of arguments supplied.")
        argument = sys.argv[1]
        if argument in ("-t", "--test"):
            # Test mode: run the doctests; there is no document to parse.
            _test()
            return None
        if not os.path.exists(argument):
            raise IOError(argument + " does not exist.")
        if not os.path.isfile(argument):
            raise IOError(argument + " is not a file.")
    except IOError as error:
        # The reason goes to stderr so stdout carries exactly the usage
        # text, which is what the doctests above compare against.
        sys.stderr.write("%s\n" % error)
        print(USAGE)
        # Never hand back a rejected path: the caller would try to open it.
        return None
    return argument
140
141
def xmldoc(path):
    """
    Read the file at path and return its whole content as one string.

    Get the test reference data.

    >>> xmlref = os.path.join(
    ...              os.path.dirname(
    ...                  os.path.abspath(__file__)),
    ...               TEST_PATH, "xmlref.py")
    >>> namespace = {}
    >>> execfile(xmlref, globals(), namespace)
    >>> xml_doc_lens = namespace["XML_DOC_LENS"]
    >>> xml_doc_md5s = namespace["XML_DOC_MD5S"]

    Pick a test document.

    >>> xmldoc_glob = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)

    Verify the test document matches the reference data.

    >>> xmldoc_lines = _xmldoc.splitlines()
    >>> xmldoc_lines[0] == '<?xml version="1.0" encoding="utf-8"?>'
    True
    >>> xmldoc_lines[-1] == '</root>'
    True
    >>> len(xmldoc_lines) == xml_doc_lens[os.path.basename(_xmldoc_path)]
    True
    >>> doc_hash = hashlib.md5()
    >>> doc_hash.update(_xmldoc)
    >>> doc_hash.hexdigest() == xml_doc_md5s[os.path.basename(_xmldoc_path)]
    True
    """

    # The context manager closes the file even if reading fails.
    with open(path) as source:
        return source.read()
186
187
class Point(dict):
    """
    A data point for a sponge sensor sample.

    The dict holds the XML attributes of the point element and the text
    of each of its child elements, keyed by the lower-cased attribute
    name or the lower-cased child tag with its namespace stripped.

    Instantiate a valid Point object.

    >>> xmldoc_glob = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)
    >>> _data = Data(_xmldoc)
    >>> point = _data.devices[0].sensors[0].points[0]
    >>> len(point.keys()) == 8
    True
    >>> point["id"] is not None
    True
    >>> point["descr"] is not None
    True
    >>> point["type"] is not None
    True
    >>> point["format"] is not None
    True
    >>> point["unit"] is not None
    True
    >>> point["rangemin"] is not None
    True
    >>> point["rangemax"] is not None
    True
    >>> point["value"] is not None
    True
    """

    def __init__(self, point):
        """
        Populate the dict from an ElementTree element.

        point is the element for one data point. Every child tag must
        be namespace-qualified ("{namespace}name"), because
        XMLNS_PATTERN is used to strip the namespace.
        """
        super(Point, self).__init__({})
        for key, value in point.attrib.items():
            self[key.lower()] = value
        # Iterate the element itself: Element.getchildren() is deprecated
        # and no longer exists in current ElementTree releases.
        for elem in point:
            tag = XMLNS_PATTERN.search(elem.tag).group(2).lower()
            self[tag] = elem.text
230
231
class Sensor(dict):
    """
    A collection of data points for a sponge sensor sample.

    The dict holds the XML attributes of the sensor element and the
    text of its child elements, keyed by lower-cased name. The points
    attribute is a list with one Point per Point element found under
    the "parameters" child; it is empty when there is no such child.

    Instantiate a valid Sensor object.

    >>> xmldoc_glob = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)
    >>> _data = Data(_xmldoc)
    >>> sensor = _data.devices[0].sensors[0]
    >>> len(sensor.points)
    3
    >>> len(sensor.keys()) == 9
    True
    >>> sensor["id"] is not None
    True
    >>> sensor["serialno"] is not None
    True
    >>> sensor["prodno"] is not None
    True
    >>> sensor["prodname"] is not None
    True
    >>> sensor["descr"] is not None
    True
    >>> sensor["adr"] is not None
    True
    >>> sensor["protocolver"] is not None
    True
    >>> sensor["verticalposition"] is not None
    True
    >>> sensor["status"] is not None
    True
    """

    def __init__(self, sensor, xmlns):
        """
        Populate the dict and the points list from an element.

        sensor is the ElementTree element for one sensor. xmlns is the
        braced namespace prefix ("{namespace}") used to find the Point
        elements. Every child tag must be namespace-qualified.
        """
        super(Sensor, self).__init__({})
        # Defined up front so the attribute exists even when the sensor
        # has no "parameters" child.
        self.points = []
        for key, value in sensor.attrib.items():
            self[key.lower()] = value
        # Iterate the element itself: Element.getchildren() is deprecated
        # and no longer exists in current ElementTree releases.
        for elem in sensor:
            tag = XMLNS_PATTERN.search(elem.tag).group(2).lower()
            if tag == "parameters":
                self.points = [Point(point) for point
                               in elem.findall(xmlns + "Point")]
            else:
                self[tag] = elem.text
282
283
class Device(dict):
    """
    Data from a collection of sponge sensors for a single time sample.

    The dict holds, keyed by lower-cased name: the XML attributes of
    the device element, the text of the children of its "siteinfo"
    child, the attributes of its "data" child, and the text of every
    other child. The "time" and "sessionid" attributes of the "data"
    child are stored as "data_time" and "data_sessionid". The sensors
    attribute is a list with one Sensor per SensorData element under
    the "data" child; it is empty when there is no such child.

    Instantiate a valid Device object.

    >>> xmldoc_glob = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)
    >>> _data = Data(_xmldoc)
    >>> device = _data.devices[0]
    >>> len(device.sensors)
    15
    >>> len(device.keys())
    16
    >>> device["id"] is not None
    True
    >>> device["sessionid"] is not None
    True
    >>> device["descr"] is not None
    True
    >>> device["serialno"] is not None
    True
    >>> device["prodno"] is not None
    True
    >>> device["prodname"] is not None
    True
    >>> device["devicetype"] is not None
    True
    >>> device["protocolver"] is not None
    True
    >>> device["time"] is not None
    True
    >>> device["status"] is not None
    True
    >>> device["location"] is not None
    True
    >>> device["verticalposition"] is not None
    True
    >>> device["owner"] is not None
    True
    >>> device["recordnumber"] is not None
    True
    >>> device["data_time"] is not None
    True
    >>> device["data_sessionid"] is not None
    True
    """

    def __init__(self, device, xmlns):
        """
        Populate the dict and the sensors list from an element.

        device is the ElementTree element for one device. xmlns is the
        braced namespace prefix ("{namespace}") used to find the
        SensorData elements. Every child tag must be
        namespace-qualified.
        """
        super(Device, self).__init__({})
        # Defined up front so the attribute exists even when the device
        # has no "data" child.
        self.sensors = []
        for key, value in device.attrib.items():
            self[key.lower()] = value
        # Iterate elements directly: Element.getchildren() is deprecated
        # and no longer exists in current ElementTree releases.
        for elem in device:
            tag = XMLNS_PATTERN.search(elem.tag).group(2).lower()
            if tag == "siteinfo":
                # Site details are flattened into the device itself.
                for subelem in elem:
                    subtag = XMLNS_PATTERN.search(subelem.tag).group(2).lower()
                    self[subtag] = subelem.text
            elif tag == "data":
                for key, value in elem.attrib.items():
                    key = key.lower()
                    # Stored under distinct names so they cannot
                    # overwrite the device's own "time"/"sessionid".
                    if key == "time":
                        key = "data_time"
                    elif key == "sessionid":
                        key = "data_sessionid"
                    self[key] = value
                self.sensors = [Sensor(sensor, xmlns) for sensor
                                in elem.findall(xmlns + "SensorData")]
            else:
                self[tag] = elem.text
359
360
class Data(object):
    """
    A collection of sponge data samples from a collection of sensors.

    The xmlns attribute is the braced namespace prefix taken from the
    tag of the root element's first child, or an empty string when the
    root has no children. The devices attribute is a list with one
    Device per Device element directly under the root.

    Instantiate a valid Data object.

    >>> xmldoc_glob = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)
    >>> _data = Data(_xmldoc)
    >>> _data.xmlns
    '{http://www.aadi.no/RTOutSchema}'
    >>> len(_data.devices)
    50
    """

    def __init__(self, _xmldoc):
        """
        Initialize a new sponge data tree.

        _xmldoc is the XML document as a string. The first child of
        the root, when present, must have a namespace-qualified tag.
        """

        tree = ET.XML(_xmldoc)
        if len(tree):
            # Index the element directly: Element.getchildren() is
            # deprecated and gone from current ElementTree releases.
            self.xmlns = XMLNS_PATTERN.search(tree[0].tag).group(1)
        else:
            # A root without children has no samples and no namespace
            # to discover; the search below then finds no devices.
            self.xmlns = ""
        self.devices = [Device(device, self.xmlns) for device
                        in tree.findall(self.xmlns + "Device")]
390
391
def _main():
    """
    Run module as script.

    Ask xmldoc_path() for the document path. When it yields one, read
    the document with xmldoc() and return the Data built from it;
    otherwise return None.

    Test silent import.

    >>> from parse import _main
    """

    path = xmldoc_path()
    if not path:
        # No usable path was supplied, so there is nothing to parse.
        return None
    return Data(xmldoc(path))
409
if __name__ == "__main__":
    # Bind the result of the run to a module-level name, then echo it.
    DATA = _main()
    print("DATA = %s" % (DATA,))
Note: See TracBrowser for help on using the browser.