From 4e659c09deb76df1120a7581e9289f81d2b9c70c Mon Sep 17 00:00:00 2001 From: Ignas Mikalajunas Date: Sun, 17 Dec 2023 21:36:37 +0200 Subject: [PATCH] Extracted a block repository class out --- spotbugs-exclude.xml | 2 +- .../nukagit/dfs/DfsRepositoryResolver.java | 78 +- .../lt/pow/nukagit/dfs/GitDfsPackCommand.java | 5 +- .../nukagit/dfs/NukagitDfsObjDatabase.java | 679 ++++++++---------- .../pow/nukagit/dfs/NukagitDfsRepository.java | 142 ++-- .../nukagit/minio/NukagitBlockRepository.java | 87 +++ .../dfs/NukagitDfsObjDatabaseTest.groovy | 6 +- 7 files changed, 525 insertions(+), 474 deletions(-) create mode 100644 src/main/java/lt/pow/nukagit/minio/NukagitBlockRepository.java diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index 3cc3db4..de09e14 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -2,7 +2,7 @@ - + diff --git a/src/main/java/lt/pow/nukagit/dfs/DfsRepositoryResolver.java b/src/main/java/lt/pow/nukagit/dfs/DfsRepositoryResolver.java index 3699b0f..358d8d7 100644 --- a/src/main/java/lt/pow/nukagit/dfs/DfsRepositoryResolver.java +++ b/src/main/java/lt/pow/nukagit/dfs/DfsRepositoryResolver.java @@ -1,8 +1,8 @@ package lt.pow.nukagit.dfs; -import io.minio.MinioClient; import io.opentelemetry.instrumentation.annotations.WithSpan; import lt.pow.nukagit.db.dao.NukagitDfsDao; +import lt.pow.nukagit.minio.NukagitBlockRepository; import org.eclipse.jgit.internal.storage.dfs.DfsReaderOptions; import org.eclipse.jgit.internal.storage.dfs.DfsRepositoryDescription; import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository; @@ -19,43 +19,43 @@ @Singleton public class DfsRepositoryResolver { - private static final Logger LOGGER = LoggerFactory.getLogger(DfsRepositoryResolver.class); - private final ConcurrentHashMap repositoryCache; - private final NukagitDfsDao dfsDao; - private final MinioClient minio; - - @Inject - public DfsRepositoryResolver(NukagitDfsDao dfsDao, MinioClient minio) { - this.dfsDao = dfsDao; - this.minio = minio; - repositoryCache = new ConcurrentHashMap<>(); - } - - @WithSpan - public synchronized Repository resolveDfsRepository(String username, String[] args) - throws IOException { - LOGGER.debug("resolveDfsRepository: username={}, args={}", username, args); - String repositoryName = args[1]; - - if (!repositoryName.startsWith("/memory/")) { - var id = dfsDao.getRepositoryIdByName(repositoryName); - if (id == null) { - throw new IOException(String.format("Repository with the name %s does not exist!", repositoryName)); - } - return new NukagitDfsRepository.Builder(dfsDao, minio) - .setRepositoryDescription(new NukagitDfsRepositoryDescription(id, repositoryName)) - // .withPath(new Path("testRepositories", name)) - // .withBlockSize(64) - // .withReplication((short) 2) - .setReaderOptions(new DfsReaderOptions()) - .build(); + private static final Logger LOGGER = LoggerFactory.getLogger(DfsRepositoryResolver.class); + private final ConcurrentHashMap repositoryCache; + private final NukagitDfsDao dfsDao; + private final NukagitBlockRepository blockRepository; + + @Inject + public DfsRepositoryResolver(NukagitDfsDao dfsDao, NukagitBlockRepository blockRepository) { + this.dfsDao = dfsDao; + this.blockRepository = blockRepository; + repositoryCache = new ConcurrentHashMap<>(); + } + + @WithSpan + public synchronized Repository resolveDfsRepository(String username, String[] args) + throws IOException { + LOGGER.debug("resolveDfsRepository: username={}, args={}", username, args); + String repositoryName = args[1]; + + if (!repositoryName.startsWith("/memory/")) { + var id = dfsDao.getRepositoryIdByName(repositoryName); + if (id == null) { + throw new IOException(String.format("Repository with the name %s does not exist!", repositoryName)); + } + return new NukagitDfsRepository.Builder(dfsDao, blockRepository) + .setRepositoryDescription(new NukagitDfsRepositoryDescription(id, repositoryName)) + // .withPath(new Path("testRepositories", name)) + // .withBlockSize(64) + // .withReplication((short) 2) + .setReaderOptions(new DfsReaderOptions()) + .build(); + } + // Keep in memory repositories for simple testing + return repositoryCache.computeIfAbsent( + repositoryName, (key) -> new InMemoryRepository(new DfsRepositoryDescription(key))); + } + + public synchronized List listRepositories() { + return List.copyOf(repositoryCache.keySet()); } - // Keep in memory repositories for simple testing - return repositoryCache.computeIfAbsent( - repositoryName, (key) -> new InMemoryRepository(new DfsRepositoryDescription(key))); - } - - public synchronized List listRepositories() { - return List.copyOf(repositoryCache.keySet()); - } } diff --git a/src/main/java/lt/pow/nukagit/dfs/GitDfsPackCommand.java b/src/main/java/lt/pow/nukagit/dfs/GitDfsPackCommand.java index 5f0da4d..b7cbf9a 100644 --- a/src/main/java/lt/pow/nukagit/dfs/GitDfsPackCommand.java +++ b/src/main/java/lt/pow/nukagit/dfs/GitDfsPackCommand.java @@ -34,8 +34,10 @@ public GitDfsPackCommand( @WithSpan public void run() { String username = (String) getServerSession().getIoSession().getAttribute("username"); - String command = getCommand(); MDC.put("username", username); + NukagitDfsRepository.USERNAME.set(username); + + String command = getCommand(); try { var args = extractQuotedStrings(command); @@ -65,6 +67,7 @@ public void run() { MDC.remove("command"); MDC.remove("git.repository"); MDC.remove("git.command"); + NukagitDfsRepository.USERNAME.remove(); } } diff --git a/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java b/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java index 29f938a..ca2f767 100644 --- a/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java +++ b/src/main/java/lt/pow/nukagit/dfs/NukagitDfsObjDatabase.java @@ -1,441 +1,382 @@ package lt.pow.nukagit.dfs; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import io.minio.GetObjectArgs; -import io.minio.MinioClient; -import io.minio.PutObjectArgs; -import io.minio.errors.MinioException; import io.opentelemetry.instrumentation.annotations.SpanAttribute; import io.opentelemetry.instrumentation.annotations.WithSpan; import lt.pow.nukagit.db.dao.NukagitDfsDao; import lt.pow.nukagit.db.dao.NukagitDfsPackConflictException; import lt.pow.nukagit.db.entities.ImmutablePack; import lt.pow.nukagit.db.entities.Pack; +import lt.pow.nukagit.minio.NukagitBlockRepository; import org.eclipse.jgit.internal.storage.dfs.*; import org.eclipse.jgit.internal.storage.pack.PackExt; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class NukagitDfsObjDatabase extends DfsObjDatabase { - private final Logger LOGGER = LoggerFactory.getLogger(NukagitDfsObjDatabase.class); - private final MinioClient minio; - private final NukagitDfsDao dfsDao; - private final int blockSize; - - private final UUID repositoryId; - - public NukagitDfsObjDatabase( - NukagitDfsRepository nukagitDfsRepository, - NukagitDfsDao dfsDao, - MinioClient minio, - DfsReaderOptions readerOptions, - int blockSize) { - super(nukagitDfsRepository, readerOptions); - this.minio = minio; - LOGGER.debug( - "NukagitDfsObjDatabase: blockSize={} for repository {}", - blockSize, - nukagitDfsRepository.getDescription().getRepositoryName()); - // Should I add one more abstraction layer like a repository class on top of all the dao classes - // and minio? - this.dfsDao = dfsDao; - this.blockSize = blockSize; - this.repositoryId = - ((NukagitDfsRepositoryDescription) getRepository().getDescription()).getRepositoryId(); - } - - @Override - @WithSpan - protected List listPacks() { - var packDescriptionList = - dfsDao.listPacks(repositoryId).stream() - // Group packs by name + source - // Each pack will contain all the exts - .collect( - Collectors.groupingBy(pack -> String.format("%s\0%s", pack.name(), pack.source()))) - .values() - .stream() - .map( - (List packs) -> - mapPacksToPackDescriptions(getRepository().getDescription(), blockSize, packs)) - .toList(); - LOGGER.debug("listPacks returning {} packs", packDescriptionList.size()); - // This method must return a mutable list. - return new ArrayList<>(packDescriptionList); - } - - @NotNull - public static DfsPackDescription mapPacksToPackDescriptions( - DfsRepositoryDescription repositoryDescription, int blockSize, List packs) { - var firstPack = packs.get(0); - var packSource = PackSource.valueOf(firstPack.source()); - // TODO: probably want to pass the repo uuid along - var desc = new MinioPack(firstPack.name(), repositoryDescription, packSource); - packs.forEach( - pack -> { - var ext = getExt(pack); - desc.addFileExt(ext); - desc.setBlockSize(ext, blockSize); - desc.setFileSize(ext, pack.fileSize()); - }); - desc.setObjectCount(firstPack.objectCount()); - return desc; - } - - @NotNull - private static PackExt getExt(Pack pack) { - var ext = - Arrays.stream(PackExt.values()) - .filter(packExt -> packExt.getExtension().equals(pack.ext())) - .findFirst(); - return ext.orElseThrow( - () -> new IllegalArgumentException(String.format("Invalid pack Extension %s", pack.ext()))); - } - - @Override - @WithSpan - protected DfsPackDescription newPack(PackSource source) { - var packName = "pack-" + UUID.randomUUID() + "-" + source.name(); - var pack = new MinioPack(packName, getRepository().getDescription(), source); - LOGGER.debug("newPack: pack={} source={} packName={}", pack, source, packName); - return pack; - } - - @Override - @WithSpan - protected void commitPackImpl( - Collection desc, Collection replace) throws IOException { - LOGGER.debug("commitPackImpl: desc={}, replace={}", desc, replace); - ArrayList newPacks = mapPackDescriptionsToPacks(desc); - ArrayList removePacks = mapPackDescriptionsToPacks(replace); - LOGGER.debug("commitPackImpl: newPacks={}, removePacks={}", newPacks, removePacks); - try { - dfsDao.commitPack(repositoryId, newPacks, removePacks); - } catch (NukagitDfsPackConflictException e) { - LOGGER.warn("commitPackImpl: encountered conflict when committing packs", e); - throw new IOException(e); - } - clearCache(); - } - - @NotNull - @VisibleForTesting - public static ArrayList mapPackDescriptionsToPacks(Collection desc) { - var packs = new ArrayList(); - if (desc == null) { - return packs; + private final Logger LOGGER = LoggerFactory.getLogger(NukagitDfsObjDatabase.class); + + private final NukagitBlockRepository blockRepository; + private final NukagitDfsDao dfsDao; + private final int blockSize; + + private final UUID repositoryId; + + public NukagitDfsObjDatabase( + NukagitDfsRepository nukagitDfsRepository, + NukagitDfsDao dfsDao, + NukagitBlockRepository blockRepository, DfsReaderOptions readerOptions, + int blockSize) { + super(nukagitDfsRepository, readerOptions); + this.blockRepository = blockRepository; + LOGGER.debug( + "NukagitDfsObjDatabase: blockSize={} for repository {}", + blockSize, + nukagitDfsRepository.getDescription().getRepositoryName()); + this.dfsDao = dfsDao; + this.blockSize = blockSize; + this.repositoryId = + ((NukagitDfsRepositoryDescription) getRepository().getDescription()).getRepositoryId(); } - desc.forEach( - packDesc -> { - // This is the only way to get the pack name - var name = packDesc.getFileName(PackExt.PACK); - int dot = name.lastIndexOf('.'); - var packName = (dot < 0) ? name : name.substring(0, dot); - var packSource = packDesc.getPackSource(); - Arrays.stream(PackExt.values()) - .forEach( - ext -> { - if (packDesc.hasFileExt(ext)) { - var extSize = packDesc.getFileSize(ext); - var objectCount = packDesc.getObjectCount(); - packs.add( - ImmutablePack.builder() - .name(packName) - .source(packSource.name()) - .ext(ext.getExtension()) - .fileSize(extSize) - .objectCount(objectCount) - .minUpdateIndex(packDesc.getMinUpdateIndex()) - .maxUpdateIndex(packDesc.getMaxUpdateIndex()) - .build()); - } - }); - }); - return packs; - } - - @Override - @WithSpan - protected void rollbackPack(Collection desc) { - LOGGER.debug("rollbackPack: desc={}", desc); - // Do nothing. Pack is not recorded until commitPack. - } - - @Override - protected ReadableChannel openFile(DfsPackDescription desc, PackExt ext) throws IOException { - LOGGER.debug("openFile: desc={}, ext={}", desc, ext); - return new MinioBlockReadableChannel(minio, (MinioPack) desc, ext, blockSize); - } - - @Override - protected DfsOutputStream writeFile(DfsPackDescription desc, PackExt ext) throws IOException { - LOGGER.debug("writeFile: desc={}, ext={}", desc, ext); - return new Out(minio, (MinioPack) desc, ext, blockSize); - } - - @Override - @WithSpan - public long getApproximateObjectCount() { - LOGGER.debug("getApproximateObjectCount"); - // TODO: reimplement as a single query - long count = 0; - for (DfsPackDescription p : listPacks()) { - count += p.getObjectCount(); + + @Override + @WithSpan + protected List listPacks() { + var packDescriptionList = + dfsDao.listPacks(repositoryId).stream() + // Group packs by name + source + // Each pack will contain all the exts + .collect( + Collectors.groupingBy(pack -> String.format("%s\0%s", pack.name(), pack.source()))) + .values() + .stream() + .map( + (List packs) -> + mapPacksToPackDescriptions(getRepository().getDescription(), blockSize, packs)) + .toList(); + LOGGER.debug("listPacks returning {} packs", packDescriptionList.size()); + // This method must return a mutable list. + return new ArrayList<>(packDescriptionList); } - return count; - } - private static class MinioPack extends DfsPackDescription { - MinioPack(String name, DfsRepositoryDescription repoDesc, PackSource source) { - super(repoDesc, name, source); + @NotNull + public static DfsPackDescription mapPacksToPackDescriptions( + DfsRepositoryDescription repositoryDescription, int blockSize, List packs) { + var firstPack = packs.get(0); + var packSource = PackSource.valueOf(firstPack.source()); + // TODO: probably want to pass the repo uuid along + var desc = new MinioPack(firstPack.name(), repositoryDescription, packSource); + packs.forEach( + pack -> { + var ext = getExt(pack); + desc.addFileExt(ext); + desc.setBlockSize(ext, blockSize); + desc.setFileSize(ext, pack.fileSize()); + }); + desc.setObjectCount(firstPack.objectCount()); + return desc; } - } - - private static class Out extends DfsOutputStream { - Logger LOGGER = LoggerFactory.getLogger(Out.class); - private final byte[] buffer; - private final ByteArrayOutputStream wholePackBuffer; - private int positionInChunk; - private int chunkCount; - private final MinioClient minio; - private final MinioPack desc; - private final PackExt ext; - private final int blockSize; - public Out(MinioClient minio, MinioPack desc, PackExt ext, int blockSize) { - this.minio = minio; - this.desc = desc; - this.ext = ext; - this.blockSize = blockSize; - this.positionInChunk = 0; - this.chunkCount = 0; - this.buffer = new byte[blockSize]; - this.wholePackBuffer = new ByteArrayOutputStream(); + @NotNull + private static PackExt getExt(Pack pack) { + var ext = + Arrays.stream(PackExt.values()) + .filter(packExt -> packExt.getExtension().equals(pack.ext())) + .findFirst(); + return ext.orElseThrow( + () -> new IllegalArgumentException(String.format("Invalid pack Extension %s", pack.ext()))); } @Override - public int blockSize() { - return blockSize; + @WithSpan + protected DfsPackDescription newPack(PackSource source) { + var packName = "pack-" + UUID.randomUUID() + "-" + source.name(); + var pack = new MinioPack(packName, getRepository().getDescription(), source); + LOGGER.debug("newPack: pack={} source={} packName={}", pack, source, packName); + return pack; } @Override @WithSpan - public void write(byte[] buf, int off, @SpanAttribute int len) throws IOException { - LOGGER.debug("write: buf.length={}, off={}, len={}", buf.length, off, len); - wholePackBuffer.write(buf, off, len); - int remaining = len; - int offset = off; - - while (remaining > 0) { - int bytesToWrite = Math.min(remaining, blockSize - positionInChunk); - System.arraycopy(buf, offset, buffer, positionInChunk, bytesToWrite); - positionInChunk += bytesToWrite; - remaining -= bytesToWrite; - offset += bytesToWrite; - - if (positionInChunk == blockSize) { - flushChunk(); + protected void commitPackImpl( + Collection desc, Collection replace) throws IOException { + LOGGER.debug("commitPackImpl: desc={}, replace={}", desc, replace); + ArrayList newPacks = mapPackDescriptionsToPacks(desc); + ArrayList removePacks = mapPackDescriptionsToPacks(replace); + LOGGER.debug("commitPackImpl: newPacks={}, removePacks={}", newPacks, removePacks); + try { + dfsDao.commitPack(repositoryId, newPacks, removePacks); + } catch (NukagitDfsPackConflictException e) { + LOGGER.warn("commitPackImpl: encountered conflict when committing packs", e); + throw new IOException(e); } - } + clearCache(); + } + + @NotNull + @VisibleForTesting + public static ArrayList mapPackDescriptionsToPacks(Collection desc) { + var packs = new ArrayList(); + if (desc == null) { + return packs; + } + desc.forEach( + packDesc -> { + // This is the only way to get the pack name + var name = packDesc.getFileName(PackExt.PACK); + int dot = name.lastIndexOf('.'); + var packName = (dot < 0) ? name : name.substring(0, dot); + var packSource = packDesc.getPackSource(); + Arrays.stream(PackExt.values()) + .forEach( + ext -> { + if (packDesc.hasFileExt(ext)) { + var extSize = packDesc.getFileSize(ext); + var objectCount = packDesc.getObjectCount(); + packs.add( + ImmutablePack.builder() + .name(packName) + .source(packSource.name()) + .ext(ext.getExtension()) + .fileSize(extSize) + .objectCount(objectCount) + .minUpdateIndex(packDesc.getMinUpdateIndex()) + .maxUpdateIndex(packDesc.getMaxUpdateIndex()) + .build()); + } + }); + }); + return packs; } @Override @WithSpan - public int read(@SpanAttribute long position, ByteBuffer buf) throws IOException { - LOGGER.debug("read: position={}, buf={}", position, buf); - byte[] byteArray = wholePackBuffer.toByteArray(); - int length = byteArray.length; - int remaining = Math.min(buf.remaining(), length - (int) position); - - if (remaining <= 0) { - return -1; // End of data - } - - buf.put(byteArray, (int) position, remaining); - return remaining; + protected void rollbackPack(Collection desc) { + LOGGER.debug("rollbackPack: desc={}", desc); + // Do nothing. Pack is not recorded until commitPack. } - @WithSpan - public void flushChunk() throws IOException { - if (positionInChunk == 0) { - return; - } - try { - minio.putObject( - PutObjectArgs.builder() - .bucket("nukagit") - .object( - String.format( - "%s/%05d-%s", - ((NukagitDfsRepositoryDescription) desc.getRepositoryDescription()) - .getRepositoryId(), - chunkCount, - desc.getFileName(ext))) - .stream(new ByteArrayInputStream(buffer, 0, positionInChunk), positionInChunk, -1) - .contentType("application/octet-stream") - .build()); - this.chunkCount += 1; - this.positionInChunk = 0; - } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { - throw new IOException(e); - } + @Override + protected ReadableChannel openFile(DfsPackDescription desc, PackExt ext) throws IOException { + LOGGER.debug("openFile: desc={}, ext={}", desc, ext); + return new MinioBlockReadableChannel(blockRepository, (MinioPack) desc, ext, blockSize); } @Override - @WithSpan - public void flush() throws IOException { - LOGGER.debug("flush"); - flushChunk(); + protected DfsOutputStream writeFile(DfsPackDescription desc, PackExt ext) throws IOException { + LOGGER.debug("writeFile: desc={}, ext={}", desc, ext); + return new Out(blockRepository, (MinioPack) desc, ext, blockSize); } @Override @WithSpan - public void close() throws IOException { - flushChunk(); - super.close(); + public long getApproximateObjectCount() { + LOGGER.debug("getApproximateObjectCount"); + // TODO: reimplement as a single query + long count = 0; + for (DfsPackDescription p : listPacks()) { + count += p.getObjectCount(); + } + return count; } - } - private static final class MinioBlockReadableChannel implements ReadableChannel { - private final Logger LOGGER = LoggerFactory.getLogger(MinioBlockReadableChannel.class); - private final MinioClient minio; - private final PackExt ext; - private final int blockSize; - private final MinioPack desc; - private int position; - private boolean open = true; - private int readAheadBytes; - - private final LoadingCache blockCache; - - public MinioBlockReadableChannel( - MinioClient minio, MinioPack desc, PackExt ext, int blockSize) { - // TODO: Inject this cache and share it between all the channels - blockCache = - CacheBuilder.newBuilder() - .maximumSize(100) - .expireAfterAccess(10, TimeUnit.MINUTES) - .build(CacheLoader.from(this::getBlock)); - this.ext = ext; - this.blockSize = blockSize; - this.position = 0; - this.readAheadBytes = 0; - this.minio = minio; - this.desc = desc; + private static class MinioPack extends DfsPackDescription { + MinioPack(String name, DfsRepositoryDescription repoDesc, PackSource source) { + super(repoDesc, name, source); + } } - private byte[] getBlock(String key) throws UncheckedIOException { - try { - try { - return minio - .getObject(GetObjectArgs.builder().bucket("nukagit").object(key).build()) - .readAllBytes(); - } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { - throw new IOException(e); + private static class Out extends DfsOutputStream { + Logger LOGGER = LoggerFactory.getLogger(Out.class); + private final byte[] buffer; + private final ByteArrayOutputStream wholePackBuffer; + private int positionInChunk; + private int chunkCount; + private final NukagitBlockRepository blockRepository; + private final MinioPack desc; + private final PackExt ext; + private final int blockSize; + + public Out(NukagitBlockRepository blockRepository, MinioPack desc, PackExt ext, int blockSize) { + this.blockRepository = blockRepository; + this.desc = desc; + this.ext = ext; + this.blockSize = blockSize; + this.positionInChunk = 0; + this.chunkCount = 0; + this.buffer = new byte[blockSize]; + this.wholePackBuffer = new ByteArrayOutputStream(); } - } catch (IOException ex) { - throw new UncheckedIOException( - String.format("Failed to load a block %s from cache", key), ex); - } - } - @Override - @WithSpan - public int read(ByteBuffer dst) throws IOException { - LOGGER.debug("read: dst={}", dst); - long start = position(); - int positionInBlock = (int) (position() % blockSize); - int totalBytesRead = 0; + @Override + public int blockSize() { + return blockSize; + } - while (dst.remaining() > 0) { - long blockNumber = start / blockSize; - String blockKey = convertBlockNumberToKey(blockNumber); + @Override + @WithSpan + public void write(byte[] buf, int off, @SpanAttribute int len) throws IOException { + LOGGER.debug("write: buf.length={}, off={}, len={}", buf.length, off, len); + wholePackBuffer.write(buf, off, len); + int remaining = len; + int offset = off; + + while (remaining > 0) { + int bytesToWrite = Math.min(remaining, blockSize - positionInChunk); + System.arraycopy(buf, offset, buffer, positionInChunk, bytesToWrite); + positionInChunk += bytesToWrite; + remaining -= bytesToWrite; + offset += bytesToWrite; + + if (positionInChunk == blockSize) { + flushChunk(); + } + } + } - byte[] blockData; - try { - blockData = blockCache.getUnchecked(blockKey); - } catch (UncheckedIOException e) { - throw new IOException(e); + @Override + @WithSpan + public int read(@SpanAttribute long position, ByteBuffer buf) throws IOException { + LOGGER.debug("read: position={}, buf={}", position, buf); + byte[] byteArray = wholePackBuffer.toByteArray(); + int length = byteArray.length; + int remaining = Math.min(buf.remaining(), length - (int) position); + + if (remaining <= 0) { + return -1; // End of data + } + + buf.put(byteArray, (int) position, remaining); + return remaining; } - int bytesToRead = Math.min(blockSize - positionInBlock, dst.remaining()); + @WithSpan + public void flushChunk() throws IOException { + if (positionInChunk == 0) { + return; + } + blockRepository.putBlock(((NukagitDfsRepositoryDescription) desc.getRepositoryDescription()) + .getRepositoryId(), + chunkCount, + desc.getFileName(ext), + buffer, + positionInChunk); + this.chunkCount += 1; + this.positionInChunk = 0; + } - dst.put(blockData, positionInBlock, bytesToRead); - totalBytesRead += bytesToRead; + @Override + @WithSpan + public void flush() throws IOException { + LOGGER.debug("flush"); + flushChunk(); + } - positionInBlock = 0; // Reset positionInBlock for subsequent blocks - start += bytesToRead; - if (start >= size()) { - break; // Reached end of the file + @Override + @WithSpan + public void close() throws IOException { + flushChunk(); + super.close(); } - } - return totalBytesRead; } - private String convertBlockNumberToKey(long blockNumber) { - return String.format( - "%s/%05d-%s", - ((NukagitDfsRepositoryDescription) desc.getRepositoryDescription()).getRepositoryId(), - blockNumber, - desc.getFileName(ext)); - } + private static final class MinioBlockReadableChannel implements ReadableChannel { + private final Logger LOGGER = LoggerFactory.getLogger(MinioBlockReadableChannel.class); + + private final NukagitBlockRepository blockRepository; + private final PackExt ext; + private final int blockSize; + private final MinioPack desc; + private int position; + private boolean open = true; + private int readAheadBytes; + + public MinioBlockReadableChannel( + NukagitBlockRepository blockRepository, MinioPack desc, PackExt ext, int blockSize) { + this.blockRepository = blockRepository; + this.ext = ext; + this.blockSize = blockSize; + this.position = 0; + this.readAheadBytes = 0; + this.desc = desc; + } - @Override - public long position() throws IOException { - return position; - } + @Override + @WithSpan + public int read(ByteBuffer dst) throws IOException { + LOGGER.debug("read: dst={}", dst); + long start = position(); + int positionInBlock = (int) (position() % blockSize); + int totalBytesRead = 0; + + while (dst.remaining() > 0) { + long blockNumber = start / blockSize; + + byte[] blockData = blockRepository.getBlock( + ((NukagitDfsRepositoryDescription) desc.getRepositoryDescription()).getRepositoryId(), + blockNumber, + desc.getFileName(ext)); + int bytesToRead = Math.min(blockSize - positionInBlock, dst.remaining()); + + dst.put(blockData, positionInBlock, bytesToRead); + totalBytesRead += bytesToRead; + + positionInBlock = 0; // Reset positionInBlock for subsequent blocks + start += bytesToRead; + if (start >= size()) { + break; // Reached end of the file + } + } + return totalBytesRead; + } - @Override - @WithSpan - public void position(long newPosition) throws IOException { - LOGGER.debug("position: newPosition={}", newPosition); - position = (int) newPosition; - } + @Override + public long position() throws IOException { + return position; + } - @Override - public long size() throws IOException { - var size = desc.getFileSize(ext); - LOGGER.debug("size={}", size); - return size; - } + @Override + @WithSpan + public void position(long newPosition) throws IOException { + LOGGER.debug("position: newPosition={}", newPosition); + position = (int) newPosition; + } - @Override - public int blockSize() { - return blockSize; - } + @Override + public long size() throws IOException { + var size = desc.getFileSize(ext); + LOGGER.debug("size={}", size); + return size; + } - @Override - public void setReadAheadBytes(int bufferSize) throws IOException { - LOGGER.debug("setReadAheadBytes: bufferSize={}", bufferSize); - this.readAheadBytes = bufferSize; - } + @Override + public int blockSize() { + return blockSize; + } - @Override - public boolean isOpen() { - return open; - } + @Override + public void setReadAheadBytes(int bufferSize) throws IOException { + LOGGER.debug("setReadAheadBytes: bufferSize={}", bufferSize); + this.readAheadBytes = bufferSize; + } - @Override - @WithSpan - public void close() { - LOGGER.debug("close"); - open = false; - blockCache.invalidateAll(); + @Override + public boolean isOpen() { + return open; + } + + @Override + @WithSpan + public void close() { + LOGGER.debug("close"); + open = false; + } } - } } diff --git a/src/main/java/lt/pow/nukagit/dfs/NukagitDfsRepository.java b/src/main/java/lt/pow/nukagit/dfs/NukagitDfsRepository.java index 449bdec..feb9e25 100644 --- a/src/main/java/lt/pow/nukagit/dfs/NukagitDfsRepository.java +++ b/src/main/java/lt/pow/nukagit/dfs/NukagitDfsRepository.java @@ -1,82 +1,100 @@ package lt.pow.nukagit.dfs; -import java.io.IOException; - -import io.minio.MinioClient; import lt.pow.nukagit.db.dao.NukagitDfsDao; -import org.eclipse.jgit.internal.storage.dfs.DfsObjDatabase; -import org.eclipse.jgit.internal.storage.dfs.DfsReaderOptions; -import org.eclipse.jgit.internal.storage.dfs.DfsReftableDatabase; -import org.eclipse.jgit.internal.storage.dfs.DfsRepository; -import org.eclipse.jgit.internal.storage.dfs.DfsRepositoryBuilder; +import lt.pow.nukagit.minio.NukagitBlockRepository; +import org.eclipse.jgit.annotations.Nullable; +import org.eclipse.jgit.internal.storage.dfs.*; import org.eclipse.jgit.internal.storage.reftable.ReftableConfig; +import org.eclipse.jgit.internal.storage.reftable.ReftableDatabase; +import org.eclipse.jgit.lib.NullProgressMonitor; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.transport.ReceiveCommand; + +import java.io.IOException; public class NukagitDfsRepository extends DfsRepository { - private final DfsReaderOptions readerOptions; - private final DfsObjDatabase objDb; - private final org.eclipse.jgit.lib.RefDatabase refDb; - private final NukagitDfsDao dfsDao; - private final MinioClient minio; + public final static ThreadLocal USERNAME + = new ThreadLocal<>(); + private final DfsReaderOptions readerOptions; + private final DfsObjDatabase objDb; + private final org.eclipse.jgit.lib.RefDatabase refDb; + private final NukagitDfsDao dfsDao; + private final NukagitBlockRepository blockRepository; - public NukagitDfsRepository(Builder builder) { - super(builder); - this.readerOptions = builder.getReaderOptions(); - this.dfsDao = builder.getDfsDao(); - this.minio = builder.getMinio(); - // Should I create these whenever they are retrieved? - // TODO: Test different block sizes - objDb = new NukagitDfsObjDatabase(this, dfsDao, minio, this.readerOptions, 1024 * 1024); - refDb = new RefDatabase(this); - } + public NukagitDfsRepository(Builder builder) { + super(builder); + this.readerOptions = builder.getReaderOptions(); + this.dfsDao = builder.getDfsDao(); + this.blockRepository = builder.getBlockRepository(); + // Should I create these whenever they are retrieved? + // TODO: Test different block sizes + objDb = new NukagitDfsObjDatabase(this, dfsDao, blockRepository, this.readerOptions, blockRepository.getBlockSize()); + refDb = new RefDatabase(this); + } - public DfsObjDatabase getObjectDatabase() { - return objDb; - } + public DfsObjDatabase getObjectDatabase() { + return objDb; + } - public org.eclipse.jgit.lib.RefDatabase getRefDatabase() { - return refDb; - } + public org.eclipse.jgit.lib.RefDatabase getRefDatabase() { + return refDb; + } - public static class Builder extends DfsRepositoryBuilder { - private NukagitDfsDao dfsDao; - private final MinioClient minio; + public static class Builder extends DfsRepositoryBuilder { + private final NukagitDfsDao dfsDao; + private final NukagitBlockRepository blockRepository; - public Builder(NukagitDfsDao dfsDao, MinioClient minio) { - super(); - this.dfsDao = dfsDao; - this.minio = minio; - } + public Builder(NukagitDfsDao dfsDao, NukagitBlockRepository blockRepository) { + super(); + this.dfsDao = dfsDao; + this.blockRepository = blockRepository; + } - @Override - public NukagitDfsRepository build() throws IOException { - return new NukagitDfsRepository(this); - } + @Override + public NukagitDfsRepository build() throws IOException { + return new NukagitDfsRepository(this); + } - public void setDfsDao(NukagitDfsDao dfsDao) { - this.dfsDao = dfsDao; - } + public NukagitDfsDao getDfsDao() { + return dfsDao; + } - public NukagitDfsDao getDfsDao() { - return dfsDao; + public NukagitBlockRepository getBlockRepository() { + return blockRepository; + } } - public MinioClient getMinio() { - return minio; - } - } + public static class RefDatabase extends DfsReftableDatabase { + protected RefDatabase(DfsRepository repo) { + super(repo); + } - public static class RefDatabase extends DfsReftableDatabase { - protected RefDatabase(DfsRepository repo) { - super(repo); - } + @Override + public ReftableConfig getReftableConfig() { + ReftableConfig cfg = new ReftableConfig(); + cfg.setAlignBlocks(true); + cfg.setIndexObjects(true); + cfg.fromConfig(getRepository().getConfig()); + return cfg; + } - @Override - public ReftableConfig getReftableConfig() { - ReftableConfig cfg = new ReftableConfig(); - cfg.setAlignBlocks(true); - cfg.setIndexObjects(true); - cfg.fromConfig(getRepository().getConfig()); - return cfg; + @Override + protected boolean compareAndPut(Ref oldRef, @Nullable Ref newRef) + throws IOException { + // Add settings for a repository + // Pass along username somehow + ReceiveCommand cmd = ReftableDatabase.toCommand(oldRef, newRef); + try (RevWalk rw = new RevWalk(getRepository())) { + rw.setRetainBody(false); + newBatchUpdate().setAllowNonFastForwards(true).addCommand(cmd) + .execute(rw, NullProgressMonitor.INSTANCE); + } + return switch (cmd.getResult()) { + case OK -> true; + case REJECTED_OTHER_REASON -> throw new IOException(cmd.getMessage()); + default -> false; + }; + } } - } } diff --git a/src/main/java/lt/pow/nukagit/minio/NukagitBlockRepository.java b/src/main/java/lt/pow/nukagit/minio/NukagitBlockRepository.java new file mode 100644 index 0000000..85ec5da --- /dev/null +++ b/src/main/java/lt/pow/nukagit/minio/NukagitBlockRepository.java @@ -0,0 +1,87 @@ +package lt.pow.nukagit.minio; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.minio.GetObjectArgs; +import io.minio.MinioClient; +import io.minio.PutObjectArgs; +import io.minio.errors.MinioException; + +import javax.inject.Inject; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class NukagitBlockRepository { + + private final MinioClient minio; + private final LoadingCache blockCache; + + @Inject + public NukagitBlockRepository(MinioClient minio) { + this.minio = minio; + this.blockCache = CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterAccess(10, TimeUnit.MINUTES) + .build(CacheLoader.from(this::getBlock)); + } + + private byte[] getBlock(String key) throws UncheckedIOException { + try { + try { + return minio + .getObject(GetObjectArgs.builder().bucket("nukagit").object(key).build()) + .readAllBytes(); + } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { + throw new IOException(e); + } + } catch (IOException ex) { + throw new UncheckedIOException( + String.format("Failed to load a block %s from cache", key), ex); + } + } + + public byte[] getBlock(UUID repositoryId, long blockNumber, String fileName) throws IOException { + try { + return blockCache.getUnchecked(getBlockKey(repositoryId, blockNumber, fileName)); + } catch (UncheckedIOException e) { + throw new IOException(e); + } + } + + public void putBlock( + UUID repositoryId, + long blockNumber, + String fileName, + byte[] buffer, + int length) throws IOException { + try { + minio.putObject( + PutObjectArgs.builder() + .bucket("nukagit") + .object(getBlockKey(repositoryId, blockNumber, fileName)) + .stream(new ByteArrayInputStream(buffer, 0, length), length, -1) + .contentType("application/octet-stream") + .build()); + } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { + throw new IOException(e); + } + } + + private static String getBlockKey(UUID repositoryId, long blockNumber, String fileName) { + return String.format( + "%s/%05d-%s", + repositoryId, + blockNumber, + fileName); + } + + public int getBlockSize() { + return 1024 * 1024; + } +} diff --git a/src/test/groovy/lt/pow/nukagit/dfs/NukagitDfsObjDatabaseTest.groovy b/src/test/groovy/lt/pow/nukagit/dfs/NukagitDfsObjDatabaseTest.groovy index fab373f..efd602d 100644 --- a/src/test/groovy/lt/pow/nukagit/dfs/NukagitDfsObjDatabaseTest.groovy +++ b/src/test/groovy/lt/pow/nukagit/dfs/NukagitDfsObjDatabaseTest.groovy @@ -3,6 +3,7 @@ package lt.pow.nukagit.dfs import io.minio.MinioClient import lt.pow.nukagit.db.dao.NukagitDfsDao import lt.pow.nukagit.db.entities.ImmutablePack +import lt.pow.nukagit.minio.NukagitBlockRepository import org.eclipse.jgit.internal.storage.dfs.DfsObjDatabase import org.eclipse.jgit.internal.storage.dfs.DfsPackDescription import org.eclipse.jgit.internal.storage.dfs.DfsReaderOptions @@ -17,17 +18,18 @@ class NukagitDfsObjDatabaseTest extends Specification { NukagitDfsRepository nukagitDfsRepository NukagitDfsDao nukagitDfsDao MinioClient minioClient + NukagitBlockRepository blockRepository UUID repositoryId = UUID.randomUUID() def setup() { nukagitDfsDao = Mock(NukagitDfsDao.class) nukagitDfsRepository = Mock(NukagitDfsRepository.class) nukagitDfsRepository.getDescription() >> new NukagitDfsRepositoryDescription(repositoryId, random.nextObject(String.class)) - minioClient = Mock(MinioClient.class) + blockRepository = Mock(NukagitBlockRepository.class) nukagitDfsObjDatabase = new NukagitDfsObjDatabase( nukagitDfsRepository, nukagitDfsDao, - minioClient, + blockRepository, new DfsReaderOptions(), 1024) }