""" Expand zipped sponge data files. Usage: > python expand.py path/to/config/file >>> import expand """ __author__ = "Chris Calloway" __email__ = "cbc@chriscalloway.org" __copyright__ = "Copyright 2010 UNC-CH Department of Marine Science" __license__ = "GPL2" import sys import os import glob import zipfile import shutil import fileinput from StringIO import StringIO usage = "\n".join(__doc__.splitlines()[3:6]) test_path = "tests/expand" def config_path(): """ Return the configuration file path from the command line. Supply too few arguments on command line. >>> save_stdout = sys.stdout >>> temp_stdout = StringIO() >>> sys.stdout = temp_stdout >>> sys.argv = [] >>> _config_path = config_path() >>> sys.stdout = save_stdout >>> usage == temp_stdout.getvalue()[:-1] True Supply too many arguments on the command line. >>> save_stdout = sys.stdout >>> temp_stdout = StringIO() >>> sys.stdout = temp_stdout >>> sys.argv = ["", "", "",] >>> _config_path = config_path() >>> sys.stdout = save_stdout >>> usage == temp_stdout.getvalue()[:-1] True Supply non-file argument. >>> save_stdout = sys.stdout >>> temp_stdout = StringIO() >>> sys.stdout = temp_stdout >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path) >>> sys.argv = ["", _config_path] >>> _config_path = config_path() >>> sys.stdout = save_stdout >>> usage == temp_stdout.getvalue()[:-1] True Supply nonexistent file argument. >>> save_stdout = sys.stdout >>> temp_stdout = StringIO() >>> sys.stdout = temp_stdout >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "xxxxx") >>> sys.argv = ["", _config_path] >>> _config_path = config_path() >>> sys.stdout = save_stdout >>> usage == temp_stdout.getvalue()[:-1] True Supply valid config path argument. >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "config.py") >>> sys.argv = ["", _config_path] >>> _config_path == config_path() True """ path = None try: if len(sys.argv) == 2: path = sys.argv[1] if not os.path.exists(path): raise IOError(path + \ " does not exist.") elif not os.path.isfile(path): raise IOError(path + \ " is not a file.") else: raise IOError("Incorrect number of arguments supplied.") except IOError: print usage return path def config(path): """ Return the configuration from a file. Execute empty configuration. >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "empty_config.py") >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \ config(_config_path) >>> zipdir >>> xmldir >>> zipdir_pattern >>> zipfile_pattern >>> xmlfile_pattern Execute nonexistent configuration. >>> save_stdout = sys.stdout >>> temp_stdout = StringIO() >>> sys.stdout = temp_stdout >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "xxxxx") >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \ config(_config_path) >>> sys.stdout = save_stdout >>> usage == temp_stdout.getvalue()[:-1] True >>> zipdir >>> xmldir >>> zipdir_pattern >>> zipfile_pattern >>> xmlfile_pattern Execute bad configuration. >>> save_stdout = sys.stdout >>> temp_stdout = StringIO() >>> sys.stdout = temp_stdout >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "bad_config.py") >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \ config(_config_path) >>> sys.stdout = save_stdout >>> usage == temp_stdout.getvalue()[:-1] True >>> >>> zipdir >>> xmldir >>> zipdir_pattern >>> zipfile_pattern >>> xmlfile_pattern Execute valid configuration. >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "config.py") >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \ config(_config_path) >>> zipdir == os.path.join(os.path.dirname(os.path.abspath(__file__)), ... test_path, "zip") True >>> xmldir == os.path.join(os.path.dirname(os.path.abspath(__file__)), ... test_path, "xml") True >>> zipdir_pattern == "[0-9][0-9][0-9][0-9]_[0-9][0-9]" True >>> zipfile_pattern == "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \ "[0-9][0-9][0-9][0-9][0-9][0-9]-" \ "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \ "[0-9][0-9][0-9][0-9][0-9][0-9].zip" True >>> xmlfile_pattern == "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \ "[0-9][0-9][0-9][0-9][0-9][0-9].xml" True """ namespace = {} namespace["zipdir"] = None namespace["xmldir"] = None namespace["zipdir_pattern"] = None namespace["zipfile_pattern"] = None namespace["xmlfile_pattern"] = None try: execfile(path, globals(), namespace) except IOError: print usage except SyntaxError: print usage return (namespace["zipdir"], namespace["xmldir"], namespace["zipdir_pattern"], namespace["zipfile_pattern"], namespace["xmlfile_pattern"],) def combine(xml_subdir, xmlfile_pattern): """ Combine the xml sponge data files from a subdirectory. Combine test subdirectory. >>> xmlref_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "xmlref") >>> xmlref = glob.glob(os.path.join(xmlref_path, "*")) >>> xmlref_path = [path for path in xmlref if os.path.isdir(path)][0] >>> xmltest_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "xmltest") >>> if os.path.exists(xmltest_path): ... shutil.rmtree(xmltest_path) >>> shutil.copytree(xmlref_path, xmltest_path) >>> xmltest = glob.glob(os.path.join(xmltest_path, "*")) >>> xml_subdir = [path for path in xmltest if os.path.isdir(path)][0] >>> xmlfile_pattern = "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]T" \ "[0-9][0-9][0-9][0-9][0-9][0-9].xml" >>> combine(xml_subdir, xmlfile_pattern) >>> ref_path = xml_subdir + os.extsep + "ref" >>> xml_path = xml_subdir + os.extsep + "xml" >>> ref_handle = open(ref_path) >>> xml_handle = open(xml_path) >>> reffile = ref_handle.read() >>> xmlfile = xml_handle.read() >>> reffile == xmlfile True >>> ref_handle.close() >>> xml_handle.close() >>> os.path.exists(xml_subdir) False """ # Find all the xml sponge data files. xmlfiles = glob.glob(os.path.join(xml_subdir, xmlfile_pattern)) xmlfiles = [xmlfile for xmlfile in xmlfiles if os.path.isfile(xmlfile)] # Read all the xml sponge data files. files = fileinput.FileInput(xmlfiles) header = files.readline() lines = [line for line in files if not files.isfirstline()] # Write a combined xml sponge data file. path = xml_subdir + os.extsep + "xml" handle = open(path, "w") handle.write(header) handle.write("\r\n") handle.writelines(lines) handle.write("\r\n") handle.close() # Remove the xml sponge data file subdirectory. shutil.rmtree(xml_subdir) return def expand(zipdir, xmldir, zipdir_pattern, zipfile_pattern, xmlfile_pattern): """ Expand zipped sponge data files. Expand valid tree. >>> _config_path = os.path.join( ... os.path.dirname( ... os.path.abspath(__file__)), ... test_path, "config.py") >>> zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern = \ config(_config_path) >>> if os.path.exists(xmldir): ... shutil.rmtree(xmldir) >>> expand(zipdir,xmldir,zipdir_pattern,zipfile_pattern,xmlfile_pattern) """ # Find all the monthly directories of zipped files. zip_months = glob.glob(os.path.join(zipdir, zipdir_pattern)) zip_months = [zip_month for zip_month in zip_months if os.path.isdir(zip_month)] # Create a directory for monthly XML subdirectories. if not os.path.exists(xmldir): os.mkdir(xmldir, 0755) elif not os.path.isdir(xmldir): raise IOError("XML directory name " + \ xmldir + \ " exists and is not a directory.") for zip_month in zip_months: # Create each monthly XML subdirectory. xml_month = os.path.join(xmldir, os.path.split(zip_month)[1]) if not os.path.exists(xml_month): os.mkdir(xml_month, 0755) elif not os.path.isdir(xml_month): raise IOError("XML month subdirectory name " + \ xml_month + \ " exists and is not a directory.") # Find all the zip files for each month. zipfiles = glob.glob(os.path.join(zip_month, zipfile_pattern)) zipfiles = [zip_file for zip_file in zipfiles if os.path.isfile(zip_file)] for zip_file in zipfiles: # Create an XML subdirectory for each zip file. xml_subdir = os.path.splitext(os.path.split(zip_file)[1])[0] xml_subdir = os.path.join(xml_month, xml_subdir) if not os.path.exists(xml_subdir): os.mkdir(xml_subdir, 0755) # Extract all the xml files in the zip file. archive = zipfile.ZipFile(zip_file, "r") archive.extractall(xml_subdir) combine(xml_subdir, xmlfile_pattern) elif not os.path.isdir(xml_subdir): raise IOError("XML file subdirectory name " + \ xml_subdir + \ " exists and is not a directory.") return if __name__ == "__main__": _config_path = config_path() if _config_path: _config = config(_config_path) if all(_config): expand(*_config)