Skip to content

Instantly share code, notes, and snippets.

@juztas
Created November 14, 2023 20:25
Show Gist options
  • Select an option

  • Save juztas/6a5aa0c632d68a4786f6c96ce234dbd3 to your computer and use it in GitHub Desktop.

Select an option

Save juztas/6a5aa0c632d68a4786f6c96ce234dbd3 to your computer and use it in GitHub Desktop.
condor.py
#!/usr/bin/env python3
import os
import re
import time
import atexit
import pathlib
import copy
import glob
import subprocess
import datetime
import classad
import logging
from logging.handlers import TimedRotatingFileHandler
import yaml
# PID of the current process: written into the lock file and embedded in the
# temporary report filename so concurrent/stale runs can be detected.
PID = os.getpid()
# Job classad attribute names harvested from every .job.ad file found on the node.
KEYS = ["EnteredCurrentStatus", "accounting_group", "CMS_Type", "CMS_JobType", "CpusProvisioned", "DESIRED_CMSDataLocations",
        "DESIRED_Sites", "DESIRED_CMSDataset", "JobCpus", "JobCurrentStartDate", "JOB_GLIDEIN_ToDie", "JOB_GLIDEIN_ToRetire",
        "RequestCpus", "RequestMemory", "MaxWallTimeMins", "WMAgent_RequestName", "CRAB_ReqName", "CRAB_AsyncDest", "CRAB_UserHN",
        "RemoteHost", "Owner", "RequestWalltime"]
# Attributes derived locally (computed in finalizeJobInfo/readAdFile) and
# reported alongside the raw classads.
OTHER_KEYS = ['DataLocCount', 'DataLocal', 'SitesCount', 'CaltechWhitelisted', 'MINIAODJob', 'JobOverRunTime', 'JobOverMemUse', 'MemoryUsage']
def getLogger(logFile='/var/log/condor/', logLevel='DEBUG', logOutName='cron.log', rotateTime='midnight', backupCount=10):
    """Configure the root logger with a timed rotating file handler and return it.

    :param logFile: directory (including trailing slash) where the log file lives
    :param logLevel: one of FATAL/ERROR/WARNING/INFO/DEBUG; anything else falls
                     back to DEBUG instead of raising KeyError
    :param logOutName: log file name appended to logFile
    :param rotateTime: rotation schedule passed to TimedRotatingFileHandler
    :param backupCount: number of rotated files to keep
    :return: the configured root logger
    """
    levels = {'FATAL': logging.FATAL,
              'ERROR': logging.ERROR,
              'WARNING': logging.WARNING,
              'INFO': logging.INFO,
              'DEBUG': logging.DEBUG}
    logger = logging.getLogger()
    logFile += logOutName
    handler = TimedRotatingFileHandler(logFile, when=rotateTime, backupCount=backupCount)
    formatter = logging.Formatter("%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s",
                                  datefmt="%a, %d %b %Y %H:%M:%S")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Fix: unknown level names previously raised KeyError; default to DEBUG.
    logger.setLevel(levels.get(logLevel, logging.DEBUG))
    return logger
def bashCall(command):
    """Run *command* through the shell and return (stdout, stderr) as bytes."""
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            shell=True, check=False)
    return result.stdout, result.stderr
def getSystemInfo():
    """
    Collect host facts to publish explicitly as classads.
    For now only the CPU model string.
    """
    systemInfo = {}
    rawOut, _ = bashCall('lscpu | grep "Model name:"')
    # Drop the leading b'Model name:' label (11 bytes), trim whitespace and
    # keep only the first line of the grep output.
    systemInfo['CPU_MODEL'] = rawOut[11:].strip().split(b'\n')[0]
    return systemInfo
def getBenchmarks(cpu):
    """Load benchmark numbers for *cpu* from the local benchmarks file.

    Returns a dict keyed Benchmark_<HT>_<Container>_<type>; empty when the
    benchmarks file is missing or has no entry for this CPU model.
    """
    results = {}
    if not os.path.exists('/opt/maitenance/benchmarks.yaml'):
        return results
    with open('/opt/maitenance/benchmarks.yaml', 'r') as fd:
        benchmarks = yaml.load(fd, Loader=yaml.FullLoader)
    # CPU model is normalized to the file's key format: parens stripped,
    # whitespace runs collapsed to single underscores.
    cpuKey = "_".join(cpu.replace("(", "").replace(")", "").split())
    if cpuKey not in benchmarks:
        return results
    for entry in benchmarks[cpuKey]:
        # NOTE(review): `== True` kept deliberately — matches only a real
        # boolean True from YAML, as the original did.
        hyperFlag = 'Y' if entry['Hyperthreading'] == True else 'N'
        contFlag = 'Y' if entry['Container'] == True else 'N'
        for benchName, benchVal in entry['Values'].items():
            results["_".join(["Benchmark", hyperFlag, contFlag, benchName])] = benchVal
    return results
