JavaInterview JavaInterview
首页
指南
分类
标签
归档
  • CSDN (opens new window)
  • 文档集合 (opens new window)
  • 系统架构 (opens new window)
  • 微信号 (opens new window)
  • 公众号 (opens new window)

『Java面试+Java学习』
首页
指南
分类
标签
归档
  • CSDN (opens new window)
  • 文档集合 (opens new window)
  • 系统架构 (opens new window)
  • 微信号 (opens new window)
  • 公众号 (opens new window)
  • 指南
  • 简历

  • Java

  • 面试

  • 算法

  • sourcecode
  • kafka
JavaInterview.cn
2022-07-31
目录

Kafka代码写法总结9 Java

文章发布较早，内容可能过时，阅读注意甄别。

# 总结9

# 构造参数和JDK包

public class NodeApiVersions {
    private final Map<ApiKeys, ApiVersion> supportedVersions = new EnumMap(ApiKeys.class);
    private final List<ApiVersion> unknownApis = new ArrayList();

    /**
     * Creates a {@code NodeApiVersions} without any overrides.
     *
     * <p>Delegates to {@link #create(Collection)} with an empty list.
     *
     * @return the instance produced by {@code create(Collection)} for an empty override list
     */
    public static NodeApiVersions create() {
        List<ApiVersion> noOverrides = Collections.emptyList();
        return create(noOverrides);
    }

    /**
     * Creates a {@code NodeApiVersions} from the given overrides, filling in an entry for
     * every key returned by {@code ApiKeys.zkBrokerApis()} that the overrides do not cover.
     *
     * <p>The overrides are copied into a new list first, so the caller's collection is
     * not modified. For each API key, the list is scanned for an {@code ApiVersion} whose
     * {@code apiKey()} equals the key's {@code id}; if none is found, the value of
     * {@code ApiVersionsResponse.toApiVersion(apiKey)} is appended.
     *
     * @param overrides API versions that take precedence over the generated ones
     * @return a new {@code NodeApiVersions} built from the merged list
     */
    public static NodeApiVersions create(Collection<ApiVersion> overrides) {
        List<ApiVersion> apiVersions = new LinkedList<>(overrides);

        for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
            boolean exists = false;
            for (ApiVersion apiVersion : apiVersions) {
                if (apiVersion.apiKey() == apiKey.id) {
                    exists = true;
                    break;
                }
            }

            // Only add a generated entry when no override (or earlier entry) covers this key.
            if (!exists) {
                apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
            }
        }

        return new NodeApiVersions(apiVersions);
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# NetworkClient类定义

public class NetworkClient implements KafkaClient {
    private final Logger log;
    private final Selectable selector;
    private final MetadataUpdater metadataUpdater;
    private final Random randOffset;
    private final ClusterConnectionStates connectionStates;
    private final InFlightRequests inFlightRequests;
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    private final String clientId;
    private int correlation;
    private final int defaultRequestTimeoutMs;
    private final long reconnectBackoffMs;
    private final Time time;
    private final boolean discoverBrokerVersions;
    private final ApiVersions apiVersions;
    private final Map<String, Builder> nodesNeedingApiVersionsFetch;
    private final List<ClientResponse> abortedSends;
    private final Sensor throttleTimeSensor;
    private final AtomicReference<NetworkClient.State> state;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# Map<String, ?> originals问号占位

public class AbstractConfig {
    private final Logger log;
    private final Set<String> used;
    private final Map<String, ?> originals;
    private final Map<String, Object> values;
    private final ConfigDef definition;
    public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
    private static final String CONFIG_PROVIDERS_PARAM = ".param.";

    /**
     * Builds a configuration from raw key/value pairs.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>reject any entry of {@code originals} whose key is not a {@code String};</li>
     *   <li>store the result of {@code resolveConfigVariables(configProviderProps, originals)}
     *       as the original values;</li>
     *   <li>parse those originals with {@code definition} to obtain the typed values;</li>
     *   <li>pass an unmodifiable view of the parsed values to
     *       {@code postProcessParsedConfig} and copy every returned entry back into the
     *       values map;</li>
     *   <li>parse the updated values with {@code definition} again, discarding the result
     *       (presumably to re-validate them after post-processing; confirm against
     *       {@code ConfigDef.parse});</li>
     *   <li>call {@code logAll()} when {@code doLog} is set.</li>
     * </ol>
     *
     * @param definition          the definition used to parse the configuration
     * @param originals           the raw configuration; every key must be a {@code String}
     * @param configProviderProps properties handed to {@code resolveConfigVariables}
     * @param doLog               whether to call {@code logAll()} after construction
     * @throws ConfigException if a key in {@code originals} is not a {@code String}
     */
    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
        this.log = LoggerFactory.getLogger(this.getClass());
        this.used = ConcurrentHashMap.newKeySet();

        for (Entry<?, ?> entry : originals.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
            }
        }

        this.originals = this.resolveConfigVariables(configProviderProps, originals);
        this.values = definition.parse(this.originals);

        // Post-processing sees a read-only view; its returned updates are applied here.
        Map<String, Object> configUpdates = this.postProcessParsedConfig(Collections.unmodifiableMap(this.values));
        for (Entry<String, Object> update : configUpdates.entrySet()) {
            this.values.put(update.getKey(), update.getValue());
        }

