1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""For API usage only. (For CLI usage, use the C{osh @CLUSTER [ ... ]} syntax instead.)
"""
20
21 import osh.args
22 import osh.core
23 import osh.spawn
24 import osh.error
25
26 Spawn = osh.spawn.Spawn
27 ObjectInputProvider = osh.spawn.ObjectInputProvider
28 ObjectOutputConsumer = osh.spawn.ObjectOutputConsumer
29 LineOutputConsumer = osh.spawn.LineOutputConsumer
30 Option = osh.args.Option
31
32 _REMOTE_EXECUTABLE = 'remoteosh'
33
34 -def _dump(stream, object):
36
38 if isinstance(object, osh.error.Error):
39 exception = object.recreate_exception()
40 osh.error.exception_handler(exception, object.command_description(), object.input(), host)
41 else:
42 consumer.send((host,) + tuple(object))
43
45
46
47
48
49 if '[Errno 9] Bad file descriptor' not in line:
50 osh.error.stderr_handler(line, consumer, None, host)
51
52
55
56
def remote(cluster, remote_command):
    """Run C{remote_command} on every node of C{cluster}.

    Output from each node is sent to the output stream, with the node name
    added as the first element of the output tuple.

    C{remote_command} may be a single C{osh.core.Op}, in which case it is
    wrapped in a one-element list; any other value is handed to
    C{osh.apiparser._sequence_op} unchanged.

    Returns whatever C{process_args} returns for the configured remote op
    when given the C{-c cluster} option.
    """
    # Function-level import, kept exactly as in the original. Note that it
    # also binds ``osh`` as a local name, so it has to come before any other
    # use of ``osh`` inside this function.
    import osh.apiparser
    remote_op = _Remote()
    commands = [remote_command] if isinstance(remote_command, osh.core.Op) else remote_command
    remote_op.set_pipeline(osh.apiparser._sequence_op(commands))
    return remote_op.process_args(Option('-c', cluster))
68
70
71
72
73 _pipeline = None
74
75
76
77
80
81
82
85
87 osh.core.RemoteOp.setup(self)
88
89
90
91
93 osh.core.RemoteOp.execute(self)
94 self.send_complete()
95
96
97
98
100 process = Spawn(
101 self._remote_command(host.address, self.user(), host.db_profile),
102 ObjectInputProvider(lambda stream, object: _dump(stream, object),
103 [osh.core.verbosity, self._pipeline]),
104 ObjectOutputConsumer(lambda object: _consume_remote_stdout(self, host.name, object)),
105 LineOutputConsumer(lambda line: _consume_remote_stderr(self, host.name, line)))
106 process.run()
107 if process.terminating_exception():
108 self.set_terminating_exception(process.terminating_exception())
109
110
111
112
114 self._pipeline = pipeline
115 return self
116
117
118
119
121 if db_profile:
122 remote_command = '"%s %s"' % (_REMOTE_EXECUTABLE, db_profile)
123 else:
124 remote_command = _REMOTE_EXECUTABLE
125 ssh_command = 'ssh %s -l %s %s' % (host, user, remote_command)
126 return ssh_command
127