1
2
3
4
5 """
6 Class wrappers for grid client libraries and utilities.
7 """
8
9 import arc, sys, os, getpass
10
11 import ConfigParser
12
14
15 temp = "finding offset"
16 header = ctypes.string_at(id(temp), sys.getsizeof(temp)).find(temp)
17
18 location = id(string) + header
19 size = sys.getsizeof(string) - header
20
21
22
23 memset = ctypes.CDLL("libc.so.6").memset
24
25 self.logInfoMsg("Clearing 0x%08x size %i bytes" % (location, size))
26
27 memset(location, 0, size)
28
30 """
31 Class implementing the configuration options for ARC Job Tool.
32
33 The class combines the configuration for ARC and ARC Job Tool
34 into a single configuration file arcjobtool.conf.
35 """
36 - def __init__(self, userConfig, arcClient):
44
46 """
47 Adds a default service to the list of services.
48 """
49 self.defaultServices.append(serviceName)
50
52 """
53 Clears all services in the default services lists.
54 """
55 self.defaultServices = []
56
58 """
59 Adds a service to the rejected list.
60 """
61 self.rejectedServices.append(serviceName)
62
64 """
65 Clears all services in the rejected list.
66 """
67 self.rejectedServices = []
68
70 """
71 Write the configuration to file.
72 """
73 parser = ConfigParser.RawConfigParser()
74 if not parser.has_section("common"):
75 parser.add_section("common")
76
77 allServices = list(self.defaultServices)
78
79 if len(self.defaultServices)>0:
80 parser.set("common", "defaultservices", " ".join(self.defaultServices))
81
82 if len(self.rejectedServices)>0:
83 parser.set("common", "rejectservices", " ".join(self.rejectedServices))
84
85 if self.brokerName!="":
86 parser.set("common", "brokername", self.brokerName)
87
88 if self.brokerArguments!="":
89 parser.set("common", "brokerarguments", self.brokerArguments)
90
91 parser.set("common", "timeout", str(self.timeout))
92 parser.set("common", "proxypath", self.proxyPath)
93 parser.set("common", "keypath", self.keyPath)
94 parser.set("common", "certificatepath", self.certificatePath)
95 parser.set("common", "cacertificatesdirectory", self.cacertificatesDirectory)
96
97 parser.add_section("arcgui")
98 parser.set("arcgui", "automaticdownload", self.automaticDownload)
99 parser.set("arcgui", "automaticdownloadinterval", self.automaticDownloadInterval)
100 parser.set("arcgui", "automaticupdate", self.automaticUpdate)
101 parser.set("arcgui", "automaticupdateinterval", self.automaticUpdateInterval)
102
103 configFile = open(self.__filename, "w")
104 parser.write(configFile)
105 configFile.close()
106
108 """
109 Initialises default properties for the configuration.
110 """
111
112
113
114 self.defaultServices = []
115 self.rejectedServices = []
116 self.__brokerName = "Random"
117 self.brokerArguments = ""
118 self.timeout = "50"
119 self.proxyPath = ""
120 self.keyPath = ""
121 self.certificatePath = ""
122 self.cacertificatesDirectory = ""
123
124 uid = os.getuid()
125
126 if self.certificatePath == "":
127 self.certificatePath = os.path.expanduser("~/.globus/usercert.pem")
128 self.userConfig.CertificatePath(self.certificatePath)
129
130 if self.keyPath == "":
131 self.keyPath = os.path.expanduser("~/.globus/userkey.pem")
132 self.userConfig.KeyPath(self.keyPath)
133
134 if self.cacertificatesDirectory == "":
135 self.cacertificatesDirectory = "/etc/grid-security/certificates"
136 self.userConfig.CACertificatesDirectory(self.cacertificatesDirectory)
137
138 if self.proxyPath == "":
139 self.proxyPath = "/tmp/x509up_u%d" % uid
140 self.userConfig.ProxyPath(self.proxyPath)
141
142
143
144 if self.arcClient!=None:
145 self.arcClient.brokerName = self.brokerName
146
147
148
149 self.automaticDownload = False
150 self.automaticDownloadInterval = 120000
151
152 self.automaticUpdate = True
153 self.automaticUpdateInterval = 60000
154
155 self.showSplash = True
156
158 """
159 Disables the rejected list
160 """
161 self.userConfig.ClearRejectedServices(arc.INDEX)
162 self.userConfig.ClearRejectedServices(arc.COMPUTING)
163
165 """
166 Enables the rejected list.
167 """
168 self.updateServices()
169
177
184
186 """
187 Update the ARC configuration with the maintained lists in
188 this class.
189 """
190 self.userConfig.ClearSelectedServices(arc.INDEX)
191 self.userConfig.ClearRejectedServices(arc.COMPUTING)
192
193 for service in self.defaultServices:
194 flavour = service.split(":")[0]
195 if flavour == "index":
196 serviceURL = service.split("index:")[1]
197 self.userConfig.AddServices([serviceURL], arc.INDEX)
198 elif flavour == "computing":
199 serviceURL = service.split("computing:")[1]
200 self.userConfig.AddServices([serviceURL], arc.COMPUTING)
201 else:
202 self.userConfig.AddServices([service], arc.INDEX)
203
204 for service in self.rejectedServices:
205 flavour = service.split(":")[0]
206 if flavour == "index":
207 serviceURL = service.split("index:")[1]
208 self.userConfig.AddServices(["-"+serviceURL], arc.INDEX)
209 elif flavour == "computing":
210 serviceURL = service.split("computing:")[1]
211 self.userConfig.AddServices(["-"+serviceURL], arc.COMPUTING)
212 else:
213 self.userConfig.AddServices(["-"+service], arc.INDEX)
214
216 """
217 Read configuration from file.
218 """
219 self.userConfig.LoadConfigurationFile(self.__filename, True)
220
221 self.__initDefaultProperties()
222
223
224
225 self.timeout = self.userConfig.Timeout()
226 self.proxyPath = str(self.userConfig.ProxyPath())
227
228
229 self.cacertificatesDirectory = str(self.userConfig.CACertificatesDirectory())
230
231
232
233
234 newAPI = True
235
236 if newAPI:
237 self.defaultServices = []
238 self.rejectedServices = []
239
240 rejectedDups = {}
241
242 selectedServices = self.userConfig.GetSelectedServices(arc.INDEX)
243
244 for flavour in selectedServices.keys():
245 for serviceName in selectedServices[flavour]:
246 self.defaultServices.append("index:"+flavour+":"+serviceName.fullstr())
247
248 selectedServices = self.userConfig.GetSelectedServices(arc.COMPUTING)
249 for flavour in selectedServices.keys():
250 for serviceName in selectedServices[flavour]:
251 self.defaultServices.append("computing:"+flavour+":"+serviceName.fullstr())
252
253
254
255 rejectedServices = self.userConfig.GetRejectedServices(arc.INDEX)
256 for flavour in rejectedServices.keys():
257 for serviceName in rejectedServices[flavour]:
258 if not rejectedDups.has_key(serviceName):
259 self.rejectedServices.append("index:"+flavour+":"+serviceName.fullstr())
260 rejectedDups[serviceName] = serviceName
261
262 rejectedServices = self.userConfig.GetRejectedServices(arc.COMPUTING)
263 for flavour in rejectedServices.keys():
264 for serviceName in rejectedServices[flavour]:
265 if not rejectedDups.has_key(serviceName):
266 self.rejectedServices.append("computing:"+flavour+":"+serviceName.fullstr())
267 rejectedDups[serviceName] = serviceName
268
269 parser = ConfigParser.RawConfigParser()
270 parser.read(self.__filename)
271
272 if parser.has_section("common"):
273
274 if newAPI:
275 pass
276 else:
277 if parser.has_option("common", "defaultservices"):
278 defaultServicesText = parser.get("common", "defaultservices")
279 self.defaultServices = defaultServicesText.strip().split()
280 if parser.has_option("common", "rejectedservices"):
281 rejectedServicesText = parser.get("common", "rejectedservices")
282 self.rejectedServices = rejectedServicesText.strip().split()
283
284 if parser.has_option("common", "brokername"):
285 self.brokerName = parser.get("common", "brokername")
286
287 if parser.has_option("common", "certificatepath"):
288 self.certificatePath = parser.get("common", "certificatepath")
289 if parser.has_option("common", "keypath"):
290 self.keyPath = parser.get("common", "keypath")
291
292 if parser.has_section("arcgui"):
293 if parser.has_option("arcgui", "automaticdownload"):
294 self.automaticDownload = parser.getboolean("arcgui", "automaticdownload")
295 if parser.has_option("arcgui", "automaticdownloadinterval"):
296 self.automaticDownloadInterval = int(parser.get("arcgui", "automaticdownloadinterval"))
297 if parser.has_option("arcgui", "automaticupdate"):
298 self.automaticUpdate = parser.getboolean("arcgui", "automaticupdate")
299 if parser.has_option("arcgui", "automaticupdateinterval"):
300 self.automaticUpdateInterval = int(parser.get("arcgui", "automaticupdateinterval"))
301
302 uid = os.getuid()
303
304 if self.certificatePath == "":
305 self.certificatePath = os.path.expanduser("~/.globus/usercert.pem")
306
307 if self.keyPath == "":
308 self.keyPath = os.path.expanduser("~/.globus/userkey.pem")
309
310 if self.cacertificatesDirectory == "":
311 self.cacertificatesDirectory = "/etc/grid-security/certificates"
312
313 if self.proxyPath == "":
314 self.proxyPath = "/tmp/x509up_u%d" % uid
315
316 self.userConfig.CertificatePath(self.certificatePath)
317 self.userConfig.KeyPath(self.keyPath)
318 self.userConfig.CACertificatesDirectory(self.cacertificatesDirectory)
319 self.userConfig.ProxyPath(self.proxyPath)
320
322 """
323 Sets the filename to use for the configuration file (property set)
324 """
325 self.__filename = filename
326
328 """
329 Returns the configuration filename (property get)
330 """
331 return self.__filename
332
334 """
335 Returns the current brokername (property get)
336 """
337 return self.__brokerName
338
347
348 brokerName = property(getBrokerName, setBrokerName)
349
350 filename = property(getFilename, setFilename)
351
352
354 """
355 Class implementing different user authentication mechanism.
356 """
358 """
359 Class constructor
360 """
361
362 self.userConfig = userConfig
363
364 self.certificatePath = self.userConfig.CertificatePath()
365 self.keyPath = self.userConfig.KeyPath()
366 self.caDir = self.userConfig.CACertificatePath()
367 self.proxyPath = self.userConfig.ProxyPath()
368
369 self.period = "12"
370 self.keybits = 1024
371 self.prompt = ""
372 self.proxyType = "gsi2"
373 self.errorMessage = ""
374
375 self.__logger = arc.Logger(arc.Logger_getRootLogger(), "UserAuth")
376
def logMsg(self, level, msg):
    """
    Send a message to the ARC logger owned by this instance.

    level - Severity passed straight through to the logger.
    msg   - Text of the message.
    """
    arcLogger = self.__logger
    arcLogger.msg(level, msg)
