-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadmin_events.go
104 lines (85 loc) · 2.35 KB
/
admin_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/machinebox/graphql"
amqp "github.com/rabbitmq/amqp091-go"
log "github.com/sirupsen/logrus"
)
// AdminEvent is the JSON shape of an admin event message read from the queue.
// Handle decodes each delivery body into this type.
type AdminEvent struct {
	// Class carries the "@class" attribute of the payload.
	Class string `json:"@class"`
	// Timestamp is the event time; Handle divides it by 1000 to obtain Unix
	// seconds, i.e. it is treated as milliseconds since the epoch.
	Timestamp int64 `json:"time"`
	// RealmID is the "realmId" attribute of the event.
	RealmID string `json:"realmId"`
	// AuthDetails is kept as raw JSON so that it can be decoded separately
	// (Handle decodes it into the AuthDetails type).
	AuthDetails json.RawMessage `json:"authDetails"`
	// ResourceType is the "resourceType" attribute of the event.
	ResourceType string `json:"resourceType"`
	// OperationType selects the action taken by Handle (e.g. "DELETE").
	OperationType string `json:"operationType"`
	// ResourcePath is a slash-separated path; Handle reads its second
	// segment as the ID of the affected user.
	ResourcePath string `json:"resourcePath"`
	// ResourceTypeAsString is the "resourceTypeAsString" attribute of the event.
	ResourceTypeAsString string `json:"resourceTypeAsString"`
}
// AuthDetails is the decoded form of the "authDetails" object embedded in an
// AdminEvent.
type AuthDetails struct {
	// RealmID is the "realmId" attribute of the auth details.
	RealmID string `json:"realmId"`
	// ClientID is the "clientId" attribute of the auth details.
	ClientID string `json:"clientId"`
	// UserID is the "userId" attribute of the auth details.
	UserID string `json:"userId"`
	// IpAddress is the "ipAddress" attribute of the auth details.
	IpAddress string `json:"ipAddress"`
}
// Handle processes one admin event delivery taken from the queue.
//
// The delivery body is decoded as an AdminEvent. For a "DELETE" operation the
// second segment of the event's resource path is taken as a user ID and the
// matching user row is updated with deleted = true through the
// update_user_by_pk GraphQL mutation, authenticated with the Hasura admin
// secret from the configuration. Any other operation type is only logged.
//
// Every path settles the delivery exactly once:
//   - decoding failures and a failed mutation are nacked with requeue;
//   - a resource path without a usable user ID is nacked without requeue,
//     because redelivering the same message cannot succeed;
//   - everything else is acked.
//
// ctx is passed on to the GraphQL call so that cancellation and deadlines of
// the caller are honoured. The returned value is always nil.
func (eh *adminEventHandler) Handle(ctx context.Context, msg amqp.Delivery) interface{} {
	// Earlier versions never read ctx, so tolerate a nil value from existing
	// callers instead of failing inside the GraphQL client.
	if ctx == nil {
		ctx = context.Background()
	}

	// nack rejects the delivery and logs a failure of the rejection itself
	// rather than dropping it silently.
	nack := func(requeue bool) {
		if err := msg.Nack(false, requeue); err != nil {
			log.Errorf("Error on nack of message: %s\n", err)
		}
	}

	ev := AdminEvent{}
	if err := json.Unmarshal(msg.Body, &ev); err != nil {
		log.Errorf("Error on parsing json from queue: %s\n", err)
		// Return right after the nack: falling through would act on a zero
		// event and then ack a delivery that has already been settled.
		// NOTE(review): requeueing an undecodable body redelivers it
		// unchanged; confirm a dead-letter setup bounds the retries.
		nack(true)
		return nil
	}

	// ev.Timestamp is in milliseconds; only second precision is logged.
	ts := time.Unix(ev.Timestamp/1000, 0)
	log.Infof("ADMIN EVENT: %s, TYPE: %s, REALMID: %s", ts.Format(time.RFC3339), ev.OperationType, ev.RealmID)

	switch ev.OperationType {
	case "DELETE":
		// The auth details are decoded only to validate them; their fields
		// are not used any further here.
		ad := AuthDetails{}
		if err := json.Unmarshal(ev.AuthDetails, &ad); err != nil {
			log.Errorf("Error on parsing json from queue: %s\n", err)
			nack(true)
			return nil
		}

		// NOTE(review): every DELETE event is treated as a user deletion,
		// whatever ev.ResourceType says — confirm that only user events
		// reach this queue.
		segments := strings.Split(ev.ResourcePath, "/")
		if len(segments) < 2 || segments[1] == "" {
			// Indexing blindly would panic on a path without a "/".
			log.Errorf("Unexpected resource path in admin event: %q\n", ev.ResourcePath)
			nack(false)
			return nil
		}
		userID := segments[1]
		log.Infof("Removing user from hasura, userId: %s", userID)

		set := struct {
			Deleted bool `json:"deleted"`
		}{
			Deleted: true,
		}
		user := struct {
			ID string `json:"id"`
		}{
			ID: userID,
		}

		// Soft-delete: flag the row instead of removing it.
		req := graphql.NewRequest(`
mutation($set: user_set_input!, $user_id: user_pk_columns_input!){
update_user_by_pk(
_set: $set,
pk_columns: $user_id
) {
id
}
}
`)
		req.Var("set", set)
		req.Var("user_id", user)
		req.Header.Set("x-hasura-admin-secret", configuration.HasuraAdminSecret)

		var response interface{}
		if err := hc.Run(ctx, req, &response); err != nil {
			log.Errorf("Update user error: %s\n", err)
			nack(true)
			return nil
		}
		log.Infof("Removed user")
	default:
		log.Infof("Do nothing")
	}

	// The event was handled (or deliberately skipped): acknowledge it.
	if err := msg.Ack(false); err != nil {
		log.Errorf("Error on ack of message: %s\n", err)
	}
	return nil
}