Skip to content

Commit

Permalink
CT-1565 - share & link & refresh test
Browse files Browse the repository at this point in the history
  • Loading branch information
vojtabiberle committed Nov 5, 2024
1 parent 82245f7 commit 726a6f5
Showing 1 changed file with 218 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,42 @@
namespace Keboola\Test\Backend\ExternalBuckets;

use Doctrine\DBAL\Connection;
use Keboola\StorageApi\Client;
use Keboola\StorageApi\ClientException;
use Keboola\StorageApi\Workspaces;
use Keboola\TableBackendUtils\Connection\Snowflake\SnowflakeConnectionFactory;
use Keboola\Test\Backend\Workspaces\Backend\WorkspaceBackendFactory;
use Keboola\Test\StorageApiTestCase;
use Keboola\Test\Utils\ConnectionUtils;
use Keboola\Test\Utils\EventsQueryBuilder;
use Keboola\Test\Utils\EventTesterUtils;
use Throwable;

class SnowflakeRegisterExternalBucketInSecureDataShareTest extends StorageApiTestCase
{
use ConnectionUtils;
use EventTesterUtils;

protected Client $shareClient;

protected Client $linkingClient;

public function setUp(): void
{
parent::setUp();
$this->allowTestForBackendsOnly([self::BACKEND_SNOWFLAKE], 'Backend has to support external buckets');

$this->shareClient = $this->getClientForToken(
STORAGE_API_SHARE_TOKEN,
);
$this->linkingClient = $this->getClientForToken(
STORAGE_API_LINKING_TOKEN,
);

$tokenData = $this->shareClient->verifyToken();
if ($tokenData['organization']['id'] !== $this->linkingClient->verifyToken()['organization']['id']) {
throw new \Exception('STORAGE_API_LINKING_TOKEN is not in the same organization as STORAGE_API_TOKEN');
}
}

public function testRegisterExternalBucket(): void
Expand Down Expand Up @@ -174,6 +193,205 @@ function ($tableRow) {
$this->ensureSharedDatabaseStillExists();
}

public function testRefreshBucketInLinkedProject(): void
{
$stage = self::STAGE_IN;
$description = $this->generateDescriptionForTestObject();
$bucketName = $this->getTestBucketName($description);
$newTableName = 'NEW_TABLE';
$bucketId = $stage . '.' . $bucketName;

$this->forceUnshareBucketIfExists($this->shareClient, $stage . '.' . $bucketName, true);
$this->dropBucketIfExists($this->_client, $stage.'.'.$bucketName, true);
$this->dropTableInProducerDatabase($newTableName);

$workspaces = new Workspaces($this->_client);
$workspace0 = $workspaces->createWorkspace(['backend' => 'snowflake']);
$projectRole = $workspace0['connection']['database'];

$this->grantImportedPrivilegesToProjectRole($projectRole);

$this->dropBucketIfExists($this->_client, $bucketId);

$this->_client->registerBucket(
$bucketName,
explode('.', $this->getInboundSharedDatabaseName()),
self::STAGE_IN,
$description,
'snowflake',
null,
true,
);

$tables = $this->_client->listTables($stage.'.'.$bucketName);
// tables created manually during setup
$this->assertCount(2, $tables);

$shareToken = $this->linkingClient->verifyToken();
$targetProjectId = $shareToken['owner']['id'];

$this->shareClient->shareBucketToProjects($bucketId, [$targetProjectId]);

$sharedBucket = $this->_client->getBucket($bucketId);
$this->assertEquals('specific-projects', $sharedBucket['sharing']);
$this->assertEquals($targetProjectId, $sharedBucket['sharingParameters']['projects'][0]['id']);

$linkingWorkspaces = new Workspaces($this->linkingClient);
$linkingWorkspace = $linkingWorkspaces->createWorkspace([], true);
$linkingBackend = WorkspaceBackendFactory::createWorkspaceBackend($linkingWorkspace);

/** @var \Keboola\Db\Import\Snowflake\Connection $linkingSnowflakeDb */
$linkingSnowflakeDb = $linkingBackend->getDb();

// check before link is not work via RO
try {
$linkingSnowflakeDb->fetchAll(sprintf(
'SELECT * FROM %s.%s',
$sharedBucket['path'],
'NAMES_TABLE', // table created manually during setup
));
$this->fail('Select should fail.');
} catch (Throwable $e) {
$dbName = explode('.', $sharedBucket['path'])[0];
$this->assertStringContainsString("Database '{$dbName}' does not exist or not authorized., SQL state 02000 in SQLPrepare", $e->getMessage());
}

// LINKING START

$token = $this->_client->verifyToken();
$linkedBucketId = $this->linkingClient->linkBucket('LINKED_BUCKET', self::STAGE_IN, $token['owner']['id'], $sharedBucket['id'], 'LINKED_BUCKET');
$linkedBucket = $this->linkingClient->getBucket($linkedBucketId);
$this->assertEquals($sharedBucket['id'], $linkedBucket['sourceBucket']['id']);

$linkingTables = $this->linkingClient->listTables($linkedBucketId);
$this->assertCount(2, $linkingTables);
$linkingTable = $linkingTables[0];

$dataPreview = $this->linkingClient->getTableDataPreview($linkingTable['id']);
$this->assertEquals(
<<<EXPECTED
"ID","NAME"
"1","Jiří"
"2","Roman"
"3","Tomáš"
"4","Vojta"
"5","Martin"
EXPECTED,
$dataPreview,
);

// test RO works
/** @var \Keboola\Db\Import\Snowflake\Connection $linkingSnowflakeDb */
$linkingSnowflakeDb = $linkingBackend->getDb();

$result = $linkingSnowflakeDb->fetchAll(sprintf(
'SELECT * FROM %s.%s',
$linkedBucket['path'],
'NAMES_TABLE', // table created manually during setup
));
$this->assertEquals(
[
[
'ID' => 1,
'NAME' => 'Jiří',
],
[
'ID' => 2,
'NAME' => 'Roman',
],
[
'ID' => 3,
'NAME' => 'Tomáš',
],
[
'ID' => 4,
'NAME' => 'Vojta',
],
[
'ID' => 5,
'NAME' => 'Martin',
],
],
$result,
);

// REFRESH START

$this->createTableInProducerDatabase($newTableName, ['ID INT', 'NAME VARCHAR'], [[1, "'Jan'"], [2, "'Josef'"]]);
$this->_client->refreshBucket($bucketId);

$refreshedBucket = $this->_client->getBucket($bucketId);
$tableNames = array_map(
function ($tableRow) {
return $tableRow['name'];
},
$refreshedBucket['tables'],
);
$this->assertTrue(in_array($newTableName, $tableNames), 'New table not found in refreshed bucket');

$linkingTables = $this->linkingClient->listTables($linkedBucketId);
$this->assertCount(3, $linkingTables);

$linkedBucket = $this->linkingClient->getBucket($linkedBucketId);
$linkedTableNames = array_map(
function ($tableRow) {
return $tableRow['name'];
},
$linkedBucket['tables'],
);
$this->assertTrue(in_array($newTableName, $linkedTableNames), 'New table not found in linked bucket after refresh');

$filteredTables = array_filter(
$linkedBucket['tables'],
function ($tableRow) use ($newTableName) {
return $tableRow['name'] === $newTableName;
},
);

$linkedTable = reset($filteredTables);

$dataPreview = $this->linkingClient->getTableDataPreview($linkedTable['id']);
$this->assertEquals(
<<<EXPECTED
"ID","NAME"
"1","Jan"
"2","Josef"
EXPECTED,
$dataPreview,
);

// test RO works
$result = $linkingSnowflakeDb->fetchAll(sprintf(
'SELECT * FROM %s.%s',
$linkedBucket['path'],
$newTableName,
));
$this->assertEquals(
[
[
'ID' => 1,
'NAME' => 'Jan',
],
[
'ID' => 2,
'NAME' => 'Josef',
],
],
$result,
);

// CLEANUP

$this->forceUnshareBucketIfExists($this->shareClient, $stage . '.' . $bucketName, true);
$this->_client->dropBucket($bucketId);
$bucketExist = $this->_client->bucketExists($bucketId);
$this->assertFalse($bucketExist, 'Bucket '.$bucketId.' still exist.');
$this->dropTableInProducerDatabase($newTableName);
$this->ensureSharedDatabaseStillExists();
}

private function getInboundSharedDatabaseName(): string
{
$inboundDatabaseName = getenv('SNOWFLAKE_INBOUND_DATABASE_NAME');
Expand Down

0 comments on commit 726a6f5

Please sign in to comment.