(* Source file albatross_stats_pure.ml *)
open Vmm_core
(** Result binding operator, used for the [let*] chains in this module. *)
let ( let* ) = Result.bind
(** C stub; its value is the divisor used by [linux_rusage] to convert the
    utime/stime tick counts of [/proc/<pid>/stat].
    NOTE(review): presumably wraps sysconf(_SC_CLK_TCK) — confirm in the stub. *)
external sysconf_clock_tick : unit -> int = "vmmanage_sysconf_clock_tick"
(** C stub returning rusage and kernel memory info for a pid; used by
    [rusage] on FreeBSD. *)
external sysctl_kinfo_proc : int -> Stats.rusage * Stats.kinfo_mem =
"vmmanage_sysctl_kinfo_proc"
(** C stub mapping an interface name to its interface index; used by
    [add_pid] to resolve tap devices. *)
external get_ifindex_by_name : string -> int = "vmmanage_get_ifindex_by_name"
(** C stub returning interface statistics for an interface index; used by
    [gather]. *)
external sysctl_ifdata : int -> Stats.ifdata = "vmmanage_sysctl_ifdata"
(** Abstract handles produced and consumed only by the vmmapi stubs below. *)
type vmctx
type vcpu
(** Opens the vmm device with the given name (the [vmmdev] registered via
    [add_pid]) and returns its context and vcpu handles. *)
external vmmapi_open : string -> (vmctx * vcpu) = "vmmanage_vmmapi_open"
(** Releases handles obtained from [vmmapi_open]. *)
external vmmapi_close : vmctx -> vcpu -> unit = "vmmanage_vmmapi_close"
(** Returns named 64-bit counters for an open vmm context / vcpu. *)
external vmmapi_stats : vmctx -> vcpu -> (string * int64) list = "vmmanage_vmmapi_stats"
(** State of the stats daemon. ['a] is the value stored per subscriber; in
    [handle] it is a (protocol version, socket) pair.
    - [pid_nic]: keyed by process id. Each entry holds the vmmapi handles,
      or [Error n] where [n] is the number of open attempts left (see
      [open_vmmapi]; [add_pid] starts at 4). It also holds the vmm device
      name and the network interfaces as (bridge, ifindex, tap name).
    - [vmid_pid]: VM name to process id.
    - [name_sockets]: subscribers keyed by name. *)
type 'a t = {
pid_nic : ((vmctx * vcpu, int) result * string * (string * int * string) list) IM.t ;
vmid_pid : int Vmm_trie.t ;
name_sockets : 'a Vmm_trie.t ;
}
(** Prints a list of strings separated by commas, with a break hint after
    each comma. *)
let pp_strings ppf strs =
  let sep = Fmt.any ",@ " in
  Fmt.list ~sep Fmt.string ppf strs
(** Prints a list of string pairs (e.g. (bridge, tap)) as ["a: b"],
    separated by commas with a break hint after each comma. *)
let pp_nics ppf nets =
  let pp_nic = Fmt.pair ~sep:(Fmt.any ": ") Fmt.string Fmt.string in
  Fmt.list ~sep:(Fmt.any ",@ ") pp_nic ppf nets
(** A fresh state: no tracked processes, no VM names, no subscribers. *)
let empty () =
  let pid_nic = IM.empty
  and vmid_pid = Vmm_trie.empty
  and name_sockets = Vmm_trie.empty in
  { vmid_pid ; name_sockets ; pid_nic }
(** [remove_entry t name] drops the subscriber entry stored under [name]. *)
let remove_entry t name =
  { t with name_sockets = Vmm_trie.remove name t.name_sockets }
(** [wrap f arg] applies [f] to [arg] and returns [Some] of the result.
    A [Unix_error EINTR] causes a retry. Any other exception is logged at
    error level and turned into [None]. *)
let rec wrap f arg =
  match f arg with
  | result -> Some result
  | exception Unix.Unix_error (Unix.EINTR, _, _) -> wrap f arg
  | exception exn ->
    Logs.err (fun m -> m "exception %s" (Printexc.to_string exn)) ;
    None
