re-organize folder structure

This commit is contained in:
lone-cloud 2026-02-02 02:24:50 -08:00
parent a8d8325971
commit 051a13cb7a
24 changed files with 290 additions and 282 deletions

View file

@ -14,16 +14,6 @@
Prism is a self-hosted notification gateway that receives HTTP requests and routes them through Signal groups or custom webhooks. Route notifications through Signal to avoid exposing unique network fingerprints, or forward them to your own webhook endpoints for custom handling. Prism is a self-hosted notification gateway that receives HTTP requests and routes them through Signal groups or custom webhooks. Route notifications through Signal to avoid exposing unique network fingerprints, or forward them to your own webhook endpoints for custom handling.
## How?
Prism accepts notifications via HTTP POST requests and routes them based on your configured delivery method:
- **Signal groups**: Uses [signal-cli](https://github.com/AsamK/signal-cli) to create a Signal group for each app and send notifications as messages
- **Webhook forwarding**: Forwards notifications to your own webhook URL (useful for UnifiedPush distributors, ntfy, or custom handlers)
Each endpoint can be independently configured to use either delivery method through the admin UI.
For the optional Proton Mail integration, Prism requires a server that runs Proton's official [proton-bridge](https://github.com/ProtonMail/proton-bridge). Prism's docker compose process will run an image from [protonmail-bridge-docker](https://github.com/shenxn/protonmail-bridge-docker). Once authenticated, the communication between Prism and proton-bridge will be over IMAP.
## Setup ## Setup
@ -188,9 +178,11 @@ For API-based monitoring, call `/api/health` which returns JSON:
## Architecture ## Architecture
Prism consists of two services that **MUST run together on the same machine**: Prism accepts notifications via HTTP POST requests and routes them based on your configured delivery method:
- **prism** (Go): Receives webhooks, sends Signal messages via signal-cli. Optional: monitors Proton Mail IMAP - **Signal groups**: Uses [signal-cli](https://github.com/AsamK/signal-cli) to create a Signal group for each app and send notifications as messages
- **protonmail-bridge** (Official Proton, optional): Decrypts Proton Mail emails, runs local IMAP server - **Webhook forwarding**: Forwards notifications to your own webhook URL (useful for UnifiedPush distributors, ntfy or custom handlers)
All services communicate over a private Docker network with no external exposure except Signal protocol. **Separating these services across multiple machines would expose plaintext IMAP traffic and compromise security.** Each endpoint can be independently configured to use either delivery method through the admin UI.
For the optional Proton Mail integration, Prism requires a server that runs Proton's official [proton-bridge](https://github.com/ProtonMail/proton-bridge). Prism's docker compose process will run an image from [protonmail-bridge-docker](https://github.com/shenxn/protonmail-bridge-docker). Once authenticated, the communication between Prism and proton-bridge will be over IMAP.

View file

@ -1,250 +0,0 @@
package proton
import (
"context"
"fmt"
"log/slog"
"time"
"prism/internal/config"
"prism/internal/notification"
"github.com/emersion/go-imap"
"github.com/emersion/go-imap/client"
)
type Monitor struct {
cfg *config.Config
dispatcher *notification.Dispatcher
logger *slog.Logger
client *client.Client
monitorStartTime time.Time
}
func NewMonitor(cfg *config.Config, dispatcher *notification.Dispatcher, logger *slog.Logger) *Monitor {
return &Monitor{
cfg: cfg,
dispatcher: dispatcher,
logger: logger,
}
}
func (m *Monitor) Start(ctx context.Context) error {
if !m.cfg.IsProtonEnabled() {
m.logger.Info("Proton Mail monitoring disabled")
return nil
}
m.logger.Info("Starting Proton Mail monitor")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := m.connect(); err != nil {
m.logger.Error("Failed to connect to IMAP", "error", err)
time.Sleep(time.Duration(m.cfg.IMAPReconnectBaseDelay) * time.Millisecond)
continue
}
if err := m.monitor(ctx); err != nil {
m.logger.Error("Monitor error", "error", err)
}
if m.client != nil {
if err := m.client.Logout(); err != nil {
m.logger.Error("Logout failed", "error", err)
}
m.client = nil
}
time.Sleep(time.Duration(m.cfg.IMAPReconnectBaseDelay) * time.Millisecond)
}
}
}
func (m *Monitor) connect() error {
addr := fmt.Sprintf("%s:%d", m.cfg.ProtonBridgeHost, m.cfg.ProtonBridgePort)
c, err := client.Dial(addr)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
if err := c.Login(m.cfg.ProtonIMAPUsername, m.cfg.ProtonIMAPPassword); err != nil {
if logoutErr := c.Logout(); logoutErr != nil {
m.logger.Error("Logout failed", "error", logoutErr)
}
return fmt.Errorf("failed to login: %w", err)
}
m.client = c
m.logger.Info("Connected to Proton Bridge")
return nil
}
func (m *Monitor) monitor(ctx context.Context) error {
_, err := m.client.Select(m.cfg.IMAPInbox, false)
if err != nil {
return fmt.Errorf("failed to select inbox: %w", err)
}
if m.monitorStartTime.IsZero() {
m.monitorStartTime = time.Now()
m.logger.Info("Monitor start time set", "time", m.monitorStartTime)
}
updates := make(chan client.Update, 10)
m.client.Updates = updates
stop := make(chan struct{})
defer close(stop)
idleErr := make(chan error, 1)
go func() {
idleErr <- m.client.Idle(stop, nil)
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case update := <-updates:
switch u := update.(type) {
case *client.MailboxUpdate:
if u.Mailbox.UnseenSeqNum > 0 {
if err := m.handleNewMessages(); err != nil {
m.logger.Error("Failed to handle new messages", "error", err)
}
}
}
case <-ticker.C:
close(stop)
<-idleErr
if err := m.client.Noop(); err != nil {
return fmt.Errorf("noop failed: %w", err)
}
stop = make(chan struct{})
go func() {
idleErr <- m.client.Idle(stop, nil)
}()
case err := <-idleErr:
return fmt.Errorf("idle ended: %w", err)
}
}
}
func (m *Monitor) handleNewMessages() error {
criteria := imap.NewSearchCriteria()
criteria.WithoutFlags = []string{m.cfg.IMAPSeenFlag}
uids, err := m.client.UidSearch(criteria)
if err != nil {
return fmt.Errorf("search failed: %w", err)
}
if len(uids) == 0 {
return nil
}
seqSet := new(imap.SeqSet)
seqSet.AddNum(uids...)
messages := make(chan *imap.Message, 10)
done := make(chan error, 1)
go func() {
done <- m.client.UidFetch(seqSet, []imap.FetchItem{imap.FetchEnvelope, imap.FetchUid}, messages)
}()
for msg := range messages {
if err := m.processMessage(msg); err != nil {
m.logger.Error("Failed to process message", "error", err)
}
}
if err := <-done; err != nil {
return fmt.Errorf("fetch failed: %w", err)
}
return nil
}
func (m *Monitor) processMessage(msg *imap.Message) error {
if msg.Envelope == nil {
return nil
}
if msg.Envelope.Date.Before(m.monitorStartTime) {
m.logger.Debug("Skipping old email", "date", msg.Envelope.Date, "subject", msg.Envelope.Subject)
return nil
}
var from string
if len(msg.Envelope.From) > 0 {
addr := msg.Envelope.From[0]
if addr.PersonalName != "" {
from = addr.PersonalName
} else {
from = addr.Address()
}
} else {
from = "Unknown sender"
}
subject := msg.Envelope.Subject
if subject == "" {
subject = "No subject"
}
endpoint := m.cfg.EndpointPrefixProton + m.cfg.ProtonPrismTopic
notif := notification.Notification{
Title: from,
Message: subject,
Actions: []notification.Action{
{
ID: "mark-read",
Endpoint: "/api/proton-mail/mark-read",
Method: "POST",
Data: map[string]interface{}{
"uid": msg.Uid,
},
},
},
}
if err := m.dispatcher.Send(endpoint, notif); err != nil {
m.logger.Error("Failed to send notification", "endpoint", endpoint, "error", err)
return err
}
m.logger.Info("Processed Proton Mail notification", "from", from, "subject", subject)
return nil
}
func (m *Monitor) MarkAsRead(uid uint32) error {
if m.client == nil {
return fmt.Errorf("not connected")
}
seqSet := new(imap.SeqSet)
seqSet.AddNum(uid)
item := imap.FormatFlagsOp(imap.AddFlags, true)
flags := []interface{}{m.cfg.IMAPSeenFlag}
return m.client.UidStore(seqSet, item, flags, nil)
}
func (m *Monitor) IsConnected() bool {
return m.client != nil
}

View file

@ -7,9 +7,9 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"prism/internal/config" "prism/service/config"
"prism/internal/server" "prism/service/server"
"prism/internal/util" "prism/service/util"
"github.com/joho/godotenv" "github.com/joho/godotenv"
) )

