package jsStreams import ( "errors" "fmt" "io" "strings" "sync" "syscall/js" ) // ReadableStream implements io.ReadCloser for a JavaScript ReadableStream. type ReadableStream struct { stream js.Value lock sync.Mutex } // Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. // This implementation of Read does not use scratch space if n < len(p). If some data is available but not len(p) bytes, // Read conventionally returns what is available instead of waiting for more. Note: Read will block until data is available, // meaning in a WASM environment, you must use a goroutine to call Read. func (r *ReadableStream) Read(p []byte) (n int, err error) { defer func() { recovered := recover() if recovered != nil { err = fmt.Errorf("panic: %v", recovered) } }() r.lock.Lock() var waitGroup sync.WaitGroup waitGroup.Add(1) reader := r.stream.Call("getReader", map[string]interface{}{"mode": "byob"}) resultBuffer := js.Global().Get("Uint8Array").New(len(p)) readResult := reader.Call("read", resultBuffer) readResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} { defer waitGroup.Done() data := args[0].Get("value") js.CopyBytesToGo(p, data) if args[0].Get("done").Bool() { err = io.EOF } n = data.Length() return nil })) readResult.Call("catch", js.FuncOf(func(this js.Value, args []js.Value) interface{} { defer waitGroup.Done() err = errors.New(args[0].Get("message").String()) return nil })) waitGroup.Wait() reader.Call("releaseLock") r.lock.Unlock() return n, err } // Close closes the ReadableStream. If the stream is already closed, Close does nothing. func (r *ReadableStream) Close() (err error) { defer func() { // We don't want any errors to be thrown if the stream is already closed. recovery := recover() if !strings.Contains(recovery.(string), "Can not close stream after closing or error") { err = fmt.Errorf("panic: %v", recovery) } }() r.lock.Lock() r.stream.Call("cancel") r.lock.Unlock() return nil } // NewReadableStream creates a new ReadableStream from a JavaScript ReadableStream. func NewReadableStream(stream js.Value) *ReadableStream { return &ReadableStream{stream: stream} } // WritableStream implements io.WriteCloser for a JavaScript WritableStream. type WritableStream struct { stream js.Value lock sync.Mutex } // Write writes len(p) bytes from p to the underlying data stream. It returns the number of bytes written from p (0 <= n <= len(p)) // and any error encountered that caused the write to stop early. Write must return a non-nil error if it returns n < len(p). // Write must not modify the slice data, even temporarily. func (w *WritableStream) Write(p []byte) (n int, err error) { defer func() { recovered := recover() if recovered != nil { err = fmt.Errorf("panic: %v", recovered) } }() w.lock.Lock() var waitGroup sync.WaitGroup waitGroup.Add(2) writer := w.stream.Call("getWriter") writer.Get("ready").Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} { defer waitGroup.Done() buffer := js.Global().Get("Uint8Array").New(len(p)) js.CopyBytesToJS(buffer, p) writeResult := writer.Call("write", buffer) writeResult.Call("then", js.FuncOf(func(this js.Value, args []js.Value) interface{} { defer waitGroup.Done() n = len(p) return nil })) writeResult.Call("catch", js.FuncOf(func(this js.Value, args []js.Value) interface{} { defer waitGroup.Done() err = errors.New(args[0].Get("message").String()) return nil })) return nil })) waitGroup.Wait() writer.Call("releaseLock") w.lock.Unlock() return n, err } // Close closes the WritableStream. If the stream is already closed, Close does nothing. func (w *WritableStream) Close() (err error) { defer func() { // We don't want any errors to be thrown if the stream is already closed. recovery := recover() if !strings.Contains(recovery.(string), "Can not close stream after closing or error") { err = fmt.Errorf("panic: %v", recovery) } }() w.lock.Lock() w.stream.Call("close") w.lock.Unlock() return nil } // NewWritableStream creates a new WritableStream. If a JavaScript WritableStream is provided, it will be used. // Otherwise, a new WritableStream will be created. func NewWritableStream(stream ...js.Value) *WritableStream { if len(stream) > 0 { return &WritableStream{stream: stream[0]} } else { stream := js.Global().Get("WritableStream").New() return &WritableStream{stream: stream} } }