Last active
October 22, 2024 21:41
-
-
Save nobody43/7a21f518dbebdd48a3c2e7af7934a63b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # SPDX-License-Identifier: GPL-2.0-only | |
| import sys | |
| import argparse | |
| import pathlib | |
| import shlex | |
| import re | |
| import json | |
| from copy import deepcopy | |
def sanitizeProfileName(name):
    '''Reduce a profile name to a short identifier.

    Names that look like paths (leading '/' or '@{') are replaced by the stem
    of their last path component. If the result contains a space, every run
    of whitespace is collapsed into a single '-'.
    '''
    looksLikePath = name.startswith(('/', '@{'))
    if looksLikePath:
        name = pathlib.Path(name).stem

    # Only names containing a literal space are rewritten
    return re.sub(r'\s+', '-', name) if ' ' in name else name
def makeLocalIdentity(nestingStacker_):
    '''Build the 'local' include identity for a (sub)profile.

    Each element of the nesting stack is passed through sanitizeProfileName()
    and the results are joined with underscores, one per nesting level.
    '''
    return '_'.join(map(sanitizeProfileName, nestingStacker_))
def _lintMessage(filename, profile, severity, line, reason, suggestion=None):
    '''Build one lint message dict; key order is fixed so the dumped JSON stays stable'''
    return {'filename': filename,
            'profile': profile,
            'severity': severity,
            'line': line,
            'reason': reason,
            'suggestion': suggestion}


def readApparmorFile(fullpath):
    '''Lint a single AppArmor file; AA file could contain multiple AA profiles

    The file is scanned line by line for profile/hat starts, tunable
    definitions, includes and closing braces.

    Returns a tuple (messages, file_data):
      messages  - list of message dicts (filename, profile, severity, line,
                  reason, suggestion)
      file_data - dict keyed by profile identity (nesting levels joined with
                  '//') holding the parsed profile start line, plus
                  'filename', 'flags' (set) and, when resolvable,
                  'attach_paths' (set)

    PermissionError, UnicodeDecodeError and FileNotFoundError while reading
    are reported as ERROR messages instead of being raised.
    '''
    from collections import Counter

    file_data = {}
    fileVars = {}
    nestingStacker = []            # names of the profiles currently open, outermost first
    duplicateProfilesCounter = []  # every profile name seen, in order of appearance
    localExists = {}
    localExists_eol = {}           # profile identity -> line number of its closing brace
    messages = []
    isInsideProfile = False

    try:
        with open(fullpath, 'r') as f:
            for nL, line in enumerate(f, start=1):  # nL is the actual line number
                if RE_PROFILE_START.search(line) or RE_PROFILE_HAT_DEF.search(line):
                    isInsideProfile = True
                    m = parse_profile_start_line(line, fullpath)
                    msg = _lintMessage(fullpath, m.get('profile'), 'WARNING', nL,
                                       "A short named profile must be defined")
                    if m.get('plainprofile'):
                        messages.append(msg)

                    if m.get('namedprofile'):
                        if '/' in m.get('namedprofile'):
                            messages.append(msg)

                    if m.get('flags'):
                        m['flags'] = set(shlex.split(m.pop('flags').replace(',', '')))
                    else:
                        m['flags'] = set()

                    if m.get('profile'):
                        duplicateProfilesCounter.append(m.get('profile'))
                        nestingStacker.append(m.get('profile'))
                        profileIdentity = '//'.join(nestingStacker)
                        file_data[profileIdentity] = m

                elif RE_PROFILE_VARIABLE.search(line):
                    m = RE_PROFILE_VARIABLE.search(line).groups()
                    name = strip_quotes(m[0])
                    operation = m[1]
                    val = separate_vars(m[2])
                    # 'm' is a plain tuple here, so the enclosing profile (if any)
                    # has to come from the nesting stack
                    currentProfile = '//'.join(nestingStacker) or None
                    if fileVars.get(name):
                        fileVars[name].update(set(val))
                        if operation == '=':
                            messages.append(_lintMessage(
                                fullpath, currentProfile, 'DEGRADED', nL,
                                "Tunable must be appended with '+='"))
                    else:
                        fileVars[name] = set(val)
                        if operation == '+=':
                            messages.append(_lintMessage(
                                fullpath, currentProfile, 'DEGRADED', nL,
                                "Tunable must be defined with '='"))

                elif RE_INCLUDE.search(line):
                    if nestingStacker:
                        profileIdentity = '//'.join(nestingStacker)
                        localIdentity = makeLocalIdentity(nestingStacker)
                        localValue = f'include if exists <local/{localIdentity}>'  # commented out will also match
                        if localValue in line:
                            localExists[profileIdentity] = localValue

                elif RE_PROFILE_END.search(line):
                    if isInsideProfile:
                        if not nestingStacker:
                            messages.append(_lintMessage(
                                fullpath, None, 'DEGRADED', nL,
                                "Unbalanced parenthesis?"))  # not fully covered
                        else:
                            profileIdentity = '//'.join(nestingStacker)
                            localExists_eol[profileIdentity] = nL
                            del nestingStacker[-1]

                        # Closing a child must not hide the parent's own closing
                        # brace: we stay inside while any profile is still open
                        isInsideProfile = bool(nestingStacker)

    except PermissionError:
        messages.append(_lintMessage(fullpath, None, 'ERROR', None,
                                     "Unable to read the file (PermissionError)"))
    except UnicodeDecodeError:
        messages.append(_lintMessage(fullpath, None, 'ERROR', None,
                                     "Unable to read the file (UnicodeDecodeError)"))
    except FileNotFoundError:
        messages.append(_lintMessage(fullpath, None, 'ERROR', None,
                                     "No such file or directory (FileNotFoundError)"))

    # Assign variables to profile attachments as paths and assign filenames
    for p, d in file_data.items():
        d['filename'] = fullpath
        attachment = d.get('attachment')
        if not attachment:
            continue

        if attachment.startswith('@{'):
            if fileVars.get(attachment):
                d['attach_paths'] = fileVars[attachment]  # incoming set
            else:
                messages.append(_lintMessage(
                    fullpath, p, 'ERROR', None,
                    f"Unknown global variable as profile attachment: {attachment}"))
        else:
            d['attach_paths'] = {attachment}

    # Check if profile block does not have corresponding 'local' include
    for p, d in file_data.items():
        if localExists.get(p):
            continue  # found previously

        localIdentity = makeLocalIdentity(p.split('//'))
        messages.append(_lintMessage(
            d['filename'], p, 'WARNING',
            localExists_eol.get(p),  # None? Unbalanced parenthesis?
            "The (sub)profile block does not have expected 'local' include",
            f'include if exists <local/{localIdentity}>'))

    # Track multiple definitions inside single file, one message per profile name
    for profile, counter in Counter(duplicateProfilesCounter).items():
        if counter >= 2:
            messages.append(_lintMessage(
                fullpath, profile, 'DEGRADED', None,
                f"Profile has been defined {counter} times in the same file"))

    return (messages, file_data)
