Compare commits
7 commits
Author | SHA1 | Date | |
---|---|---|---|
3b9127c641 | |||
72d062ac0d | |||
a05d2cae18 | |||
1ad74669d6 | |||
47970e0051 | |||
18bd0a477a | |||
76f95f834c |
4 changed files with 139 additions and 19 deletions
2
.gitattributes
vendored
Normal file
2
.gitattributes
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
tests/index.html -linguist-detectable
|
||||
tests/index.html linguist-vendored
|
|
@ -1,7 +1,8 @@
|
|||
# jsStreams
|
||||
|
||||
Go library to communicate with the JS Stream API by bridging the JS ReadableStream object to a Go io.ReaderCloser.
|
||||
Go library to communicate with the JS Stream API by bridging the JS ReadableStream and WritableStream objects to a Go io.ReaderCloser and io.WriterCloser.
|
||||
It also works vice versa, and with pipe readers/writers.
|
||||
|
||||
[](https://goreportcard.com/report/git.ailur.dev/ailur/jsStreams) [](https://pkg.go.dev/git.ailur.dev/ailur/jsStreams)
|
||||
|
||||
The API is pretty self-explanatory - it provides a function to create an io.ReaderCloser from a JS ReadableStream object.
|
||||
The API is pretty self-explanatory, see the Go Reference badge above for the full documentation.
|
87
main.go
87
main.go
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"syscall/js"
|
||||
|
@ -18,8 +19,8 @@ type ReadableStream struct {
|
|||
// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
|
||||
// This implementation of Read does not use scratch space if n < len(p). If some data is available but not len(p) bytes,
|
||||
// Read conventionally returns what is available instead of waiting for more. Note: Read will block until data is available,
|
||||
// meaning in a WASM environment, you **must** use a goroutine to call Read.
|
||||
func (r *ReadableStream) Read(inputBytes []byte) (n int, err error) {
|
||||
// meaning in a WASM environment, you must use a goroutine to call Read.
|
||||
func (r *ReadableStream) Read(p []byte) (n int, err error) {
|
||||
defer func() {
|
||||
recovered := recover()
|
||||
if recovered != nil {
|
||||
|
@ -33,16 +34,17 @@ func (r *ReadableStream) Read(inputBytes []byte) (n int, err error) {
|
|||
|
||||
reader := r.stream.Call("getReader", map[string]interface{}{"mode": "byob"})
|
||||
|
||||
resultBuffer := js.Global().Get("Uint8Array").New(len(inputBytes))
|
||||
resultBuffer := js.Global().Get("Uint8Array").New(len(p))
|
||||
readResult := reader.Call("read", resultBuffer)
|
||||
|
||||
readResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
defer waitGroup.Done()
|
||||
data := args[0].Get("value")
|
||||
js.CopyBytesToGo(inputBytes, data)
|
||||
if args[0].Get("done").Bool() {
|
||||
if args[0].Get("done").Bool() || args[0].Get("value").Length() == 0 {
|
||||
err = io.EOF
|
||||
return nil
|
||||
}
|
||||
data := args[0].Get("value")
|
||||
js.CopyBytesToGo(p, data)
|
||||
n = data.Length()
|
||||
return nil
|
||||
}))
|
||||
|
@ -61,13 +63,14 @@ func (r *ReadableStream) Read(inputBytes []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
// Close closes the ReadableStream. If the stream is already closed, Close does nothing.
|
||||
// If the stream is not yet closed, it is canceled. The reader is closed and the underlying source or pipeline is terminated.
|
||||
// This method is idempotent, meaning that it can be called multiple times without causing an error.
|
||||
func (r *ReadableStream) Close() (err error) {
|
||||
defer func() {
|
||||
recovered := recover()
|
||||
if recovered != nil {
|
||||
err = fmt.Errorf("panic: %v", recovered)
|
||||
// We don't want any errors to be thrown if the stream is already closed.
|
||||
recovery := recover()
|
||||
if !strings.Contains(fmt.Sprint(recovery), "Can not close stream after closing or error") {
|
||||
if recovery != nil {
|
||||
err = fmt.Errorf("panic: %v", recovery)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -82,11 +85,15 @@ func NewReadableStream(stream js.Value) *ReadableStream {
|
|||
return &ReadableStream{stream: stream}
|
||||
}
|
||||
|
||||
// WritableStream implements io.WriteCloser for a JavaScript WritableStream.
|
||||
type WritableStream struct {
|
||||
stream js.Value
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// Write writes len(p) bytes from p to the underlying data stream. It returns the number of bytes written from p (0 <= n <= len(p))
|
||||
// and any error encountered that caused the write to stop early. Write must return a non-nil error if it returns n < len(p).
|
||||
// Write must not modify the slice data, even temporarily.
|
||||
func (w *WritableStream) Write(p []byte) (n int, err error) {
|
||||
defer func() {
|
||||
recovered := recover()
|
||||
|
@ -131,11 +138,15 @@ func (w *WritableStream) Write(p []byte) (n int, err error) {
|
|||
return n, err
|
||||
}
|
||||
|
||||
// Close closes the WritableStream. If the stream is already closed, Close does nothing.
|
||||
func (w *WritableStream) Close() (err error) {
|
||||
defer func() {
|
||||
recovered := recover()
|
||||
if recovered != nil {
|
||||
err = fmt.Errorf("panic: %v", recovered)
|
||||
// We don't want any errors to be thrown if the stream is already closed.
|
||||
recovery := recover()
|
||||
if !strings.Contains(fmt.Sprint(recovery), "Can not close stream after closing or error") {
|
||||
if recovery != nil {
|
||||
err = fmt.Errorf("panic: %v", recovery)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -156,3 +167,51 @@ func NewWritableStream(stream ...js.Value) *WritableStream {
|
|||
return &WritableStream{stream: stream}
|
||||
}
|
||||
}
|
||||
|
||||
// Now we do the vice versa: Reader to ReadableStream and Writer to WritableStream.
|
||||
|
||||
// ReaderToReadableStream converts an io.Reader to a JavaScript ReadableStream.
|
||||
func ReaderToReadableStream(r io.Reader) js.Value {
|
||||
return js.Global().Get("ReadableStream").New(map[string]interface{}{
|
||||
"pull": js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
readController := args[0]
|
||||
return js.Global().Get("Promise").New(js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
var buffer []byte
|
||||
buffer, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
if len(buffer) == 0 {
|
||||
readController.Call("close")
|
||||
return nil
|
||||
}
|
||||
jsBuffer := js.Global().Get("Uint8Array").New(len(buffer))
|
||||
js.CopyBytesToJS(jsBuffer, buffer)
|
||||
readController.Call("enqueue", jsBuffer)
|
||||
readController.Call("close")
|
||||
args[0].Invoke()
|
||||
return nil
|
||||
}))
|
||||
}),
|
||||
"type": "bytes",
|
||||
})
|
||||
}
|
||||
|
||||
// WriterToWritableStream converts an io.Writer to a JavaScript WritableStream.
|
||||
func WriterToWritableStream(w io.Writer) js.Value {
|
||||
return js.Global().Get("WritableStream").New(map[string]interface{}{
|
||||
"write": js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
writeBuffer := args[0]
|
||||
return js.Global().Get("Promise").New(js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
buffer := make([]byte, writeBuffer.Length())
|
||||
js.CopyBytesToGo(buffer, writeBuffer)
|
||||
_, err := w.Write(buffer)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
args[0].Invoke()
|
||||
return nil
|
||||
}))
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,11 +1,9 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"git.ailur.dev/ailur/jsStreams"
|
||||
|
||||
"fmt"
|
||||
"git.ailur.dev/ailur/jsStreams"
|
||||
"io"
|
||||
|
||||
"syscall/js"
|
||||
)
|
||||
|
||||
|
@ -23,6 +21,10 @@ func main() {
|
|||
return
|
||||
}
|
||||
fmt.Println(string(buffer))
|
||||
err = readStream.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
|
@ -36,6 +38,62 @@ func main() {
|
|||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
err = writeStream.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}))
|
||||
|
||||
js.Global().Set("TryWriterConversions", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
go func() {
|
||||
reader, writer := io.Pipe()
|
||||
go func() {
|
||||
writeStream := jsStreams.WriterToWritableStream(writer)
|
||||
buffer := js.Global().Get("Uint8Array").New(45)
|
||||
js.CopyBytesToJS(buffer, []byte("Hi, I've been piped through a WritableStream!"))
|
||||
writeStream.Call("getWriter").Call("write", buffer).Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
go func() {
|
||||
err := writer.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}))
|
||||
}()
|
||||
go func() {
|
||||
fmt.Println("Reading stream...")
|
||||
m, _ := io.ReadAll(reader)
|
||||
fmt.Println(string(m))
|
||||
}()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}))
|
||||
|
||||
js.Global().Set("TryReaderConversions", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
go func() {
|
||||
reader, writer := io.Pipe()
|
||||
go func() {
|
||||
_, err := writer.Write([]byte("Hi, I've been piped through a ReadableStream!"))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
err = writer.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
fmt.Println("Reading stream...")
|
||||
m, _ := io.ReadAll(jsStreams.NewReadableStream(jsStreams.ReaderToReadableStream(reader)))
|
||||
fmt.Println(string(m))
|
||||
}()
|
||||
}()
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Add table
Reference in a new issue