Package arcjobtool :: Module ArcUtils
[hide private]
[frames] | no frames]

Source Code for Module arcjobtool.ArcUtils

   1  # 
   2  # ArcUtils - Wrapper classes for stuff in ARC. 
   3  # 
   4   
   5  """ 
   6  Class wrappers for grid client libraries and utilities. 
   7  """ 
   8   
   9  import arc, sys, os, getpass 
  10   
  11  import ConfigParser     
  12   
def zeroString(string, log=None):
    """
    Overwrite the character data of a string object with zero bytes.

    This is a best-effort, CPython-specific way of scrubbing a secret
    (for example a passphrase) from memory. It writes directly into the
    memory of the string object, so it must only be used on strings that
    are not shared with other code. String literals and interned strings
    are typically shared and must never be passed in.

    string - native str whose contents are wiped in place.
    log    - optional callable taking one message string; when given it
             is told which address range is cleared (or why nothing was
             cleared).
    """
    # Local import: ctypes is not part of the module level imports.
    import ctypes

    # Empty and one-character strings are typically shared singletons in
    # CPython; wiping them would corrupt unrelated code.
    if len(string) < 2:
        return

    # Find the object header size with a dummy string: the payload of the
    # dummy is searched for inside the raw bytes of its own object.
    temp = "finding offset"
    rawObject = ctypes.string_at(id(temp), sys.getsizeof(temp))
    header = rawObject.find(temp.encode("ascii"))

    if header < 0:
        # Unknown object layout - writing anywhere would be unsafe.
        if log is not None:
            log("Could not determine string header size, nothing cleared.")
        return

    # NOTE(review): the offset is measured on an ASCII probe string and is
    # assumed to be valid for `string` as well - confirm for non-ASCII
    # input on Python 3.
    location = id(string) + header
    size = sys.getsizeof(string) - header

    if log is not None:
        log("Clearing 0x%08x size %i bytes" % (location, size))

    # ctypes.memset works on every platform, no libc/msvcrt lookup needed.
    ctypes.memset(location, 0, size)


