ISM rewrite part 2

This commit is contained in:
Tracker-Friendly 2025-01-01 12:23:29 +00:00
parent ba2582e851
commit d404caa114
2 changed files with 138 additions and 32 deletions

2
go.sum
View File

@ -1,4 +1,6 @@
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

150
main.go
View File

@ -1,20 +1,27 @@
package library package library
import ( import (
"database/sql"
"errors" "errors"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/google/uuid" "github.com/google/uuid"
"io/fs" "io/fs"
"math/big"
"sync" "sync"
"time" "time"
) )
type Permissions struct { type Permissions struct {
// Authenticate allows the service to register with the nucleus authentication service and use OAuth2
Authenticate bool `validate:"required"` Authenticate bool `validate:"required"`
// Router allows the service to serve web pages
Router bool `validate:"required"`
// Database allows the service to ask for a centralised database connection
Database bool `validate:"required"` Database bool `validate:"required"`
// BlobStorage allows the service to use the blob storage service
BlobStorage bool `validate:"required"` BlobStorage bool `validate:"required"`
// InterServiceCommunication allows the service to send and receive messages from other services
InterServiceCommunication bool `validate:"required"` InterServiceCommunication bool `validate:"required"`
// Resources allows the service to access their resource directory
Resources bool `validate:"required"` Resources bool `validate:"required"`
} }
@ -28,36 +35,96 @@ type InterServiceMessage struct {
MessageID uuid.UUID `validate:"required"` MessageID uuid.UUID `validate:"required"`
ServiceID uuid.UUID `validate:"required"` ServiceID uuid.UUID `validate:"required"`
ForServiceID uuid.UUID `validate:"required"` ForServiceID uuid.UUID `validate:"required"`
MessageType uint64 `validate:"required"` MessageType MessageCode `validate:"required"`
SentAt time.Time `validate:"required"` SentAt time.Time `validate:"required"`
Message any `validate:"required"` Message any `validate:"required"`
} }
// NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called
// by fulgens or a compliant implementation of fulgens.
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) ServiceInitializationInformation {
return ServiceInitializationInformation{
Domain: domain,
Outbox: outbox,
inbox: inbox,
Router: router,
Configuration: configuration,
ResourceDir: resourceDir,
}
}
type ServiceInitializationInformation struct { type ServiceInitializationInformation struct {
Domain string `validate:"required"` Service *Service `validate:"required"`
Domain *string
Outbox chan<- InterServiceMessage `validate:"required"` Outbox chan<- InterServiceMessage `validate:"required"`
Inbox <-chan InterServiceMessage `validate:"required"` inbox <-chan InterServiceMessage `validate:"required"`
Router *chi.Mux `validate:"required"` Router *chi.Mux
Configuration map[string]interface{} Configuration map[string]interface{}
ResourceDir fs.FS ResourceDir fs.FS
} }
type DBType int // YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
// the inbox. This is a dangerous operation, can and will break the buffer, and you should most absolutely not use this
const ( // unless you would like to handle the messages yourself with no outside help or synchronization.
Sqlite DBType = iota //
Postgres // If you think you know what you're doing, **you probably don't**.
) func (s *ServiceInitializationInformation) YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox() <-chan InterServiceMessage {
return s.inbox
type Database struct {
DB *sql.DB
DBType DBType
} }
type ResponseCode int type ColumnType interface{}
type (
// String represents arbitrary sized text
String string
// Int32 represents a 32-bit signed integer
Int32 int32
// Int64 represents a 64-bit signed integer
Int64 int64
// IntInf represents an arbitrary sized signed integer
IntInf big.Int
// Float32 represents a 32-bit floating point number
Float32 float32
// Float64 represents a 64-bit floating point number
Float64 float64
// Boolean represents a boolean value
Boolean bool
// Blob represents an arbitrary sized binary object
Blob []byte
// UUID represents a UUID value
UUID uuid.UUID
// Time represents a time value
Time time.Time
)
type TableSchema map[string]ColumnType
type Row map[string]any
type Rows []Row
type QueryParameters struct {
Equal map[string]any
NotEqual map[string]any
GreaterThan map[string]any
LessThan map[string]any
GreaterThanOrEqual map[string]any
LessThanOrEqual map[string]any
}
type UpdateParameters map[string]any
type Database interface {
CreateTable(name string, schema TableSchema) error
DeleteTable(name string) error
InsertRow(name string, row Row) error
Delete(name string, params QueryParameters) error
Select(name string, params QueryParameters) (Rows, error)
Update(name string, params QueryParameters, update UpdateParameters) error
}
type MessageCode int
const ( const (
Success ResponseCode = iota Success MessageCode = iota
BadRequest BadRequest
InternalError InternalError
Unauthorized Unauthorized
@ -66,11 +133,18 @@ const (
var buffer = make(map[uuid.UUID]InterServiceMessage) var buffer = make(map[uuid.UUID]InterServiceMessage)
var mutex = sync.Mutex{} var mutex = sync.Mutex{}
var arrived = make(chan uuid.UUID) var arrived = make(chan uuid.UUID)
var ispStarted = false
func ISMessageBuffer(inbox <-chan InterServiceMessage) { func (s *ServiceInitializationInformation) StartISProcessor() {
if ispStarted {
return
} else {
ispStarted = true
}
listener := NewListener(s.inbox)
for { for {
msg := listener.AcceptMessage()
mutex.Lock() mutex.Lock()
msg := <-inbox
buffer[msg.MessageID] = msg buffer[msg.MessageID] = msg
mutex.Unlock() mutex.Unlock()
arrived <- msg.MessageID arrived <- msg.MessageID
@ -92,17 +166,34 @@ func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, e
msg := buffer[id] msg := buffer[id]
delete(buffer, id) delete(buffer, id)
mutex.Unlock() mutex.Unlock()
if msg.MessageType != Success {
return msg, msg.Message.(error)
}
return msg, nil return msg, nil
} }
} }
} }
} }
func (s *ServiceInitializationInformation) SendISMessage(service Service, forService uuid.UUID, messageType uint64, message any) uuid.UUID { type Listener interface {
AcceptMessage() InterServiceMessage
}
type DefaultListener <-chan InterServiceMessage
func NewListener(c <-chan InterServiceMessage) Listener {
return DefaultListener(c)
}
func (l DefaultListener) AcceptMessage() InterServiceMessage {
return <-l
}
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
id := uuid.New() id := uuid.New()
msg := InterServiceMessage{ msg := InterServiceMessage{
MessageID: id, MessageID: id,
ServiceID: service.ServiceID, ServiceID: s.Service.ServiceID,
ForServiceID: forService, ForServiceID: forService,
MessageType: messageType, MessageType: messageType,
SentAt: time.Now(), SentAt: time.Now(),
@ -112,7 +203,20 @@ func (s *ServiceInitializationInformation) SendISMessage(service Service, forSer
return id return id
} }
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(service Service, forService uuid.UUID, messageType uint64, message any, timeout time.Duration) (InterServiceMessage, error) { func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
id := s.SendISMessage(service, forService, messageType, message) id := s.SendISMessage(forService, messageType, message)
return AwaitISMessage(id, timeout) return AwaitISMessage(id, timeout)
} }
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
if !ispStarted {
go s.StartISProcessor()
}
response, err := s.SendAndAwaitISMessage(uuid.Nil, 0, nil, 5*time.Second)
if err != nil {
return nil, err
}
return response.Message.(Database), nil
}