#!/usr/bin/env python
"""
Expand zipped sponge data files.

Usage:
   > python expand.py path/to/config/file
   > python expand.py -t
   > python expand.py --test

Test silent import.
>>> import expand
"""

__author__ = "Chris Calloway"
__email__ = "cbc@chriscalloway.org"
__copyright__ = "Copyright 2010 UNC-CH Department of Marine Science"
__license__ = "GPL2"

import sys
import os
import glob
import zipfile
import shutil
import fileinput
import doctest
import unittest

try:
    from StringIO import StringIO
except ImportError:
    # Python 3 has no StringIO module; io.StringIO is the equivalent there.
    from io import StringIO

# The usage text is spelled out rather than sliced from __doc__, because
# __doc__ is None under "python -OO" and slicing it would fail at import.
# Keep this in step with the usage section of the module docstring.
USAGE = "\n".join((
    "Usage:",
    "   > python expand.py path/to/config/file",
    "   > python expand.py -t",
    "   > python expand.py --test",
    "",
))

# Directory, relative to this module, holding the doctest fixtures.
TEST_PATH = "tests/expand"


def _test():
    """
    Run doctests as unittest suite.

    Test silent import
    >>> from expand import _test
    """
    suite = unittest.TestSuite([doctest.DocTestSuite()])
    unittest.TextTestRunner().run(suite)
    return


def config_path():
    """
    Return the configuration file path from the command line.

    Reads sys.argv. When the single argument is "-t" or "--test" the
    doctests are run and None is returned. When the argument count is
    wrong, or the argument is not an existing regular file, the usage
    message is printed and None is returned.

    Supply too few arguments on command line.
    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = []
    >>> _config_path = config_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True

    Supply too many arguments on the command line.
    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> sys.argv = ["", "", "",]
    >>> _config_path = config_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True

    Supply non-file argument.
    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH)
    >>> sys.argv = ["", _config_path]
    >>> _config_path = config_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _config_path is None
    True

    Supply nonexistent file argument.
    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xxxxx")
    >>> sys.argv = ["", _config_path]
    >>> _config_path = config_path()
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> _config_path is None
    True

    Supply valid config path argument.
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "config.py")
    >>> sys.argv = ["", _config_path]
    >>> _config_path == config_path()
    True
    """
    path = None
    try:
        if len(sys.argv) == 2:
            if sys.argv[1] == "-t" or sys.argv[1] == "--test":
                _test()
            else:
                candidate = sys.argv[1]
                if not os.path.exists(candidate):
                    raise IOError(candidate + \
                                  " does not exist.")
                elif not os.path.isfile(candidate):
                    raise IOError(candidate + \
                                  " is not a file.")
                # Only a validated path is handed back; a rejected one
                # must not leak out, or the caller would try to load it
                # and print the usage message a second time.
                path = candidate
        else:
            raise IOError("Incorrect number of arguments supplied.")
    except IOError:
        print(USAGE)
    return path