        definition.parse(this.values);
        this.definition = definition;
        if (doLog) {
            this.logAll();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# Sender类

public class Sender implements Runnable {
    private final Logger log;
    private final KafkaClient client;
    private final RecordAccumulator accumulator;
    private final ProducerMetadata metadata;
    private final boolean guaranteeMessageOrder;
    private final int maxRequestSize;
    private final short acks;
    private final int retries;
    private final Time time;
    private volatile boolean running;
    private volatile boolean forceClose;
    private final Sender.SenderMetrics sensors;
    private final int requestTimeoutMs;
    private final long retryBackoffMs;
    private final ApiVersions apiVersions;
    private final TransactionManager transactionManager;
    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# Metadata类


public class Metadata implements Closeable {
    private final Logger log;
    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    private int updateVersion;
    private int requestVersion;
    private long lastRefreshMs;
    private long lastSuccessfulRefreshMs;
    private KafkaException fatalException;
    private Set<String> invalidTopics;
    private Set<String> unauthorizedTopics;
    private MetadataCache cache = MetadataCache.empty();
    private boolean needFullUpdate;
    private boolean needPartialUpdate;
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean isClosed;
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# StickyPartitionCache缓存

/**
 * Keeps, per topic, the partition number most recently chosen for that topic.
 *
 * <p>The mapping is held in a {@link ConcurrentMap}. {@link #partition} reads the current
 * choice (creating one if there is none) and {@link #nextPartition} picks a new one.
 */
public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    public StickyPartitionCache() {
    }

    /**
     * Returns the cached partition for {@code topic}, choosing one first if none is cached.
     *
     * @param topic   the topic to look up
     * @param cluster cluster information passed on to {@link #nextPartition} when needed
     * @return the cached partition, or the result of {@code nextPartition(topic, cluster, -1)}
     *         when the topic has no cached entry
     */
    public int partition(String topic, Cluster cluster) {
        Integer part = this.indexCache.get(topic);
        if (part == null) {
            return this.nextPartition(topic, cluster, -1);
        }
        return part;
    }

    /**
     * Chooses a new partition for {@code topic} and returns the value cached afterwards.
     *
     * <p>If a partition is already cached and differs from {@code prevPartition}, the
     * cached value is returned unchanged. Otherwise a candidate is selected:
     * <ul>
     *   <li>no available partitions: a random index modulo the size of
     *       {@code cluster.partitionsForTopic(topic)};</li>
     *   <li>exactly one available partition: that partition;</li>
     *   <li>otherwise: random available partitions are drawn until one differs from the
     *       previously cached value.</li>
     * </ul>
     * The candidate is stored with {@code putIfAbsent} when nothing was cached, or with
     * a conditional {@code replace} that only succeeds while the cached value still equals
     * {@code prevPartition}. The method returns whatever the cache holds afterwards, which
     * may not be the candidate if the store did not take effect.
     *
     * <p>NOTE(review): when there are no available partitions and
     * {@code partitionsForTopic} is also empty, the modulo below divides by zero; confirm
     * callers never reach this state.
     *
     * @param topic         the topic whose partition should change
     * @param cluster       source of the topic's partition lists
     * @param prevPartition the partition the caller believes is currently cached
     * @return the partition cached for {@code topic} after this call
     */
    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = this.indexCache.get(topic);

        // The cached value no longer matches what the caller saw: keep the cached value.
        if (oldPart != null && oldPart != prevPartition) {
            return this.indexCache.get(topic);
        }

        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        Integer newPart = oldPart;
        if (availablePartitions.isEmpty()) {
            int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            // Redraw until the candidate differs from the previously cached partition.
            while (newPart == null || newPart.equals(oldPart)) {
                int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }

        if (oldPart == null) {
            this.indexCache.putIfAbsent(topic, newPart);
        } else {
            this.indexCache.replace(topic, prevPartition, newPart);
        }

        return this.indexCache.get(topic);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# TransactionManager事务管理

public class TransactionManager {
    private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
    private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
    private final Logger log;
    private final String transactionalId;
    private final int transactionTimeoutMs;
    private final ApiVersions apiVersions;
    private final TransactionManager.TopicPartitionBookkeeper topicPartitionBookkeeper;
    private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
    private final Map<TopicPartition, Integer> partitionsWithUnresolvedSequences;
    private final Set<TopicPartition> partitionsToRewriteSequences;
    private final PriorityQueue<TransactionManager.TxnRequestHandler> pendingRequests;
    private final Set<TopicPartition> newPartitionsInTransaction;
    private final Set<TopicPartition> pendingPartitionsInTransaction;
    private final Set<TopicPartition> partitionsInTransaction;
    private TransactionManager.PendingStateTransition pendingTransition;
    private final long retryBackoffMs;
    private static final long ADD_PARTITIONS_RETRY_BACKOFF_MS = 20L;
    private int inFlightRequestCorrelationId = -1;
    private Node transactionCoordinator;
    private Node consumerGroupCoordinator;
    private boolean coordinatorSupportsBumpingEpoch;
    private volatile TransactionManager.State currentState;
    private volatile RuntimeException lastError;
    private volatile ProducerIdAndEpoch producerIdAndEpoch;
    private volatile boolean transactionStarted;
    private volatile boolean epochBumpRequired;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
微信 支付宝
#Java
最近更新
01
1686. 石子游戏VI Java
08-18
02
1688. 比赛中的配对次数 Java
08-18
03
1687. 从仓库到码头运输箱子 Java
08-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 JavaInterview.cn
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式