fg-library/main.go

213 lines
5.7 KiB
Go
Raw Normal View History

2024-10-03 19:03:18 +01:00
package library
import (
2025-01-07 18:03:32 +00:00
"database/sql"
"errors"
"github.com/go-chi/chi/v5"
2024-10-03 19:03:18 +01:00
"github.com/google/uuid"
"io/fs"
"sync"
2024-10-03 19:03:18 +01:00
"time"
)
type Permissions struct {
2025-01-01 12:23:29 +00:00
// Authenticate allows the service to register with the nucleus authentication service and use OAuth2
Authenticate bool `validate:"required"`
// Router allows the service to serve web pages
Router bool `validate:"required"`
// Database allows the service to ask for a centralised database connection
Database bool `validate:"required"`
// BlobStorage allows the service to use the blob storage service
BlobStorage bool `validate:"required"`
// InterServiceCommunication allows the service to send and receive messages from other services
2024-10-03 19:03:18 +01:00
InterServiceCommunication bool `validate:"required"`
2025-01-01 12:23:29 +00:00
// Resources allows the service to access their resource directory
Resources bool `validate:"required"`
2024-10-03 19:03:18 +01:00
}
type Service struct {
Name string `validate:"required"`
Permissions Permissions `validate:"required"`
ServiceID uuid.UUID `validate:"required"`
}
type InterServiceMessage struct {
2025-01-01 12:23:29 +00:00
MessageID uuid.UUID `validate:"required"`
ServiceID uuid.UUID `validate:"required"`
ForServiceID uuid.UUID `validate:"required"`
MessageType MessageCode `validate:"required"`
SentAt time.Time `validate:"required"`
Message any `validate:"required"`
}
// NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called
// by fulgens or a compliant implementation of fulgens.
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) ServiceInitializationInformation {
return ServiceInitializationInformation{
Domain: domain,
Outbox: outbox,
inbox: inbox,
Router: router,
Configuration: configuration,
ResourceDir: resourceDir,
}
2024-10-03 19:03:18 +01:00
}
type ServiceInitializationInformation struct {
2025-01-01 12:23:29 +00:00
Service *Service `validate:"required"`
Domain *string
2024-10-03 19:03:18 +01:00
Outbox chan<- InterServiceMessage `validate:"required"`
2025-01-01 12:23:29 +00:00
inbox <-chan InterServiceMessage `validate:"required"`
Router *chi.Mux
2024-10-03 19:03:18 +01:00
Configuration map[string]interface{}
ResourceDir fs.FS
}
2024-10-05 19:33:00 +01:00
2025-01-01 12:23:29 +00:00
// YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
// the inbox. This is a dangerous operation, can and will break the buffer, and you should most absolutely not use this
// unless you would like to handle the messages yourself with no outside help or synchronization.
//
// If you think you know what you're doing, **you probably don't**.
func (s *ServiceInitializationInformation) YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox() <-chan InterServiceMessage {
return s.inbox
}
2024-10-05 19:33:00 +01:00
2025-01-07 18:03:32 +00:00
type DBType int
2025-01-07 18:03:32 +00:00
const (
Sqlite DBType = iota
Postgres
)
2025-01-01 12:23:29 +00:00
2025-01-07 18:03:32 +00:00
type Database struct {
DB *sql.DB
DBType DBType
2025-01-01 12:23:29 +00:00
}
type MessageCode int
const (
2025-01-01 12:23:29 +00:00
Success MessageCode = iota
BadRequest
InternalError
Unauthorized
)
var buffer = make(map[uuid.UUID]InterServiceMessage)
2025-01-08 10:28:58 +00:00
var waitingList = make(map[uuid.UUID]struct{})
var mutex = sync.Mutex{}
var arrived = make(chan uuid.UUID)
2025-01-01 12:23:29 +00:00
var ispStarted = false
2025-01-01 12:23:29 +00:00
func (s *ServiceInitializationInformation) StartISProcessor() {
if ispStarted {
return
} else {
ispStarted = true
}
listener := NewListener(s.inbox)
for {
2025-01-01 12:23:29 +00:00
msg := listener.AcceptMessage()
mutex.Lock()
buffer[msg.MessageID] = msg
mutex.Unlock()
arrived <- msg.MessageID
}
}
var (
ErrTimeout = errors.New("timeout")
)
func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
for {
select {
case <-time.After(timeout):
return InterServiceMessage{}, ErrTimeout
case msgID := <-arrived:
if msgID == id {
mutex.Lock()
msg := buffer[id]
delete(buffer, id)
2025-01-08 10:28:58 +00:00
delete(waitingList, id)
mutex.Unlock()
2025-01-01 12:23:29 +00:00
if msg.MessageType != Success {
return msg, msg.Message.(error)
}
return msg, nil
}
}
}
}
2025-01-01 12:23:29 +00:00
type Listener interface {
AcceptMessage() InterServiceMessage
}
type DefaultListener <-chan InterServiceMessage
func NewListener(c <-chan InterServiceMessage) Listener {
return DefaultListener(c)
}
func (l DefaultListener) AcceptMessage() InterServiceMessage {
return <-l
}
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
id := uuid.New()
msg := InterServiceMessage{
MessageID: id,
2025-01-01 12:23:29 +00:00
ServiceID: s.Service.ServiceID,
ForServiceID: forService,
MessageType: messageType,
SentAt: time.Now(),
Message: message,
}
2025-01-08 10:28:58 +00:00
mutex.Lock()
waitingList[id] = struct{}{}
mutex.Unlock()
s.Outbox <- msg
return id
}
2025-01-08 12:57:12 +00:00
func (s *InterServiceMessage) Respond(messageType MessageCode, message any) {
s.ServiceID, s.ForServiceID = s.ForServiceID, s.ServiceID
s.MessageType = messageType
s.Message = message
}
2025-01-01 12:23:29 +00:00
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
id := s.SendISMessage(forService, messageType, message)
return AwaitISMessage(id, timeout)
}
2025-01-01 12:23:29 +00:00
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
if !ispStarted {
go s.StartISProcessor()
}
response, err := s.SendAndAwaitISMessage(uuid.Nil, 0, nil, 5*time.Second)
if err != nil {
2025-01-07 18:03:32 +00:00
return Database{}, err
2025-01-01 12:23:29 +00:00
}
return response.Message.(Database), nil
}
2025-01-08 10:28:58 +00:00
func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage {
for {
<-arrived
mutex.Lock()
for id, msg := range buffer {
_, ok := waitingList[id]
2025-01-08 12:52:30 +00:00
2025-01-08 10:28:58 +00:00
if !ok {
delete(buffer, id)
mutex.Unlock()
return msg
}
}
mutex.Unlock()
}
}