Netty 代码写法总结 9(Java)
文章发布较早,内容可能过时,阅读注意甄别。
# 总结9
# 处理器
Netty 提供多种类别的处理器,例如 HTTP、HTTP/2、DNS、bytes、JSON、compression 等。
/**
 * Encoder that turns each outbound {@link ByteBuf} into its Base64 representation.
 *
 * <p>The line-breaking flag and the {@link Base64Dialect} are fixed at construction time.
 */
@Sharable
public class Base64Encoder extends MessageToMessageEncoder<ByteBuf> {

    private final boolean breakLines;
    private final Base64Dialect dialect;

    /**
     * Creates an encoder that breaks lines and uses {@link Base64Dialect#STANDARD}.
     */
    public Base64Encoder() {
        this(true);
    }

    /**
     * Creates an encoder that uses {@link Base64Dialect#STANDARD}.
     *
     * @param breakLines passed through to {@code Base64.encode(...)}
     */
    public Base64Encoder(boolean breakLines) {
        this(breakLines, Base64Dialect.STANDARD);
    }

    /**
     * Creates an encoder with an explicit dialect.
     *
     * @param breakLines passed through to {@code Base64.encode(...)}
     * @param dialect    the Base64 dialect to encode with
     * @throws NullPointerException if {@code dialect} is {@code null}
     */
    public Base64Encoder(boolean breakLines, Base64Dialect dialect) {
        if (dialect == null) {
            throw new NullPointerException("dialect");
        }
        this.dialect = dialect;
        this.breakLines = breakLines;
    }

