
CGATcore IOTools Module - Tools for I/O operations

This module contains utility functions for reading/writing from files. These include methods for

  • inspecting files, such as :func:get_first_line, :func:get_last_line and :func:is_empty,

  • working with filenames, such as :func:which, :func:snip and :func:check_presence_of_files,

  • manipulating files, such as :func:open_file, :func:zap_file, :func:clone_file and :func:touch_file,

  • converting values for input/output, such as :func:val2str, :func:str2val, :func:pretty_percent, :func:human2bytes and :func:convert_dictionary_values,

  • iterating over file contents, such as :func:iterate and :func:iterator_split,

  • creating lists/dictionaries from files, such as :func:readMap and :func:read_list, and

  • working with file collections (see :class:FilePool).



manage a pool of output files.

This class will keep a large number of files open. To see if you can handle this, check the limit within the shell::

ulimit -n

The number of currently open and maximum open files in the system:

cat /proc/sys/fs/file-nr

Changing these limits might not be easy without root privileges.

The maximum number of files opened is given by :attr:maxopen. This class is inefficient if the number of files is larger than :attr:maxopen and calls to write do not group keys together.

To use this class, create a FilePool and write to it as if it was a single file, specifying a section for each write::

pool = FilePool("%s.tsv")
for value in range(100):
    for section in ("file1", "file2", "file3"):
         pool.write(section, str(value) + ",")

This will create three files called file1.tsv, file2.tsv, file3.tsv, each containing the numbers from 0 to 99.

The FilePool acts otherwise as a dictionary providing access to the number of times an item has been written to each file::

print(pool["file1"])
print(pool.items())
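
The write pattern described above can be tried without cgatcore installed. The following is a minimal sketch only: `MiniPool` is a hypothetical, simplified stand-in written for this example, not the `FilePool` class itself. It has no `maxopen` handling, no header and no `force` option, and it assumes that the pattern contains exactly one "%s".

```python
import collections
import os
import tempfile


class MiniPool:
    """Hypothetical, simplified stand-in for FilePool (illustration only)."""

    def __init__(self, output_pattern):
        self.pattern = output_pattern
        self.files = {}
        self.counts = collections.defaultdict(int)

    def write(self, section, text):
        # substitute the section name into the output pattern
        filename = self.pattern.replace("%s", str(section))
        if filename not in self.files:
            self.files[filename] = open(filename, "a")
        self.files[filename].write(text)
        self.counts[filename] += 1

    def close(self):
        for handle in self.files.values():
            handle.close()
        self.files = {}


workdir = tempfile.mkdtemp()
pool = MiniPool(os.path.join(workdir, "%s.tsv"))
for value in range(100):
    for section in ("file1", "file2", "file3"):
        pool.write(section, str(value) + ",")
pool.close()

created = sorted(os.listdir(workdir))
writes_per_file = sorted(pool.counts.values())
```

Each of the three files receives 100 writes, which is what the per-file counters record.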



output_pattern : string
    output pattern to use. Should contain a "%s". If set to None, the pattern "%s" will be used.
header : string
    optional header to write when writing to a file the first time.
force : bool
    overwrite existing files. All files matching the pattern will be deleted.

Source code in cgatcore/
class FilePool:
    """manage a pool of output files.

    This class will keep a large number of files open. To
    see if you can handle this, check the limit within the shell::

       ulimit -n

    The number of currently open and maximum open files in the system:

      cat /proc/sys/fs/file-nr

    Changing these limits might not be easy without root privileges.

    The maximum number of files opened is given by :attr:`maxopen`.
    This class is inefficient if the number of files is larger than
    :attr:`maxopen` and calls to `write` do not group keys together.

    To use this class, create a FilePool and write to it as if it was
    a single file, specifying a section for each write::

        pool = FilePool("%s.tsv")
        for value in range(100):
            for section in ("file1", "file2", "file3"):
                 pool.write(section, str(value) + ",")

    This will create three files called ``file1.tsv``, ``file2.tsv``,
    ``file3.tsv``, each containing the numbers from 0 to 99.

    The FilePool acts otherwise as a dictionary providing access to
    the number of times an item has been written to each file::

        print(pool["file1"])
        print(pool.items())


    output_pattern : string
       output pattern to use. Should contain a "%s". If set to None, the
       pattern "%s" will be used.
    header : string
       optional header to write when writing to a file the first time.
    force : bool
       overwrite existing files. All files matching the pattern will be
       deleted.
    """

    maxopen = 5000

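    def __init__(self,
                 output_pattern=None,
                 header=None,
                 force=True):

        self.mFiles = {}
        self.mOutputPattern = output_pattern

        # opener used by self.open_file(); the module-level open_file
        self.open = open_file

        self.mCounts = collections.defaultdict(int)
        self.mHeader = header
        if force and output_pattern:
            for f in glob.glob(re.sub("%s", "*", output_pattern)):
                os.remove(f)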

    def __del__(self):
        """close all open files."""
        for file in list(self.mFiles.values()):
            file.close()

    def __len__(self):
        return len(self.mCounts)

    def close(self):
        """close all open files."""
        for file in list(self.mFiles.values()):
            file.close()

    def values(self):
        return list(self.mCounts.values())

    def keys(self):
        return list(self.mCounts.keys())

    def iteritems(self):
        return iter(self.mCounts.items())

    def items(self):
        return list(self.mCounts.items())

    def __iter__(self):
        return self.mCounts.__iter__()

    def getFile(self, identifier):
        return identifier

    def getFilename(self, identifier):
        """get filename for an identifier."""

        if self.mOutputPattern:
            return re.sub("%s", str(identifier), self.mOutputPattern)
        else:
            return identifier

    def setHeader(self, header):
        """set the header to be written to each file when opening
        for the first time."""

        self.mHeader = header

    def open_file(self, filename, mode="w"):
        """open file.

        If file is in a new directory, create directories.
        """
        if mode in ("w", "a"):
            dirname = os.path.dirname(filename)
            if dirname and not os.path.exists(dirname):
                os.makedirs(dirname)

        return self.open(filename, mode)

    def write(self, identifier, line):
        """write `line` to file specified by `identifier`"""
        filename = self.getFilename(identifier)

        if filename not in self.mFiles:

            if self.maxopen and len(self.mFiles) > self.maxopen:
                # too many open files: close them all and start afresh
                for f in list(self.mFiles.values()):
                    f.close()
                self.mFiles = {}

            self.mFiles[filename] = self.open_file(filename, "a")
            if self.mHeader:
                self.mFiles[filename].write(self.mHeader)

        try:
            self.mFiles[filename].write(line)
        except ValueError as msg:
            raise ValueError(
                "error while writing to %s: msg=%s" % (filename, msg))
        self.mCounts[filename] += 1

    def deleteFiles(self, min_size=0):
        """delete all files below a minimum size `min_size` bytes."""

        ndeleted = 0
        for filename, counts in list(self.mCounts.items()):
            if counts < min_size:
                os.remove(filename)
                ndeleted += 1

        return ndeleted


close all open files.

Source code in cgatcore/
def __del__(self):
    """close all open files."""
    for file in list(self.mFiles.values()):
        file.close()


close all open files.

Source code in cgatcore/
def close(self):
    """close all open files."""
    for file in list(self.mFiles.values()):
        file.close()


delete all files below a minimum size min_size bytes.

Source code in cgatcore/
def deleteFiles(self, min_size=0):
    """delete all files below a minimum size `min_size` bytes."""

    ndeleted = 0
    for filename, counts in list(self.mCounts.items()):
        if counts < min_size:
            os.remove(filename)
            ndeleted += 1

    return ndeleted


get filename for an identifier.

Source code in cgatcore/
def getFilename(self, identifier):
    """get filename for an identifier."""

    if self.mOutputPattern:
        return re.sub("%s", str(identifier), self.mOutputPattern)
    else:
        return identifier

open_file(filename, mode='w')

open file.

If file is in a new directory, create directories.

Source code in cgatcore/
def open_file(self, filename, mode="w"):
    """open file.

    If file is in a new directory, create directories.
    """
    if mode in ("w", "a"):
        dirname = os.path.dirname(filename)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)

    return self.open(filename, mode)


set the header to be written to each file when opening for the first time.

Source code in cgatcore/
def setHeader(self, header):
    """set the header to be written to each file when opening
    for the first time."""

    self.mHeader = header

write(identifier, line)

write line to file specified by identifier

Source code in cgatcore/
def write(self, identifier, line):
    """write `line` to file specified by `identifier`"""
    filename = self.getFilename(identifier)

    if filename not in self.mFiles:

        if self.maxopen and len(self.mFiles) > self.maxopen:
            # too many open files: close them all and start afresh
            for f in list(self.mFiles.values()):
                f.close()
            self.mFiles = {}

        self.mFiles[filename] = self.open_file(filename, "a")
        if self.mHeader:
            self.mFiles[filename].write(self.mHeader)

    try:
        self.mFiles[filename].write(line)
    except ValueError as msg:
        raise ValueError(
            "error while writing to %s: msg=%s" % (filename, msg))
    self.mCounts[filename] += 1


Bases: FilePool

manage a pool of output files in memory.

The usage is the same as :class:FilePool but the data is cached in memory before writing to disk.

Source code in cgatcore/
class FilePoolMemory(FilePool):
    """manage a pool of output files in memory.

    The usage is the same as :class:`FilePool` but the data is cached
    in memory before writing to disk.
    """

    maxopen = 5000

    def __init__(self, *args, **kwargs):
        FilePool.__init__(self, *args, **kwargs)

        self.data = collections.defaultdict(list)
        self.isClosed = False

    def __del__(self):
        """close all open files.
        """
        if not self.isClosed:
            self.close()

    def close(self):
        """close all open files.
        writes the data to disk.
        """
        if self.isClosed:
            raise IOError("write on closed FilePool in close()")

        for filename, data in self.data.items():
            f = self.open_file(filename, "a")
            if self.mHeader:
                f.write(self.mHeader)
            f.write("".join(data))
            f.close()

        self.isClosed = True

    def write(self, identifier, line):

        filename = self.getFilename(identifier)
        self.data[filename].append(line)
        self.mCounts[filename] += 1


close all open files.

Source code in cgatcore/
def __del__(self):
    """close all open files.
    """
    if not self.isClosed:
        self.close()


close all open files. writes the data to disk.

Source code in cgatcore/
def close(self):
    """close all open files.
    writes the data to disk.
    """
    if self.isClosed:
        raise IOError("write on closed FilePool in close()")

    for filename, data in self.data.items():
        f = self.open_file(filename, "a")
        if self.mHeader:
            f.write(self.mHeader)
        f.write("".join(data))
        f.close()

    self.isClosed = True


Bases: defaultdict

Auto-vivifying nested dictionaries.

For example::

nd = nested_dict()
nd["mouse"]["chr1"]["+"] = 311
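
To make the auto-vivification and the flattened iteration concrete, here is a small self-contained sketch. It re-declares the class as documented on this page so that it runs without cgatcore; the second assignment and the variable name `flat` are only for illustration.

```python
import collections


class nested_dict(collections.defaultdict):
    """Auto-vivifying nested dictionary, mirroring the documented class."""

    def __init__(self):
        collections.defaultdict.__init__(self, nested_dict)

    def iterflattened(self):
        # yield (tuple_of_keys, leaf_value) pairs
        for key, value in self.items():
            if isinstance(value, nested_dict):
                for subkeys, leaf in value.iterflattened():
                    yield (key,) + subkeys, leaf
            else:
                yield (key,), value


nd = nested_dict()
nd["mouse"]["chr1"]["+"] = 311   # intermediate levels are created on access
nd["mouse"]["chr2"]["-"] = 42
flat = sorted(nd.iterflattened())
```

Note that merely reading a missing key also creates it, which is the usual trade-off of `defaultdict`-based nesting.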

Source code in cgatcore/
class nested_dict(collections.defaultdict):
    """Auto-vivifying nested dictionaries.

    For example::

      nd = nested_dict()
      nd["mouse"]["chr1"]["+"] = 311
    """

    def __init__(self):
        collections.defaultdict.__init__(self, nested_dict)

    def iterflattened(self):
        """
        iterate through values with nested keys flattened into a tuple
        """

        for key, value in self.items():
            if isinstance(value, nested_dict):
                for keykey, value in value.iterflattened():
                    yield (key,) + keykey, value
            else:
                yield (key,), value


iterate through values with nested keys flattened into a tuple

Source code in cgatcore/
def iterflattened(self):
    """
    iterate through values with nested keys flattened into a tuple
    """

    for key, value in self.items():
        if isinstance(value, nested_dict):
            for keykey, value in value.iterflattened():
                yield (key,) + keykey, value
        else:
            yield (key,), value

bytes2human(n, format='%(value).1f%(symbol)s', symbols='customary')

Convert n bytes into a human readable string based on format. symbols can be either "customary", "customary_ext", "iec" or "iec_ext", see:

>>> bytes2human(0)
'0.0B'
>>> bytes2human(0.9)
'0.0B'
>>> bytes2human(1)
'1.0B'
>>> bytes2human(1.9)
'1.0B'
>>> bytes2human(1024)
'1.0K'
>>> bytes2human(1048576)
'1.0M'
>>> bytes2human(1099511627776127398123789121)
'909.5Y'

>>> bytes2human(9856, symbols="customary")
'9.6K'
>>> bytes2human(9856, symbols="customary_ext")
'9.6kilo'
>>> bytes2human(9856, symbols="iec")
'9.6Ki'
>>> bytes2human(9856, symbols="iec_ext")
'9.6kibi'

>>> bytes2human(10000, "%(value).1f %(symbol)s/sec")
'9.8 K/sec'

>>> # precision can be adjusted by playing with %f operator
>>> bytes2human(10000, format="%(value).5f %(symbol)s")
'9.76562 K'

Author: Giampaolo Rodola' License: MIT
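
The conversion rests on a table of power-of-1024 thresholds built with a bit shift. The sketch below isolates that step. The `CUSTOMARY` tuple is an assumption, since the module's `SYMBOLS` table is not shown on this page, and `to_human` is a hypothetical name with a fixed output format.

```python
# Assumed symbol set; the module's own SYMBOLS table is not shown here.
CUSTOMARY = ("B", "K", "M", "G", "T", "P", "E", "Z", "Y")


def to_human(n, symbols=CUSTOMARY):
    """Format n bytes using the largest unit that does not exceed n."""
    n = int(n)
    if n < 0:
        raise ValueError("n < 0")
    # K -> 1 << 10, M -> 1 << 20, G -> 1 << 30, ...
    thresholds = {s: 1 << (i + 1) * 10 for i, s in enumerate(symbols[1:])}
    for symbol in reversed(symbols[1:]):
        if n >= thresholds[symbol]:
            return "%.1f%s" % (n / thresholds[symbol], symbol)
    return "%.1f%s" % (n, symbols[0])


examples = [to_human(x) for x in (0, 1024, 9856, 1048576)]
```

Walking the symbols from largest to smallest means the first threshold that fits determines the unit.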

Source code in cgatcore/
def bytes2human(n, format='%(value).1f%(symbol)s', symbols='customary'):
    """
    Convert n bytes into a human readable string based on format.
    symbols can be either "customary", "customary_ext", "iec" or "iec_ext",
    see:

      >>> bytes2human(0)
      '0.0B'
      >>> bytes2human(0.9)
      '0.0B'
      >>> bytes2human(1)
      '1.0B'
      >>> bytes2human(1.9)
      '1.0B'
      >>> bytes2human(1024)
      '1.0K'
      >>> bytes2human(1048576)
      '1.0M'
      >>> bytes2human(1099511627776127398123789121)
      '909.5Y'

      >>> bytes2human(9856, symbols="customary")
      '9.6K'
      >>> bytes2human(9856, symbols="customary_ext")
      '9.6kilo'
      >>> bytes2human(9856, symbols="iec")
      '9.6Ki'
      >>> bytes2human(9856, symbols="iec_ext")
      '9.6kibi'

      >>> bytes2human(10000, "%(value).1f %(symbol)s/sec")
      '9.8 K/sec'

      >>> # precision can be adjusted by playing with %f operator
      >>> bytes2human(10000, format="%(value).5f %(symbol)s")
      '9.76562 K'

    Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
    License: MIT
    """
    n = int(n)
    if n < 0:
        raise ValueError("n < 0")
    symbols = SYMBOLS[symbols]
    prefix = {}
    for i, s in enumerate(symbols[1:]):
        prefix[s] = 1 << (i + 1) * 10
    for symbol in reversed(symbols[1:]):
        if n >= prefix[symbol]:
            value = float(n) / prefix[symbol]
            return format % locals()
    return format % dict(symbol=symbols[0], value=n)


check for the presence/absence of files


filenames : list Filenames to check for presence.


missing : list List of missing filenames

Source code in cgatcore/
def check_presence_of_files(filenames):
    """check for the presence/absence of files

    filenames : list
        Filenames to check for presence.

    missing : list
        List of missing filenames
    """
    missing = []
    for filename in filenames:
        if not os.path.exists(filename):
            missing.append(filename)
    return missing

clone_file(infile, outfile)

create a clone of infile named outfile by creating a soft-link.

Source code in cgatcore/
def clone_file(infile, outfile):
    '''create a clone of ``infile`` named ``outfile``
    by creating a soft-link.
    '''
    # link via relative paths, otherwise it
    # fails if infile and outfile are in different
    # directories or in a subdirectory
    if os.path.dirname(infile) != os.path.dirname(outfile):
        relpath = os.path.relpath(
            os.path.dirname(infile), os.path.dirname(outfile))
    else:
        relpath = "."
    target = os.path.join(relpath, os.path.basename(infile))

    try:
        os.symlink(target, outfile)
    except OSError:
        pass

convert_dictionary_values(d, map={})

convert string values in a dictionary to numeric types.

Arguments

d : dict
    The dictionary to convert
map : dict
    If map contains 'default', a default conversion is enforced. For example, to force int for every column but column id, supply map = {'default' : "int", "id" : "str" }
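
When no explicit mapping applies, the function falls back on pattern matching to decide between int and float. The sketch below shows only that fallback, using the two regular expressions from the source listing; `guess_type` and the sample `row` are hypothetical names introduced for this example.

```python
import re

# the same patterns as in the source listing below
RX_INT = re.compile(r"^\s*[+-]*[0-9]+\s*$")
RX_FLOAT = re.compile(r"^[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?$")


def guess_type(value):
    """Convert a string to int or float if it looks numeric."""
    value = value.strip()
    if RX_INT.match(value):
        return int(value)
    if RX_FLOAT.match(value):
        return float(value)
    return value  # leave anything else as a string


row = {"id": "gene1", "count": " 12 ", "score": "1.5e-3"}
converted = {key: guess_type(val) for key, val in row.items()}
```

The integer pattern is tested first, so "12" becomes an int rather than a float.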

Source code in cgatcore/
def convert_dictionary_values(d, map={}):
    """convert string values in a dictionary to numeric types.

    d : dict
       The dictionary to convert
    map : dict
       If map contains 'default', a default conversion is enforced.
       For example, to force int for every column but column ``id``,
       supply map = {'default' : "int", "id" : "str" }
    """

    rx_int = re.compile(r"^\s*[+-]*[0-9]+\s*$")
    rx_float = re.compile(r"^[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?$")

    # pre-process with 'default'
    if "default" in map:
        k = "default"
        if map[k] == "int":
            default = int
        elif map[k] == "float":
            default = float
        elif map[k] == "string":
            default = str
    else:
        default = False

    for k, vv in list(d.items()):

        if vv is None:
            continue
        v = vv.strip()
        try:
            if k in map:
                if map[k] == "int":
                    d[k] = int(v)
                elif map[k] == "float":
                    d[k] = float(v)
                elif map[k] == "string":
                    pass
                continue
            elif default:
                if v != "":
                    d[k] = default(v)
                else:
                    d[k] = v
                continue
        except TypeError as msg:
            raise TypeError("conversion in field: %s, %s" % (k, msg))

        try:
            if rx_int.match(v):
                d[k] = int(v)
            elif rx_float.match(v):
                d[k] = float(v)
        except TypeError as msg:
            raise TypeError(
                "expected string or buffer: offending value = '%s' " % str(v))
        except ValueError as msg:
            raise ValueError("conversion error: %s, %s" % (msg, str(d)))
    return d

flatten(nested_list, ltypes=(list, tuple))

flatten a nested list.

This method works with any list-like container such as tuples.


nested_list : list
    A nested list.
ltypes : list
    A list of valid container types.


list : list
    A flattened list.
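
For comparison, the same result can be obtained with a recursive generator. This is an alternative sketch, not the module's implementation (which flattens iteratively in place); `flatten_recursive` is a hypothetical name.

```python
def flatten_recursive(nested, ltypes=(list, tuple)):
    """Yield the leaves of arbitrarily nested lists/tuples in order."""
    for item in nested:
        if isinstance(item, ltypes):
            # descend into containers; empty ones simply yield nothing
            yield from flatten_recursive(item, ltypes)
        else:
            yield item


flat = list(flatten_recursive([1, [2, (3, 4)], [], [[5]]]))
```

The iterative version avoids Python's recursion limit for very deep nesting, which is one reason to prefer it for untrusted input.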

Source code in cgatcore/
def flatten(nested_list, ltypes=(list, tuple)):
    '''flatten a nested list.

    This method works with any list-like container
    such as tuples.

    nested_list : list
        A nested list.
    ltypes : list
        A list of valid container types.

    list : list
        A flattened list.
    '''
    ltype = type(nested_list)
    nested_list = list(nested_list)
    i = 0
    while i < len(nested_list):
        while isinstance(nested_list[i], ltypes):
            if not nested_list[i]:
                # drop empty containers and re-examine this position
                nested_list.pop(i)
                i -= 1
                break
            else:
                nested_list[i:i + 1] = nested_list[i]
        i += 1
    return ltype(nested_list)

force_str(iterator, encoding='ascii')

iterate over lines in iterator and force to string

Source code in cgatcore/
def force_str(iterator, encoding="ascii"):
    """iterate over lines in iterator and force to string"""
    for line in iterator:
        yield line.decode(encoding)

get_first_line(filename, nlines=1)

return the first line of a file.


filename : string
    The name of the file to be opened.
nlines : int
    Number of lines to return.


string
    The first line(s) of the file.

Source code in cgatcore/
def get_first_line(filename, nlines=1):
    """return the first line of a file.

    filename : string
       The name of the file to be opened.
    nlines : int
       Number of lines to return.

    string
       The first line(s) of the file.
    """
    # Python 3 text mode already provides universal newline support;
    # the "U" mode flag was removed in Python 3.11.
    with open(filename, 'r') as f:
        line = "".join([f.readline() for x in range(nlines)])
    return line

get_last_line(filename, nlines=1, read_size=1024, encoding='utf-8')

return the last line of a file.

This method works by working back in blocks of read_size until the beginning of the last line is reached.


filename : string
    Name of the file to be opened.
nlines : int
    Number of lines to return.
read_size : int
    Number of bytes to read.


string
    The last line(s) of the file.
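
The block-wise backward read can be sketched as follows. This is a simplified illustration under stated assumptions, not the module's code: `tail_lines` is a hypothetical name, the demo file is plain ASCII (a block boundary inside a multi-byte character would fail to decode), and the temporary file exists only for the example.

```python
import os
import tempfile


def tail_lines(filename, nlines=1, read_size=1024, encoding="utf-8"):
    """Return the last nlines lines by reading growing blocks from the end."""
    with open(filename, "rb") as f:      # binary mode allows negative seeks
        f.seek(0, os.SEEK_END)
        file_size = f.tell()
        if file_size == 0:
            return ""
        offset = read_size
        while True:
            offset = min(offset, file_size)
            f.seek(-offset, os.SEEK_END)
            lines = f.read(offset).decode(encoding).strip().splitlines()
            # one extra line guarantees the first kept line is complete
            if len(lines) >= nlines + 1 or offset == file_size:
                return "\n".join(lines[-nlines:])
            offset += read_size


with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("".join("line%i\n" % i for i in range(50)))
    path = tmp.name

last_two = tail_lines(path, nlines=2, read_size=16)
os.remove(path)
```

A small `read_size` is used here only to force several iterations; for real files a block of a kilobyte or more usually covers the last line in one read.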

Source code in cgatcore/
def get_last_line(filename, nlines=1, read_size=1024, encoding="utf-8"):
    """return the last line of a file.

    This method works by working back in blocks of `read_size` until
    the beginning of the last line is reached.

    filename : string
       Name of the file to be opened.
    nlines : int
       Number of lines to return.
    read_size : int
       Number of bytes to read.

    string
       The last line(s) of the file.
    """

    # py3 requires binary mode for negative seeks
    f = open(filename, 'rb')
    offset = read_size
    f.seek(0, 2)
    file_size = f.tell()
    if file_size == 0:
        return ""
    while 1:
        if file_size < offset:
            offset = file_size
        f.seek(-1 * offset, 2)
        read_str = f.read(offset)
        read_str = read_str.decode(encoding)
        lines = read_str.strip().splitlines()
        if len(lines) >= nlines + 1:
            return "\n".join(lines[-nlines:])
        if offset == file_size:   # reached the beginning
            return read_str
        offset += read_size

get_num_lines(filename, ignore_comments=True)

count number of lines in filename.


filename : string
    Name of the file to be opened.
ignore_comments : bool
    If true, ignore lines starting with #.


int
    The number of line(s) in the file.

Source code in cgatcore/
def get_num_lines(filename, ignore_comments=True):
    """count number of lines in filename.

    filename : string
       Name of the file to be opened.
    ignore_comments : bool
       If true, ignore lines starting with ``#``.

    int
       The number of line(s) in the file.
    """

    if ignore_comments:
        filter_cmd = '| grep -v "#" '
    else:
        filter_cmd = ""

    # the implementation below seems to be fastest
    # see
    # and
    if filename.endswith(".gz"):
        cmd = "zcat %(filename)s %(filter_cmd)s | wc -l" % locals()
    else:
        cmd = "cat %(filename)s %(filter_cmd)s | wc -l" % locals()

    # cmd is a shell pipeline, so it has to run through the shell
    out = subprocess.Popen(cmd,
                           shell=True,
                           stdout=subprocess.PIPE).communicate()[0]
    return int(out.partition(b' ')[0])


Attempts to guess the string format based on default symbols set and return the corresponding bytes as an integer. When unable to recognize the format ValueError is raised.

>>> human2bytes('0 B')
0
>>> human2bytes('1 K')
1024
>>> human2bytes('1 M')
1048576
>>> human2bytes('1 Gi')
1073741824
>>> human2bytes('1 tera')
1099511627776

>>> human2bytes('0.5kilo')
512
>>> human2bytes('0.1 byte')
0
>>> human2bytes('1 k')  # k is an alias for K
1024
>>> human2bytes('12 foo')
Traceback (most recent call last):
    ...
ValueError: can't interpret '12 foo'

Author: Giampaolo Rodola' License: MIT

Source code in cgatcore/
def human2bytes(s):
    """
    Attempts to guess the string format based on default symbols
    set and return the corresponding bytes as an integer.
    When unable to recognize the format ValueError is raised.

      >>> human2bytes('0 B')
      0
      >>> human2bytes('1 K')
      1024
      >>> human2bytes('1 M')
      1048576
      >>> human2bytes('1 Gi')
      1073741824
      >>> human2bytes('1 tera')
      1099511627776

      >>> human2bytes('0.5kilo')
      512
      >>> human2bytes('0.1  byte')
      0
      >>> human2bytes('1 k')  # k is an alias for K
      1024
      >>> human2bytes('12 foo')
      Traceback (most recent call last):
          ...
      ValueError: can't interpret '12 foo'

    Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
    License: MIT
    """
    init = s
    num = ""
    while s and s[0:1].isdigit() or s[0:1] == '.':
        num += s[0]
        s = s[1:]
    num = float(num)
    letter = s.strip()
    for name, sset in list(SYMBOLS.items()):
        if letter in sset:
            break
    else:
        if letter == 'k':
            # treat 'k' as an alias for 'K' as per:
            sset = SYMBOLS['customary']
            letter = letter.upper()
        else:
            raise ValueError("can't interpret %r" % init)
    prefix = {sset[0]: 1}
    for i, s in enumerate(sset[1:]):
        prefix[s] = 1 << (i + 1) * 10

    return int(num * prefix[letter])

invert_dictionary(dict, make_unique=False)

returns an inverted dictionary with keys and values swapped.
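
The effect of `make_unique` is easiest to see on a mapping with duplicate values. The sketch below is a compact re-statement for illustration; `invert` and the sample data are hypothetical names, and the behaviour shown follows the source listing.

```python
def invert(mapping, make_unique=False):
    """Swap keys and values; collect keys in lists unless make_unique."""
    inverted = {}
    for key, value in mapping.items():
        if make_unique:
            inverted[value] = key          # later keys overwrite earlier ones
        else:
            inverted.setdefault(value, []).append(key)
    return inverted


gene_to_cluster = {"a": 1, "b": 1, "c": 2}
as_lists = invert(gene_to_cluster)
as_unique = invert(gene_to_cluster, make_unique=True)
```

With `make_unique=True` information is lost whenever two keys share a value, so it is only safe for one-to-one mappings.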

Source code in cgatcore/
def invert_dictionary(dict, make_unique=False):
    """returns an inverted dictionary with keys and values swapped.
    """
    inv = {}
    if make_unique:
        for k, v in dict.items():
            inv[v] = k
    else:
        for k, v in dict.items():
            inv.setdefault(v, []).append(k)
    return inv


return True if file exists and is complete.

A file is complete if its last line contains job finished.

Source code in cgatcore/
def is_complete(filename):
    '''return True if file exists and is complete.

    A file is complete if its last line contains
    ``job finished``.
    '''
    if filename.endswith(".gz"):
        raise NotImplementedError(
            'is_complete not implemented for compressed files')
    if is_empty(filename):
        return False
    lastline = get_last_line(filename)
    return "job finished" in lastline


return True if file exists and is empty.


OSError If file does not exist

Source code in cgatcore/
def is_empty(filename):
    """return True if file exists and is empty.

    OSError
       If file does not exist
    """
    # don't know about stdin
    if filename == "-":
        return False
    return os.stat(filename)[stat.ST_SIZE] == 0


return true if container is a nested data structure.

A nested data structure is a dict of dicts or a list of list, but not a dict of list or a list of dicts.

Source code in cgatcore/
def is_nested(container):
    """return true if container is a nested data structure.

    A nested data structure is a dict of dicts or a list of list,
    but not a dict of list or a list of dicts.
    """
    for t in [dict, list, tuple]:
        if isinstance(container, t):
            return any([isinstance(v, t) for v in container.values()])
    return False


iterate over infile and return a :py:class:collections.namedtuple according to a header in the first row.

Lines starting with # are skipped.
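
A small self-contained sketch of this header-driven iteration follows. It is a simplified illustration rather than the module's code: `iterate_rows` and the sample table are hypothetical, and fields are split on whitespace as in the source listing.

```python
import collections
import io
import re


def iterate_rows(infile):
    """Yield one namedtuple per data row, named after the header row."""
    row_type = None
    for line in infile:
        if line.startswith("#"):
            continue
        fields = line.rstrip("\n")
        if row_type is None:
            # replace characters that are not valid in identifiers with _
            header = re.sub(r"[^a-zA-Z0-9_\s]", "_", fields).split()
            row_type = collections.namedtuple("DATA", header)
            continue
        yield row_type(*fields.split())


table = io.StringIO("# comment\ngene-id\tcount\ng1\t10\ng2\t20\n")
rows = list(iterate_rows(table))
```

All values stay strings; the column `gene-id` becomes the attribute `gene_id`.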

Source code in cgatcore/
def iterate(infile):
    '''iterate over infile and return a :py:class:`collections.namedtuple`
    according to a header in the first row.

    Lines starting with ``#`` are skipped.

    '''

    n = 0
    for line in infile:
        if line.startswith("#"):
            continue
        n += 1
        if n == 1:
            # replace non-alphanumeric characters with _
            header = re.sub(r"[^a-zA-Z0-9_\s]", "_", line[:-1]).split()
            DATA = collections.namedtuple("DATA", header)
            continue

        result = DATA(*line[:-1].split())

        yield result

iterate_tabular(infile, sep='\t')

iterate over file infile skipping lines starting with #.

Within a line, records are separated by sep.


tuple Records within a line

Source code in cgatcore/
def iterate_tabular(infile, sep="\t"):
    '''iterate over file `infile` skipping lines starting with
    ``#``.

    Within a line, records are separated by `sep`.

    tuple
        Records within a line
    '''
    for line in infile:
        if line.startswith("#"):
            continue
        yield line[:-1].split(sep)

iterator_split(infile, regex)

Return an iterator of file chunks based on a known logical start point regex that splits the file into intuitive chunks. This assumes the file is structured in some fashion. To read an arbitrary number of bytes, read them from the file handle directly. If a header is present, it is returned as the first file chunk.

infile must be either an open file handle or an iterable.
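
The chunking logic can be sketched on a FASTA-like list of lines. This is a simplified illustration with hypothetical names (`split_chunks`, `fasta`); it assumes the pattern is applied to each line with `re.search`.

```python
import re


def split_chunks(lines, pattern):
    """Group lines into chunks, starting a new chunk at each match."""
    regex = re.compile(pattern)
    chunk = []
    for line in lines:
        if regex.search(line) and chunk:
            # a new record starts here: emit what was collected so far
            yield chunk
            chunk = []
        chunk.append(line)
    yield chunk


fasta = ["# header\n", ">seq1\n", "ACGT\n", ">seq2\n", "GGCC\n", "TTAA\n"]
chunks = list(split_chunks(fasta, r"^>"))
```

Lines before the first match form their own chunk, which is how a header ends up as the first item.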

Source code in cgatcore/
def iterator_split(infile, regex):
    '''Return an iterator of file chunks based on a known logical start
    point `regex` that splits the file into intuitive chunks.  This
    assumes the file is structured in some fashion.  For arbitrary
    number of bytes, read them from the file handle directly.  If a
    header is present it
    is returned as the first file chunk.

    infile must be either an open file handle or an iterable.
    '''
    chunk_list = []

    regex = re.compile(regex)

    for x in infile:
        if regex.search(x):
            if len(chunk_list):
                # return the current chunk and start a new one from this point
                yield chunk_list
            chunk_list = []
        chunk_list.append(x)
    yield chunk_list


iterate over the contents of a nested data structure.

The nesting can be done both as lists or as dictionaries.


nested : dict A nested dictionary


pair: tuple A container/key/value triple
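
Because each item carries its containing object, the triples can be used to update leaves in place. The following is a minimal sketch of that idea, written independently of cgatcore; `walk` and `config` are hypothetical names.

```python
def walk(nested):
    """Yield (container, key, value) for every leaf in nested dicts/lists."""
    if isinstance(nested, dict):
        items = nested.items()
    else:
        items = enumerate(nested)
    for key, value in items:
        if isinstance(value, (dict, list)):
            yield from walk(value)
        else:
            yield nested, key, value


config = {"a": {"x": "1", "y": ["2", "3"]}, "b": "4"}
# materialise the triples first, then rewrite each leaf where it lives
for container, key, value in list(walk(config)):
    container[key] = int(value)
```

The same loop works for list elements because the "key" of a list entry is its index.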

Source code in cgatcore/
def nested_iter(nested):
    """iterate over the contents of a nested data structure.

    The nesting can be done both as lists or as dictionaries.

    nested : dict
        A nested dictionary

    pair: tuple
        A container/key/value triple
    """
    if isinstance(nested, dict):
        for key, value in nested.items():
            if not isinstance(value, dict) and \
               not isinstance(value, list):
                yield nested, key, value
            else:
                for x in nested_iter(value):
                    yield x
    elif isinstance(nested, list):
        for key, value in enumerate(nested):
            if not isinstance(value, dict) and \
               not isinstance(value, list):
                yield nested, key, value
            else:
                for x in nested_iter(value):
                    yield x

open_file(filename, mode='r', create_dir=False, encoding='utf-8')

open file called filename with mode mode.

gzip - compressed files are recognized by the suffix .gz and opened transparently.

Note that there are differences in the file like objects returned, for example in the ability to seek.


filename : string
mode : string
    File opening mode
create_dir : bool
    If True, the directory containing filename will be created if it does not exist.


File or file-like object in case of gzip compressed files.
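
Suffix-based transparent opening can be sketched with the standard library alone. `open_maybe_gzip` is a hypothetical helper written for this example, not the module's function; unlike the listing below, it maps append mode to gzip's "at" mode.

```python
import gzip
import os
import tempfile


def open_maybe_gzip(filename, mode="r", encoding="utf-8"):
    """Pick gzip.open or the built-in open based on the file suffix."""
    if filename.lower().endswith((".gz", ".z")):
        # gzip.open needs an explicit text mode ("rt", "wt", "at")
        return gzip.open(filename, mode + "t", encoding=encoding)
    return open(filename, mode, encoding=encoding)


path = os.path.join(tempfile.mkdtemp(), "example.txt.gz")
with open_maybe_gzip(path, "w") as outf:
    outf.write("hello\n")
with open_maybe_gzip(path) as inf:
    text = inf.read()
with open(path, "rb") as raw:
    magic = raw.read(2)   # gzip files start with the bytes 1f 8b
```

The caller reads and writes text in both cases; only the object type behind the handle differs.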

Source code in cgatcore/
def open_file(filename, mode="r", create_dir=False, encoding="utf-8"):
    '''open file called *filename* with mode *mode*.

    gzip - compressed files are recognized by the
    suffix ``.gz`` and opened transparently.

    Note that there are differences in the file
    like objects returned, for example in the
    ability to seek.

    filename : string
    mode : string
       File opening mode
    create_dir : bool
       If True, the directory containing filename
       will be created if it does not exist.

    File or file-like object in case of gzip compressed files.
    '''

    _, ext = os.path.splitext(filename)

    if create_dir:
        dirname = os.path.dirname(filename)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)

    if ext.lower() in (".gz", ".z"):
        if mode == "r":
            return gzip.open(filename, 'rt', encoding=encoding)
        elif mode == "w":
            return gzip.open(filename, 'wt', encoding=encoding)
        elif mode == "a":
            return gzip.open(filename, 'wt', encoding=encoding)
    else:
        return open(filename, mode, encoding=encoding)

pickle(file_name, obj)

dump a python object to a file using pickle

Source code in cgatcore/
def pickle(file_name, obj):
    '''dump a python object to a file using pickle'''
    with open(file_name, "wb") as pkl_file:
        pickle.dump(obj, pkl_file)

pretty_percent(numerator, denominator, format='%5.2f', na='na')

output a percent value or "na" if not defined

Source code in cgatcore/
def pretty_percent(numerator, denominator, format="%5.2f", na="na"):
    """output a percent value or "na" if not defined"""
    try:
        x = format % (100.0 * numerator / denominator)
    except (ValueError, ZeroDivisionError, TypeError):
        x = na
    return x


output val or na if val is None

Source code in cgatcore/
def pretty_string(val):
    '''output val or na if val is None'''
    if val is not None:
        return val
    else:
        return "na"

readMultiMap(infile, columns=(0, 1), map_functions=(str, str), both_directions=False, has_header=False, dtype=dict)

read a map (pairs of values) from infile.

In contrast to :func:readMap, this method permits multiple entries for the same key.


infile : File
    File object to read from
columns : tuple
    Columns (A, B) to take from the file to create the mapping from A to B.
map_functions : tuple
    Functions to convert the values in the rows to the desired object types such as int or float.
both_directions : bool
    If true, both mapping directions are returned in a tuple, i.e., A->B and B->A.
has_header : bool
    If true, ignore first line with header.
dtype : function
    datatype to use for the dictionaries.


map : dict
    A dictionary containing the mapping. If both_directions is true, two dictionaries will be returned.
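
The difference between collecting every entry per key and keeping only one can be shown on a tiny tab-separated input. `read_pairs` is a hypothetical helper for this sketch and omits column selection, type conversion and header handling.

```python
import io


def read_pairs(infile, multi=False):
    """Read key/value pairs from the first two tab-separated columns."""
    mapping = {}
    for line in infile:
        if line.startswith("#"):
            continue
        key, value = line.rstrip("\n").split("\t")[:2]
        if multi:
            mapping.setdefault(key, []).append(value)   # keep every entry
        else:
            mapping[key] = value                        # last entry wins
    return mapping


data = "# gene\ttranscript\ng1\tt1\ng1\tt2\ng2\tt3\n"
multi_map = read_pairs(io.StringIO(data), multi=True)
single_map = read_pairs(io.StringIO(data))
```

In the multi-entry form every value is a list, even for keys that occur only once.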

Source code in cgatcore/
def readMultiMap(infile,
                 columns=(0, 1),
                 map_functions=(str, str),
                 both_directions=False,
                 has_header=False,
                 dtype=dict):
    """read a map (pairs of values) from infile.

    In contrast to :func:`readMap`, this method permits multiple
    entries for the same key.

    infile : File
       File object to read from
    columns : tuple
       Columns (A, B) to take from the file to create the mapping from
       A to B.
    map_functions : tuple
       Functions to convert the values in the rows to the desired
       object types such as int or float.
    both_directions : bool
       If true, both mapping directions are returned in a tuple, i.e.,
       A->B and B->A.
    has_header : bool
       If true, ignore first line with header.
    dtype : function
       datatype to use for the dictionaries.

    map : dict
       A dictionary containing the mapping. If `both_directions` is true,
       two dictionaries will be returned.

    """
    m = dtype()
    r = dtype()
    n = 0
    for line in infile:
        # skip comment lines
        if line[0] == "#":
            continue
        n += 1

        if has_header and n == 1:
            continue

        d = line[:-1].split("\t")
        try:
            key = map_functions[0](d[columns[0]])
            val = map_functions[1](d[columns[1]])
        except (ValueError, IndexError) as msg:
            raise ValueError("parsing error in line %s: %s" % (line[:-1], msg))

        # collect every value seen for a key (and every key for a value)
        if key not in m:
            m[key] = []
        m[key].append(val)
        if val not in r:
            r[val] = []
        r[val].append(key)

    if both_directions:
        return m, r
    else:
        return m
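
The grouping behaviour can be illustrated with a minimal, self-contained sketch. It does not import cgatcore: `read_multi_map_sketch` is a hypothetical stand-in that follows the logic of the listing for string columns only, and the sample data are invented for illustration.

```python
import io


def read_multi_map_sketch(infile, columns=(0, 1), has_header=False):
    """Hypothetical stand-in for readMultiMap: collect all values per key."""
    forward, reverse = {}, {}
    n = 0
    for line in infile:
        if line.startswith("#"):
            continue  # skip comment lines
        n += 1
        if has_header and n == 1:
            continue  # skip the header row
        fields = line.rstrip("\n").split("\t")
        key, val = fields[columns[0]], fields[columns[1]]
        forward.setdefault(key, []).append(val)
        reverse.setdefault(val, []).append(key)
    return forward, reverse


# Invented sample data: two rows share the key "gene1".
data = io.StringIO("# comment\ngene1\ttx1\ngene1\ttx2\ngene2\ttx3\n")
forward, reverse = read_multi_map_sketch(data)
print(forward)
print(reverse)
```

Both rows for `gene1` are kept in a list, which is the difference from a plain key/value map where the second row would replace the first.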

read_list(infile, column=0, map_function=str, map_category={}, with_title=False)

read a list of values from infile.


Arguments

infile : File
    File object to read from.
column : int
    Column to take from the file.
map_function : function
    Function to convert the values in the rows to the desired object types, such as int or float.
map_category : dict
    When given, automatically transform/map the values using this dictionary.
with_title : bool
    If true, the first line of the file is a title and will be ignored.

Returns

list : list
    A list with the values.

Source code in cgatcore/
def read_list(infile,
              column=0,
              map_function=str,
              map_category={},
              with_title=False):
    """read a list of values from infile.

    infile : File
       File object to read from
    column : int
       Column to take from the file.
    map_function : function
       Function to convert the values in the rows to the desired
       object types such as int or float.
    map_category : dict
       When given, automatically transform/map the values given
       this dictionary.
    with_title : bool
       If true, first line of file is title and will be ignored.

    list : list
       A list with the values.

    """
    m = []
    title = None
    for line in infile:
        # skip comment lines
        if line[0] == "#":
            continue
        if with_title and not title:
            title = line[:-1].split("\t")[column]
            continue

        try:
            d = map_function(line[:-1].split("\t")[column])
        except ValueError:
            # skip values that cannot be converted
            continue

        if map_category:
            d = map_category[d]
        m.append(d)

    return m

read_map(infile, columns=(0, 1), map_functions=(str, str), both_directions=False, has_header=True, dtype=dict)

read a map (key, value pairs) from infile.

If there are multiple entries for the same key, only the last entry will be recorded.


Arguments

infile : File
    File object to read from.
columns : tuple
    Columns (A, B) to take from the file to create the mapping from A to B.
map_functions : tuple
    Functions to convert the values in the rows to the desired object types, such as int or float.
both_directions : bool
    If true, both mapping directions are returned.
has_header : bool
    If true, ignore the first line as a header.
dtype : function
    Datatype to use for the dictionaries.

Returns

map : dict
    A dictionary containing the mapping. If both_directions is true, two dictionaries will be returned.

Source code in cgatcore/
def read_map(infile,
             columns=(0, 1),
             map_functions=(str, str),
             both_directions=False,
             has_header=True,
             dtype=dict):
    """read a map (key, value pairs) from infile.

    If there are multiple entries for the same key, only the
    last entry will be recorded.

    infile : File
       File object to read from
    columns : tuple
       Columns (A, B) to take from the file to create the mapping from
       A to B.
    map_functions : tuple
       Functions to convert the values in the rows to the desired
       object types such as int or float.
    both_directions : bool
       If true, both mapping directions are returned.
    has_header : bool
       If true, ignore first line with header.
    dtype : function
       datatype to use for the dictionaries.

    map : dict
       A dictionary containing the mapping. If `both_directions` is true,
       two dictionaries will be returned.

    """
    m = dtype()
    r = dtype()
    n = 0

    if columns == "all":
        key_column = 0
        value_column = None
    else:
        key_column, value_column = columns

    key_function, value_function = map_functions
    # default is to return a tuple for multiple values
    datatype = None

    for line in infile:
        if line[0] == "#":
            continue
        n += 1

        if has_header and n == 1:
            if columns == "all":
                header = line[:-1].split("\t")
                # remove the first column
                datatype = collections.namedtuple("DATA", header[1:])
            continue

        d = line[:-1].split("\t")
        if len(d) < 2:
            continue
        key = key_function(d[key_column])
        if value_column:
            val = value_function(d[value_column])
        elif datatype:
            val = datatype._make([d[x] for x in range(1, len(d))])
        else:
            val = tuple(map(value_function, [d[x] for x in range(1, len(d))]))

        # later entries overwrite earlier ones for the same key
        m[key] = val
        if val not in r:
            r[val] = []
        r[val].append(key)

    if both_directions:
        return m, r
    else:
        return m
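
The listing has a special case for `columns == "all"`: with a header present, every column after the first becomes a field of a named tuple. The sketch below isolates that step using only the standard library. The table contents are invented, and the loop is a simplification rather than the library code itself.

```python
import collections
import io

# Invented sample table: the first column is the key, the rest are values.
infile = io.StringIO("id\tlength\tgc\na\t100\t0.5\nb\t200\t0.4\n")

mapping = {}
datatype = None
for n, line in enumerate(infile, start=1):
    fields = line.rstrip("\n").split("\t")
    if n == 1:
        # Header row: every column after the first becomes a tuple field.
        datatype = collections.namedtuple("DATA", fields[1:])
        continue
    mapping[fields[0]] = datatype._make(fields[1:])

print(mapping["a"].length)
print(mapping["b"])
```

As in the listing, values stored through the named tuple stay strings; the value conversion function is only applied in the other two branches.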

snip(filename, extension=None, alt_extension=None, strip_path=False)

return prefix of filename, that is the part without the extension.

If extension is given, make sure that filename has the extension (or alt_extension); otherwise a ValueError is raised. Both extension and alt_extension can be lists of extensions.

If strip_path is set to true, the path is stripped from the file name.

Source code in cgatcore/
def snip(filename, extension=None, alt_extension=None,
         strip_path=False):
    '''return prefix of `filename`, that is the part without the
    extension.

    If `extension` is given, make sure that filename has the
    extension (or `alt_extension`). Both extension and alt_extension
    can be lists of extensions.

    If `strip_path` is set to true, the path is stripped from the file
    name.
    '''
    if extension is None:
        extension = []
    elif isinstance(extension, str):
        extension = [extension]

    if alt_extension is None:
        alt_extension = []
    elif isinstance(alt_extension, str):
        alt_extension = [alt_extension]

    if extension:
        for ext in extension + alt_extension:
            if filename.endswith(ext):
                root = filename[:-len(ext)]
                break
        else:
            # no extension matched
            raise ValueError("'%s' expected to end in '%s'" %
                             (filename, ",".join(
                                 extension + alt_extension)))
    else:
        root, ext = os.path.splitext(filename)

    if strip_path:
        snipped = os.path.basename(root)
    else:
        snipped = root

    return snipped
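
A short sketch of the three modes described above (no extension, explicit extension, stripped path). `snip_sketch` is a hypothetical, simplified stand-in that accepts a single extension string only; the file names are invented.

```python
import os


def snip_sketch(filename, extension=None, strip_path=False):
    """Hypothetical stand-in for snip, handling a single extension."""
    if extension:
        if not filename.endswith(extension):
            raise ValueError(
                "'%s' expected to end in '%s'" % (filename, extension))
        root = filename[:-len(extension)]
    else:
        # Without an explicit extension only the last suffix is removed.
        root, _ = os.path.splitext(filename)
    return os.path.basename(root) if strip_path else root


default = snip_sketch("/data/sample.fastq.gz")
explicit = snip_sketch("/data/sample.fastq.gz", ".fastq.gz")
stripped = snip_sketch("/data/sample.fastq.gz", ".fastq.gz", strip_path=True)
print(default, explicit, stripped)
```

Passing the extension explicitly matters for compound suffixes such as `.fastq.gz`, because `os.path.splitext` removes only the final `.gz`.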

str2val(val, na='na', list_detection=False)

guess type (int, float) of value.

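If val is neither int nor float, the strings "true" and "false" (in any case) are converted to booleans; any other value is returned unchanged. If list_detection is set and val contains a comma, val is split at the commas and a list of converted values is returned.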

Source code in cgatcore/
def str2val(val, na="na", list_detection=False):
    """guess type (int, float) of value.

    If `val` is neither int nor float, the value
    itself is returned.
    """

    if val is None:
        return val

    def _convert(v):
        # try int first, then float, then boolean literals
        try:
            x = int(v)
        except ValueError:
            try:
                x = float(v)
            except ValueError:
                if v.lower() == "true":
                    return True
                elif v.lower() == "false":
                    return False
                else:
                    return v
        return x

    if list_detection and "," in val:
        return [_convert(v) for v in val.split(",")]
    else:
        return _convert(val)
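
The conversion order (int, then float, then boolean literal, then the unchanged string) can be tried out with the following sketch. `str2val_sketch` is a hypothetical condensed version of the listing, not the library function, and the inputs are invented.

```python
def str2val_sketch(val, list_detection=False):
    """Hypothetical condensed version of str2val."""

    def _convert(v):
        for cast in (int, float):
            try:
                return cast(v)
            except ValueError:
                pass
        lowered = v.lower()
        if lowered in ("true", "false"):
            return lowered == "true"
        return v  # not a number or boolean: keep the string

    if val is None:
        return val
    if list_detection and "," in val:
        return [_convert(v) for v in val.split(",")]
    return _convert(val)


scalars = [str2val_sketch(x) for x in ("12", "1.5", "True", "chr1")]
as_list = str2val_sketch("1,2.5,x", list_detection=True)
print(scalars)
print(as_list)
```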

text_to_dict(filename, key=None, sep='\t')

make a dictionary from a text file keyed on the specified column.

Source code in cgatcore/
def text_to_dict(filename, key=None, sep="\t"):
    '''make a dictionary from a text file keyed
    on the specified column.'''

    # Please see function in readDict()
    count = 0
    result = {}
    valueidx, keyidx = False, False
    field_names = []

    with open(filename, "r") as fh:
        for line in fh:
            if line.startswith("#"):
                continue
            if count == 0:
                # header line: record field names and locate the key column
                fieldn = 0
                for rawfield in line.split(sep):
                    field = rawfield.strip()
                    if field == key:
                        keyidx = fieldn
                    field_names.append(field)
                    fieldn += 1

                if not keyidx:
                    raise ValueError("key name not found in header")
                # if not valueidx:
                #   raise ValueError(
                #     "value name not found in header")
            else:
                # data line: store all non-key fields under the key value
                fields = [x.strip() for x in line.split(sep)]
                fieldn = 0
                thiskey = fields[keyidx]
                result[thiskey] = {}
                for field in fields:
                    if fieldn == keyidx:
                        pass
                    else:
                        colkey = field_names[fieldn]
                        result[thiskey][colkey] = field
                    fieldn += 1
            count += 1

    return result
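
The resulting structure is a dictionary of dictionaries: one inner dictionary per row, keyed on the value in the chosen column. The sketch below reproduces that shape under two stated assumptions: it reads from an open handle rather than a file name, and it checks for the key column with an explicit membership test. In the listing, the test `if not keyidx` treats index 0 as "not found", so a key in the first column raises an error there. `text_to_dict_sketch` and the sample table are hypothetical.

```python
import io


def text_to_dict_sketch(handle, key, sep="\t"):
    """Hypothetical variant of text_to_dict that reads from a handle."""
    result = {}
    field_names = None
    key_index = None
    for line in handle:
        if line.startswith("#"):
            continue
        fields = [x.strip() for x in line.split(sep)]
        if field_names is None:
            # First non-comment line is the header.
            field_names = fields
            if key not in field_names:
                raise ValueError("key name not found in header")
            key_index = field_names.index(key)
            continue
        result[fields[key_index]] = {
            name: value
            for name, value in zip(field_names, fields)
            if name != key
        }
    return result


table = io.StringIO(
    "sample\ttissue\treads\ns1\tliver\t100\ns2\tbrain\t250\n")
result = text_to_dict_sketch(table, key="tissue")
print(result)
```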

touch_file(filename, mode=0o666, times=None, dir_fd=None, ref=None, **kwargs)

update/create a sentinel file.

modified from:

Compressed files (ending in .gz) are created as empty 'gzip' files, i.e., with a header.

Source code in cgatcore/
def touch_file(filename, mode=0o666, times=None, dir_fd=None, ref=None, **kwargs):
    '''update/create a sentinel file.

    modified from:

    Compressed files (ending in .gz) are created as empty 'gzip'
    files, i.e., with a header.

    '''
    flags = os.O_CREAT | os.O_APPEND
    existed = os.path.exists(filename)

    if filename.endswith(".gz") and not existed:
        # this will automatically add a gzip header
        with gzip.GzipFile(filename, "w") as fhandle:
            pass

    if ref:
        # copy access and modification times from the reference file
        stattime = os.stat(ref)
        times = (stattime.st_atime, stattime.st_mtime)

    with os.fdopen(os.open(
            filename, flags=flags, mode=mode, dir_fd=dir_fd)) as fhandle:
        os.utime(
            fhandle.fileno() if os.utime in os.supports_fd else filename,
            dir_fd=None if os.supports_fd else dir_fd,
            times=times,
            **kwargs)
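
The two behaviours described for touch_file (an empty but valid gzip file for `.gz` names, and timestamps copied from a reference file) can be checked with a simplified sketch. `touch_sketch` is hypothetical: it uses a plain `open(..., "ab")` instead of the `os.open` call in the listing, and all paths live in a temporary directory.

```python
import gzip
import os
import tempfile


def touch_sketch(filename, ref=None):
    """Hypothetical simplified touch: create if missing, then set times."""
    if filename.endswith(".gz") and not os.path.exists(filename):
        # Writing nothing through GzipFile still emits a gzip header.
        with gzip.GzipFile(filename, "w"):
            pass
    with open(filename, "ab"):
        pass  # create the file without truncating existing content
    times = None  # None means "use the current time"
    if ref:
        st = os.stat(ref)
        times = (st.st_atime, st.st_mtime)
    os.utime(filename, times)


with tempfile.TemporaryDirectory() as tmpdir:
    sentinel = os.path.join(tmpdir, "done.gz")
    touch_sketch(sentinel)
    size = os.path.getsize(sentinel)
    with gzip.open(sentinel, "rb") as fh:
        content = fh.read()

    ref = os.path.join(tmpdir, "ref.txt")
    touch_sketch(ref)
    os.utime(ref, (1_000_000, 2_000_000))
    copy = os.path.join(tmpdir, "copy.txt")
    touch_sketch(copy, ref=ref)
    copy_mtime = int(os.stat(copy).st_mtime)

print(size > 0, content, copy_mtime)
```

The sentinel is not zero bytes long, yet it decompresses to empty content, so downstream tools that expect gzip input can read it.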


unpickle(file_name)

retrieve a pickled Python object from a file.

Source code in cgatcore/
def unpickle(file_name):
    '''retrieve a pickled python object from a file'''
    with open(file_name, "r") as pkl_file:
        data = pickle.load(pkl_file)
    return data
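
A round trip through a temporary file shows the intended use. Note one assumption that differs from the listing: under Python 3, `pickle.load` requires a binary file object, so the sketch opens the file with mode `"rb"` rather than `"r"`. `unpickle_sketch` and the sample object are hypothetical.

```python
import os
import pickle
import tempfile


def unpickle_sketch(file_name):
    """Hypothetical variant of unpickle that opens the file in binary mode."""
    with open(file_name, "rb") as pkl_file:
        return pickle.load(pkl_file)


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "data.pkl")
    with open(path, "wb") as fh:
        pickle.dump({"a": [1, 2, 3]}, fh)
    restored = unpickle_sketch(path)

print(restored)
```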


val2list(val)

ensure that val is a list. A value that is not already a list is wrapped in a single-element list.

Source code in cgatcore/
def val2list(val):
    '''ensure that val is a list.'''

    if not isinstance(val, list):
        return [val]
    else:
        return val

val2str(val, format='%5.2f', na='na')

return a formatted value.

If the value does not fit the format string, the placeholder given by na (default "na") is returned.

Source code in cgatcore/
def val2str(val, format="%5.2f", na="na"):
    '''return a formatted value.

    If value does not fit format string, return "na"
    '''
    if isinstance(val, int):
        return format % val
    elif isinstance(val, float):
        return format % val

    # any other type: fall back to `na` if formatting fails
    try:
        x = format % val
    except (ValueError, TypeError):
        x = na
    return x
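
The fallback to the placeholder can be seen with a condensed sketch. `val2str_sketch` is a hypothetical stand-in that keeps only the try/except path of the listing, and the inputs are invented.

```python
def val2str_sketch(val, format="%5.2f", na="na"):
    """Hypothetical condensed version of val2str."""
    try:
        return format % val
    except (ValueError, TypeError):
        return na  # value cannot be rendered with this format


formatted = [val2str_sketch(x) for x in (3, 2.71828, "abc", None)]
print(formatted)
```

Numbers are rendered with the default `%5.2f` format, padded to a width of five characters; the string and `None` cannot be formatted as floats and yield the placeholder.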


which(program)

check if program is in PATH and is executable.

Returns

string
    The full path to the program. Returns None if not found.

Source code in cgatcore/
def which(program):
    """check if `program` is in PATH and is executable.

    Returns
    -------
    string
       The full path to the program. Returns None if not found.

    """

    # see
    #            test-if-executable-exists-in-python

    def is_exe(fpath):
        return os.path.exists(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        # program was given with a directory: test it directly
        if is_exe(program):
            return program
    else:
        # otherwise search every directory in PATH
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None

writeMatrix(outfile, matrix, row_headers, col_headers, row_header='')

write a numpy matrix to outfile.

row_header gives the title of the rows

Source code in cgatcore/
def writeMatrix(outfile, matrix, row_headers, col_headers,
                row_header=""):
    '''write a numpy matrix to outfile.

    *row_header* gives the title of the rows
    '''

    outfile.write("%s\t%s\n" % (row_header,
                                "\t".join(col_headers)))
    for x, row in enumerate(matrix):
        assert len(row) == len(col_headers)
        outfile.write("%s\t%s\n" %
                      (row_headers[x], "\t".join(map(str, row))))

write_lines(outfile, lines, header=False)

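write lines to outfile as tab-separated text. Expects a nested list with one inner list of fields per line, i.e., [[line1-field1, line1-field2], ...]. If header is given, it is written as the first line.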

Source code in cgatcore/
def write_lines(outfile, lines, header=False):
    ''' expects [[[line1-field1],[line1-field2 ] ],... ]'''
    handle = open_file(outfile, "w")

    if header:
        handle.write("\t".join([str(title) for title in header]) + "\n")

    for line in lines:
        handle.write("\t".join([str(field) for field in line]) + "\n")

    handle.close()


write_matrix(outfile, matrix, row_headers, col_headers, row_header='')

write a numpy matrix to outfile. row_header gives the title of the rows

Source code in cgatcore/
def write_matrix(outfile, matrix, row_headers, col_headers,
                 row_header=""):
    '''write a numpy matrix to outfile.
    *row_header* gives the title of the rows
    '''

    outfile.write("%s\t%s\n" % (row_header, "\t".join(col_headers)))
    for x, row in enumerate(matrix):
        assert len(row) == len(col_headers)
        outfile.write("%s\t%s\n" % (row_headers[x], "\t".join(map(str, row))))
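
The output layout (a header line with the row title and column headers, then one tab-separated line per row) is shown by the sketch below. It writes to an in-memory buffer and uses nested lists instead of a numpy matrix to stay dependency-free; `write_matrix_sketch` and the sample values are hypothetical.

```python
import io


def write_matrix_sketch(outfile, matrix, row_headers, col_headers,
                        row_header=""):
    """Hypothetical stand-in for write_matrix using nested lists."""
    outfile.write("%s\t%s\n" % (row_header, "\t".join(col_headers)))
    for x, row in enumerate(matrix):
        # Every row must have exactly one value per column header.
        assert len(row) == len(col_headers)
        outfile.write("%s\t%s\n" % (row_headers[x], "\t".join(map(str, row))))


buffer = io.StringIO()
write_matrix_sketch(buffer, [[1, 2], [3, 4]],
                    row_headers=["r1", "r2"],
                    col_headers=["c1", "c2"],
                    row_header="id")
text = buffer.getvalue()
print(text)
```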

write_table(outfile, table, columns=None, fillvalue='')

write a table to outfile.

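If table is a dictionary, it is written column-wise, with columns of unequal length padded with fillvalue. If columns is a list, only the listed columns are written, in the specified order. Any other table type raises NotImplementedError.

.. note:: Deprecated. Use pandas dataframes instead.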

Source code in cgatcore/
def write_table(outfile, table, columns=None, fillvalue=""):
    '''write a table to outfile.

    If table is a dictionary, output columnwise. If *columns* is a list,
    only output columns in columns in the specified order.

    .. note:: Deprecated
       use pandas dataframes instead

    '''

    if isinstance(table, dict):
        if columns is None:
            columns = list(table.keys())
        outfile.write("\t".join(columns) + "\n")
        # get data
        data = [table[x] for x in columns]
        # transpose
        data = list(itertools.zip_longest(*data, fillvalue=fillvalue))

        for d in data:
            outfile.write("\t".join(map(str, d)) + "\n")

    else:
        raise NotImplementedError
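
The column-wise output relies on transposing the column lists with `itertools.zip_longest`, which pads shorter columns. The sketch below isolates that step with an invented table and an in-memory buffer; it is an illustration, not the library function.

```python
import io
import itertools

# Invented table with columns of unequal length.
table = {"gene": ["a", "b", "c"], "count": [10, 20]}
columns = ["gene", "count"]

# Transpose columns into rows, padding the short column with "".
rows = list(itertools.zip_longest(*[table[c] for c in columns], fillvalue=""))

buffer = io.StringIO()
buffer.write("\t".join(columns) + "\n")
for row in rows:
    buffer.write("\t".join(map(str, row)) + "\n")
text = buffer.getvalue()
print(text)
```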


zap_file(filename)

replace filename with an empty file.

File attributes such as access times are preserved.

If the file is a link, the link will be broken and replaced with an empty file having the same attributes as the file linked to.

Returns

stat_object
    A stat object of the file cleaned.
link_destination : string
    If the file was a link, the file being linked to.

If the file is already empty, (None, None) is returned.

Source code in cgatcore/
def zap_file(filename):
    '''replace *filename* with empty file.

    File attributes such as access times are preserved.

    If the file is a link, the link will be broken and replaced with
    an empty file having the same attributes as the file linked to.

    stat_object
       A stat object of the file cleaned.
    link_destination : string
       If the file was a link, the file being linked to.
    '''

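    # os.stat follows links, so these are the attributes of the link target
    original = os.stat(filename)

    # return if file already has size 0
    if original.st_size == 0:
        return None, None

    if os.path.islink(filename):
        linkdest = os.readlink(filename)
        # break the link before creating the empty file
        os.unlink(filename)
        f = open(filename, "w")
        f.close()
    else:
        linkdest = None
        f = open(filename, "w")
        f.close()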

    # Set original times
    os.utime(filename, (original.st_atime, original.st_mtime))
    os.chmod(filename, original.st_mode)

    return original, linkdest
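
The effect on a regular file (contents removed, timestamps kept) can be verified with a simplified sketch in a temporary directory. `zap_sketch` is a hypothetical stand-in that follows the steps described above; the file name and timestamps are invented.

```python
import os
import tempfile


def zap_sketch(filename):
    """Hypothetical stand-in for zap_file."""
    original = os.stat(filename)  # follows links
    if original.st_size == 0:
        return None, None
    linkdest = None
    if os.path.islink(filename):
        linkdest = os.readlink(filename)
        os.unlink(filename)  # break the link; the target is left untouched
    with open(filename, "w"):
        pass  # truncate, or create an empty regular file
    # Restore the original timestamps and permissions.
    os.utime(filename, (original.st_atime, original.st_mtime))
    os.chmod(filename, original.st_mode)
    return original, linkdest


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "big.txt")
    with open(path, "w") as fh:
        fh.write("x" * 1000)
    os.utime(path, (1_000_000, 2_000_000))

    stat_before, link = zap_sketch(path)
    size_after = os.path.getsize(path)
    mtime_after = int(os.stat(path).st_mtime)
    second_call = zap_sketch(path)

print(stat_before.st_size, size_after, mtime_after, link, second_call)
```

Because the modification time survives, a zapped file can still act as an up-to-date marker for time-stamp based pipelines while no longer using disk space.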