diff --git a/main.go b/main.go index 2939233..f7d3e8c 100644 --- a/main.go +++ b/main.go @@ -50,9 +50,22 @@ func NewServiceInitializationInformation(domain *string, outbox chan<- InterServ Router: router, Configuration: configuration, ResourceDir: resourceDir, + internal: internal{ + buffer: make(map[uuid.UUID]InterServiceMessage), + waitingList: make(map[uuid.UUID]struct{}), + arrived: make(chan uuid.UUID), + }, } } +type internal struct { + buffer map[uuid.UUID]InterServiceMessage + waitingList map[uuid.UUID]struct{} + mutex sync.Mutex + arrived chan uuid.UUID + ispStarted bool +} + type ServiceInitializationInformation struct { Service *Service `validate:"required"` Domain *string @@ -61,6 +74,7 @@ type ServiceInitializationInformation struct { Router *chi.Mux Configuration map[string]interface{} ResourceDir fs.FS + internal internal } // YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from @@ -92,25 +106,21 @@ const ( Unauthorized ) -var buffer = make(map[uuid.UUID]InterServiceMessage) -var waitingList = make(map[uuid.UUID]struct{}) -var mutex = sync.Mutex{} -var arrived = make(chan uuid.UUID) -var ispStarted = false - func (s *ServiceInitializationInformation) StartISProcessor() { - if ispStarted { + if s.internal.ispStarted { + println("IS Processor already started") return } else { - ispStarted = true + s.internal.ispStarted = true } listener := NewListener(s.inbox) for { msg := listener.AcceptMessage() - mutex.Lock() - buffer[msg.MessageID] = msg - mutex.Unlock() - arrived <- msg.MessageID + println("Received message on ID: " + msg.ForServiceID.String() + "for id: " + msg.MessageID.String()) + s.internal.mutex.Lock() + s.internal.buffer[msg.MessageID] = msg + s.internal.mutex.Unlock() + s.internal.arrived <- msg.MessageID } } @@ -118,18 +128,18 @@ var ( ErrTimeout = errors.New("timeout") ) -func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) { +func (s *ServiceInitializationInformation) AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) { for { select { case <-time.After(timeout): return InterServiceMessage{}, ErrTimeout - case msgID := <-arrived: + case msgID := <-s.internal.arrived: if msgID == id { - mutex.Lock() - msg := buffer[id] - delete(buffer, id) - delete(waitingList, id) - mutex.Unlock() + s.internal.mutex.Lock() + msg := s.internal.buffer[id] + delete(s.internal.buffer, id) + delete(s.internal.waitingList, id) + s.internal.mutex.Unlock() if msg.MessageType != Success { return msg, msg.Message.(error) } @@ -150,7 +160,10 @@ func NewListener(c <-chan InterServiceMessage) Listener { } func (l DefaultListener) AcceptMessage() InterServiceMessage { - return <-l + println("Listener has pointer address: ", l) + msg := <-l + println("Received message on ID: " + msg.ForServiceID.String()) + return msg } func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID { @@ -163,9 +176,9 @@ func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, m SentAt: time.Now(), Message: message, } - mutex.Lock() - waitingList[id] = struct{}{} - mutex.Unlock() + s.internal.mutex.Lock() + s.internal.waitingList[id] = struct{}{} + s.internal.mutex.Unlock() s.Outbox <- msg return id } @@ -176,18 +189,19 @@ func (s *InterServiceMessage) Respond(messageType MessageCode, message any, info n.MessageType = messageType n.Message = message n.SentAt = time.Now() + println(information.Outbox) information.Outbox <- n } func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) { id := s.SendISMessage(forService, messageType, message) - return AwaitISMessage(id, timeout) + return s.AwaitISMessage(id, timeout) } var databaseService = uuid.MustParse("00000000-0000-0000-0000-000000000001") func (s *ServiceInitializationInformation) GetDatabase() (Database, error) { - if !ispStarted { + if !s.internal.ispStarted { go s.StartISProcessor() } @@ -201,17 +215,17 @@ func (s *ServiceInitializationInformation) GetDatabase() (Database, error) { func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage { for { - <-arrived - mutex.Lock() - for id, msg := range buffer { - _, ok := waitingList[id] + <-s.internal.arrived + s.internal.mutex.Lock() + for id, msg := range s.internal.buffer { + _, ok := s.internal.waitingList[id] if !ok { - delete(buffer, id) - mutex.Unlock() + delete(s.internal.buffer, id) + s.internal.mutex.Unlock() return msg } } - mutex.Unlock() + s.internal.mutex.Unlock() } }