Files
kor-elf-shield/internal/daemon/analyzer/log/systemd.go
Leonid Nikitin 8615c79f12 Refactor log analyzer to support SSH login detection
- Moved `Entry` type to `analysis` package for better organization.
- Introduced `SSH` analysis service to detect and notify about SSH logins.
- Added notification and logging for detected SSH login events.
2025-12-31 22:52:12 +05:00

148 lines
3.0 KiB
Go

package log
import (
"context"
"encoding/json"
"fmt"
"io"
"os/exec"
"strconv"
"sync"
"time"
analysisServices "git.kor-elf.net/kor-elf-shield/kor-elf-shield/internal/daemon/analyzer/log/analysis"
"git.kor-elf.net/kor-elf-shield/kor-elf-shield/internal/log"
)
type Systemd interface {
Run(ctx context.Context, logChan chan<- analysisServices.Entry)
Close() error
}
type systemd struct {
path string
units []string
logger log.Logger
cmd *exec.Cmd
mu sync.Mutex
}
type journalRawEntry struct {
Message string `json:"MESSAGE"`
Unit string `json:"_SYSTEMD_UNIT"`
PID string `json:"_PID"`
SourceTimestamp string `json:"_SOURCE_REALTIME_TIMESTAMP"`
RealtimeTimestamp string `json:"__REALTIME_TIMESTAMP"`
}
func NewSystemd(path string, units []string, logger log.Logger) Systemd {
return &systemd{
path: path,
units: units,
logger: logger,
}
}
func (s *systemd) Run(ctx context.Context, logChan chan<- analysisServices.Entry) {
if len(s.units) == 0 {
s.logger.Debug("No units specified for journalctl")
return
}
args := []string{"-f", "-n", "0", "-o", "json"}
for _, unit := range s.units {
args = append(args, "-u", unit)
}
s.logger.Debug("Journalctl started")
for {
select {
case <-ctx.Done():
return
default:
if err := s.watch(ctx, args, logChan); err != nil {
s.logger.Error(fmt.Sprintf("Journalctl exited with error: %v", err))
}
// Pause before restarting to avoid CPU load during persistent errors
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
s.logger.Warn("Journalctl connection lost. Restarting in 5s...")
continue
}
}
}
}
func (s *systemd) watch(ctx context.Context, args []string, logChan chan<- analysisServices.Entry) error {
cmd := exec.CommandContext(ctx, s.path, args...)
s.mu.Lock()
s.cmd = cmd
s.mu.Unlock()
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("stdout pipe error: %w", err)
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("start error: %w", err)
}
decoder := json.NewDecoder(stdout)
for {
var raw journalRawEntry
if err := decoder.Decode(&raw); err != nil {
if err == io.EOF {
break // The process terminated normally or was killed.
}
return fmt.Errorf("decode error: %w", err)
}
tsStr := raw.SourceTimestamp
if tsStr == "" {
tsStr = raw.RealtimeTimestamp
}
var entryTime time.Time
if usec, err := strconv.ParseInt(tsStr, 10, 64); err == nil {
entryTime = time.Unix(0, usec*int64(time.Microsecond))
} else {
entryTime = time.Now()
}
logChan <- analysisServices.Entry{
Message: raw.Message,
Unit: raw.Unit,
PID: raw.PID,
Time: entryTime,
}
}
return cmd.Wait()
}
func (s *systemd) Close() error {
if s.units == nil {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
if s.cmd != nil && s.cmd.Process != nil {
s.logger.Debug("Stopping journalctl")
// Force journalctl to quit on shutdown
return s.cmd.Process.Kill()
}
s.logger.Debug("Journalctl stopped")
return nil
}