diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index cfd30db..abc1d81 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -214,7 +214,31 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionSubscriptionPositionKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionSubscriptionPositionKey], + ValidateFunc: func(val interface{}, key string) ([]string, []error) { + v := val.(string) + subscriptionPositionSupported := []string{ + SubscriptionPositionEarliest, + SubscriptionPositionLatest, + } + + found := false + for _, item := range subscriptionPositionSupported { + if v == item { + found = true + break + } + } + if !found { + return nil, []error{ + fmt.Errorf("%s is unsupported, shold be one of %s", v, + strings.Join(subscriptionPositionSupported, ",")), + } + } + + return nil, nil + }, }, resourceFunctionCleanupSubscriptionKey: { Type: schema.TypeBool, diff --git a/pulsar/resource_pulsar_sink.go b/pulsar/resource_pulsar_sink.go index 4a0bac5..0c286fb 100644 --- a/pulsar/resource_pulsar_sink.go +++ b/pulsar/resource_pulsar_sink.go @@ -173,8 +173,31 @@ func resourcePulsarSink() *schema.Resource { resourceSinkSubscriptionPositionKey: { Type: schema.TypeString, Optional: true, - Default: "Earliest", + Default: SubscriptionPositionEarliest, Description: resourceSinkDescriptions[resourceSinkSubscriptionPositionKey], + ValidateFunc: func(val interface{}, key string) ([]string, []error) { + v := val.(string) + subscriptionPositionSupported := []string{ + SubscriptionPositionEarliest, + SubscriptionPositionLatest, + } + + found := false + for _, item := range subscriptionPositionSupported { + if v == item { + found = true + break + } + } + if !found { + return nil, []error{ + fmt.Errorf("%s is unsupported, shold be one of %s", v, + strings.Join(subscriptionPositionSupported, ",")), + } + } + + return nil, nil + }, }, resourceSinkCustomSerdeInputsKey: { Type: schema.TypeMap, @@ -563,7 +586,7 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in } if sinkConfig.SourceSubscriptionPosition != "" { - err = d.Set(resourceFunctionSubscriptionPositionKey, sinkConfig.SourceSubscriptionPosition) + err = d.Set(resourceSinkSubscriptionPositionKey, sinkConfig.SourceSubscriptionPosition) if err != nil { return diag.FromErr(err) } diff --git a/pulsar/util.go b/pulsar/util.go index baab14f..305ce30 100644 --- a/pulsar/util.go +++ b/pulsar/util.go @@ -12,6 +12,11 @@ const ( ProcessingGuaranteesEffectivelyOnce = "EFFECTIVELY_ONCE" ) +const ( + SubscriptionPositionEarliest = "Earliest" + SubscriptionPositionLatest = "Latest" +) + func isPackageURLSupported(functionPkgURL string) bool { return strings.HasPrefix(functionPkgURL, "http://") || strings.HasPrefix(functionPkgURL, "https://") ||