def config(path):
    """
    Return the configuration from a file.

    The file at path is executed as Python source and the names zipdir,
    xmldir, zipdir_pattern, zipfile_pattern and xmlfile_pattern are
    returned as a 5-tuple in that order. Any name the file does not set
    is None. If the file cannot be read or has a syntax error, the usage
    message is printed and the values collected so far are returned.

    Execute empty configuration.
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "empty_config.py")
    >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \
            config(_config_path)
    >>> zipdir
    >>> xmldir
    >>> zipdir_pattern
    >>> zipfile_pattern
    >>> xmlfile_pattern

    Execute nonexistent configuration.
    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xxxxx")
    >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \
            config(_config_path)
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> zipdir
    >>> xmldir
    >>> zipdir_pattern
    >>> zipfile_pattern
    >>> xmlfile_pattern

    Execute bad configuration.
    >>> save_stdout = sys.stdout
    >>> temp_stdout = StringIO()
    >>> sys.stdout = temp_stdout
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "bad_config.py")
    >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \
            config(_config_path)
    >>> sys.stdout = save_stdout
    >>> USAGE == temp_stdout.getvalue()[:-1]
    True
    >>> zipdir
    >>> xmldir
    >>> zipdir_pattern
    >>> zipfile_pattern
    >>> xmlfile_pattern

    Execute valid configuration.
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "config.py")
    >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \
            config(_config_path)
    >>> zipdir == os.path.join(os.path.dirname(os.path.abspath(__file__)),
    ...                        TEST_PATH, "zip")
    True
    >>> xmldir == os.path.join(os.path.dirname(os.path.abspath(__file__)),
    ...                        TEST_PATH, "xml")
    True
    >>> zipdir_pattern == "[0-9][0-9][0-9][0-9]_[0-9][0-9]"
    True
    >>> zipfile_pattern == "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \
                           "[0-9][0-9][0-9][0-9][0-9][0-9]-" \
                           "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \
                           "[0-9][0-9][0-9][0-9][0-9][0-9].zip"
    True
    >>> xmlfile_pattern == "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \
                           "[0-9][0-9][0-9][0-9][0-9][0-9].xml"
    True
    """
    namespace = {
        "zipdir": None,
        "xmldir": None,
        "zipdir_pattern": None,
        "zipfile_pattern": None,
        "xmlfile_pattern": None,
    }
    try:
        # NOTE(security): the configuration file is run as Python code
        # with this module's globals, so only trusted files may be used.
        # open/compile/exec stands in for execfile(), which Python 3 lacks.
        with open(path) as handle:
            # The trailing newline keeps compile() happy on Python 2.6.
            source = handle.read() + "\n"
        exec(compile(source, path, "exec"), globals(), namespace)
    except IOError:
        print(USAGE)
    except SyntaxError:
        print(USAGE)
    return (namespace["zipdir"],
            namespace["xmldir"],
            namespace["zipdir_pattern"],
            namespace["zipfile_pattern"],
            namespace["xmlfile_pattern"],)


def combine(xml_subdir, xmlfile_pattern):
    """
    Combine the XML sponge data files from a subdirectory.

    The files in xml_subdir whose names match the glob xmlfile_pattern
    are read in sorted name order. The first line of the first file is
    kept as the header; the first line of every other file is dropped.
    The result is written to xml_subdir + ".xml" with trailing
    whitespace stripped from each line, and xml_subdir is then removed.

    If no file matches, nothing is written and xml_subdir is left in
    place.

    Combine test subdirectory.
    >>> xmlref_path = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xmlref")
    >>> xmlref = glob.glob(os.path.join(xmlref_path, "*"))
    >>> xmlref_path = [path for path in xmlref if os.path.isdir(path)][0]
    >>> xmltest_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "xmltest")
    >>> if os.path.exists(xmltest_path):
    ...     shutil.rmtree(xmltest_path)
    >>> _copied = shutil.copytree(xmlref_path, xmltest_path)
    >>> xmltest = glob.glob(os.path.join(xmltest_path, "*"))
    >>> xml_subdir = [path for path in xmltest if os.path.isdir(path)][0]
    >>> xmlfile_pattern = "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \
                          "[0-9][0-9][0-9][0-9][0-9][0-9].xml"
    >>> combine(xml_subdir, xmlfile_pattern)
    >>> ref_path = xml_subdir + os.extsep + "ref"
    >>> xml_path = xml_subdir + os.extsep + "xml"
    >>> with open(ref_path) as ref_handle:
    ...     reffile = ref_handle.readlines()
    >>> with open(xml_path) as xml_handle:
    ...     xmlfile = xml_handle.readlines()
    >>> reffile == xmlfile
    True
    >>> os.path.exists(xml_subdir)
    False
    """
    # Find all the XML sponge data files. glob returns names in arbitrary
    # order, so sort to make the combined document deterministic.
    xmlfiles = sorted(xmlfile
                      for xmlfile in glob.glob(os.path.join(xml_subdir,
                                                            xmlfile_pattern))
                      if os.path.isfile(xmlfile))

    # FileInput falls back to sys.argv/stdin when given no files, which
    # would block waiting for input; there is nothing to combine anyway.
    if not xmlfiles:
        return

    # Read all the XML sponge data files.
    files = fileinput.FileInput(xmlfiles)
    try:
        header = files.readline().rstrip() + "\n"
        # Skip the first line of each later file so that only one header
        # line ends up in the combined document.
        lines = [line.rstrip() + "\n"
                 for line in files
                 if not files.isfirstline()]
    finally:
        files.close()

    # Write a combined XML sponge data file.
    path = xml_subdir + os.extsep + "xml"
    with open(path, "w") as handle:
        handle.write(header)
        handle.write("\n")
        handle.writelines(lines)
        handle.write("\n")

    # Remove the XML sponge data file subdirectory.
    shutil.rmtree(xml_subdir)
    return


def expand(zipdir, xmldir, zipdir_pattern, zipfile_pattern, xmlfile_pattern):
    """
    Expand zipped sponge data files.

    For every directory under zipdir matching zipdir_pattern, a
    directory of the same name is created under xmldir. Each zip file in
    it matching zipfile_pattern is extracted into a subdirectory named
    after the zip file, and that subdirectory is passed to combine()
    with xmlfile_pattern. A zip file whose subdirectory already exists
    is skipped.

    Raises IOError when xmldir, a monthly subdirectory name, or a zip
    file subdirectory name exists but is not a directory.

    Expand valid tree.
    >>> _config_path = os.path.join(
    ...                    os.path.dirname(
    ...                        os.path.abspath(__file__)),
    ...                    TEST_PATH, "config.py")
    >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \
            config(_config_path)
    >>> if os.path.exists(xmldir):
    ...     shutil.rmtree(xmldir)
    >>> expand(zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern)
    >>> xmlref_path = os.path.join(
    ...                   os.path.dirname(
    ...                       os.path.abspath(__file__)),
    ...                   TEST_PATH, "xmlref")
    >>> ref_paths = sorted(glob.glob(os.path.join(TEST_PATH, "xmlref",
    ...                                           "*", "*.ref")))
    >>> xml_paths = sorted(glob.glob(os.path.join(xmldir, "*", "*.xml")))
    >>> paths = list(zip(ref_paths, xml_paths))
    >>> truths = []
    >>> for ref_path,xml_path in paths:
    ...     with open(ref_path) as ref_handle:
    ...         reffile = ref_handle.readlines()
    ...     with open(xml_path) as xml_handle:
    ...         xmlfile = xml_handle.readlines()
    ...     truths.extend((reffile == xmlfile,
    ...                    os.path.exists(os.path.splitext(xml_path)[0]),))
    >>> truths == [True, False,] * len(paths)
    True
    """
    # Find all the monthly directories of zipped files.
    zip_months = glob.glob(os.path.join(zipdir, zipdir_pattern))
    zip_months = [zip_month for zip_month in zip_months
                  if os.path.isdir(zip_month)]

    # Create a directory for monthly XML subdirectories.
    if not os.path.exists(xmldir):
        os.mkdir(xmldir, 0o755)
    elif not os.path.isdir(xmldir):
        raise IOError("XML directory name " + \
                      xmldir + \
                      " exists and is not a directory.")

    for zip_month in zip_months:

        # Create each monthly XML subdirectory.
        xml_month = os.path.join(xmldir, os.path.split(zip_month)[1])
        if not os.path.exists(xml_month):
            os.mkdir(xml_month, 0o755)
        elif not os.path.isdir(xml_month):
            raise IOError("XML month subdirectory name " + \
                          xml_month + \
                          " exists and is not a directory.")

        # Find all the zip files for each month.
        zipfiles = glob.glob(os.path.join(zip_month, zipfile_pattern))
        zipfiles = [zip_file for zip_file in zipfiles
                    if os.path.isfile(zip_file)]

        for zip_file in zipfiles:

            # Create an XML subdirectory for each zip file.
            xml_subdir = os.path.splitext(os.path.split(zip_file)[1])[0]
            xml_subdir = os.path.join(xml_month, xml_subdir)
            if not os.path.exists(xml_subdir):
                os.mkdir(xml_subdir, 0o755)

                # Extract all the XML files in the zip file
                # and combine into a single XML document.
                archive = zipfile.ZipFile(zip_file, "r")
                try:
                    archive.extractall(xml_subdir)
                finally:
                    # Release the file handle even if extraction fails;
                    # try/finally rather than "with" keeps Python 2.6
                    # working, where ZipFile is not a context manager.
                    archive.close()
                combine(xml_subdir, xmlfile_pattern)
            elif not os.path.isdir(xml_subdir):
                raise IOError("XML file subdirectory name " + \
                              xml_subdir + \
                              " exists and is not a directory.")
    return


def _main():
    """
    Run module as script.

    Expands the tree only when a configuration path was supplied and
    every configuration value is set.

    Test silent import.
    >>> from expand import _main
    """
    _config_path = config_path()
    if _config_path:
        _config = config(_config_path)
        if all(_config):
            expand(*_config)
    return


if __name__ == "__main__":
    _main()