-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathtail.go
More file actions
119 lines (97 loc) · 2.11 KB
/
tail.go
File metadata and controls
119 lines (97 loc) · 2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"bufio"
"context"
"fmt"
"os"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
type Tail struct {
Finished bool
closed chan struct{}
logger *Logger
namespace string
pod string
container string
sinceSeconds int64
timestamps bool
}
// NewTail creates new Tail object
func NewTail(namespace, pod, container string, logger *Logger, sinceSeconds int64, timestamps bool) *Tail {
return &Tail{
Finished: false,
closed: make(chan struct{}),
logger: logger,
namespace: namespace,
pod: pod,
container: container,
sinceSeconds: sinceSeconds,
timestamps: timestamps,
}
}
// Start starts Pod log streaming
func (t *Tail) Start(ctx context.Context, clientset *kubernetes.Clientset) {
t.logger.PrintPodDetected(t.pod, t.container)
go func() {
rs, err := clientset.CoreV1().Pods(t.namespace).GetLogs(t.pod, &v1.PodLogOptions{
Container: t.container,
Follow: true,
SinceSeconds: &t.sinceSeconds,
Timestamps: t.timestamps,
}).Stream(ctx)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
defer rs.Close()
go func() {
<-t.closed
rs.Close()
}()
sc := bufio.NewScanner(rs)
for sc.Scan() {
t.logger.PrintPodLog(t.pod, t.container, sc.Text(), t.timestamps)
}
}()
go func() {
<-ctx.Done()
close(t.closed)
}()
}
// Finish finishes Pod log streaming with Pod completion
func (t *Tail) Finish() {
t.logger.PrintPodFinished(t.pod, t.container)
t.Finished = true
}
// Delete finishes Pod log streaming with Pod deletion
func (t *Tail) Delete() {
t.logger.PrintPodDeleted(t.pod, t.container)
close(t.closed)
}
type TailMap struct {
mu sync.Mutex
data map[string]*Tail
}
func NewTailMap() *TailMap {
return &TailMap{
data: make(map[string]*Tail),
}
}
func (m *TailMap) Get(k string) (*Tail, bool) {
m.mu.Lock()
defer m.mu.Unlock()
d, ok := m.data[k]
return d, ok
}
func (m *TailMap) Set(k string, v *Tail) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[k] = v
}
func (m *TailMap) Delete(k string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.data, k)
}