View file

@ -8,7 +8,7 @@ import (
"net/http" "net/http"
"strings" "strings"
"prism/internal/signal" "prism/service/signal"
) )
type Dispatcher struct { type Dispatcher struct {

View file

@ -0,0 +1,87 @@
package proton
import (
"context"
"fmt"
"time"
"github.com/emersion/go-imap/client"
)
func (m *Monitor) connect() error {
addr := fmt.Sprintf("%s:%d", m.cfg.ProtonBridgeHost, m.cfg.ProtonBridgePort)
c, err := client.Dial(addr)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
if err := c.Login(m.cfg.ProtonIMAPUsername, m.cfg.ProtonIMAPPassword); err != nil {
if logoutErr := c.Logout(); logoutErr != nil {
m.logger.Error("Logout failed", "error", logoutErr)
}
return fmt.Errorf("failed to login: %w", err)
}
m.client = c
m.logger.Info("Connected to Proton Bridge")
return nil
}
func (m *Monitor) monitor(ctx context.Context) error {
_, err := m.client.Select(m.cfg.IMAPInbox, false)
if err != nil {
return fmt.Errorf("failed to select inbox: %w", err)
}
if m.monitorStartTime.IsZero() {
m.monitorStartTime = time.Now()
m.logger.Info("Monitor start time set", "time", m.monitorStartTime)
}
updates := make(chan client.Update, 10)
m.client.Updates = updates
stop := make(chan struct{})
defer close(stop)
idleErr := make(chan error, 1)
go func() {
idleErr <- m.client.Idle(stop, nil)
}()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case update := <-updates:
switch u := update.(type) {
case *client.MailboxUpdate:
if u.Mailbox.UnseenSeqNum > 0 {
if err := m.handleNewMessages(); err != nil {
m.logger.Error("Failed to handle new messages", "error", err)
}
}
}
case <-ticker.C:
close(stop)
<-idleErr
if err := m.client.Noop(); err != nil {
return fmt.Errorf("noop failed: %w", err)
}
stop = make(chan struct{})
go func() {
idleErr <- m.client.Idle(stop, nil)
}()
case err := <-idleErr:
return fmt.Errorf("idle ended: %w", err)
}
}
}

