#!/usr/bin/env python
"""
Parse combined sponge data XML files.

Usage:
   > python parse.py path/to/xml/file
   > python parse.py -t
   > python parse.py --test

Test silent import.

>>> import parse
"""

__author__ = "Chris Calloway"
__email__ = "cbc@chriscalloway.org"
__copyright__ = "Copyright 2010 UNC-CH Department of Marine Science"
__license__ = "GPL2"

import sys
import os
import re
import glob
import hashlib
import doctest
import unittest

try:
    from StringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3

try:
    import xml.etree.cElementTree as ET
except ImportError:
    # The C accelerator alias was removed in Python 3.9.
    import xml.etree.ElementTree as ET

# Kept as a literal (rather than sliced out of __doc__) so the module still
# imports when docstrings are stripped with ``python -OO``.  The text mirrors
# the "Usage:" section of the module docstring and ends with a newline.
USAGE = "\n".join((
    "Usage:",
    "   > python parse.py path/to/xml/file",
    "   > python parse.py -t",
    "   > python parse.py --test",
    "",
))
TEST_PATH = os.path.join("tests", "parse")
# Splits an ElementTree qualified tag "{namespace}local" into its two parts.
XMLNS_PATTERN = re.compile(r"(\{.*\})(.*)")
# Attributes of the <Data> element are stored on the Device under these
# names so they do not collide with the Device's own attributes.
_DATA_ATTRIBUTE_NAMES = {"time": "data_time", "sessionid": "data_sessionid"}


def _test():
    """
    Run doctests as unittest suite.

    Test silent import

    >>> from parse import _test
    """
    suite = unittest.TestSuite([doctest.DocTestSuite()])
    unittest.TextTestRunner().run(suite)


def _split_tag(tag):
    """
    Return (namespace, local name) for an ElementTree tag.

    The namespace keeps its surrounding braces, ready to be prefixed to a
    local name for ``findall``.  A tag without a namespace yields an empty
    namespace instead of raising.

    >>> _split_tag("{urn:x}Device") == ("{urn:x}", "Device")
    True
    >>> _split_tag("Device") == ("", "Device")
    True
    """
    match = XMLNS_PATTERN.search(tag)
    if match is None:
        return "", tag
    return match.groups()


def _local_name(element):
    """Return the lowercased tag of element with any namespace removed."""
    return _split_tag(element.tag)[1].lower()


def _copy_attributes(target, element, renames=None):
    """
    Set each XML attribute of element on target under its lowercased name.

    renames optionally maps a lowercased attribute name to the name it
    should be stored under instead.
    """
    for key, value in element.attrib.items():
        key = key.lower()
        if renames:
            key = renames.get(key, key)
        setattr(target, key, value)


def xmldoc_path():
    """
    Return the XML document file path from the command line.

    Exactly one command line argument is expected.  "-t" or "--test" runs
    the doctests and returns None.  Any other argument must name an
    existing regular file, which is returned.  On a wrong argument count,
    a nonexistent path or a non-file path, USAGE is written to standard
    output and None is returned.

    Supply too few arguments on command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = []
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True

    Supply too many arguments on the command line.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = ["", "", "",]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True

    Supply non-file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH)
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply nonexistent file argument.

    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _xmldoc_path = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xxxxx")
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path = xmldoc_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _xmldoc_path is None
    True

    Supply valid XML document path argument.

    >>> _xmldoc_path = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(_xmldoc_path)[0]
    >>> sys.argv = ["", _xmldoc_path]
    >>> _xmldoc_path == xmldoc_path()
    True
    """
    path = None
    try:
        if len(sys.argv) != 2:
            raise IOError("Incorrect number of arguments supplied.")
        argument = sys.argv[1]
        if argument == "-t" or argument == "--test":
            _test()
        elif not os.path.exists(argument):
            raise IOError(argument + " does not exist.")
        elif not os.path.isfile(argument):
            raise IOError(argument + " is not a file.")
        else:
            # Only a validated argument is ever handed back to the caller;
            # a rejected one must not leak out as the return value.
            path = argument
    except IOError:
        # Same bytes as the Python 2 statement ``print USAGE``.
        sys.stdout.write(USAGE + "\n")
    return path