    /**
     * Encodes the readable region of {@code msg} and appends the result to {@code out}.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int offset = msg.readerIndex();
        final int length = msg.readableBytes();
        final Object encoded = Base64.encode(msg, offset, length, breakLines, dialect);
        out.add(encoded);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 方法都返回相同的对象
import io.netty.util.ReferenceCounted;
/**
 * A packet which is sent or received.
 *
 * <p>Every method inherited from {@link ReferenceCounted} that returns the object itself is
 * re-declared here with {@link ByteBufHolder} as its return type, so callers can chain calls
 * without casting.
 */
public interface ByteBufHolder extends ReferenceCounted {
/**
 * Returns the data which is held by this {@link ByteBufHolder}.
 */
ByteBuf content();
/**
 * Creates a deep copy of this {@link ByteBufHolder}.
 */
ByteBufHolder copy();
/**
 * Duplicates this {@link ByteBufHolder}. Be aware that this will not automatically call
 * {@link #retain()}.
 */
ByteBufHolder duplicate();
/** Same as the inherited {@code retain()}, with the return type narrowed to {@link ByteBufHolder}. */
@Override
ByteBufHolder retain();
/** Same as the inherited {@code retain(int)}, with the return type narrowed to {@link ByteBufHolder}. */
@Override
ByteBufHolder retain(int increment);
/** Same as the inherited {@code touch()}, with the return type narrowed to {@link ByteBufHolder}. */
@Override
ByteBufHolder touch();
/** Same as the inherited {@code touch(Object)}, with the return type narrowed to {@link ByteBufHolder}. */
@Override
ByteBufHolder touch(Object hint);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 类型强转
/**
* A derived buffer which simply forwards all data access requests to its
* parent. It is recommended to use {@link ByteBuf#duplicate()} instead
* of calling the constructor explicitly.
*/
public class DuplicatedByteBuf extends AbstractDerivedByteBuf {
// The wrapped buffer; the constructor never stores another DuplicatedByteBuf here.
private final ByteBuf buffer;
public DuplicatedByteBuf(ByteBuf buffer) {
super(buffer.maxCapacity());
if (buffer instanceof DuplicatedByteBuf) {
this.buffer = ((DuplicatedByteBuf) buffer).buffer;
} else {
this.buffer = buffer;
}
setIndex(buffer.readerIndex(), buffer.writerIndex());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 未实现方法返回空对象或this
/**
* An empty {@link ByteBuf} whose capacity and maximum capacity are all {@code 0}.
*/
public final class EmptyByteBuf extends ByteBuf {
/** Zero-capacity direct NIO buffer. */
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0);

/** Address of {@link #EMPTY_BYTE_BUFFER}, or {@code 0} when it could not be obtained. */
private static final long EMPTY_BYTE_BUFFER_ADDRESS;

static {
    // Stays 0 unless Unsafe is reported available and the lookup succeeds.
    long address = 0;
    try {
        if (PlatformDependent.hasUnsafe()) {
            address = PlatformDependent.directBufferAddress(EMPTY_BYTE_BUFFER);
        }
    } catch (Throwable ignored) {
        // Best effort only: any failure leaves the address at 0.
    }
    EMPTY_BYTE_BUFFER_ADDRESS = address;
}
// Allocator handed in at construction; returned as-is by alloc().
private final ByteBufAllocator alloc;
// Byte order reported by order().
private final ByteOrder order;
// Built once in the constructor: simple class name plus "BE" or "LE".
private final String str;
// Lazily created instance with a different byte order; see order(ByteOrder).
private EmptyByteBuf swapped;
/**
 * Creates an empty buffer with {@link ByteOrder#BIG_ENDIAN} byte order.
 *
 * @param alloc the allocator this buffer reports; must not be {@code null}
 */
public EmptyByteBuf(ByteBufAllocator alloc) {
this(alloc, ByteOrder.BIG_ENDIAN);
}
private EmptyByteBuf(ByteBufAllocator alloc, ByteOrder order) {
if (alloc == null) {
throw new NullPointerException("alloc");
}
this.alloc = alloc;
this.order = order;
str = StringUtil.simpleClassName(this) + (order == ByteOrder.BIG_ENDIAN? "BE" : "LE");
}
/** Always {@code 0}. */
@Override
public int capacity() {
return 0;
}
/**
 * Not supported: the capacity of this buffer cannot be changed.
 *
 * @throws ReadOnlyBufferException always
 */
@Override
public ByteBuf capacity(int newCapacity) {
throw new ReadOnlyBufferException();
}
/** Returns the allocator supplied at construction. */
@Override
public ByteBufAllocator alloc() {
return alloc;
}
/** Returns the byte order this instance was created with. */
@Override
public ByteOrder order() {
return order;
}
/** Always {@code null}. */
@Override
public ByteBuf unwrap() {
return null;
}
/** Always {@code true}. */
@Override
public boolean isDirect() {
return true;
}
/** Always {@code 0}. */
@Override
public int maxCapacity() {
return 0;
}
/**
 * Returns an empty buffer with the requested byte order.
 *
 * <p>Returns {@code this} when the order already matches. Otherwise returns the cached
 * {@code swapped} instance, creating and caching it on first use.
 *
 * @param endianness the requested byte order
 * @throws NullPointerException if {@code endianness} is {@code null}
 */
@Override
public ByteBuf order(ByteOrder endianness) {
    if (endianness == null) {
        throw new NullPointerException("endianness");
    }
    if (endianness == order()) {
        return this;
    }

    EmptyByteBuf cached = this.swapped;
    if (cached == null) {
        cached = new EmptyByteBuf(alloc(), endianness);
        this.swapped = cached;
    }
    return cached;
}
/** Always {@code 0}. */
@Override
public int readerIndex() {
return 0;
}
/**
 * Delegates entirely to {@code checkIndex(readerIndex)} and returns its result.
 * NOTE(review): checkIndex is defined outside this excerpt; presumably it rejects any non-zero index — confirm.
 */
@Override
public ByteBuf readerIndex(int readerIndex) {
return checkIndex(readerIndex);
}
/** Always {@code 0}. */
@Override
public int writerIndex() {
return 0;
}
/**
 * Delegates entirely to {@code checkIndex(writerIndex)} and returns its result.
 * NOTE(review): checkIndex is defined outside this excerpt; presumably it rejects any non-zero index — confirm.
 */
@Override
public ByteBuf writerIndex(int writerIndex) {
return checkIndex(writerIndex);
}
/**
 * Runs {@code checkIndex} on both arguments (reader first, then writer) and returns {@code this};
 * no state is stored.
 */
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
checkIndex(readerIndex);
checkIndex(writerIndex);
return this;
}
/** Always {@code 0}. */
@Override
public int readableBytes() {
return 0;
}
/** Always {@code 0}. */
@Override
public int writableBytes() {
return 0;
}
/** Always {@code 0}. */
@Override
public int maxWritableBytes() {
return 0;
}
/** Always {@code false}. */
@Override
public boolean isReadable() {
return false;
}
/** Always {@code false}. */
@Override
public boolean isWritable() {
return false;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf clear() {
return this;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf markReaderIndex() {
return this;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf resetReaderIndex() {
return this;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf markWriterIndex() {
return this;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf resetWriterIndex() {
return this;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf discardReadBytes() {
return this;
}
/** No-op; returns {@code this}. */
@Override
public ByteBuf discardSomeReadBytes() {
return this;
}
/**
 * Succeeds only for a request of zero bytes.
 *
 * @param minWritableBytes the number of writable bytes requested
 * @return {@code this} when {@code minWritableBytes} is {@code 0}
 * @throws IllegalArgumentException  if {@code minWritableBytes} is negative
 * @throws IndexOutOfBoundsException if {@code minWritableBytes} is positive
 */
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException("minWritableBytes: " + minWritableBytes + " (expected: >= 0)");
    }
    if (minWritableBytes == 0) {
        return this;
    }
    // Any positive request cannot be satisfied.
    throw new IndexOutOfBoundsException();
}
/**
 * Status-code variant of {@code ensureWritable(int)}; {@code force} is not consulted.
 *
 * @param minWritableBytes the number of writable bytes requested
 * @param force            ignored by this implementation
 * @return {@code 0} when {@code minWritableBytes} is {@code 0}, otherwise {@code 1}
 * @throws IllegalArgumentException if {@code minWritableBytes} is negative
 */
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException("minWritableBytes: " + minWritableBytes + " (expected: >= 0)");
    }
    return minWritableBytes == 0 ? 0 : 1;
}
/**
 * There is nothing to read at any index.
 *
 * @throws IndexOutOfBoundsException always
 */
@Override
public boolean getBoolean(int index) {
throw new IndexOutOfBoundsException();
}
/**
 * There is nothing to read at any index.
 *
 * @throws IndexOutOfBoundsException always
 */
@Override
public byte getByte(int index) {
throw new IndexOutOfBoundsException();
}
/**
 * There is nothing to read at any index.
 *
 * @throws IndexOutOfBoundsException always
 */
@Override
public short getUnsignedByte(int index) {
throw new IndexOutOfBoundsException();
}
/**
 * There is nothing to read at any index.
 *
 * @throws IndexOutOfBoundsException always
 */
@Override
public short getShort(int index) {
throw new IndexOutOfBoundsException();
}
/**
 * There is nothing to read at any index.
 *
 * @throws IndexOutOfBoundsException always
 */
@Override
public int getUnsignedShort(int index) {
throw new IndexOutOfBoundsException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# 泛型类中定义多个泛型类属性,及属性为本身泛型类
final class PoolChunk<T> {
// Arena this chunk is associated with, parameterized by the same memory type T.
final PoolArena<T> arena;
// The memory object of type T held by this chunk.
final T memory;
// NOTE(review): presumably true when the chunk was allocated outside the pool — confirm against the constructors.
final boolean unpooled;
// Per-node byte tables; NOTE(review): their exact layout is not visible in this excerpt.
private final byte[] memoryMap;
private final byte[] depthMap;
// Subpages belonging to this chunk, typed with the same T.
private final PoolSubpage<T>[] subpages;
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;
// Sizing parameters, all fixed at construction (final).
private final int pageSize;
private final int pageShifts;
private final int maxOrder;
private final int chunkSize;
private final int log2ChunkSize;
private final int maxSubpageAllocs;
/** Used to mark memory as unusable */
private final byte unusable;
// Mutable counter; NOTE(review): presumably the number of bytes still free in this chunk — confirm.
private int freeBytes;
// Non-final links: the list this chunk belongs to and its neighbours of the same generic type.
PoolChunkList<T> parent;
PoolChunk<T> prev;
PoolChunk<T> next;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 类的初始化,及嵌套节点
package io.netty.buffer;
final class PoolSubpage<T> {
// Owning chunk; null for the special list-head instance.
final PoolChunk<T> chunk;
// -1 for the list-head instance.
private final int memoryMapIdx;
// -1 for the list-head instance.
private final int runOffset;
private final int pageSize;
// null for the list-head instance.
private final long[] bitmap;
// Links to neighbouring subpages of the same generic type.
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
// -1 for the list-head instance.
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
/**
 * Special constructor that creates a linked list head.
 *
 * <p>The head only records {@code pageSize}; it has no chunk and no bitmap, and its
 * index, offset and element-size fields are set to {@code -1}.
 *
 * @param pageSize the page size recorded on the head
 */
PoolSubpage(int pageSize) {
    this.pageSize = pageSize;

    // Reference fields stay unset on the head.
    this.chunk = null;
    this.bitmap = null;

    // Numeric fields get the -1 sentinel.
    this.memoryMapIdx = -1;
    this.runOffset = -1;
    this.elemSize = -1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 泛型可以传byte[]数组
final class PoolThreadCache {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
// One arena per memory kind: byte[] for heap, ByteBuffer for direct.
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
// NOTE(review): presumably the allocation count that triggers a cache sweep — confirm against the allocate path.
private final int freeSweepAllocationThreshold;
private int allocations;
// The thread that was running when this instance was constructed.
private final Thread thread = Thread.currentThread();
// Runnable wrapper around free0(), so it can be handed to code that expects a Runnable.
// NOTE(review): free0() is defined outside this excerpt; where this task gets scheduled is not visible here.
private final Runnable freeTask = new Runnable() {
@Override
public void run() {
free0();
}
};
private abstract static class MemoryRegionCache<T> {
// Pre-populated entry slots; every element is non-null after construction.
private final Entry<T>[] entries;
// Set to half of the size requested at construction.
private final int maxUnusedCached;
// NOTE(review): head/tail look like positions into entries — confirm against the add/allocate methods.
private int head;
private int tail;
private int maxEntriesInUse;
private int entriesInUse;
@SuppressWarnings("unchecked")
MemoryRegionCache(int size) {
entries = new Entry[powerOfTwo(size)];
for (int i = 0; i < entries.length; i++) {
entries[i] = new Entry<T>();
}
maxUnusedCached = size / 2;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 重写重载,继承,初始化大小,返回this,类型强转
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements ChannelHandlerInvoker, EventLoop {
// Pending tasks in FIFO order; the ArrayDeque is created with an initial capacity hint of 2.
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
/** Returns {@code this}. */
@Override
public EventLoop unwrap() {
return this;
}
/** Returns the superclass's parent, cast to {@link EventLoopGroup}. */
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();
}
/** Returns the superclass's {@code next()} result, cast to {@link EventLoop}. */
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
/**
 * Queues {@code command}; it is not run here, only when {@code runTasks()} drains the queue.
 *
 * @param command the task to enqueue
 * @throws NullPointerException if {@code command} is {@code null}
 */
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException("command");
}
tasks.add(command);
}
/**
 * Runs queued tasks on the calling thread, in queue order, until the queue is empty.
 *
 * <p>Tasks added while draining are picked up by the same call. An exception thrown by a
 * task propagates to the caller and leaves the remaining tasks queued.
 */
void runTasks() {
    Runnable pending;
    while ((pending = tasks.poll()) != null) {
        pending.run();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 初始化id大小
public final class Epoll {
/** The failure seen while probing the native calls, or {@code null} when both succeeded. */
private static final Throwable UNAVAILABILITY_CAUSE;

static {
    Throwable failure = null;
    int epollFd = -1;
    int eventFd = -1;
    try {
        // Probe: create both descriptors; any throwable is recorded as the cause.
        epollFd = Native.epollCreate();
        eventFd = Native.eventFd();
    } catch (Throwable t) {
        failure = t;
    } finally {
        // Close whichever descriptors were actually opened; close failures are ignored.
        if (epollFd != -1) {
            try {
                Native.close(epollFd);
            } catch (Exception ignore) {
                // ignore
            }
        }
        if (eventFd != -1) {
            try {
                Native.close(eventFd);
            } catch (Exception ignore) {
                // ignore
            }
        }
    }

    // null when the probe succeeded.
    UNAVAILABILITY_CAUSE = failure;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37