def findAllProfileFilenames(profile_dir):
    '''Collect the profile files found under profile_dir.

    Regular files directly inside profile_dir that is_skippable_file() does
    not reject are returned as resolved paths. When there are none, a fixed
    list of subdirectories is searched recursively instead and every regular
    file found there is returned.
    '''
    root = pathlib.Path(profile_dir)
    profiles = {
        entry.resolve()
        for entry in root.iterdir()
        if entry.is_file() and not is_skippable_file(entry)
    }
    if profiles:
        return profiles

    # Not default, dig deeper
    resolvedRoot = root.resolve()
    nestedDirs = (
        'groups',
        'profiles-a-f',
        'profiles-g-l',
        'profiles-m-r',
        'profiles-s-z',
    )
    for subdir in nestedDirs:
        candidates = (resolvedRoot / subdir).rglob("*")
        profiles.update(found for found in candidates if found.is_file())

    return profiles
def handleArgs():
    """Parse the command line and return the argparse namespace.

    DEGRADED are purposed for fatal errors - when the profile set will fail to load entirely
    """
    # Known severities: DEBUG, NOTICE, WARNING, ERROR, CRITICAL, DEGRADED
    defaultRoot = '/etc/apparmor.d'

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d', '--aa-root-dir',
        action='store',
        default=defaultRoot,
        help='Target different AppArmor root directory rather than default',
    )
    parser.add_argument(
        '-p', '--profile',
        action='append',
        help='Handle only specified profile',
    )
    # Severity filtering is not enabled yet:
    # parser.add_argument('-s', '--severity', action='append',
    #                     choices=allSeverities,
    #                     help='Handle only specified severity event')

    return parser.parse_args()
def main(argv):
    '''Lint the selected AppArmor files and print every message as JSON.

    Files come from the -p/--profile options when given, otherwise from
    findAllProfileFilenames() on the AppArmor root directory. The process
    exits with status 1 when at least one message was produced.
    The argv parameter is accepted but not read; options are parsed by
    handleArgs().
    '''
    args = handleArgs()

    if args.profile:
        profiles = {pathlib.Path(given).resolve() for given in args.profile}
    else:
        profiles = findAllProfileFilenames(args.aa_root_dir)

    messages = []
    profile_data = {}
    for path in sorted(profiles):
        fileMessages, profilesInFile = readApparmorFile(path)
        messages.extend(fileMessages)
        profile_data.update(profilesInFile)

    for m in messages:
        # filenames may be pathlib objects, which json cannot serialize
        m['filename'] = str(m.get('filename'))
        print(json.dumps(m, indent=2))

    if messages:
        sys.exit(1)

    return None
if __name__ == '__main__':
    # The apparmor Python modules are only imported when run as a script;
    # the functions above rely on the names pulled in here.
    try:
        from apparmor.regex import *
        from apparmor.aare import AARE
        from apparmor.aa import is_skippable_file
        from apparmor.rule.file import FileRule, FileRuleset
        from apparmor.common import convert_regexp

        # separate_vars is taken from apparmor.aa when apparmor.rule.variable is unavailable
        try:
            from apparmor.rule.variable import separate_vars
        except ModuleNotFoundError:
            from apparmor.aa import separate_vars

    except ModuleNotFoundError:
        raise ModuleNotFoundError(
            "Can't find 'python3-apparmor' package! Install with:\n"
            "$ sudo apt install python3-apparmor")

    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment