Package arcjobtool :: Module Tasks
[hide private]
[frames] | no frames]

Source Code for Module arcjobtool.Tasks

  1   
  2  import os, sys, string, shutil 
  3   
  4  import ConfigParser 
  5   
6 -class BaseTask(object):
7 - def __init__(self, createRunScript = True):
8 self.name = "noname" 9 self.description = "" 10 self.executable = "/bin/sh" 11 self.arguments = "run.sh" 12 self.__inputFiles = {} 13 self.__outputFiles = {} 14 self.__runtimeEnvironments = [] 15 self.cpuTime = 60 16 self.notify = "" 17 self.memory = -1 18 self.disk = -1 19 self.stdin = "" 20 self.stdout = "stdout.txt" 21 self.stderr = "stderr.txt" 22 self.logging = True 23 self.__clusters = [] 24 self.architecture = "" 25 self.__executables = [] 26 self.count = -1 27 self.version = "1.0" 28 self.taskType = "BaseTask" 29 self.groupSerial = 0 30 31 self.dirty = False 32 33 self.workDir="." 34 35 self.sweepSize = 1 36 self.__sweepParams = {"taskid":-1} 37 self.__sweepFiles = {} 38 39 self.calculateSweepValueFloat = None 40 self.calculateSweepValueInt = None 41 42 self.createRunScript = createRunScript 43 44 if self.createRunScript: 45 self.addInputFile("run.sh")
46
47 - def addInputFile(self, inputFile, url=""):
48 self.__inputFiles[inputFile] = url
49
50 - def addAndCopyInputFile(self, inputFile, copyFile = True):
51 """ 52 Add and copy input files to task workdir. 53 """ 54 if os.path.exists(inputFile) and os.path.exists(self.workDir): 55 head, tail = os.path.split(inputFile) 56 if copyFile: 57 shutil.copy(inputFile, self.workDir) 58 self.addInputFile(tail) 59 return True 60 else: 61 return False
62
63 - def clearInputFiles(self):
64 self.__inputFiles.clear() 65 self.addInputFile("run.sh")
66
67 - def removeInputFile(self, inputFile):
68 try: 69 del(self.__inputFiles[inputFile]) 70 except: 71 pass
72
73 - def addSweepFile(self, sweepFile):
74 self.__sweepFiles[sweepFile] = ""
75
76 - def clearSweepFiles(self):
77 self.__sweepFiles.clear()
78
79 - def removeSweepFile(self, sweepFile):
80 if self.__sweepFiles.has_key(sweepFile): 81 del self.__sweepFiles[sweepFile]
82
83 - def addOutputFile(self, outputFile, url=""):
84 self.__outputFile[outputFile] = url
85
86 - def clearOutputFiles(self):
87 self.__outputFiles[:] = []
88
89 - def addCluster(self, cluster):
90 self.__clusters.append(cluster)
91
92 - def clearClusters(self):
93 self.__clusters[:] = []
94
95 - def tagAsExecutable(self, filename):
96 if self.__inputFiles.has_key(filename): 97 self.__executables.append(filename)
98
99 - def clearExecutableTags(self):
100 self.__executables[:] = []
101
102 - def addRuntimeEnvironment(self, runtimeEnvironment):
103 self.__runtimeEnvironments.append(runtimeEnvironment)
104
105 - def clearRuntimeEnvironments(self):
106 self.__runtimeEnvironments[:] = []
107
108 - def __setstate__(self, state):
109 # Add version attribute to task 110 if '__version' not in state: 111 self.__version = "1.0" 112 self.onUpdateState(state, self.__version) 113 self.__dict__.update(state)
114
115 - def onUpdateState(self, state, version):
116 """Add or remove task attributes that has changed from 117 the version loaded from disk.""" 118 pass
119
120 - def setup(self):
121 self.onSetupTaskDirs()
122
123 - def clean(self):
124 self.onCleanTaskDirs()
125
126 - def __saveDefaultConfig(self, config):
127 config.add_section("task") 128 config.set("task", "type", self.taskType) 129 config.set("task", "version", self.version) 130 131 config.add_section("general") 132 config.set("general", "name", self.name) 133 config.set("general", "description", self.description) 134 config.set("general", "executable", self.executable) 135 config.set("general", "arguments", self.arguments) 136 config.set("general", "cputime", str(self.cpuTime)) 137 config.set("general", "notify", self.notify) 138 config.set("general", "memory", str(self.memory)) 139 config.set("general", "disk", str(self.disk)) 140 config.set("general", "stdin", self.stdin) 141 config.set("general", "stdout", self.stdout) 142 config.set("general", "stderr", self.stderr) 143 config.set("general", "logging", str(self.logging)) 144 config.set("general", "count", str(self.count)) 145 146 config.add_section("params") 147 config.set("params", "sweepSize", str(self.sweepSize)) 148 149 count = 0 150 151 for sweepFile in self.sweepFiles.keys(): 152 config.set("params", "sweepfile_%d" % count, sweepFile) 153 count += 1 154 155 config.add_section("inputfiles") 156 157 count = 0 158 159 for inputFile in self.inputFiles.keys(): 160 config.set("inputfiles", "inputfile_%d" % count, inputFile) 161 count += 1
162
163 - def __loadDefaultConfig(self, config):
164 try: 165 self.taskType = config.get("task", "type") 166 self.version = config.get("task", "version") 167 self.name = config.get("general", "name") 168 self.description = config.get("general", "description") 169 self.executable = config.get("general", "executable") 170 self.arguments = config.get("general", "arguments") 171 self.cpuTime = int(config.get("general", "cputime",)) 172 self.notify = config.get("general", "notify") 173 self.memory = int(config.get("general", "memory")) 174 self.disk = int(config.get("general", "disk")) 175 self.stdin = config.get("general", "stdin") 176 self.stdout = config.get("general", "stdout") 177 self.stderr = config.get("general", "stderr") 178 self.logging = bool(config.get("general", "logging")) 179 self.count = int(config.get("general", "count")) 180 self.sweepSize = int(config.get("params", "sweepsize")) 181 182 count = 0 183 184 while config.has_option("inputfiles", "inputfile_%d" % count): 185 inputFile = config.get("inputfiles", "inputfile_%d" % count) 186 self.addInputFile(inputFile) 187 count += 1 188 189 count = 0 190 191 while config.has_option("params", "sweepfile_%d" % count): 192 sweepFile = config.get("params", "sweepfile_%d" % count) 193 self.addSweepFile(sweepFile) 194 count += 1 195 except: 196 pass
197
198 - def save(self):
199 configFilename = os.path.join(self.workDir, "%s.ini" % self.name) 200 configParser = ConfigParser.ConfigParser() 201 self.__saveDefaultConfig(configParser) 202 self.onSaveConfig(configParser) 203 204 configFile = open(configFilename, "w") 205 configParser.write(configFile) 206 configFile.close() 207 208 self.dirty = False
209
210 - def load(self):
211 configFilename = os.path.join(self.workDir, "%s.ini" % self.name) 212 configFile = open(configFilename, "r") 213 configParser = ConfigParser.ConfigParser() 214 configParser.readfp(configFile) 215 configFile.close() 216 217 self.__loadDefaultConfig(configParser) 218 self.onLoadConfig(configParser) 219 220 self.dirty = False
221
222 - def onSaveConfig(self, config):
223 pass
224
225 - def onLoadConfig(self, config):
226 pass
227
228 - def getJobList(self):
229 """ 230 Return a list of job descriptions and directories 231 """ 232 jobList = [] 233 for i in range(self.sweepSize): 234 taskName=self.name+"_%04d" % (i+1) 235 taskId = i+1 236 taskDir = os.path.join(self.workDir,taskName) 237 238 jobDescription = self.onCreateJobDescription(taskName, taskId, taskDir) 239 jobList.append(jobDescription) 240 241 return jobList
242
243 - def getJobInfo(self):
244 """ 245 Return a list of job directories 246 """ 247 jobInfo = [] 248 for i in range(self.sweepSize): 249 taskName=self.name+"_%04d" % (i+1) 250 taskId = i+1 251 taskDir = os.path.join(self.workDir,taskName) 252 253 jobInfo.append([taskName, taskId, taskDir]) 254 255 return jobInfo
256 257
258 - def _existInList(self, stringList, pattern):
259 for line in stringtList: 260 if line.find(pattern)>=0: 261 return True 262 return False
263
264 - def onSetupTaskDirs(self):
265 266 taskIds = range(self.sweepSize) 267 268 for i in range(self.sweepSize): 269 270 # Create task dir 271 272 taskName=self.name+"_%04d" % (i+1) 273 taskId = i+1 274 taskDir = os.path.join(self.workDir,taskName) 275 if not os.path.exists(taskDir): 276 os.mkdir(taskDir) 277 278 # Create run-script 279 280 if self.createRunScript: 281 runScriptTemplate = self.onCreateRunScript(taskName, taskId) 282 runScriptFile = open(os.path.join(self.workDir, "run.sh"), "w") 283 runScriptFile.write(runScriptTemplate) 284 runScriptFile.close() 285 286 # Copy files to task dir 287 288 for filename in self.__inputFiles.keys(): 289 url = self.__inputFiles[filename] 290 if url=="": 291 fullPath = os.path.join(self.workDir, filename) 292 destPath = os.path.join(taskDir, filename) 293 294 if self.__sweepFiles.has_key(filename): 295 296 # Do parameter replacement 297 298 templateFile = open(fullPath, "r") 299 templateString = templateFile.read() 300 if templateString.find(r'%(name)s')!=-1: 301 templateString = self.onAssignTemplateName(templateString, self.name) 302 if templateString.find(r'%(id)d')!=-1: 303 templateString = self.onAssignTemplateId(templateString, taskId) 304 if templateString.find(r'%(sweepSize)d')!=-1: 305 templateString = self.onAssignTemplateSweepSize(templateString, self.sweepSize) 306 if templateString.find(r'%(value)g')!=-1: 307 taskValue = self.doCalculateSweepValueFloat(taskId, self.sweepSize) 308 templateString = self.onAssignTemplateValue(templateString, taskValue) 309 if templateString.find(r'%(value)d')!=-1: 310 taskValue = self.doCalculateSweepValueInt(taskId, self.sweepSize) 311 templateString = self.onAssignTemplateValue(templateString, taskValue) 312 templateFile.close() 313 314 # Write template file 315 316 if os.path.isfile(fullPath): 317 templateFile = open(destPath, "w") 318 templateFile.write(templateString) 319 templateFile.close() 320 else: 321 322 # Just copy the file. 323 324 shutil.copy(fullPath, destPath)
325 326
327 - def onCleanTaskDirs(self):
328 329 dirList = os.listdir(os.path.join(self.workDir)) 330 331 for dirItem in dirList: 332 fullPath = os.path.join(self.workDir, dirItem) 333 if os.path.isdir(fullPath): 334 if fullPath.find(self.name+"_")!=-1: 335 shutil.rmtree(fullPath)
336
337 - def onAssignTemplateName(self, templateString, taskName):
338 return templateString.replace(r'%(name)s', taskName)
339
340 - def onAssignTemplateId(self, templateString, taskId):
341 return templateString.replace(r'%(id)d', str(taskId))
342
343 - def onAssignTemplateSweepSize(self, templateString, sweepSize):
344 return templateString.replace(r'%(sweepSize)d', str(sweepSize))
345
346 - def onAssignTemplateValue(self, templateString, taskValue):
347 return templateString.replace(r'%(value)d', str(taskValue))
348
349 - def onCalculateSweepValueFloat(self, taskId, sweepSize):
350 if self.calculateSweepValueFloat!=None: 351 return self.calculateSweepValueFloat(taskId, sweepSize) 352 else: 353 return float(taskId)
354
355 - def onCalculateSweepValueInt(self, taskId, sweepSize):
356 if self.calculateSweepValueInt!=None: 357 return self.calculateSweepValueInt(taskId, sweepSize) 358 else: 359 return float(taskId)
360
361 - def onCreateRunScript(self, taskName, taskId):
362 """ 363 Abstract routine responsible for returning a 364 run-script for the job. 365 """ 366 return ""
367
368 - def onSetupScripts(self):
369 """Abstract routine responsible for creating the 370 necessary files that make up the grid task, such 371 as scripts, XRSL and input files.""" 372 pass
373
374 - def onClean(self):
375 """Abstract routine responsible for cleaning any 376 temporay files created by the setup() routine.""" 377 pass
378
379 - def onRefresh(self):
380 """If any new attributes are added in new versions of 381 a task the refresh method is responsible for checking 382 for these and adding/removing them if they do not exist.""" 383 pass
384
385 - def onCreateJobDescription(self, taskName, taskId, taskDir):
386 """ 387 Abstract routines responsible for returning a jobdescription for 388 the job. 389 """ 390 return None
391
392 - def getInputFiles(self):
393 return self.__inputFiles
394
395 - def getOutputFiles(self):
396 return self.__outputFiles
397
398 - def getRuntimeEnvironments(self):
399 return self.__runtimeEnvironments
400
401 - def getClusters(self):
402 return self.__clusters
403
404 - def getExecutables(self):
405 return self.__executables
406
407 - def getSweepParams(self):
408 return self.__sweepParams
409
410 - def getSweepFiles(self):
411 return self.__sweepFiles
412 413 inputFiles = property(getInputFiles) 414 outputFiles = property(getOutputFiles) 415 runtimeEnvironments = property(getRuntimeEnvironments) 416 clusters = property(getClusters) 417 executables = property(getExecutables) 418 sweepParams = property(getSweepParams) 419 sweepFiles = property(getSweepFiles) 420 jobList = property(getJobList) 421 jobInfo = property(getJobInfo)
422