def xmldoc(path):
    """
    Return the XML document as a string from a file at path.

    The file is opened in the default text mode and read in full.  Errors
    from opening or reading the file propagate to the caller.

    Get the test reference data.

    >>> xmlref = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xmlref.py")
    >>> namespace = {}
    >>> with open(xmlref) as handle:
    ...     exec(handle.read(), globals(), namespace)
    >>> xml_doc_lens = namespace["XML_DOC_LENS"]
    >>> xml_doc_md5s = namespace["XML_DOC_MD5S"]

    Pick a test document.

    >>> xmldoc_glob = os.path.join(
    ...     os.path.dirname(
    ...         os.path.abspath(__file__)),
    ...     TEST_PATH, "xml","*","*.xml")
    >>> _xmldoc_path = glob.glob(xmldoc_glob)[0]
    >>> _xmldoc = xmldoc(_xmldoc_path)

    Verify the test document matches the reference data.  The digest is fed
    bytes; on Python 3 the text is encoded first (assumes the test documents
    are UTF-8 -- TODO confirm).

    >>> xmldoc_lines = _xmldoc.splitlines()
    >>> xmldoc_lines[0] == ''
    True
    >>> xmldoc_lines[-1] == ''
    True
    >>> len(xmldoc_lines) == xml_doc_lens[os.path.basename(_xmldoc_path)]
    True
    >>> doc_hash = hashlib.md5()
    >>> doc_hash.update(_xmldoc if isinstance(_xmldoc, bytes)
    ...                 else _xmldoc.encode("utf-8"))
    >>> doc_hash.hexdigest() == xml_doc_md5s[os.path.basename(_xmldoc_path)]
    True
    """
    with open(path) as handle:
        return handle.read()


class Point(object):
    """
    A data point for a sponge sensor sample.

    Every XML attribute of the point element becomes an instance attribute
    under its lowercased name.  Every child element becomes an instance
    attribute named after its lowercased, namespace-free tag, holding the
    child's text.
    """

    def __init__(self, point):
        _copy_attributes(self, point)
        for child in point:
            setattr(self, _local_name(child), child.text)


class Sensor(object):
    """
    A collection of data points for a sponge sensor sample.

    XML attributes and child elements are exposed as instance attributes
    the same way as for Point, except for a "Parameters" child: its
    "Point" children (looked up in namespace xmlns) are wrapped as Point
    objects in the list ``points``.  ``points`` is an empty list when the
    sensor element has no "Parameters" child.
    """

    def __init__(self, sensor, xmlns):
        self.points = []
        _copy_attributes(self, sensor)
        for child in sensor:
            name = _local_name(child)
            if name == "parameters":
                self.points = [Point(point)
                               for point in child.findall(xmlns + "Point")]
            else:
                setattr(self, name, child.text)


class Device(object):
    """
    Data from a collection of sponge sensors for a single time sample.

    XML attributes of the device element become lowercased instance
    attributes.  Child elements are handled by tag:

    * "SiteInfo": each of its children becomes an attribute holding that
      child's text.
    * "Data": its XML attributes become attributes, with "time" and
      "sessionid" stored as ``data_time`` and ``data_sessionid``; its
      "SensorData" children (looked up in namespace xmlns) are wrapped as
      Sensor objects in the list ``sensors``.
    * anything else: an attribute holding the element's text.

    ``sensors`` is an empty list when the device has no "Data" child.
    """

    def __init__(self, device, xmlns):
        self.sensors = []
        _copy_attributes(self, device)
        for child in device:
            name = _local_name(child)
            if name == "siteinfo":
                for info in child:
                    setattr(self, _local_name(info), info.text)
            elif name == "data":
                _copy_attributes(self, child, _DATA_ATTRIBUTE_NAMES)
                self.sensors = [
                    Sensor(sensor, xmlns)
                    for sensor in child.findall(xmlns + "SensorData")]
            else:
                setattr(self, name, child.text)


class Data(object):
    """
    A collection of sponge data samples from a collection of sensors.

    ``xmlns`` is the braced namespace prefix used to look up elements (an
    empty string for a document without a namespace) and ``devices`` is the
    list of Device objects built from the root's "Device" children.
    """

    def __init__(self, _xmldoc):
        """
        Initialize a new sponge data tree.

        _xmldoc is the XML document text.  Parse errors raised by
        ElementTree propagate to the caller.
        """
        tree = ET.XML(_xmldoc)
        children = list(tree)
        # The namespace is taken from the first child of the root; a root
        # without children falls back to its own tag so that an empty
        # document yields an empty device list instead of an IndexError.
        tagged = children[0] if children else tree
        self.xmlns = _split_tag(tagged.tag)[0]
        self.devices = [Device(device, self.xmlns)
                        for device in tree.findall(self.xmlns + "Device")]


def _main():
    """
    Run module as script.

    Return the parsed Data for the document named on the command line, or
    None when no usable document path was supplied.

    Test silent import.

    >>> from parse import _main
    """
    _xmldoc_path = xmldoc_path()
    if not _xmldoc_path:
        return None
    return Data(xmldoc(_xmldoc_path))


if __name__ == "__main__":
    DATA = _main()
    # Single-argument form prints identically on Python 2 and Python 3.
    print("DATA = %s" % (DATA,))