Kafka代码写法总结7Java
文章发布较早,内容可能过时,阅读注意甄别。
# 总结7
# NodeApiVersions
/**
* An internal class which represents the API versions supported by a particular node.
*/
public class NodeApiVersions {
// A map of the usable versions of each API, keyed by the ApiKeys instance
private final Map<ApiKeys, ApiVersion> supportedVersions = new EnumMap<>(ApiKeys.class);
// List of APIs which the broker supports, but which are unknown to the client
private final List<ApiVersion> unknownApis = new ArrayList<>();
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# NetworkClient
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
* user-facing producer and consumer clients.
* <p>
* This class is not thread-safe!
*/
public class NetworkClient implements KafkaClient {
private enum State {
ACTIVE,
CLOSING,
CLOSED
}
private final Logger log;
/* the selector used to perform network i/o */
private final Selectable selector;
private final MetadataUpdater metadataUpdater;
private final Random randOffset;
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
/* the set of requests currently being sent or awaiting a response */
private final InFlightRequests inFlightRequests;
/* the socket send buffer size in bytes */
private final int socketSendBuffer;
/* the socket receive size buffer in bytes */
private final int socketReceiveBuffer;
/* the client id used to identify this client in requests to the server */
private final String clientId;
/* the current correlation id to use when sending requests to servers */
private int correlation;
/* default timeout for individual requests to await acknowledgement from servers */
private final int defaultRequestTimeoutMs;
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
private final Time time;
/**
* True if we should send an ApiVersionRequest when first connecting to a broker.
*/
private final boolean discoverBrokerVersions;
private final ApiVersions apiVersions;
private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
private final List<ClientResponse> abortedSends = new LinkedList<>();
private final Sensor throttleTimeSensor;
private final AtomicReference<State> state;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65