(** Metrics hook labelled "vmmapi". It is applied to [`Open] when a vmmapi
    handle is opened ([open_vmmapi]) and to [`Close] when one is closed
    ([remove_vmid]).
    NOTE(review): [conn_metrics] presumably comes from the opened [Vmm_core];
    its exact semantics are not visible here. *)
let vmmapi = conn_metrics "vmmapi"
(** [remove_vmid t vmid] stops tracking VM [vmid]. It drops the name-to-pid
    mapping and the [pid_nic] entry, and closes the vmmapi handles if they
    had been opened. If no pid is known for [vmid], it logs a warning and
    returns [t] unchanged. *)
let remove_vmid t vmid =
  Logs.info (fun m -> m "removing vmid %a" Vmm_core.Name.pp vmid) ;
  match Vmm_trie.find vmid t.vmid_pid with
  | Some pid ->
    Logs.info (fun m -> m "removing pid %d" pid) ;
    begin match IM.find_opt pid t.pid_nic with
      | Some (Ok (ctx, cpu), _, _) ->
        ignore (wrap (vmmapi_close ctx) cpu) ;
        vmmapi `Close
      | Some (Error _, _, _) | None -> ()
    end ;
    { t with
      pid_nic = IM.remove pid t.pid_nic ;
      vmid_pid = Vmm_trie.remove vmid t.vmid_pid }
  | None ->
    Logs.warn (fun m -> m "no pid found for %a" Vmm_core.Name.pp vmid) ;
    t
(** [open_vmmapi ~retries name] tries to open the vmm device [name].
    [retries] is the number of attempts still allowed. At [0] nothing is
    tried and [Error 0] is returned. A failed attempt returns [Error left],
    where [left] is [retries - 1] clamped at 0. On success the [`Open]
    metric is recorded and the handles are returned. *)
let open_vmmapi ~retries name =
  match retries with
  | 0 ->
    Logs.debug (fun m -> m "(ignored 0) vmmapi_open failed for %s" name) ;
    Error 0
  | _ ->
    begin match wrap vmmapi_open name with
      | Some handles ->
        vmmapi `Open ;
        Logs.info (fun m -> m "vmmapi_open succeeded for %s" name) ;
        Ok handles
      | None ->
        let left = max 0 (retries - 1) in
        Logs.warn (fun m -> m "(ignored, %d attempts left) vmmapi_open failed for %s" left name) ;
        Error left
    end
(** Tries to open vmmapi handles for every process whose handles are not
    open yet (entries holding [Error retries]). Entries that are already
    open are kept as they are. Attempts happen in increasing pid order. *)
let try_open_vmmapi pid_nic =
  IM.map (function
      | (Ok _, _, _) as entry -> entry
      | (Error retries, vmmdev, nics) ->
        (open_vmmapi ~retries vmmdev, vmmdev, nics))
    pid_nic
(** [string_of_file filename] returns the first line of [filename], without
    its trailing newline. Any failure (missing file, empty file, read error)
    yields [Error (`Msg _)]. The channel is closed on every path once the
    file has been opened. *)
let string_of_file filename =
  try
    let fh = open_in filename in
    (* Fun.protect closes the channel even when input_line raises
       (e.g. End_of_file on an empty file); the exception then reaches the
       handler below. *)
    Ok (Fun.protect ~finally:(fun () -> close_in_noerr fh)
          (fun () -> input_line fh))
  with _ -> Error (`Msg (Fmt.str "Error reading file %S" filename))
(** [parse_proc_stat s] splits a [/proc/<pid>/stat] line into its fields.
    The result is the pid, then the command name, then the remaining
    space-separated fields. The command name is the text between the first
    ['('] and the last [')'], so it may itself contain spaces or
    parentheses. Returns [Error (`Msg _)] if [s] does not have that shape;
    it never raises. *)
