diff --git a/Android.bp b/Android.bp new file mode 100644 index 0000000000000000000000000000000000000000..9952a8fd5d9c2abe14ee72bf51426a070cdcc826 --- /dev/null +++ b/Android.bp @@ -0,0 +1 @@ +subdirs = ["tests"] diff --git a/tests/Android.bp b/tests/Android.bp new file mode 100644 index 0000000000000000000000000000000000000000..87afab831a3a1a12966ca787048d003bc39d1e83 --- /dev/null +++ b/tests/Android.bp @@ -0,0 +1,7 @@ +cc_library_host_shared { + name: "libsepolwrap", + srcs: ["sepol_wrap.cpp"], + shared_libs: ["libbase", "libsepol"], + cflags: ["-Wall", "-Werror",], + export_include_dirs: ["include"], +} diff --git a/tests/include/sepol_wrap.h b/tests/include/sepol_wrap.h new file mode 100644 index 0000000000000000000000000000000000000000..0683a3bdf6a0f9f885db3d6aafcdffff1d38248e --- /dev/null +++ b/tests/include/sepol_wrap.h @@ -0,0 +1,20 @@ + +#ifdef __cplusplus +extern "C" { +#endif + +int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp); +bool init_libsepol(const char *policy_path); +void *load_policy(const char *policy_path); +void destroy_policy(void *policydbp); +void *init_avtab(void *policydbp); +void *init_cond_avtab(void *policydbp); +void destroy_avtab(void *avtab_iterp); +int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp); +void *init_type_iter(void *policydbp, const char *type, bool is_attr); +void destroy_type_iter(void *type_iterp); + +#ifdef __cplusplus +} +#endif + diff --git a/tests/policy.py b/tests/policy.py new file mode 100644 index 0000000000000000000000000000000000000000..480faa23777e132d69381d016fe5f09af0e7d558 --- /dev/null +++ b/tests/policy.py @@ -0,0 +1,142 @@ +from ctypes import * +import re +import os + +class TERule: + def __init__(self, rule): + data = rule.split(',') + self.flavor = data[0] + self.sctx = data[1] + self.tctx = data[2] + self.tclass = data[3] + self.perms = set((data[4].strip()).split(' ')) + self.rule = rule + +class Policy: + __Rules = None + __FcDict = None + __libsepolwrap = None + __policydbP = None + + # Return all file_contexts entries that map to the input Type. + def QueryFc(self, Type): + if Type in self.__FcDict: + return self.__FcDict[Type] + else: + return None + + # Return all attributes associated with a type if IsAttr=False or + # all types associated with an attribute if IsAttr=True + def QueryTypeAttribute(self, Type, IsAttr): + TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, + create_string_buffer(Type), c_bool(IsAttr)) + if (TypeIterP == None): + sys.exit("Failed to initialize type iterator") + buf = create_string_buffer(2048) + + while True: + ret = self.__libsepolwrap.get_type(buf, c_int(2048), + self.__policydbP, TypeIterP) + if ret == 0: + yield buf.value + continue + if ret == 1: + break; + # We should never get here. + sys.exit("Failed to import policy") + self.__libsepolwrap.destroy_type_iter(TypeIterP) + + # Return all TERules that match: + # (any scontext) or (any tcontext) or (any tclass) or (any perms), + # perms. + # Any unspecified paramenter will match all. + # + # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"]) + # Will return any rule with: + # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms) + def QueryTERule(self, **kwargs): + if self.__Rules is None: + self.__InitTERules() + for Rule in self.__Rules: + # Match source type + if "scontext" in kwargs and Rule.sctx not in kwargs['scontext']: + continue + # Match target type + if "tcontext" in kwargs and Rule.tctx not in kwargs['tcontext']: + continue + # Match target class + if "tclass" in kwargs and Rule.tclass not in kwargs['tclass']: + continue + # Match any perms + if "perms" in kwargs and not bool(Rule.perms & set(kwargs['perms'])): + continue + yield Rule + + + def __GetTERules(self, policydbP, avtabIterP): + if self.__Rules is None: + self.__Rules = set() + buf = create_string_buffer(2048) + ret = 0 + while True: + ret = self.__libsepolwrap.get_allow_rule(buf, c_int(2048), policydbP, avtabIterP) + if ret == 0: + Rule = TERule(buf.value) + self.__Rules.add(Rule) + continue + if ret == 1: + break; + # We should never get here. + sys.exit("Failed to import policy") + + def __InitTERules(self): + avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP) + if (avtabIterP == None): + sys.exit("Failed to initialize avtab") + self.__GetTERules(self.__policydbP, avtabIterP) + self.__libsepolwrap.destroy_avtab(avtabIterP) + avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP) + if (avtabIterP == None): + sys.exit("Failed to initialize conditional avtab") + self.__GetTERules(self.__policydbP, avtabIterP) + self.__libsepolwrap.destroy_avtab(avtabIterP) + + # load ctypes-ified libsepol wrapper + def __InitLibsepolwrap(self): + self.__libsepolwrap = CDLL("libsepolwrap.so") + + # load file_contexts + def __InitFC(self, FcPaths): + fc = [] + for path in FcPaths: + if not os.path.exists(path): + sys.exit("file_contexts file " + path + " does not exist.") + fd = open(path, "r") + fc += fd.readlines() + fd.close() + self.__FcDict = {} + for i in fc: + rec = i.split() + try: + t = rec[-1].split(":")[2] + if t in self.__FcDict: + self.__FcDict[t].append(rec[0]) + else: + self.__FcDict[t] = [rec[0]] + except: + pass + + # load policy + def __InitPolicy(self, PolicyPath): + self.__policydbP = self.__libsepolwrap.load_policy(create_string_buffer(PolicyPath)) + if (self.__policydbP is None): + sys.exit("Failed to load policy") + + def __init__(self, PolicyPath, FcPaths): + self.__InitLibsepolwrap() + self.__InitFC(FcPaths) + self.__InitPolicy(PolicyPath) + + def __del__(self): + if self.__policydbP is not None: + self.__libsepolwrap.destroy_policy(self.__policydbP) diff --git a/tests/sepol_wrap.cpp b/tests/sepol_wrap.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a12d4383df986b13b56972ccea8d9dcd1ee660ee --- /dev/null +++ b/tests/sepol_wrap.cpp @@ -0,0 +1,266 @@ +#include <stdio.h> +#include <string> +#include <sstream> +#include <stdlib.h> +#include <unistd.h> +#include <iostream> +#include <sys/mman.h> +#include <sys/stat.h> +#include <sepol/policydb/avtab.h> +#include <sepol/policydb/policydb.h> +#include <sepol/policydb/services.h> +#include <sepol/policydb/util.h> +#include <sys/types.h> +#include <fstream> + +#include <android-base/file.h> +#include <android-base/strings.h> +#include <sepol_wrap.h> + + +struct type_iter { + type_datum *d; + ebitmap_node *n; + unsigned int length; + unsigned int bit; +}; + +void *init_type_iter(void *policydbp, const char *type, bool is_attr) +{ + policydb_t *db = static_cast<policydb_t *>(policydbp); + struct type_iter *out = (struct type_iter *) + calloc(1, sizeof(struct type_iter)); + + if (!out) { + std::cerr << "Failed to allocate type type iterator" << std::endl; + return NULL; + } + + out->d = static_cast<type_datum *>(hashtab_search(db->p_types.table, type)); + if (is_attr && out->d->flavor != TYPE_ATTRIB) { + std::cerr << "\"" << type << "\" MUST be an attribute in the policy" << std::endl; + free(out); + return NULL; + } else if (!is_attr && out->d->flavor !=TYPE_TYPE) { + std::cerr << "\"" << type << "\" MUST be a type in the policy" << std::endl; + free(out); + return NULL; + } + + if (is_attr) { + out->bit = ebitmap_start(&db->attr_type_map[out->d->s.value - 1], &out->n); + out->length = ebitmap_length(&db->attr_type_map[out->d->s.value - 1]); + } else { + out->bit = ebitmap_start(&db->type_attr_map[out->d->s.value - 1], &out->n); + out->length = ebitmap_length(&db->type_attr_map[out->d->s.value - 1]); + } + + return static_cast<void *>(out); +} + +void destroy_type_iter(void *type_iterp) +{ + struct type_iter *type_i = static_cast<struct type_iter *>(type_iterp); + free(type_i); +} + +/* + * print allow rule into *out buffer. + * + * Returns -1 on error. + * Returns 0 on successfully reading an avtab entry. + * Returns 1 on complete + */ +int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp) +{ + size_t len; + policydb_t *db = static_cast<policydb_t *>(policydbp); + struct type_iter *i = static_cast<struct type_iter *>(type_iterp); + + for (; i->bit < i->length; i->bit = ebitmap_next(&i->n, i->bit)) { + if (!ebitmap_node_get_bit(i->n, i->bit)) { + continue; + } + len = snprintf(out, max_size, "%s", db->p_type_val_to_name[i->bit]); + if (len >= max_size) { + std::cerr << "type name exceeds buffer size." << std::endl; + return -1; + } + i->bit = ebitmap_next(&i->n, i->bit); + return 0; + } + + return 1; +} + +void *load_policy(const char *policy_path) +{ + FILE *fp; + policydb_t *db; + + fp = fopen(policy_path, "re"); + if (!fp) { + std::cerr << "Invalid or non-existing policy file: " << policy_path << std::endl; + return NULL; + } + + db = (policydb_t *) calloc(1, sizeof(policydb_t)); + if (!db) { + std::cerr << "Failed to allocate memory for policy db." << std::endl; + fclose(fp); + return NULL; + } + + sidtab_t sidtab; + sepol_set_sidtab(&sidtab); + sepol_set_policydb(db); + + struct stat sb; + if (fstat(fileno(fp), &sb)) { + std::cerr << "Failed to stat the policy file" << std::endl; + free(db); + fclose(fp); + return NULL; + } + + auto unmap = [=](void *ptr) { munmap(ptr, sb.st_size); }; + std::unique_ptr<void, decltype(unmap)> map( + mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fileno(fp), 0), unmap); + if (!map) { + std::cerr << "Failed to map the policy file" << std::endl; + free(db); + fclose(fp); + return NULL; + } + + struct policy_file pf; + policy_file_init(&pf); + pf.type = PF_USE_MEMORY; + pf.data = static_cast<char *>(map.get()); + pf.len = sb.st_size; + if (policydb_init(db)) { + std::cerr << "Failed to initialize policydb" << std::endl; + free(db); + fclose(fp); + return NULL; + } + + if (policydb_read(db, &pf, 0)) { + std::cerr << "Failed to read binary policy" << std::endl; + policydb_destroy(db); + free(db); + fclose(fp); + return NULL; + } + + return static_cast<void *>(db); +} + +/* items needed to iterate over the avtab */ +struct avtab_iter { + avtab_t avtab; + uint32_t i; + avtab_ptr_t cur; +}; + +/* + * print allow rule into *out buffer. + * + * Returns -1 on error. + * Returns 0 on successfully reading an avtab entry. + * Returns 1 on complete + */ +static int get_avtab_allow_rule(char *out, size_t max_size, policydb_t *db, + struct avtab_iter *avtab_i) +{ + size_t len; + + for (; avtab_i->i < avtab_i->avtab.nslot; (avtab_i->i)++) { + if (avtab_i->cur == NULL) { + avtab_i->cur = avtab_i->avtab.htable[avtab_i->i]; + } + for (; avtab_i->cur; avtab_i->cur = (avtab_i->cur)->next) { + if (!((avtab_i->cur)->key.specified & AVTAB_ALLOWED)) continue; + + len = snprintf(out, max_size, "allow,%s,%s,%s,%s", + db->p_type_val_to_name[(avtab_i->cur)->key.source_type - 1], + db->p_type_val_to_name[(avtab_i->cur)->key.target_type - 1], + db->p_class_val_to_name[(avtab_i->cur)->key.target_class - 1], + sepol_av_to_string(db, (avtab_i->cur)->key.target_class, (avtab_i->cur)->datum.data)); + avtab_i->cur = (avtab_i->cur)->next; + if (!(avtab_i->cur)) + (avtab_i->i)++; + if (len >= max_size) { + std::cerr << "Allow rule exceeds buffer size." << std::endl; + return -1; + } + return 0; + } + avtab_i->cur = NULL; + } + + return 1; +} + +int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp) +{ + policydb_t *db = static_cast<policydb_t *>(policydbp); + struct avtab_iter *avtab_i = static_cast<struct avtab_iter *>(avtab_iterp); + + return get_avtab_allow_rule(out, len, db, avtab_i); +} + +/* + * <sepol/policydb/expand.h->conditional.h> uses 'bool' as a variable name + * inside extern "C" { .. } construct, which clang doesn't like. + * So, declare the function we need from expand.h ourselves. + */ +extern "C" int expand_avtab(policydb_t *p, avtab_t *a, avtab_t *expa); + +static avtab_iter *init_avtab_common(avtab_t *in, policydb_t *p) +{ + struct avtab_iter *out = (struct avtab_iter *) + calloc(1, sizeof(struct avtab_iter)); + if (!out) { + std::cerr << "Failed to allocate avtab" << std::endl; + return NULL; + } + + if (avtab_init(&out->avtab)) { + std::cerr << "Failed to initialize avtab" << std::endl; + free(out); + return NULL; + } + + if (expand_avtab(p, in, &out->avtab)) { + std::cerr << "Failed to expand avtab" << std::endl; + free(out); + return NULL; + } + return out; +} + +void *init_avtab(void *policydbp) +{ + policydb_t *p = static_cast<policydb_t *>(policydbp); + return static_cast<void *>(init_avtab_common(&p->te_avtab, p)); +} + +void *init_cond_avtab(void *policydbp) +{ + policydb_t *p = static_cast<policydb_t *>(policydbp); + return static_cast<void *>(init_avtab_common(&p->te_cond_avtab, p)); +} + +void destroy_avtab(void *avtab_iterp) +{ + struct avtab_iter *avtab_i = static_cast<struct avtab_iter *>(avtab_iterp); + avtab_destroy(&avtab_i->avtab); + free(avtab_i); +} + +void destroy_policy(void *policydbp) +{ + policydb_t *p = static_cast<policydb_t *>(policydbp); + policydb_destroy(p); +} diff --git a/tests/treble.py b/tests/treble.py new file mode 100644 index 0000000000000000000000000000000000000000..901f7020997d919f87bc3c98797974b0fc3656f9 --- /dev/null +++ b/tests/treble.py @@ -0,0 +1,257 @@ +from optparse import OptionParser +from optparse import Option, OptionValueError +import os +import policy +import re +import sys + +''' +Use file_contexts and policy to verify Treble requirements +are not violated. +''' +### +# Differentiate between domains that are part of the core Android platform and +# domains introduced by vendors +coreAppdomain = { + 'bluetooth', + 'ephemeral_app', + 'isolated_app', + 'nfc', + 'platform_app', + 'priv_app', + 'radio', + 'shared_relro', + 'shell', + 'system_app', + 'untrusted_app', + 'untrusted_app_25', + 'untrusted_v2_app', + } +coredomainWhitelist = { + 'adbd', + 'kernel', + 'postinstall', + 'postinstall_dexopt', + 'recovery', + 'system_server', + } +coredomainWhitelist |= coreAppdomain + +class scontext: + def __init__(self): + self.fromSystem = False + self.fromVendor = False + self.coredomain = False + self.appdomain = False + self.attributes = set() + self.entrypoints = [] + self.entrypointpaths = [] + +def PrintScontext(domain, sctx): + print domain + print "\tcoredomain="+str(sctx.coredomain) + print "\tappdomain="+str(sctx.appdomain) + print "\tfromSystem="+str(sctx.fromSystem) + print "\tfromVendor="+str(sctx.fromVendor) + print "\tattributes="+str(sctx.attributes) + print "\tentrypoints="+str(sctx.entrypoints) + print "\tentrypointpaths=" + if sctx.entrypointpaths is not None: + for path in sctx.entrypointpaths: + print "\t\t"+str(path) + +alldomains = {} +coredomains = set() +appdomains = set() +vendordomains = set() + +### +# Check whether the regex will match a file path starting with the provided +# prefix +# +# Compares regex entries in file_contexts with a path prefix. Regex entries +# are often more specific than this file prefix. For example, the regex could +# be /system/bin/foo\.sh and the prefix could be /system. This function +# loops over the regex removing characters from the end until +# 1) there is a match - return True or 2) run out of characters - return +# False. +# +def MatchPathPrefix(pathregex, prefix): + for i in range(len(pathregex), 0, -1): + try: + pattern = re.compile('^' + pathregex[0:i] + "$") + except: + continue + if pattern.match(prefix): + return True + return False + +def GetAllDomains(pol): + global alldomains + for result in pol.QueryTypeAttribute("domain", True): + alldomains[result] = scontext() + +def GetAppDomains(): + global appdomains + global alldomains + for d in alldomains: + # The application of the "appdomain" attribute is trusted because core + # selinux policy contains neverallow rules that enforce that only zygote + # and runas spawned processes may transition to processes that have + # the appdomain attribute. + if "appdomain" in alldomains[d].attributes: + alldomains[d].appdomain = True + appdomains.add(d) + + +def GetCoreDomains(): + global alldomains + global coredomains + for d in alldomains: + # TestCoredomainViolators will verify if coredomain was incorrectly + # applied. + if "coredomain" in alldomains[d].attributes: + alldomains[d].coredomain = True + coredomains.add(d) + # check whether domains are executed off of /system or /vendor + if d in coredomainWhitelist: + continue + # TODO, add checks to prevent app domains from being incorrectly + # labeled as coredomain. Apps don't have entrypoints as they're always + # dynamically transitioned to by zygote. + if d in appdomains: + continue + if not alldomains[d].entrypointpaths: + continue + for path in alldomains[d].entrypointpaths: + # Processes with entrypoint on /system + if ((MatchPathPrefix(path, "/system") and not + MatchPathPrefix(path, "/system/vendor")) or + MatchPathPrefix(path, "/init") or + MatchPathPrefix(path, "/charger")): + alldomains[d].fromSystem = True + # Processes with entrypoint on /vendor or /system/vendor + if (MatchPathPrefix(path, "/vendor") or + MatchPathPrefix(path, "/system/vendor")): + alldomains[d].fromVendor = True + +### +# Add the entrypoint type and path(s) to each domain. +# +def GetDomainEntrypoints(pol): + global alldomains + for x in pol.QueryTERule(tclass="file", perms=["entrypoint"]): + if not x.sctx in alldomains: + continue + alldomains[x.sctx].entrypoints.append(str(x.tctx)) + # postinstall_file represents a special case specific to A/B OTAs. + # Update_engine mounts a partition and relabels it postinstall_file. + # There is no file_contexts entry associated with postinstall_file + # so skip the lookup. + if x.tctx == "postinstall_file": + continue + alldomains[x.sctx].entrypointpaths = pol.QueryFc(x.tctx) +### +# Get attributes associated with each domain +# +def GetAttributes(pol): + global alldomains + for domain in alldomains: + for result in pol.QueryTypeAttribute(domain, False): + alldomains[domain].attributes.add(result) + +def setup(pol): + GetAllDomains(pol) + GetAttributes(pol) + GetDomainEntrypoints(pol) + GetAppDomains() + GetCoreDomains() + +############################################################# +# Tests +############################################################# +def TestCoredomainViolations(): + global alldomains + # verify that all domains launched from /system have the coredomain + # attribute + ret = "" + violators = [] + for d in alldomains: + domain = alldomains[d] + if domain.fromSystem and "coredomain" not in domain.attributes: + violators.append(d); + if len(violators) > 0: + ret += "The following domain(s) must be associated with the " + ret += "\"coredomain\" attribute because they are executed off of " + ret += "/system:\n" + ret += " ".join(str(x) for x in sorted(violators)) + "\n" + + # verify that all domains launched form /vendor do not have the coredomain + # attribute + violators = [] + for d in alldomains: + domain = alldomains[d] + if domain.fromVendor and "coredomain" in domain.attributes: + violators.append(d) + if len(violators) > 0: + ret += "The following domains must not be associated with the " + ret += "\"coredomain\" attribute because they are executed off of " + ret += "/vendor or /system/vendor:\n" + ret += " ".join(str(x) for x in sorted(violators)) + "\n" + + return ret + +### +# extend OptionParser to allow the same option flag to be used multiple times. +# This is used to allow multiple file_contexts files and tests to be +# specified. +# +class MultipleOption(Option): + ACTIONS = Option.ACTIONS + ("extend",) + STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",) + TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",) + ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",) + + def take_action(self, action, dest, opt, value, values, parser): + if action == "extend": + values.ensure_value(dest, []).append(value) + else: + Option.take_action(self, action, dest, opt, value, values, parser) + +Tests = ["CoredomainViolators"] + +if __name__ == '__main__': + usage = "sepolicy-trebletests -f nonplat_file_contexts -f " + usage +="plat_file_contexts -p policy [--test test] [--help]" + parser = OptionParser(option_class=MultipleOption, usage=usage) + parser.add_option("-f", "--file_contexts", dest="file_contexts", + metavar="FILE", action="extend", type="string") + parser.add_option("-p", "--policy", dest="policy", metavar="FILE") + parser.add_option("-t", "--test", dest="test", action="extend", + help="Test options include "+str(Tests)) + + (options, args) = parser.parse_args() + + if not options.policy: + sys.exit("Must specify monolithic policy file\n" + parser.usage) + if not os.path.exists(options.policy): + sys.exit("Error: policy file " + options.policy + " does not exist\n" + + parser.usage) + + if not options.file_contexts: + sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage) + for f in options.file_contexts: + if not os.path.exists(f): + sys.exit("Error: File_contexts file " + f + " does not exist\n" + + parser.usage) + + pol = policy.Policy(options.policy, options.file_contexts) + setup(pol) + + results = "" + # If an individual test is not specified, run all tests. + if options.test is None or "CoredomainViolations" in options.tests: + results += TestCoredomainViolations() + + if len(results) > 0: + sys.exit(results)