Compare commits
10 commits
Author | SHA1 | Date | |
---|---|---|---|
e9c64bb56a | |||
ff28e23636 | |||
7e63d4d5c4 | |||
f3cfc9e170 | |||
b82928e820 | |||
2ae6af81ab | |||
b005f7f10a | |||
a77529f4f7 | |||
a02b0ba721 | |||
4abda3c58f |
1 changed files with 76 additions and 72 deletions
148
main.go
148
main.go
|
@ -1,11 +1,11 @@
|
||||||
package library
|
package library
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"math/big"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -42,17 +42,30 @@ type InterServiceMessage struct {
|
||||||
|
|
||||||
// NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called
|
// NewServiceInitializationInformation creates a new ServiceInitializationInformation and is only ever meant to be called
|
||||||
// by fulgens or a compliant implementation of fulgens.
|
// by fulgens or a compliant implementation of fulgens.
|
||||||
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) ServiceInitializationInformation {
|
func NewServiceInitializationInformation(domain *string, outbox chan<- InterServiceMessage, inbox <-chan InterServiceMessage, router *chi.Mux, configuration map[string]interface{}, resourceDir fs.FS) *ServiceInitializationInformation {
|
||||||
return ServiceInitializationInformation{
|
return &ServiceInitializationInformation{
|
||||||
Domain: domain,
|
Domain: domain,
|
||||||
Outbox: outbox,
|
Outbox: outbox,
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
Router: router,
|
Router: router,
|
||||||
Configuration: configuration,
|
Configuration: configuration,
|
||||||
ResourceDir: resourceDir,
|
ResourceDir: resourceDir,
|
||||||
|
internal: internal{
|
||||||
|
buffer: make(map[uuid.UUID]InterServiceMessage),
|
||||||
|
waitingList: make(map[uuid.UUID]struct{}),
|
||||||
|
arrived: make(chan uuid.UUID),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type internal struct {
|
||||||
|
buffer map[uuid.UUID]InterServiceMessage
|
||||||
|
waitingList map[uuid.UUID]struct{}
|
||||||
|
mutex sync.Mutex
|
||||||
|
arrived chan uuid.UUID
|
||||||
|
ispStarted bool
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceInitializationInformation struct {
|
type ServiceInitializationInformation struct {
|
||||||
Service *Service `validate:"required"`
|
Service *Service `validate:"required"`
|
||||||
Domain *string
|
Domain *string
|
||||||
|
@ -61,6 +74,7 @@ type ServiceInitializationInformation struct {
|
||||||
Router *chi.Mux
|
Router *chi.Mux
|
||||||
Configuration map[string]interface{}
|
Configuration map[string]interface{}
|
||||||
ResourceDir fs.FS
|
ResourceDir fs.FS
|
||||||
|
internal internal
|
||||||
}
|
}
|
||||||
|
|
||||||
// YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
|
// YesIAbsolutelyKnowWhatIAmDoingAndIWantToAccessTheRawInbox returns a channel that can be used to read messages from
|
||||||
|
@ -72,55 +86,17 @@ func (s *ServiceInitializationInformation) YesIAbsolutelyKnowWhatIAmDoingAndIWan
|
||||||
return s.inbox
|
return s.inbox
|
||||||
}
|
}
|
||||||
|
|
||||||
type ColumnType interface{}
|
type DBType int
|
||||||
|
|
||||||
type (
|
const (
|
||||||
// String represents arbitrary sized text
|
Sqlite DBType = iota
|
||||||
String string
|
Postgres
|
||||||
// Int32 represents a 32-bit signed integer
|
|
||||||
Int32 int32
|
|
||||||
// Int64 represents a 64-bit signed integer
|
|
||||||
Int64 int64
|
|
||||||
// IntInf represents an arbitrary sized signed integer
|
|
||||||
IntInf big.Int
|
|
||||||
// Float32 represents a 32-bit floating point number
|
|
||||||
Float32 float32
|
|
||||||
// Float64 represents a 64-bit floating point number
|
|
||||||
Float64 float64
|
|
||||||
// Boolean represents a boolean value
|
|
||||||
Boolean bool
|
|
||||||
// Blob represents an arbitrary sized binary object
|
|
||||||
Blob []byte
|
|
||||||
// UUID represents a UUID value
|
|
||||||
UUID uuid.UUID
|
|
||||||
// Time represents a time value
|
|
||||||
Time time.Time
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type TableSchema map[string]ColumnType
|
type Database struct {
|
||||||
type Row map[string]any
|
DB *sql.DB
|
||||||
type Rows []Row
|
DBType DBType
|
||||||
|
|
||||||
type QueryParameters struct {
|
|
||||||
Equal map[string]any
|
|
||||||
NotEqual map[string]any
|
|
||||||
GreaterThan map[string]any
|
|
||||||
LessThan map[string]any
|
|
||||||
GreaterThanOrEqual map[string]any
|
|
||||||
LessThanOrEqual map[string]any
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type UpdateParameters map[string]any
|
|
||||||
|
|
||||||
type Database interface {
|
|
||||||
CreateTable(name string, schema TableSchema) error
|
|
||||||
DeleteTable(name string) error
|
|
||||||
InsertRow(name string, row Row) error
|
|
||||||
Delete(name string, params QueryParameters) error
|
|
||||||
Select(name string, params QueryParameters) (Rows, error)
|
|
||||||
Update(name string, params QueryParameters, update UpdateParameters) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type MessageCode int
|
type MessageCode int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -130,24 +106,19 @@ const (
|
||||||
Unauthorized
|
Unauthorized
|
||||||
)
|
)
|
||||||
|
|
||||||
var buffer = make(map[uuid.UUID]InterServiceMessage)
|
|
||||||
var mutex = sync.Mutex{}
|
|
||||||
var arrived = make(chan uuid.UUID)
|
|
||||||
var ispStarted = false
|
|
||||||
|
|
||||||
func (s *ServiceInitializationInformation) StartISProcessor() {
|
func (s *ServiceInitializationInformation) StartISProcessor() {
|
||||||
if ispStarted {
|
if s.internal.ispStarted {
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
ispStarted = true
|
s.internal.ispStarted = true
|
||||||
}
|
}
|
||||||
listener := NewListener(s.inbox)
|
listener := NewListener(s.inbox)
|
||||||
for {
|
for {
|
||||||
msg := listener.AcceptMessage()
|
msg := listener.AcceptMessage()
|
||||||
mutex.Lock()
|
s.internal.mutex.Lock()
|
||||||
buffer[msg.MessageID] = msg
|
s.internal.buffer[msg.MessageID] = msg
|
||||||
mutex.Unlock()
|
s.internal.mutex.Unlock()
|
||||||
arrived <- msg.MessageID
|
s.internal.arrived <- msg.MessageID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,17 +126,18 @@ var (
|
||||||
ErrTimeout = errors.New("timeout")
|
ErrTimeout = errors.New("timeout")
|
||||||
)
|
)
|
||||||
|
|
||||||
func AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
|
func (s *ServiceInitializationInformation) AwaitISMessage(id uuid.UUID, timeout time.Duration) (InterServiceMessage, error) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
return InterServiceMessage{}, ErrTimeout
|
return InterServiceMessage{}, ErrTimeout
|
||||||
case msgID := <-arrived:
|
case msgID := <-s.internal.arrived:
|
||||||
if msgID == id {
|
if msgID == id {
|
||||||
mutex.Lock()
|
s.internal.mutex.Lock()
|
||||||
msg := buffer[id]
|
msg := s.internal.buffer[id]
|
||||||
delete(buffer, id)
|
delete(s.internal.buffer, id)
|
||||||
mutex.Unlock()
|
delete(s.internal.waitingList, id)
|
||||||
|
s.internal.mutex.Unlock()
|
||||||
if msg.MessageType != Success {
|
if msg.MessageType != Success {
|
||||||
return msg, msg.Message.(error)
|
return msg, msg.Message.(error)
|
||||||
}
|
}
|
||||||
|
@ -186,7 +158,8 @@ func NewListener(c <-chan InterServiceMessage) Listener {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l DefaultListener) AcceptMessage() InterServiceMessage {
|
func (l DefaultListener) AcceptMessage() InterServiceMessage {
|
||||||
return <-l
|
msg := <-l
|
||||||
|
return msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
|
func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, messageType MessageCode, message any) uuid.UUID {
|
||||||
|
@ -199,24 +172,55 @@ func (s *ServiceInitializationInformation) SendISMessage(forService uuid.UUID, m
|
||||||
SentAt: time.Now(),
|
SentAt: time.Now(),
|
||||||
Message: message,
|
Message: message,
|
||||||
}
|
}
|
||||||
|
s.internal.mutex.Lock()
|
||||||
|
s.internal.waitingList[id] = struct{}{}
|
||||||
|
s.internal.mutex.Unlock()
|
||||||
s.Outbox <- msg
|
s.Outbox <- msg
|
||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
|
func (s *InterServiceMessage) Respond(messageType MessageCode, message any, information *ServiceInitializationInformation) {
|
||||||
id := s.SendISMessage(forService, messageType, message)
|
n := *s
|
||||||
return AwaitISMessage(id, timeout)
|
n.ServiceID, n.ForServiceID = n.ForServiceID, n.ServiceID
|
||||||
|
n.MessageType = messageType
|
||||||
|
n.Message = message
|
||||||
|
n.SentAt = time.Now()
|
||||||
|
information.Outbox <- n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ServiceInitializationInformation) SendAndAwaitISMessage(forService uuid.UUID, messageType MessageCode, message any, timeout time.Duration) (InterServiceMessage, error) {
|
||||||
|
id := s.SendISMessage(forService, messageType, message)
|
||||||
|
return s.AwaitISMessage(id, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
var databaseService = uuid.MustParse("00000000-0000-0000-0000-000000000001")
|
||||||
|
|
||||||
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
|
func (s *ServiceInitializationInformation) GetDatabase() (Database, error) {
|
||||||
if !ispStarted {
|
if !s.internal.ispStarted {
|
||||||
go s.StartISProcessor()
|
go s.StartISProcessor()
|
||||||
}
|
}
|
||||||
|
|
||||||
response, err := s.SendAndAwaitISMessage(uuid.Nil, 0, nil, 5*time.Second)
|
response, err := s.SendAndAwaitISMessage(databaseService, 0, nil, 5*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return Database{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return response.Message.(Database), nil
|
return response.Message.(Database), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ServiceInitializationInformation) AcceptMessage() InterServiceMessage {
|
||||||
|
for {
|
||||||
|
<-s.internal.arrived
|
||||||
|
s.internal.mutex.Lock()
|
||||||
|
for id, msg := range s.internal.buffer {
|
||||||
|
_, ok := s.internal.waitingList[id]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
delete(s.internal.buffer, id)
|
||||||
|
s.internal.mutex.Unlock()
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.internal.mutex.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue