diff --git a/main.go b/main.go index b6cb099..4e0e778 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package jsStreams import ( "errors" "fmt" + "io" "sync" "syscall/js" @@ -11,7 +12,6 @@ import ( // ReadableStream implements io.ReaderCloser for a JavaScript ReadableStream. type ReadableStream struct { stream js.Value - reader js.Value lock sync.Mutex } @@ -31,19 +31,17 @@ func (r *ReadableStream) Read(inputBytes []byte) (n int, err error) { var waitGroup sync.WaitGroup waitGroup.Add(1) - if r.reader.IsUndefined() { - r.reader = r.stream.Call("getReader", map[string]interface{}{"mode": "byob"}) - } + reader := r.stream.Call("getReader", map[string]interface{}{"mode": "byob"}) resultBuffer := js.Global().Get("Uint8Array").New(len(inputBytes)) - readResult := r.reader.Call("read", resultBuffer) + readResult := reader.Call("read", resultBuffer) readResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} { defer waitGroup.Done() data := args[0].Get("value") js.CopyBytesToGo(inputBytes, data) if args[0].Get("done").Bool() { - err = errors.New("EOF") + err = io.EOF } n = data.Length() return nil @@ -56,6 +54,8 @@ func (r *ReadableStream) Read(inputBytes []byte) (n int, err error) { })) waitGroup.Wait() + reader.Call("releaseLock") + r.lock.Unlock() return n, err } @@ -71,13 +71,88 @@ func (r *ReadableStream) Close() (err error) { } }() - if !r.reader.IsUndefined() { - r.reader.Call("cancel") - } + r.lock.Lock() + r.stream.Call("cancel") + r.lock.Unlock() + return nil +} + +// NewReadableStream creates a new ReadableStream from a JavaScript ReadableStream. +func NewReadableStream(stream js.Value) *ReadableStream { + return &ReadableStream{stream: stream} +} + +type WritableStream struct { + stream js.Value + lock sync.Mutex +} + +func (w *WritableStream) Write(p []byte) (n int, err error) { + defer func() { + recovered := recover() + if recovered != nil { + err = fmt.Errorf("panic: %v", recovered) + } + }() + + w.lock.Lock() + var waitGroup sync.WaitGroup + waitGroup.Add(2) + + writer := w.stream.Call("getWriter") + + writer.Get("ready").Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + defer waitGroup.Done() + + buffer := js.Global().Get("Uint8Array").New(len(p)) + js.CopyBytesToJS(buffer, p) + + writeResult := writer.Call("write", buffer) + + writeResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + defer waitGroup.Done() + n = len(p) + return nil + })) + + writeResult.Call("catch", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + defer waitGroup.Done() + err = errors.New(args[0].Get("message").String()) + return nil + })) + + return nil + })) + + waitGroup.Wait() + writer.Call("releaseLock") + w.lock.Unlock() + + return n, err +} + +func (w *WritableStream) Close() (err error) { + defer func() { + recovered := recover() + if recovered != nil { + err = fmt.Errorf("panic: %v", recovered) + } + }() + + w.lock.Lock() + w.stream.Call("close") + w.lock.Unlock() return nil } -func NewReadableStream(stream js.Value) *ReadableStream { - return &ReadableStream{stream: stream} +// NewWritableStream creates a new WritableStream. If a JavaScript WritableStream is provided, it will be used. +// Otherwise, a new WritableStream will be created. +func NewWritableStream(stream ...js.Value) *WritableStream { + if len(stream) > 0 { + return &WritableStream{stream: stream[0]} + } else { + stream := js.Global().Get("WritableStream").New() + return &WritableStream{stream: stream} + } } diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..674e65b --- /dev/null +++ b/tests/README.md @@ -0,0 +1,3 @@ +# These are not unit tests + +Open index.html in a browser to try to use them as JS functions. These are not unit tests, and are just a non-automated way to test the functionality of the library in a browser. \ No newline at end of file diff --git a/tests/build.sh b/tests/build.sh new file mode 100755 index 0000000..9e7e444 --- /dev/null +++ b/tests/build.sh @@ -0,0 +1 @@ +GOOS=js GOARCH=wasm go build -o main.wasm \ No newline at end of file diff --git a/tests/index.html b/tests/index.html new file mode 100644 index 0000000..16a90ae --- /dev/null +++ b/tests/index.html @@ -0,0 +1,593 @@ + + + + + Wasm-Tester + + + + + + \ No newline at end of file diff --git a/tests/main.go b/tests/main.go new file mode 100644 index 0000000..0fc7ada --- /dev/null +++ b/tests/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "git.ailur.dev/ailur/jsStreams" + + "fmt" + "io" + + "syscall/js" +) + +// NOTE: Please do not use this code as an example. It never closes the stream and will leak memory. +// It is intended for use in the developer console, where you can close the stream via JavaScript. + +func main() { + js.Global().Set("TryReadStream", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + go func() { + readStream := jsStreams.NewReadableStream(args[0]) + var buffer []byte + buffer, err := io.ReadAll(readStream) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(string(buffer)) + }() + + return nil + })) + + js.Global().Set("TryWriteStream", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + go func() { + writeStream := jsStreams.NewWritableStream(args[0]) + _, err := writeStream.Write([]byte(args[1].String())) + if err != nil { + fmt.Println(err) + return + } + }() + + return nil + })) + + select {} +}