여러 개의 serial_sh 파일을 처리해 주는 프로그램
여러 개의 job을 qsub로 제출할 때, job마다 serial_sh에 해당하는 파일을 일일이 만들어 주는 작업이 번거로웠다. 그래서 하나의 job_description_file로 모든 job을 관리할 수 있도록 해 주는 스크립트 프로그램을 만들어 보았다.
subJobs.py의 소스코드는 다음과 같다.
테스트에 사용된 testJobs는 다음과 같다.
subJobs.py의 소스코드는 다음과 같다.
#!/usr/bin/python
#====================================================================#
# This script submits jobs described in argument to PBS queue. #
# #
# The format of argument file : #
# >[job_name]:[err_file]:[log_file] #
# [programs_to_execute] #
# #
# Example : #
# $ cat jobs #
# >serial1:serial1.err:serial2.log #
# $PBS_O_WORKDIR/myjob1 #
# >serial2:: #
# $PBS_O_WORKDIR/another_job #
# #
# If [err_file] or [log_file] is empty, [job_name].pbs.err or #
# [job_name].pbs.log is used. #
#====================================================================#
import unittest, sys, re, os
def readJobs(infile):
    """Parse a job description file into a list of job tuples.

    A job starts with a header line beginning with ``>`` (parsed by
    ``parseOption``); every following line up to the next header, or the
    end of the file, belongs to that job's command script.

    Args:
        infile: Path of the job description file to read.

    Returns:
        A list of ``(name, err, log, cmd)`` tuples in file order.  ``cmd``
        is the job's command lines concatenated, each one terminated by a
        newline.  Lines that appear before the first header, and jobs whose
        name is empty, are discarded, so an empty file yields ``[]``.

    Raises:
        IOError / OSError: If ``infile`` cannot be opened.
        ValueError: Propagated from ``parseOption`` for a malformed header.
    """
    ret = []
    (name, err, log, cmd) = ("", "", "", "")
    # The with-block closes the file even when a header fails to parse.
    with open(infile, 'r') as fp:
        for line in fp:
            if line[0] == ">":
                # A new header terminates the job collected so far.
                if name != "":
                    ret.append((name, err, log, cmd))
                (name, err, log) = parseOption(line)
                cmd = ""
            else:
                # Re-append the newline so that a final line lacking one
                # still ends up newline-terminated in the script.
                cmd += line.rstrip("\n") + "\n"
    # Flush the last job.  Apply the same empty-name guard used at header
    # boundaries so that an empty or header-less file does not produce a
    # bogus nameless job.
    if name != "":
        ret.append((name, err, log, cmd))
    return ret
def parseOption(option):
    """Parse a job header line into ``(name, err, log)``.

    The header has the form ``>[job_name]:[err_file]:[log_file]``.  The
    leading character is dropped and the rest is split on ``:``.

    Args:
        option: The header line, with or without its line terminator.

    Returns:
        A ``(name, err, log)`` tuple.  An empty or omitted ``err_file``
        becomes ``name + ".pbs.err"``; an empty or omitted ``log_file``
        becomes ``name + ".pbs.log"``.

    Raises:
        ValueError: If the header contains more than three ``:``-separated
            fields.
    """
    # Strip "\r" as well as "\n" so a CRLF-terminated file does not leave a
    # stray "\r" in the last field (which would defeat the default below).
    fields = option.rstrip("\r\n")[1:].split(":")
    if len(fields) > 3:
        raise ValueError(
            "malformed job header %r: expected "
            ">[job_name]:[err_file]:[log_file]" % (option,))
    # Allow trailing fields to be omitted entirely (">name", ">name:err").
    fields += [""] * (3 - len(fields))
    (name, err, log) = fields
    if err == "":
        err = name + ".pbs.err"
    if log == "":
        log = name + ".pbs.log"
    return name, err, log
def makeScript(job):
    """Write a PBS shell script for one job and return its file name.

    Args:
        job: A ``(name, err, log, cmd)`` tuple: the job name, the error
            file, the output file and the command text to run.

    Returns:
        The name of the script file written, always ``"subJob_sh.tmp"``
        (relative to the current working directory).  An existing file of
        that name is overwritten.
    """
    # Unpack in the body rather than in the signature: tuple parameters are
    # Python-2-only syntax, while callers still pass a single tuple.
    (name, err, log, cmd) = job
    filename = "subJob_sh.tmp"
    header = (
        "#!/bin/sh\n"
        "#This pbs script is created by subJobs.py\n"
        "#PBS -N %s\n"
        "#PBS -e %s\n"
        "#PBS -o %s\n"
        "#PBS -q long\n"
    ) % (name, err, log)
    # The with-block guarantees the file is closed even if a write fails.
    with open(filename, 'w') as fp:
        fp.write(header)
        fp.write(cmd)
    return filename
def submit(filename):
    """Run ``qsub`` on a script file, then delete the script file.

    Args:
        filename: Path of the PBS script to submit.

    Returns:
        The value returned by ``os.system`` for the ``qsub`` command, i.e.
        the command's exit status as encoded by the platform (0 on
        success).  This is not the PBS job identifier.
    """
    try:
        status = os.system("qsub " + filename)
    finally:
        # Always remove the temporary script, even if the call above raised.
        os.remove(filename)
    return status
class Test(unittest.TestCase):
    """Unit tests for readJobs, makeScript and submit.

    ``test_readJobs`` expects a file named ``testJobs`` in the current
    working directory whose contents parse to ``content``.  ``test_submit``
    invokes ``submit`` and therefore runs the ``qsub`` command.
    """

    # Expected parse result of the "testJobs" fixture: (name, err, log, cmd).
    content = [
        ("job1", "job1.err", "job1.log",
         "echo Running on host `hostname`\ncd $PBS_O_WORKDIR\npwd\n"),
        ("job2", "job2.pbs.err", "job2.pbs.log", "date\n")]

    def test_readJobs(self):
        self.assertEqual(self.content, readJobs("testJobs"))

    def test_makeScript(self):
        filename = makeScript(self.content[0])
        try:
            self.assertEqual("subJob_sh.tmp", filename)
        finally:
            # Do not leave the generated script behind after the test.
            os.remove(filename)

    def test_submit(self):
        # Only checks that submit() returns something for each job.
        self.assertNotEqual(None, submit(makeScript(self.content[0])))
        self.assertNotEqual(None, submit(makeScript(self.content[1])))
# Script entry point: submit every job described in each file named on the
# command line.  With no arguments, only the usage text is printed.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        # print(...) with a single string behaves the same on Python 2 and 3.
        print("\n".join((
            "Usage :",
            " subJobs.py [job_description_file]",
            "",
            "The format of argument file :",
            " >[job_name]:[err_file]:[log_file]",
            " [programs_to_execute]",
            "",
            "Example :",
            " $ cat jobs",
            " >serial1:serial1.err:serial2.log",
            " $PBS_O_WORKDIR/myjob1",
            " >serial2::",
            " $PBS_O_WORKDIR/another_job",
            "",
            "If [err_file] or [log_file] is empty, [job_name].pbs.err or",
            "[job_name].pbs.log is used.",
        )))
    failed = 0
    for job_file in sys.argv[1:]:
        for job in readJobs(job_file):
            status = submit(makeScript(job))
            if status != 0:
                # Report the failure instead of silently dropping it, but
                # keep going so the remaining jobs are still submitted.
                sys.stderr.write(
                    "subJobs.py: qsub failed (status %s) for job '%s' in %s\n"
                    % (status, job[0], job_file))
                failed += 1
    if failed:
        sys.exit(1)
    # Uncomment to run the unit tests instead of submitting jobs:
    # unittest.main()
#====================================================================#
# This script submits jobs described in argument to PBS queue. #
# #
# The format of argument file : #
# >[job_name]:[err_file]:[log_file] #
# [programs_to_execute] #
# #
# Example : #
# $ cat jobs #
# >serial1:serial1.err:serial2.log #
# $PBS_O_WORKDIR/myjob1 #
# >serial2:: #
# $PBS_O_WORKDIR/another_job #
# #
# If [err_file] or [log_file] is empty, [job_name].pbs.err or #
# [job_name].pbs.log is used. #
#====================================================================#
import unittest, sys, re, os
def readJobs(infile):
    """Parse a job description file into a list of job tuples.

    A job starts with a header line beginning with ``>`` (parsed by
    ``parseOption``); every following line up to the next header, or the
    end of the file, belongs to that job's command script.

    Args:
        infile: Path of the job description file to read.

    Returns:
        A list of ``(name, err, log, cmd)`` tuples in file order.  ``cmd``
        is the job's command lines concatenated, each one terminated by a
        newline.  Lines that appear before the first header, and jobs whose
        name is empty, are discarded, so an empty file yields ``[]``.

    Raises:
        IOError / OSError: If ``infile`` cannot be opened.
        ValueError: Propagated from ``parseOption`` for a malformed header.
    """
    ret = []
    (name, err, log, cmd) = ("", "", "", "")
    # The with-block closes the file even when a header fails to parse.
    with open(infile, 'r') as fp:
        for line in fp:
            if line[0] == ">":
                # A new header terminates the job collected so far.
                if name != "":
                    ret.append((name, err, log, cmd))
                (name, err, log) = parseOption(line)
                cmd = ""
            else:
                # Re-append the newline so that a final line lacking one
                # still ends up newline-terminated in the script.
                cmd += line.rstrip("\n") + "\n"
    # Flush the last job.  Apply the same empty-name guard used at header
    # boundaries so that an empty or header-less file does not produce a
    # bogus nameless job.
    if name != "":
        ret.append((name, err, log, cmd))
    return ret
def parseOption(option):
    """Parse a job header line into ``(name, err, log)``.

    The header has the form ``>[job_name]:[err_file]:[log_file]``.  The
    leading character is dropped and the rest is split on ``:``.

    Args:
        option: The header line, with or without its line terminator.

    Returns:
        A ``(name, err, log)`` tuple.  An empty or omitted ``err_file``
        becomes ``name + ".pbs.err"``; an empty or omitted ``log_file``
        becomes ``name + ".pbs.log"``.

    Raises:
        ValueError: If the header contains more than three ``:``-separated
            fields.
    """
    # Strip "\r" as well as "\n" so a CRLF-terminated file does not leave a
    # stray "\r" in the last field (which would defeat the default below).
    fields = option.rstrip("\r\n")[1:].split(":")
    if len(fields) > 3:
        raise ValueError(
            "malformed job header %r: expected "
            ">[job_name]:[err_file]:[log_file]" % (option,))
    # Allow trailing fields to be omitted entirely (">name", ">name:err").
    fields += [""] * (3 - len(fields))
    (name, err, log) = fields
    if err == "":
        err = name + ".pbs.err"
    if log == "":
        log = name + ".pbs.log"
    return name, err, log
def makeScript(job):
    """Write a PBS shell script for one job and return its file name.

    Args:
        job: A ``(name, err, log, cmd)`` tuple: the job name, the error
            file, the output file and the command text to run.

    Returns:
        The name of the script file written, always ``"subJob_sh.tmp"``
        (relative to the current working directory).  An existing file of
        that name is overwritten.
    """
    # Unpack in the body rather than in the signature: tuple parameters are
    # Python-2-only syntax, while callers still pass a single tuple.
    (name, err, log, cmd) = job
    filename = "subJob_sh.tmp"
    header = (
        "#!/bin/sh\n"
        "#This pbs script is created by subJobs.py\n"
        "#PBS -N %s\n"
        "#PBS -e %s\n"
        "#PBS -o %s\n"
        "#PBS -q long\n"
    ) % (name, err, log)
    # The with-block guarantees the file is closed even if a write fails.
    with open(filename, 'w') as fp:
        fp.write(header)
        fp.write(cmd)
    return filename
def submit(filename):
    """Run ``qsub`` on a script file, then delete the script file.

    Args:
        filename: Path of the PBS script to submit.

    Returns:
        The value returned by ``os.system`` for the ``qsub`` command, i.e.
        the command's exit status as encoded by the platform (0 on
        success).  This is not the PBS job identifier.
    """
    try:
        status = os.system("qsub " + filename)
    finally:
        # Always remove the temporary script, even if the call above raised.
        os.remove(filename)
    return status
class Test(unittest.TestCase):
    """Unit tests for readJobs, makeScript and submit.

    ``test_readJobs`` expects a file named ``testJobs`` in the current
    working directory whose contents parse to ``content``.  ``test_submit``
    invokes ``submit`` and therefore runs the ``qsub`` command.
    """

    # Expected parse result of the "testJobs" fixture: (name, err, log, cmd).
    content = [
        ("job1", "job1.err", "job1.log",
         "echo Running on host `hostname`\ncd $PBS_O_WORKDIR\npwd\n"),
        ("job2", "job2.pbs.err", "job2.pbs.log", "date\n")]

    def test_readJobs(self):
        self.assertEqual(self.content, readJobs("testJobs"))

    def test_makeScript(self):
        filename = makeScript(self.content[0])
        try:
            self.assertEqual("subJob_sh.tmp", filename)
        finally:
            # Do not leave the generated script behind after the test.
            os.remove(filename)

    def test_submit(self):
        # Only checks that submit() returns something for each job.
        self.assertNotEqual(None, submit(makeScript(self.content[0])))
        self.assertNotEqual(None, submit(makeScript(self.content[1])))
# Script entry point: submit every job described in each file named on the
# command line.  With no arguments, only the usage text is printed.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        # print(...) with a single string behaves the same on Python 2 and 3.
        print("\n".join((
            "Usage :",
            " subJobs.py [job_description_file]",
            "",
            "The format of argument file :",
            " >[job_name]:[err_file]:[log_file]",
            " [programs_to_execute]",
            "",
            "Example :",
            " $ cat jobs",
            " >serial1:serial1.err:serial2.log",
            " $PBS_O_WORKDIR/myjob1",
            " >serial2::",
            " $PBS_O_WORKDIR/another_job",
            "",
            "If [err_file] or [log_file] is empty, [job_name].pbs.err or",
            "[job_name].pbs.log is used.",
        )))
    failed = 0
    for job_file in sys.argv[1:]:
        for job in readJobs(job_file):
            status = submit(makeScript(job))
            if status != 0:
                # Report the failure instead of silently dropping it, but
                # keep going so the remaining jobs are still submitted.
                sys.stderr.write(
                    "subJobs.py: qsub failed (status %s) for job '%s' in %s\n"
                    % (status, job[0], job_file))
                failed += 1
    if failed:
        sys.exit(1)
    # Uncomment to run the unit tests instead of submitting jobs:
    # unittest.main()
테스트에 사용된 testJobs는 다음과 같다.
>job1:job1.err:job1.log
echo Running on host `hostname`
cd $PBS_O_WORKDIR
pwd
>job2::
date
echo Running on host `hostname`
cd $PBS_O_WORKDIR
pwd
>job2::
date
TRACKBACK ADDRESS :: http://bionote.net/tt/blna999/trackback/33
