1 |
|
---|
2 |
|
---|
3 |
""" |
---|
4 |
Parse combined sponge data XML files. |
---|
5 |
|
---|
6 |
Usage: |
---|
7 |
|
---|
8 |
> python parse.py path/to/xml/file |
---|
9 |
> python parse.py -t |
---|
10 |
> python parse.py --test |
---|
11 |
|
---|
12 |
Test silent import. |
---|
13 |
|
---|
14 |
>>> import parse |
---|
15 |
""" |
---|
16 |
|
---|
17 |
__author__ = "Chris Calloway" |
---|
18 |
__email__ = "cbc@chriscalloway.org" |
---|
19 |
__copyright__ = "Copyright 2010 UNC-CH Department of Marine Science" |
---|
20 |
__license__ = "GPL2" |
---|
21 |
|
---|
22 |
import sys |
---|
23 |
import os |
---|
24 |
import re |
---|
25 |
import glob |
---|
26 |
import hashlib |
---|
27 |
import doctest |
---|
28 |
import unittest |
---|
29 |
from StringIO import StringIO |
---|
30 |
import xml.etree.cElementTree as ET |
---|
31 |
|
---|
32 |
# Usage text printed for a bad command line: entries 3-7 of the module
# docstring's lines.  ``__doc__`` is None when docstrings are stripped
# (``python -OO``), so fall back to an empty string instead of failing at
# import time.
USAGE = "\n".join((__doc__ or "").splitlines()[3:8])
# Fixture directory; the doctests join it onto this module's own directory.
TEST_PATH = os.path.join("tests", "parse")
# Splits an ElementTree tag "{namespace}local" into ("{namespace}", "local").
XMLNS_PATTERN = re.compile(r"(\{.*\})(.*)")
---|
35 |
|
---|
36 |
|
---|
37 |
def _test():
    """
    Run doctests as unittest suite.

    The result is written to the console by the unittest text runner;
    nothing is returned.

    Test silent import

    >>> from parse import _test
    """

    # DocTestSuite() called without a module argument collects the doctests
    # of the module it is called from.
    suite = unittest.TestSuite([doctest.DocTestSuite()])
    unittest.TextTestRunner().run(suite)
---|
52 |
|
---|
53 |
|
---|
54 |
def xmldoc_path():
    """
    Return the XML document file path from the command line.

    The path is returned only when exactly one argument was supplied and
    it names an existing regular file.  In every other case the usage text
    is printed to standard output and None is returned.  The -t and --test
    options run the doctests instead and also return None.

    Supply too few arguments on command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = []
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply too many arguments on the command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = ["", "", "",]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply non-file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH)
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply nonexistent file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xxxxx")
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply valid XML document path argument.

    >>> _xmldoc_path = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(_xmldoc_path)[0]
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path == xmldoc_path()
    True
    """

    if len(sys.argv) == 2:
        argument = sys.argv[1]
        if argument in ("-t", "--test"):
            _test()
            return None
        # isfile() is False for a missing path as well as for a directory,
        # so this single check rejects both.
        if os.path.isfile(argument):
            return argument

    # Wrong argument count or unusable path: show the usage text and return
    # None, never the rejected path, so the caller does not try to open it.
    # The parenthesised single-argument print behaves the same as a Python 2
    # statement and as a Python 3 function call.
    print(USAGE)
    return None
---|
140 |
|
---|
141 |
|
---|
142 |
def xmldoc(path):
    """
    Return the XML document as a string from a file at path.

    Errors raised while opening or reading the file are not caught here.

    Get the test reference data.

    >>> xmlref = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xmlref.py")
    >>> namespace = {}
    >>> execfile(xmlref, globals(), namespace)
    >>> xml_doc_lens = namespace["XML_DOC_LENS"]
    >>> xml_doc_md5s = namespace["XML_DOC_MD5S"]

    Pick a test document.

    >>> xmldoc_glob = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)

    Verify the test document matches the reference data.

    >>> xmldoc_lines = _xmldoc.splitlines()
    >>> xmldoc_lines[0] == '<?xml version="1.0" encoding="utf-8"?>'
    True
    >>> xmldoc_lines[-1] == '</root>'
    True
    >>> len(xmldoc_lines) == xml_doc_lens[os.path.basename(_xmldoc_path)]
    True
    >>> doc_hash = hashlib.md5()
    >>> doc_hash.update(_xmldoc)
    >>> doc_hash.hexdigest() == xml_doc_md5s[os.path.basename(_xmldoc_path)]
    True
    """

    # One read() yields the same string as joining readlines(), without
    # building the intermediate list of lines.
    with open(path) as handle:
        return handle.read()
---|
186 |
|
---|
187 |
|
---|
188 |
class Point(object):
    """
    A data point for a sponge sensor sample.

    Every XML attribute and every child element of the source element
    becomes an instance attribute whose name is the lower-cased attribute
    name or local tag name and whose value is the raw string from the
    document.
    """

    def __init__(self, point):
        """
        Copy the attributes and child element texts of the point element.
        """

        for key, value in point.attrib.items():
            super(Point, self).__setattr__(key.lower(), value)
        # Iterate the element directly: Element.getchildren() was removed in
        # Python 3.9.
        for elem in point:
            # Keep what follows the "{namespace}" prefix; a tag without a
            # namespace is used whole instead of raising.
            tag = elem.tag.rpartition("}")[2].lower()
            super(Point, self).__setattr__(tag, elem.text)
---|
200 |
|
---|
201 |
|
---|
202 |
class Sensor(object):
    """
    A collection of data points for a sponge sensor sample.

    XML attributes and child element texts become lower-cased instance
    attributes holding the raw strings.  The Point elements found under a
    Parameters child are wrapped in Point objects and stored in the
    ``points`` list.
    """

    def __init__(self, sensor, xmlns):
        """
        Build the sensor from its element.

        sensor is the sensor element; xmlns is the "{namespace}" prefix
        (or an empty string) prepended to "Point" when searching the
        Parameters child.
        """

        # Always defined, so a sensor without a Parameters child exposes an
        # empty list instead of lacking the attribute.
        self.points = []
        for key, value in sensor.attrib.items():
            super(Sensor, self).__setattr__(key.lower(), value)
        # Iterate the element directly: Element.getchildren() was removed in
        # Python 3.9.
        for elem in sensor:
            # Keep what follows the "{namespace}" prefix; a tag without a
            # namespace is used whole instead of raising.
            tag = elem.tag.rpartition("}")[2].lower()
            if tag == "parameters":
                self.points = [Point(point) for point
                               in elem.findall(xmlns + "Point")]
            else:
                super(Sensor, self).__setattr__(tag, elem.text)
---|
218 |
|
---|
219 |
|
---|
220 |
class Device(object):
    """
    Data from a collection of sponge sensors for a single time sample.

    XML attributes and child element texts become lower-cased instance
    attributes holding the raw strings.  The children of a SiteInfo
    element are flattened onto the device.  The attributes of a Data
    element are copied as well, with "time" and "sessionid" stored as
    ``data_time`` and ``data_sessionid``, and its SensorData children are
    wrapped in Sensor objects and stored in the ``sensors`` list.
    """

    def __init__(self, device, xmlns):
        """
        Build the device from its element.

        device is the device element; xmlns is the "{namespace}" prefix
        (or an empty string) prepended to "SensorData" when searching the
        Data child.
        """

        # Always defined, so a device without a Data child exposes an empty
        # list instead of lacking the attribute.
        self.sensors = []
        for key, value in device.attrib.items():
            super(Device, self).__setattr__(key.lower(), value)
        # Iterate elements directly: Element.getchildren() was removed in
        # Python 3.9.
        for elem in device:
            # Keep what follows the "{namespace}" prefix; a tag without a
            # namespace is used whole instead of raising.
            tag = elem.tag.rpartition("}")[2].lower()
            if tag == "siteinfo":
                for subelem in elem:
                    subtag = subelem.tag.rpartition("}")[2].lower()
                    super(Device, self).__setattr__(subtag, subelem.text)
            elif tag == "data":
                for key, value in elem.attrib.items():
                    key = key.lower()
                    # Stored under a data_ prefix rather than the bare name.
                    if key == "time":
                        key = "data_time"
                    elif key == "sessionid":
                        key = "data_sessionid"
                    super(Device, self).__setattr__(key, value)
                self.sensors = [Sensor(sensor, xmlns) for sensor
                                in elem.findall(xmlns + "SensorData")]
            else:
                super(Device, self).__setattr__(tag, elem.text)
---|
247 |
|
---|
248 |
|
---|
249 |
class Data(object):
    """
    A collection of sponge data samples from a collection of sensors.
    """

    def __init__(self, _xmldoc):
        """
        Initialize a new sponge data tree.

        _xmldoc is the XML document as a string.  After construction,
        ``xmlns`` holds the "{namespace}" prefix taken from the first child
        of the root (an empty string when the root has no children or the
        child has no namespace) and ``devices`` holds one Device per
        Device element directly under the root.
        """

        tree = ET.XML(_xmldoc)
        # list(tree) replaces Element.getchildren(), removed in Python 3.9.
        children = list(tree)
        if children:
            # Everything up to and including the last "}" is the prefix;
            # both parts are empty when the tag carries no namespace.
            namespace, brace, _ = children[0].tag.rpartition("}")
            self.xmlns = namespace + brace
        else:
            self.xmlns = ""
        self.devices = [Device(device, self.xmlns) for device
                        in tree.findall(self.xmlns + "Device")]
---|
264 |
|
---|
265 |
|
---|
266 |
def _main():
    """
    Run module as script.

    Return the Data tree parsed from the file named on the command line,
    or None when xmldoc_path() yields no usable path.

    Test silent import.

    >>> from parse import _main
    """

    _xmldoc_path = xmldoc_path()
    if not _xmldoc_path:
        return None

    return Data(xmldoc(_xmldoc_path))
---|
283 |
|
---|
284 |
if __name__ == "__main__":
    DATA = _main()
    # A single parenthesised argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print("DATA = %s" % (DATA,))
---|