package main import ( "database/sql" "errors" "github.com/KEINOS/go-countline/cl" "github.com/google/uuid" "io" "os" "sort" "syscall" "time" "github.com/OneOfOne/xxhash" "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapserver" ) // Mailbox is an in-memory mailbox. // // The same mailbox can be shared between multiple connections and multiple // users. type Mailbox struct { UIDValidity uint32 openSessions []*MailboxView tracker *imapserver.MailboxTracker Subscribed bool id uuid.UUID name string user *User } var ErrNoSuchMailbox = &imap.Error{ Type: imap.StatusResponseTypeNo, Code: imap.ResponseCodeTryCreate, Text: "No such mailbox", } func loadMbox(name string, u *User) (*Mailbox, error) { var idBytes []byte var subscribed bool err := Database.DB.QueryRow("SELECT id, subscribed FROM mailboxes WHERE mailbox = $1 AND owner = $2", name, u.sub[:]).Scan(&idBytes, &subscribed) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, ErrNoSuchMailbox } else { return nil, err } } return loadMboxRaw(subscribed, idBytes, name, u) } func loadMboxRaw(subscribed bool, idRaw []byte, name string, u *User) (*Mailbox, error) { id, err := uuid.FromBytes(idRaw) if err != nil { return nil, err } mbox := &Mailbox{ Subscribed: subscribed, id: id, name: name, user: u, UIDValidity: xxhash.Checksum32(id[:]), } count, err := mbox.getCount() if err != nil { return nil, err } mbox.tracker = imapserver.NewMailboxTracker(count) return mbox, nil } func (mbox *Mailbox) Delete() error { view := mbox.NewView() defer func() { err := view.Close() if err != nil { log("Failed to close view: "+err.Error()+", resource leaks may occur", 1) } }() err := view.ExpungeAll(nil) if err != nil { return err } _, err = Database.DB.Exec("DELETE FROM mailboxes WHERE id = $1", mbox.id[:]) if err != nil { return err } for _, view := range mbox.openSessions { err = view.Close() if err != nil { return err } } return nil } func CreateMailbox(newUUID uuid.UUID, name string, u *User) error { _, err := Database.DB.Exec("INSERT INTO mailboxes (mailbox, id, owner) VALUES ($1, $2, $3)", name, newUUID[:], u.sub[:]) if err != nil { return err } return nil } func (mbox *Mailbox) list(options *imap.ListOptions) (*imap.ListData, error) { if options.SelectSubscribed && !mbox.Subscribed { return nil, nil } data := imap.ListData{ Mailbox: mbox.name, Delim: mailboxDelim, } if mbox.Subscribed { data.Attrs = append(data.Attrs, imap.MailboxAttrSubscribed) } if options.ReturnStatus != nil { var err error data.Status, err = mbox.statusData(options.ReturnStatus) if err != nil { return nil, err } } return &data, nil } func (mbox *Mailbox) uidNext() (imap.UID, error) { messages, err := Database.DB.Query("SELECT uid FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return 0, err } defer func() { err = messages.Close() if err != nil { log("Failed to close rows: "+err.Error()+", resource leaks may occur", 1) } }() var uid uint32 //goland:noinspection GoDfaErrorMayBeNotNil for messages.Next() { var messageID uint32 err = messages.Scan(&messageID) if err != nil { return 0, err } if messageID > uid { uid = messageID } } uid++ return imap.UID(uid), nil } // StatusData returns data for the STATUS command. func (mbox *Mailbox) StatusData(options *imap.StatusOptions) *imap.StatusData { data, err := mbox.statusData(options) if err != nil { panic(err) } return data } func (mbox *Mailbox) getCount() (n uint32, err error) { err = Database.DB.QueryRow("SELECT COUNT(*) FROM messages WHERE mailbox = $1", mbox.id[:]).Scan(&n) return } func (mbox *Mailbox) statusData(options *imap.StatusOptions) (*imap.StatusData, error) { data := imap.StatusData{Mailbox: mbox.name} var n uint32 if options.NumMessages || options.NumUnseen || options.NumDeleted { var err error n, err = mbox.getCount() if err != nil { return nil, err } } if options.NumMessages { data.NumMessages = &n } if options.UIDNext { var err error data.UIDNext, err = mbox.uidNext() if err != nil { return nil, err } } if options.UIDValidity { data.UIDValidity = mbox.UIDValidity } if options.NumUnseen { ns, err := mbox.countByFlag(imap.FlagSeen) if err != nil { return nil, err } s := n - ns data.NumUnseen = &s } if options.NumDeleted { nd, err := mbox.countByFlag(imap.FlagDeleted) if err != nil { return nil, err } data.NumDeleted = &nd } if options.Size { size, err := mbox.getSize() if err != nil { return nil, err } data.Size = &size } return &data, nil } func (mbox *Mailbox) countByFlag(flag imap.Flag) (n uint32, err error) { flags, err := Database.DB.Query("SELECT flag FROM flags WHERE id = $1", mbox.id[:]) if err != nil { return 0, err } defer func() { err = flags.Close() if err != nil { log("Failed to close rows: "+err.Error()+", resource leaks may occur", 1) } }() for flags.Next() { var name string err = flags.Scan(&name) if err != nil { return 0, err } if name == string(canonicalFlag(flag)) { n++ } } return n, nil } func (mbox *Mailbox) getSize() (int64, error) { bodySizes, err := Database.DB.Query("SELECT bodySize FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return 0, err } defer func() { err = bodySizes.Close() if err != nil { log("Failed to close rows: "+err.Error()+", resource leaks may occur", 1) } }() var size int64 for bodySizes.Next() { var bodySize int64 err = bodySizes.Scan(&bodySize) if err != nil { return 0, err } size += bodySize } return size, nil } func (mbox *MailboxView) appendLiteral(r imap.LiteralReader, options *imap.AppendOptions) (*imap.AppendData, error) { return mbox.appendBuffer(r, r.Size(), options) } func (mbox *MailboxView) copyMsg(msg *message) (*imap.AppendData, error) { return mbox.appendBuffer(msg.buf, msg.len, &imap.AppendOptions{ Time: msg.t, Flags: msg.flagList(), }) } func (mbox *MailboxView) appendBuffer(buf io.Reader, length int64, options *imap.AppendOptions) (*imap.AppendData, error) { msg := &message{ flags: make(map[imap.Flag]struct{}), len: length, id: uuid.New(), } mbox.openMessages = append(mbox.openMessages, msg) if options.Time.IsZero() { msg.t = time.Now() } else { msg.t = options.Time } for _, flag := range options.Flags { msg.flags[canonicalFlag(flag)] = struct{}{} } var err error msg.uid, err = mbox.uidNext() if err != nil { return nil, err } err = StoreFile(msg.id.String(), &io.LimitedReader{ R: buf, N: length, }, mbox.user.sub) if err != nil { return nil, err } file, err := GetFile(msg.id.String(), mbox.user.sub) if err != nil { return nil, err } lines, err := cl.CountLines(file) if err != nil { return nil, err } msg.lines = int64(lines) _, err = file.Seek(0, io.SeekStart) if err != nil { return nil, err } msg.buf = file _, err = Database.DB.Exec("INSERT INTO messages (id, owner, uid, created, bodySize, mailbox) VALUES ($1, $2, $3, $4, $5, $6)", msg.id[:], mbox.user.sub[:], msg.uid, msg.t.Unix(), length, mbox.id[:]) if err != nil { return nil, err } for flag := range msg.flags { _, err = Database.DB.Exec("INSERT INTO flags (id, flag) VALUES ($1, $2)", msg.id[:], string(canonicalFlag(flag))) if err != nil { return nil, err } } count, err := mbox.getCount() if err != nil { return nil, err } mbox.Mailbox.tracker.QueueNumMessages(count) return &imap.AppendData{ UIDValidity: mbox.Mailbox.UIDValidity, UID: msg.uid, }, nil } func (mbox *Mailbox) rename(newName string) error { _, err := Database.DB.Exec("UPDATE mailboxes SET mailbox = $1 WHERE id = $2", newName, mbox.id[:]) if err != nil { return err } mbox.name = newName return nil } // SetSubscribed changes the subscription state of this mailbox. func (mbox *Mailbox) SetSubscribed(subscribed bool) error { _, err := Database.DB.Exec("UPDATE mailboxes SET subscribed = $1 WHERE id = $2", subscribed, mbox.id[:]) if err != nil { if errors.Is(err, sql.ErrNoRows) { return ErrNoSuchMailbox } else { return err } } mbox.Subscribed = subscribed return nil } func (mbox *Mailbox) selectData() (*imap.SelectData, error) { flags := mbox.getFlags() permanentFlags := make([]imap.Flag, len(flags)) copy(permanentFlags, flags) permanentFlags = append(permanentFlags, imap.FlagWildcard) messageNum, err := mbox.getCount() if err != nil { return nil, err } uidNext, err := mbox.uidNext() if err != nil { return nil, err } return &imap.SelectData{ Flags: flags, PermanentFlags: permanentFlags, NumMessages: messageNum, UIDNext: uidNext, UIDValidity: mbox.UIDValidity, }, nil } func (mbox *Mailbox) getFlags() []imap.Flag { flags, err := Database.DB.Query("SELECT flag FROM flags WHERE id = $1", mbox.id[:]) if err != nil { return nil } var l []imap.Flag for flags.Next() { var flag string err = flags.Scan(&flag) if err != nil { return nil } l = append(l, imap.Flag(flag)) } sort.Slice(l, func(i, j int) bool { return l[i] < l[j] }) return l } func (mbox *MailboxView) ExpungeAll(w *imapserver.ExpungeWriter) error { messages, err := Database.DB.Query("SELECT uid, id FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return err } for messages.Next() { var idBytes []byte var uid uint32 err = messages.Scan(&uid, &idBytes) if err != nil { return err } id, err := uuid.FromBytes(idBytes) if err != nil { return err } err = DeleteFile(id.String(), mbox.user.sub) if err != nil { return err } _, err = Database.DB.Exec("DELETE FROM flags WHERE id = $1", idBytes) if err != nil { return err } if w != nil { err = w.WriteExpunge(mbox.messageSequenceNum[imap.UID(uid)]) if err != nil { return err } } } _, err = Database.DB.Exec("DELETE FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return err } return nil } func (mbox *MailboxView) Expunge(w *imapserver.ExpungeWriter, uids *imap.UIDSet) error { if uids == nil { return mbox.ExpungeAll(w) } uidSlice, ok := uids.Nums() if !ok { return errors.New("incomplete UID set") } for uid := range uidSlice { var idBytes []byte err := Database.DB.QueryRow("SELECT id FROM messages WHERE uid = $1", uint32(uid)).Scan(&idBytes) if err != nil { return err } id, err := uuid.FromBytes(idBytes) if err != nil { return err } err = DeleteFile(id.String(), mbox.user.sub) if err != nil { return err } _, err = Database.DB.Exec("DELETE FROM flags WHERE id = $1", idBytes) if err != nil { return err } _, err = Database.DB.Exec("DELETE FROM messages WHERE id = $1", idBytes) if err != nil { return err } if w != nil { err = w.WriteExpunge(mbox.messageSequenceNum[imap.UID(uid)]) if err != nil { return err } } } return nil } // NewView creates a new view into this mailbox. // // Callers must call MailboxView.Close once they are done with the mailbox view. func (mbox *Mailbox) NewView() *MailboxView { view := &MailboxView{ Mailbox: mbox, tracker: mbox.tracker.NewSession(), messageSequenceNum: make(map[imap.UID]uint32), } // Index the sequence numbers of messages by UID messages, err := Database.DB.Query("SELECT uid FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return nil } var i uint32 = 1 for messages.Next() { var uid uint32 err := messages.Scan(&uid) if err != nil { return nil } view.messageSequenceNum[imap.UID(uid)] = i i++ } mbox.openSessions = append(mbox.openSessions, view) return view } // A MailboxView is a view into a mailbox. // // Each view has its own queue of pending unilateral updates. // // Once the mailbox view is no longer used, Close must be called. // // Typically, a new MailboxView is created for each IMAP connection in the // selected state. type MailboxView struct { *Mailbox messageSequenceNum map[imap.UID]uint32 openMessages []*message tracker *imapserver.SessionTracker searchRes imap.UIDSet } // Close releases the resources allocated for the mailbox view. func (mbox *MailboxView) Close() error { defer func() { // Ignore panics, it's probably because the mailbox is already closed recover() }() if mbox.tracker != nil { mbox.tracker.Close() } for _, msg := range mbox.openMessages { err := msg.buf.Close() if err != nil { if !errors.Is(err, os.ErrClosed) && !errors.Is(err, syscall.EINVAL) { return err } } } return nil } func (mbox *MailboxView) Fetch(w *imapserver.FetchWriter, numSet imap.NumSet, options *imap.FetchOptions) error { markSeen := false for _, bs := range options.BodySection { if !bs.Peek { markSeen = true break } } fetchID := uuid.New() var err error err = mbox.forEach(numSet, func(seqNum uint32, msg *message) { if err != nil { return } if markSeen { msg.flags[canonicalFlag(imap.FlagSeen)] = struct{}{} mbox.Mailbox.tracker.QueueMessageFlags(seqNum, msg.uid, msg.flagList(), nil) } respWriter := w.CreateMessage(mbox.tracker.EncodeSeqNum(seqNum)) err = msg.fetch(respWriter, options) }) println("FINISHED FETCHING FOR FETCH ID " + fetchID.String()) return err } func (mbox *MailboxView) Search(numKind imapserver.NumKind, criteria *imap.SearchCriteria, options *imap.SearchOptions) (*imap.SearchData, error) { mbox.staticSearchCriteria(criteria) data := imap.SearchData{UID: numKind == imapserver.NumKindUID} var ( seqSet imap.SeqSet uidSet imap.UIDSet i int ) messages, err := Database.DB.Query("SELECT uid, id, bodySize, created FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return nil, err } for messages.Next() { seqNum := mbox.tracker.EncodeSeqNum(uint32(i) + 1) i++ msg := &message{} var idBytes []byte var timeRaw int64 err := messages.Scan(&msg.uid, &idBytes, &msg.len, &timeRaw) if err != nil { return nil, err } msg.t = time.Unix(timeRaw, 0) msg.id, err = uuid.FromBytes(idBytes) if err != nil { return nil, err } file, err := GetFile(msg.id.String(), mbox.user.sub) if err != nil { return nil, err } lines, err := cl.CountLines(file) if err != nil { return nil, err } msg.lines = int64(lines) _, err = file.Seek(0, io.SeekStart) if err != nil { return nil, err } msg.buf = file if !msg.search(seqNum, criteria) { continue } // Always populate the UID set, since it may be saved later for SEARCHRES uidSet.AddNum(msg.uid) var num uint32 switch numKind { case imapserver.NumKindSeq: if seqNum == 0 { continue } seqSet.AddNum(seqNum) num = seqNum case imapserver.NumKindUID: num = uint32(msg.uid) } if data.Min == 0 || num < data.Min { data.Min = num } if data.Max == 0 || num > data.Max { data.Max = num } data.Count++ } switch numKind { case imapserver.NumKindSeq: data.All = seqSet case imapserver.NumKindUID: data.All = uidSet } if options.ReturnSave { mbox.searchRes = uidSet } return &data, nil } func (mbox *MailboxView) staticSearchCriteria(criteria *imap.SearchCriteria) { seqNums := make([]imap.SeqSet, 0, len(criteria.SeqNum)) for _, seqSet := range criteria.SeqNum { numSet := mbox.staticNumSet(seqSet) switch numSet := numSet.(type) { case imap.SeqSet: seqNums = append(seqNums, numSet) case imap.UIDSet: // can happen with SEARCHRES criteria.UID = append(criteria.UID, numSet) } } criteria.SeqNum = seqNums for i, uidSet := range criteria.UID { criteria.UID[i] = mbox.staticNumSet(uidSet).(imap.UIDSet) } for i := range criteria.Not { mbox.staticSearchCriteria(&criteria.Not[i]) } for i := range criteria.Or { for j := range criteria.Or[i] { mbox.staticSearchCriteria(&criteria.Or[i][j]) } } } func (mbox *MailboxView) Store(w *imapserver.FetchWriter, numSet imap.NumSet, flags *imap.StoreFlags, options *imap.StoreOptions) error { err := mbox.forEach(numSet, func(seqNum uint32, msg *message) { msg.store(flags) mbox.Mailbox.tracker.QueueMessageFlags(seqNum, msg.uid, msg.flagList(), mbox.tracker) }) if err != nil { return err } if !flags.Silent { return mbox.Fetch(w, numSet, &imap.FetchOptions{Flags: true}) } return nil } func (mbox *MailboxView) Poll(w *imapserver.UpdateWriter, allowExpunge bool) error { return mbox.tracker.Poll(w, allowExpunge) } func (mbox *MailboxView) Idle(w *imapserver.UpdateWriter, stop <-chan struct{}) error { return mbox.tracker.Idle(w, stop) } func (mbox *MailboxView) forEach(numSet imap.NumSet, f func(seqNum uint32, msg *message)) error { // TODO: optimize numSet = mbox.staticNumSet(numSet) messages, err := Database.DB.Query("SELECT uid, id, bodySize, created FROM messages WHERE mailbox = $1", mbox.id[:]) if err != nil { return err } for messages.Next() { msg := &message{} var idBytes []byte var timeRaw int64 err := messages.Scan(&msg.uid, &idBytes, &msg.len, &timeRaw) if err != nil { return err } msg.t = time.Unix(timeRaw, 0) seqNum := mbox.messageSequenceNum[msg.uid] msg.id, err = uuid.FromBytes(idBytes) if err != nil { return err } file, err := GetFile(msg.id.String(), mbox.user.sub) if err != nil { return err } lines, err := cl.CountLines(file) if err != nil { return err } msg.lines = int64(lines) _, err = file.Seek(0, io.SeekStart) if err != nil { return err } msg.buf = file mbox.openMessages = append(mbox.openMessages, msg) var contains bool switch numSet := numSet.(type) { case imap.SeqSet: seqNum := mbox.tracker.EncodeSeqNum(seqNum) contains = seqNum != 0 && numSet.Contains(seqNum) case imap.UIDSet: contains = numSet.Contains(msg.uid) } if !contains { continue } f(seqNum, msg) } return nil } // staticNumSet converts a dynamic sequence set into a static one. // // This is necessary to properly handle the special symbol "*", which // represents the maximum sequence number or UID in the mailbox. // // This function also handles the special SEARCHRES marker "$". func (mbox *MailboxView) staticNumSet(numSet imap.NumSet) imap.NumSet { if imap.IsSearchRes(numSet) { return mbox.searchRes } switch numSet := numSet.(type) { case imap.SeqSet: maxCount, err := mbox.getCount() if err != nil { return nil } for i := range numSet { r := &numSet[i] staticNumRange(&r.Start, &r.Stop, maxCount) } case imap.UIDSet: uidNext, err := mbox.uidNext() if err != nil { return nil } maxCount := uidNext - 1 for i := range numSet { r := &numSet[i] staticNumRange((*uint32)(&r.Start), (*uint32)(&r.Stop), uint32(maxCount)) } } return numSet } func staticNumRange(start, stop *uint32, max uint32) { dyn := false if *start == 0 { *start = max dyn = true } if *stop == 0 { *stop = max dyn = true } if dyn && *start > *stop { *start, *stop = *stop, *start } }