def getLocation():
    """Return rack/building/unit location details from the facter inventory file."""
    # Inventory fact name -> classad name.
    invToLoc = {'locationrack': 'Rack',
                'locationbuilding': 'Building',
                'serverunit': 'Unit'}
    inventory = {}
    location = {}
    if os.path.exists('/etc/facter/facts.d/inventory.yaml'):
        with open('/etc/facter/facts.d/inventory.yaml', 'r') as fd:
            inventory = yaml.load(fd, Loader=yaml.FullLoader)
    # Iterate the inventory (not the mapping) so insertion order of the
    # resulting dict matches the original implementation.
    for invKey, invVal in inventory.items():
        if invKey in invToLoc:
            location[invToLoc[invKey]] = invVal
    return location
def findMemoryUsage(remoteHostId):
    """ Find memory usage of a condor job """
    # remoteHostId looks like "slot<N>@<host>"; the slot id prefixes the
    # condor_starter command line in `ps auxf` output.
    slotId = remoteHostId.split('@')[0]
    out, _err = bashCall('ps auxf | grep "%s " | grep "condor_starter" | grep -v grep | grep -v "cit-gatekeeper" | grep -v "tier2"' % slotId)
    out = out.split(b'\n')
    # First matching ps line; field 1 is the condor_starter PID.
    # NOTE: raises IndexError when nothing matched — callers catch that.
    firstline = list(filter(None, out[0].split(b' ')))
    newProcID = firstline[1]
    # Print pid/ppid/rss/pcpu/pmem/cmd for the whole process tree rooted at
    # that PID; the awk program recursively expands each PID's children.
    cmd = """ps --forest -o pid=,ppid=,rss=,pcpu=,pmem=,cmd= $(ps -e --no-header -o pid,ppid|awk -vp=%s 'function r(s){print s;s=a[s];while(s){sub(",","",s);t=s;sub(",.*","",t);sub("[0-9]+","",s);r(t)}}{a[$2]=a[$2]","$1}END{r(p)}')""" % int(newProcID)
    out, _err = bashCall(cmd)
    memUsage = 0
    for line in out.split(b'\n'):
        if not line:
            continue
        lineArgs = list(filter(None, line.split(b' ')))
        # Field 2 is RSS in KiB; sum over every process in the tree.
        memUsage += int(lineArgs[2])
    # KiB -> MiB.
    return int(memUsage / 1024)
def finalizeJobInfo(jobIn):
    """Derive summary classads for one job without mutating the input dict.

    Adds: DataLocCount/DataLocal (data locality vs T2_US_Caltech),
    SitesCount/CaltechWhitelisted (site whitelist), MINIAODJob,
    JobOverRunTime (wall-clock limit exceeded) and JobOverMemUse
    (memory above request). The bulky DESIRED_CMSDataLocations and
    DESIRED_Sites lists are dropped from the result.

    :param jobIn: dict of raw job classads (left untouched)
    :return: new dict with raw adds plus the derived flags above
    """
    out = copy.deepcopy(jobIn)
    # 1. Is T2_US_Caltech among the data locations?
    dataLocs = out.pop('DESIRED_CMSDataLocations', None)
    if dataLocs is not None:
        locList = dataLocs.split(',')
        out['DataLocCount'] = len(locList)
        out['DataLocal'] = 'T2_US_Caltech' in locList
    else:
        out['DataLocCount'] = 0
        out['DataLocal'] = False
    # 2. Is T2_US_Caltech in the site whitelist? (no defaults when absent,
    # matching the original behavior)
    sites = out.pop('DESIRED_Sites', None)
    if sites is not None:
        siteList = sites.split(',')
        out['SitesCount'] = len(siteList)
        out['CaltechWhitelisted'] = 'T2_US_Caltech' in siteList
    # 3. MINIAOD job? Plain substring test replaces the old .*(MINIAOD).* regex.
    if 'DESIRED_CMSDataset' in out:
        out['MINIAODJob'] = 'MINIAOD' in out['DESIRED_CMSDataset']
    # 4. Running longer than its wall-time allowance?
    if 'JobCurrentStartDate' in out and 'MaxWallTimeMins' in out:
        maxruntime = out['JobCurrentStartDate'] + (out['MaxWallTimeMins'] * 60)
        out['JobOverRunTime'] = maxruntime <= int(time.time())
    # 5. Using more memory than requested?
    if 'RequestMemory' in out and 'MemoryUsage' in out:
        out['JobOverMemUse'] = int(out['RequestMemory']) < int(out['MemoryUsage'])
    return out
