~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/tools/perf/scripts/python/parallel-perf.py

Version: ~ [ linux-6.12-rc7 ] ~ [ linux-6.11.7 ] ~ [ linux-6.10.14 ] ~ [ linux-6.9.12 ] ~ [ linux-6.8.12 ] ~ [ linux-6.7.12 ] ~ [ linux-6.6.60 ] ~ [ linux-6.5.13 ] ~ [ linux-6.4.16 ] ~ [ linux-6.3.13 ] ~ [ linux-6.2.16 ] ~ [ linux-6.1.116 ] ~ [ linux-6.0.19 ] ~ [ linux-5.19.17 ] ~ [ linux-5.18.19 ] ~ [ linux-5.17.15 ] ~ [ linux-5.16.20 ] ~ [ linux-5.15.171 ] ~ [ linux-5.14.21 ] ~ [ linux-5.13.19 ] ~ [ linux-5.12.19 ] ~ [ linux-5.11.22 ] ~ [ linux-5.10.229 ] ~ [ linux-5.9.16 ] ~ [ linux-5.8.18 ] ~ [ linux-5.7.19 ] ~ [ linux-5.6.19 ] ~ [ linux-5.5.19 ] ~ [ linux-5.4.285 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.323 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.336 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.337 ] ~ [ linux-4.4.302 ] ~ [ linux-3.10.108 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.12 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 #!/usr/bin/env python3
  2 # SPDX-License-Identifier: GPL-2.0
  3 #
  4 # Run a perf script command multiple times in parallel, using perf script
  5 # options --cpu and --time so that each job processes a different chunk
  6 # of the data.
  7 #
  8 # Copyright (c) 2024, Intel Corporation.
  9 
 10 import subprocess
 11 import argparse
 12 import pathlib
 13 import shlex
 14 import time
 15 import copy
 16 import sys
 17 import os
 18 import re
 19 
# Name of this script (presumably used in messages / usage text - confirm against callers)
glb_prog_name = "parallel-perf.py"
# NOTE(review): looks like a default minimum time interval; units and users are not visible here
glb_min_interval = 10.0
# NOTE(review): looks like a default minimum sample count; users are not visible here
glb_min_samples = 64
 23 
 24 class Verbosity():
 25 
 26         def __init__(self, quiet=False, verbose=False, debug=False):
 27                 self.normal    = True
 28                 self.verbose   = verbose
 29                 self.debug     = debug
 30                 self.self_test = True
 31                 if self.debug:
 32                         self.verbose = True
 33                 if self.verbose:
 34                         quiet = False
 35                 if quiet:
 36                         self.normal = False
 37 
 38 # Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
 39 class Work():
 40 
 41         def __init__(self, cmd, pipe_to, output_dir="."):
 42                 self.popen = None
 43                 self.consumer = None
 44                 self.cmd = cmd
 45                 self.pipe_to = pipe_to
 46                 self.output_dir = output_dir
 47                 self.cmdout_name = f"{output_dir}/cmd.txt"
 48                 self.stdout_name = f"{output_dir}/out.txt"
 49                 self.stderr_name = f"{output_dir}/err.txt"
 50 
 51         def Command(self):
 52                 sh_cmd = [ shlex.quote(x) for x in self.cmd ]
 53                 return " ".join(self.cmd)
 54 
 55         def Stdout(self):
 56                 return open(self.stdout_name, "w")
 57 
 58         def Stderr(self):
 59                 return open(self.stderr_name, "w")
 60 
 61         def CreateOutputDir(self):
 62                 pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)
 63 
 64         def Start(self):
 65                 if self.popen:
 66                         return
 67                 self.CreateOutputDir()
 68                 with open(self.cmdout_name, "w") as f:
 69                         f.write(self.Command())
 70                         f.write("\n")
 71                 stdout = self.Stdout()
 72                 stderr = self.Stderr()
 73                 if self.pipe_to:
 74                         self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
 75                         args = shlex.split(self.pipe_to)
 76                         self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
 77                 else:
 78                         self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)
 79 
 80         def RemoveEmptyErrFile(self):
 81                 if os.path.exists(self.stderr_name):
 82                         if os.path.getsize(self.stderr_name) == 0:
 83                                 os.unlink(self.stderr_name)
 84 
 85         def Errors(self):
 86                 if os.path.exists(self.stderr_name):
 87                         if os.path.getsize(self.stderr_name) != 0:
 88                                 return [ f"Non-empty error file {self.stderr_name}" ]
 89                 return []
 90 
 91         def TidyUp(self):
 92                 self.RemoveEmptyErrFile()
 93 
 94         def RawPollWait(self, p, wait):
 95                 if wait:
 96                         return p.wait()
 97                 return p.poll()
 98 
 99         def Poll(self, wait=False):
100                 if not self.popen:
101                         return None
102                 result = self.RawPollWait(self.popen, wait)
103                 if self.consumer:
104                         res = result
105                         result = self.RawPollWait(self.consumer, wait)
106                         if result != None and res == None:
107                                 self.popen.kill()
108                                 result = None
109                         elif result == 0 and res != None and res != 0:
110                                 result = res
111                 if result != None:
112                         self.TidyUp()
113                 return result
114 
115         def Wait(self):
116                 return self.Poll(wait=True)
117 
118         def Kill(self):
119                 if not self.popen:
120                         return
121                 self.popen.kill()
122                 if self.consumer:
123                         self.consumer.kill()
124 
def KillWork(worklist, verbosity):
        """Kill every job in worklist, then wait for each one to exit.

        verbosity is accepted but not used.
        """
        # Signal all the jobs before waiting on any of them
        for job in worklist:
                job.Kill()
        # Now collect the exit status of each job
        for job in worklist:
                job.Wait()
