Kafka Code Patterns Summary 6 (Java)
This article was published quite a while ago; some of its content may be outdated, so read with that in mind.
# Summary 6
# Classes with circular dependencies
Kafka's protocol schema classes reference one another: a `Struct` holds its `Schema`, a `Schema` (itself a `Type`) holds `BoundField`s and a cached `Struct`, and each `BoundField` points back at its `Schema`.
```java
/**
 * A record that can be serialized and deserialized according to a pre-defined schema
 */
public class Struct {
    private final Schema schema;          // Struct refers to Schema
    private final Object[] values;
    // ...
}

/**
 * The schema for a compound record definition
 */
public class Schema extends Type {
    private final static Object[] NO_VALUES = new Object[0];
    private final BoundField[] fields;    // Schema refers to BoundField...
    private final Map<String, BoundField> fieldsByName;
    private final boolean tolerateMissingFieldsWithDefaults;
    private final Struct cachedStruct;    // ...and back to Struct
    // ...
}

/**
 * A serializable type
 */
public abstract class Type {

    /**
     * Write the typed object to the buffer
     *
     * @throws SchemaException If the object is not valid for its type
     */
    public abstract void write(ByteBuffer buffer, Object o);

    /**
     * Read the typed object from the buffer
     *
     * @throws SchemaException If the object is not valid for its type
     */
    public abstract Object read(ByteBuffer buffer);

    /**
     * Validate the object. If validation succeeds, return the typed object.
     *
     * @throws SchemaException If validation fails
     */
    public abstract Object validate(Object o);

    /**
     * Return the size of the object in bytes
     */
    public abstract int sizeOf(Object o);

    /**
     * Check if the type supports null values
     * @return whether or not null is a valid value for the type implementation
     */
    public boolean isNullable() {
        return false;
    }

    /**
     * If the type is an array, return the type of the array elements. Otherwise, return empty.
     */
    public Optional<Type> arrayElementType() {
        return Optional.empty();
    }

    /**
     * Returns true if the type is an array.
     */
    public final boolean isArray() {
        return arrayElementType().isPresent();
    }
    // ...
}

public class BoundField {
    public final Field def;
    final int index;
    final Schema schema;                  // BoundField refers back to Schema
    // ...
}
```
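Circular references between classes compile fine in Java; the only subtlety is construction order. Here is a self-contained sketch of the same wiring (all `Mini*` names are hypothetical, not Kafka's): the owning class builds its dependents inside its own constructor and passes them `this`.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of mutually-referencing final fields: the "owning" class creates
// its dependents in its own constructor and hands them a reference to itself.
public class CircularDemo {

    static final class MiniSchema {
        final MiniField[] fields;
        final Map<String, MiniField> fieldsByName = new HashMap<>();
        final MiniStruct cachedStruct;

        MiniSchema(String... fieldNames) {
            this.fields = new MiniField[fieldNames.length];
            for (int i = 0; i < fieldNames.length; i++) {
                // Each field is bound back to this schema. Letting "this" escape
                // is safe only because MiniField merely stores the reference and
                // never calls back into the half-constructed schema.
                fields[i] = new MiniField(fieldNames[i], i, this);
                fieldsByName.put(fieldNames[i], fields[i]);
            }
            // The schema can even cache a struct of itself, as Kafka's Schema does.
            this.cachedStruct = new MiniStruct(this);
        }
    }

    static final class MiniField {
        final String name;
        final int index;
        final MiniSchema schema;   // back-reference to the owning schema

        MiniField(String name, int index, MiniSchema schema) {
            this.name = name;
            this.index = index;
            this.schema = schema;
        }
    }

    static final class MiniStruct {
        final MiniSchema schema;   // a struct always knows its schema
        final Object[] values;

        MiniStruct(MiniSchema schema) {
            this.schema = schema;
            this.values = new Object[schema.fields.length];
        }
    }

    public static void main(String[] args) {
        MiniSchema schema = new MiniSchema("id", "name");
        System.out.println(schema.fields[1].name);              // name
        System.out.println(schema.cachedStruct.values.length);  // 2
    }
}
```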
# The Action class
```java
@InterfaceStability.Evolving   // public API that may still change between minor releases
public class Action {
    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final int resourceReferenceCount;
    private final boolean logIfAllowed;
    private final boolean logIfDenied;
    // ... constructor and accessors elided
}
```
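The shape is the standard immutable value object: all-final fields, a constructor that sets them once, read-only accessors, and field-wise `equals`/`hashCode` so instances behave well in sets and maps. A minimal sketch of the pattern with simplified stand-in types (an illustration, not Kafka's actual `Action` source):

```java
import java.util.Objects;

// Sketch of the immutable value-object pattern; String fields stand in for
// Kafka's ResourcePattern and AclOperation types.
public final class ActionLike {
    private final String operation;
    private final String resourceName;
    private final int resourceReferenceCount;
    private final boolean logIfAllowed;
    private final boolean logIfDenied;

    public ActionLike(String operation, String resourceName,
                      int resourceReferenceCount,
                      boolean logIfAllowed, boolean logIfDenied) {
        this.operation = Objects.requireNonNull(operation);
        this.resourceName = Objects.requireNonNull(resourceName);
        this.resourceReferenceCount = resourceReferenceCount;
        this.logIfAllowed = logIfAllowed;
        this.logIfDenied = logIfDenied;
    }

    public String operation() { return operation; }
    public String resourceName() { return resourceName; }
    public int resourceReferenceCount() { return resourceReferenceCount; }
    public boolean logIfAllowed() { return logIfAllowed; }
    public boolean logIfDenied() { return logIfDenied; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ActionLike)) return false;
        ActionLike that = (ActionLike) o;
        return resourceReferenceCount == that.resourceReferenceCount
            && logIfAllowed == that.logIfAllowed
            && logIfDenied == that.logIfDenied
            && operation.equals(that.operation)
            && resourceName.equals(that.resourceName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(operation, resourceName, resourceReferenceCount,
                            logIfAllowed, logIfDenied);
    }
}
```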
# Wrapping a nullable field in Optional
```java
@InterfaceStability.Evolving
public class AclCreateResult {
    public static final AclCreateResult SUCCESS = new AclCreateResult();

    private final ApiException exception;

    private AclCreateResult() {
        this(null);
    }

    public AclCreateResult(ApiException exception) {
        this.exception = exception;
    }

    /**
     * Returns the exception raised during creation, if any. An empty result means
     * the request succeeded.
     */
    public Optional<ApiException> exception() {
        return exception == null ? Optional.empty() : Optional.of(exception);
    }
}
```
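Note the idiom: the field stays a plain nullable reference internally, and the `Optional` wrapping happens only at the public accessor, so callers never see `null`. The same idea in a self-contained sketch (hypothetical names); `Optional.ofNullable(exception)` is the idiomatic one-liner for the ternary in the Kafka code:

```java
import java.util.Optional;

// Sketch of the null-in, Optional-out idiom. Storing a nullable field (rather
// than an Optional field) avoids a wrapper object per instance; the accessor
// still gives callers a null-safe view.
final class Result {
    static final Result SUCCESS = new Result(null);

    private final Exception exception;   // null means success

    Result(Exception exception) {
        this.exception = exception;
    }

    Optional<Exception> exception() {
        return Optional.ofNullable(exception);
    }

    public static void main(String[] args) {
        Result ok = Result.SUCCESS;
        Result failed = new Result(new IllegalStateException("boom"));

        // Callers never need a null check:
        System.out.println(ok.exception().isPresent());                          // false
        failed.exception().ifPresent(e -> System.out.println(e.getMessage()));   // boom
    }
}
```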
# The AbortTransactionSpec class
```java
@InterfaceStability.Evolving
public class AbortTransactionSpec {
    private final TopicPartition topicPartition;
    private final long producerId;
    private final short producerEpoch;
    private final int coordinatorEpoch;
    // ... constructor and accessors elided
}
```
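A spec object like this is built once and handed straight to the corresponding Admin call. A hedged usage sketch, assuming the `Admin#abortTransaction` API from KIP-664 (the broker address and the partition/producer values are placeholders):

```java
import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;

import java.util.Properties;

// Hedged usage sketch: construct the immutable spec and pass it to the Admin client
// to abort a hanging transaction. All concrete values below are made up.
public class AbortTransactionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            AbortTransactionSpec spec = new AbortTransactionSpec(
                new TopicPartition("my-topic", 0),  // partition with the hanging transaction
                12345L,                             // producerId
                (short) 0,                          // producerEpoch
                7);                                 // coordinatorEpoch
            admin.abortTransaction(spec).all().get();
        }
    }
}
```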
# The FeatureMetadata class
```java
/**
 * Encapsulates details about finalized as well as supported features. This is particularly useful
 * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
 */
public class FeatureMetadata {
    private final Map<String, FinalizedVersionRange> finalizedFeatures;
    private final Optional<Long> finalizedFeaturesEpoch;
    private final Map<String, SupportedVersionRange> supportedFeatures;
    // ...
}
```
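A hedged sketch of how the class is consumed, assuming the `Admin#describeFeatures` API from KIP-584 and accessors named after the fields above; note how `Optional<Long>` models an epoch that may simply be absent:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;

import java.util.Properties;

// Hedged usage sketch of Admin#describeFeatures; the exact API shape is an
// assumption based on the fields shown above.
public class DescribeFeaturesExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();

            // The epoch may be absent, hence Optional<Long> rather than a bare long.
            metadata.finalizedFeaturesEpoch()
                    .ifPresent(epoch -> System.out.println("finalized epoch: " + epoch));

            metadata.supportedFeatures().forEach((feature, range) ->
                System.out.println(feature + " -> " + range));
        }
    }
}
```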
# Consumer configuration
```java
/**
 * The consumer configuration keys
 */
public class ConsumerConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;

    // A list of all assignor names that assign only subscribed topics to consumers.
    // Should be updated whenever a new assignor is added; it helps optimize
    // ConsumerCoordinator#performAssignment.
    public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
        Collections.unmodifiableList(Arrays.asList(
            RANGE_ASSIGNOR_NAME,
            ROUNDROBIN_ASSIGNOR_NAME,
            STICKY_ASSIGNOR_NAME,
            COOPERATIVE_STICKY_ASSIGNOR_NAME
        ));
    // ...
}
```
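The pattern worth copying is publishing a shared constant as `Collections.unmodifiableList(...)` so no caller can mutate static state. A minimal self-contained sketch (the assignor names here are shortened placeholders); since Java 9, `List.of(...)` gives the same guarantee more directly:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Minimal sketch of the unmodifiable-constant pattern used by ConsumerConfig.
public class AssignorNames {
    public static final List<String> ASSIGNORS =
        Collections.unmodifiableList(Arrays.asList("range", "roundrobin"));

    public static void main(String[] args) {
        System.out.println(ASSIGNORS);
        try {
            ASSIGNORS.add("sticky");   // throws: the published list is read-only
        } catch (UnsupportedOperationException e) {
            System.out.println("list is unmodifiable, as intended");
        }
    }
}
```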
# The Consumer interface
```java
/**
 * @see KafkaConsumer
 * @see MockConsumer
 */
public interface Consumer<K, V> extends Closeable {
    /**
     * @see KafkaConsumer#assignment()
     */
    Set<TopicPartition> assignment();
    /**
     * @see KafkaConsumer#subscription()
     */
    Set<String> subscription();
    /**
     * @see KafkaConsumer#subscribe(Collection)
     */
    void subscribe(Collection<String> topics);
    /**
     * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
     */
    void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
    /**
     * @see KafkaConsumer#assign(Collection)
     */
    void assign(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
     */
    void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
    /**
     * @see KafkaConsumer#subscribe(Pattern)
     */
    void subscribe(Pattern pattern);
    /**
     * @see KafkaConsumer#unsubscribe()
     */
    void unsubscribe();
    /**
     * @see KafkaConsumer#poll(long)
     */
    @Deprecated
    ConsumerRecords<K, V> poll(long timeout);
    /**
     * @see KafkaConsumer#poll(Duration)
     */
    ConsumerRecords<K, V> poll(Duration timeout);
    /**
     * @see KafkaConsumer#commitSync()
     */
    void commitSync();
    /**
     * @see KafkaConsumer#commitSync(Duration)
     */
    void commitSync(Duration timeout);
    /**
     * @see KafkaConsumer#commitSync(Map)
     */
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    /**
     * @see KafkaConsumer#commitSync(Map, Duration)
     */
    void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
    /**
     * @see KafkaConsumer#commitAsync()
     */
    void commitAsync();
    /**
     * @see KafkaConsumer#commitAsync(OffsetCommitCallback)
     */
    void commitAsync(OffsetCommitCallback callback);
    /**
     * @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback)
     */
    void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
    /**
     * @see KafkaConsumer#seek(TopicPartition, long)
     */
    void seek(TopicPartition partition, long offset);
    /**
     * @see KafkaConsumer#seek(TopicPartition, OffsetAndMetadata)
     */
    void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
    /**
     * @see KafkaConsumer#seekToBeginning(Collection)
     */
    void seekToBeginning(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#seekToEnd(Collection)
     */
    void seekToEnd(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#position(TopicPartition)
     */
    long position(TopicPartition partition);
    /**
     * @see KafkaConsumer#position(TopicPartition, Duration)
     */
    long position(TopicPartition partition, final Duration timeout);
    /**
     * @see KafkaConsumer#committed(TopicPartition)
     */
    @Deprecated
    OffsetAndMetadata committed(TopicPartition partition);
    /**
     * @see KafkaConsumer#committed(TopicPartition, Duration)
     */
    @Deprecated
    OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);
    /**
     * @see KafkaConsumer#committed(Set)
     */
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#committed(Set, Duration)
     */
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, final Duration timeout);
    /**
     * @see KafkaConsumer#metrics()
     */
    Map<MetricName, ? extends Metric> metrics();
    /**
     * @see KafkaConsumer#partitionsFor(String)
     */
    List<PartitionInfo> partitionsFor(String topic);
    /**
     * @see KafkaConsumer#partitionsFor(String, Duration)
     */
    List<PartitionInfo> partitionsFor(String topic, Duration timeout);
    /**
     * @see KafkaConsumer#listTopics()
     */
    Map<String, List<PartitionInfo>> listTopics();
    /**
     * @see KafkaConsumer#listTopics(Duration)
     */
    Map<String, List<PartitionInfo>> listTopics(Duration timeout);
    /**
     * @see KafkaConsumer#paused()
     */
    Set<TopicPartition> paused();
    /**
     * @see KafkaConsumer#pause(Collection)
     */
    void pause(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#resume(Collection)
     */
    void resume(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#offsetsForTimes(Map)
     */
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
    /**
     * @see KafkaConsumer#offsetsForTimes(Map, Duration)
     */
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
    /**
     * @see KafkaConsumer#beginningOffsets(Collection)
     */
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#beginningOffsets(Collection, Duration)
     */
    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout);
    /**
     * @see KafkaConsumer#endOffsets(Collection)
     */
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
    /**
     * @see KafkaConsumer#endOffsets(Collection, Duration)
     */
    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
    /**
     * @see KafkaConsumer#currentLag(TopicPartition)
     */
    OptionalLong currentLag(TopicPartition topicPartition);
    /**
     * @see KafkaConsumer#groupMetadata()
     */
    ConsumerGroupMetadata groupMetadata();
    /**
     * @see KafkaConsumer#enforceRebalance()
     */
    void enforceRebalance();
    /**
     * @see KafkaConsumer#enforceRebalance(String)
     */
    void enforceRebalance(final String reason);
    /**
     * @see KafkaConsumer#close()
     */
    void close();
    /**
     * @see KafkaConsumer#close(Duration)
     */
    void close(Duration timeout);
    /**
     * @see KafkaConsumer#wakeup()
     */
    void wakeup();
}
```
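The point of the interface is that application code can depend on `Consumer<K, V>` and be exercised with `MockConsumer` in tests, while production wires in `KafkaConsumer`. A standard poll loop written against the interface (topic, group, and broker address are placeholders):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// A poll loop coded against the Consumer interface, not KafkaConsumer directly,
// so tests can substitute MockConsumer without touching this logic.
public class PollLoop {
    public static void run(Consumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("my-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s-%d@%d: %s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        run(new KafkaConsumer<>(props));
    }
}
```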
# The Fetcher<K, V> class
```java
public class Fetcher<K, V> implements Closeable {
    private final Logger log;
    private final LogContext logContext;
    private final ConsumerNetworkClient client;
    private final Time time;
    private final int minBytes;
    private final int maxBytes;
    private final int maxWaitMs;
    private final int fetchSize;
    private final long retryBackoffMs;
    private final long requestTimeoutMs;
    private final int maxPollRecords;
    private final boolean checkCrcs;
    private final String clientRackId;
    private final ConsumerMetadata metadata;
    private final FetchManagerMetrics sensors;
    private final SubscriptionState subscriptions;
    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final IsolationLevel isolationLevel;
    private final Map<Integer, FetchSessionHandler> sessionHandlers;
    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
    private final Set<Integer> nodesWithPendingFetchRequests;
    private final ApiVersions apiVersions;
    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
    private CompletedFetch nextInLineFetch = null;
    // ...
}
```
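One pattern worth noting is the pair of `AtomicReference<RuntimeException>` fields: an exception raised by an asynchronous operation is parked there and rethrown to the caller on its next invocation. A minimal sketch of that hand-off (the class and method names here are hypothetical):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the "cache the async exception, rethrow on the next user call"
// pattern suggested by Fetcher's cachedListOffsetsException field.
public class AsyncErrorHandoff {
    private final AtomicReference<RuntimeException> cachedException = new AtomicReference<>();

    // Called from a background/network thread when an async operation fails.
    void onAsyncFailure(RuntimeException e) {
        cachedException.compareAndSet(null, e);   // keep only the first failure
    }

    // Called from the user's thread; surfaces any failure recorded since last call.
    void maybeThrow() {
        RuntimeException e = cachedException.getAndSet(null);
        if (e != null)
            throw e;
    }

    public static void main(String[] args) {
        AsyncErrorHandoff h = new AsyncErrorHandoff();
        h.onAsyncFailure(new IllegalStateException("offset lookup failed"));
        try {
            h.maybeThrow();
        } catch (RuntimeException e) {
            System.out.println("rethrown on caller thread: " + e.getMessage());
        }
        h.maybeThrow();   // nothing cached now, returns quietly
    }
}
```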
# The Heartbeat class
```java
/**
 * A helper class for managing the heartbeat to the coordinator
 */
public final class Heartbeat {
    private final int maxPollIntervalMs;
    private final GroupRebalanceConfig rebalanceConfig;
    private final Time time;
    private final Timer heartbeatTimer;
    private final Timer sessionTimer;
    private final Timer pollTimer;
    private final Logger log;

    private volatile long lastHeartbeatSend = 0L;
    private volatile boolean heartbeatInFlight = false;
    // ...
}
```
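The class tracks three separate deadlines (heartbeat interval, session timeout, poll interval) with Kafka's `Timer` utility rather than raw timestamps. A reduced sketch of that deadline-timer idea, with `System.currentTimeMillis()` standing in for Kafka's mockable `Time` abstraction:

```java
// Reduced sketch of a deadline timer like the ones Heartbeat holds
// (heartbeatTimer, sessionTimer, pollTimer).
public class DeadlineTimer {
    private final long timeoutMs;
    private long deadlineMs;

    DeadlineTimer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
        reset();
    }

    // Restart the countdown, e.g. after a heartbeat is sent or poll() is called.
    void reset() {
        this.deadlineMs = System.currentTimeMillis() + timeoutMs;
    }

    boolean isExpired() {
        return System.currentTimeMillis() >= deadlineMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - System.currentTimeMillis());
    }

    public static void main(String[] args) throws InterruptedException {
        DeadlineTimer session = new DeadlineTimer(100);
        System.out.println("expired? " + session.isExpired());   // false
        Thread.sleep(150);
        System.out.println("expired? " + session.isExpired());   // true
        session.reset();                                         // e.g. heartbeat succeeded
        System.out.println("remaining: " + session.remainingMs() + "ms");
    }
}
```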
# RequestFuture
```java
public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
    private static final Object INCOMPLETE_SENTINEL = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
    private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
    private final CountDownLatch completedLatch = new CountDownLatch(1);

    /**
     * Check whether the response is ready to be handled
     * @return true if the response is ready, false otherwise
     */
    public boolean isDone() {
        return result.get() != INCOMPLETE_SENTINEL;
    }

    public boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
        return completedLatch.await(timeout, unit);
    }
    // ...
}
```
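The completion trick: a private sentinel object marks "not yet done", so `result` can legitimately hold `null` as a real completion value, while the `CountDownLatch` gives blocking waiters a cheap signal. A self-contained sketch of the same machinery (simplified, without the listener queue):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of RequestFuture's completion machinery: a sentinel distinguishes
// "no result yet" from "result is null", and a latch unblocks waiters.
public class MiniFuture<T> {
    private static final Object INCOMPLETE_SENTINEL = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
    private final CountDownLatch completedLatch = new CountDownLatch(1);

    public boolean isDone() {
        return result.get() != INCOMPLETE_SENTINEL;
    }

    // compareAndSet guarantees the future is completed at most once.
    public boolean complete(T value) {
        boolean completed = result.compareAndSet(INCOMPLETE_SENTINEL, value);
        if (completed)
            completedLatch.countDown();
        return completed;
    }

    @SuppressWarnings("unchecked")
    public T awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
        if (!completedLatch.await(timeout, unit))
            throw new IllegalStateException("timed out waiting for result");
        return (T) result.get();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniFuture<String> future = new MiniFuture<>();
        new Thread(() -> future.complete("response")).start();
        System.out.println(future.awaitDone(1, TimeUnit.SECONDS));  // response
        System.out.println(future.complete("again"));               // false: already done
    }
}
```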