let parse_proc_stat s =
  let len = String.length s in
  let stats_opt =
    match String.index_opt s '(', String.rindex_opt s ')' with
    (* Require room for "<pid> " before '(' and ") " after the command so
       that none of the String.sub calls below can raise Invalid_argument. *)
    | Some idxa, Some idxb when idxa >= 1 && idxb > idxa && idxb + 2 <= len ->
      let pid = String.sub s 0 (idxa - 1)
      and tcomm = String.sub s (idxa + 1) (idxb - idxa - 1)
      and rest = String.sub s (idxb + 2) (len - (idxb + 2)) in
      Some (pid :: tcomm :: String.split_on_char ' ' rest)
    | _ -> None
  in
  Option.to_result ~none:(`Msg "unable to parse /proc/<pid>/stat") stats_opt
(** [read_proc_status pid] reads [/proc/<pid>/status] into an association
    list of [(key, value)] pairs in file order. Each line is split at its
    first [':']. The value is the remainder, with whitespace trimmed and
    any further [':'] kept. Read failures yield [Error (`Msg _)]. The
    channel is closed on every path once the file has been opened. *)
let read_proc_status pid =
  (* Returns the lines of [fh] in reverse order; tail-recursive. *)
  let read_lines_rev fh =
    let rec loop acc =
      match input_line fh with
      | line -> loop (line :: acc)
      | exception End_of_file -> acc
    in
    loop []
  in
  try
    let fh = open_in ("/proc/" ^ string_of_int pid ^ "/status") in
    let lines =
      Fun.protect ~finally:(fun () -> close_in_noerr fh)
        (fun () -> read_lines_rev fh)
    in
    (* Folding over the reversed lines while consing restores file order. *)
    List.map (String.split_on_char ':') lines |>
    List.fold_left (fun acc x -> match acc, x with
        | Some acc, k :: v ->
          let v = String.concat ":" v |> String.trim in
          Some ((k, v) :: acc)
        | _ -> None) (Some []) |>
    Option.to_result ~none:(`Msg "failed to parse /proc/<pid>/status")
  with _ -> Error (`Msg (Fmt.str "error reading file /proc/%d/status" pid))