class ArcGuiConfig(object):
    """
    Class implementing the configuration options for ARC Job Tool.

    The class combines the configuration for ARC and ARC Job Tool
    into a single configuration file arcjobtool.conf.
    """

    def __init__(self, userConfig, arcClient):
        """
        Class constructor

        userConfig - ARC user configuration object receiving the path,
                     broker and service settings.
        arcClient  - client object whose brokerName is kept in sync with
                     this configuration, or None.
        """
        self.userConfig = userConfig
        self.arcClient = arcClient
        self.__initDefaultProperties()
        self.__filename = "arcjobtool.conf"

    def addDefaultService(self, serviceName):
        """
        Adds a default service to the list of services.
        """
        self.defaultServices.append(serviceName)

    def clearDefaultServices(self):
        """
        Clears all services in the default services lists.
        """
        self.defaultServices = []

    def addRejectedService(self, serviceName):
        """
        Adds a service to the rejected list.
        """
        self.rejectedServices.append(serviceName)

    def clearRejectedServices(self):
        """
        Clears all services in the rejected list.
        """
        self.rejectedServices = []

    def write(self):
        """
        Write the configuration to file.

        Service lists, broker name and broker arguments are only written
        when they are non-empty; all other options are always written.
        """
        parser = ConfigParser.RawConfigParser()
        parser.add_section("common")

        if len(self.defaultServices) > 0:
            parser.set("common", "defaultservices", " ".join(self.defaultServices))

        if len(self.rejectedServices) > 0:
            parser.set("common", "rejectservices", " ".join(self.rejectedServices))

        if self.brokerName != "":
            parser.set("common", "brokername", self.brokerName)

        if self.brokerArguments != "":
            parser.set("common", "brokerarguments", self.brokerArguments)

        parser.set("common", "timeout", str(self.timeout))
        parser.set("common", "proxypath", self.proxyPath)
        parser.set("common", "keypath", self.keyPath)
        parser.set("common", "certificatepath", self.certificatePath)
        parser.set("common", "cacertificatesdirectory", self.cacertificatesDirectory)

        # Values are converted explicitly; the text written to the file is
        # the same str() representation the parser would produce itself.
        parser.add_section("arcgui")
        parser.set("arcgui", "automaticdownload", str(self.automaticDownload))
        parser.set("arcgui", "automaticdownloadinterval", str(self.automaticDownloadInterval))
        parser.set("arcgui", "automaticupdate", str(self.automaticUpdate))
        parser.set("arcgui", "automaticupdateinterval", str(self.automaticUpdateInterval))

        # The with-statement closes the file even if writing fails.
        with open(self.__filename, "w") as configFile:
            parser.write(configFile)

    def __applyPathDefaults(self):
        """
        Replaces empty credential paths with their default locations and
        hands all four paths to the user configuration.
        """
        if self.certificatePath == "":
            self.certificatePath = os.path.expanduser("~/.globus/usercert.pem")

        if self.keyPath == "":
            self.keyPath = os.path.expanduser("~/.globus/userkey.pem")

        if self.cacertificatesDirectory == "":
            self.cacertificatesDirectory = "/etc/grid-security/certificates"

        if self.proxyPath == "":
            self.proxyPath = "/tmp/x509up_u%d" % os.getuid()

        self.userConfig.CertificatePath(self.certificatePath)
        self.userConfig.KeyPath(self.keyPath)
        self.userConfig.CACertificatesDirectory(self.cacertificatesDirectory)
        self.userConfig.ProxyPath(self.proxyPath)

    def __initDefaultProperties(self):
        """
        Initialises default properties for the configuration.
        """

        # ARC properties

        self.defaultServices = []
        self.rejectedServices = []
        self.__brokerName = "Random"
        self.brokerArguments = ""
        self.timeout = "50"
        self.proxyPath = ""
        self.keyPath = ""
        self.certificatePath = ""
        self.cacertificatesDirectory = ""

        # All paths are empty here, so every default location is applied
        # and pushed to the user configuration.
        self.__applyPathDefaults()

        # ArcClient properties

        if self.arcClient is not None:
            self.arcClient.brokerName = self.brokerName

        # ARCGUI properties

        self.automaticDownload = False
        self.automaticDownloadInterval = 120000

        self.automaticUpdate = True
        self.automaticUpdateInterval = 60000

        self.showSplash = True

    def disableRejected(self):
        """
        Disables the rejected list
        """
        self.userConfig.ClearRejectedServices(arc.INDEX)
        self.userConfig.ClearRejectedServices(arc.COMPUTING)

    def enableRejected(self):
        """
        Enables the rejected list.
        """
        self.updateServices()

    def update(self):
        """
        Updates the settings.

        Pushes the service lists to the user configuration, writes the
        configuration file and reads it back.
        """
        self.updateServices()
        self.write()
        self.read()

    def create(self):
        """
        Create a default configuration file.
        """
        self.__initDefaultProperties()
        self.write()

    @staticmethod
    def _splitServiceEntry(entry):
        """
        Splits a service entry into its kind prefix and the remainder.

        Returns ("index", rest) or ("computing", rest) for entries
        starting with "index:" or "computing:", and (None, entry) for
        anything else. Only the first colon separates the prefix, so the
        remainder may itself contain "index:" or "computing:".
        """
        prefix, separator, remainder = entry.partition(":")
        if separator and prefix in ("index", "computing"):
            return prefix, remainder
        return None, entry

    def __addServices(self, entries, marker):
        """
        Adds every entry to the user configuration, prefixing the service
        with marker ("" for selected, "-" for rejected services).
        Entries without a known kind prefix are added as index services.
        """
        for entry in entries:
            kind, serviceURL = self._splitServiceEntry(entry)
            if kind == "computing":
                serviceType = arc.COMPUTING
            else:
                serviceType = arc.INDEX
            self.userConfig.AddServices([marker + serviceURL], serviceType)

    def updateServices(self):
        """
        Update the ARC configuration with the maintained lists in
        this class.
        """
        # Clear all four lists before re-adding. Clearing only some of
        # them lets the remaining ones accumulate duplicates on every
        # update.
        self.userConfig.ClearSelectedServices(arc.INDEX)
        self.userConfig.ClearSelectedServices(arc.COMPUTING)
        self.userConfig.ClearRejectedServices(arc.INDEX)
        self.userConfig.ClearRejectedServices(arc.COMPUTING)

        self.__addServices(self.defaultServices, "")
        self.__addServices(self.rejectedServices, "-")

    def read(self):
        """
        Read configuration from file.

        Settings are taken from the user configuration where possible;
        the remaining options are parsed directly from the ini-file.
        """
        self.userConfig.LoadConfigurationFile(self.__filename, True)

        # NOTE(review): this pushes the default paths into userConfig
        # right after the file was loaded, so the proxy path and CA
        # directory read back below are the defaults - confirm intended.
        self.__initDefaultProperties()

        # Load what we can from UserConfig.

        self.timeout = self.userConfig.Timeout()
        self.proxyPath = str(self.userConfig.ProxyPath())
        self.cacertificatesDirectory = str(self.userConfig.CACertificatesDirectory())

        # Due to API problems we can't get everything from UserConfig. Here we
        # parse the ini-file to get the missing pieces.

        newAPI = True
        serviceKinds = (("index", arc.INDEX), ("computing", arc.COMPUTING))

        if newAPI:
            self.defaultServices = []
            self.rejectedServices = []

            for prefix, serviceType in serviceKinds:
                selectedServices = self.userConfig.GetSelectedServices(serviceType)
                for flavour in selectedServices.keys():
                    for serviceName in selectedServices[flavour]:
                        self.defaultServices.append(prefix + ":" + flavour + ":" + serviceName.fullstr())

            # Rejected services have been seen to come back duplicated.
            # Duplicates are detected on the entry text, which does not
            # depend on how the wrapper objects hash or compare.
            seenRejected = set()

            for prefix, serviceType in serviceKinds:
                rejectedServices = self.userConfig.GetRejectedServices(serviceType)
                for flavour in rejectedServices.keys():
                    for serviceName in rejectedServices[flavour]:
                        entry = prefix + ":" + flavour + ":" + serviceName.fullstr()
                        if entry not in seenRejected:
                            seenRejected.add(entry)
                            self.rejectedServices.append(entry)

        parser = ConfigParser.RawConfigParser()
        parser.read(self.__filename)

        if not newAPI:
            if parser.has_option("common", "defaultservices"):
                self.defaultServices = parser.get("common", "defaultservices").strip().split()
            # write() stores the list as "rejectservices"; the older
            # spelling is still accepted.
            for option in ("rejectservices", "rejectedservices"):
                if parser.has_option("common", option):
                    self.rejectedServices = parser.get("common", option).strip().split()
                    break

        # has_option() is False for a missing section, so no separate
        # section checks are needed.
        if parser.has_option("common", "brokername"):
            self.brokerName = parser.get("common", "brokername")

        if parser.has_option("common", "certificatepath"):
            self.certificatePath = parser.get("common", "certificatepath")
        if parser.has_option("common", "keypath"):
            self.keyPath = parser.get("common", "keypath")

        if parser.has_option("arcgui", "automaticdownload"):
            self.automaticDownload = parser.getboolean("arcgui", "automaticdownload")
        if parser.has_option("arcgui", "automaticdownloadinterval"):
            self.automaticDownloadInterval = int(parser.get("arcgui", "automaticdownloadinterval"))
        if parser.has_option("arcgui", "automaticupdate"):
            self.automaticUpdate = parser.getboolean("arcgui", "automaticupdate")
        if parser.has_option("arcgui", "automaticupdateinterval"):
            self.automaticUpdateInterval = int(parser.get("arcgui", "automaticupdateinterval"))

        self.__applyPathDefaults()

    def setFilename(self, filename):
        """
        Sets the filename to use for the configuration file (property set)
        """
        self.__filename = filename

    def getFilename(self):
        """
        Returns the configuration filename (property get)
        """
        return self.__filename

    def getBrokerName(self):
        """
        Returns the current brokername (property get)
        """
        return self.__brokerName

    def setBrokerName(self, brokerName):
        """
        Sets the current broker to use. (property set)

        The name is forwarded to the user configuration and, when one is
        attached, to the ArcClient instance.
        """
        self.userConfig.Broker(brokerName)
        if self.arcClient is not None:
            self.arcClient.brokerName = brokerName
        self.__brokerName = brokerName

    brokerName = property(getBrokerName, setBrokerName)

    filename = property(getFilename, setFilename)