130 
def NumberOfCPUs():
        """Return the sysconf SC_NPROCESSORS_ONLN value (number of online processors)."""
        nr_online = os.sysconf("SC_NPROCESSORS_ONLN")
        return nr_online
133 
def NanoSecsToSecsStr(x):
        """Format a nanosecond count as "seconds.nanoseconds" with 9 decimal places.

        None is formatted as an empty string.
        """
        if x is None:
                return ""
        # Pad to at least 10 digits so there is always a digit before the point
        digits = str(x).rjust(10, "0")
        return digits[:-9] + "." + digits[-9:]
141 
def InsertOptionAfter(cmd, option, after):
        """Insert option into list cmd immediately after the first element equal to after.

        If after is not in cmd, option is appended instead. cmd is modified
        in place.
        """
        try:
                pos = cmd.index(after)
        except ValueError:
                # 'after' is not present: fall back to appending
                cmd.append(option)
        else:
                cmd.insert(pos + 1, option)
148 
def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
        """Create a Work object for every combination of CPU and time range.

        cpus is a list of CPU numbers; a negative number means no --cpu
        option and no per-CPU subdirectory. time_ranges_by_cpu holds either
        one list of time ranges used for every CPU, or one list per CPU in
        the same order as cpus. A time range of [None, None] means no --time
        option and no time-range subdirectory.
        """
        cpu_width = len(str(cpus[-1]))
        cpu_dir_fmt = f"cpu-%.{cpu_width}u"
        per_cpu_ranges = len(time_ranges_by_cpu) > 1
        worklist = []
        pos = 0
        for cpu in cpus:
                if cpu < 0:
                        cpu_dir = output_dir
                        cpu_option = None
                else:
                        cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
                        cpu_option = f"--cpu={cpu}"

                if per_cpu_ranges:
                        time_ranges = time_ranges_by_cpu[pos]
                        tr_prefix = f"time-range-{pos}"
                        pos += 1
                else:
                        time_ranges = time_ranges_by_cpu[0]
                        tr_prefix = "time-range"

                tr_width = len(str(len(time_ranges)))
                tr_dir_fmt = f"{tr_prefix}-%.{tr_width}u"

                # Only ranges that get a --time option are numbered
                tr_nr = 0
                for r in time_ranges:
                        work_cmd = list(cmd)
                        if r == [None, None]:
                                work_output_dir = cpu_dir
                        else:
                                time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1])
                                work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % tr_nr)
                                tr_nr += 1
                                InsertOptionAfter(work_cmd, time_option, "script")
                        # Inserted second, so --cpu ends up ahead of --time
                        if cpu_option is not None:
                                InsertOptionAfter(work_cmd, cpu_option, "script")
                        worklist.append(Work(work_cmd, pipe_to, work_output_dir))
        return worklist
191 
def DoRunWork(worklist, nr_jobs, verbosity):
        """Run all the work in worklist, with at most nr_jobs running at a time.

        Progress is printed according to verbosity. If any job returns a
        non-zero status, the jobs still running are killed and False is
        returned. Otherwise True is returned once every job has finished,
        after printing any errors reported by the Errors() method of each
        work item.
        """
        nr_to_do = len(worklist)
        not_started = list(worklist)
        running = []
        done = []
        # Set when the job counts change, so the progress line is only
        # re-printed when there is something new to show
        chg = False
        while True:
                nr_done = len(done)
                if chg and verbosity.normal:
                        nr_run = len(running)
                        # "\r" with end=" " overwrites the progress line in place
                        print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ")
                        if verbosity.verbose:
                                print()
                        chg = False
                if nr_done == nr_to_do:
                        break
                # Top up the running jobs to nr_jobs
                while len(running) < nr_jobs and len(not_started):
                        w = not_started.pop(0)
                        running.append(w)
                        if verbosity.verbose:
                                print("Starting:", w.Command())
                        w.Start()
                        chg = True
                if len(running):
                        time.sleep(0.1)
                finished = []
                not_finished = []
                # Poll every running job, sorting them into finished and not finished
                while len(running):
                        w = running.pop(0)
                        r = w.Poll()
                        if r == None:
                                not_finished.append(w)
                                continue
                        if r == 0:
                                if verbosity.verbose:
                                        print("Finished:", w.Command())
                                finished.append(w)
                                chg = True
                                continue
                        # Non-zero return code: report it and abandon everything else
                        if verbosity.normal and not verbosity.verbose:
                                # Move off the progress line before printing the failure
                                print()
                        print("Job failed!\n    return code:", r, "\n    command:    ", w.Command())
                        if w.pipe_to:
                                print("    piped to:   ", w.pipe_to)
                        print("Killing outstanding jobs")
                        KillWork(not_finished, verbosity)
                        KillWork(running, verbosity)
                        return False
                running = not_finished
                done += finished
        errorlist = []
        for w in worklist:
                errorlist += w.Errors()
        if len(errorlist):
                print("Errors:")
                for e in errorlist:
                        print(e)
        elif verbosity.normal:
                # Blank out the progress line before printing the final message
                print("\r"," "*50, "\rAll jobs finished successfully", flush=True)
        return True
252 
def RunWork(worklist, nr_jobs=NumberOfCPUs(), verbosity=Verbosity()):
        """Run all the work in worklist and return the result of DoRunWork().

        If anything is raised while running (including KeyboardInterrupt),
        every job is killed before the exception is re-raised.
        """
        try:
                return DoRunWork(worklist, nr_jobs, verbosity)
        except BaseException:
                # Deliberately catches everything: no job must be left running
                for w in worklist:
                        w.Kill()
                raise
261 
def ReadHeader(perf, file_name):
        """Return the output of 'perf script --header-only' for file_name as a string.

        perf is the perf executable to run. The exit status is not checked.
        """
        cmd = [perf, "script", "--header-only", "--input", file_name]
        # subprocess.run() waits for the process to exit and closes the pipe,
        # so neither a zombie process nor an open file descriptor is left behind
        return subprocess.run(cmd, stdout=subprocess.PIPE).stdout.decode("utf-8")
264 
def ParseHeader(hdr):
        """Parse header text into a dictionary of field name to value.

        Only lines that start with "#" and contain ":" are used. The name is
        the text between the "#" and the first ":", the value is the text
        after it, both stripped of surrounding whitespace. A repeated name
        is stored as "name 2", "name 3" and so on.
        """
        result = {}
        for line in hdr.split("\n"):
                if not line.startswith("#") or ":" not in line:
                        continue
                # Split at the first colon. The whole of the text before the
                # colon is the name, whether or not a space precedes the colon.
                name, _, value = line[1:].partition(":")
                name = name.strip()
                value = value.strip()
                if name in result:
                        # Find the first unused numbered variant of the name
                        orig_name = name
                        nr = 2
                        while f"{orig_name} {nr}" in result:
                                nr += 1
                        name = f"{orig_name} {nr}"
                result[name] = value
        return result
283 
def HeaderField(hdr_dict, hdr_fld):
        """Return the value of field hdr_fld from hdr_dict, raising Exception if absent."""
        if hdr_fld in hdr_dict:
                return hdr_dict[hdr_fld]
        raise Exception(f"'{hdr_fld}' missing from header information")
288 
289 # Represent the position of an option within a command string
290 # and provide the option value and/or remove the option
class OptPos():
        """Find an option, in short or long form, within a list of arguments.

        Recognized forms are "-s value", "-svalue", "--long value",
        "--long=value" and the short option at the end of a group of short
        options, e.g. "-xs value" or "-xsvalue". Only the first match is
        recorded. If the option is malformed, self.error holds a message.
        """

        def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
                # List elements that contain the option and the option value
                self.opt_element, self.value_element = opt_element, value_element
                # String positions of the option and of the value within those elements
                self.opt_pos, self.value_pos = opt_pos, value_pos
                # Error message string
                self.error = error

        def __init__(self, args, short_name, long_name, default=None):
                self.args = list(args)
                self.default = default
                short_opt = f"-{short_name}"
                long_opt = f"--{long_name}"
                long_len = len(long_opt)
                have_short = len(short_name) != 0
                for idx, opt in enumerate(args):
                        # Short option at the start of the element
                        if have_short and opt[:2] == short_opt:
                                if len(opt) != 2:
                                        # Value follows in the same element
                                        self.Init(idx, idx, 0, 2)
                                elif idx + 1 < len(args):
                                        self.Init(idx, idx + 1, 0, 0)
                                else:
                                        self.Init(error = f"-{short_name} option missing value")
                                return
                        # Long option
                        if opt.startswith(long_opt):
                                if len(opt) == long_len:
                                        if idx + 1 < len(args):
                                                self.Init(idx, idx + 1, 0, 0)
                                        else:
                                                self.Init(error = f"--{long_name} option missing value")
                                elif opt[long_len] == "=":
                                        self.Init(idx, idx, 0, long_len + 1)
                                else:
                                        self.Init(error = f"--{long_name} option expected '='")
                                return
                        # Short option within a group of short options
                        if have_short and opt.startswith("-") and not opt.startswith("--") and short_name in opt:
                                ipos = opt.index(short_name)
                                hpos = opt.find("-", 1)
                                # Skip the element if another hyphen comes before the option letter
                                if hpos != -1 and hpos - 1 < ipos:
                                        continue
                                if ipos + 1 != len(opt):
                                        # Value follows in the same element
                                        self.Init(idx, idx, ipos, ipos + 1)
                                elif idx + 1 < len(args):
                                        self.Init(idx, idx + 1, ipos, 0)
                                else:
                                        self.Init(error = f"-{short_name} option missing value")
                                return
                # Option not found
                self.Init()

        def Value(self):
                """Return the option value, or the default if the option was not found."""
                if self.opt_element < 0:
                        return self.default
                value_arg = self.args[self.value_element]
                if self.opt_element == self.value_element:
                        return value_arg[self.value_pos:]
                return value_arg

        def Remove(self, args):
                """Remove the option and its value from list args, in place."""
                if self.opt_element == -1:
                        return
                # The value element, if separate, follows the option element,
                # so deleting it first does not move the option element
                if self.value_element != self.opt_element:
                        del args[self.value_element]
                if not self.opt_pos:
                        del args[self.opt_element]
                else:
                        # Keep the other short options that precede this one
                        args[self.opt_element] = args[self.opt_element][:self.opt_pos]
361 
def DetermineInputFileName(cmd):
        """Return the input file name given by -i / --input in cmd, default "perf.data".

        Raises Exception if the option is malformed or the file does not exist.
        """
        opt = OptPos(cmd, "i", "input", "perf.data")
        if opt.error:
                raise Exception(f"perf command {opt.error}")
        file_name = opt.Value()
        if os.path.exists(file_name):
                return file_name
        raise Exception(f"perf command input file '{file_name}' not found")
370 
def ReadOption(args, short_name, long_name, err_prefix, remove=False):
        """Return the value of an option in args, or None if it is not present.

        If remove is set, the option and its value are also removed from
        args. Raises Exception, with the message prefixed by err_prefix, if
        the option is malformed.
        """
        opt = OptPos(args, short_name, long_name)
        if opt.error:
                raise Exception(f"{err_prefix}{opt.error}")
        if not remove:
                return opt.Value()
        # Get the value before the option is removed
        value = opt.Value()
        opt.Remove(args)
        return value
379 
def ExtractOption(args, short_name, long_name, err_prefix):
        """Return the value of an option and remove the option from args."""
        return ReadOption(args, short_name, long_name, err_prefix, remove=True)
382 
def ReadPerfOption(args, short_name, long_name):
        """Return the value of an option of the perf command, leaving args unchanged."""
        err_prefix = "perf command "
        return ReadOption(args, short_name, long_name, err_prefix)
385 
def ExtractPerfOption(args, short_name, long_name):
        """Return the value of an option of the perf command and remove it from args."""
        err_prefix = "perf command "
        return ExtractOption(args, short_name, long_name, err_prefix)
388 
def PerfDoubleQuickCommands(cmd, file_name):
        """Return two perf script commands used to measure trace data density.

        Both use --itrace=qqi on file_name and carry over any --cpu / --time
        options found in cmd. The first (cnts_cmd) outputs only the cpu
        field, the second (times_cmd) outputs the cpu and time fields.
        """
        cpu_str = ReadPerfOption(cmd, "C", "cpu")
        time_str = ReadPerfOption(cmd, "", "time")
        # Run the same perf executable as the command itself, rather than
        # whichever "perf" happens to be found first in PATH
        perf = cmd[0] if cmd else "perf"
        # Use double-quick sampling to determine trace data density
        times_cmd = [perf, "script", "--ns", "--input", file_name, "--itrace=qqi"]
        if cpu_str:
                times_cmd.append(f"--cpu={cpu_str}")
        if time_str:
                times_cmd.append(f"--time={time_str}")
        cnts_cmd = list(times_cmd)
        cnts_cmd.append("-Fcpu")
        times_cmd.append("-Fcpu,time")
        return cnts_cmd, times_cmd
402 
class CPUTimeRange():
        """Per-CPU state: a CPU number, a sample count and a list of time ranges,
        plus counters that all start at zero."""

        def __init__(self, cpu):
                self.cpu = cpu
                self.time_ranges = None
                # Every counter starts at zero
                self.sample_cnt = self.interval = self.interval_remaining = 0
                self.remaining = self.tr_pos = 0
412 
def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
        """Process one sample line, starting a new time range when an interval is used up.

        line is the split sample line: line[1] is the time followed by one
        trailing character, which is dropped. The current time range of the
        CPU is ended just before the sample time and a new one started at it.
        The last sample of a CPU ends the current time range at max_time.
        """
        ctr = cpu_time_ranges[cpu]
        ctr.remaining -= 1
        ctr.interval_remaining -= 1
        if ctr.remaining == 0:
                # No more samples for this CPU
                ctr.time_ranges[ctr.tr_pos][1] = max_time
                return
        if ctr.interval_remaining != 0:
                return
        start = TimeVal(line[1][:-1], 0)
        ranges = ctr.time_ranges
        ranges[ctr.tr_pos][1] = start - 1
        ranges.append([start, max_time])
        ctr.tr_pos += 1
        ctr.interval_remaining = ctr.interval
427 
def CountSamplesByCPU(line, cpu, cpu_time_ranges):
        """Count one sample against the given CPU.

        line is not used. On any failure, diagnostics are printed and the
        exception is re-raised.
        """
        try:
                entry = cpu_time_ranges[cpu]
                entry.sample_cnt += 1
        except BaseException:
                # Print some context to help diagnose an unexpected CPU number
                print("exception")
                print("cpu", cpu)
                print("len(cpu_time_ranges)", len(cpu_time_ranges))
                raise
436 
def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
        """Run cmd and call fn(fields, cpu, *x) for each matching line of its output.

        Only lines that begin (after optional whitespace) with a number in
        square brackets are processed. fields is the line split on
        whitespace. cpu is that number if per_cpu is set, otherwise 0.
        """
        # Assume CPU number is at beginning of line and enclosed by []
        pat = re.compile(r"\s*\[[0-9]+\]")
        # The context manager closes the pipe and waits for the process, even
        # if fn raises, so the child process is never left behind
        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
                for raw in p.stdout:
                        text = raw.decode("utf-8")
                        if not pat.match(text):
                                continue
                        fields = text.split()
                        # Strip the enclosing [] from the CPU number
                        cpu = int(fields[0][1:-1]) if per_cpu else 0
                        fn(fields, cpu, *x)
456 
def IntersectTimeRanges(new_time_ranges, time_ranges):
        """Reduce new_time_ranges, in place, to its intersection with time_ranges.

        Both are lists of [start, end] ranges with inclusive ends, and are
        processed in order from the start of each list (presumably both are
        sorted and non-overlapping - confirm against callers). Ranges in
        new_time_ranges are deleted, trimmed or split as needed.
        time_ranges is not modified.
        """
        pos = 0         # current index into time_ranges
        new_pos = 0     # current index into new_time_ranges
        # Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
        # Note also, there *must* be at least one intersection.
        while pos < len(time_ranges) and new_pos < len(new_time_ranges):
                # new end < old start => no intersection, remove new
                if new_time_ranges[new_pos][1] < time_ranges[pos][0]:
                        del new_time_ranges[new_pos]
                        continue
                # new start > old end => no intersection, check next
                if new_time_ranges[new_pos][0] > time_ranges[pos][1]:
                        pos += 1
                        if pos < len(time_ranges):
                                continue
                        # no next, so remove remaining
                        while new_pos < len(new_time_ranges):
                                del new_time_ranges[new_pos]
                        return
                # Found an intersection
                # new start < old start => adjust new start = old start
                if new_time_ranges[new_pos][0] < time_ranges[pos][0]:
                        new_time_ranges[new_pos][0] = time_ranges[pos][0]
                # new end > old end => keep the overlap, insert the remainder
                if new_time_ranges[new_pos][1] > time_ranges[pos][1]:
                        r = [ time_ranges[pos][1] + 1, new_time_ranges[new_pos][1] ]
                        new_time_ranges[new_pos][1] = time_ranges[pos][1]
                        new_pos += 1
                        # The remainder is checked against the old ranges on the next iteration
                        new_time_ranges.insert(new_pos, r)
                        continue
                # new [start, end] is within old [start, end]
                new_pos += 1
489 
def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
        """Split time_ranges so that each part holds a similar number of samples.

        The perf commands from PerfDoubleQuickCommands() are run twice: once
        to count samples per CPU, and once to place the time range
        boundaries. nr is the number of parts to aim for, or 0 to base it on
        the number of CPUs running this script. If no CPU has at least
        min_size samples, the time ranges are split evenly by time instead.

        Returns a list of lists of time ranges: one list per entry in cpus,
        or a single list when there is too little data to split by density.
        """
        if verbosity.normal:
                print("\rAnalyzing...", flush=True, end=" ")
                if verbosity.verbose:
                        print()
        cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

        if per_cpu:
                # Indexed by CPU number, so must cover up to the highest CPU number
                nr_cpus = cpus[-1] + 1
                cpu_time_ranges = [ CPUTimeRange(cpu) for cpu in range(nr_cpus) ]
        else:
                nr_cpus = 1
                cpu_time_ranges = [ CPUTimeRange(-1) ]

        if verbosity.debug:
                print("nr_cpus", nr_cpus)
                print("cnts_cmd", cnts_cmd)
                print("times_cmd", times_cmd)

        # Count the number of "double quick" samples per CPU
        ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

        tot = 0
        mx = 0
        for cpu_time_range in cpu_time_ranges:
                cnt = cpu_time_range.sample_cnt
                tot += cnt
                if cnt > mx:
                        mx = cnt
                if verbosity.debug:
                        print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt)

        if min_size < 1:
                min_size = 1

        if mx < min_size:
                # Too little data to be worth splitting
                if verbosity.debug:
                        print("Too little data to split by time")
                if nr == 0:
                        nr = 1
                return [ SplitTimeRangesIntoN(time_ranges, nr, min_interval) ]

        if nr:
                # An explicit number of parts overrides the minimum size
                divisor = nr
                min_size = 1
        else:
                divisor = NumberOfCPUs()

        interval = int(round(tot / divisor, 0))
        if interval < min_size:
                interval = min_size

        if verbosity.debug:
                print("divisor", divisor)
                print("min_size", min_size)
                print("interval", interval)

        min_time = time_ranges[0][0]
        max_time = time_ranges[-1][1]

        for cpu_time_range in cpu_time_ranges:
                cnt = cpu_time_range.sample_cnt
                if cnt == 0:
                        # No samples: keep the original time ranges for this CPU
                        cpu_time_range.time_ranges = copy.deepcopy(time_ranges)
                        continue
                # Adjust target interval for CPU to give approximately equal interval sizes
                # Determine number of intervals, rounding to nearest integer
                n = int(round(cnt / interval, 0))
                if n < 1:
                        n = 1
                # Determine interval size, rounding up
                d, m = divmod(cnt, n)
                if m:
                        d += 1
                cpu_time_range.interval = d
                cpu_time_range.interval_remaining = d
                cpu_time_range.remaining = cnt
                # Init. time ranges for each CPU with the start time
                cpu_time_range.time_ranges = [ [min_time, max_time] ]

        # Set time ranges so that the same number of "double quick" samples
        # will fall into each time range.
        ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

        # The new ranges span min_time to max_time, so restrict them to the
        # original time ranges
        for cpu_time_range in cpu_time_ranges:
                if cpu_time_range.sample_cnt:
                        IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges)

        return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]
581 
def SplitSingleTimeRangeIntoN(time_range, n):
        """Split one [start, end] time range (inclusive ends) into n consecutive ranges.

        The last range absorbs any remainder. If n <= 1, or the range is too
        short to give every part a duration of at least 1, a list holding
        just time_range itself is returned.
        """
        if n <= 1:
                return [time_range]
        start, end = time_range[0], time_range[1]
        # Integer division: float division loses precision for large
        # nanosecond values
        duration = (end - start + 1) // n
        if duration < 1:
                return [time_range]
        time_ranges = [[start + i * duration, start + (i + 1) * duration - 1] for i in range(n)]
        # Rounding down may leave a remainder, which goes in the last range
        time_ranges[-1][1] = end
        return time_ranges
596 
def TimeRangeDuration(r):
        """Return the duration of time range r = [start, end], with the end inclusive."""
        start, end = r[0], r[1]
        return end - start + 1
599 
def TotalDuration(time_ranges):
        """Return the sum of the durations of all the time ranges."""
        return sum(TimeRangeDuration(r) for r in time_ranges)
605 
def SplitTimeRangesByInterval(time_ranges, interval):
        """Split each time range into parts of approximately the given interval.

        The number of parts for a range is its duration divided by interval,
        rounded to the nearest integer.
        """
        new_ranges = []
        for r in time_ranges:
                nr_parts = int(round(TimeRangeDuration(r) / interval, 0))
                new_ranges.extend(SplitSingleTimeRangeIntoN(r, nr_parts))
        return new_ranges
614 
def SplitTimeRangesIntoN(time_ranges, n, min_interval):
        """Split time_ranges into about n parts, none shorter than min_interval.

        If there are already at least n time ranges, time_ranges is returned
        unchanged.
        """
        if len(time_ranges) >= n:
                return time_ranges
        interval = TotalDuration(time_ranges) / n
        # Do not split into parts smaller than the minimum
        interval = min_interval if interval < min_interval else interval
        return SplitTimeRangesByInterval(time_ranges, interval)
623 
def RecombineTimeRanges(tr):
        """Return a copy of tr in which adjoining time ranges have been merged.

        Two consecutive ranges adjoin when the end of the first plus 1 is the
        start of the second. tr itself is not modified.
        """
        merged = []
        for cur in copy.deepcopy(tr):
                # if prev end + 1 == cur start, combine them
                if merged and merged[-1][1] + 1 == cur[0]:
                        cur[0] = merged[-1][0]
                        merged[-1] = cur
                else:
                        merged.append(cur)
        return merged
636 
def OpenTimeRangeEnds(time_ranges, min_time, max_time):
        """Replace the outermost start / end with None where they reach min_time / max_time.

        time_ranges is modified in place.
        """
        first, last = time_ranges[0], time_ranges[-1]
        if first[0] <= min_time:
                first[0] = None
        if last[1] >= max_time:
                last[1] = None
642 
def BadTimeStr(time_str):
        """Raise an Exception reporting that time_str is a bad perf time option."""
        msg = f"perf command bad time option: '{time_str}'\nCheck also 'time of first sample' and 'time of last sample' in perf script --header-only"
        raise Exception(msg)
645 
def ValidateTimeRanges(time_ranges, time_str):
        """Check that the time ranges are in ascending order and do not overlap.

        Each range must not start after it ends, and must start after the
        end of the range before it. BadTimeStr(time_str) is called if not.
        """
        prev = None
        for r in time_ranges:
                start, end = r[0], r[1]
                if prev is not None and start <= prev[1]:
                        BadTimeStr(time_str)
                if start > end:
                        BadTimeStr(time_str)
                prev = r
655 
def TimeVal(s, dflt):
        """Convert a time string in seconds ("secs" or "secs.frac") to integer nanoseconds.

        Returns dflt if s is empty or only whitespace. Fraction digits beyond
        nanosecond precision are discarded. Raises an exception if the value
        is negative or not a valid number.
        """
        s = s.strip()
        if s == "":
                return dflt
        # Check the sign of the string itself: int("-0") is 0, so a value
        # like "-0.5" cannot be detected from the integer part
        if s.startswith("-"):
                raise Exception("Negative time not allowed")
        a = s.split(".")
        if len(a) > 2:
                raise Exception(f"Bad time value'{s}'")
        x = int(a[0])
        x *= 1000000000
        if len(a) > 1 and a[1] != "":
                # The fraction must be digits only (no sign or spaces)
                if not a[1].isdecimal():
                        raise Exception(f"Bad time value'{s}'")
                # Pad or truncate the fraction to exactly 9 digits
                x += int((a[1] + "000000000")[:9])
        return x
670 
def BadCPUStr(cpu_str):
        """Raise an Exception reporting that cpu_str is a bad perf cpu option."""
        msg = f"perf command bad cpu option: '{cpu_str}'\nCheck also 'nrcpus avail' in perf script --header-only"
        raise Exception(msg)
673 
def ParseTimeStr(time_str, min_time, max_time):
        """Parse a time option string into a list of [start, end] ranges in nanoseconds.

        time_str holds one or more "start,end" ranges separated by
        whitespace. An omitted start or end defaults to min_time or max_time
        respectively. If time_str is None or empty, the single range
        [min_time, max_time] is returned. BadTimeStr() is called if the
        string is malformed or the ranges are not valid.
        """
        if time_str is None or time_str == "":
                return [[min_time, max_time]]
        time_ranges = []
        for r in time_str.split():
                a = r.split(",")
                if len(a) != 2:
                        BadTimeStr(time_str)
                try:
                        start = TimeVal(a[0], min_time)
                        end   = TimeVal(a[1], max_time)
                except Exception:
                        # Any parsing failure is reported as a bad time option,
                        # but KeyboardInterrupt and SystemExit are let through
                        BadTimeStr(time_str)
                time_ranges.append([start, end])
        ValidateTimeRanges(time_ranges, time_str)
        return time_ranges
690 
def ParseCPUStr(cpu_str, nr_cpus):
        """Parse a CPU option string into a sorted list of unique CPU numbers.

        cpu_str is a comma-separated list of CPU numbers and ranges, e.g.
        "0-2,5". Every CPU number must be less than nr_cpus. If cpu_str is
        None or empty, [-1] is returned. BadCPUStr() is called if the string
        is malformed or out of range.
        """
        if cpu_str is None or cpu_str == "":
                return [-1]
        cpus = set()    # A set, so that duplicates are dropped
        for r in cpu_str.split(","):
                a = r.split("-")
                if len(a) > 2:
                        BadCPUStr(cpu_str)
                try:
                        start = int(a[0].strip())
                        end = int(a[1].strip()) if len(a) > 1 else start
                except ValueError:
                        BadCPUStr(cpu_str)
                if start < 0 or end < 0 or end < start or end >= nr_cpus:
                        BadCPUStr(cpu_str)
                cpus.update(range(start, end + 1))
        return sorted(cpus)
713 
class ParallelPerf():
        """
        Holds the options and derived state for one parallel-perf run.

        Config() works out the time ranges and CPUs to process and builds the
        work list; Run() then executes (or, for a dry run, prints) the jobs.
        """

        def __init__(self, a):
                """
                Copy every attribute of the parsed-arguments object 'a' onto
                self, validate the option values and apply defaults.

                Raises an Exception if no perf command was given, the output
                directory already exists, any of jobs / nr / interval is
                negative, or both nr and interval are specified.
                """
                for arg_name in vars(a):
                        setattr(self, arg_name, getattr(a, arg_name))
                # An empty command (nothing after '--') would otherwise fail
                # below with an unhelpful "list index out of range".
                if not self.cmd:
                        raise Exception("No perf command given after '--'")
                # Keep the values as given, before defaults are applied
                self.orig_nr = self.nr
                self.orig_cmd = list(self.cmd)
                self.perf = self.cmd[0]
                if os.path.exists(self.output_dir):
                        raise Exception(f"Output '{self.output_dir}' already exists")
                if self.jobs < 0 or self.nr < 0 or self.interval < 0:
                        raise Exception("Bad options (negative values): try -h option for help")
                if self.nr != 0 and self.interval != 0:
                        raise Exception("Cannot specify number of time subdivisions and time interval")
                if self.jobs == 0:
                        self.jobs = NumberOfCPUs()
                if self.nr == 0 and self.interval == 0:
                        # Neither given: one subdivision if per-cpu was
                        # requested, otherwise one subdivision per job.
                        self.nr = 1 if self.per_cpu else self.jobs

        def Init(self):
                """
                Determine the input file name from the command, read and parse
                its header, and record the header's 'cmdline' field.
                """
                if self.verbosity.debug:
                        print("cmd", self.cmd)
                self.file_name = DetermineInputFileName(self.cmd)
                self.hdr = ReadHeader(self.perf, self.file_name)
                self.hdr_dict = ParseHeader(self.hdr)
                self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

        def ExtractTimeInfo(self):
                """
                Set min_time / max_time from the header's first and last sample
                times, and time_ranges from the command's time option (the
                whole min_time to max_time span if there is none).
                """
                self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
                self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
                self.time_str = ExtractPerfOption(self.cmd, "", "time")
                self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
                if self.verbosity.debug:
                        print("time_ranges", self.time_ranges)

        def ExtractCPUInfo(self):
                """
                Set self.cpus: for per-cpu processing, the CPUs from the
                command's cpu option, or all of 'nrcpus avail' if there is
                none; otherwise [-1].
                """
                if self.per_cpu:
                        nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
                        self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
                        if not self.cpu_str:
                                self.cpus = list(range(nr_cpus))
                        else:
                                self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
                else:
                        self.cpu_str = None
                        self.cpus = [-1]
                if self.verbosity.debug:
                        print("cpus", self.cpus)

        def IsIntelPT(self):
                """Return True if the recorded command line mentions intel_pt."""
                return "intel_pt" in self.cmd_line

        def SplitTimeRanges(self):
                """
                Subdivide self.time_ranges into split_time_ranges_for_each_cpu.

                Intel PT data with no interval given is split by trace data
                density, using the original (pre-default) nr and command.
                Otherwise the split is into nr parts if nr is non-zero, else
                by the given interval; in those two cases the result is a
                single-element list.
                """
                if self.IsIntelPT() and self.interval == 0:
                        self.split_time_ranges_for_each_cpu = \
                                SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
                                                                  self.orig_cmd, self.file_name, self.per_cpu,
                                                                  self.min_size, self.min_interval, self.verbosity)
                elif self.nr:
                        self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ]
                else:
                        self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, self.interval) ]

        def CheckTimeRanges(self):
                """
                Self test: raise an Exception unless every set of split time
                ranges recombines to exactly the original time ranges.
                """
                for tr in self.split_time_ranges_for_each_cpu:
                        # Re-combined time ranges should be the same
                        new_tr = RecombineTimeRanges(tr)
                        if new_tr != self.time_ranges:
                                if self.verbosity.debug:
                                        print("tr", tr)
                                        print("new_tr", new_tr)
                                raise Exception("Self test failed!")

        def OpenTimeRangeEnds(self):
                """
                Apply the module-level OpenTimeRangeEnds() to each set of split
                time ranges.
                """
                for time_ranges in self.split_time_ranges_for_each_cpu:
                        # Inside a method the bare name is the module-level
                        # function, not this method.
                        OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

        def CreateWorkList(self):
                """Build self.worklist using the module-level CreateWorkList()."""
                self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

        def PerfDataRecordedPerCPU(self):
                """
                Return False if --per-thread appears as a word in the recorded
                command line, otherwise True.
                """
                return "--per-thread" not in self.cmd_line.split()

        def DefaultToPerCPU(self):
                """
                Return whether per-cpu processing should be the default when
                the --per-cpu option was not given.
                """
                # --no-per-cpu option takes precedence
                if self.no_per_cpu:
                        return False
                if not self.PerfDataRecordedPerCPU():
                        return False
                # Default to per-cpu for Intel PT data that was recorded per-cpu,
                # because decoding can be done for each CPU separately.
                return self.IsIntelPT()

        def Config(self):
                """
                Prepare everything needed by Run(): read the header, determine
                time ranges and CPUs, split the time ranges and create the
                work list.
                """
                self.Init()
                self.ExtractTimeInfo()
                if not self.per_cpu:
                        self.per_cpu = self.DefaultToPerCPU()
                if self.verbosity.debug:
                        print("per_cpu", self.per_cpu)
                self.ExtractCPUInfo()
                self.SplitTimeRanges()
                if self.verbosity.self_test:
                        self.CheckTimeRanges()
                # Prefer open-ended time range to starting / ending with min_time / max_time resp.
                self.OpenTimeRangeEnds()
                self.CreateWorkList()

        def Run(self):
                """
                Run the work list and return the result of RunWork(). For a
                dry run, only print the number of jobs and each job's command,
                and return True.
                """
                if self.dry_run:
                        print(len(self.worklist),"jobs:")
                        for w in self.worklist:
                                print(w.Command())
                        return True
                result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
                if self.verbosity.verbose:
                        print(glb_prog_name, "done")
                return result
839 
def RunParallelPerf(a):
        """
        Create a ParallelPerf from the parsed arguments 'a', configure it,
        run it, and return the result of its Run() method.
        """
        runner = ParallelPerf(a)
        runner.Config()
        result = runner.Run()
        return result
844 
def Main(args):
        """
        Command-line entry point.

        args - the full argument vector including the program name (e.g.
               sys.argv). Options before the first '--' belong to this
               script; everything after it is the perf script command.

        Returns True on success (or after printing help when no arguments
        are given) and False after printing a fatal error. With --debug,
        the exception is re-raised instead of returning False.
        """
        ap = argparse.ArgumentParser(
                prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter,
                description =
"""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
                epilog =
"""
Follow the options by '--' and then the perf script command e.g.

        $ perf record -a -- sleep 10
        $ parallel-perf.py --nr=4 -- perf script --ns
        All jobs finished successfully
        $ tree parallel-perf-output/
        parallel-perf-output/
        ├── time-range-0
        │   ├── cmd.txt
        │   └── out.txt
        ├── time-range-1
        │   ├── cmd.txt
        │   └── out.txt
        ├── time-range-2
        │   ├── cmd.txt
        │   └── out.txt
        └── time-range-3
            ├── cmd.txt
            └── out.txt
        $ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
        parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
        parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
        parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
        parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

        $ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
        All jobs finished successfully
        $ tree parallel-perf-output
        parallel-perf-output/
        ├── cpu-0
        │   ├── time-range-0
        │   │   ├── cmd.txt
        │   │   └── out.txt
        │   └── time-range-1
        │       ├── cmd.txt
        │       └── out.txt
        └── cpu-1
            ├── time-range-0
            │   ├── cmd.txt
            │   └── out.txt
            └── time-range-1
                ├── cmd.txt
                └── out.txt
        $ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
        parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
        parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
        parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
        parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of time range, and cpus if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer perf script --header-only). Similarly, the
supplied perf script command may provide a --cpu option, and only those CPUs
will be processed.

To prevent time intervals becoming too small, the --min-interval option can
be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
come after a certain number of bytes output, determined by psb_period (refer
perf Intel PT documentation). The minimum number of double-quick samples that
will define a time range can be set by the --min_size option, which defaults to
64.
""")
        ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
        ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
        ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
        ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
        ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
        ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
        ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
        ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
        ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
        ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
        ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
        ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
        ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
        cmd_line = list(args)
        # Split at the first '--': our own options before it, the perf
        # script command after it.
        try:
                split_pos = cmd_line.index("--")
                cmd = cmd_line[split_pos + 1:]
                args = cmd_line[:split_pos]
        except ValueError:
                # list.index() raises only ValueError, meaning there is no '--'
                cmd = None
                args = cmd_line
        # args[0] is the program name, which argparse must not see
        a = ap.parse_args(args=args[1:])
        a.cmd = cmd
        a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
        try:
                if a.cmd is None:
                        # No arguments at all: show help rather than an error
                        if len(args) <= 1:
                                ap.print_help()
                                return True
                        raise Exception("Command line must contain '--' before perf command")
                return RunParallelPerf(a)
        except Exception as e:
                print("Fatal error: ", str(e))
                if a.debug:
                        raise
                return False
986 
if __name__ == "__main__":
        # Main() returns a false value on a fatal error: exit with status 1
        succeeded = Main(sys.argv)
        if not succeeded:
                sys.exit(1)

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

sflogo.php