112
service/proton/email.go Normal file
View file

@ -0,0 +1,112 @@
package proton
import (
"fmt"
"prism/service/notification"
"github.com/emersion/go-imap"
)
func (m *Monitor) handleNewMessages() error {
criteria := imap.NewSearchCriteria()
criteria.WithoutFlags = []string{m.cfg.IMAPSeenFlag}
uids, err := m.client.UidSearch(criteria)
if err != nil {
return fmt.Errorf("search failed: %w", err)
}
if len(uids) == 0 {
return nil
}
seqSet := new(imap.SeqSet)
seqSet.AddNum(uids...)
messages := make(chan *imap.Message, 10)
done := make(chan error, 1)
go func() {
done <- m.client.UidFetch(seqSet, []imap.FetchItem{imap.FetchEnvelope, imap.FetchUid}, messages)
}()
for msg := range messages {
if err := m.processMessage(msg); err != nil {
m.logger.Error("Failed to process message", "error", err)
}
}
if err := <-done; err != nil {
return fmt.Errorf("fetch failed: %w", err)
}
return nil
}
func (m *Monitor) processMessage(msg *imap.Message) error {
if msg.Envelope == nil {
return nil
}
if msg.Envelope.Date.Before(m.monitorStartTime) {
m.logger.Debug("Skipping old email", "date", msg.Envelope.Date, "subject", msg.Envelope.Subject)
return nil
}
var from string
if len(msg.Envelope.From) > 0 {
addr := msg.Envelope.From[0]
if addr.PersonalName != "" {
from = addr.PersonalName
} else {
from = addr.Address()
}
} else {
from = "Unknown sender"
}
subject := msg.Envelope.Subject
if subject == "" {
subject = "No subject"
}
endpoint := m.cfg.EndpointPrefixProton + m.cfg.ProtonPrismTopic
notif := notification.Notification{
Title: from,
Message: subject,
Actions: []notification.Action{
{
ID: "mark-read",
Endpoint: "/api/proton-mail/mark-read",
Method: "POST",
Data: map[string]interface{}{
"uid": msg.Uid,
},
},
},
}
if err := m.dispatcher.Send(endpoint, notif); err != nil {
m.logger.Error("Failed to send notification", "endpoint", endpoint, "error", err)
return err
}
m.logger.Info("Processed Proton Mail notification", "from", from, "subject", subject)
return nil
}
func (m *Monitor) MarkAsRead(uid uint32) error {
if m.client == nil {
return fmt.Errorf("not connected")
}
seqSet := new(imap.SeqSet)
seqSet.AddNum(uid)
item := imap.FormatFlagsOp(imap.AddFlags, true)
flags := []interface{}{m.cfg.IMAPSeenFlag}
return m.client.UidStore(seqSet, item, flags, nil)
}

