-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathConfig.kt
176 lines (162 loc) · 7.22 KB
/
Config.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package org.radarbase.gateway
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
import org.radarbase.gateway.inject.PushIntegrationEnhancerFactory
import org.radarbase.jersey.enhancer.EnhancerFactory
import org.radarbase.push.integration.garmin.user.GarminUserRepository
import java.net.URI
import java.time.Instant
data class Config(
/** Radar-jersey resource configuration class. */
val resourceConfig: Class<out EnhancerFactory> = PushIntegrationEnhancerFactory::class.java,
/** Kafka configurations. */
val kafka: KafkaConfig = KafkaConfig(),
/** Server configurations. */
val server: GatewayServerConfig = GatewayServerConfig(),
/** Push integration configs **/
val pushIntegration: PushIntegrationConfig = PushIntegrationConfig()
) {
/** Fill in some default values for the configuration. */
fun withDefaults(): Config = copy(kafka = kafka.withDefaults())
/**
* Validate the configuration.
* @throws IllegalStateException if the configuration is incorrect
*/
fun validate() {
kafka.validate()
pushIntegration.validate()
}
}
data class PushIntegrationConfig(
val garmin: GarminConfig = GarminConfig()
) {
fun validate() {
garmin.validate()
// Add more validations as services are added
}
}
data class GarminConfig(
val enabled: Boolean = false,
val consumerKey: String = "",
val consumerSecret: String = "",
val backfill: BackfillConfig = BackfillConfig(),
val userRepositoryClass: String =
"org.radarbase.push.integration.garmin.user.GarminServiceUserRepository",
val userRepositoryUrl: String = "http://localhost:8080/",
val userRepositoryClientId: String = "radar_pushendpoint",
val userRepositoryClientSecret: String = "",
val userRepositoryTokenUrl: String = "http://localhost:8080/token/",
val dailiesTopicName: String = "push_garmin_daily_summary",
val activitiesTopicName: String = "push_garmin_activity_summary",
val activityDetailsTopicName: String = "push_garmin_activity_detail",
val epochSummariesTopicName: String = "push_garmin_epoch_summary",
val sleepsTopicName: String = "push_garmin_sleep_summary",
val bodyCompositionsTopicName: String = "push_garmin_body_composition",
val stressTopicName: String = "push_garmin_stress_detail_summary",
val userMetricsTopicName: String = "push_garmin_user_metrics",
val moveIQTopicName: String = "push_garmin_move_iq_summary",
val pulseOXTopicName: String = "push_garmin_pulse_ox",
val respirationTopicName: String = "push_garmin_respiration",
val activityDetailsSampleTopicName: String = "push_garmin_activity_detail_sample",
val bodyBatterySampleTopicName: String = "push_garmin_body_battery_sample",
val heartRateSampleTopicName: String = "push_garmin_heart_rate_sample",
val sleepLevelTopicName: String = "push_garmin_sleep_level",
val stressLevelTopicName: String = "push_garmin_stress_level",
val sleepScoreTopicName: String = "push_garmin_sleep_score",
val bloodPressureTopicName: String = "push_garmin_blood_pressure",
val healthSnapshotTopicName: String = "push_garmin_health_snapshot_summary",
val heartRateVariabilityTopicName: String = "push_garmin_heart_rate_variability",
val heartRateVariabilitySampleTopicName: String = "push_garmin_heart_rate_variability_sample"
) {
val userRepository: Class<*> = Class.forName(userRepositoryClass)
fun validate() {
if (enabled) {
check(GarminUserRepository::class.java.isAssignableFrom(userRepository)) {
"$userRepositoryClass is not valid. Please specify a class that is a subclass of" +
" `org.radarbase.push.integration.garmin.user.GarminUserRepository`"
}
}
}
}
data class BackfillConfig(
val enabled: Boolean = false,
val redis: RedisConfig = RedisConfig(),
val maxThreads: Int = 4,
val defaultEndDate: Instant = Instant.MAX,
val userBackfill: List<UserBackfillConfig> = emptyList(),
val requestsPerUserPerIteration: Int = 40,
val iterationIntervalMinutes: Long = 5,
)
data class RedisConfig(
val uri: URI = URI("redis://localhost:6379"),
val lockPrefix: String = "radar-push-garmin/lock"
)
data class UserBackfillConfig(
val userId: String,
val startDate: Instant,
val endDate: Instant
)
data class GatewayServerConfig(
/** Base URL to serve data with. This will determine the base path and the port. */
val baseUri: URI = URI.create("http://0.0.0.0:8090/push/integrations/"),
/** Maximum number of simultaneous requests. */
val maxRequests: Int = 200,
/**
* Maximum request content length, also when decompressed.
* This protects against memory overflows.
*/
val maxRequestSize: Long = 24 * 1024 * 1024,
/**
* Whether JMX should be enabled. Disable if not needed, for higher performance.
*/
val isJmxEnabled: Boolean = true
)
data class KafkaConfig(
/** Number of Kafka brokers to keep in a pool for reuse in multiple requests. */
val poolSize: Int = 20,
/** Kafka producer settings. Read from https://kafka.apache.org/documentation/#producerconfigs. */
val producer: Map<String, Any> = mapOf(),
/** Kafka Admin Client settings. Read from https://kafka.apache.org/documentation/#adminclientconfigs. */
val admin: Map<String, Any> = mapOf(),
/** Kafka serialization settings, used in KafkaAvroSerializer. Read from [io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig]. */
val serialization: Map<String, Any> = mapOf()
) {
fun withDefaults(): KafkaConfig = copy(
producer = producerDefaults + producer,
admin = mutableMapOf<String, Any>().apply {
producer[BOOTSTRAP_SERVERS_CONFIG]?.let {
this[BOOTSTRAP_SERVERS_CONFIG] = it
}
this += adminDefaults
this += admin
},
serialization = serializationDefaults + serialization
)
fun validate() {
check(producer[BOOTSTRAP_SERVERS_CONFIG] is String) { "$BOOTSTRAP_SERVERS_CONFIG missing in kafka: producer: {} configuration" }
check(admin[BOOTSTRAP_SERVERS_CONFIG] is String) { "$BOOTSTRAP_SERVERS_CONFIG missing in kafka: admin: {} configuration" }
val schemaRegistryUrl = serialization[SCHEMA_REGISTRY_URL_CONFIG]
check(schemaRegistryUrl is String || schemaRegistryUrl is List<*>) {
"$SCHEMA_REGISTRY_URL_CONFIG missing in kafka: serialization: {} configuration"
}
}
companion object {
private val producerDefaults = mapOf(
"request.timeout.ms" to 3000,
"max.block.ms" to 6000,
"linger.ms" to 10,
"retries" to 5,
"acks" to "all",
"delivery.timeout.ms" to 6000
)
private val adminDefaults = mapOf(
"default.api.timeout.ms" to 6000,
"request.timeout.ms" to 3000,
"retries" to 5
)
private val serializationDefaults = mapOf<String, Any>(
MAX_SCHEMAS_PER_SUBJECT_CONFIG to 10_000
)
}
}