diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..8c6421b --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.warc.gz filter=lfs diff=lfs merge=lfs -text diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md new file mode 100644 index 0000000..9e244d1 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug-report.md @@ -0,0 +1,34 @@ +--- +name: Bug Report +about: Create a bug report to help us improve +labels: "bug" +--- + +**Bug Report** + + + +**To Reproduce** + +```shell +# A standalone program is preferred +``` + +**Expected Behavior** + + + +**Actual Behavior** + + + +**Your Environment** + + diff --git a/.github/ISSUE_TEMPLATE/feature-request.md b/.github/ISSUE_TEMPLATE/feature-request.md new file mode 100644 index 0000000..cc7d1ac --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature-request.md @@ -0,0 +1,17 @@ +--- +name: Feature request +about: Propose a new feature +labels: "feature" +--- + +**Motivation** + + + +**Feature** + + diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml new file mode 100644 index 0000000..a77d668 --- /dev/null +++ b/.github/dependabot.yaml @@ -0,0 +1,31 @@ +version: 2 +updates: + - package-ecosystem: "gomod" + directory: "/" + schedule: + interval: "weekly" + labels: + - "dependencies" + commit-message: + prefix: "build" + include: "scope" + + - package-ecosystem: "docker" + directory: "/" + schedule: + interval: "weekly" + labels: + - "dependencies" + commit-message: + prefix: "build" + include: "scope" + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + labels: + - "dependencies" + commit-message: + prefix: "build" + include: "scope" diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..ca2fbf4 --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,51 @@ +name: Main +on: push + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - name: Checkout code + 
uses: actions/checkout@v4 + with: + lfs: true + - name: Setup go + uses: actions/setup-go@v4 + with: + go-version: "1.21" + cache: false + - name: Build + run: "go build ./..." + - name: Test + run: "go test ./..." + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Setup go + uses: actions/setup-go@v4 + with: + go-version: "1.21" + cache: false + - name: Lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.55.1 + args: --config=.golangci.yaml + codeqL-build: + name: CodeQL build + runs-on: ubuntu-latest + permissions: + security-events: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..a8b7210 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,48 @@ +name: Release + +on: + push: + branches: + - main + tags: + - "v[0-9]+.[0-9]+.[0-9]+**" + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build: + name: Create and release docker image + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Extract metadata (tags, labels, version) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=semver,pattern={{version}} + type=ref,event=branch + type=ref,event=pr + + - name: Log in to the container registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + push: true + 
tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3852366 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/fai diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..415485c --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,118 @@ +output: + format: github-actions + print-issued-lines: true + print-linter-name: true + uniq-by-line: true + sort-results: true + +linters-settings: + gofmt: + simplify: true + +issues: + max-issues-per-linter: 0 + max-same-issues: 0 + +severity: + case-sensitive: true + +linters: + enable-all: true + disable: + # TODO(https://github.com/nlnwa/go_container/issues/3): The following + # sub-linters should be evaluated if they are going to be enabled or not. + - asasalint + - asciicheck + - bidichk + - bodyclose + - containedctx + - cyclop + - deadcode + - decorder + - depguard + - dogsled + - dupl + - dupword + - errchkjson + - execinquery + - exhaustive + - exhaustivestruct + - exhaustruct + - exportloopref + - forbidigo + - forcetypeassert + - funlen + - gci + - ginkgolinter + - gocheckcompilerdirectives + - gochecknoglobals + - gochecknoinits + - gocognit + - goconst + - gocritic + - gocyclo + - godot + - godox + - goerr113 + - gofumpt + - goheader + - goimports + - golint + - gomnd + - gomoddirectives + - gomodguard + - goprintffuncname + - gosec + - gosmopolitan + - grouper + - ifshort + - importas + - interfacebloat + - interfacer + - ireturn + - lll + - loggercheck + - maintidx + - makezero + - maligned + - mirror + - misspell + - musttag + - nakedret + - nestif + - nilerr + - nilnil + - nlreturn + - noctx + - nolintlint + - nonamedreturns + - nosnakecase + - nosprintfhostport + - paralleltest + - prealloc + - predeclared + - promlinter + - reassign + - revive + - rowserrcheck + - scopelint + - sqlclosecheck + - structcheck + - stylecheck + - tagalign + - tagliatelle + - tenv + - testableexamples + - testpackage 
+ - thelper + - tparallel + - unconvert + - unparam + - usestdlibvars + - varcheck + - varnamelen + - wastedassign + - whitespace + - wrapcheck + - wsl + - zerologlint diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..01d27c6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.21 AS build + +WORKDIR /go/src/app + +COPY go.mod go.sum ./ + +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags "-s -w" . + + +FROM gcr.io/distroless/static-debian12:latest + +COPY --from=build /go/src/app/fai /fai + +EXPOSE 8081 + +ENTRYPOINT ["/fai"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a5381e8 --- /dev/null +++ b/README.md @@ -0,0 +1,30 @@ +# First Article Inspection (FAI) + +FAI loops through files in a given directory that match a given pattern +and for every matching file: + +- creates a checksum file +- validates the file as a WARC-file +- logs the name, size, checksum and validation status +- updates a file size histogram metric and a validation error counter metric +- moves the file and the corresponding checksum file to a target directory + +```text +Usage of fai: + -concurrency int + number of concurrent files processed (default [number of CPU cores]) + -invalid-target-dir string + path to target directory where invalid files and their corresponding checksum files will be moved to + -metrics-port int + port to expose metrics on (default 8081) + -pattern string + glob pattern used to match filenames in source directory (default "*") + -sleep duration + sleep duration between directory listings, set to 0 to only do a single run (default 5s) + -source-dir string + path to source directory + -tmp-dir string + path to directory where temporary buffer files will be stored + -valid-target-dir string + path to target directory where valid files and their corresponding checksum files will be moved to +``` \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index
0000000..acfab46 --- /dev/null +++ b/go.mod @@ -0,0 +1,27 @@ +module github.com/nlnwa/fai + +go 1.21 + +require ( + github.com/nlnwa/gowarc v1.1.1 + github.com/prometheus/client_golang v1.17.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.10.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/google/uuid v1.4.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/nlnwa/whatwg-url v0.4.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/prometheus v0.47.2 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9aa5190 --- /dev/null +++ b/go.sum @@ -0,0 +1,91 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.5.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= +github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88= +github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 
+github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= +github.com/nlnwa/gowarc v1.1.1 h1:C0kWp8aX3E/+/TvaqlO6I86drYO8qdZDGbJLD+aJilk= +github.com/nlnwa/gowarc v1.1.1/go.mod h1:aFbCuh7ZaGlKO2IAH7GbMgp0b2K8kzK2cb0zWsXWp/8= +github.com/nlnwa/whatwg-url v0.4.0 h1:B3kFb5EL7KILeBkhrlQvFi41Ex0p4ropVA9brt5ungI= +github.com/nlnwa/whatwg-url v0.4.0/go.mod h1:pLzpJjFPtA+n7RCLvp0GBxvDHa/2ckNCBK9mfEeNOMQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 
+github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/prometheus v0.47.2 h1:jWcnuQHz1o1Wu3MZ6nMJDuTI0kU5yJp9pkxh8XEkNvI= +github.com/prometheus/prometheus v0.47.2/go.mod h1:J/bmOSjgH7lFxz2gZhrWEZs2i64vMS+HIuZfmYNhJ/M= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net 
v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/text v0.3.0/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/checksum/checksum.go b/internal/checksum/checksum.go new file mode 100644 index 0000000..cfcf9ac --- /dev/null +++ b/internal/checksum/checksum.go @@ -0,0 +1,70 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
// MD5Sum returns the MD5 checksum of the file at filepath, encoded as a
// lowercase hex string.
//
// The file content is streamed into the hash with io.Copy rather than being
// read into memory first. An error is returned if the file cannot be opened
// or if reading it fails; in both cases the returned string is empty.
func MD5Sum(filepath string) (string, error) {
	f, err := os.Open(filepath)
	if err != nil {
		return "", err
	}
	// The handle is read-only, so nothing is lost if Close reports an error.
	defer f.Close()

	h := md5.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}

	return hex.EncodeToString(h.Sum(nil)), nil
}

// separator is used to separate the checksum from the filepath in the checksum file.
//
// NOTE(review): md5sum(1) conventionally writes two characters between the
// digest and the file name; confirm that this value is the intended one.
const separator = " "

// CreateChecksumFile writes a checksum file for the given file and returns
// the path of the file it wrote.
//
// The checksum file is named file+extension and holds a single line:
// the checksum, the separator, the file path as passed in, and a newline.
// An existing checksum file at that path is truncated and overwritten.
//
// An error is returned if the checksum file cannot be created, written or
// closed; in that case the returned path is empty.
//
// CreateChecksumFile panics if checksum is empty, since that indicates a
// programming error in the caller rather than a runtime condition.
func CreateChecksumFile(file string, checksum string, extension string) (string, error) {
	// Don't allow empty checksum
	if checksum == "" {
		panic("checksum is empty")
	}

	checksumFile := file + extension
	content := checksum + separator + file + "\n"

	// os.WriteFile creates or truncates the file (same 0666-before-umask
	// mode as os.Create), writes the content and, unlike a deferred
	// f.Close(), reports a failure to close. For a freshly written file a
	// Close error can mean the data never reached disk, so it must not be
	// silently dropped.
	if err := os.WriteFile(checksumFile, []byte(content), 0o666); err != nil {
		return "", err
	}

	return checksumFile, nil
}
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package checksum + +import ( + "os" + "testing" +) + +func TestMD5Sum(t *testing.T) { + tmpDir := t.TempDir() + f, err := os.CreateTemp(tmpDir, "test") + + if err != nil { + t.Fatalf("failed to create test file: %v", err) + } + _, err = f.WriteString("Test string") + if err != nil { + t.Fatalf("failed to write to test file: %v", err) + } + testFile := f.Name() + defer os.Remove(testFile) + defer f.Close() + + got, err := MD5Sum(testFile) + if err != nil { + t.Errorf("expected no error, but got %v", err) + } + want := "0fd3dbec9730101bff92acc820befc34" + if got != want { + t.Errorf("expected %s, but got %s", want, got) + } +} + +func TestCreateChecksumFile(t *testing.T) { + tmpDir := t.TempDir() + f, err := os.CreateTemp(tmpDir, "test") + + if err != nil { + t.Fatalf("failed to create test file: %v", err) + } + _, err = f.WriteString("Test string") + if err != nil { + t.Fatalf("failed to write to test file: %v", err) + } + testFile := f.Name() + defer os.Remove(testFile) + defer f.Close() + + wantHash := "0fd3dbec9730101bff92acc820befc34" + fileHash, err := MD5Sum(testFile) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + checksumFile, err := CreateChecksumFile(testFile, fileHash, ".md5") + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + defer os.Remove(checksumFile) + + b, err := os.ReadFile(checksumFile) + if err != nil { + t.Fatalf("expected no error, got %v", 
+ err)
+	}
+
+	got := string(b)
+	want := wantHash + separator + testFile + "\n"
+
+	if got != want {
+		t.Errorf("expected %s, got %s", want, got)
+	}
+}
diff --git a/internal/fai/fai.go b/internal/fai/fai.go
new file mode 100644
index 0000000..63d0832
--- /dev/null
+++ b/internal/fai/fai.go
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2023 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fai
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/nlnwa/fai/internal/checksum"
+	"github.com/nlnwa/fai/internal/metrics"
+	"github.com/nlnwa/fai/internal/queue"
+	"github.com/nlnwa/fai/internal/warc"
+)
+
+// fAI moves files matching globPattern to the valid or invalid target
+// directory, each together with a checksum file created for it.
+type fAI struct {
+	sourceDir        string
+	validTargetDir   string
+	invalidTargetDir string
+	tmpDir           string
+	concurrency      int
+	sleep            time.Duration
+	globPattern      string
+	logger           *slog.Logger
+}
+
+// New returns a fAI configured with the given options.
+// It returns an error if the options do not pass validation.
+func New(options ...Option) (*fAI, error) {
+	opts, err := validateOptions(options...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &fAI{
+		sourceDir:        opts.sourceDir,
+		validTargetDir:   opts.validTargetDir,
+		invalidTargetDir: opts.invalidTargetDir,
+		tmpDir:           opts.tmpDir,
+		concurrency:      opts.concurrency,
+		sleep:            opts.sleep,
+		globPattern:      opts.globPattern,
+		logger:           opts.logger,
+	}, nil
+}
+
+// checksumFileSuffix is the extension appended to the name of a file to
+// form the name of its checksum file.
+const checksumFileSuffix = ".md5"
+
+// processFile checksums and validates a single file and then moves the
+// checksum file and the file itself to the valid or invalid target directory.
+// On failure the error is logged and processing of the file stops.
+func (f *fAI) processFile(file string) {
+	// Never process checksum files themselves. With a wide glob pattern such
+	// as "*" a later directory listing also matches the checksum file of a
+	// file that is still being validated; processing it would checksum the
+	// checksum file and move it away before the file it belongs to.
+	if filepath.Ext(file) == checksumFileSuffix {
+		return
+	}
+
+	if _, err := os.Stat(file); errors.Is(err, os.ErrNotExist) {
+		// file does not exist so skip it
+		return
+	}
+
+	// calculate checksum
+	md5sum, err := checksum.MD5Sum(file)
+	if err != nil {
+		f.logger.Error("Failed to calculate checksum", "file", file, "error", err)
+		return
+	}
+	// create checksum file
+	checksumFile, err := checksum.CreateChecksumFile(file, md5sum, checksumFileSuffix)
+	if err != nil {
+		f.logger.Error("Failed to create checksum file", "file", file, "error", err)
+		return
+	}
+
+	// validate file
+	isValid, err := warc.IsValid(file, filepath.Join(f.tmpDir, "buffer"))
+	if err != nil {
+		f.logger.Error("Failed to validate file", "file", file, "error", err)
+		return
+	}
+
+	targetDir := f.validTargetDir
+	if !isValid {
+		targetDir = f.invalidTargetDir
+		metrics.ValidationError()
+	}
+
+	newChecksumFile := filepath.Join(targetDir, filepath.Base(checksumFile))
+	newFile := filepath.Join(targetDir, filepath.Base(file))
+
+	// Move checksum file and file to target directory.
+	//
+	// The order is important because a failed move of the checksum file
+	// will result in the file being checksummed again (ok). If the file
+	// is moved first and the checksum file fails to move then the
+	// checksum file will never be created (not ok).
+
+	// move checksum file to new location
+	err = os.Rename(checksumFile, newChecksumFile)
+	if err != nil {
+		f.logger.Error("Failed to move checksum file", "source", checksumFile, "target", newChecksumFile, "error", err)
+		return
+	}
+
+	// move file to new location
+	err = os.Rename(file, newFile)
+	if err != nil {
+		f.logger.Error("Failed to move file", "source", file, "target", newFile, "error", err)
+		return
+	}
+
+	// get file size
+	fileInfo, err := os.Stat(newFile)
+	if err != nil {
+		f.logger.Error("Failed to get file size", "file", newFile, "error", err)
+		return
+	}
+	fileSizeBytes := fileInfo.Size()
+
+	metrics.Size(fileSizeBytes)
+
+	f.logger.Info("Processed file", "file", newFile, "size", fileSizeBytes, "md5", md5sum, "valid", isValid)
+}
+
+// Run starts the FAI.
+// It will run until the context is cancelled or stop after one pass if the sleep duration is zero.
+func (f *fAI) Run(ctx context.Context) {
+	f.logger.Info("Starting FAI", "sourceDir", f.sourceDir, "validTargetDir", f.validTargetDir, "invalidTargetDir", f.invalidTargetDir, "concurrency", f.concurrency, "sleep", f.sleep)
+
+	queue := queue.NewWorkQueue(f.processFile, f.concurrency)
+	defer queue.CloseAndWait()
+
+	for {
+		// The only error Glob returns is ErrBadPattern and the pattern was
+		// already checked when the options were validated.
+		files, _ := filepath.Glob(f.globPattern)
+		for _, file := range files {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				queue.Add(file)
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(f.sleep):
+			// do a single pass if sleep duration is zero
+			if f.sleep == 0 {
+				return
+			}
+		}
+	}
+}
diff --git a/internal/fai/fai_test.go b/internal/fai/fai_test.go
new file mode 100644
index 0000000..6310b9b
--- /dev/null
+++ b/internal/fai/fai_test.go
@@ -0,0 +1,63 @@
+package fai
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+// createTestFile creates a test file in the given directory
+func createTestFile(t *testing.T, dir string) *os.File {
+	t.Helper()
+	f, _ := os.CreateTemp(dir, "testfile")
+	return f
+}
+
+func TestRun(t *testing.T)
+ {
+	sourceDir := t.TempDir()
+	targetDir := t.TempDir()
+
+	testFiles := []struct {
+		file        *os.File
+		expectedDir string
+		isValid     bool
+	}{
+		{
+			file:        createTestFile(t, sourceDir),
+			expectedDir: targetDir,
+		},
+		{
+			file:        createTestFile(t, sourceDir),
+			expectedDir: targetDir,
+		},
+	}
+
+	fai, err := New(
+		WithConcurrency(len(testFiles)),
+		WithSleep(0),
+		WithSourceDir(sourceDir),
+		WithValidTargetDir(targetDir),
+		WithInvalidTargetDir(targetDir),
+		WithTmpDir(t.TempDir()),
+	)
+	if err != nil {
+		t.Fatalf("failed to create fai: %v", err)
+	}
+
+	// run fai
+	fai.Run(context.Background())
+
+	for _, testFile := range testFiles {
+		// check that test file has been moved
+		_, err = os.Stat(filepath.Join(testFile.expectedDir, filepath.Base(testFile.file.Name())))
+		if err != nil {
+			t.Errorf("failed to stat test file '%v'", err)
+		}
+		// check that checksum file has been created and moved
+		_, err = os.Stat(filepath.Join(testFile.expectedDir, filepath.Base(testFile.file.Name())+".md5"))
+		if err != nil {
+			t.Errorf("failed to stat checksum file: %v", err)
+		}
+	}
+}
diff --git a/internal/fai/options.go b/internal/fai/options.go
new file mode 100644
index 0000000..b1087e8
--- /dev/null
+++ b/internal/fai/options.go
@@ -0,0 +1,173 @@
+package fai
+
+import (
+	"errors"
+	"fmt"
+	"log/slog"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"time"
+
+	"github.com/nlnwa/fai/internal/log"
+)
+
+// options holds the configuration of a fAI.
+type options struct {
+	sourceDir        string
+	validTargetDir   string
+	invalidTargetDir string
+	tmpDir           string
+	concurrency      int
+	sleep            time.Duration
+	globPattern      string
+	logger           *slog.Logger
+}
+
+// defaultOptions returns the options that apply unless an Option overrides them.
+func defaultOptions() *options {
+	return &options{
+		sourceDir:        "",
+		validTargetDir:   "",
+		invalidTargetDir: "",
+		tmpDir:           "",
+		concurrency:      runtime.NumCPU(),
+		sleep:            1 * time.Second,
+		globPattern:      "*",
+		logger:           log.Noop(),
+	}
+}
+
+// absDir returns dir as an absolute path after checking that it refers to an
+// existing directory. The name identifies the directory in error messages.
+func absDir(name, dir string) (string, error) {
+	abs, err := filepath.Abs(dir)
+	if err != nil {
+		return "", fmt.Errorf("failed to get absolute path of %s directory: %w", name, err)
+	}
+	info, err := os.Stat(abs)
+	if errors.Is(err, os.ErrNotExist) {
+		return "", fmt.Errorf("%s directory does not exist: %w", name, err)
+	}
+	if err != nil {
+		// e.g. permission denied; info is nil here and must not be used
+		return "", fmt.Errorf("failed to stat %s directory: %w", name, err)
+	}
+	if !info.IsDir() {
+		return "", fmt.Errorf("%s directory is not a directory: %s", name, abs)
+	}
+	return abs, nil
+}
+
+// validateOptions applies the given options on top of the defaults and
+// validates the result. The directory options are replaced by absolute paths
+// and must refer to existing directories, and the glob pattern is joined
+// onto the source directory.
+func validateOptions(options ...Option) (*options, error) {
+	o := defaultOptions()
+	for _, opt := range options {
+		opt(o)
+	}
+
+	if o.concurrency < 1 {
+		return nil, fmt.Errorf("concurrency must be greater than 0")
+	}
+
+	// make sure source, temp and target directories are absolute paths
+	// of existing directories
+	dirs := []struct {
+		name string
+		path *string
+	}{
+		{"source", &o.sourceDir},
+		{"valid target", &o.validTargetDir},
+		{"invalid target", &o.invalidTargetDir},
+		{"tmp", &o.tmpDir},
+	}
+	for _, d := range dirs {
+		abs, err := absDir(d.name, *d.path)
+		if err != nil {
+			return nil, err
+		}
+		*d.path = abs
+	}
+
+	// Compare the absolute paths so that a relative and an absolute spelling
+	// of the same directory are recognized as being the same.
+	if (o.sourceDir == o.validTargetDir || o.sourceDir == o.invalidTargetDir) && (strings.HasSuffix(o.globPattern, "*") || strings.HasSuffix(o.globPattern, checksumFileSuffix)) {
+		return nil, fmt.Errorf("source and target directories cannot be the same when glob pattern is a wildcard or glob pattern ends with %s", checksumFileSuffix)
+	}
+
+	o.globPattern = filepath.Join(o.sourceDir, o.globPattern)
+
+	// Test glob pattern is valid
+	if _, err := filepath.Glob(o.globPattern); err != nil {
+		return nil, fmt.Errorf("invalid glob pattern: %w", err)
+	}
+
+	if o.logger == nil {
+		return nil, fmt.Errorf("logger cannot be nil")
+	}
+
+	return o, nil
+}
+
+// Option changes a single setting of the options passed to it.
+type Option func(opts *options)
+
+// WithSourceDir sets the directory that is searched for files.
+func WithSourceDir(dir string) Option {
+	return func(opts *options) {
+		opts.sourceDir = dir
+	}
+}
+
+// WithValidTargetDir sets the directory that valid files are moved to.
+func WithValidTargetDir(dir string) Option {
+	return func(opts *options) {
+		opts.validTargetDir = dir
+	}
+}
+
+// WithInvalidTargetDir sets the directory that invalid files are moved to.
+func WithInvalidTargetDir(dir string) Option {
+	return func(opts *options) {
+		opts.invalidTargetDir = dir
+	}
+}
+
+// WithTmpDir sets the directory used for temporary buffer files.
+func WithTmpDir(dir string) Option {
+	return func(opts *options) {
+		opts.tmpDir = dir
+	}
+}
+
+// WithConcurrency sets the number of concurrent workers; it must be at least 1.
+func WithConcurrency(concurrency int) Option {
+	return func(opts *options) {
+		opts.concurrency = concurrency
+	}
+}
+
+// WithSleep sets the pause between directory listings; zero means a single pass.
+func WithSleep(sleep time.Duration) Option {
+	return func(opts *options) {
+		opts.sleep = sleep
+	}
+}
+
+// WithGlobPattern sets the pattern, relative to the source directory, that selects files.
+func WithGlobPattern(globPattern string) Option {
+	return func(opts *options) {
+		opts.globPattern = globPattern
+	}
+}
+
+// WithLogger sets the logger; it must not be nil.
+func WithLogger(logger *slog.Logger) Option {
+	return func(opts *options) {
+		opts.logger = logger
+	}
+}
diff --git a/internal/fai/options_test.go b/internal/fai/options_test.go
new file mode 100644
index 0000000..c20caea
--- /dev/null
+++ b/internal/fai/options_test.go
@@ -0,0 +1,77 @@
+package fai
+
+import (
+	"os"
+	"testing"
+)
+
+func TestValidateOptions(t *testing.T) {
+	testDir := t.TempDir()
+	testFile, err := os.CreateTemp(testDir, "test")
+	if err != nil {
+		t.Fatalf("failed to create test file: %v", err)
+	}
+
+	// Test glob pattern is a wildcard and source and valid target directory are the same
+	_, err = validateOptions(
+		WithSourceDir(testDir),
WithValidTargetDir(testDir), + WithGlobPattern("*"), + ) + if err == nil { + t.Error("expected error when source and valid target directory are the same and glob pattern is '*'") + } + + // Test glob pattern is a wildcard and source and invalid target directory are the same + _, err = validateOptions( + WithSourceDir(testDir), + WithInvalidTargetDir(testDir), + WithGlobPattern("*"), + ) + if err == nil { + t.Error("expected error when source and invalid target directory are the same and glob pattern is '*'") + } + + // Test glob pattern ends with checksum file suffix and source and target directory are the same + _, err = validateOptions( + WithSourceDir(testDir), + WithValidTargetDir(testDir), + WithGlobPattern("*."+checksumFileSuffix), + ) + if err == nil { + t.Errorf("expected error when source and target directory are the same and glob pattern ends with '%s'", checksumFileSuffix) + } + + // Test valid target directory is not a directory + _, err = validateOptions( + WithSourceDir(testDir), + WithValidTargetDir(testFile.Name()), + ) + if err == nil { + t.Error("expected error when valid target directory is not a directory") + } + + // Test concurrency cannot be zero + _, err = validateOptions( + WithConcurrency(0), + ) + if err == nil { + t.Error("expected error when concurrency is zero") + } + + // Test glob pattern is invalid + _, err = validateOptions( + WithGlobPattern("["), + ) + if err == nil { + t.Error("expected error when glob pattern is invalid") + } + + // Test logger cannot be nil + _, err = validateOptions( + WithLogger(nil), + ) + if err == nil { + t.Error("expected error when logger is nil") + } +} diff --git a/internal/log/noop.go b/internal/log/noop.go new file mode 100644 index 0000000..87d56a8 --- /dev/null +++ b/internal/log/noop.go @@ -0,0 +1,34 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +import ( + "context" + "log/slog" +) + +// noopHandler is a slog.Handler that does nothing. +type noopHandler struct{} + +func (n noopHandler) Enabled(context.Context, slog.Level) bool { return false } +func (n noopHandler) Handle(context.Context, slog.Record) error { return nil } +func (n noopHandler) WithAttrs([]slog.Attr) slog.Handler { return n } +func (n noopHandler) WithGroup(string) slog.Handler { return n } + +func Noop() *slog.Logger { + return slog.New(noopHandler{}) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000..8612c08 --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,42 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var filesize = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "file_size_bytes", + Help: "Size of files in bytes.", + // 1MB, 100MB, 500MB, 1GB + Buckets: []float64{1000000, 100000000, 500000000, 1000000000}, +}) + +var validationError = promauto.NewCounter(prometheus.CounterOpts{ + Name: "validation_error", + Help: "Number of files with validation errors.", +}) + +func ValidationError() { + validationError.Inc() +} + +func Size(size int64) { + filesize.Observe(float64(size)) +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..ed1025b --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,78 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue + +import ( + "sync" +) + +// workQueue is a queue of jobs that are executed by a number of workers. +type workQueue struct { + queue chan string + wg sync.WaitGroup + m sync.Mutex + hm map[string]struct{} +} + +// NewWorkQueue creates a new work queue and adds the given number of workers. 
+func NewWorkQueue(execute func(string), concurrency int) *workQueue { + iw := &workQueue{ + queue: make(chan string, concurrency), + hm: make(map[string]struct{}, concurrency), + } + + for i := 0; i < concurrency; i++ { + iw.wg.Add(1) + go func() { + defer iw.wg.Done() + for job := range iw.queue { + execute(job) + iw.m.Lock() + delete(iw.hm, job) + iw.m.Unlock() + } + }() + } + + return iw +} + +// CloseAndWait closes the queue and waits for all workers to complete. +func (iw *workQueue) CloseAndWait() { + // close queue + close(iw.queue) + // and wait for queue to be drained + iw.wg.Wait() +} + +// Add adds a job to the queue. +// If the job is already in the queue, it will be ignored. +// If the queue is full, it will block until there is room. +// If the queue is closed, it will panic. +func (iw *workQueue) Add(job string) { + iw.m.Lock() + // check if job is already in queue + if _, ok := iw.hm[job]; ok { + iw.m.Unlock() + return + } + // add job to queue + iw.hm[job] = struct{}{} + iw.m.Unlock() + + iw.queue <- job +} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go new file mode 100644 index 0000000..40825bf --- /dev/null +++ b/internal/queue/queue_test.go @@ -0,0 +1,100 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package queue + +import ( + "math/rand" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestWorkQueue(t *testing.T) { + concurrency := 10000 + jobs := 1000000 + executed := new(atomic.Int32) + + var m sync.Mutex + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + getTimeout := func() time.Duration { + m.Lock() + defer m.Unlock() + return time.Duration(r.Intn(10)) * time.Millisecond + } + + perJobFn := func(path string) { + time.Sleep(getTimeout()) + executed.Add(1) + } + + queue := NewWorkQueue(perJobFn, concurrency) + for i := 0; i < jobs; i++ { + queue.Add(strconv.Itoa(i)) + } + + queue.CloseAndWait() + + if len(queue.hm) != 0 { + t.Errorf("expected queue to be empty, but got %d jobs", len(queue.hm)) + } + if executed.Load() != int32(jobs) { + t.Errorf("expected %d jobs to have been executed, but got %d", jobs, executed.Load()) + } +} + +func TestAddToClosedWorkQueue(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic") + } + }() + queue := NewWorkQueue(func(string) {}, 1) + queue.CloseAndWait() + queue.Add("this should panic") +} + +func TestAddSameJobToWorkQueue(t *testing.T) { + executed := new(atomic.Int32) + + perJobFn := func(path string) { + time.Sleep(10 * time.Millisecond) + executed.Add(1) + } + + queue := NewWorkQueue(perJobFn, 2) + + // add same job 100 times + // since each job takes 10 ms to execute, only one job should be expected to have been + // executed because 100 jobs should have time to be added to the queue before the first + // job is finished + for i := 0; i < 100; i++ { + queue.Add("job") + } + + queue.CloseAndWait() + + // only one job should have been executed + want := int32(1) + got := executed.Load() + + if got != want { + t.Errorf("expected %d jobs to have been executed, but got %d", want, got) + } +} diff --git a/internal/warc/testdata/invalid.warc.gz b/internal/warc/testdata/invalid.warc.gz new file mode 100644 index 0000000..ac47116 --- /dev/null +++ 
+ b/internal/warc/testdata/invalid.warc.gz
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:2fffc1398c7712c34afda2f6a4dea5072a48357954176d775499ac31cd6cdbc4
+size 3649
diff --git a/internal/warc/testdata/valid.warc.gz b/internal/warc/testdata/valid.warc.gz
new file mode 100644
index 0000000..f3a4350
--- /dev/null
+++ b/internal/warc/testdata/valid.warc.gz
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:9d3f830f757a3733d56de5b76b5a3ccabb5de4cfdfed2559205b9c719ee6cbdb
+size 1779
diff --git a/internal/warc/validate.go b/internal/warc/validate.go
new file mode 100644
index 0000000..514e498
--- /dev/null
+++ b/internal/warc/validate.go
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2023 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package warc
+
+import (
+	"errors"
+	"io"
+
+	"github.com/nlnwa/gowarc"
+)
+
+// IsValid reports whether all records of the given WARC file are valid.
+// tmpDir is handed to the reader as its buffer directory. It returns false as
+// soon as a record cannot be read or is reported invalid by the reader or by
+// ValidateDigest, and an error only if the reader cannot be created.
+func IsValid(file, tmpDir string) (bool, error) {
+	wf, err := gowarc.NewWarcFileReader(file, 0,
+		gowarc.WithBufferTmpDir(tmpDir),
+	)
+	if err != nil {
+		return false, err
+	}
+	defer wf.Close()
+
+	for {
+		wr, _, validation, err := wf.Next()
+		if errors.Is(err, io.EOF) {
+			// NOTE(review): the nil check assumes Next may return no validation at EOF - confirm
+			return validation == nil || validation.Valid(), nil
+		}
+		if err != nil {
+			// A record that cannot be read makes the file invalid. Return
+			// right away: wr and validation may be nil when Next fails, so
+			// they must not be used to validate the digest.
+			if wr != nil {
+				wr.Close()
+			}
+			return false, nil
+		}
+		func() {
+			defer wr.Close()
+			err = wr.ValidateDigest(validation)
+			if err != nil {
+				*validation = append(*validation, err)
+			}
+		}()
+		// stop processing if we have found an invalid record
+		if !validation.Valid() {
+			return false, nil
+		}
+	}
+}
diff --git a/internal/warc/validate_test.go b/internal/warc/validate_test.go
new file mode 100644
index 0000000..e6c556b
--- /dev/null
+++ b/internal/warc/validate_test.go
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2023 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ */ + +package warc + +import ( + "testing" +) + +const ( + validWarcFile = "testdata/valid.warc.gz" + invalidWarcFile = "testdata/invalid.warc.gz" +) + +func TestMain(m *testing.M) { + m.Run() +} + +func TestIsValid(t *testing.T) { + tests := []struct { + file string + isValid bool + }{ + { + file: validWarcFile, + isValid: true, + }, + { + file: invalidWarcFile, + isValid: false, + }, + } + + for _, test := range tests { + t.Run(test.file, func(t *testing.T) { + isValid, err := IsValid(test.file, "") + if err != nil { + t.Errorf("expected no error, got: %v", err) + } + if isValid != test.isValid { + t.Errorf("expected %v, got: %v", test.isValid, isValid) + } + }) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..72931a1 --- /dev/null +++ b/main.go @@ -0,0 +1,86 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "github.com/nlnwa/fai/internal/fai" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func main() { + sourceDir := "" + validTargetDir := "" + invalidTargetDir := "" + tmpDir := "" + + concurrency := runtime.NumCPU() + sleep := 5 * time.Second + pattern := "*" + metricsPort := 8081 + + flag.StringVar(&sourceDir, "source-dir", sourceDir, "path to source directory") + flag.StringVar(&validTargetDir, "valid-target-dir", validTargetDir, "path to target directory where valid files and their corresponding checksum files will be moved to") + flag.StringVar(&invalidTargetDir, "invalid-target-dir", invalidTargetDir, "path to target directory where invalid files and their corresponding checksum files will be moved to") + flag.StringVar(&tmpDir, "tmp-dir", tmpDir, "path to directory where temporary buffer files will be stored") + flag.IntVar(&concurrency, "concurrency", concurrency, "number of concurrent files processed") + flag.DurationVar(&sleep, "sleep", sleep, "sleep duration between directory listings, set to 0 to only do a single run") + flag.StringVar(&pattern, "pattern", pattern, "glob pattern used to match filenames in source directory") + flag.IntVar(&metricsPort, "metrics-port", metricsPort, "port to expose metrics on") + flag.Parse() + + logger := slog.Default() + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + go func() { + defer cancel() + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(fmt.Sprintf(":%d", metricsPort), nil) + if err != nil { + logger.Error("Failed to start metrics server", "error", err) + } + }() + + f, err := fai.New( + fai.WithSourceDir(sourceDir), + fai.WithValidTargetDir(validTargetDir), + fai.WithInvalidTargetDir(invalidTargetDir), + fai.WithTmpDir(tmpDir), + fai.WithConcurrency(concurrency), 
+ fai.WithSleep(sleep), + fai.WithGlobPattern(pattern), + fai.WithLogger(logger), + ) + if err != nil { + logger.Error("", "error", err) + os.Exit(1) + } + + f.Run(ctx) +}