# Copyright (C) 2014 Dignity Health
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# NO CLINICAL USE. THE SOFTWARE IS NOT INTENDED FOR COMMERCIAL PURPOSES
# AND SHOULD BE USED ONLY FOR NON-COMMERCIAL RESEARCH PURPOSES. THE
# SOFTWARE MAY NOT IN ANY EVENT BE USED FOR ANY CLINICAL OR DIAGNOSTIC
# PURPOSES. YOU ACKNOWLEDGE AND AGREE THAT THE SOFTWARE IS NOT INTENDED FOR
# USE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITY, INCLUDING BUT NOT
# LIMITED TO LIFE SUPPORT OR EMERGENCY MEDICAL OPERATIONS OR USES. LICENSOR
# MAKES NO WARRANTY AND HAS NO LIABILITY ARISING FROM ANY USE OF THE
# SOFTWARE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITIES.
# External Binary Encapsulation
# This is a set of convenience functions for wrapping external binaries in
# python.
import os
import hashlib
import subprocess
import numpy as np
# gpi
from .defines import GPI_SHDM_PATH
from .logger import manager
from .sysspecs import Specs
# start logger for this module
log = manager.getLogger(__name__)
[docs]class FilePath(object):
'''Generate a tempfile-name and path based on THIS object's id. If THIS
object looses its reference then make sure the associated file is also
deleted. The supplied read/writer functions can be used to write and
retrieve the file information. If a tempfile-path and name is all that is
needed, then this object can be instantiated without any arguments.
path: /tmp (default GPI tmp dir)
filename: additional to the nodeid
suffix: additional to the nodeid (i.e. '.jpg')
nodeid: node's location in memory (id())
rfunc: reader function with footprint:
data = rfunc('filename')
wfunc: writer function with footprint:
retcode = wfunc('filename', data)
retcode: None or 0 for success
If no names are specified then THIS object id is used.
'''
_Extern_File_Handle_Type = True
def __init__(self, wfunc=None, wdata=None, path=None, filename=None,
suffix=None, nodeid=None, rfunc=None, asuffix=[]):
self._reader = rfunc
self._writer = wfunc
self._output_data = wdata # data to be written
self._suffix = ''
if suffix is not None:
self._suffix = suffix
self._additional_suffix = set(asuffix + [self._suffix, '']) - set([None])
## build the filepath one step at a time
self._basename_path = ''
self._filename = ''
if nodeid:
self._filename += str(nodeid)
if filename:
if self._filename != '':
self._filename += '_'
self._filename += str(filename)
# just use THIS object id if nothing is specified
if self._filename == '':
self._filename = str(id(self))
if path:
self._basename_path = os.path.join(str(path), self._filename)
else:
self._basename_path = os.path.join(GPI_SHDM_PATH, self._filename)
if self._suffix:
self._filename += self._suffix
if self.fileExists():
log.warn('The path: \'' + self._basename_path + '\' already exists, continuing...')
def __str__(self):
return self._basename_path
def __del__(self):
# this may not delete in a timely fashion so direct use of clear() is
# encouraged.
if self.fileExists():
log.warn('The \'FilePath\' object for path: \''+self._basename_path+'\' was not closed before collection.')
self.clear()
def additionalSuffix(self, suf=[]):
# in case the filename is used as a basename, this will allow more
# files to be searched for removal. -helpful for formats that require
# multiple files.
self._additional_suffix = suf
def clear(self):
for s in self._additional_suffix:
if os.path.isfile(self._basename_path + s):
os.remove(self._basename_path + s)
def fileExists(self):
for s in self._additional_suffix:
if os.path.isfile(self._basename_path + s):
return True
return False
def close(self):
self.clear()
def setReader(self, func):
self._reader = func
def setWriter(self, func):
self._writer = func
def read(self, suffix=None):
if suffix is None:
suffix = self._suffix
return self._reader(self._basename_path + suffix)
def data(self, suffix=None):
if suffix is None:
suffix = self._suffix
return self.read(suffix)
def write(self, suffix=None):
if suffix is None:
suffix = self._suffix
return self._writer(self._basename_path + suffix, self._output_data)
def isOutput(self):
# this file is the result of running the command
if self._reader:
return True
return False
def isInput(self):
# this file is an input argument to the command
if self._writer:
return True
return False
class IFilePath(FilePath):
def __init__(self, wfunc, wdata, suffix=None, asuffix=[]):
super(IFilePath, self).__init__(wfunc=wfunc, wdata=wdata, suffix=suffix, asuffix=asuffix)
class OFilePath(FilePath):
def __init__(self, rfunc, suffix=None, asuffix=[]):
super(OFilePath, self).__init__(rfunc=rfunc, suffix=suffix, asuffix=asuffix)
[docs]class Command(object):
'''This object simplifies the situation where an external program generates
a file and potentially takes a file as input. These files need to be
communicated as commandline arguments, and also need to be read and written
from GPI.
in1 = FilePath('.cfl', writer, data)
out1 = FilePath('.cfl', reader)
# run command immediatly
Command('fft', in1, '-o', out1, '-d1')
# setup a command list
c = Command()
c.arg('fft')
c.arg(in1, '-o', out1)
c.arg('-d1')
c.run()
data = out1.read()
'''
def __init__(self, *args, **kwargs):
self._warn = True
if 'warn' in kwargs:
self._warn = kwargs['warn']
self._checkForInvalidArgs(args)
self._cmd = args
self._retcode = None
if len(self._cmd):
# run the command straight away if there is one
self._retcode = self.run()
def _checkForInvalidArgs(self, args):
for a in args:
if type(a) not in [OFilePath, IFilePath, FilePath, str]:
types = [type(a) for a in args]
raise ValueError('Command:Args must be of type str, OFilePath, IFilePath or FilePath. '+str(types))
def arg(self, *args):
self._checkForInvalidArgs(args)
self._cmd += args
def setWarning(self, val):
self._warn = val
def returnCode(self):
return self._retcode
def __str__(self):
# this is the actual command that is passed to subprocess
return ' '.join([str(x) for x in self._cmd])
def getArgList(self):
return self._cmd
def getArgString(self):
return str(self)
def run(self):
# write all data to input files
for x in self._cmd:
if hasattr(x, '_Extern_File_Handle_Type'):
if x.isInput():
x.write()
# run the command
self._retcode = subprocess.check_call(str(self), shell=True)
return self._retcode