Skip to content

Commit

Permalink
azure code support for AzureAD
Browse files Browse the repository at this point in the history
Signed-off-by: Vivek Reddy <[email protected]>
  • Loading branch information
Vivek Reddy committed Oct 1, 2024
1 parent c634658 commit 473e80e
Show file tree
Hide file tree
Showing 2 changed files with 453 additions and 114 deletions.
248 changes: 168 additions & 80 deletions pkg/splunk/client/azureblobclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,200 +22,288 @@ import (
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// AzureBlobClient is a client to implement Azure Blob specific APIs using Azure SDK
type AzureBlobClient struct {
BucketName string
StorageAccountName string
SecretAccessKey string
Prefix string
StartAfter string
Endpoint string
ContainerClient ContainerClientInterface // Use the interface here
}

// Define an interface for BlobClient
type BlobClientInterface interface {
DownloadStream(ctx context.Context, options *azblob.DownloadStreamOptions) (azblob.DownloadStreamResponse, error)
}
var _ RemoteDataClient = &AzureBlobClient{}

// Define an interface that matches the methods you need from the container client
// ContainerClientInterface abstracts the methods used from the Azure SDK's ContainerClient.
type ContainerClientInterface interface {
NewListBlobsFlatPager(options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse]
NewListBlobsFlatPager(options *container.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse]
NewBlobClient(blobName string) BlobClientInterface
}

// Wrap the actual blob.Client to implement BlobClientInterface
type BlobClientWrapper struct {
*blob.Client
// BlobClientInterface abstracts the methods used from the Azure SDK's BlobClient.
type BlobClientInterface interface {
DownloadStream(ctx context.Context, options *blob.DownloadStreamOptions) (blob.DownloadStreamResponse, error)
}

func (b *BlobClientWrapper) DownloadStream(ctx context.Context, options *azblob.DownloadStreamOptions) (azblob.DownloadStreamResponse, error) {
return b.Client.DownloadStream(ctx, options)
func (c *ContainerClientWrapper) NewListBlobsFlatPager(options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] {
return c.Client.NewListBlobsFlatPager(options)
}

// Wrap the actual container.Client to implement ContainerClientInterface
// ContainerClientWrapper wraps the Azure SDK's ContainerClient and implements ContainerClientInterface.
type ContainerClientWrapper struct {
*container.Client
}

func (c *ContainerClientWrapper) NewListBlobsFlatPager(options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] {
return c.Client.NewListBlobsFlatPager(options)
// NewBlobClient wraps the Azure SDK's NewBlobClient method to return BlobClientInterface.
func (w *ContainerClientWrapper) NewBlobClient(blobName string) BlobClientInterface {
return &BlobClientWrapper{w.Client.NewBlobClient(blobName)}
}

// BlobClientWrapper wraps the Azure SDK's BlobClient and implements BlobClientInterface.
type BlobClientWrapper struct {
*blob.Client
}

// DownloadStream wraps the Azure SDK's DownloadStream method.
func (w *BlobClientWrapper) DownloadStream(ctx context.Context, options *blob.DownloadStreamOptions) (blob.DownloadStreamResponse, error) {
return w.Client.DownloadStream(ctx, options)
}

func (c *ContainerClientWrapper) NewBlobClient(blobName string) BlobClientInterface {
return &BlobClientWrapper{c.Client.NewBlobClient(blobName)}
// CredentialType defines the type of credential used for authentication.
type CredentialType int

const (
// CredentialTypeSharedKey indicates Shared Key authentication.
CredentialTypeSharedKey CredentialType = iota
// CredentialTypeAzureAD indicates Azure AD authentication.
CredentialTypeAzureAD
)

// AzureBlobClient implements the RemoteDataClient interface for Azure Blob Storage.
type AzureBlobClient struct {
BucketName string
StorageAccountName string
Prefix string
StartAfter string
Endpoint string
ContainerClient ContainerClientInterface
CredentialType CredentialType
}

// NewAzureBlobClient initializes and returns an AzureBlob client using Azure SDK
func NewAzureBlobClient(ctx context.Context, bucketName string, storageAccountName string, secretAccessKey string, prefix string, startAfter string, region string, endpoint string, fn GetInitFunc) (RemoteDataClient, error) {
// NewAzureBlobClient initializes and returns an AzureBlobClient.
// It supports both Shared Key and Azure AD authentication based on provided credentials.
func NewAzureBlobClient(
ctx context.Context,
bucketName string, // Azure Blob container name
storageAccountName string, // Azure Storage account name
secretAccessKey string, // Shared Key (optional; leave empty to use Azure AD)
prefix string, // Prefix for blob listing (optional)
startAfter string, // Marker for blob listing (optional)
region string, // Azure region (e.g., "eastus")
endpoint string, // Custom endpoint (optional)
initFunc GetInitFunc, // Initialization function
) (RemoteDataClient, error) { // Matches GetRemoteDataClient signature
reqLogger := log.FromContext(ctx)
scopedLog := reqLogger.WithName("NewAzureBlobClient")

scopedLog.Info("Creating AzureBlobClient using Azure SDK")
scopedLog.Info("Initializing AzureBlobClient")

// Create the blob service client
// Execute the initialization function if provided.
if initFunc != nil {
initResult := initFunc(ctx, endpoint, storageAccountName, secretAccessKey)
// Currently, no action is taken with initResult. Modify if needed.
_ = initResult
}

// Construct the service URL.
var serviceURL string
if endpoint != "" {
serviceURL = endpoint
} else if region != "" {
// Adjust the service URL based on the region
serviceURL = fmt.Sprintf("https://%s.blob.%s.core.windows.net/", storageAccountName, region)
} else {
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/", storageAccountName)
}

credential, err := azblob.NewSharedKeyCredential(storageAccountName, secretAccessKey)
if err != nil {
scopedLog.Error(err, "Failed to create SharedKeyCredential")
return nil, err
}
var containerClient ContainerClientInterface
var credentialType CredentialType

// Create the container client
containerClient, err := container.NewClientWithSharedKeyCredential(fmt.Sprintf("%s%s", serviceURL, bucketName), credential, nil)
if err != nil {
scopedLog.Error(err, "Failed to create ContainerClient")
return nil, err
if secretAccessKey != "" {
// Use Shared Key authentication.
scopedLog.Info("Using Shared Key authentication")

// Create a Shared Key Credential.
sharedKeyCredential, err := azblob.NewSharedKeyCredential(storageAccountName, secretAccessKey)
if err != nil {
scopedLog.Error(err, "Failed to create SharedKeyCredential")
return nil, fmt.Errorf("failed to create SharedKeyCredential: %w", err)
}

// Initialize the container client with Shared Key Credential.
rawContainerClient, err := container.NewClientWithSharedKeyCredential(
fmt.Sprintf("%s%s", serviceURL, bucketName),
sharedKeyCredential,
nil,
)
if err != nil {
scopedLog.Error(err, "Failed to create ContainerClient with SharedKeyCredential")
return nil, fmt.Errorf("failed to create ContainerClient with SharedKeyCredential: %w", err)
}

// Wrap the container client.
containerClient = &ContainerClientWrapper{rawContainerClient}

credentialType = CredentialTypeSharedKey
} else {
// Use Azure AD authentication.
scopedLog.Info("Using Azure AD authentication")

// Create a Token Credential using DefaultAzureCredential.
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
scopedLog.Error(err, "Failed to create DefaultAzureCredential")
return nil, fmt.Errorf("failed to create DefaultAzureCredential: %w", err)
}

// Initialize the container client with Token Credential.
rawContainerClient, err := container.NewClient(
fmt.Sprintf("%s%s", serviceURL, bucketName),
tokenCredential,
nil,
)
if err != nil {
scopedLog.Error(err, "Failed to create ContainerClient with TokenCredential")
return nil, fmt.Errorf("failed to create ContainerClient with TokenCredential: %w", err)
}

// Wrap the container client.
containerClient = &ContainerClientWrapper{rawContainerClient}

credentialType = CredentialTypeAzureAD
}
containerClientWrapper := &ContainerClientWrapper{containerClient}

scopedLog.Info("AzureBlobClient initialized successfully",
"CredentialType", credentialType,
"BucketName", bucketName,
"StorageAccountName", storageAccountName,
)

return &AzureBlobClient{
BucketName: bucketName,
StorageAccountName: storageAccountName,
SecretAccessKey: secretAccessKey,
Prefix: prefix,
StartAfter: startAfter,
Endpoint: endpoint,
ContainerClient: containerClientWrapper, // Assign the real client here
ContainerClient: containerClient,
CredentialType: credentialType,
}, nil
}

// GetAppsList gets the list of apps (blobs) from the Azure Blob container
// GetAppsList retrieves a list of blobs (apps) from the Azure Blob container.
func (client *AzureBlobClient) GetAppsList(ctx context.Context) (RemoteDataListResponse, error) {
reqLogger := log.FromContext(ctx)
scopedLog := reqLogger.WithName("AzureBlob:GetAppsList").WithValues("Bucket", client.BucketName)

scopedLog.Info("Fetching list of apps")

// Set prefix and other options if needed
options := container.ListBlobsFlatOptions{
// Define options for listing blobs.
options := &container.ListBlobsFlatOptions{
Prefix: &client.Prefix,
}

// Set the Marker if StartAfter is provided
// Set the Marker if StartAfter is provided.
if client.StartAfter != "" {
options.Marker = &client.StartAfter
}

// Use ListBlobsFlatPager to paginate through the blobs in the container
pager := client.ContainerClient.NewListBlobsFlatPager(&options)
// Create a pager to iterate through blobs.
pager := client.ContainerClient.NewListBlobsFlatPager(options)

var blobs []*RemoteObject
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
scopedLog.Error(err, "Error listing blobs")
return RemoteDataListResponse{}, err
return RemoteDataListResponse{}, fmt.Errorf("error listing blobs: %w", err)
}

for _, blob := range resp.Segment.BlobItems {
etag := string(*blob.Properties.ETag)
name := *blob.Name
lastModified := *blob.Properties.LastModified
size := *blob.Properties.ContentLength
lastModified := blob.Properties.LastModified
size := blob.Properties.ContentLength

remoteObject := &RemoteObject{
Etag: &etag,
Key: &name,
LastModified: &lastModified,
Size: &size,
LastModified: lastModified,
Size: size,
}
blobs = append(blobs, remoteObject)
}
}

scopedLog.Info("Successfully fetched list of apps", "TotalBlobs", len(blobs))

return RemoteDataListResponse{Objects: blobs}, nil
}

// DownloadApp downloads a specific blob (app package) from Azure Blob storage
// DownloadApp downloads a specific blob from Azure Blob Storage to a local file.
func (client *AzureBlobClient) DownloadApp(ctx context.Context, downloadRequest RemoteDataDownloadRequest) (bool, error) {
reqLogger := log.FromContext(ctx)
scopedLog := reqLogger.WithName("AzureBlob:DownloadApp").WithValues("Bucket", client.BucketName, "RemoteFile", downloadRequest.RemoteFile)
scopedLog := reqLogger.WithName("AzureBlob:DownloadApp").WithValues(
"Bucket", client.BucketName,
"RemoteFile", downloadRequest.RemoteFile,
"LocalFile", downloadRequest.LocalFile,
)

scopedLog.Info("Downloading app package")
scopedLog.Info("Initiating blob download")

// Create a blob client for the specific blob.
blobClient := client.ContainerClient.NewBlobClient(downloadRequest.RemoteFile)

// Download the blob content
// Download the blob content.
get, err := blobClient.DownloadStream(ctx, nil)
if err != nil {
scopedLog.Error(err, "Failed to download blob")
return false, err
return false, fmt.Errorf("failed to download blob: %w", err)
}

defer get.Body.Close()

// Create local file
// Create or truncate the local file.
localFile, err := os.Create(downloadRequest.LocalFile)
if err != nil {
scopedLog.Error(err, "Failed to create local file")
return false, err
return false, fmt.Errorf("failed to create local file: %w", err)
}
defer localFile.Close()

// Write the content to the local file
// Write the content to the local file.
_, err = io.Copy(localFile, get.Body)
if err != nil {
scopedLog.Error(err, "Failed to write blob content to local file")
return false, err
return false, fmt.Errorf("failed to write blob content to local file: %w", err)
}

scopedLog.Info("Download successful")
scopedLog.Info("Blob downloaded successfully")

return true, nil
}

// RegisterAzureBlobClient will add the corresponding function pointer to the map
// NoOpInitFunc performs no additional initialization.
// It satisfies the GetInitFunc type and can be used when no extra setup is needed.
func NoOpInitFunc(
ctx context.Context,
appAzureBlobEndPoint string,
storageAccountName string,
secretAccessKey string, // Optional: can be empty
) interface{} {
// No additional initialization required.
return nil
}

// RegisterAzureBlobClient registers the AzureBlobClient in the RemoteDataClientsMap.
func RegisterAzureBlobClient() {
wrapperObject := GetRemoteDataClientWrapper{
GetRemoteDataClient: NewAzureBlobClient,
GetInitFunc: InitAzureBlobClientWrapper,
GetInitFunc: NoOpInitFunc, // Use CustomInitFunc if additional initialization is needed
}
RemoteDataClientsMap["azure"] = wrapperObject
}

// InitAzureBlobClientWrapper is a wrapper around InitAzureBlobClientSession
func InitAzureBlobClientWrapper(ctx context.Context, appAzureBlobEndPoint string, storageAccountName string, secretAccessKey string) interface{} {
return InitAzureBlobClientSession(ctx)
}

// InitAzureBlobClientSession initializes and returns a client session object
func InitAzureBlobClientSession(ctx context.Context) *container.Client {
// This can be used to initialize the Azure client session if needed
return nil // Placeholder
}
Loading

0 comments on commit 473e80e

Please sign in to comment.