diff --git a/go.mod b/go.mod index 9e0ef5b4..a82c5450 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,8 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) +replace github.com/apache/pulsar-client-go v0.13.0-candidate-1.0.20240813105849-ab042ae714d1 => ../pulsar-client-go + require ( dario.cat/mergo v1.0.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect diff --git a/pkg/ctl/schemas/get_all.go b/pkg/ctl/schemas/get_all.go new file mode 100644 index 00000000..e92b1aea --- /dev/null +++ b/pkg/ctl/schemas/get_all.go @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schemas + +import ( + "io" + + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func getAllSchemas(vc *cmdutils.VerbCmd) { + desc := cmdutils.LongDescription{} + desc.CommandUsedFor = "Get all schemas for a topic." + desc.CommandPermission = "This command requires namespace admin permissions." + + var examples []cmdutils.Example + del := cmdutils.Example{ + Desc: "Get all schemas for a topic", + Command: "pulsarctl schemas get-all (topic name)", + } + + examples = append(examples, del) + desc.CommandExamples = examples + + var out []cmdutils.Output + successOut := cmdutils.Output{ + Desc: "normal output", + Out: "[\n" + + " {\n" + + " \"name\": \"test-schema\",\n" + + " \"schema\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"Test\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"id\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " \"int\"\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " \"string\"\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"type\": \"AVRO\",\n" + + " \"properties\": {}\n" + + " }\n" + + "]", + } + + failOut := cmdutils.Output{ + Desc: "HTTP 404 Not Found, please check if the topic name you entered is correct", + Out: "[✖] code: 404 reason: Not Found", + } + + notTopicName := cmdutils.Output{ + Desc: "you must specify a topic name, please check if the topic name is provided", + Out: "[✖] the topic name is not specified or the topic name is specified more than once", + } + + out = append(out, successOut, failOut, notTopicName) + desc.CommandOutput = out + + vc.SetDescription( + "get-all", + "Get the schema for a topic", + desc.ToString(), + desc.ExampleToString(), + "get-all", + ) + + vc.SetRunFuncWithNameArg(func() error { + return doGetAllSchemas(vc) + }, "the topic name is not specified or the topic name is specified more than once") + + vc.EnableOutputFlagSet() +} + +func doGetAllSchemas(vc *cmdutils.VerbCmd) error { + topic := vc.NameArg + + admin := cmdutils.NewPulsarClient() + infos, err := admin.Schemas().GetAllSchemas(topic) + if err == nil { + oc := cmdutils.NewOutputContent(). + WithObject(infos). + WithTextFunc(func(w io.Writer) error { + PrintSchemas(w, infos) + return nil + }) + err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc) + } + return err +} diff --git a/pkg/ctl/schemas/schema_test.go b/pkg/ctl/schemas/schema_test.go index 921ddb1d..1435c617 100644 --- a/pkg/ctl/schemas/schema_test.go +++ b/pkg/ctl/schemas/schema_test.go @@ -58,6 +58,14 @@ func TestSchema(t *testing.T) { assert.True(t, strings.Contains(getOut.String(), "AVRO")) assert.True(t, strings.Contains(getOut.String(), "test-schema")) + getAllArgs := []string{"get-all", "test-schema"} + getAllOut, _, err := TestSchemasCommands(getAllSchemas, getAllArgs) + + fmt.Print(getAllOut.String()) + assert.Nil(t, err) + assert.True(t, strings.Contains(getAllOut.String(), "AVRO")) + assert.True(t, strings.Contains(getAllOut.String(), "test-schema")) + delArgs := []string{"delete", "test-schema"} delOut, _, err := TestSchemasCommands(deleteSchema, delArgs) assert.Nil(t, err) diff --git a/pkg/ctl/schemas/schemas.go b/pkg/ctl/schemas/schemas.go index df93bba9..52b6c9c6 100644 --- a/pkg/ctl/schemas/schemas.go +++ b/pkg/ctl/schemas/schemas.go @@ -38,12 +38,22 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { ) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getSchema) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getAllSchemas) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteSchema) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, uploadSchema) return resourceCmd } +func PrintSchemas(w io.Writer, schemas []*utils.SchemaInfoWithVersion) { + _, _ = fmt.Fprintln(w, "[") + for _, schema := range schemas { + PrintSchema(w, schema) + _, _ = fmt.Fprint(w, "\n") + } + _, _ = fmt.Fprintln(w, "]") +} + func PrintSchema(w io.Writer, schema *utils.SchemaInfoWithVersion) { name, err := json.MarshalIndent(schema.SchemaInfo.Name, "", " ") if err != nil {