kafka AclAuthorizerBenchmark 源码

  • 2022-10-20
  • 浏览 (217)

kafka AclAuthorizerBenchmark 代码

文件路径:/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.jmh.acl;

import kafka.security.authorizer.AclAuthorizer;
import kafka.security.authorizer.AclAuthorizer.VersionedAcls;
import kafka.security.authorizer.AclEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.Action;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.collection.JavaConverters;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AclAuthorizerBenchmark {
    @Param({"10000", "50000", "200000"})
    private int resourceCount;
    //no. of. rules per resource
    @Param({"10", "50"})
    private int aclCount;

    @Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"})
    private double denyPercentage;

    private final int hostPreCount = 1000;
    private final String resourceNamePrefix = "foo-bar35_resource-";
    private final AclAuthorizer aclAuthorizer = new AclAuthorizer();
    private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
    private List<Action> actions = new ArrayList<>();
    private RequestContext authorizeContext;
    private RequestContext authorizeByResourceTypeContext;
    private String authorizeByResourceTypeHostName = "127.0.0.2";

    private HashMap<ResourcePattern, AclAuthorizer.VersionedAcls> aclToUpdate = new HashMap<>();

    Random rand = new Random(System.currentTimeMillis());
    double eps = 1e-9;

    @Setup(Level.Trial)
    public void setup() throws Exception {
        prepareAclCache();
        prepareAclToUpdate();
        // By adding `-95` to the resource name prefix, we cause the `TreeMap.from/to` call to return
        // most map entries. In such cases, we rely on the filtering based on `String.startsWith`
        // to return the matching ACLs. Using a more efficient data structure (e.g. a prefix
        // tree) should improve performance significantly).
        actions = Collections.singletonList(new Action(AclOperation.WRITE,
            new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, PatternType.LITERAL),
            1, true, true));
        authorizeContext = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
            "someclient", 1), "1", InetAddress.getByName("127.0.0.1"), principal,
            ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
        authorizeByResourceTypeContext = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
            "someclient", 1), "1", InetAddress.getByName(authorizeByResourceTypeHostName), principal,
            ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
    }

    private void prepareAclCache() {
        Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
        for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
            ResourcePattern resource = new ResourcePattern(
                (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC,
                resourceNamePrefix + resourceId,
                (resourceId % 5 == 0) ? PatternType.PREFIXED : PatternType.LITERAL);

            Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());

            for (int aclId = 0; aclId < aclCount; aclId++) {
                // The principal in the request context we are using
                // is principal.toString without any suffix
                String principalName = principal.toString() + (aclId == 0 ? "" : aclId);
                AccessControlEntry allowAce = new AccessControlEntry(
                    principalName, "*", AclOperation.READ, AclPermissionType.ALLOW);

                entries.add(new AclEntry(allowAce));

                if (shouldDeny()) {
                    // dominantly deny the resource
                    AccessControlEntry denyAce = new AccessControlEntry(
                        principalName, "*", AclOperation.READ, AclPermissionType.DENY);
                    entries.add(new AclEntry(denyAce));
                }
            }
        }

        ResourcePattern resourcePrefix = new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix,
            PatternType.PREFIXED);
        Set<AclEntry> entriesPrefix = aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
        for (int hostId = 0; hostId < hostPreCount; hostId++) {
            AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), "127.0.0." + hostId,
                AclOperation.READ, AclPermissionType.ALLOW);
            entriesPrefix.add(new AclEntry(allowAce));

            if (shouldDeny()) {
                // dominantly deny the resource
                AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), "127.0.0." + hostId,
                    AclOperation.READ, AclPermissionType.DENY);
                entriesPrefix.add(new AclEntry(denyAce));
            }
        }

        ResourcePattern resourceWildcard = new ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE,
            PatternType.LITERAL);
        Set<AclEntry> entriesWildcard = aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
        // get dynamic entries number for wildcard acl
        for (int hostId = 0; hostId < resourceCount / 10; hostId++) {
            String hostName = "127.0.0" + hostId;
            // AuthorizeByResourceType is optimizing the wildcard deny case.
            // If we didn't skip the host, we would end up having a biased short runtime.
            if (hostName.equals(authorizeByResourceTypeHostName)) {
                continue;
            }

            AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), hostName,
                AclOperation.READ, AclPermissionType.ALLOW);
            entriesWildcard.add(new AclEntry(allowAce));
            if (shouldDeny()) {
                AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), hostName,
                    AclOperation.READ, AclPermissionType.DENY);
                entriesWildcard.add(new AclEntry(denyAce));
            }
        }

        for (Map.Entry<ResourcePattern, Set<AclEntry>> entryMap : aclEntries.entrySet()) {
            aclAuthorizer.updateCache(entryMap.getKey(),
                new VersionedAcls(JavaConverters.asScalaSetConverter(entryMap.getValue()).asScala().toSet(), 1));
        }
    }

    private void prepareAclToUpdate() {
        scala.collection.mutable.Set<AclEntry> entries = new scala.collection.mutable.HashSet<>();
        for (int i = 0; i < resourceCount; i++) {
            scala.collection.immutable.Set<AclEntry> immutable = new scala.collection.immutable.HashSet<>();
            for (int j = 0; j < aclCount; j++) {
                entries.add(new AclEntry(new AccessControlEntry(
                    principal.toString(), "127.0.0" + j, AclOperation.WRITE, AclPermissionType.ALLOW)));
                immutable = entries.toSet();
            }
            aclToUpdate.put(
                new ResourcePattern(ResourceType.TOPIC, randomResourceName(resourceNamePrefix), PatternType.LITERAL),
                new AclAuthorizer.VersionedAcls(immutable, i));
        }
    }

    private String randomResourceName(String prefix) {
        return prefix + UUID.randomUUID().toString().substring(0, 5);
    }

    private Boolean shouldDeny() {
        return rand.nextDouble() * 100.0 - eps < denyPercentage;
    }

    @TearDown(Level.Trial)
    public void tearDown() {
        aclAuthorizer.close();
    }

    @Benchmark
    public void testAclsIterator() {
        aclAuthorizer.acls(AclBindingFilter.ANY);
    }

    @Benchmark
    public void testAuthorizer() {
        aclAuthorizer.authorize(authorizeContext, actions);
    }

    @Benchmark
    public void testAuthorizeByResourceType() {
        aclAuthorizer.authorizeByResourceType(authorizeByResourceTypeContext, AclOperation.READ, ResourceType.TOPIC);
    }

    @Benchmark
    public void testUpdateCache() {
        AclAuthorizer aclAuthorizer = new AclAuthorizer();
        for (Map.Entry<ResourcePattern, VersionedAcls> e : aclToUpdate.entrySet()) {
            aclAuthorizer.updateCache(e.getKey(), e.getValue());
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka ApiVersions 源码

kafka ClientDnsLookup 源码

kafka ClientRequest 源码

kafka ClientResponse 源码

kafka ClientUtils 源码

kafka ClusterConnectionStates 源码

kafka CommonClientConfigs 源码

kafka ConnectionState 源码

kafka DefaultHostResolver 源码

kafka FetchSessionHandler 源码

0  赞