67
service/proton/monitor.go Normal file
View file

@ -0,0 +1,67 @@
package proton
import (
"context"
"log/slog"
"time"
"prism/service/config"
"prism/service/notification"
"github.com/emersion/go-imap/client"
)
type Monitor struct {
cfg *config.Config
dispatcher *notification.Dispatcher
logger *slog.Logger
client *client.Client
monitorStartTime time.Time
}
func NewMonitor(cfg *config.Config, dispatcher *notification.Dispatcher, logger *slog.Logger) *Monitor {
return &Monitor{
cfg: cfg,
dispatcher: dispatcher,
logger: logger,
}
}
func (m *Monitor) Start(ctx context.Context) error {
if !m.cfg.IsProtonEnabled() {
m.logger.Info("Proton Mail monitoring disabled")
return nil
}
m.logger.Info("Starting Proton Mail monitor")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := m.connect(); err != nil {
m.logger.Error("Failed to connect to IMAP", "error", err)
time.Sleep(time.Duration(m.cfg.IMAPReconnectBaseDelay) * time.Millisecond)
continue
}
if err := m.monitor(ctx); err != nil {
m.logger.Error("Monitor error", "error", err)
}
if m.client != nil {
if err := m.client.Logout(); err != nil {
m.logger.Error("Logout failed", "error", err)
}
m.client = nil
}
time.Sleep(time.Duration(m.cfg.IMAPReconnectBaseDelay) * time.Millisecond)
}
}
}
func (m *Monitor) IsConnected() bool {
return m.client != nil
}

View file

@ -3,7 +3,7 @@ package server
import ( import (
"net/http" "net/http"
"prism/internal/notification" "prism/service/notification"
) )
func (s *Server) handleDeleteEndpointAction(w http.ResponseWriter, r *http.Request) { func (s *Server) handleDeleteEndpointAction(w http.ResponseWriter, r *http.Request) {

View file

@ -6,8 +6,8 @@ import (
"net/http" "net/http"
"time" "time"
"prism/internal/notification" "prism/service/notification"
"prism/internal/util" "prism/service/util"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
) )

View file

@ -5,9 +5,9 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"prism/internal/notification" "prism/service/notification"
"prism/internal/signal" "prism/service/signal"
"prism/internal/util" "prism/service/util"
) )
func (s *Server) handleFragmentHealth(w http.ResponseWriter, r *http.Request) { func (s *Server) handleFragmentHealth(w http.ResponseWriter, r *http.Request) {

View file

@ -8,7 +8,7 @@ import (
"strings" "strings"
"time" "time"
"prism/internal/notification" "prism/service/notification"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
) )

View file

@ -5,7 +5,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"prism/internal/notification" "prism/service/notification"
) )
type registerWebhookRequest struct { type registerWebhookRequest struct {

View file

@ -6,7 +6,7 @@ import (
"sync" "sync"
"time" "time"
"prism/internal/util" "prism/service/util"
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )

View file

@ -7,11 +7,11 @@ import (
"net/http" "net/http"
"time" "time"
"prism/internal/config" "prism/service/config"
"prism/internal/notification" "prism/service/notification"
"prism/internal/proton" "prism/service/proton"
"prism/internal/signal" "prism/service/signal"
"prism/internal/util" "prism/service/util"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"