382
384 """
385 Logs an info message to the ARC logger.
386 """
387 self.logMsg(arc.INFO, msg)
388
390 """
391 Logs a debug message to the ARC logger.
392 """
393 self.logMsg(arc.DEBUG, msg)
394
396 """
397 Logs an error message to the ARC logger.
398 """
399 self.logMsg(arc.ERROR, msg)
400
402 """
403 Virtual method for querying the user for a password. Should be overridden
404 by derived classes.
405 """
406 if prompt == "":
407 return getpass.getpass("Passphrase:")
408 else:
409 return getpass.getpass(prompt)
410
412 """
413 Create a local proxy certificate
414 """
415
416 if self.proxyPath == "":
417 self.proxyPath = "/tmp/x509up_u%d" % os.getuid()
418
419
420
421 self.logInfoMsg("Load user certificate and key.")
422
423 passphrase = self.onPassphrasePrompt(self.prompt)
424 if passphrase == "":
425 self.errorMessage = ""
426 return False
427
428 signer = arc.Credential(self.certificatePath, self.keyPath, self.caDir, "", str(passphrase))
429
430
431 del passphrase
432
433
434
435 start = arc.Time()
436 arcPeriod = arc.Period(self.period, arc.PeriodHours)
437 policy = ""
438
439
440
441 self.logInfoMsg("Create proxy certificate request.")
442
443 credRequest = arc.Credential(start, arcPeriod, self.keybits, "rfc", "inheritAll", policy, -1)
444 generateRequestOk, requestString = credRequest.GenerateRequest()
445
446 if not generateRequestOk:
447 self.errorMessage = "Could not generate proxy certificate request."
448 return False
449
450 outputPrivateKeyOk, privateKey = credRequest.OutputPrivatekey()
451
452 if not outputPrivateKeyOk:
453 self.errorMessage = "Could not generate private key."
454 return False
455
456 outputCertificateOk, signingCert = signer.OutputCertificate()
457
458 if not outputCertificateOk:
459 self.errorMessage = "Could not get user certificate."
460 return False
461
462 outputCertificateChainOk, signingCertChain = signer.OutputCertificateChain()
463
464 if not outputCertificateOk:
465 self.errorMessage = "Could not get certificate chain."
466 return False
467
468
469
470 if self.proxyType == "rfc":
471 credRequest.SetProxyPolicy("rfc", "inheritAll", "", -1)
472 else:
473 credRequest.SetProxyPolicy("gsi2", "", "", -1)
474
475
476
477 self.logInfoMsg("Sign request with user certificate.")
478
479 signRequestOk, proxyCert = signer.SignRequest(credRequest)
480
481 if not signRequestOk:
482 self.errorMessage = "Failed to sign proxy request."
483 return False
484
485
486
487
488 fullProxyCert = proxyCert + privateKey + signingCert + signingCertChain
489
490 proxyCertFile = open(self.proxyPath, "w")
491 proxyCertFile.write(fullProxyCert)
492 proxyCertFile.close()
493
494
495
496 os.chmod(self.proxyPath, 0600)
497
498 return True
499
501 """
502 Class for making job description handling easier.
503 """
505 """
506 Class constructor
507 """
508 arc.JobDescription.__init__(self)
509
511 """
512 Add argument description.
513 """
514 self.Application.Executable.Argument.append(argument)
515
534
540
def addOutputFile(self, name, url="", keepData = True, isExecutable = False, downloadToCache = False, threads = -1):
    """
    Add an output file to the job description.

    name            - Name stored on the file entry.
    url             - Target location; wrapped in arc.URL.
    keepData        - Accepted for interface compatibility (see note below).
    isExecutable    - Stored in the file entry's IsExecutable field.
    downloadToCache - Stored in the file entry's DownloadToCache field.
    threads         - Stored in the target's Threads field.
    """
    outputFile = arc.FileType()
    outputFile.Name = name

    # NOTE(review): keepData defaults to True, yet this method has always
    # stored False here. Honouring the parameter would change behaviour for
    # every caller relying on the default, so the stored value is left
    # untouched until the intended default is confirmed.
    outputFile.KeepData = False

    # These two used to be overwritten with a hard-coded False. Their
    # defaults are False as well, so callers that never passed them see no
    # difference, while explicit values are no longer silently dropped.
    outputFile.IsExecutable = isExecutable
    outputFile.DownloadToCache = downloadToCache

    outputFileTarget = arc.DataTargetType()
    outputFileTarget.URI = arc.URL(url)
    outputFileTarget.Threads = threads
    outputFileTarget.Mandatory = True
    outputFile.Target.append(outputFileTarget)

    self.DataStaging.File.append(outputFile)
556
558 """
559 Clear output files.
560 """
561 pass
562
564 """
565 Add a runtime environment and version.
566 """
567 softReq = arc.SoftwareRequirement()
568 software = arc.Software(name+"-"+version)
569
570 arcRelOp = arc.Software.GREATERTHANOREQUAL
571
572 if relOp == "<":
573 arcRelOp = arc.Software.LESSTHAN
574 elif relOp == "<=":
575 arcRelOp = arc.Software.LESSTHANOREQUAL
576 elif relOp == "==":
577 arcRelOp = arc.Software.EQUAL
578 elif relOp == ">=":
579 arcRelOp = arc.Software.GREATERTHANOREQUAL
580 elif relOp == ">":
581 arcRelOp = arc.Software.GREATERTHAN
582
583 self.Resources.RunTimeEnvironment.add(software, arcRelOp)
584
def __init__(self, userConfig=None, logStream=None):
    """
    ArcClient class constructor

    Initialises properties and variables, sets up logging and
    loads job list.

    userConfig - arc.UserConfig instance to use. When None, a new one is
                 created from userConfigFilename and jobListFilename.
    logStream  - Stream remembered as the current log stream. It is only
                 stored here; this constructor does not attach it to the
                 ARC root logger.
    """

    # Setup logging

    self.__logger = arc.Logger(arc.Logger_getRootLogger(), "ArcClient")

    # Always define the attribute so that reading the log stream before
    # one has been assigned returns None instead of raising AttributeError.
    self.__logStream = logStream

    # File locations

    self.__proxyFilename = ""
    self.jobListFilename = os.path.expanduser("~/.arc/jobs.xml")
    self.userConfigFilename = os.path.expanduser("~/.arc/arcjobtool.conf")
    self.downloadDir = "."

    # User configuration

    self.logInfoMsg("Creating UserConfig.")
    if userConfig is None:
        self.userConfig = arc.UserConfig(self.userConfigFilename, self.jobListFilename, True)
    else:
        self.userConfig = userConfig

    # Instance state

    self.targets = []
    self.filteredTargets = []
    self.jobList = None
    self.jobDict = {}
    self.clusters = []
    self.__debugLevel = arc.VERBOSE
    self.__doFiltering = True
    self.updateProgress = None
    self.currentTarget = None
    self.__brokerName = "Random"

    # Broker and target generator

    self.logInfoMsg("Create BrokerLoader.")

    self.__brokerLoader = arc.BrokerLoader()
    self.__broker = self.__brokerLoader.load(self.__brokerName, self.userConfig)

    self.logInfoMsg("Create TargetGenerator.")

    self.__targetGenerator = arc.TargetGenerator(self.userConfig)

    # Job list

    self.logInfoMsg("Initialise job list.")

    self.__loadJobs()
