Skip to content

Commit

Permalink
fix tests and code
Browse files Browse the repository at this point in the history
  • Loading branch information
canercidam committed Oct 27, 2023
1 parent 6ef1217 commit 0d9ca92
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ coverage: test
go tool cover -func=coverage.out | grep total | awk '{print substr($$3, 1, length($$3)-1)}'

.PHONY: e2e
e2e:
e2e: build
docker pull nats:2.4
docker tag nats:2.4 localhost:1970/test
cd e2e && E2E_TEST=1 go test -v .
18 changes: 10 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ var (
Cache configuration.Storage
CacheOnly bool
RedirectTo *url.URL
NoClone bool
)

// DiscoConfig contains the extra configuration settings that blend with
// discoConfig contains the extra configuration settings that blend with
// the distribution library config.
var DiscoConfig struct {
var discoConfig struct {
Storage struct {
IPFS struct {
Router RouterConfig `yaml:"router"`
Expand Down Expand Up @@ -83,15 +84,16 @@ func Init() error {

file, _ = os.Open(Vars.RegistryConfigurationPath)
defer file.Close()
err = yaml.NewDecoder(file).Decode(&DiscoConfig)
err = yaml.NewDecoder(file).Decode(&discoConfig)
if err != nil {
return err
}
Router = DiscoConfig.Storage.IPFS.Router
Cache = DiscoConfig.Storage.IPFS.Cache
CacheOnly = DiscoConfig.Storage.IPFS.CacheOnly
if len(DiscoConfig.Storage.IPFS.Redirect) > 0 {
RedirectTo, err = url.Parse(DiscoConfig.Storage.IPFS.Redirect)
Router = discoConfig.Storage.IPFS.Router
Cache = discoConfig.Storage.IPFS.Cache
CacheOnly = discoConfig.Storage.IPFS.CacheOnly
NoClone = discoConfig.Disco.NoClone
if len(discoConfig.Storage.IPFS.Redirect) > 0 {
RedirectTo, err = url.Parse(discoConfig.Storage.IPFS.Redirect)
if err != nil {
return err
}
Expand Down
4 changes: 0 additions & 4 deletions deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func Get() interfaces.IPFSClient {
}

func initialize() interfaces.IPFSClient {
if len(config.Router.Nodes) == 0 {
panic("no routed nodes")
}

log.Info("running with ipfs router client")
return ipfsclient.NewRouterClient(&config.Router)
}
2 changes: 1 addition & 1 deletion drivers/ipfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (df *driverFactory) Create(parameters map[string]interface{}) (storagedrive
}
if config.CacheOnly {
defaultDriver = cacheDriver
return nil, nil
return defaultDriver, nil
}
defaultDriver, err = multidriver.New(config.RedirectTo, ipfsDriver, cacheDriver), nil
return defaultDriver, err
Expand Down
28 changes: 28 additions & 0 deletions e2e/disco-e2e-config-cache-only.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: 0.1
log:
level: info
fields:
service: disco
environment: development
storage:
ipfs:
cache:
filesystem:
rootdirectory: ./testdir/cache
cacheonly: true
delete:
enabled: false
maintenance:
uploadpurging:
enabled: false
disco:
noclone: true
http:
addr: :5000
debug:
addr: :5050
prometheus:
enabled: true
path: /metrics
headers:
X-Content-Type-Options: [nosniff]
57 changes: 50 additions & 7 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"
"time"

"github.com/forta-network/disco/cmd"
"github.com/forta-network/disco/utils"
ipfsapi "github.com/ipfs/go-ipfs-api"
"github.com/stretchr/testify/require"
Expand All @@ -21,15 +20,18 @@ var (
processStartWaitSeconds = 60
pushImageRef = "localhost:1970/test"

expectedImageSha = "35ff92bfc7e822eab96fe3d712164f6b547c3acffc8691b80528d334283849ab"
expectedImageCid = "bafybeihfub2ktzp6a77zrihiwf6c2hex3nwxd7zl7u6tj3ueu5kstqk4ii"
expectedImageSha = "35ff92bfc7e822eab96fe3d712164f6b547c3acffc8691b80528d334283849ab"
expectedImageCid = "bafybeihfub2ktzp6a77zrihiwf6c2hex3nwxd7zl7u6tj3ueu5kstqk4ii"
expectedImageCidCacheOnly = "bafybeibv76jl7r7ielvls37d24jbmt3lkr6dvt74q2i3qbji2m2cqocjvm"

unexpectedImageCid = "bafybeielvnt5apaxbk6chthc4dc3p6vscpx3ai4uvti7gwh253j7facsxu"
unexpectedPullImageRef = fmt.Sprintf("localhost:1970/%s", unexpectedImageCid)

reposPath = "/docker/registry/v2/repositories/"

expectedSha256Repo = path.Join(reposPath, expectedImageSha)
expectedSha256Repo = path.Join(reposPath, expectedImageSha)
expectedCidRepo = path.Join(reposPath, expectedImageCid)
expectedCidRepoCacheOnly = path.Join(reposPath, expectedImageCidCacheOnly)

expectedManifestBlob = "/docker/registry/v2/blobs/sha256/35/35ff92bfc7e822eab96fe3d712164f6b547c3acffc8691b80528d334283849ab/data"
expectedConfigBlob = "/docker/registry/v2/blobs/sha256/16/165538b9f99adf71764e6e01627236bc7de03587ef8c39b621c159491466465e/data"
Expand All @@ -52,9 +54,6 @@ func TestE2E(t *testing.T) {
if os.Getenv("E2E_TEST") != "1" {
return
}

os.Setenv("REGISTRY_CONFIGURATION_PATH", "./disco-e2e-config.yml")
go cmd.Main(context.Background())
suite.Run(t, &E2ETestSuite{})
}

Expand All @@ -66,6 +65,14 @@ func (s *E2ETestSuite) SetupTest() {
s.startCleanIpfs()
}

func (s *E2ETestSuite) startDisco(configPath string) {
os.Setenv("REGISTRY_CONFIGURATION_PATH", configPath)
discoCmd := exec.Command("./../build/disco")
discoCmd.Stdout = os.Stdout
discoCmd.Stderr = os.Stdout
s.r.NoError(discoCmd.Start())
}

func (s *E2ETestSuite) startCleanIpfs() {
_ = exec.Command("pkill", "ipfs").Run()
s.r.NoError(os.RemoveAll("testdir/.ipfs1"))
Expand Down Expand Up @@ -113,9 +120,12 @@ func (s *E2ETestSuite) ensureAvailability(name string, check func() error) {

func (s *E2ETestSuite) TearDownTest() {
_ = exec.Command("pkill", "ipfs").Run()
_ = exec.Command("pkill", "disco").Run()
}

func (s *E2ETestSuite) TestPushVerify() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

s.verifyFiles()
Expand Down Expand Up @@ -159,6 +169,8 @@ func (s *E2ETestSuite) verifyFiles() {
}

func (s *E2ETestSuite) TestPurgeIPFS_Pull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

// delete from ipfs (primary store)
Expand All @@ -173,6 +185,8 @@ func (s *E2ETestSuite) TestPurgeIPFS_Pull() {
}

func (s *E2ETestSuite) TestPurgeIPFS_PushAgainPull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

// delete from ipfs (primary store)
Expand All @@ -187,6 +201,8 @@ func (s *E2ETestSuite) TestPurgeIPFS_PushAgainPull() {
}

func (s *E2ETestSuite) TestPurgeCache_Pull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

// delete from filestore (secondary store)
Expand All @@ -197,6 +213,8 @@ func (s *E2ETestSuite) TestPurgeCache_Pull() {
}

func (s *E2ETestSuite) TestPurgeCache_PushAgainPull() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

// delete from filestore (secondary store)
Expand All @@ -211,6 +229,8 @@ func (s *E2ETestSuite) TestPurgeCache_PushAgainPull() {
}

func (s *E2ETestSuite) TestPurgeCache_MissingCidRepo() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

// delete the cid repo from filestore (secondary store)
Expand All @@ -223,6 +243,8 @@ func (s *E2ETestSuite) TestPurgeCache_MissingCidRepo() {
}

func (s *E2ETestSuite) TestPullUnknown_NoClone() {
s.startDisco("./disco-e2e-config.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

var out bytes.Buffer
Expand All @@ -232,3 +254,24 @@ func (s *E2ETestSuite) TestPullUnknown_NoClone() {
s.r.Error(pullCmd.Run())
s.r.Contains(out.String(), "not found", out.String())
}

func (s *E2ETestSuite) TestCacheOnly() {
s.startDisco("disco-e2e-config-cache-only.yml")

s.r.NoError(exec.Command("docker", "push", pushImageRef).Run())

// verify that file exists in the cache storage
for _, contentPath := range []string{
expectedSha256Repo,
expectedCidRepoCacheOnly,

expectedManifestBlob,
expectedConfigBlob,
expectedLayerBlob1,
expectedLayerBlob2,
} {
fsInfo, err := os.Stat(path.Join("testdir/cache", contentPath))
s.r.NoError(err, contentPath)
s.r.Greater(fsInfo.Size(), int64(0), contentPath)
}
}
2 changes: 1 addition & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func New() (*http.Server, error) {

return &http.Server{
Addr: fmt.Sprintf(":%d", config.Vars.DiscoPort),
Handler: newHandler(rp, services.NewDiscoService(config.DiscoConfig.Disco.NoClone, config.DiscoConfig.Storage.IPFS.CacheOnly)),
Handler: newHandler(rp, services.NewDiscoService()),
ReadTimeout: requestTimeout,
WriteTimeout: requestTimeout,
IdleTimeout: time.Second * 30,
Expand Down
39 changes: 22 additions & 17 deletions proxy/services/disco.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/forta-network/disco/config"
"github.com/forta-network/disco/deps"
"github.com/forta-network/disco/drivers"
"github.com/forta-network/disco/drivers/ipfs"
Expand All @@ -19,8 +20,6 @@ import (
// Disco service allows us to do Disco things on top of the
// Distribution server.
type Disco struct {
noClone bool
cacheOnly bool
getIpfsClient getIpfsClientFunc
getDriver getDriverFunc
}
Expand All @@ -29,10 +28,8 @@ type getIpfsClientFunc func() interfaces.IPFSClient
type getDriverFunc func() storagedriver.StorageDriver

// NewDiscoService creates a new Disco service.
func NewDiscoService(noClone, cacheOnly bool) *Disco {
func NewDiscoService() *Disco {
return &Disco{
noClone: noClone,
cacheOnly: cacheOnly,
getIpfsClient: deps.Get,
getDriver: ipfs.Get,
}
Expand Down Expand Up @@ -88,26 +85,34 @@ func (disco *Disco) MakeGlobalRepo(ctx context.Context, repoName string) error {
}()
}

// Step #1
manifestDigest, err := disco.digestFromLink(ctx, makeManifestLinkPath(repoName))
if err != nil {
return fmt.Errorf("failed to read the digest from the link: %v", err)
}
manifestDigestRepoPath := makeRepoPath(manifestDigest)

// cache-only mode produces the cid v1 repo by converting
// the manifest digest into a cid v1 hash and keeps the compatibility
// of the references
if disco.cacheOnly {
if config.CacheOnly {
b, err := driver.GetContent(ctx, makeManifestLinkPath(repoName))
if err != nil {
return fmt.Errorf("failed to get manifest digest from cache-only driver: %v", err)
}
manifestDigest := string(b)[7:]
cacheCid, err := utils.ConvertSHA256HexToCIDv1(manifestDigest)
if err != nil {
return fmt.Errorf("failed to create cache-only cid: %v", err)
}
_, err = drivers.Copy(ctx, driver, manifestDigestRepoPath, makeRepoPath(cacheCid))
if _, err = drivers.Copy(ctx, driver, uploadRepoPath, makeRepoPath(manifestDigest)); err != nil {
return fmt.Errorf("failed to create cache-only manifest digest repo: %v", err)
}
if _, err = drivers.Copy(ctx, driver, uploadRepoPath, makeRepoPath(cacheCid)); err != nil {
return fmt.Errorf("failed to create cache-only cid repo: %v", err)
}
return err
}

// continue step #1
// Step #1
manifestDigest, err := disco.digestFromLink(ctx, makeManifestLinkPath(repoName))
if err != nil {
return fmt.Errorf("failed to read the digest from the link: %v", err)
}
manifestDigestRepoPath := makeRepoPath(manifestDigest)
stat, err := driver.Stat(ctx, manifestDigestRepoPath)
if err == nil && stat.Size() > 0 {
log.Info("already made globally accessible - skipping")
Expand Down Expand Up @@ -194,7 +199,7 @@ func (disco *Disco) IsOnlyPullable(repoName string) bool {
//
// The end result in the IPFS node's MFS should look like the one from MakeGlobalRepo and all CIDs should match.
func (disco *Disco) CloneGlobalRepo(ctx context.Context, repoName string) error {
if disco.cacheOnly {
if config.CacheOnly {
return nil
}

Expand Down Expand Up @@ -228,7 +233,7 @@ func (disco *Disco) CloneGlobalRepo(ctx context.Context, repoName string) error
return fmt.Errorf("failed to check disco file using the driver: %v", err)
}

if disco.noClone {
if config.NoClone {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion proxy/services/disco_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/forta-network/disco/config"
mock_multidriver "github.com/forta-network/disco/drivers/multidriver/mocks"
"github.com/forta-network/disco/interfaces"
mock_interfaces "github.com/forta-network/disco/interfaces/mocks"
Expand Down Expand Up @@ -282,7 +283,7 @@ func (s *Suite) TestCloneGlobalRepo_NoClone() {
// Given that a repo is to be cloned
// When "no clone" setting is true
// Then cloning should be a no-op
s.disco.noClone = true
config.NoClone = true
s.driver.EXPECT().Stat(gomock.Any(), makeDiscoFilePath(testCidv1)).Return(&fileInfo{
path: makeDiscoFilePath(testCidv1),
size: 1,
Expand Down

0 comments on commit 0d9ca92

Please sign in to comment.