ISM rewrite part 9

This commit is contained in:
Tracker-Friendly 2025-01-08 17:46:34 +00:00
parent f3cfc9e170
commit 7e63d4d5c4
1 changed files with 46 additions and 32 deletions

78
main.go
View File

@ -50,9 +50,22 @@ func NewServiceInitializationInformation(domain *string, outbox chan<- InterServ
Router: router,
Configuration: configuration,
ResourceDir: resourceDir,
internal: internal{
buffer: make(map[uuid.UUID]InterServiceMessage),
waitingList: make(map[uuid.UUID]struct{}),
arrived: make(chan uuid.UUID),
},
}
}
type internal struct {
buffer map[uuid.UUID]InterServiceMessage
waitingList map[uuid.UUID]struct{}
mutex sync.Mutex
arrived chan uuid.UUID
ispStarted bool
}
type ServiceInitializationInformation struct {
Service *Service `validate:"required"`
Domain *string
@ -61,6 +74,7 @@ type ServiceInitializationInformation struct {
Router *chi.Mux
Configuration map[string]interface{}
ResourceDir fs.FS
internal internal
}
// YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
@ -92,25 +106,21 @@ const (
Unauthorized
)
var buffer = make(map[uuid.UUID]InterServiceMessage)
var waitingList = make(map[uuid.UUID]struct{})
var mutex = sync.Mutex{}
var arrived = make(chan uuid.UUID)
var ispStarted = false
func (s *ServiceInitializationInformation) StartISProcessor() {
if ispStarted {
if s.internal.ispStarted {
println("IS Processor already started")
return
} else {
ispStarted = true
s.internal.ispStarted = true
}
listener := NewListener(s.inbox)
for {
msg := listener.AcceptMessage()
mutex.Lock()
buffer[msg.MessageID] = msg
mutex.Unlock()
arrived <- msg.MessageID
println("Received message on ID: " + msg.ForServiceID.String() + "for id: " + msg.MessageID.String())
s.internal.mutex.Lock()
s.internal.buffer[msg.MessageID] = msg
s.internal.mutex.Unlock()
s.internal.arrived <- msg.MessageID
}
}
@ -118,18 +128,18 @@ var (
ErrTimeout = errors.New("timeout")
)
func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
func (s *ServiceInitializationInformation) AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
for {
select {
case <-time.After(timeout):
return InterServiceMessage{}, ErrTimeout
case msgID := <-arrived:
case msgID := <-s.internal.arrived:
if msgID == id {
mutex.Lock()
msg := buffer[id]
delete(buffer, id)
delete(waitingList, id)
mutex.Unlock()
s.internal.mutex.Lock()
msg := s.internal.buffer[id]
delete(s.internal.buffer, id)
delete(s.internal.waitingList, id)
s.internal.mutex.Unlock()
if msg.MessageType != Success {
return msg, msg.Message.(error)
}
@ -150,7 +160,10 @@ func NewListener(c <-chan InterServiceMessage) Listener {
}
func (l DefaultListener) AcceptMessage() InterServiceMessage {
return <-l
println("Listener has pointer address: ", l)
msg := <-l
println("Received message on ID: " + msg.ForServiceID.String())
return msg
}
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
@ -163,9 +176,9 @@ func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, m
SentAt: time.Now(),
Message: message,
}
mutex.Lock()
waitingList[id] = struct{}{}
mutex.Unlock()
s.internal.mutex.Lock()
s.internal.waitingList[id] = struct{}{}
s.internal.mutex.Unlock()
s.Outbox <- msg
return id
}
@ -176,18 +189,19 @@ func (s *InterServiceMessage) Respond(messageType MessageCode, message any, info
n.MessageType = messageType
n.Message = message
n.SentAt = time.Now()
println(information.Outbox)
information.Outbox <- n
}
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
id := s.SendISMessage(forService, messageType, message)
return AwaitISMessage(id, timeout)
return s.AwaitISMessage(id, timeout)
}
var databaseService = uuid.MustParse("00000000-0000-0000-0000-000000000001")
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
if !ispStarted {
if !s.internal.ispStarted {
go s.StartISProcessor()
}
@ -201,17 +215,17 @@ func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage {
for {
<-arrived
mutex.Lock()
for id, msg := range buffer {
_, ok := waitingList[id]
<-s.internal.arrived
s.internal.mutex.Lock()
for id, msg := range s.internal.buffer {
_, ok := s.internal.waitingList[id]
if !ok {
delete(buffer, id)
mutex.Unlock()
delete(s.internal.buffer, id)
s.internal.mutex.Unlock()
return msg
}
}
mutex.Unlock()
s.internal.mutex.Unlock()
}
}