650
652 """
653 Enable rejected services list.
654 """
655 self.userConfig.LoadConfigurationFile(self.userConfigFilename)
656
658 """
659 Disable rejected services list.
660 """
661 self.userConfig.ClearSelectedServices(arc.INDEX)
662 self.userConfig.ClearSelectedServices(arc.COMPUTING)
663 self.userConfig.ClearRejectedServices(arc.INDEX)
664 self.userConfig.ClearRejectedServices(arc.COMPUTING)
665
def logMsg(self, level, msg):
    """
    Hand a message over to the ARC logger of this client.

    level - Severity passed unchanged to the logger.
    msg   - Text of the message.
    """
    clientLogger = self.__logger
    clientLogger.msg(level, msg)
671
673 """
674 Logs an info message to the ARC logger.
675 """
676 self.logMsg(arc.INFO, msg)
677
679 """
680 Logs a debug message to the ARC logger.
681 """
682 self.logMsg(arc.DEBUG, msg)
683
685 """
686 Logs an error message to the ARC logger.
687 """
688 self.logMsg(arc.ERROR, msg)
689
691 """
692 Assigns the logstream to instance.
693 """
694 self.__logStream = stream
695 logcout = arc.LogStream(stream);
696 arc.Logger_getRootLogger().removeDestinations()
697 arc.Logger_getRootLogger().addDestination(logcout)
698
700 """
701 Return current log stream.
702 """
703 return self.__logStream
704
706 """
707 Set debug level of the arc library.
708 """
709 self.__debugLevel = level
710 arc.Logger_getRootLogger().setThreshold(self.__debugLevel);
711
713 """
714 Return the debug level of the arc library.
715 """
716 return self.__debugLevel
717
719 """
720 Set proxy filename.
721 """
722 self.__proxyFilename = filename
723
725 """
726 Return proxy filename
727 """
728 return self.__proxyFilename
729
731 """
732 Save current configuration.
733 """
734 self.userConfig.ConfTree().SaveToFile(self.userConfigFilename)
735
737 """
738 Load job list from XML file.
739 """
740 self.jobList = arc.Config(arc.NS())
741 if os.path.exists(self.jobListFilename):
742 self.jobList.ReadFromFile(self.jobListFilename)
743 self.jobDict.clear()
744 for j in self.jobList.Path("Job"):
745 self.jobDict[str(j.Get("JobID"))]= {}
746
748 """
749 Load job list from XML file.
750 """
751 self.__loadJobs()
752
754 """
755 Return status of proxy.
756 """
757 return self.userConfig.CheckProxy()
758
760 """
761 Find possible targets by querying information system.
762 """
763
764 self.doProgress("Finding suitable targets.")
765
766 self.targets = None
767 self.__targetGenerator = arc.TargetGenerator(self.userConfig);
768 self.__targetGenerator.GetTargets(0, 1);
769 self.targets = self.__targetGenerator.ModifyFoundTargets()
770
772 """
773 Wrapper function for encapsulating ARC1 loading of
774 a broker instance.
775 """
776
777 return self.__loader.load(brokerName, self.userConfig)
778
781
783 """
784 Return a filtered list of suitable targets based on the
785 RandomBroker component.
786 """
787 self.doProgress("Prefiltering targets.")
788
789 self.__broker = self.__brokerLoader.load(self.__brokerName, self.userConfig)
790 self.__broker.PreFilterTargets(self.targets, job)
791
792 self.currentTarget = self.__broker.GetBestTarget()
793
794 return self.currentTarget
795
797 """
798 Submit job to grid. Requires that a list of filtered targets
799 exists.
800 """
801 self.doProgress("Submitting job.")
802
803 if self.currentTarget==None:
804 return None
805
806 submitter = self.currentTarget.GetSubmitter(self.userConfig)
807 jobURL = submitter.Submit(job, self.currentTarget)
808 self.logInfoMsg("Submitted: " + jobURL.fullstr())
809
810 if jobURL:
811 self.doProgress("Updating job file.")
812 self.__loadJobs()
813 return jobURL
814 else:
815 return jobURL
816
818 """
819 Submit job to grid. Requires that a list of filtered targets
820 exists.
821 """
822 self.doProgress("Submitting job.")
823
824 if self.currentTarget==None:
825 return None
826
827 submitter = self.currentTarget.GetSubmitter(self.userConfig)
828 jobURL = submitter.Submit(job, self.currentTarget)
829 self.logInfoMsg("Submitted: " + jobURL.fullstr())
830
831 if jobURL:
832 self.doProgress("Updating job file.")
833 self.__loadJobs()
834 return jobURL
835 else:
836 return jobURL
837
839 """
840 Submit a list of jobs to the grid. Requires that self.targets has been populated.
841 """
842 self.doProgress("Submitting job.")
843
844 self.enableServiceFiltering()
845
846 resultList = [None]*len(jobList)
847
848 if self.currentTarget=="":
849 return resultList
850
851 i = 0
852
853 self.__broker = self.__brokerLoader.load(self.__brokerName, self.userConfig)
854
855 for job in jobList:
856
857 self.__broker.PreFilterTargets(self.targets, job)
858
859 jobURL = None
860
861 while True:
862 bestTarget = self.__broker.GetBestTarget()
863 if bestTarget!=None:
864 submitter = bestTarget.GetSubmitter(self.userConfig)
865 jobURL = submitter.Submit(job, bestTarget)
866 if jobURL:
867 break
868
869 resultList[i] = jobURL
870 else:
871 break
872
873 if jobURL!=None:
874 self.doProgress("Updating job file.")
875 self.__loadJobs()
876
877 resultList[i] = jobURL
878 i += 1
879
880 self.disableServiceFiltering()
881
882 return resultList
883
def get(self, jobIds = None, status = None, keep=False):
    """
    Download results from jobs.

    jobIds - Job ids handed to arc.JobSupervisor. Defaults to an empty
             list.
    status - Job states handed to each controller's Get() call. Defaults
             to ["FINISHED", "FAILED"].
    keep   - Passed through to each controller's Get() call.

    Results are written below self.downloadDir and the job list is
    reloaded afterwards.
    """
    # Build the defaults per call; list literals in the signature would
    # be shared between all invocations.
    if jobIds is None:
        jobIds = []
    if status is None:
        status = ["FINISHED", "FAILED"]

    self.doProgress("Retrieving job controllers.")
    jobSupervisor = arc.JobSupervisor(self.userConfig, jobIds)
    jobControllers = jobSupervisor.GetJobControllers()

    for controller in jobControllers:
        self.doProgress("Retrieving job.")
        controller.Get(status, str(self.downloadDir), keep)

    self.doProgress("Done.")

    self.__loadJobs()
