jsStreams/main.go

164 lines
4.4 KiB
Go
Raw Normal View History

2024-10-25 18:14:53 +01:00
package jsStreams
import (
"errors"
"fmt"
"io"
2024-10-25 18:14:53 +01:00
"sync"
"syscall/js"
)
2024-10-25 19:44:44 +01:00
// ReadableStream implements io.ReadCloser for a JavaScript ReadableStream.
2024-10-25 18:14:53 +01:00
type ReadableStream struct {
stream js.Value
lock sync.Mutex
}
// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
// This implementation of Read does not use scratch space if n < len(p). If some data is available but not len(p) bytes,
// Read conventionally returns what is available instead of waiting for more. Note: Read will block until data is available,
// meaning in a WASM environment, you must use a goroutine to call Read.
func (r *ReadableStream) Read(p []byte) (n int, err error) {
2024-10-25 18:14:53 +01:00
defer func() {
recovered := recover()
if recovered != nil {
err = fmt.Errorf("panic: %v", recovered)
}
}()
r.lock.Lock()
var waitGroup sync.WaitGroup
waitGroup.Add(1)
reader := r.stream.Call("getReader", map[string]interface{}{"mode": "byob"})
2024-10-25 18:14:53 +01:00
resultBuffer := js.Global().Get("Uint8Array").New(len(p))
readResult := reader.Call("read", resultBuffer)
2024-10-25 18:14:53 +01:00
readResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
defer waitGroup.Done()
data := args[0].Get("value")
js.CopyBytesToGo(p, data)
2024-10-25 18:14:53 +01:00
if args[0].Get("done").Bool() {
err = io.EOF
2024-10-25 18:14:53 +01:00
}
n = data.Length()
return nil
}))
readResult.Call("catch", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
defer waitGroup.Done()
err = errors.New(args[0].Get("message").String())
return nil
}))
waitGroup.Wait()
reader.Call("releaseLock")
r.lock.Unlock()
2024-10-25 18:14:53 +01:00
return n, err
}
// Close closes the ReadableStream. If the stream is already closed, Close does nothing.
// If the stream is not yet closed, it is canceled. The reader is closed and the underlying source or pipeline is terminated.
// This method is idempotent, meaning that it can be called multiple times without causing an error.
func (r *ReadableStream) Close() (err error) {
defer func() {
recovered := recover()
if recovered != nil {
err = fmt.Errorf("panic: %v", recovered)
}
}()
r.lock.Lock()
r.stream.Call("cancel")
r.lock.Unlock()
2024-10-25 18:14:53 +01:00
return nil
}
// NewReadableStream creates a new ReadableStream from a JavaScript ReadableStream.
2024-10-25 18:14:53 +01:00
func NewReadableStream(stream js.Value) *ReadableStream {
return &ReadableStream{stream: stream}
}
// WritableStream implements io.WriteCloser for a JavaScript WritableStream.
type WritableStream struct {
stream js.Value
lock sync.Mutex
}
// Write writes len(p) bytes from p to the underlying data stream. It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early. Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
func (w *WritableStream) Write(p []byte) (n int, err error) {
defer func() {
recovered := recover()
if recovered != nil {
err = fmt.Errorf("panic: %v", recovered)
}
}()
w.lock.Lock()
var waitGroup sync.WaitGroup
waitGroup.Add(2)
writer := w.stream.Call("getWriter")
writer.Get("ready").Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
defer waitGroup.Done()
buffer := js.Global().Get("Uint8Array").New(len(p))
js.CopyBytesToJS(buffer, p)
writeResult := writer.Call("write", buffer)
writeResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
defer waitGroup.Done()
n = len(p)
return nil
}))
writeResult.Call("catch", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
defer waitGroup.Done()
err = errors.New(args[0].Get("message").String())
return nil
}))
return nil
}))
waitGroup.Wait()
writer.Call("releaseLock")
w.lock.Unlock()
return n, err
}
// Close closes the WritableStream. If the stream is already closed, Close does nothing.
func (w *WritableStream) Close() (err error) {
defer func() {
recovered := recover()
if recovered != nil {
err = fmt.Errorf("panic: %v", recovered)
}
}()
w.lock.Lock()
w.stream.Call("close")
w.lock.Unlock()
return nil
}
// NewWritableStream creates a new WritableStream. If a JavaScript WritableStream is provided, it will be used.
// Otherwise, a new WritableStream will be created.
func NewWritableStream(stream ...js.Value) *WritableStream {
if len(stream) > 0 {
return &WritableStream{stream: stream[0]}
} else {
stream := js.Global().Get("WritableStream").New()
return &WritableStream{stream: stream}
}
}