Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch improvement #42

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions src/main/kotlin/com/featurevisor/sdk/DatafileReader.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package com.featurevisor.sdk

import com.featurevisor.types.Attribute
import com.featurevisor.types.AttributeKey
import com.featurevisor.types.DatafileContent
import com.featurevisor.types.Feature
import com.featurevisor.types.FeatureKey
import com.featurevisor.types.Segment
import com.featurevisor.types.SegmentKey

class DatafileReader constructor(
import com.featurevisor.types.*

class DatafileReader(
datafileJson: DatafileContent,
) {

Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/com/featurevisor/sdk/FeaturevisorError.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ sealed class FeaturevisorError(message: String) : Throwable(message = message) {
class InvalidUrl(val url: String?) : FeaturevisorError("Invalid URL")

object MissingDatafileUrlWhileRefreshing : FeaturevisorError("Missing datafile url need to refresh")

/// Fetching was cancelled
object FetchingDataFileCancelled : FeaturevisorError("Fetching data file cancelled")
}
171 changes: 125 additions & 46 deletions src/main/kotlin/com/featurevisor/sdk/Instance+Fetch.kt
Original file line number Diff line number Diff line change
@@ -1,31 +1,99 @@
package com.featurevisor.sdk

import com.featurevisor.types.DatafileContent
import com.featurevisor.types.DatafileFetchResult
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.serialization.decodeFromString
import java.io.IOException
import okhttp3.*
import kotlinx.serialization.json.Json
import okhttp3.*
import okhttp3.HttpUrl.Companion.toHttpUrl
import java.lang.IllegalArgumentException
import java.io.IOException
import java.net.ConnectException
import java.net.UnknownHostException

// MARK: - Fetch datafile content
@Throws(IOException::class)
internal fun FeaturevisorInstance.fetchDatafileContent(
internal fun fetchDatafileContentJob(
url: String,
logger: Logger?,
coroutineScope: CoroutineScope,
retryCount: Int = 3, // Retry count
retryInterval: Long = 300L, // Retry interval in milliseconds
handleDatafileFetch: DatafileFetchHandler? = null,
completion: (Result<DatafileFetchResult>) -> Unit,
): Job {
val job = Job()
coroutineScope.launch(job) {
fetchDatafileContent(
url = url,
handleDatafileFetch = handleDatafileFetch,
completion = completion,
retryCount = retryCount,
retryInterval = retryInterval,
job = job,
logger = logger,
)
}
return job
}

internal suspend fun fetchDatafileContent(
url: String,
logger: Logger? = null,
retryCount: Int = 1,
retryInterval: Long = 0L,
job: Job? = null,
handleDatafileFetch: DatafileFetchHandler? = null,
completion: (Result<DatafileContent>) -> Unit,
completion: (Result<DatafileFetchResult>) -> Unit,
) {
handleDatafileFetch?.let { handleFetch ->
val result = handleFetch(url)
completion(result)
for (attempt in 0 until retryCount) {
if (job != null && (job.isCancelled || job.isActive.not())) {
completion(Result.failure(FeaturevisorError.FetchingDataFileCancelled))
break
}

val result = handleFetch(url)
result.fold(
onSuccess = {
completion(Result.success(DatafileFetchResult(it, "")))
return
},
onFailure = { exception ->
if (attempt < retryCount - 1) {
logger?.error(exception.localizedMessage)
delay(retryInterval)
} else {
completion(Result.failure(exception))
}
}
)
}
} ?: run {
fetchDatafileContentFromUrl(url, completion)
fetchDatafileContentFromUrl(
url = url,
completion = completion,
retryCount = retryCount,
retryInterval = retryInterval,
job = job,
logger = logger,
)
}
}

private fun fetchDatafileContentFromUrl(
const val BODY_BYTE_COUNT = 1000000L
private val client = OkHttpClient()

private suspend fun fetchDatafileContentFromUrl(
url: String,
completion: (Result<DatafileContent>) -> Unit,
logger: Logger?,
retryCount: Int,
retryInterval: Long,
job: Job?,
completion: (Result<DatafileFetchResult>) -> Unit,
) {
try {
val httpUrl = url.toHttpUrl()
Expand All @@ -34,55 +102,66 @@ private fun fetchDatafileContentFromUrl(
.addHeader("Content-Type", "application/json")
.build()

fetch(request, completion)
fetchWithRetry(
request = request,
completion = completion,
retryCount = retryCount,
retryInterval = retryInterval,
job = job,
logger = logger,
)
} catch (throwable: IllegalArgumentException) {
completion(Result.failure(FeaturevisorError.InvalidUrl(url)))
} catch (e: Exception) {
logger?.error("Exception occurred during datafile fetch: ${e.message}")
completion(Result.failure(e))
}
}

const val BODY_BYTE_COUNT = 1000000L
private inline fun fetch(
private suspend fun fetchWithRetry(
request: Request,
crossinline completion: (Result<DatafileContent>) -> Unit,
logger: Logger?,
completion: (Result<DatafileFetchResult>) -> Unit,
retryCount: Int,
retryInterval: Long,
job: Job?
) {
val client = OkHttpClient()
val call = client.newCall(request)
call.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
for (attempt in 0 until retryCount) {
if (job != null && (job.isCancelled || job.isActive.not())) {
completion(Result.failure(FeaturevisorError.FetchingDataFileCancelled))
return
}

val call = client.newCall(request)
try {
val response = call.execute()
val responseBody = response.peekBody(BODY_BYTE_COUNT)
val responseBodyString = responseBody.string()
if (response.isSuccessful) {
val json = Json {
ignoreUnknownKeys = true
}
val responseBodyString = responseBody.string()
val json = Json { ignoreUnknownKeys = true }
FeaturevisorInstance.companionLogger?.debug(responseBodyString)
try {
val content = json.decodeFromString<DatafileContent>(responseBodyString)
completion(Result.success(content))
} catch(throwable: Throwable) {
completion(
Result.failure(
FeaturevisorError.UnparsableJson(
responseBody.string(),
response.message
)
)
)
val content = json.decodeFromString<DatafileContent>(responseBodyString)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be tested

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not merging this PR till everything works fine. Just raised for testing purpose.

completion(Result.success(DatafileFetchResult(content, responseBodyString)))
return
} else {
if (attempt < retryCount - 1) {
logger?.error("Request failed with message: ${response.message}")
delay(retryInterval)
} else {
completion(Result.failure(FeaturevisorError.UnparsableJson(responseBodyString, response.message)))
}
}
} catch (e: IOException) {
val isInternetException = e is ConnectException || e is UnknownHostException
if (attempt >= retryCount - 1 || isInternetException) {
completion(Result.failure(e))
} else {
completion(
Result.failure(
FeaturevisorError.UnparsableJson(
responseBody.string(),
response.message
)
)
)
logger?.error("IOException occurred during request: ${e.message}")
delay(retryInterval)
}
}

override fun onFailure(call: Call, e: IOException) {
} catch (e: Exception) {
logger?.error("Exception occurred during request: ${e.message}")
completion(Result.failure(e))
}
})
}
}
14 changes: 7 additions & 7 deletions src/main/kotlin/com/featurevisor/sdk/Instance+Refresh.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fun FeaturevisorInstance.startRefreshing() = when {
refreshJob != null -> logger?.warn("refreshing has already started")
refreshInterval == null -> logger?.warn("no `refreshInterval` option provided")
else -> {
refreshJob = CoroutineScope(Dispatchers.Unconfined).launch {
refreshJob = coroutineScope.launch {
while (isActive) {
refresh()
delay(refreshInterval)
Expand All @@ -32,20 +32,20 @@ fun FeaturevisorInstance.stopRefreshing() {
logger?.warn("refreshing has stopped")
}

private fun FeaturevisorInstance.refresh() {
private suspend fun FeaturevisorInstance.refresh() {
logger?.debug("refreshing datafile")
when {
statuses.refreshInProgress -> logger?.warn("refresh in progress, skipping")
datafileUrl.isNullOrBlank() -> logger?.error("cannot refresh since `datafileUrl` is not provided")
else -> {
statuses.refreshInProgress = true
fetchDatafileContent(
datafileUrl,
handleDatafileFetch,
url = datafileUrl,
handleDatafileFetch = handleDatafileFetch,
) { result ->

if (result.isSuccess) {
val datafileContent = result.getOrThrow()
result.onSuccess { fetchResult ->
val datafileContent = fetchResult.datafileContent
val currentRevision = getRevision()
val newRevision = datafileContent.revision
val isNotSameRevision = currentRevision != newRevision
Expand All @@ -59,7 +59,7 @@ private fun FeaturevisorInstance.refresh() {
}

statuses.refreshInProgress = false
} else {
}.onFailure {
logger?.error(
"failed to refresh datafile",
mapOf("error" to result)
Expand Down
35 changes: 27 additions & 8 deletions src/main/kotlin/com/featurevisor/sdk/Instance.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package com.featurevisor.sdk
import com.featurevisor.sdk.FeaturevisorError.MissingDatafileOptions
import com.featurevisor.types.*
import com.featurevisor.types.EventName.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
Expand All @@ -16,7 +18,7 @@ typealias InterceptContext = (Context) -> Context
typealias DatafileFetchHandler = (datafileUrl: String) -> Result<DatafileContent>

var emptyDatafile = DatafileContent(
schemaVersion = "1",
schemaVersion = "1",
revision = "unknown",
attributes = emptyList(),
segments = emptyList(),
Expand Down Expand Up @@ -56,6 +58,8 @@ class FeaturevisorInstance private constructor(options: InstanceOptions) {
internal var configureBucketKey = options.configureBucketKey
internal var configureBucketValue = options.configureBucketValue
internal var refreshJob: Job? = null
private var fetchJob: Job? = null
internal val coroutineScope = CoroutineScope(Dispatchers.Unconfined)

init {
with(options) {
Expand Down Expand Up @@ -99,17 +103,26 @@ class FeaturevisorInstance private constructor(options: InstanceOptions) {
}

datafileUrl != null -> {
datafileReader = DatafileReader(options.datafile?: emptyDatafile)
fetchDatafileContent(datafileUrl, handleDatafileFetch) { result ->
if (result.isSuccess) {
datafileReader = DatafileReader(result.getOrThrow())
datafileReader = DatafileReader(options.datafile ?: emptyDatafile)
fetchJob = fetchDatafileContentJob(
url = datafileUrl,
logger = logger,
handleDatafileFetch = handleDatafileFetch,
retryCount = retryCount.coerceAtLeast(1),
retryInterval = retryInterval.coerceAtLeast(0),
coroutineScope = coroutineScope,
) { result ->
result.onSuccess { fetchResult ->
val datafileContent = fetchResult.datafileContent
datafileReader = DatafileReader(datafileContent)
statuses.ready = true
emitter.emit(READY, result.getOrThrow())
emitter.emit(READY, datafileContent, fetchResult.responseBodyString)
if (refreshInterval != null) startRefreshing()
} else {
logger?.error("Failed to fetch datafile: $result")
}.onFailure { error ->
logger?.error("Failed to fetch datafile: $error")
emitter.emit(ERROR)
}
cancelFetchRetry()
}
}

Expand All @@ -118,6 +131,12 @@ class FeaturevisorInstance private constructor(options: InstanceOptions) {
}
}

// Provide a mechanism to cancel the fetch job if retry count is more than one
fun cancelFetchRetry() {
fetchJob?.cancel()
fetchJob = null
}

fun setLogLevels(levels: List<Logger.LogLevel>) {
this.logger?.setLevels(levels)
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/com/featurevisor/sdk/InstanceOptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ data class InstanceOptions(
val onError: Listener? = null,
val refreshInterval: Long? = null, // seconds
val stickyFeatures: StickyFeatures? = null,
val retryInterval: Long = 300L,
val retryCount: Int = 1,
) {
companion object {
private const val defaultBucketKeySeparator = "."
Expand Down
6 changes: 6 additions & 0 deletions src/main/kotlin/com/featurevisor/types/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ data class DatafileContent(
val features: List<Feature>,
)

@Serializable
data class DatafileFetchResult(
val datafileContent: DatafileContent,
val responseBodyString: String
)

@Serializable
data class OverrideFeature(
val enabled: Boolean,
Expand Down
Loading