899
def kill(self, jobIds = None, status = None, force = False):
    """
    Kill jobs through their job controllers.

    jobIds - Job ids handed to arc.JobSupervisor. Defaults to an empty
             list.
    status - Job states handed to each controller's Kill() call. Defaults
             to an empty list.
    force  - Passed through to each controller's Kill() call.

    The job list is reloaded afterwards.
    """
    # Build the defaults per call; list literals in the signature would
    # be shared between all invocations.
    if jobIds is None:
        jobIds = []
    if status is None:
        status = []

    jobSupervisor = arc.JobSupervisor(self.userConfig, jobIds)
    jobControllers = jobSupervisor.GetJobControllers()

    for controller in jobControllers:
        controller.Kill(status, force)

    self.__loadJobs()
912
def clean(self, jobIds = None, status = None, force = False):
    """
    Clean jobs through their job controllers.

    jobIds - Job ids handed to arc.JobSupervisor. Defaults to an empty
             list.
    status - Job states handed to each controller's Clean() call.
             Defaults to an empty list.
    force  - Passed through to each controller's Clean() call.

    The job list is reloaded afterwards.
    """
    # Build the defaults per call; list literals in the signature would
    # be shared between all invocations.
    if jobIds is None:
        jobIds = []
    if status is None:
        status = []

    jobSupervisor = arc.JobSupervisor(self.userConfig, jobIds)
    jobControllers = jobSupervisor.GetJobControllers()

    for controller in jobControllers:
        controller.Clean(status, force)

    self.__loadJobs()