class UserAuthentication(object):
    """
    Class implementing different user authentication mechanism.
    """

    def __init__(self, userConfig):
        """
        Class constructor

        userConfig - ARC user configuration providing the certificate,
                     key, CA directory and proxy locations.
        """

        self.userConfig = userConfig

        self.certificatePath = self.userConfig.CertificatePath()
        self.keyPath = self.userConfig.KeyPath()
        # caDir is handed to arc.Credential as the CA directory, so it is
        # read from the directory setting rather than the CA file setting.
        self.caDir = self.userConfig.CACertificatesDirectory()
        self.proxyPath = self.userConfig.ProxyPath()

        self.period = "12"          # proxy lifetime, in hours
        self.keybits = 1024
        self.prompt = ""
        self.proxyType = "gsi2"
        self.errorMessage = ""

        self.__logger = arc.Logger(arc.Logger_getRootLogger(), "UserAuth")

    def logMsg(self, level, msg):
        """
        Logs a message to the ARC logger.
        """
        self.__logger.msg(level, msg)

    def logInfoMsg(self, msg):
        """
        Logs an info message to the ARC logger.
        """
        self.logMsg(arc.INFO, msg)

    def logDebugMsg(self, msg):
        """
        Logs a debug message to the ARC logger.
        """
        self.logMsg(arc.DEBUG, msg)

    def logErrorMsg(self, msg):
        """
        Logs an error message to the ARC logger.
        """
        self.logMsg(arc.ERROR, msg)

    def onPassphrasePrompt(self, prompt=""):
        """
        Virtual method for querying the user for a password. Should be
        overridden by derived classes.

        prompt - text shown to the user; "Passphrase:" is used when empty.

        Returns the entered passphrase.
        """
        if prompt == "":
            return getpass.getpass("Passphrase:")
        return getpass.getpass(prompt)

    def createLocalProxy(self, proxyType = "gsi2"):
        """
        Create a local proxy certificate

        proxyType - accepted for interface compatibility only; the proxy
                    flavour is taken from self.proxyType ("rfc" or, for
                    any other value, "gsi2").

        Returns True when the proxy file was written. Returns False on
        failure with the reason in self.errorMessage; an empty passphrase
        returns False with an empty error message.
        """

        if self.proxyPath == "":
            self.proxyPath = "/tmp/x509up_u%d" % os.getuid()

        # --- Load user certificate and key ---

        self.logInfoMsg("Load user certificate and key.")

        passphrase = self.onPassphrasePrompt(self.prompt)
        if passphrase == "":
            self.errorMessage = ""
            return False

        signer = arc.Credential(self.certificatePath, self.keyPath, self.caDir, "", str(passphrase))

        del passphrase

        # --- Setup proxy parameters ---

        start = arc.Time()
        arcPeriod = arc.Period(self.period, arc.PeriodHours)
        policy = ""

        # --- Create proxy certificate request ---

        self.logInfoMsg("Create proxy certificate request.")

        credRequest = arc.Credential(start, arcPeriod, self.keybits, "rfc", "inheritAll", policy, -1)
        generateRequestOk, requestString = credRequest.GenerateRequest()

        if not generateRequestOk:
            self.errorMessage = "Could not generate proxy certificate request."
            return False

        outputPrivateKeyOk, privateKey = credRequest.OutputPrivatekey()

        if not outputPrivateKeyOk:
            self.errorMessage = "Could not generate private key."
            return False

        outputCertificateOk, signingCert = signer.OutputCertificate()

        if not outputCertificateOk:
            self.errorMessage = "Could not get user certificate."
            return False

        outputCertificateChainOk, signingCertChain = signer.OutputCertificateChain()

        # Check the result of the chain call itself, not the earlier
        # certificate result.
        if not outputCertificateChainOk:
            self.errorMessage = "Could not get certificate chain."
            return False

        # --- Set proxy type ---

        if self.proxyType == "rfc":
            credRequest.SetProxyPolicy("rfc", "inheritAll", "", -1)
        else:
            credRequest.SetProxyPolicy("gsi2", "", "", -1)

        # --- Sign request with user certificate ---

        self.logInfoMsg("Sign request with user certificate.")

        signRequestOk, proxyCert = signer.SignRequest(credRequest)

        if not signRequestOk:
            self.errorMessage = "Failed to sign proxy request."
            return False

        # --- Create proxy certificate file ---

        fullProxyCert = proxyCert + privateKey + signingCert + signingCertChain

        # The file holds a private key: create it owner-only from the
        # start instead of tightening the permissions after writing.
        proxyFd = os.open(self.proxyPath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(proxyFd, "w") as proxyCertFile:
            # --- Set the correct permissions on the file ---
            # The creation mode is ignored for a file that already
            # existed, so the permissions are enforced before writing.
            os.chmod(self.proxyPath, 0o600)
            proxyCertFile.write(fullProxyCert)

        return True


class ManagedJobDescription(arc.JobDescription):
    """
    Class for making job description handling easier.
    """

    def __init__(self):
        """
        Class constructor
        """
        arc.JobDescription.__init__(self)

    def addArgument(self, argument):
        """
        Add argument description.

        argument - value appended to the argument list of the executable.
        """
        self.Application.Executable.Argument.append(argument)

    def addInputFile(self, name, url="", keepData = True, isExecutable = False, downloadToCache = False, threads = -1):
        """
        Add an input file, name, to the job description.

        name            - path of the file; its base name becomes the
                          staged file name.
        url             - source URL; when empty, a file:// URL of the
                          absolute local path of name is used.
        keepData        - currently not applied; KeepData is always set
                          to False.
        isExecutable    - stored as the IsExecutable flag of the file.
        downloadToCache - stored as the DownloadToCache flag of the file.
        threads         - currently not applied to the source.
        """
        inputFile = arc.FileType()
        inputFile.Name = str(os.path.basename(name))
        # NOTE(review): keepData defaults to True but False has always
        # been stored here; left unchanged so default callers see the same
        # job description - confirm the intended default.
        inputFile.KeepData = False
        inputFile.IsExecutable = bool(isExecutable)
        inputFile.DownloadToCache = bool(downloadToCache)

        inputFileSource = arc.DataSourceType()
        if url == "":
            localURL = "file://" + os.path.abspath(name)
            inputFileSource.URI = arc.URL(str(localURL))
        else:
            inputFileSource.URI = arc.URL(url)

        inputFile.Source.append(inputFileSource)
        self.DataStaging.File.append(inputFile)

    def clearInputFiles(self):
        """
        Clear input files.

        Not implemented; calling it has no effect.
        """
        pass

    def addOutputFile(self, name, url="", keepData = True, isExecutable = False, downloadToCache = False, threads = -1):
        """
        Add outputfile to job description.

        name            - name of the output file.
        url             - target URL of the file.
        keepData        - currently not applied; KeepData is always set
                          to False.
        isExecutable    - stored as the IsExecutable flag of the file.
        downloadToCache - stored as the DownloadToCache flag of the file.
        threads         - stored as the Threads value of the target.
        """
        outputFile = arc.FileType()
        outputFile.Name = name
        # NOTE(review): see addInputFile - keepData is deliberately left
        # unapplied to keep existing behaviour.
        outputFile.KeepData = False
        outputFile.IsExecutable = bool(isExecutable)
        outputFile.DownloadToCache = bool(downloadToCache)

        outputFileTarget = arc.DataTargetType()
        outputFileTarget.URI = arc.URL(url)
        outputFileTarget.Threads = threads
        outputFileTarget.Mandatory = True

        outputFile.Target.append(outputFileTarget)
        self.DataStaging.File.append(outputFile)

    def clearOutputFiles(self):
        """
        Clear output files.

        Not implemented; calling it has no effect.
        """
        pass

    def addRuntimeEnvironment(self, name, relOp, version):
        """
        Add a runtime environment and version.

        name    - name of the runtime environment.
        relOp   - one of "<", "<=", "==", ">=", ">"; any other value is
                  treated as ">=".
        version - version string, joined to name with a dash.
        """
        software = arc.Software(name + "-" + version)

        comparisons = {
            "<": arc.Software.LESSTHAN,
            "<=": arc.Software.LESSTHANOREQUAL,
            "==": arc.Software.EQUAL,
            ">=": arc.Software.GREATERTHANOREQUAL,
            ">": arc.Software.GREATERTHAN,
        }
        arcRelOp = comparisons.get(relOp, arc.Software.GREATERTHANOREQUAL)

        self.Resources.RunTimeEnvironment.add(software, arcRelOp)


class ArcClient(object):
    """
    Wrapper around the ARC client library covering target discovery,
    brokering, job submission and management of the local job list.
    """

    # Job attributes copied unchanged into jobDict by updateStatus(), as
    # (dictionary key, attribute name) pairs.
    #
    # Handled separately: State, Error, Cluster, SubmissionEndpoint and
    # InfoEndpoint. Fields of the job record that are not copied at all:
    # Flavour, JobID (used as dictionary key), ISB, OSB, AuxURL, AuxInfo,
    # IDFromEndpoint, RestartState, AuxStates, RestartStates,
    # ComputingManagerExitCode, RequestedApplicationEnvironment,
    # JobManagementEndpoint, DataStagingEndpoint and ActivityOldId.
    _COPIED_JOB_FIELDS = (
        ("Name", "Name"),
        ("Type", "Type"),
        ("JobDescription", "JobDescription"),
        ("ExitCode", "ExitCode"),
        ("UserDomain", "UserDomain"),
        ("LocalIdFromManager", "LocalIDFromManager"),
        ("Owner", "Owner"),
        ("WaitingPosition", "WaitingPosition"),
        ("LocalOwner", "LocalOwner"),
        ("RequestedMainMemory", "RequestedMainMemory"),
        ("RequestedSlots", "RequestedSlots"),
        ("StdIn", "StdIn"),
        ("StdOut", "StdOut"),
        ("StdErr", "StdErr"),
        ("LogDir", "LogDir"),
        ("ExecutionCE", "ExecutionCE"),
        ("Queue", "Queue"),
        ("UsedMainMemory", "UsedMainMemory"),
        ("UsedSlots", "UsedSlots"),
        ("SubmissionHost", "SubmissionHost"),
        ("SubmissionClientName", "SubmissionClientName"),
        ("OtherMessages", "OtherMessages"),
        ("VirtualMachine", "VirtualMachine"),
        ("UsedCPUType", "UsedCPUType"),
        ("UsedOSFamily", "UsedOSFamily"),
        ("UsedPlatform", "UsedPlatform"),
        ("RequestedTotalWallTime", "RequestedTotalWallTime"),
        ("RequestedTotalCPUTime", "RequestedTotalCPUTime"),
        ("ExecutionNode", "ExecutionNode"),
        ("UsedTotalWallTime", "UsedTotalWallTime"),
        ("UsedTotalCPUTime", "UsedTotalCPUTime"),
        ("UsedApplicationEnvironment", "UsedApplicationEnvironment"),
        ("LocalSubmissionTime", "LocalSubmissionTime"),
        ("SubmissionTime", "SubmissionTime"),
        ("ComputingManagerSubmissionTime", "ComputingManagerSubmissionTime"),
        ("StartTime", "StartTime"),
        ("ComputingManagerEndTime", "ComputingManagerEndTime"),
        ("EndTime", "EndTime"),
        ("WorkingAreaEraseTime", "WorkingAreaEraseTime"),
        ("ProxyExpirationTime", "ProxyExpirationTime"),
        ("CreationTime", "CreationTime"),
        ("Validity", "Validity"),
    )

    def __init__(self, userConfig=None, logStream=None):
        """
        ArcClient class constructor

        Initialises properties and variables, sets up logging and
        loads job list.

        userConfig - existing user configuration to use; when None a new
                     one is created from the default configuration and
                     job list files.
        logStream  - remembered as the current log stream. No log
                     destination is installed here; use setLogStream()
                     for that.
        """

        # Setup ARC logging

        self.__logger = arc.Logger(arc.Logger_getRootLogger(), "ArcClient")

        # Setup class properties

        self.__proxyFilename = ""
        # Initialised here so that the logStream property can be read
        # before setLogStream() has been called.
        self.__logStream = logStream
        self.jobListFilename = os.path.expanduser("~/.arc/jobs.xml")
        self.userConfigFilename = os.path.expanduser("~/.arc/arcjobtool.conf")
        self.downloadDir = "."

        # Setup user configuration

        self.logInfoMsg("Creating UserConfig.")
        if userConfig is None:
            self.userConfig = arc.UserConfig(self.userConfigFilename, self.jobListFilename, True)
        else:
            self.userConfig = userConfig

        self.targets = []
        self.filteredTargets = []
        self.jobList = None
        self.jobDict = {}
        self.clusters = []
        self.__debugLevel = arc.VERBOSE
        self.__doFiltering = True
        self.updateProgress = None
        self.currentTarget = None
        self.__brokerName = "Random"

        # Setup ACC loader

        self.logInfoMsg("Create BrokerLoader.")

        self.__brokerLoader = arc.BrokerLoader()
        self.__broker = self.__brokerLoader.load(self.__brokerName, self.userConfig)

        self.logInfoMsg("Create TargetGenerator.")

        self.__targetGenerator = arc.TargetGenerator(self.userConfig)

        # Initialise joblist

        self.logInfoMsg("Initialise job list.")

        self.__loadJobs()

    def enableServiceFiltering(self):
        """
        Enable rejected services list.

        Done by reloading the user configuration file.
        """
        self.userConfig.LoadConfigurationFile(self.userConfigFilename)

    def disableServiceFiltering(self):
        """
        Disable rejected services list.

        Clears the selected and the rejected services of both kinds.
        """
        self.userConfig.ClearSelectedServices(arc.INDEX)
        self.userConfig.ClearSelectedServices(arc.COMPUTING)
        self.userConfig.ClearRejectedServices(arc.INDEX)
        self.userConfig.ClearRejectedServices(arc.COMPUTING)

    def logMsg(self, level, msg):
        """
        Logs a message to the ARC logger.
        """
        self.__logger.msg(level, msg)

    def logInfoMsg(self, msg):
        """
        Logs an info message to the ARC logger.
        """
        self.logMsg(arc.INFO, msg)

    def logDebugMsg(self, msg):
        """
        Logs a debug message to the ARC logger.
        """
        self.logMsg(arc.DEBUG, msg)

    def logErrorMsg(self, msg):
        """
        Logs an error message to the ARC logger.
        """
        self.logMsg(arc.ERROR, msg)

    def setLogStream(self, stream):
        """
        Assigns the logstream to instance.

        All existing destinations of the root logger are replaced by a
        destination writing to stream.
        """
        self.__logStream = stream
        logcout = arc.LogStream(stream)
        # Keep a reference to the destination on the instance.
        # NOTE(review): presumably the root logger does not own the
        # destination object - confirm against the ARC bindings.
        self.__logDestination = logcout
        rootLogger = arc.Logger_getRootLogger()
        rootLogger.removeDestinations()
        rootLogger.addDestination(logcout)

    def getLogStream(self):
        """
        Return current log stream.
        """
        return self.__logStream

    def setDebugLevel(self, level):
        """
        Set debug level of the arc library.
        """
        self.__debugLevel = level
        arc.Logger_getRootLogger().setThreshold(self.__debugLevel)

    def getDebugLevel(self):
        """
        Return the debug level of the arc library.
        """
        return self.__debugLevel

    def setProxyFilename(self, filename):
        """
        Set proxy filename.
        """
        self.__proxyFilename = filename

    def getProxyFilename(self):
        """
        Return proxy filename
        """
        return self.__proxyFilename

    def saveConfiguration(self):
        """
        Save current configuration.
        """
        self.userConfig.ConfTree().SaveToFile(self.userConfigFilename)

    def __loadJobs(self):
        """
        Load job list from XML file.

        jobDict is rebuilt with one empty entry per job id found.
        """
        self.jobList = arc.Config(arc.NS())
        if os.path.exists(self.jobListFilename):
            self.jobList.ReadFromFile(self.jobListFilename)
        self.jobDict.clear()
        for jobNode in self.jobList.Path("Job"):
            self.jobDict[str(jobNode.Get("JobID"))] = {}

    def loadJobList(self):
        """
        Load job list from XML file.
        """
        self.__loadJobs()

    def hasValidProxy(self):
        """
        Return status of proxy.
        """
        return self.userConfig.CheckProxy()

    def findTargets(self):
        """
        Find possible targets by querying information system.

        The result is stored in self.targets.
        """

        self.doProgress("Finding suitable targets.")

        self.targets = None
        self.__targetGenerator = arc.TargetGenerator(self.userConfig)
        self.__targetGenerator.GetTargets(0, 1)
        self.targets = self.__targetGenerator.ModifyFoundTargets()

    def loadBroker(self, brokerName="Random"):
        """
        Wrapper function for encapsulating ARC1 loading of
        of a broker instance.

        brokerName - name of the broker to load.

        Returns the broker instance created by the broker loader.
        """

        # The loader created in the constructor is __brokerLoader.
        return self.__brokerLoader.load(brokerName, self.userConfig)

    def filterTargets(self, job):
        """
        Select the best target for job; see findBestTarget().
        """
        self.findBestTarget(job)

    def findBestTarget(self, job):
        """
        Prefilter the found targets for job with the current broker and
        return the best one, which is also stored in self.currentTarget.
        """
        self.doProgress("Prefiltering targets.")

        self.__broker = self.__brokerLoader.load(self.__brokerName, self.userConfig)
        self.__broker.PreFilterTargets(self.targets, job)

        self.currentTarget = self.__broker.GetBestTarget()

        return self.currentTarget

    def submit_old(self, job):
        """
        Submit job to grid. Requires that a list of filtered targets
        exists.

        Kept for compatibility; behaves exactly like submit().
        """
        return self.submit(job)

    def submit(self, job):
        """
        Submit job to grid. Requires that a list of filtered targets
        exists.

        Returns None when no current target is selected, otherwise the
        job URL returned by the submitter.
        """
        self.doProgress("Submitting job.")

        if self.currentTarget is None:
            return None

        submitter = self.currentTarget.GetSubmitter(self.userConfig)
        jobURL = submitter.Submit(job, self.currentTarget)
        self.logInfoMsg("Submitted: " + jobURL.fullstr())

        if jobURL:
            self.doProgress("Updating job file.")
            self.__loadJobs()

        return jobURL

    def submitJobList(self, jobList):
        """
        Submit job list to grid. Requires a list of found targets.

        For every job the broker's targets are tried in order until one
        accepts the job or no target is left.

        Returns a list with one entry per job: the job URL of the last
        submission attempt, or None if no target was available.
        """
        self.doProgress("Submitting job.")

        self.enableServiceFiltering()

        resultList = [None] * len(jobList)

        # NOTE(review): currentTarget is None or a target object elsewhere
        # in this class, so this comparison looks like it never matches -
        # confirm the intended check.
        if self.currentTarget == "":
            return resultList

        try:
            self.__broker = self.__brokerLoader.load(self.__brokerName, self.userConfig)

            for index, job in enumerate(jobList):

                self.__broker.PreFilterTargets(self.targets, job)

                jobURL = None

                while True:
                    bestTarget = self.__broker.GetBestTarget()
                    if bestTarget is None:
                        break
                    submitter = bestTarget.GetSubmitter(self.userConfig)
                    jobURL = submitter.Submit(job, bestTarget)
                    if jobURL:
                        break

                if jobURL is not None:
                    self.doProgress("Updating job file.")
                    self.__loadJobs()

                resultList[index] = jobURL
        finally:
            # Switch filtering off again even when a submission raised.
            self.disableServiceFiltering()

        return resultList

    def get(self, jobIds = None, status = None, keep=False):
        """
        Download results from jobs.

        jobIds - job ids to handle; None or empty means no restriction is
                 passed to the job supervisor.
        status - job states to download; defaults to
                 ["FINISHED", "FAILED"].
        keep   - passed on to the job controllers.
        """
        if jobIds is None:
            jobIds = []
        if status is None:
            status = ["FINISHED", "FAILED"]

        self.doProgress("Retrieving job controllers.")
        jobSupervisor = arc.JobSupervisor(self.userConfig, jobIds)
        jobControllers = jobSupervisor.GetJobControllers()

        for controller in jobControllers:
            self.doProgress("Retrieving job.")
            controller.Get(status, str(self.downloadDir), keep)

        self.doProgress("Done.")

        self.__loadJobs()

    def kill(self, jobIds = None, status = None, force = False):
        """
        Kill running jobs.

        jobIds - job ids to handle; defaults to an empty list.
        status - job states to act on; defaults to an empty list.
        force  - passed on to the job controllers.
        """
        if jobIds is None:
            jobIds = []
        if status is None:
            status = []

        jobSupervisor = arc.JobSupervisor(self.userConfig, jobIds)
        jobControllers = jobSupervisor.GetJobControllers()

        for controller in jobControllers:
            controller.Kill(status, force)

        self.__loadJobs()

    def clean(self, jobIds = None, status = None, force = False):
        """
        Clean running jobs.

        jobIds - job ids to handle; defaults to an empty list.
        status - job states to act on; defaults to an empty list.
        force  - passed on to the job controllers.
        """
        if jobIds is None:
            jobIds = []
        if status is None:
            status = []

        jobSupervisor = arc.JobSupervisor(self.userConfig, jobIds)
        jobControllers = jobSupervisor.GetJobControllers()

        for controller in jobControllers:
            controller.Clean(status, force)

        self.__loadJobs()

    def doProgress(self, message):
        """
        Call progress update callback if assigned otherwise
        ignore.

        message - Message to pass to progress callback function.
        """
        if self.updateProgress is not None:
            self.updateProgress(message)

    def updateStatus(self):
        """
        Query information system and populate the jobs property with additional
        job information.

        Only jobs already present in jobDict are updated. Service
        filtering is switched off while querying and switched on again
        afterwards, also when the query fails.
        """

        self.disableServiceFiltering()

        try:
            self.doProgress("Creating job supervisor.")

            self.logInfoMsg("Creating JobSupervisor")
            jobSupervisor = arc.JobSupervisor(self.userConfig, [])
            self.logInfoMsg("Retrieving job controllers.")
            jobControllers = jobSupervisor.GetJobControllers()

            self.doProgress("Querying job controllers")

            for controller in jobControllers:
                self.doProgress("Getting job information.")
                controller.GetJobInformation()
                self.doProgress("Extracting job information.")
                for job in controller.GetJobs():
                    jobId = job.JobID.str()
                    if jobId not in self.jobDict:
                        continue

                    jobInfo = self.jobDict[jobId]

                    # State is a callable object yielding the state value.
                    jobInfo["State"] = job.State()

                    # Only the first error message is kept.
                    if len(job.Error) > 0:
                        jobInfo["Error"] = str(job.Error[0])
                    else:
                        jobInfo["Error"] = ""

                    jobInfo["Cluster"] = job.Cluster.fullstr()
                    jobInfo["SubmissionEndpoint"] = job.SubmissionEndpoint.fullstr()
                    jobInfo["InfoEndpoint"] = job.InfoEndpoint.fullstr()

                    for key, attribute in self._COPIED_JOB_FIELDS:
                        jobInfo[key] = getattr(job, attribute)
        finally:
            self.enableServiceFiltering()

    def sortKeysBy(self, byField=""):
        """
        Sort job dictionary by field specified by the byField parameter.

        byField - key of the per-job dictionaries to sort on; when empty
                  the job ids themselves are sorted.

        Returns the list of job ids. If the field is missing for a job or
        its values cannot be compared, the job ids are sorted instead.
        """
        keys = sorted(self.jobDict.keys())
        if byField == "":
            return keys

        try:
            return sorted(keys, key=lambda jobId: self.jobDict[jobId][byField])
        except Exception:
            # Deliberate best-effort: the field values may be library
            # objects whose comparison can fail in ways not known here.
            return keys

    def printJobs(self):
        """
        Print list of jobs managed by this class.
        """
        for jobId in self.jobDict.keys():
            print(jobId)

    def printStatus(self):
        """
        Print status of jobs.
        """
        # NOTE(review): JobSupervisor is called with four arguments here
        # and with two everywhere else in this class - confirm which
        # signature the installed ARC bindings provide.
        jobmaster = arc.JobSupervisor(self.userConfig, [], [], self.jobListFilename)
        jobcontrollers = jobmaster.GetJobControllers()

        jobStatus = []

        for controller in jobcontrollers:
            controller.Stat(jobStatus, True)

    def getBrokerName(self):
        """
        Return the name of the broker in use (property get)
        """
        return self.__brokerName

    def setBrokerName(self, brokerName):
        """
        Set the broker to use, also in the user configuration
        (property set)
        """
        self.userConfig.Broker(brokerName)
        self.__brokerName = brokerName

    def setTimeout(self, timeout):
        """
        Set the timeout in the user configuration (property set)
        """
        self.userConfig.Timeout(timeout)

    def getTimeout(self):
        """
        Return the timeout from the user configuration (property get)
        """
        return self.userConfig.Timeout()

    debugLevel = property(getDebugLevel, setDebugLevel)
    proxyFilename = property(getProxyFilename, setProxyFilename)
    logStream = property(getLogStream, setLogStream)
    brokerName = property(getBrokerName, setBrokerName)
    timeout = property(getTimeout, setTimeout)
