JavaInterview JavaInterview
首页
指南
分类
标签
归档
  • CSDN (opens new window)
  • 文档集合 (opens new window)
  • 系统架构 (opens new window)
  • 微信号 (opens new window)
  • 公众号 (opens new window)

『Java面试+Java学习』
首页
指南
分类
标签
归档
  • CSDN (opens new window)
  • 文档集合 (opens new window)
  • 系统架构 (opens new window)
  • 微信号 (opens new window)
  • 公众号 (opens new window)
  • 指南
  • 简历

  • Java

  • 面试

  • 算法

  • sourcecode
  • kafka
JavaInterview.cn
2022-07-20
目录

Kafka代码写法总结8 | Java

文章发布较早,内容可能过时,阅读注意甄别。

# 总结8

# ConfigEntry类

// Excerpt of ConfigEntry: only the annotation, the class header and the field
// declarations are shown. Constructors, accessors and the closing brace are
// not part of this snippet.
@InterfaceStability.Evolving
public class ConfigEntry {

    // Every field below is private and final, so each one is assigned exactly
    // once during construction and never reassigned afterwards.
    // NOTE(review): the field meanings are not established by this snippet;
    // the names suggest a config key/value pair plus metadata — confirm
    // against the constructor and accessors (not shown).
    private final String name;
    private final String value;
    private final ConfigSource source;
    private final boolean isSensitive;
    private final boolean isReadOnly;
    // Final reference only: the list object itself may still be mutable
    // unless the (unseen) constructor wraps or copies it — TODO confirm.
    private final List<ConfigSynonym> synonyms;
    private final ConfigType type;
    private final String documentation;
1
2
3
4
5
6
7
8
9
10
11

# Selector类

// Excerpt of Selector: public constants, the private CloseMode enum and the
// field declarations. Constructors, methods and the closing brace are not
// part of this snippet.
public class Selector implements Selectable, AutoCloseable {

    // NOTE(review): these look like sentinel values meaning "no idle timeout"
    // and "no delay after a failed authentication" — confirm against the
    // constructors that consume them (not shown).
    public static final long NO_IDLE_TIMEOUT_MS = -1;
    public static final int NO_FAILED_AUTHENTICATION_DELAY = 0;

    // Modes for closing a channel. Each constant carries a boolean saying
    // whether a disconnect notification is produced; the per-constant
    // comments describe what happens to outstanding receives.
    private enum CloseMode {
        GRACEFUL(true),            // process outstanding buffered receives, notify disconnect
        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification

        // Set from the constructor argument. Declared neither private nor
        // final, so the enclosing class could read or reassign it; within
        // this snippet it is only written by the constructor.
        boolean notifyDisconnect;

        CloseMode(boolean notifyDisconnect) {
            this.notifyDisconnect = notifyDisconnect;
        }
    }

    private final Logger log;
    // Fully qualified because this class is itself named Selector.
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, KafkaChannel> channels;
    private final Set<KafkaChannel> explicitlyMutedChannels;
    // Not final: one of the few fields that can change after construction.
    private boolean outOfMemory;
    private final List<NetworkSend> completedSends;
    private final LinkedHashMap<String, NetworkReceive> completedReceives;
    private final Set<SelectionKey> immediatelyConnectedKeys;
    private final Map<String, KafkaChannel> closingChannels;
    // Not final: the reference itself can be replaced after construction.
    private Set<SelectionKey> keysWithBufferedRead;
    private final Map<String, ChannelState> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final ChannelBuilder channelBuilder;
    private final int maxReceiveSize;
    private final boolean recordTimePerConnection;
    private final IdleExpiryManager idleExpiryManager;
    private final LinkedHashMap<String, DelayedAuthenticationFailureClose> delayedClosingChannels;
    private final MemoryPool memoryPool;
    private final long lowMemThreshold;
    private final int failedAuthenticationDelayMs;

    //indicates if the previous call to poll was able to make progress in reading already-buffered data.
    //this is used to prevent tight loops when memory is not available to read any more data
    // Initialised to true, i.e. the state before any poll is "progress was made".
    private boolean madeReadProgressLastPoll = true;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# ProducerBatch批量操作类

// Excerpt of ProducerBatch: only the class header and the field declarations
// are shown. Constructors, methods and the closing brace are not part of
// this snippet.
public final class ProducerBatch {
    private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
    // The next three fields have no access modifier, so they are
    // package-private; being final, they are assigned once at construction.
    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;
    private final List<ProducerBatch.Thunk> thunks;
    private final MemoryRecordsBuilder recordsBuilder;
    // Final references to atomic holders: the reference never changes, the
    // value held inside can.
    private final AtomicInteger attempts;
    private final boolean isSplitBatch;
    private final AtomicReference<ProducerBatch.FinalState> finalState;
    // Package-private and non-final: readable and writable from other classes
    // in the same package.
    int recordCount;
    int maxRecordSize;
    // Private, non-final state that can change after construction.
    // NOTE(review): the names suggest timestamps and retry/reopen flags —
    // confirm against the methods that update them (not shown).
    private long lastAttemptMs;
    private long lastAppendTime;
    private long drainedMs;
    private boolean retry;
    private boolean reopened;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# ErrorLoggingCallback错误回调类

/**
 * {@link Callback} implementation that writes an error log entry when
 * {@code onCompletion} is invoked with a non-null exception.
 *
 * <p>The key bytes are always retained. The value bytes are retained only when
 * {@code logAsString} is true; otherwise only the value's length is kept.
 */
public class ErrorLoggingCallback implements Callback {
    private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
    private String topic;
    private byte[] key;
    private byte[] value;
    private int valueLength;
    private boolean logAsString;

    /**
     * @param topic       topic name included in the log message
     * @param key         key bytes; may be null
     * @param value       value bytes; may be null
     * @param logAsString true to log key and value as UTF-8 text, false to log
     *                    only their sizes in bytes
     */
    public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logAsString) {
        this.topic = topic;
        this.key = key;
        this.logAsString = logAsString;

        // Hold on to the value bytes only when they will be printed as text.
        this.value = logAsString ? value : null;

        // -1 marks "value was null"; onCompletion relies on this sentinel.
        if (value == null) {
            this.valueLength = -1;
        } else {
            this.valueLength = value.length;
        }
    }

    /**
     * Logs the topic, key and value together with the exception when
     * {@code e} is non-null; does nothing otherwise.
     *
     * @param metadata not used by this implementation
     * @param e        the failure to log, or null when there is nothing to log
     */
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            return;
        }

        String keyText;
        if (this.key == null) {
            keyText = "null";
        } else if (this.logAsString) {
            keyText = new String(this.key, StandardCharsets.UTF_8);
        } else {
            keyText = this.key.length + " bytes";
        }

        String valueText;
        if (this.valueLength == -1) {
            valueText = "null";
        } else if (this.logAsString) {
            valueText = new String(this.value, StandardCharsets.UTF_8);
        } else {
            valueText = this.valueLength + " bytes";
        }

        // The exception is the last element of the argument array, as in the original call.
        Object[] logArgs = new Object[]{this.topic, keyText, valueText, e};
        log.error("Error when sending message to topic {} with key: {}, value: {} with error:", logArgs);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
微信 支付宝
#Java
最近更新
01
1686. 石子游戏VI Java
08-18
02
1688. 比赛中的配对次数 Java
08-18
03
1687. 从仓库到码头运输箱子 Java
08-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 JavaInterview.cn
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式