Compare commits

..

No commits in common. "main" and "v3.4.0" have entirely different histories.
main ... v3.4.0

90
main.go
View file

@ -42,30 +42,17 @@ type InterServiceMessage struct {
// NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called // NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called
// by fulgens or a compliant implementation of fulgens. // by fulgens or a compliant implementation of fulgens.
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) *ServiceInitializationInformation { func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) ServiceInitializationInformation {
return &ServiceInitializationInformation{ return ServiceInitializationInformation{
Domain: domain, Domain: domain,
Outbox: outbox, Outbox: outbox,
inbox: inbox, inbox: inbox,
Router: router, Router: router,
Configuration: configuration, Configuration: configuration,
ResourceDir: resourceDir, ResourceDir: resourceDir,
internal: internal{
buffer: make(map[uuid.UUID]InterServiceMessage),
waitingList: make(map[uuid.UUID]struct{}),
arrived: make(chan uuid.UUID),
},
} }
} }
type internal struct {
buffer map[uuid.UUID]InterServiceMessage
waitingList map[uuid.UUID]struct{}
mutex sync.Mutex
arrived chan uuid.UUID
ispStarted bool
}
type ServiceInitializationInformation struct { type ServiceInitializationInformation struct {
Service *Service `validate:"required"` Service *Service `validate:"required"`
Domain *string Domain *string
@ -74,7 +61,6 @@ type ServiceInitializationInformation struct {
Router *chi.Mux Router *chi.Mux
Configuration map[string]interface{} Configuration map[string]interface{}
ResourceDir fs.FS ResourceDir fs.FS
internal internal
} }
// YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from // YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
@ -106,19 +92,25 @@ const (
Unauthorized Unauthorized
) )
var buffer = make(map[uuid.UUID]InterServiceMessage)
var waitingList = make(map[uuid.UUID]struct{})
var mutex = sync.Mutex{}
var arrived = make(chan uuid.UUID)
var ispStarted = false
func (s *ServiceInitializationInformation) StartISProcessor() { func (s *ServiceInitializationInformation) StartISProcessor() {
if s.internal.ispStarted { if ispStarted {
return return
} else { } else {
s.internal.ispStarted = true ispStarted = true
} }
listener := NewListener(s.inbox) listener := NewListener(s.inbox)
for { for {
msg := listener.AcceptMessage() msg := listener.AcceptMessage()
s.internal.mutex.Lock() mutex.Lock()
s.internal.buffer[msg.MessageID] = msg buffer[msg.MessageID] = msg
s.internal.mutex.Unlock() mutex.Unlock()
s.internal.arrived <- msg.MessageID arrived <- msg.MessageID
} }
} }
@ -126,18 +118,18 @@ var (
ErrTimeout = errors.New("timeout") ErrTimeout = errors.New("timeout")
) )
func (s *ServiceInitializationInformation) AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) { func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
for { for {
select { select {
case <-time.After(timeout): case <-time.After(timeout):
return InterServiceMessage{}, ErrTimeout return InterServiceMessage{}, ErrTimeout
case msgID := <-s.internal.arrived: case msgID := <-arrived:
if msgID == id { if msgID == id {
s.internal.mutex.Lock() mutex.Lock()
msg := s.internal.buffer[id] msg := buffer[id]
delete(s.internal.buffer, id) delete(buffer, id)
delete(s.internal.waitingList, id) delete(waitingList, id)
s.internal.mutex.Unlock() mutex.Unlock()
if msg.MessageType != Success { if msg.MessageType != Success {
return msg, msg.Message.(error) return msg, msg.Message.(error)
} }
@ -158,8 +150,7 @@ func NewListener(c <-chan InterServiceMessage) Listener {
} }
func (l DefaultListener) AcceptMessage() InterServiceMessage { func (l DefaultListener) AcceptMessage() InterServiceMessage {
msg := <-l return <-l
return msg
} }
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID { func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
@ -172,35 +163,24 @@ func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, m
SentAt: time.Now(), SentAt: time.Now(),
Message: message, Message: message,
} }
s.internal.mutex.Lock() mutex.Lock()
s.internal.waitingList[id] = struct{}{} waitingList[id] = struct{}{}
s.internal.mutex.Unlock() mutex.Unlock()
s.Outbox <- msg s.Outbox <- msg
return id return id
} }
func (s *InterServiceMessage) Respond(messageType MessageCode, message any, information *ServiceInitializationInformation) {
n := *s
n.ServiceID, n.ForServiceID = n.ForServiceID, n.ServiceID
n.MessageType = messageType
n.Message = message
n.SentAt = time.Now()
information.Outbox <- n
}
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) { func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
id := s.SendISMessage(forService, messageType, message) id := s.SendISMessage(forService, messageType, message)
return s.AwaitISMessage(id, timeout) return AwaitISMessage(id, timeout)
} }
var databaseService = uuid.MustParse("00000000-0000-0000-0000-000000000001")
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) { func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
if !s.internal.ispStarted { if !ispStarted {
go s.StartISProcessor() go s.StartISProcessor()
} }
response, err := s.SendAndAwaitISMessage(databaseService, 0, nil, 5*time.Second) response, err := s.SendAndAwaitISMessage(uuid.Nil, 0, nil, 5*time.Second)
if err != nil { if err != nil {
return Database{}, err return Database{}, err
} }
@ -210,17 +190,17 @@ func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage { func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage {
for { for {
<-s.internal.arrived <-arrived
s.internal.mutex.Lock() mutex.Lock()
for id, msg := range s.internal.buffer { for id, msg := range buffer {
_, ok := s.internal.waitingList[id] _, ok := waitingList[id]
if !ok { if !ok {
delete(s.internal.buffer, id) delete(buffer, id)
s.internal.mutex.Unlock() mutex.Unlock()
return msg return msg
} }
} }
s.internal.mutex.Unlock() mutex.Unlock()
} }
} }