package com.github.grantneale.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/grantneale/kafka/LagBasedPartitionAssignor.class */
public class LagBasedPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
    private static final Logger LOGGER = LoggerFactory.getLogger(LagBasedPartitionAssignor.class);
    private Properties consumerGroupProps;
    private Properties metadataConsumerProps;
    private KafkaConsumer<byte[], byte[]> metadataConsumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/grantneale/kafka/LagBasedPartitionAssignor$TopicPartitionLag.class */
    public static class TopicPartitionLag {
        private final String topic;
        private final int partition;
        private final long lag;

        TopicPartitionLag(String str, int i, long j) {
            this.topic = str;
            this.partition = i;
            this.lag = j;
        }

        String getTopic() {
            return this.topic;
        }

        int getPartition() {
            return this.partition;
        }

        long getLag() {
            return this.lag;
        }
    }

    public void configure(Map<String, ?> map) {
        this.consumerGroupProps = new Properties();
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            this.consumerGroupProps.put(entry.getKey(), entry.getValue());
        }
        String property = this.consumerGroupProps.getProperty("group.id");
        if (property == null) {
            throw new IllegalArgumentException("group.id cannot be null when using partition.assignment.strategy=" + getClass().getName());
        }
        this.metadataConsumerProps = new Properties();
        this.metadataConsumerProps.putAll(this.consumerGroupProps);
        this.metadataConsumerProps.put("enable.auto.commit", "false");
        String str = property + ".assignor";
        this.metadataConsumerProps.put("client.id", str);
        LOGGER.debug("Configured LagBasedPartitionAssignor with values:\n\tgroup.id = {}\n\tclient.id = {}\n", property, str);
    }

    public String name() {
        return "lag";
    }

    public ConsumerPartitionAssignor.GroupAssignment assign(Cluster cluster, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : groupSubscription.groupSubscription().entrySet()) {
            List list = ((ConsumerPartitionAssignor.Subscription) entry.getValue()).topics();
            hashSet.addAll(list);
            hashMap.put(entry.getKey(), list);
        }
        Map<String, List<TopicPartition>> assign = assign(readTopicPartitionLags(cluster, hashSet), hashMap);
        HashMap hashMap2 = new HashMap();
        for (Map.Entry<String, List<TopicPartition>> entry2 : assign.entrySet()) {
            hashMap2.put(entry2.getKey(), new ConsumerPartitionAssignor.Assignment(entry2.getValue()));
        }
        return new ConsumerPartitionAssignor.GroupAssignment(hashMap2);
    }

    static Map<String, List<TopicPartition>> assign(Map<String, List<TopicPartitionLag>> map, Map<String, List<String>> map2) {
        HashMap hashMap = new HashMap();
        Iterator<String> it = map2.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), new ArrayList());
        }
        for (Map.Entry<String, List<String>> entry : consumersPerTopic(map2).entrySet()) {
            assignTopic(hashMap, entry.getKey(), entry.getValue(), map.getOrDefault(entry.getKey(), Collections.emptyList()));
        }
        return hashMap;
    }

    private static void assignTopic(Map<String, List<TopicPartition>> map, String str, List<String> list, List<TopicPartitionLag> list2) {
        if (list.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap(list.size());
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), 0L);
        }
        HashMap hashMap2 = new HashMap(list.size());
        Iterator<String> it2 = list.iterator();
        while (it2.hasNext()) {
            hashMap2.put(it2.next(), 0);
        }
        list2.sort((topicPartitionLag, topicPartitionLag2) -> {
            return topicPartitionLag.getLag() == topicPartitionLag2.getLag() ? Integer.compare(topicPartitionLag.getPartition(), topicPartitionLag2.getPartition()) : Long.compare(topicPartitionLag2.getLag(), topicPartitionLag.getLag());
        });
        for (TopicPartitionLag topicPartitionLag3 : list2) {
            String str2 = (String) ((Map.Entry) Collections.min(hashMap.entrySet(), (entry, entry2) -> {
                int compare = Integer.compare(((Integer) hashMap2.get(entry.getKey())).intValue(), ((Integer) hashMap2.get(entry2.getKey())).intValue());
                if (compare != 0) {
                    return compare;
                }
                int compare2 = Long.compare(((Long) entry.getValue()).longValue(), ((Long) entry2.getValue()).longValue());
                return compare2 != 0 ? compare2 : ((String) entry.getKey()).compareTo((String) entry2.getKey());
            })).getKey();
            map.get(str2).add(new TopicPartition(topicPartitionLag3.getTopic(), topicPartitionLag3.getPartition()));
            hashMap.put(str2, Long.valueOf(((Long) hashMap.getOrDefault(str2, 0L)).longValue() + topicPartitionLag3.getLag()));
            hashMap2.put(str2, Integer.valueOf(((Integer) hashMap2.getOrDefault(str2, 0)).intValue() + 1));
            LOGGER.trace("Assigned partition {}-{} to consumer {}.  partition_lag={}, consumer_current_total_lag={}", new Object[]{topicPartitionLag3.getTopic(), Integer.valueOf(topicPartitionLag3.getPartition()), str2, Long.valueOf(topicPartitionLag3.getLag()), hashMap.get(str2)});
        }
        if (LOGGER.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder();
            Iterator it3 = hashMap.entrySet().iterator();
            while (it3.hasNext()) {
                String str3 = (String) ((Map.Entry) it3.next()).getKey();
                sb.append(String.format("\t%s (total_lag=%d)\n", str3, hashMap.get(str3)));
                Iterator<TopicPartition> it4 = map.getOrDefault(str3, Collections.emptyList()).iterator();
                while (it4.hasNext()) {
                    sb.append(String.format("\t\t%s\n", it4.next()));
                }
            }
            LOGGER.debug("Assignment for {}:\n{}", str, sb);
        }
    }

    private Map<String, List<TopicPartitionLag>> readTopicPartitionLags(Cluster cluster, Set<String> set) {
        if (this.metadataConsumer == null) {
            this.metadataConsumer = new KafkaConsumer<>(this.metadataConsumerProps);
        }
        HashMap hashMap = new HashMap();
        for (String str : set) {
            List partitionsForTopic = cluster.partitionsForTopic(str);
            if (partitionsForTopic == null || partitionsForTopic.isEmpty()) {
                LOGGER.warn("Skipping assignment for topic {} since no metadata is available", str);
            } else {
                List<TopicPartition> list = (List) partitionsForTopic.stream().map(partitionInfo -> {
                    return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                }).collect(Collectors.toList());
                hashMap.put(str, new ArrayList());
                Map beginningOffsets = this.metadataConsumer.beginningOffsets(list);
                Map endOffsets = this.metadataConsumer.endOffsets(list);
                Map committed = this.metadataConsumer.committed(new HashSet(list));
                for (TopicPartition topicPartition : list) {
                    ((List) hashMap.get(str)).add(new TopicPartitionLag(str, topicPartition.partition(), computePartitionLag((OffsetAndMetadata) committed.get(topicPartition), ((Long) beginningOffsets.getOrDefault(topicPartition, 0L)).longValue(), ((Long) endOffsets.getOrDefault(topicPartition, 0L)).longValue(), this.consumerGroupProps.getProperty("auto.offset.reset", "latest"))));
                }
            }
        }
        return hashMap;
    }

    static long computePartitionLag(OffsetAndMetadata offsetAndMetadata, long j, long j2, String str) {
        return Long.max(j2 - (offsetAndMetadata != null ? offsetAndMetadata.offset() : str.equalsIgnoreCase("latest") ? j2 : j), 0L);
    }

    private static Map<String, List<String>> consumersPerTopic(Map<String, List<String>> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            String key = entry.getKey();
            Iterator<String> it = entry.getValue().iterator();
            while (it.hasNext()) {
                ((List) hashMap.computeIfAbsent(it.next(), str -> {
                    return new ArrayList();
                })).add(key);
            }
        }
        return hashMap;
    }
}
