NCCOOS Trac Projects: Top | Web | Platforms | Processing | Viz | Sprints | Sandbox | (Wind)

root/spongenet/trunk/spongenet/parse.py

Revision 368 (checked in by cbc, 14 years ago)

cElementTree might be faster for the parse module.

Line 
1 #!/usr/bin/env python
2
3 """
4 Parse combined sponge data XML files.
5
6 Usage:
7
8    > python parse.py path/to/xml/file
9    > python parse.py -t
10    > python parse.py --test
11
12 Test silent import.
13
14 >>> import parse
15 """
16
17 __author__ = "Chris Calloway"
18 __email__ = "cbc@chriscalloway.org"
19 __copyright__ = "Copyright 2010 UNC-CH Department of Marine Science"
20 __license__ = "GPL2"
21
22 import sys
23 import os
24 import re
25 import glob
26 import hashlib
27 import doctest
28 import unittest
29 from StringIO import StringIO
30 import xml.etree.cElementTree as ET
31
# Usage text printed for a bad command line: entries 3 through 7 of the
# module docstring's lines ("Usage:" through the "--test" example).  The
# slice is positional, so editing the docstring layout changes this text.
USAGE = "\n".join(__doc__.splitlines()[3:8])
# Relative directory of the doctest fixtures; the doctests join it onto
# the directory that contains this module.
TEST_PATH = os.path.join("tests", "parse")
# Splits a tag of the form "{namespace}Name" into the braced namespace
# (group 1) and the remaining local name (group 2).
XMLNS_PATTERN = re.compile(r"(\{.*\})(.*)")
35
36
37 def _test():
38     """
39     Run doctests as unittest suite.
40
41     Test silent import
42
43     >>> from parse import _test
44     """
45
46     suite = []
47     suite.append(doctest.DocTestSuite())
48     suite = unittest.TestSuite(suite)
49     unittest.TextTestRunner().run(suite)
50
51     return
52
53
def xmldoc_path():
    """
    Return the XML document file path from the command line.

    Exactly one command line argument is expected.  If it is "-t" or
    "--test", the module doctests are run and None is returned.  If it
    names an existing regular file, that path is returned.  For any other
    command line the usage message is printed and None is returned, so a
    caller never receives a path that failed validation.

    Supply too few arguments on command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = []
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply too many arguments on the command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = ["", "", "",]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply non-file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH)
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply nonexistent file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xxxxx")
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply valid XML document path argument.

    >>> _xmldoc_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(_xmldoc_path)[0]
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path == xmldoc_path()
    True
    """

    try:
        if len(sys.argv) != 2:
            raise IOError("Incorrect number of arguments supplied.")
        argument = sys.argv[1]
        if argument in ("-t", "--test"):
            _test()
            return None
        if not os.path.exists(argument):
            raise IOError(argument + " does not exist.")
        if not os.path.isfile(argument):
            raise IOError(argument + " is not a file.")
    except IOError:
        # A single parenthesized argument prints the same text whether
        # print is a statement or a function.
        print(USAGE)
        # Do not hand back an argument that failed validation; the caller
        # would otherwise try to open it.
        return None
    return argument
140
141
def xmldoc(path):
    """
    Read the file at path and return its whole content as one string.

    Get the test reference data.

    >>> xmlref = os.path.join(
    ...              os.path.dirname(
    ...                  os.path.abspath(__file__)),
    ...               TEST_PATH, "xmlref.py")
    >>> namespace = {}
    >>> execfile(xmlref, globals(), namespace)
    >>> xml_doc_lens = namespace["XML_DOC_LENS"]
    >>> xml_doc_md5s = namespace["XML_DOC_MD5S"]

    Pick a test document.

    >>> xmldoc_glob = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)

    Verify the test document matches the reference data.

    >>> xmldoc_lines = _xmldoc.splitlines()
    >>> xmldoc_lines[0] == '<?xml version="1.0" encoding="utf-8"?>'
    True
    >>> xmldoc_lines[-1] == '</root>'
    True
    >>> len(xmldoc_lines) == xml_doc_lens[os.path.basename(_xmldoc_path)]
    True
    >>> doc_hash = hashlib.md5()
    >>> doc_hash.update(_xmldoc)
    >>> doc_hash.hexdigest() == xml_doc_md5s[os.path.basename(_xmldoc_path)]
    True
    """

    # The file is closed by the with block before the text is handed back.
    with open(path) as source:
        return source.read()
186
187
class Point(object):
    """
    A data point for a sponge sensor sample.

    Every XML attribute of the point element, and the text of every child
    element, is copied onto the instance.  Attribute names are the
    lower-cased XML names; child tags have any "{namespace}" prefix
    removed first.
    """

    def __init__(self, point):
        """
        Populate the instance from an ElementTree element.

        point -- element whose attributes and child elements are copied.
        """

        store = super(Point, self).__setattr__
        for key, value in point.attrib.items():
            store(key.lower(), value)
        # Iterate the element itself: getchildren() is deprecated and is
        # absent from newer ElementTree releases.
        for elem in point:
            # rpartition keeps the text after the last "}" and returns the
            # whole tag when the element carries no namespace.
            store(elem.tag.rpartition("}")[2].lower(), elem.text)
200
201
class Sensor(object):
    """
    A collection of data points for a sponge sensor sample.

    XML attributes and the text of child elements are copied onto the
    instance under lower-cased names.  The "Point" children of a
    "Parameters" child become the points list.
    """

    def __init__(self, sensor, xmlns):
        """
        Populate the instance from an ElementTree element.

        sensor -- element describing one sensor sample.
        xmlns -- namespace prefix in "{uri}" form (or "") that is put in
                 front of "Point" when searching a Parameters element.
        """

        store = super(Sensor, self).__setattr__
        # Default first, so a sensor without a Parameters element still
        # exposes an (empty) points list instead of lacking the attribute.
        self.points = []
        for key, value in sensor.attrib.items():
            store(key.lower(), value)
        # Iterate the element itself: getchildren() is deprecated and is
        # absent from newer ElementTree releases.
        for elem in sensor:
            # Text after the last "}"; the whole tag when un-namespaced.
            tag = elem.tag.rpartition("}")[2].lower()
            if tag == "parameters":
                self.points = [Point(point) for point
                               in elem.findall(xmlns + "Point")]
            else:
                store(tag, elem.text)
218
219
class Device(object):
    """
    Data from a collection of sponge sensors for a single time sample.

    XML attributes and the text of child elements are copied onto the
    instance under lower-cased names.  Children of a "SiteInfo" element
    are flattened onto the instance, and a "Data" element contributes its
    attributes plus the sensors list.
    """

    def __init__(self, device, xmlns):
        """
        Populate the instance from an ElementTree element.

        device -- element describing one device sample.
        xmlns -- namespace prefix in "{uri}" form (or "") that is put in
                 front of "SensorData" when searching a Data element and
                 is passed on to each Sensor.
        """

        store = super(Device, self).__setattr__
        # Default first, so a device without a Data element still exposes
        # an (empty) sensors list instead of lacking the attribute.
        self.sensors = []
        for key, value in device.attrib.items():
            store(key.lower(), value)
        # Iterate elements directly: getchildren() is deprecated and is
        # absent from newer ElementTree releases.
        for elem in device:
            # Text after the last "}"; the whole tag when un-namespaced.
            tag = elem.tag.rpartition("}")[2].lower()
            if tag == "siteinfo":
                for subelem in elem:
                    store(subelem.tag.rpartition("}")[2].lower(),
                          subelem.text)
            elif tag == "data":
                for key, value in elem.attrib.items():
                    key = key.lower()
                    # Stored under prefixed names, apart from the
                    # device's own attributes.
                    if key == "time":
                        key = "data_time"
                    elif key == "sessionid":
                        key = "data_sessionid"
                    store(key, value)
                self.sensors = [Sensor(sensor, xmlns) for sensor
                                in elem.findall(xmlns + "SensorData")]
            else:
                store(tag, elem.text)
247
248
class Data(object):
    """
    A collection of sponge data samples from a collection of sensors.

    xmlns holds the "{uri}" namespace prefix taken from the first child of
    the document root ("" when there is none), and devices holds one
    Device per "Device" child of the root.
    """

    def __init__(self, _xmldoc):
        """
        Initialize a new sponge data tree.

        _xmldoc -- the XML document as a string.
        """

        tree = ET.XML(_xmldoc)
        # Indexing the root replaces the deprecated getchildren() call; an
        # empty root yields no namespace instead of an IndexError.
        first = tree[0] if len(tree) else None
        if first is None:
            self.xmlns = ""
        else:
            # Everything up to and including the last "}" of the tag; both
            # parts are empty when the tag carries no namespace.
            uri, brace, _ = first.tag.rpartition("}")
            self.xmlns = uri + brace
        self.devices = [Device(device, self.xmlns) for device
                        in tree.findall(self.xmlns + "Device")]
264
265
def _main():
    """
    Parse the XML file named on the command line into a Data tree.

    Returns None when xmldoc_path() yields no usable path.

    Test silent import.

    >>> from parse import _main
    """

    source_path = xmldoc_path()
    if not source_path:
        return None

    return Data(xmldoc(source_path))
283
# Script entry point: parse the file named on the command line and print
# the resulting object (None when no document was parsed).
if __name__ == "__main__":
    DATA = _main()
    print "DATA =", DATA
Note: See TracBrowser for help on using the browser.