Compare commits

..

No commits in common. "main" and "v3.1.0" have entirely different histories.
main ... v3.1.0

144
main.go
View file

@ -1,11 +1,11 @@
package library
import (
"database/sql"
"errors"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"io/fs"
"math/big"
"sync"
"time"
)
@ -42,30 +42,17 @@ type InterServiceMessage struct {
// NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called
// by fulgens or a compliant implementation of fulgens.
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) *ServiceInitializationInformation {
return &ServiceInitializationInformation{
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) ServiceInitializationInformation {
return ServiceInitializationInformation{
Domain: domain,
Outbox: outbox,
inbox: inbox,
Router: router,
Configuration: configuration,
ResourceDir: resourceDir,
internal: internal{
buffer: make(map[uuid.UUID]InterServiceMessage),
waitingList: make(map[uuid.UUID]struct{}),
arrived: make(chan uuid.UUID),
},
}
}
type internal struct {
buffer map[uuid.UUID]InterServiceMessage
waitingList map[uuid.UUID]struct{}
mutex sync.Mutex
arrived chan uuid.UUID
ispStarted bool
}
type ServiceInitializationInformation struct {
Service *Service `validate:"required"`
Domain *string
@ -74,7 +61,6 @@ type ServiceInitializationInformation struct {
Router *chi.Mux
Configuration map[string]interface{}
ResourceDir fs.FS
internal internal
}
// YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
@ -86,17 +72,55 @@ func (s *ServiceInitializationInformation) YesIAbsolutelyKnowWhatIAmDoingAndIWan
return s.inbox
}
type DBType int
type ColumnType interface{}
const (
Sqlite DBType = iota
Postgres
type (
// String represents arbitrary sized text
String string
// Int32 represents a 32-bit signed integer
Int32 int32
// Int64 represents a 64-bit signed integer
Int64 int64
// IntInf represents an arbitrary sized signed integer
IntInf big.Int
// Float32 represents a 32-bit floating point number
Float32 float32
// Float64 represents a 64-bit floating point number
Float64 float64
// Boolean represents a boolean value
Boolean bool
// Blob represents an arbitrary sized binary object
Blob []byte
// UUID represents a UUID value
UUID uuid.UUID
// Time represents a time value
Time time.Time
)
type Database struct {
DB *sql.DB
DBType DBType
type TableSchema map[string]ColumnType
type Row map[string]any
type Rows []Row
type QueryParameters struct {
Equal map[string]any
NotEqual map[string]any
GreaterThan map[string]any
LessThan map[string]any
GreaterThanOrEqual map[string]any
LessThanOrEqual map[string]any
}
type UpdateParameters map[string]any
type Database interface {
CreateTable(name string, schema TableSchema) error
DeleteTable(name string) error
InsertRow(name string, row Row) error
Delete(name string, params QueryParameters) error
Select(name string, params QueryParameters) (Rows, error)
Update(name string, params QueryParameters, update UpdateParameters) error
}
type MessageCode int
const (
@ -106,19 +130,24 @@ const (
Unauthorized
)
var buffer = make(map[uuid.UUID]InterServiceMessage)
var mutex = sync.Mutex{}
var arrived = make(chan uuid.UUID)
var ispStarted = false
func (s *ServiceInitializationInformation) StartISProcessor() {
if s.internal.ispStarted {
if ispStarted {
return
} else {
s.internal.ispStarted = true
ispStarted = true
}
listener := NewListener(s.inbox)
for {
msg := listener.AcceptMessage()
s.internal.mutex.Lock()
s.internal.buffer[msg.MessageID] = msg
s.internal.mutex.Unlock()
s.internal.arrived <- msg.MessageID
mutex.Lock()
buffer[msg.MessageID] = msg
mutex.Unlock()
arrived <- msg.MessageID
}
}
@ -126,18 +155,17 @@ var (
ErrTimeout = errors.New("timeout")
)
func (s *ServiceInitializationInformation) AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
for {
select {
case <-time.After(timeout):
return InterServiceMessage{}, ErrTimeout
case msgID := <-s.internal.arrived:
case msgID := <-arrived:
if msgID == id {
s.internal.mutex.Lock()
msg := s.internal.buffer[id]
delete(s.internal.buffer, id)
delete(s.internal.waitingList, id)
s.internal.mutex.Unlock()
mutex.Lock()
msg := buffer[id]
delete(buffer, id)
mutex.Unlock()
if msg.MessageType != Success {
return msg, msg.Message.(error)
}
@ -158,8 +186,7 @@ func NewListener(c <-chan InterServiceMessage) Listener {
}
func (l DefaultListener) AcceptMessage() InterServiceMessage {
msg := <-l
return msg
return <-l
}
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
@ -172,55 +199,24 @@ func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, m
SentAt: time.Now(),
Message: message,
}
s.internal.mutex.Lock()
s.internal.waitingList[id] = struct{}{}
s.internal.mutex.Unlock()
s.Outbox <- msg
return id
}
func (s *InterServiceMessage) Respond(messageType MessageCode, message any, information *ServiceInitializationInformation) {
n := *s
n.ServiceID, n.ForServiceID = n.ForServiceID, n.ServiceID
n.MessageType = messageType
n.Message = message
n.SentAt = time.Now()
information.Outbox <- n
}
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
id := s.SendISMessage(forService, messageType, message)
return s.AwaitISMessage(id, timeout)
return AwaitISMessage(id, timeout)
}
var databaseService = uuid.MustParse("00000000-0000-0000-0000-000000000001")
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
if !s.internal.ispStarted {
if !ispStarted {
go s.StartISProcessor()
}
response, err := s.SendAndAwaitISMessage(databaseService, 0, nil, 5*time.Second)
response, err := s.SendAndAwaitISMessage(uuid.Nil, 0, nil, 5*time.Second)
if err != nil {
return Database{}, err
return nil, err
}
return response.Message.(Database), nil
}
func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage {
for {
<-s.internal.arrived
s.internal.mutex.Lock()
for id, msg := range s.internal.buffer {
_, ok := s.internal.waitingList[id]
if !ok {
delete(s.internal.buffer, id)
s.internal.mutex.Unlock()
return msg
}
}
s.internal.mutex.Unlock()
}
}