1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.filter.codec.support;
21
22 import java.util.LinkedList;
23 import java.util.Queue;
24
25 import org.apache.mina.common.ByteBuffer;
26 import org.apache.mina.common.WriteFuture;
27 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
28
29 /**
30 * A {@link ProtocolEncoderOutput} based on queue.
31 *
32 * @author The Apache Directory Project (mina-dev@directory.apache.org)
33 * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (Fri, 13 Jul 2007) $
34 */
35 public abstract class SimpleProtocolEncoderOutput implements
36 ProtocolEncoderOutput {
37 private final Queue<ByteBuffer> bufferQueue = new LinkedList<ByteBuffer>();
38
39 public SimpleProtocolEncoderOutput() {
40 }
41
42 public Queue<ByteBuffer> getBufferQueue() {
43 return bufferQueue;
44 }
45
46 public void write(ByteBuffer buf) {
47 bufferQueue.add(buf);
48 }
49
50 public void mergeAll() {
51 int sum = 0;
52 final int size = bufferQueue.size();
53
54 if (size < 2) {
55 // no need to merge!
56 return;
57 }
58
59 // Get the size of merged BB
60 for (Object o : bufferQueue) {
61 sum += ((ByteBuffer) o).remaining();
62 }
63
64 // Allocate a new BB that will contain all fragments
65 ByteBuffer newBuf = ByteBuffer.allocate(sum);
66
67 // and merge all.
68 for (;;) {
69 ByteBuffer buf = bufferQueue.poll();
70 if (buf == null) {
71 break;
72 }
73
74 newBuf.put(buf);
75 buf.release();
76 }
77
78 // Push the new buffer finally.
79 newBuf.flip();
80 bufferQueue.offer(newBuf);
81 }
82
83 public WriteFuture flush() {
84 Queue<ByteBuffer> bufferQueue = this.bufferQueue;
85 WriteFuture future = null;
86 if (bufferQueue.isEmpty()) {
87 return null;
88 } else {
89 for (;;) {
90 ByteBuffer buf = bufferQueue.poll();
91 if (buf == null) {
92 break;
93 }
94
95 // Flush only when the buffer has remaining.
96 if (buf.hasRemaining()) {
97 future = doFlush(buf);
98 }
99 }
100 }
101
102 return future;
103 }
104
105 protected abstract WriteFuture doFlush(ByteBuffer buf);
106 }