def validateInt(numb):
    """Coerce a classad value to int.

    Plain numbers/strings convert directly; values that refuse int()
    with TypeError (e.g. classad ExprTree objects — presumably, verify
    against the bindings) are eval()'d first.
    """
    try:
        return int(numb)
    except TypeError:
        return int(numb.eval())
def readAdFile(adFile, scanIndex):
    """Read one HTCondor .job.ad file and return per-job info dicts.

    :param adFile: path to the classad file
    :param scanIndex: 0 for pilot (glidein) ads; non-zero for local-condor ads,
                      where Owner/RequestWalltime stand in for accounting_group
                      and MaxWallTimeMins
    :return: list of dicts (one per ad), post-processed by finalizeJobInfo()
    """
    jOut = []
    with open(adFile, encoding='utf-8') as fd:
        jobAds = classad.parseAds(fd)
        for jobAd in jobAds:
            oneJob = {}
            for key in KEYS:
                if key == 'RemoteHost':
                    # RemoteHost is only used to locate the condor_starter
                    # process tree and measure its memory footprint.
                    memUse = 0
                    try:
                        memUse = findMemoryUsage(jobAd[key])
                    except IndexError as ex:
                        print('Got IndexError %s. Continue' % ex)
                    oneJob['MemoryUsage'] = memUse
                    continue
                outval = ""
                try:
                    # Production likes dynamically modify the job, so this needs
                    # to be validated to int based on job classads
                    if key in ['RequestMemory', 'MaxWallTimeMins', 'RequestCpus',
                               'JobCpus', 'JOB_GLIDEIN_ToDie', 'JOB_GLIDEIN_ToRetire']:
                        outval = 0 if key not in jobAd.keys() else validateInt(jobAd[key])
                    elif key in ['Owner', 'RequestWalltime']:
                        # Local (non-pilot) scans only: map these onto the CMS names.
                        if scanIndex != 0 and key == 'Owner':
                            oneJob['accounting_group'] = jobAd[key]
                        elif scanIndex != 0 and key == 'RequestWalltime':
                            oneJob['MaxWallTimeMins'] = 0 if key not in jobAd.keys() else int(validateInt(jobAd[key]) / 60)
                        continue
                    # BUGFIX: was `key in 'DESIRED_CMSDataset'` — a substring
                    # test that only worked because no other KEYS entry happens
                    # to be a substring. Equality is what was intended.
                    elif key == 'DESIRED_CMSDataset' and jobAd[key] == classad.Value.Undefined:
                        continue
                    else:
                        outval = jobAd[key]
                except KeyError:
                    # Attribute absent from this ad — skip it.
                    continue
                oneJob[key] = outval
            jOut.append(finalizeJobInfo(oneJob))
    return jOut
def lockRun():
    """Create the lock file atomically with this process's pid.

    :raises Exception: when another run already holds the lock.
    """
    try:
        # Open with 'x' (exclusive create) so the exists-check and the file
        # creation are one atomic step — the original isfile()-then-open had
        # a check-then-create race window.
        with open('/tmp/condor-scan-lock.pid', 'x', encoding='utf-8') as fd:
            fd.write(str(PID))
    except FileExistsError:
        raise Exception('Lock file is already in place') from None
def unlockRun():
    """Remove the lock file if this process owns it, or if it has gone stale."""
    lockPath = '/tmp/condor-scan-lock.pid'
    lockPid = 0
    if os.path.isfile(lockPath):
        with open(lockPath, 'r', encoding='utf-8') as fd:
            try:
                lockPid = int(fd.read())
            except ValueError:
                # Unreadable pid: claim it as ours so it gets cleaned up.
                lockPid = PID
        if lockPid == PID:
            os.remove(lockPath)
    # The file may still exist at this point (owned by another pid).
    # Treat a lock older than one hour as stale and remove it.
    if os.path.isfile(lockPath):
        lockAge = int(time.time()) - int(pathlib.Path(lockPath).stat().st_mtime)
        if lockAge > 3600:
            print('Stale lock file. Removing.')
            os.remove(lockPath)
def writer(fd, key, lineout):
    """Write one classad assignment to *fd*.

    Ints are written bare; everything else is stringified and quoted.
    Strings longer than 4000 chars are split into numbered parts joined
    back together with a strcat() expression.
    """
    if isinstance(lineout, int):
        fd.write('%s = %s\n' % (key, lineout))
        return
    text = str(lineout)
    if len(text) <= 4000:
        fd.write('%s = "%s"\n' % (key, lineout))
        return
    chunks = [text[pos:pos + 4000] for pos in range(0, len(text), 4000)]
    partKeys = []
    for idx, chunk in enumerate(chunks, start=1):
        fd.write('%s%s = "%s"\n' % (key, idx, chunk))
        partKeys.append('%s%s' % (key, idx))
    fd.write('%s = strcat(%s)\n' % (key, ','.join(partKeys)))
