-
Notifications
You must be signed in to change notification settings - Fork 8
/
event.go
134 lines (108 loc) · 4.39 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package workflow
import (
"strconv"
"time"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
"github.com/luno/workflow/internal/outboxpb"
)
// Event is the notification produced when a record changes status. It identifies the record via ForeignID and
// the status it changed to via Type, and carries Header-keyed meta-data alongside the streamer-assigned ID and
// timestamp.
type Event struct {
	// ID is a unique ID for the event generated by the event streamer.
	ID int64
	// ForeignID refers to the ID of a record in the record store.
	ForeignID string
	// Type relates to the StatusType that the associated record changed to.
	Type int
	// Headers stores meta-data in a simple and easily queryable way.
	Headers map[Header]string
	// CreatedAt is the time that the event was produced and is generated by the event streamer.
	CreatedAt time.Time
}
// ConnectorEvent defines a schema that is in line with how workflow uses an event notification pattern. This means
// that events only tell us what happened and do not transmit the state change. ConnectorEvent differs slightly from
// Event in that all fields, except for CreatedAt, are string based, which allows it to represent relations to
// elements that have string identifiers and string-based types.
type ConnectorEvent struct {
	// ID is a unique ID for the event.
	ID string
	// ForeignID refers to the ID of the element that the event relates to.
	ForeignID string
	// Type relates to the StatusType that the associated record changed to.
	Type string
	// Headers stores meta-data in a simple and easily queryable way.
	Headers map[string]string
	// CreatedAt is the time that the event was produced and is generated by the event streamer.
	CreatedAt time.Time
}
// OutboxEvent is a single outbox entry: the serialised event data for one workflow together with the entry's own
// ID and creation time.
type OutboxEvent struct {
	// ID is a unique ID for this specific OutboxEvent.
	ID string
	// WorkflowName refers to the name of the workflow that the OutboxEventData belongs to.
	WorkflowName string
	// Data represents a slice of bytes the OutboxEventDataMaker constructs via serialising event data
	// in an expected way for it to also be deserialized by the outbox consumer.
	Data []byte
	// CreatedAt is the time that this specific OutboxEvent was produced.
	CreatedAt time.Time
}
// OutboxEventData is the value produced by MakeOutboxEventData. It has the same fields as OutboxEvent except for
// CreatedAt.
type OutboxEventData struct {
	// ID identifies this outbox entry. MakeOutboxEventData sets it to "outbox-id-" followed by a generated UUID.
	ID string
	// WorkflowName refers to the name of the workflow that the OutboxEventData belongs to.
	WorkflowName string
	// Data represents a slice of bytes the OutboxEventDataMaker constructs via serialising event data
	// in an expected way for it to also be deserialized by the outbox consumer. MakeOutboxEventData fills it
	// with a protobuf-marshalled outboxpb.OutboxRecord.
	Data []byte
}
// MakeOutboxEventData builds the OutboxEventData for the given record. It works out which topic the event
// belongs on, packs the routing headers together with the run ID and status into an outboxpb.OutboxRecord,
// serialises that record with protobuf, and labels the result with an "outbox-id-" prefixed UUID.
//
// A zero OutboxEventData and a non-nil error are returned if either the protobuf marshalling or the UUID
// generation fails.
func MakeOutboxEventData(record Record) (OutboxEventData, error) {
	// Unless the RunState says otherwise, the event goes to the topic for the record's workflow and status.
	topic := Topic(record.WorkflowName, record.Status)

	switch record.RunState {
	case RunStatePaused, RunStateCancelled, RunStateDataDeleted, RunStateCompleted:
		// Ordinary workflow consumers should not see events for these RunStates, so they are routed to the
		// dedicated RunState change topic where hooks can consume and react to them.
		topic = RunStateChangeTopic(record.WorkflowName)
	case RunStateRequestedDataDeleted:
		// Deletion requests get their own "delete" topic so that they are processed asynchronously rather
		// than being spread across the workflow's status topics.
		topic = DeleteTopic(record.WorkflowName)
	}

	headers := map[string]string{
		string(HeaderForeignID):    record.ForeignID,
		string(HeaderWorkflowName): record.WorkflowName,
		string(HeaderTopic):        topic,
		string(HeaderRunID):        record.RunID,
		string(HeaderRunState):     strconv.FormatInt(int64(record.RunState), 10),
	}

	payload, err := proto.Marshal(&outboxpb.OutboxRecord{
		RunId:   record.RunID,
		Type:    int32(record.Status),
		Headers: headers,
	})
	if err != nil {
		return OutboxEventData{}, err
	}

	// The ID is only generated once the payload is known to be serialisable.
	id, err := uuid.NewUUID()
	if err != nil {
		return OutboxEventData{}, err
	}

	return OutboxEventData{
		ID:           "outbox-id-" + id.String(),
		WorkflowName: record.WorkflowName,
		Data:         payload,
	}, nil
}
// Header is the key type for the meta-data entries stored in Event.Headers.
type Header string
// Header keys that MakeOutboxEventData writes into the headers of every outbox record.
const (
	// HeaderWorkflowName keys the record's WorkflowName.
	HeaderWorkflowName Header = "workflow_name"
	// HeaderForeignID keys the record's ForeignID.
	HeaderForeignID Header = "foreign_id"
	// HeaderTopic keys the topic that the event was routed to.
	HeaderTopic Header = "topic"
	// HeaderRunID keys the record's RunID.
	HeaderRunID Header = "run_id"
	// HeaderRunState keys the record's RunState, formatted as a base-10 integer string.
	HeaderRunState Header = "run_state"
)