(** [linux_rusage pid] reads resource usage and memory statistics for
    process [pid] from Linux procfs. It uses the ctime of [/proc/<pid>]
    (taken as the start time), plus [/proc/<pid>/stat],
    [/proc/<pid>/statm] and [/proc/<pid>/status].

    Returns [Ok (rusage, kinfo_mem)], or [Error (`Msg _)] if a file cannot
    be stat'ed, read or parsed. Fields not read from procfs here are
    reported as [0L] ([cow] as [0]). *)
let linux_rusage pid =
  let proc = "/proc/" ^ string_of_int pid in
  let* start =
    match Unix.stat proc with
    | { Unix.st_ctime = start; _ } ->
      (* split the float timestamp into whole seconds and microseconds *)
      let frac = Float.rem start 1. in
      Ok (Int64.of_float start, int_of_float (frac *. 1_000_000.))
    (* Any stat failure (not only ENOENT, e.g. EACCES) becomes an error
       instead of escaping as an exception into the caller. *)
    | exception Unix.Unix_error _ -> Error (`Msg "failed to stat process")
  in
  let* data = string_of_file (proc ^ "/stat") in
  let* stat_vals = parse_proc_stat data in
  let* data = string_of_file (proc ^ "/statm") in
  let statm_vals = String.split_on_char ' ' data in
  let* status = read_proc_status pid in
  let assoc_i64 key : (int64, _) result =
    let e x = Option.to_result ~none:(`Msg "error parsing /proc/<pid>/status") x in
    let* v = e (List.assoc_opt key status) in
    e (Int64.of_string_opt v)
  in
  let i64 s = try Ok (Int64.of_string s) with
      Failure _ -> Error (`Msg "couldn't parse integer")
  in
  (* Fields are accessed by index: convert once instead of repeated List.nth. *)
  let stat = Array.of_list stat_vals and statm = Array.of_list statm_vals in
  if Array.length stat >= 52 && Array.length statm >= 7 then
    (* [stat] is pid, comm, then the remaining fields (see parse_proc_stat),
       so index i is proc(5) field number i + 1. *)
    let* minflt = i64 stat.(9) in
    let* majflt = i64 stat.(11) in
    let* utime = i64 stat.(13) in
    let* stime = i64 stat.(14) in
    (* utime/stime are tick counts; query the tick rate once. *)
    let clock_tick = Int64.of_int (sysconf_clock_tick ()) in
    let time_of_ticks t =
      Int64.(div t clock_tick,
             to_int (div (mul (rem t clock_tick) 1_000_000L) clock_tick))
    in
    let runtime = Int64.(div (mul (add utime stime) 1_000_000L) clock_tick) in
    let utime = time_of_ticks utime
    and stime = time_of_ticks stime in
    let* vsize = i64 stat.(22) in
    let* rss = i64 stat.(23) in
    let* nswap = i64 stat.(35) in
    let* tsize = i64 statm.(3) in
    let* dsize = i64 statm.(5) in
    (* NOTE(review): statm has no separate stack size; index 5 (the same
       value as dsize) is reported as ssize too — confirm this is intended. *)
    let* ssize = i64 statm.(5) in
    let* nvcsw = assoc_i64 "voluntary_ctxt_switches" in
    let* nivcsw = assoc_i64 "nonvoluntary_ctxt_switches" in
    let rusage = { Stats.utime ; stime ; maxrss = rss ; ixrss = 0L ;
                   idrss = 0L ; isrss = 0L ; minflt ; majflt ; nswap ;
                   inblock = 0L ; outblock = 0L ; msgsnd = 0L ; msgrcv = 0L ;
                   nsignals = 0L ; nvcsw ; nivcsw }
    and kmem = { Stats.vsize ; rss ; tsize ; dsize ; ssize ; runtime ;
                 cow = 0 ; start }
    in
    Ok (rusage, kmem)
  else
    Error (`Msg "couldn't read /proc/<pid>/stat")
(** Resource usage and kernel memory info for process [pid]. It is read via
    [sysctl_kinfo_proc] on FreeBSD and from procfs on Linux. Returns [None]
    if it cannot be obtained; the failure is logged. *)
let rusage pid =
  match Lazy.force Vmm_unix.uname with
  | Vmm_unix.Linux ->
    begin match linux_rusage pid with
      | Error (`Msg msg) ->
        Logs.err (fun m -> m "error %s while reading /proc/" msg) ;
        None
      | Ok stats -> Some stats
    end
  | Vmm_unix.FreeBSD -> wrap sysctl_kinfo_proc pid
(** [gather pid vmctx nics] collects one round of statistics for a VM and
    returns four components:
    - rusage and kernel memory info, both [None] if unavailable;
    - vmm counters, only when [vmctx] holds open handles;
    - interface data for each entry of [nics] that could be read, tagged
      with its bridge name, in reverse order of [nics]. *)
let gather pid vmctx nics =
  let usage, kmem =
    match rusage pid with
    | Some (usage, kmem) -> Some usage, Some kmem
    | None -> None, None
  in
  let ifdata =
    List.rev
      (List.filter_map (fun (bridge, ifindex, tap) ->
           match wrap sysctl_ifdata ifindex with
           | Some data -> Some { data with Stats.bridge }
           | None ->
             Logs.warn (fun m -> m "failed to get ifdata for %s" tap) ;
             None)
          nics)
  in
  let vmm =
    match vmctx with
    | Ok (ctx, cpu) -> wrap (vmmapi_stats ctx) cpu
    | Error _ -> None
  in
  usage, kmem, vmm, ifdata
(** [tick gather_bhyve t] collects statistics for every tracked VM that has
    at least one subscriber, and builds the messages to send.
    When [gather_bhyve] is true, vmmapi handles that are not open yet are
    retried first, and the refreshed handles are used in this same round.
    Returns the new state and a list of
    [(socket, subscriber name, (header, data))] messages. VMs whose rusage
    cannot be read are removed from the returned state. *)
let tick gather_bhyve t =
  let pid_nic = if gather_bhyve then try_open_vmmapi t.pid_nic else t.pid_nic in
  let t' = { t with pid_nic } in
  let outs, to_remove =
    List.fold_left (fun (out, to_remove) (vmid, pid) ->
        match Vmm_trie.collect vmid t'.name_sockets with
        | [] -> Logs.debug (fun m -> m "nobody is listening") ; (out, to_remove)
        | xs ->
          (* Look up in the refreshed map (t', not t) so that handles opened
             above are already used in this round. *)
          (match IM.find_opt pid t'.pid_nic with
           | None ->
             Logs.warn (fun m -> m "couldn't find nics of %d" pid) ;
             out, to_remove
           | Some (vmctx, _, nics) ->
             let ru, mem, vmm, ifd = gather pid vmctx nics in
             (match ru with
              | None ->
                Logs.err (fun m -> m "failed to get rusage for %d" pid) ;
                out, vmid :: to_remove
              | Some ru' ->
                let stats = ru', mem, vmm, ifd in
                let outs =
                  List.fold_left (fun out (id, (version, socket)) ->
                      let listening_path = Vmm_core.Name.path id in
                      (* name of the VM relative to the subscription point *)
                      let real_id = Vmm_core.Name.drop_prefix_exn vmid listening_path in
                      let header = Vmm_commands.header ~version real_id in
                      (socket, id, (header, `Data (`Stats_data stats))) :: out)
                    out xs
                in
                outs, to_remove)))
      ([], []) (Vmm_trie.all t'.vmid_pid)
  in
  let t'' = List.fold_left remove_vmid t' to_remove in
  (t'', outs)
(** [add_pid t vmid vmmdev pid nics] starts tracking VM [vmid], which runs
    as process [pid] with vmm device [vmmdev]. [nics] lists its network
    interfaces as (bridge, tap) pairs; taps whose interface index cannot be
    resolved are skipped and logged at debug level. The vmmapi handles are
    not opened here: the entry starts as [Error 4], meaning 4 open attempts
    are left (see [open_vmmapi]).
    Returns [Error (`Msg _)] and leaves the state untouched if [vmid] is
    already tracked. Previously an assertion failure aborted the process
    in that case. *)
let add_pid t vmid vmmdev pid nics =
  let vmid_pid, previous = Vmm_trie.insert vmid pid t.vmid_pid in
  match previous with
  | Some old_pid ->
    Logs.err (fun m -> m "%a is already registered with pid %d"
                 Name.pp vmid old_pid) ;
    Error (`Msg (Fmt.str "%a is already registered" Name.pp vmid))
  | None ->
    let nic_ids =
      List.filter_map
        (fun (bridge, tap) ->
           match wrap get_ifindex_by_name tap with
           | Some ifindex -> Some (bridge, ifindex, tap)
           | None -> Logs.debug (fun m -> m "failed to get ifindex for: %S" tap); None)
        nics
    in
    Logs.info (fun m -> m "adding %a %d %a" Name.pp vmid pid pp_nics nics) ;
    let pid_nic = IM.add pid (Error 4, vmmdev, nic_ids) t.pid_nic in
    Ok { t with pid_nic ; vmid_pid }
(** [handle t socket (hdr, wire)] processes one stats command received on
    [socket] for the name in [hdr]. It returns three things: the new state;
    the value [Vmm_trie.insert] reports as previously bound when
    subscribing (presumably an older subscriber to close — confirm with
    caller), otherwise [None]; and a short description of the action.
    [`Stats_initial] and non-stats messages yield [Error (`Msg _)]. *)
let handle t socket (hdr, wire) =
  let name = hdr.Vmm_commands.name in
  match wire with
  | `Command (`Stats_cmd `Stats_subscribe) ->
    let name_sockets, close =
      Vmm_trie.insert name (hdr.Vmm_commands.version, socket) t.name_sockets
    in
    Ok ({ t with name_sockets }, close, "subscribed")
  | `Command (`Stats_cmd `Stats_remove) ->
    Ok (remove_vmid t name, None, "removed")
  | `Command (`Stats_cmd (`Stats_add (vmmdev, pid, taps))) ->
    let* t = add_pid t name vmmdev pid taps in
    Ok (t, None, "added")
  | `Command (`Stats_cmd `Stats_initial) ->
    Logs.warn (fun m -> m "unexpected message initial") ;
    Error (`Msg "unexpected message initial")
  | _ ->
    Logs.err (fun m -> m "unexpected wire %a"
                 (Vmm_commands.pp_wire ~verbose:false) (hdr, wire)) ;
    Error (`Msg "unexpected command")