924
926 """
927 Call progress update callback if assigned otherwise
928 ignore.
929
930 message - Message to pass to progress callback function.
931 """
932 if self.updateProgress!=None:
933 self.updateProgress(message)
934
936 """
937 Query information system and populate the jobs property with additional
938 job information.
939 """
940
941 """
942 std::string Flavour;
943 URL JobID;
944 URL Cluster;
945 // Optional information (ACCs fills if they need it)
946 URL SubmissionEndpoint;
947 URL InfoEndpoint;
948 URL ISB;
949 URL OSB;
950 // ACC implementation dependent information
951 URL AuxURL;
952 std::string AuxInfo;
953
954 // Information retrieved from the information system
955 std::string Name;
956 std::string Type;
957 URL IDFromEndpoint;
958 std::string LocalIDFromManager;
959 std::string JobDescription;
960 JobState State;
961 std::string RestartState;
962 std::map<std::string, std::string> AuxStates; //for all state models
963 std::map<std::string, std::string> RestartStates; //for all state models
964 int ExitCode;
965 std::string ComputingManagerExitCode;
966 std::list<std::string> Error;
967 int WaitingPosition;
968 std::string UserDomain;
969 std::string Owner;
970 std::string LocalOwner;
971 Period RequestedTotalWallTime;
972 Period RequestedTotalCPUTime;
973 int RequestedMainMemory; // Deprecated??
974 int RequestedSlots;
975 std::list<std::string> RequestedApplicationEnvironment;
976 std::string StdIn;
977 std::string StdOut;
978 std::string StdErr;
979 std::string LogDir;
980 std::list<std::string> ExecutionNode;
981 std::string ExecutionCE; // Deprecated??
982 std::string Queue;
983 Period UsedTotalWallTime;
984 Period UsedTotalCPUTime;
985 int UsedMainMemory;
986 std::list<std::string> UsedApplicationEnvironment;
987 int UsedSlots;
988 Time LocalSubmissionTime;
989 Time SubmissionTime;
990 Time ComputingManagerSubmissionTime;
991 Time StartTime;
992 Time ComputingManagerEndTime;
993 Time EndTime;
994 Time WorkingAreaEraseTime;
995 Time ProxyExpirationTime;
996 std::string SubmissionHost;
997 std::string SubmissionClientName;
998 Time CreationTime;
999 Period Validity;
1000 std::list<std::string> OtherMessages;
1001 //Associations
1002 URL JobManagementEndpoint;
1003 URL DataStagingEndpoint;
1004 std::list<std::string> ActivityOldId;
1005 //ExecutionEnvironment (condensed)
1006 bool VirtualMachine;
1007 std::string UsedCPUType;
1008 std::string UsedOSFamily;
1009 std::string UsedPlatform;
1010 """
1011
1012 self.disableServiceFiltering()
1013
1014 self.doProgress("Creating job supervisor.")
1015
1016 self.logInfoMsg("Creating JobSupervisor")
1017 jobSupervisor = arc.JobSupervisor(self.userConfig, [])
1018 self.logInfoMsg("Retrieving job controllers.")
1019 jobControllers = jobSupervisor.GetJobControllers();
1020
1021 self.doProgress("Querying job controllers")
1022
1023 for controller in jobControllers:
1024 self.doProgress("Getting job information.")
1025 controller.GetJobInformation()
1026 self.doProgress("Extracting job information.")
1027 jobStore = controller.GetJobs()
1028 for job in jobStore:
1029 jobId = job.JobID.str()
1030 jobState = job.State
1031 if self.jobDict.has_key(jobId):
1032 self.jobDict[jobId]["State"] = jobState()
1033 self.jobDict[jobId]["Name"] = job.Name
1034
1035 if len(job.Error)>0:
1036 self.jobDict[jobId]["Error"] = str(job.Error[0])
1037 else:
1038 self.jobDict[jobId]["Error"] = ""
1039
1040 self.jobDict[jobId]["Cluster"] = job.Cluster.fullstr()
1041 self.jobDict[jobId]["SubmissionEndpoint"] = job.SubmissionEndpoint.fullstr()
1042 self.jobDict[jobId]["InfoEndpoint"] = job.InfoEndpoint.fullstr()
1043 self.jobDict[jobId]["Type"] = job.Type
1044 self.jobDict[jobId]["JobDescription"] = job.JobDescription
1045 self.jobDict[jobId]["ExitCode"] = job.ExitCode
1046 self.jobDict[jobId]["UserDomain"] = job.UserDomain
1047 self.jobDict[jobId]["LocalIdFromManager"] = job.LocalIDFromManager
1048 self.jobDict[jobId]["Owner"] = job.Owner
1049 self.jobDict[jobId]["WaitingPosition"] = job.WaitingPosition
1050 self.jobDict[jobId]["LocalOwner"] = job.LocalOwner
1051 self.jobDict[jobId]["RequestedMainMemory"] = job.RequestedMainMemory
1052 self.jobDict[jobId]["RequestedSlots"] = job.RequestedSlots
1053 self.jobDict[jobId]["StdIn"] = job.StdIn
1054 self.jobDict[jobId]["StdOut"] = job.StdOut
1055 self.jobDict[jobId]["StdErr"] = job.StdErr
1056 self.jobDict[jobId]["LogDir"] = job.LogDir
1057 self.jobDict[jobId]["ExecutionCE"] = job.ExecutionCE
1058 self.jobDict[jobId]["Queue"] = job.Queue
1059 self.jobDict[jobId]["UsedMainMemory"] = job.UsedMainMemory
1060 self.jobDict[jobId]["UsedSlots"] = job.UsedSlots
1061 self.jobDict[jobId]["SubmissionHost"] = job.SubmissionHost
1062 self.jobDict[jobId]["SubmissionClientName"] = job.SubmissionClientName
1063 self.jobDict[jobId]["OtherMessages"] = job.OtherMessages
1064 self.jobDict[jobId]["VirtualMachine"] = job.VirtualMachine
1065 self.jobDict[jobId]["UsedCPUType"] = job.UsedCPUType
1066 self.jobDict[jobId]["UsedOSFamily"] = job.UsedOSFamily
1067 self.jobDict[jobId]["UsedPlatform"] = job.UsedPlatform
1068 self.jobDict[jobId]["RequestedTotalWallTime"] = job.RequestedTotalWallTime
1069 self.jobDict[jobId]["RequestedTotalCPUTime"] = job.RequestedTotalCPUTime
1070 self.jobDict[jobId]["ExecutionNode"] = job.ExecutionNode
1071 self.jobDict[jobId]["UsedTotalWallTime"] = job.UsedTotalWallTime
1072 self.jobDict[jobId]["UsedTotalCPUTime"] = job.UsedTotalCPUTime
1073 self.jobDict[jobId]["UsedApplicationEnvironment"] = job.UsedApplicationEnvironment
1074 self.jobDict[jobId]["LocalSubmissionTime"] = job.LocalSubmissionTime
1075 self.jobDict[jobId]["SubmissionTime"] = job.SubmissionTime
1076 self.jobDict[jobId]["ComputingManagerSubmissionTime"] = job.ComputingManagerSubmissionTime
1077 self.jobDict[jobId]["StartTime"] = job.StartTime
1078 self.jobDict[jobId]["ComputingManagerEndTime"] = job.ComputingManagerEndTime
1079 self.jobDict[jobId]["EndTime"] = job.EndTime
1080 self.jobDict[jobId]["WorkingAreaEraseTime"] = job.WorkingAreaEraseTime
1081 self.jobDict[jobId]["ProxyExpirationTime"] = job.ProxyExpirationTime
1082 self.jobDict[jobId]["CreationTime"] = job.CreationTime
1083 self.jobDict[jobId]["Validity"] = job.Validity
1084
1085 self.enableServiceFiltering()
1086
1088 """
1089 Sort job dictionary by field specified by the byField parameter.
1090 """
1091 keys = self.jobDict.keys()
1092 if byField == "":
1093 keys.sort(lambda x, y: cmp(x, y))
1094 else:
1095 try:
1096 keys.sort(lambda x, y: cmp(self.jobDict[x][byField], self.jobDict[y][byField]) )
1097 except:
1098 keys.sort(lambda x, y: cmp(x, y))
1099 return keys
1100
1102 """
1103 Print list of jobs managed by this class.
1104 """
1105 for jobId in self.jobDict.keys():
1106 print jobId
1107
1109 """
1110 Print status of jobs.
1111 """
1112 jobmaster = arc.JobSupervisor(self.userConfig, [], [], self.jobListFilename)
1113 jobcontrollers = jobmaster.GetJobControllers();
1114
1115 jobStatus = []
1116
1117 for job in jobcontrollers:
1118 job.Stat(jobStatus, True)
1119
1121 return self.__brokerName
1122
1126
1129
1132
1133 debugLevel = property(getDebugLevel, setDebugLevel)
1134 proxyFilename = property(getProxyFilename, setProxyFilename)
1135 logStream = property(getLogStream, setLogStream)
1136 brokerName = property(getBrokerName, setBrokerName)
1137 timeout = property(getTimeout, setTimeout)
1138