kittemail/mailbox.go

888 lines
19 KiB
Go

package main
import (
"database/sql"
"errors"
"github.com/KEINOS/go-countline/cl"
"github.com/google/uuid"
"io"
"os"
"sort"
"syscall"
"time"
"github.com/OneOfOne/xxhash"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapserver"
)
// Mailbox is an in-memory mailbox.
//
// The same mailbox can be shared between multiple connections and multiple
// users.
type Mailbox struct {
UIDValidity uint32
openSessions []*MailboxView
tracker *imapserver.MailboxTracker
Subscribed bool
id uuid.UUID
name string
user *User
}
var ErrNoSuchMailbox = &imap.Error{
Type: imap.StatusResponseTypeNo,
Code: imap.ResponseCodeTryCreate,
Text: "No such mailbox",
}
func loadMbox(name string, u *User) (*Mailbox, error) {
var idBytes []byte
var subscribed bool
err := Database.DB.QueryRow("SELECT id, subscribed FROM mailboxes WHERE mailbox = $1 AND owner = $2", name, u.sub[:]).Scan(&idBytes, &subscribed)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNoSuchMailbox
} else {
return nil, err
}
}
return loadMboxRaw(subscribed, idBytes, name, u)
}
func loadMboxRaw(subscribed bool, idRaw []byte, name string, u *User) (*Mailbox, error) {
id, err := uuid.FromBytes(idRaw)
if err != nil {
return nil, err
}
mbox := &Mailbox{
Subscribed: subscribed,
id: id,
name: name,
user: u,
UIDValidity: xxhash.Checksum32(id[:]),
}
count, err := mbox.getCount()
if err != nil {
return nil, err
}
mbox.tracker = imapserver.NewMailboxTracker(count)
return mbox, nil
}
func (mbox *Mailbox) Delete() error {
view := mbox.NewView()
defer func() {
err := view.Close()
if err != nil {
log("Failed to close view: "+err.Error()+", resource leaks may occur", 1)
}
}()
err := view.ExpungeAll(nil)
if err != nil {
return err
}
_, err = Database.DB.Exec("DELETE FROM mailboxes WHERE id = $1", mbox.id[:])
if err != nil {
return err
}
for _, view := range mbox.openSessions {
err = view.Close()
if err != nil {
return err
}
}
return nil
}
func CreateMailbox(newUUID uuid.UUID, name string, u *User) error {
_, err := Database.DB.Exec("INSERT INTO mailboxes (mailbox, id, owner) VALUES ($1, $2, $3)", name, newUUID[:], u.sub[:])
if err != nil {
return err
}
return nil
}
func (mbox *Mailbox) list(options *imap.ListOptions) (*imap.ListData, error) {
if options.SelectSubscribed && !mbox.Subscribed {
return nil, nil
}
data := imap.ListData{
Mailbox: mbox.name,
Delim: mailboxDelim,
}
if mbox.Subscribed {
data.Attrs = append(data.Attrs, imap.MailboxAttrSubscribed)
}
if options.ReturnStatus != nil {
var err error
data.Status, err = mbox.statusData(options.ReturnStatus)
if err != nil {
return nil, err
}
}
return &data, nil
}
func (mbox *Mailbox) uidNext() (imap.UID, error) {
messages, err := Database.DB.Query("SELECT uid FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return 0, err
}
defer func() {
err = messages.Close()
if err != nil {
log("Failed to close rows: "+err.Error()+", resource leaks may occur", 1)
}
}()
var uid uint32
//goland:noinspection GoDfaErrorMayBeNotNil
for messages.Next() {
var messageID uint32
err = messages.Scan(&messageID)
if err != nil {
return 0, err
}
if messageID > uid {
uid = messageID
}
}
uid++
return imap.UID(uid), nil
}
// StatusData returns data for the STATUS command.
func (mbox *Mailbox) StatusData(options *imap.StatusOptions) *imap.StatusData {
data, err := mbox.statusData(options)
if err != nil {
panic(err)
}
return data
}
func (mbox *Mailbox) getCount() (n uint32, err error) {
err = Database.DB.QueryRow("SELECT COUNT(*) FROM messages WHERE mailbox = $1", mbox.id[:]).Scan(&n)
return
}
func (mbox *Mailbox) statusData(options *imap.StatusOptions) (*imap.StatusData, error) {
data := imap.StatusData{Mailbox: mbox.name}
var n uint32
if options.NumMessages || options.NumUnseen || options.NumDeleted {
var err error
n, err = mbox.getCount()
if err != nil {
return nil, err
}
}
if options.NumMessages {
data.NumMessages = &n
}
if options.UIDNext {
var err error
data.UIDNext, err = mbox.uidNext()
if err != nil {
return nil, err
}
}
if options.UIDValidity {
data.UIDValidity = mbox.UIDValidity
}
if options.NumUnseen {
ns, err := mbox.countByFlag(imap.FlagSeen)
if err != nil {
return nil, err
}
s := n - ns
data.NumUnseen = &s
}
if options.NumDeleted {
nd, err := mbox.countByFlag(imap.FlagDeleted)
if err != nil {
return nil, err
}
data.NumDeleted = &nd
}
if options.Size {
size, err := mbox.getSize()
if err != nil {
return nil, err
}
data.Size = &size
}
return &data, nil
}
func (mbox *Mailbox) countByFlag(flag imap.Flag) (n uint32, err error) {
flags, err := Database.DB.Query("SELECT flag FROM flags WHERE id = $1", mbox.id[:])
if err != nil {
return 0, err
}
defer func() {
err = flags.Close()
if err != nil {
log("Failed to close rows: "+err.Error()+", resource leaks may occur", 1)
}
}()
for flags.Next() {
var name string
err = flags.Scan(&name)
if err != nil {
return 0, err
}
if name == string(canonicalFlag(flag)) {
n++
}
}
return n, nil
}
func (mbox *Mailbox) getSize() (int64, error) {
bodySizes, err := Database.DB.Query("SELECT bodySize FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return 0, err
}
defer func() {
err = bodySizes.Close()
if err != nil {
log("Failed to close rows: "+err.Error()+", resource leaks may occur", 1)
}
}()
var size int64
for bodySizes.Next() {
var bodySize int64
err = bodySizes.Scan(&bodySize)
if err != nil {
return 0, err
}
size += bodySize
}
return size, nil
}
func (mbox *MailboxView) appendLiteral(r imap.LiteralReader, options *imap.AppendOptions) (*imap.AppendData, error) {
return mbox.appendBuffer(r, r.Size(), options)
}
func (mbox *MailboxView) copyMsg(msg *message) (*imap.AppendData, error) {
return mbox.appendBuffer(msg.buf, msg.len, &imap.AppendOptions{
Time: msg.t,
Flags: msg.flagList(),
})
}
func (mbox *MailboxView) appendBuffer(buf io.Reader, length int64, options *imap.AppendOptions) (*imap.AppendData, error) {
msg := &message{
flags: make(map[imap.Flag]struct{}),
len: length,
id: uuid.New(),
}
mbox.openMessages = append(mbox.openMessages, msg)
if options.Time.IsZero() {
msg.t = time.Now()
} else {
msg.t = options.Time
}
for _, flag := range options.Flags {
msg.flags[canonicalFlag(flag)] = struct{}{}
}
var err error
msg.uid, err = mbox.uidNext()
if err != nil {
return nil, err
}
err = StoreFile(msg.id.String(), &io.LimitedReader{
R: buf,
N: length,
}, mbox.user.sub)
if err != nil {
return nil, err
}
file, err := GetFile(msg.id.String(), mbox.user.sub)
if err != nil {
return nil, err
}
lines, err := cl.CountLines(file)
if err != nil {
return nil, err
}
msg.lines = int64(lines)
_, err = file.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
msg.buf = file
_, err = Database.DB.Exec("INSERT INTO messages (id, owner, uid, created, bodySize, mailbox) VALUES ($1, $2, $3, $4, $5, $6)", msg.id[:], mbox.user.sub[:], msg.uid, msg.t.Unix(), length, mbox.id[:])
if err != nil {
return nil, err
}
for flag := range msg.flags {
_, err = Database.DB.Exec("INSERT INTO flags (id, flag) VALUES ($1, $2)", msg.id[:], string(canonicalFlag(flag)))
if err != nil {
return nil, err
}
}
count, err := mbox.getCount()
if err != nil {
return nil, err
}
mbox.Mailbox.tracker.QueueNumMessages(count)
return &imap.AppendData{
UIDValidity: mbox.Mailbox.UIDValidity,
UID: msg.uid,
}, nil
}
func (mbox *Mailbox) rename(newName string) error {
_, err := Database.DB.Exec("UPDATE mailboxes SET mailbox = $1 WHERE id = $2", newName, mbox.id[:])
if err != nil {
return err
}
mbox.name = newName
return nil
}
// SetSubscribed changes the subscription state of this mailbox.
func (mbox *Mailbox) SetSubscribed(subscribed bool) error {
_, err := Database.DB.Exec("UPDATE mailboxes SET subscribed = $1 WHERE id = $2", subscribed, mbox.id[:])
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoSuchMailbox
} else {
return err
}
}
mbox.Subscribed = subscribed
return nil
}
func (mbox *Mailbox) selectData() (*imap.SelectData, error) {
flags := mbox.getFlags()
permanentFlags := make([]imap.Flag, len(flags))
copy(permanentFlags, flags)
permanentFlags = append(permanentFlags, imap.FlagWildcard)
messageNum, err := mbox.getCount()
if err != nil {
return nil, err
}
uidNext, err := mbox.uidNext()
if err != nil {
return nil, err
}
return &imap.SelectData{
Flags: flags,
PermanentFlags: permanentFlags,
NumMessages: messageNum,
UIDNext: uidNext,
UIDValidity: mbox.UIDValidity,
}, nil
}
func (mbox *Mailbox) getFlags() []imap.Flag {
flags, err := Database.DB.Query("SELECT flag FROM flags WHERE id = $1", mbox.id[:])
if err != nil {
return nil
}
var l []imap.Flag
for flags.Next() {
var flag string
err = flags.Scan(&flag)
if err != nil {
return nil
}
l = append(l, imap.Flag(flag))
}
sort.Slice(l, func(i, j int) bool {
return l[i] < l[j]
})
return l
}
func (mbox *MailboxView) ExpungeAll(w *imapserver.ExpungeWriter) error {
messages, err := Database.DB.Query("SELECT uid, id FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return err
}
for messages.Next() {
var idBytes []byte
var uid uint32
err = messages.Scan(&uid, &idBytes)
if err != nil {
return err
}
id, err := uuid.FromBytes(idBytes)
if err != nil {
return err
}
err = DeleteFile(id.String(), mbox.user.sub)
if err != nil {
return err
}
_, err = Database.DB.Exec("DELETE FROM flags WHERE id = $1", idBytes)
if err != nil {
return err
}
if w != nil {
err = w.WriteExpunge(mbox.messageSequenceNum[imap.UID(uid)])
if err != nil {
return err
}
}
}
_, err = Database.DB.Exec("DELETE FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return err
}
return nil
}
func (mbox *MailboxView) Expunge(w *imapserver.ExpungeWriter, uids *imap.UIDSet) error {
if uids == nil {
return mbox.ExpungeAll(w)
}
uidSlice, ok := uids.Nums()
if !ok {
return errors.New("incomplete UID set")
}
for uid := range uidSlice {
var idBytes []byte
err := Database.DB.QueryRow("SELECT id FROM messages WHERE uid = $1", uint32(uid)).Scan(&idBytes)
if err != nil {
return err
}
id, err := uuid.FromBytes(idBytes)
if err != nil {
return err
}
err = DeleteFile(id.String(), mbox.user.sub)
if err != nil {
return err
}
_, err = Database.DB.Exec("DELETE FROM flags WHERE id = $1", idBytes)
if err != nil {
return err
}
_, err = Database.DB.Exec("DELETE FROM messages WHERE id = $1", idBytes)
if err != nil {
return err
}
if w != nil {
err = w.WriteExpunge(mbox.messageSequenceNum[imap.UID(uid)])
if err != nil {
return err
}
}
}
return nil
}
// NewView creates a new view into this mailbox.
//
// Callers must call MailboxView.Close once they are done with the mailbox view.
func (mbox *Mailbox) NewView() *MailboxView {
view := &MailboxView{
Mailbox: mbox,
tracker: mbox.tracker.NewSession(),
messageSequenceNum: make(map[imap.UID]uint32),
}
// Index the sequence numbers of messages by UID
messages, err := Database.DB.Query("SELECT uid FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return nil
}
var i uint32 = 1
for messages.Next() {
var uid uint32
err := messages.Scan(&uid)
if err != nil {
return nil
}
view.messageSequenceNum[imap.UID(uid)] = i
i++
}
mbox.openSessions = append(mbox.openSessions, view)
return view
}
// A MailboxView is a view into a mailbox.
//
// Each view has its own queue of pending unilateral updates.
//
// Once the mailbox view is no longer used, Close must be called.
//
// Typically, a new MailboxView is created for each IMAP connection in the
// selected state.
type MailboxView struct {
*Mailbox
messageSequenceNum map[imap.UID]uint32
openMessages []*message
tracker *imapserver.SessionTracker
searchRes imap.UIDSet
}
// Close releases the resources allocated for the mailbox view.
func (mbox *MailboxView) Close() error {
defer func() {
// Ignore panics, it's probably because the mailbox is already closed
recover()
}()
if mbox.tracker != nil {
mbox.tracker.Close()
}
for _, msg := range mbox.openMessages {
err := msg.buf.Close()
if err != nil {
if !errors.Is(err, os.ErrClosed) && !errors.Is(err, syscall.EINVAL) {
return err
}
}
}
return nil
}
func (mbox *MailboxView) Fetch(w *imapserver.FetchWriter, numSet imap.NumSet, options *imap.FetchOptions) error {
markSeen := false
for _, bs := range options.BodySection {
if !bs.Peek {
markSeen = true
break
}
}
fetchID := uuid.New()
var err error
err = mbox.forEach(numSet, func(seqNum uint32, msg *message) {
if err != nil {
return
}
if markSeen {
msg.flags[canonicalFlag(imap.FlagSeen)] = struct{}{}
mbox.Mailbox.tracker.QueueMessageFlags(seqNum, msg.uid, msg.flagList(), nil)
}
respWriter := w.CreateMessage(mbox.tracker.EncodeSeqNum(seqNum))
err = msg.fetch(respWriter, options)
})
println("FINISHED FETCHING FOR FETCH ID " + fetchID.String())
return err
}
func (mbox *MailboxView) Search(numKind imapserver.NumKind, criteria *imap.SearchCriteria, options *imap.SearchOptions) (*imap.SearchData, error) {
mbox.staticSearchCriteria(criteria)
data := imap.SearchData{UID: numKind == imapserver.NumKindUID}
var (
seqSet imap.SeqSet
uidSet imap.UIDSet
i int
)
messages, err := Database.DB.Query("SELECT uid, id, bodySize, created FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return nil, err
}
for messages.Next() {
seqNum := mbox.tracker.EncodeSeqNum(uint32(i) + 1)
i++
msg := &message{}
var idBytes []byte
var timeRaw int64
err := messages.Scan(&msg.uid, &idBytes, &msg.len, &timeRaw)
if err != nil {
return nil, err
}
msg.t = time.Unix(timeRaw, 0)
msg.id, err = uuid.FromBytes(idBytes)
if err != nil {
return nil, err
}
file, err := GetFile(msg.id.String(), mbox.user.sub)
if err != nil {
return nil, err
}
lines, err := cl.CountLines(file)
if err != nil {
return nil, err
}
msg.lines = int64(lines)
_, err = file.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
msg.buf = file
if !msg.search(seqNum, criteria) {
continue
}
// Always populate the UID set, since it may be saved later for SEARCHRES
uidSet.AddNum(msg.uid)
var num uint32
switch numKind {
case imapserver.NumKindSeq:
if seqNum == 0 {
continue
}
seqSet.AddNum(seqNum)
num = seqNum
case imapserver.NumKindUID:
num = uint32(msg.uid)
}
if data.Min == 0 || num < data.Min {
data.Min = num
}
if data.Max == 0 || num > data.Max {
data.Max = num
}
data.Count++
}
switch numKind {
case imapserver.NumKindSeq:
data.All = seqSet
case imapserver.NumKindUID:
data.All = uidSet
}
if options.ReturnSave {
mbox.searchRes = uidSet
}
return &data, nil
}
func (mbox *MailboxView) staticSearchCriteria(criteria *imap.SearchCriteria) {
seqNums := make([]imap.SeqSet, 0, len(criteria.SeqNum))
for _, seqSet := range criteria.SeqNum {
numSet := mbox.staticNumSet(seqSet)
switch numSet := numSet.(type) {
case imap.SeqSet:
seqNums = append(seqNums, numSet)
case imap.UIDSet: // can happen with SEARCHRES
criteria.UID = append(criteria.UID, numSet)
}
}
criteria.SeqNum = seqNums
for i, uidSet := range criteria.UID {
criteria.UID[i] = mbox.staticNumSet(uidSet).(imap.UIDSet)
}
for i := range criteria.Not {
mbox.staticSearchCriteria(&criteria.Not[i])
}
for i := range criteria.Or {
for j := range criteria.Or[i] {
mbox.staticSearchCriteria(&criteria.Or[i][j])
}
}
}
func (mbox *MailboxView) Store(w *imapserver.FetchWriter, numSet imap.NumSet, flags *imap.StoreFlags, options *imap.StoreOptions) error {
err := mbox.forEach(numSet, func(seqNum uint32, msg *message) {
msg.store(flags)
mbox.Mailbox.tracker.QueueMessageFlags(seqNum, msg.uid, msg.flagList(), mbox.tracker)
})
if err != nil {
return err
}
if !flags.Silent {
return mbox.Fetch(w, numSet, &imap.FetchOptions{Flags: true})
}
return nil
}
func (mbox *MailboxView) Poll(w *imapserver.UpdateWriter, allowExpunge bool) error {
return mbox.tracker.Poll(w, allowExpunge)
}
func (mbox *MailboxView) Idle(w *imapserver.UpdateWriter, stop <-chan struct{}) error {
return mbox.tracker.Idle(w, stop)
}
func (mbox *MailboxView) forEach(numSet imap.NumSet, f func(seqNum uint32, msg *message)) error {
// TODO: optimize
numSet = mbox.staticNumSet(numSet)
messages, err := Database.DB.Query("SELECT uid, id, bodySize, created FROM messages WHERE mailbox = $1", mbox.id[:])
if err != nil {
return err
}
for messages.Next() {
msg := &message{}
var idBytes []byte
var timeRaw int64
err := messages.Scan(&msg.uid, &idBytes, &msg.len, &timeRaw)
if err != nil {
return err
}
msg.t = time.Unix(timeRaw, 0)
seqNum := mbox.messageSequenceNum[msg.uid]
msg.id, err = uuid.FromBytes(idBytes)
if err != nil {
return err
}
file, err := GetFile(msg.id.String(), mbox.user.sub)
if err != nil {
return err
}
lines, err := cl.CountLines(file)
if err != nil {
return err
}
msg.lines = int64(lines)
_, err = file.Seek(0, io.SeekStart)
if err != nil {
return err
}
msg.buf = file
mbox.openMessages = append(mbox.openMessages, msg)
var contains bool
switch numSet := numSet.(type) {
case imap.SeqSet:
seqNum := mbox.tracker.EncodeSeqNum(seqNum)
contains = seqNum != 0 && numSet.Contains(seqNum)
case imap.UIDSet:
contains = numSet.Contains(msg.uid)
}
if !contains {
continue
}
f(seqNum, msg)
}
return nil
}
// staticNumSet converts a dynamic sequence set into a static one.
//
// This is necessary to properly handle the special symbol "*", which
// represents the maximum sequence number or UID in the mailbox.
//
// This function also handles the special SEARCHRES marker "$".
func (mbox *MailboxView) staticNumSet(numSet imap.NumSet) imap.NumSet {
if imap.IsSearchRes(numSet) {
return mbox.searchRes
}
switch numSet := numSet.(type) {
case imap.SeqSet:
maxCount, err := mbox.getCount()
if err != nil {
return nil
}
for i := range numSet {
r := &numSet[i]
staticNumRange(&r.Start, &r.Stop, maxCount)
}
case imap.UIDSet:
uidNext, err := mbox.uidNext()
if err != nil {
return nil
}
maxCount := uidNext - 1
for i := range numSet {
r := &numSet[i]
staticNumRange((*uint32)(&r.Start), (*uint32)(&r.Stop), uint32(maxCount))
}
}
return numSet
}
func staticNumRange(start, stop *uint32, max uint32) {
dyn := false
if *start == 0 {
*start = max
dyn = true
}
if *stop == 0 {
*stop = max
dyn = true
}
if dyn && *start > *stop {
*start, *stop = *stop, *start
}
}