condor.py
#!/usr/bin/env python3
import os
import re
import time
import atexit
import pathlib
import copy
import glob
import subprocess
import datetime
import classad
import logging
from logging.handlers import TimedRotatingFileHandler
import yaml

PID = os.getpid()
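
# ClassAd attributes read directly from each job's .job.ad file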
KEYS = ["EnteredCurrentStatus", "accounting_group", "CMS_Type", "CMS_JobType", "CpusProvisioned", "DESIRED_CMSDataLocations",
        "DESIRED_Sites", "DESIRED_CMSDataset", "JobCpus", "JobCurrentStartDate", "JOB_GLIDEIN_ToDie", "JOB_GLIDEIN_ToRetire",
        "RequestCpus", "RequestMemory", "MaxWallTimeMins", "WMAgent_RequestName", "CRAB_ReqName", "CRAB_AsyncDest", "CRAB_UserHN",
        "RemoteHost", "Owner", "RequestWalltime"]
OTHER_KEYS = ['DataLocCount', 'DataLocal', 'SitesCount', 'CaltechWhitelisted', 'MINIAODJob', 'JobOverRunTime', 'JobOverMemUse', 'MemoryUsage']

def getLogger(logFile='/var/log/condor/', logLevel='DEBUG', logOutName='cron.log', rotateTime='midnight', backupCount=10):
    """ Get a new logger for logging """
    levels = {'FATAL': logging.FATAL,
              'ERROR': logging.ERROR,
              'WARNING': logging.WARNING,
              'INFO': logging.INFO,
              'DEBUG': logging.DEBUG}
    logger = logging.getLogger()
    logFile += logOutName
    handler = TimedRotatingFileHandler(logFile, when=rotateTime, backupCount=backupCount)
    formatter = logging.Formatter("%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s",
                                  datefmt="%a, %d %b %Y %H:%M:%S")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(levels[logLevel])
    return logger

def bashCall(command):
    """ Call a bash command and return its output and error streams """
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    output, error = process.communicate()
    return output, error

def getSystemInfo():
    """
    Return all system info we explicitly want to add to classads.
    For now only the CPU model.
    """
    retOut = {}
    out, _err = bashCall('lscpu | grep "Model name:"')
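    # Drop the leading b'Model name:' prefix (11 bytes) and keep only the first matching line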
    out = out[11:].strip().split(b'\n')[0]
    retOut['CPU_MODEL'] = out
    return retOut

def getBenchmarks(cpu):
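    """ Return benchmark values for this CPU model from the local benchmarks file """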
    out = {}
    htFlag = ''
    containerFlag = ''
    if os.path.exists('/opt/maitenance/benchmarks.yaml'):
        with open('/opt/maitenance/benchmarks.yaml', 'r') as fd:
            benchmarks = yaml.load(fd, Loader=yaml.FullLoader)
        benchmarks_cpu = "_".join(cpu.replace("(", "").replace(")", "").split())
        if benchmarks_cpu in benchmarks:
            for i in benchmarks[benchmarks_cpu]:
                htFlag = 'Y' if i['Hyperthreading'] else 'N'
                containerFlag = 'Y' if i['Container'] else 'N'
                for benType in i['Values']:
                    out["_".join(["Benchmark", htFlag, containerFlag, benType])] = i['Values'][benType]
    return out

def getLocation():
    """ Return location details from the inventory file """
    inventory = {}
    location = {}
    if os.path.exists('/etc/facter/facts.d/inventory.yaml'):
        with open('/etc/facter/facts.d/inventory.yaml', 'r') as fd:
            inventory = yaml.load(fd, Loader=yaml.FullLoader)
    for key, val in inventory.items():
        if key == 'locationrack':
            location['Rack'] = val
        if key == 'locationbuilding':
            location['Building'] = val
        if key == 'serverunit':
            location['Unit'] = val
    return location

def findMemoryUsage(remoteHostId):
    """ Find the memory usage of a condor job """
    slotId = remoteHostId.split('@')[0]
    out, _err = bashCall('ps auxf | grep "%s " | grep "condor_starter" | grep -v grep | grep -v "cit-gatekeeper" | grep -v "tier2"' % slotId)
    out = out.split(b'\n')
    firstline = list(filter(None, out[0].split(b' ')))
    newProcID = firstline[1]
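    # List the full process tree rooted at the condor_starter PID
    # (pid, ppid, rss, %cpu, %mem, cmd for the starter and all of its descendants)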
| cmd = """ps --forest -o pid=,ppid=,rss=,pcpu=,pmem=,cmd= $(ps -e --no-header -o pid,ppid|awk -vp=%s 'function r(s){print s;s=a[s];while(s){sub(",","",s);t=s;sub(",.*","",t);sub("[0-9]+","",s);r(t)}}{a[$2]=a[$2]","$1}END{r(p)}')""" % int(newProcID) | |
| out, _err = bashCall(cmd) | |
| memUsage = 0 | |
| for line in out.split(b'\n'): | |
| if not line: | |
| continue | |
| lineArgs = list(filter(None, line.split(b' '))) | |
| memUsage += int(lineArgs[2]) | |
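    # ps reports RSS in KiB; convert the total to MiB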
    return int(memUsage / 1024)

def finalizeJobInfo(jobIn):
    """ Prepare the final job ads: is the site whitelisted, is the data at the site, is it a MINIAOD job, etc. """
    out = copy.deepcopy(jobIn)
    # A few things to check:
    # 1. Is T2_US_Caltech in DESIRED_CMSDataLocations
    if 'DESIRED_CMSDataLocations' in out.keys():
        del out['DESIRED_CMSDataLocations']
        out['DataLocCount'] = len(jobIn['DESIRED_CMSDataLocations'].split(','))
        out['DataLocal'] = 'T2_US_Caltech' in jobIn['DESIRED_CMSDataLocations'].split(',')
    else:
        out['DataLocCount'] = 0
        out['DataLocal'] = False
    # 2. Is T2_US_Caltech in DESIRED_Sites
    if 'DESIRED_Sites' in out.keys():
        del out['DESIRED_Sites']
        out['SitesCount'] = len(jobIn['DESIRED_Sites'].split(','))
        out['CaltechWhitelisted'] = 'T2_US_Caltech' in jobIn['DESIRED_Sites'].split(',')
    # 3. Does DESIRED_CMSDataset match *MINIAOD*
    if 'DESIRED_CMSDataset' in out.keys():
        m = re.match(r".*(MINIAOD).*", out['DESIRED_CMSDataset'])
        out['MINIAODJob'] = bool(m and m.groups())
    # 4. Check if the job has been running longer than its allowed walltime
    if 'JobCurrentStartDate' in out.keys() and 'MaxWallTimeMins' in out.keys():
        maxruntime = out['JobCurrentStartDate'] + (out['MaxWallTimeMins'] * 60)
        timenow = int(time.time())
        out['JobOverRunTime'] = maxruntime <= timenow
    # 5. Check if the job is using more memory than requested
    if 'RequestMemory' in out.keys() and 'MemoryUsage' in out.keys():
        out['JobOverMemUse'] = int(out['RequestMemory']) < int(out['MemoryUsage'])
    return out

def validateInt(numb):
    """ Coerce a value to int; values that cannot be cast directly
        (e.g. a classad ExprTree) raise TypeError and are evaluated first """
    try:
        return int(numb)
    except TypeError:
        return int(numb.eval())

def readAdFile(adFile, scanIndex):
    """ Read a CMS job ad and extract the parameters we want from it """
    jOut = []
    with open(adFile, encoding='utf-8') as fd:
        jobAds = classad.parseAds(fd)
        for jobAd in jobAds:
            oneJob = {}
            for key in KEYS:
                if key == 'RemoteHost':
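                    # The slot id from RemoteHost is used to locate the
                    # condor_starter and measure the job's live memory usage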
                    memUse = 0
                    try:
                        memUse = findMemoryUsage(jobAd[key])
                    except IndexError as ex:
                        print('Got IndexError %s. Continue' % ex)
                    oneJob['MemoryUsage'] = memUse
                    continue
                outval = ""
                try:
                    # Production workflows may modify the job dynamically, so these
                    # attributes must be coerced to int based on the job classads
                    if key in ['RequestMemory', 'MaxWallTimeMins', 'RequestCpus',
                               'JobCpus', 'JOB_GLIDEIN_ToDie', 'JOB_GLIDEIN_ToRetire']:
                        outval = 0 if key not in jobAd.keys() else validateInt(jobAd[key])
                    elif key in ['Owner', 'RequestWalltime']:
                        if scanIndex != 0 and key == 'Owner':
                            oneJob['accounting_group'] = jobAd[key]
                        elif scanIndex != 0 and key == 'RequestWalltime':
                            oneJob['MaxWallTimeMins'] = 0 if key not in jobAd.keys() else int(validateInt(jobAd[key]) / 60)
                        continue
                    elif key == 'DESIRED_CMSDataset' and jobAd[key] == classad.Value.Undefined:
                        continue
                    else:
                        outval = jobAd[key]
                except KeyError:
                    continue
                oneJob[key] = outval
            jOut.append(finalizeJobInfo(oneJob))
    return jOut

def lockRun():
    """ Create a lock file in /tmp with our pid """
    if os.path.isfile('/tmp/condor-scan-lock.pid'):
        raise Exception('Lock file is already in place')
    with open('/tmp/condor-scan-lock.pid', 'w', encoding='utf-8') as fd:
        fd.write(str(PID))

def unlockRun():
    """ Remove the lock file if the pid it holds matches ours """
    runPid = 0
    if os.path.isfile('/tmp/condor-scan-lock.pid'):
        with open('/tmp/condor-scan-lock.pid', 'r', encoding='utf-8') as fd:
            try:
                runPid = int(fd.read())
            except ValueError:
                runPid = PID
        if PID == runPid:
            os.remove('/tmp/condor-scan-lock.pid')
    # A lock file might still be present at this point (e.g. left by a crashed run).
    # If it is older than one hour, treat it as stale and remove it.
    if os.path.isfile('/tmp/condor-scan-lock.pid'):
        fname = pathlib.Path('/tmp/condor-scan-lock.pid')
        timenow = int(time.time())
        if (timenow - int(fname.stat().st_mtime)) > 3600:
            print('Stale lock file. Removing.')
            os.remove('/tmp/condor-scan-lock.pid')

def writer(fd, key, lineout):
    """ Write a classad value, splitting it into lines of max 4k chars if needed """
    if isinstance(lineout, int):
        fd.write('%s = %s\n' % (key, lineout))
        return
    lineIn = str(lineout)
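    # Long values are split into numbered chunks and reassembled with the classad
    # strcat() function, presumably to stay within classad line-length limits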
    if len(lineIn) > 4000:
        multOut = [lineIn[i:i + 4000] for i in range(0, len(lineIn), 4000)]
        tmpKeys = []
        for counter, line in enumerate(multOut, start=1):
            fd.write('%s%s = "%s"\n' % (key, counter, line))
            tmpKeys.append('%s%s' % (key, counter))
        fd.write('%s = strcat(%s)\n' % (key, ','.join(tmpKeys)))
    else:
        fd.write('%s = "%s"\n' % (key, lineout))

def printDate(appMsg=''):
    now = datetime.datetime.now()
    dtString = now.strftime("%d/%m/%Y %H:%M:%S")
    print("%s Time: %s" % (appMsg, dtString))

def calculateTotal(lineout):
    totalCount = 0
    for item in lineout:
        try:
            totalCount += int(item)
        except ValueError:
            continue
    return totalCount

def execute(logger):
    """ Main loop: scan the job ads and prepare the condor info report """
    printDate('START')
    logger.info('Start processes and prepare condor info')
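    # Remove the lock file on normal exit; stale locks are cleaned up in unlockRun()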
    atexit.register(unlockRun)
    lockRun()
    with open('/tmp/condor-cron-localcheck-report.tmp.%s' % PID, 'w', encoding='utf-8') as fd:
        fd.write('CMS_CONDOR_SCAN = %s\n' % int(time.time()))
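        # The deep glob matches payload job ads started inside glidein pilots;
        # the shallow glob matches job ads started directly on the node (pilot/local jobs)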
        parseDirs = ['/wntmp/condor/execute/*/*/*/*/.job.ad', '/wntmp/condor/execute/*/.job.ad']
        nodeOut = []
        jobCount = [0, 0]
        for scanIndex, fdir in enumerate(parseDirs):
            allDirs = glob.glob(fdir)
            jobCount[scanIndex] = len(allDirs)
            if scanIndex == 1 and jobCount[0] > 0:
                # Payload jobs were already found, so these are not local jobs
                # and we do not need the pilot-level information
                continue
            for adFile in allDirs:
                if not os.path.isfile(adFile):
                    continue
                jobOut = []
                try:
                    jobOut = readAdFile(adFile, scanIndex)
                except IOError as ex:
                    print('Got Exception %s' % ex)
                    continue
                nodeOut.extend(jobOut)
        for key in KEYS + OTHER_KEYS:
            if key in ['DESIRED_CMSDataLocations', 'DESIRED_Sites', 'RemoteHost']:
                # These are not reported per job: too verbose and of no use downstream
                continue
            lineout = []
            for inD in nodeOut:
                if key in inD.keys():
                    lineout.append(str(inD[key]))
                else:
                    lineout.append('NOT_SET')
            writer(fd, key, lineout)
            if key in ['JobCpus', 'CpusProvisioned', 'MemoryUsage', 'RequestCpus', 'RequestMemory']:
                totalSum = calculateTotal(lineout)
                writer(fd, 'Total%s' % key, totalSum)
        if logger:
            logger.info('Written keys into file /tmp/condor-cron-localcheck-report')
            logger.info('CMS_CONDOR_FAILURE = FALSE')
        fd.write('CMS_CONDOR_FAILURE = FALSE\n')
        tmpOut = getSystemInfo()
        for key, val in tmpOut.items():
            fd.write('%s = "%s"\n' % (key, val.decode("utf-8")))
        location = getLocation()
        for key, val in location.items():
            fd.write('%s = "%s"\n' % (key, val))
        benchmarks = getBenchmarks(tmpOut['CPU_MODEL'].decode("utf-8"))
        for key, val in benchmarks.items():
            fd.write('%s = "%s"\n' % (key, val))
    printDate('START-WRAPPER')
    logger.info('Start Wrapper to check cvmfs, ceph, node in drain, apptainer hello world')
    # The wrapper checks cvmfs, ceph, node in drain, apptainer hello world
    _out, _err = bashCall('/opt/dist/compute/condor_cron_localcheck_wrapper.sh %s' % PID)
    output = _out.decode()
    error = _err.decode()
    logger.info(output)
    logger.info(error)
    printDate('END')
    logger.info('End')


if __name__ == "__main__":
    LOGGER = getLogger()
    execute(LOGGER)