def printDate(appMsg=''):
    """Print *appMsg* followed by the current local time as dd/mm/YYYY HH:MM:SS."""
    stamp = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("%s Time: %s" % (appMsg, stamp))
def calculateTotal(lineout):
    """Sum the int-convertible entries of *lineout*, skipping the rest
    (e.g. 'NOT_SET' placeholders)."""
    total = 0
    for entry in lineout:
        try:
            total += int(entry)
        except ValueError:
            pass
    return total
def execute(logger):
    """ Main loop via processes and prepare condor info """
    printDate('START')
    logger.info('Start processes and prepare condor info')
    # Release the lock however we exit; then take it (raises if already held).
    atexit.register(unlockRun)
    lockRun()
    with open('/tmp/condor-cron-localcheck-report.tmp.%s' % PID, 'w', encoding='utf-8') as fd:
        fd.write('CMS_CONDOR_SCAN = %s\n' % int(time.time()))
        # Glidein (pilot) job ads first, then bare local-condor job ads.
        parseDirs = ['/wntmp/condor/execute/*/*/*/*/.job.ad', '/wntmp/condor/execute/*/.job.ad']
        nodeOut = []
        jobCount = [0, 0]
        for fdir in parseDirs:
            allDirs = glob.glob(fdir)
            scanIndex = parseDirs.index(fdir)
            jobCount[scanIndex] = len(allDirs)
            if scanIndex == 1 and jobCount[0] > 0:
                # This means that it is not local job and we dont need pilot information
                continue
            for adFile in glob.glob(fdir):
                if not os.path.isfile(adFile):
                    continue
                jobOut = []
                try:
                    jobOut = readAdFile(adFile, scanIndex)
                except IOError as ex:
                    # Ad file disappeared or became unreadable mid-scan; skip it.
                    print('Got Exception %s' % ex)
                    continue
                for item in jobOut:
                    nodeOut.append(item)
        # Emit one classad line per attribute: a list with one entry per job,
        # 'NOT_SET' for jobs that lack the attribute.
        for key in list(KEYS + OTHER_KEYS):
            if key in ['DESIRED_CMSDataLocations', 'DESIRED_Sites', 'RemoteHost']:
                # We are not putting this info for each job. This is too much to report and no use
                continue
            lineout = []
            for inD in nodeOut:
                if key in inD.keys():
                    lineout.append(str(inD[key]))
                else:
                    lineout.append('NOT_SET')
            writer(fd, key, lineout)
            # Numeric attributes also get a node-wide total.
            if key in ['JobCpus', 'CpusProvisioned', 'MemoryUsage', 'RequestCpus', 'RequestMemory']:
                totalSum = calculateTotal(lineout)
                writer(fd, 'Total%s' % key, totalSum)
        if logger:
            logger.info('Written keys into file /tmp/condor-cron-localcheck-report')
            logger.info('CMS_CONDOR_FAILURE = FALSE')
        fd.write('CMS_CONDOR_FAILURE = FALSE\n')
        # Append host facts: CPU model (bytes, decoded here), rack location,
        # and any benchmark numbers recorded for this CPU model.
        tmpOut = getSystemInfo()
        for key, val in tmpOut.items():
            fd.write('%s = "%s"\n' % (key, val.decode("utf-8")))
        location = getLocation()
        for key, val in location.items():
            fd.write('%s = "%s"\n' % (key, val))
        benchmarks = getBenchmarks(tmpOut['CPU_MODEL'].decode("utf-8"))
        for key,val in benchmarks.items():
            fd.write('%s = "%s"\n' % (key, val))
    printDate('START-WRAPPER')
    logger.info('Start Wrapper to check cvmfs, ceph, node in drain, apptainer hello world')
    # It will check cvmfs, ceph, node in drain, apptainer hello world
    # NOTE(review): presumably the wrapper also publishes the .tmp.<PID>
    # report written above — confirm against the wrapper script.
    _out, _err = bashCall('/opt/dist/compute/condor_cron_localcheck_wrapper.sh %s' % PID)
    output = _out.decode()
    error = _err.decode()
    logger.info(output)
    logger.info(error)
    printDate('END')
    logger.info('End')
if __name__ == "__main__":
    # Entry point: set up the rotating-file logger and run one scan.
    LOGGER = getLogger()
    execute(LOGGER)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment