001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.File;
019import java.io.IOException;
020import java.io.InputStream;
021import java.lang.reflect.Field;
022import java.lang.reflect.Method;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.file.Path;
026import java.nio.file.StandardOpenOption;
027import java.util.Objects;
028
029import org.apache.commons.io.IOUtils;
030
031/**
032 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java
033 * and native memory which happens when using {@link java.io.BufferedInputStream}. Unfortunately, this is not something
034 * already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports reading a file using NIO, but does not
035 * support buffering.
036 * <p>
037 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was
038 * called {@code NioBufferedFileInputStream}.
039 * </p>
040 *
041 * @since 2.9.0
042 */
043@SuppressWarnings("restriction")
044public final class BufferedFileChannelInputStream extends InputStream {
045
046    private final ByteBuffer byteBuffer;
047
048    private final FileChannel fileChannel;
049
050    private static final Class<?> DIRECT_BUFFER_CLASS = getDirectBufferClass();
051
052    private static Class<?> getDirectBufferClass() {
053        Class<?> res = null;
054        try {
055            res = Class.forName("sun.nio.ch.DirectBuffer");
056        } catch (final IllegalAccessError | ClassNotFoundException ignored) {
057            // ignored
058        }
059        return res;
060    }
061
062    private static boolean isDirectBuffer(final Object object) {
063        return DIRECT_BUFFER_CLASS != null && DIRECT_BUFFER_CLASS.isInstance(object);
064    }
065
066    /**
067     * Constructs a new instance for the given File.
068     *
069     * @param file The file to stream.
070     * @throws IOException If an I/O error occurs
071     */
072    public BufferedFileChannelInputStream(final File file) throws IOException {
073        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
074    }
075
076    /**
077     * Constructs a new instance for the given File and buffer size.
078     *
079     * @param file The file to stream.
080     * @param bufferSizeInBytes buffer size.
081     * @throws IOException If an I/O error occurs
082     */
083    public BufferedFileChannelInputStream(final File file, final int bufferSizeInBytes) throws IOException {
084        this(file.toPath(), bufferSizeInBytes);
085    }
086
087    /**
088     * Constructs a new instance for the given Path.
089     *
090     * @param path The path to stream.
091     * @throws IOException If an I/O error occurs
092     */
093    public BufferedFileChannelInputStream(final Path path) throws IOException {
094        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
095    }
096
097    /**
098     * Constructs a new instance for the given Path and buffer size.
099     *
100     * @param path The path to stream.
101     * @param bufferSizeInBytes buffer size.
102     * @throws IOException If an I/O error occurs
103     */
104    public BufferedFileChannelInputStream(final Path path, final int bufferSizeInBytes) throws IOException {
105        Objects.requireNonNull(path, "path");
106        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
107        byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
108        byteBuffer.flip();
109    }
110
111    @Override
112    public synchronized int available() throws IOException {
113        return byteBuffer.remaining();
114    }
115
116    /**
117     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause
118     * errors if one attempts to read from the disposed buffer. However, neither the bytes allocated to direct buffers
119     * nor file descriptors opened for memory-mapped buffers put pressure on the garbage collector. Waiting for garbage
120     * collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no
121     * standard API to manually dispose of these kinds of buffers.
122     *
123     * @param buffer the buffer to clean.
124     */
125    private void clean(final ByteBuffer buffer) {
126        if (isDirectBuffer(buffer)) {
127            cleanDirectBuffer(buffer);
128        }
129    }
130
131    /**
132     * In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible to access the method
133     * sun.misc.Cleaner.clean() to invoke it. The type changed to jdk.internal.ref.Cleaner in later JDKs, and the
134     * .clean() method is not accessible even with reflection. However sun.misc.Unsafe added a invokeCleaner() method in
135     * JDK 9+ and this is still accessible with reflection.
136     *
137     * @param buffer the buffer to clean. must be a DirectBuffer.
138     */
139    private void cleanDirectBuffer(final ByteBuffer buffer) {
140        //
141        // Ported from StorageUtils.scala.
142        //
143//      private val bufferCleaner: DirectBuffer => Unit =
144//      if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
145//        val cleanerMethod =
146//          Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
147//        val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
148//        unsafeField.setAccessible(true)
149//        val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
150//        buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer)
151//      } else {
152//        val cleanerMethod = Utils.classForName("sun.misc.Cleaner").getMethod("clean")
153//        buffer: DirectBuffer => {
154//          // Careful to avoid the return type of .cleaner(), which changes with JDK
155//          val cleaner: AnyRef = buffer.cleaner()
156//          if (cleaner != null) {
157//            cleanerMethod.invoke(cleaner)
158//          }
159//        }
160//      }
161        //
162        final String specVer = System.getProperty("java.specification.version");
163        if ("1.8".equals(specVer)) {
164            // On Java 8, but also compiles on Java 11.
165            try {
166              final Class<?> clsCleaner = Class.forName("sun.misc.Cleaner");
167              final Method cleanerMethod = DIRECT_BUFFER_CLASS.getMethod("cleaner");
168              final Object cleaner = cleanerMethod.invoke(buffer);
169              if (cleaner != null) {
170                  final Method cleanMethod = clsCleaner.getMethod("clean");
171                  cleanMethod.invoke(cleaner);
172              }
173            } catch (final ReflectiveOperationException e) {
174                throw new IllegalStateException(e);
175            }
176        } else {
177            // On Java 9 and up, but compiles on Java 8.
178            try {
179                final Class<?> clsUnsafe = Class.forName("sun.misc.Unsafe");
180                final Method cleanerMethod = clsUnsafe.getMethod("invokeCleaner", ByteBuffer.class);
181                final Field unsafeField = clsUnsafe.getDeclaredField("theUnsafe");
182                unsafeField.setAccessible(true);
183                cleanerMethod.invoke(unsafeField.get(null), buffer);
184            } catch (final ReflectiveOperationException e) {
185                throw new IllegalStateException(e);
186            }
187        }
188    }
189
190    @Override
191    public synchronized void close() throws IOException {
192        try {
193            fileChannel.close();
194        } finally {
195            clean(byteBuffer);
196        }
197    }
198
199    @Override
200    public synchronized int read() throws IOException {
201        if (!refill()) {
202            return EOF;
203        }
204        return byteBuffer.get() & 0xFF;
205    }
206
207    @Override
208    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
209        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
210            throw new IndexOutOfBoundsException();
211        }
212        if (!refill()) {
213            return EOF;
214        }
215        len = Math.min(len, byteBuffer.remaining());
216        byteBuffer.get(b, offset, len);
217        return len;
218    }
219
220    /**
221     * Checks whether data is left to be read from the input stream.
222     *
223     * @return true if data is left, false otherwise
224     */
225    private boolean refill() throws IOException {
226        if (!byteBuffer.hasRemaining()) {
227            byteBuffer.clear();
228            int nRead = 0;
229            while (nRead == 0) {
230                nRead = fileChannel.read(byteBuffer);
231            }
232            byteBuffer.flip();
233            return nRead >= 0;
234        }
235        return true;
236    }
237
238    @Override
239    public synchronized long skip(final long n) throws IOException {
240        if (n <= 0L) {
241            return 0L;
242        }
243        if (byteBuffer.remaining() >= n) {
244            // The buffered content is enough to skip
245            byteBuffer.position(byteBuffer.position() + (int) n);
246            return n;
247        }
248        final long skippedFromBuffer = byteBuffer.remaining();
249        final long toSkipFromFileChannel = n - skippedFromBuffer;
250        // Discard everything we have read in the buffer.
251        byteBuffer.position(0);
252        byteBuffer.flip();
253        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
254    }
255
256    private long skipFromFileChannel(final long n) throws IOException {
257        final long currentFilePosition = fileChannel.position();
258        final long size = fileChannel.size();
259        if (n > size - currentFilePosition) {
260            fileChannel.position(size);
261            return size - currentFilePosition;
262        }
263        fileChannel.position(currentFilePosition + n);
264        return n;
265    }
266
267}