Package osh :: Package command :: Module agg
[frames] | no frames]

Source Code for Module osh.command.agg

  1  # osh 
  2  # Copyright (C) 2005 Jack Orenstein <jao@geophile.com> 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or 
  7  # (at your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  # GNU General Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 17   
 18  """C{agg [-r] [-g GROUPING_FUNCTION] INITIAL_VALUE AGGREGATION_FUNCTION} 
 19   
 20  C{agg [-r] [-c GROUPING_FUNCTION] INITIAL_VALUE AGGREGATION_FUNCTION} 
 21       
 22  Aggregates objects from the input stream. If C{GROUPING_FUNCTION} is omitted, then 
 23  one output object is generated by initializing an accumulator to C{INITIAL_VALUE} 
 24  and then combining the accumulator with input objects using C{AGGREGATION_FUNCTION}. 
 25  C{AGGREGATION_FUNCTION} takes two inputs, the current value of the accumulator and 
 26  an object from the input stream. 
 27   
 28  Example: If the input objects are integers C{1, 2, 3}, then the sum of the integers 
 29  is computed as follows:: 
 30   
 31      ... ^ agg 0 'sum, x: sum + x' 
 32   
 33  which yields C{6}. 
 34   
 35  If C{GROUPING_FUNCTION} is specified, then a set of accumulators is maintained, 
 36  one for each value of C{GROUPING_FUNCTION}. Each output object is a tuple with 
 37  two parts, the group value and the accumulated value for the group. 
 38   
 39  Example: If the input objects are C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then 
 40  the sum of ints for each string is computed as follows:: 
 41   
 42      ... ^ agg -g 'x, y: x' 0 'sum, x, y: sum + y' 
 43   
 44  which yields C{('a', 3), ('b', 7)}. 
 45   
 46  If the grouping function is specified with the C{-g} flag, then agg generates its 
  47  output when the input stream has ended. (It has to, because group members may 
  48  appear in any order.) In some situations, however, group members appear consecutively, 
 49  and it is useful to get output earlier. If group members are known to be consecutive, 
 50  then the group function can be specified using the C{-c} flag. 
 51   
 52  If the C{-r} flag is specified, then one output object is generated for each input object; 
 53  the output object contains the value of the accumulator so far. The accumulator appears 
 54  in the output row before the inputs. For example, if the input stream contains C{1, 2, 3}, 
 55  then the running total can be computed as follows:: 
 56   
 57      ... ^ agg -r 0 'sum, x: sum + x' ^ ... 
 58   
  59  The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is the sum 
 60  of the current input (C{3}) and all preceding inputs (C{1, 2}). 
 61   
 62  The C{-r} flag can also be used with grouping. For example, if the input objects are 
 63  C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running totals for the strings would 
 64  be computed as follows:: 
 65   
 66      ... ^ agg -r -g 'x, y: x' 0 'sum, x, y: sum + y' ^ ... 
 67   
 68  The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}. 
 69   
 70  """ 
 71   
 72  import osh.core 
 73  import osh.args 
 74   
 75  _wrap_if_necessary = osh.core.wrap_if_necessary 
 76  Option = osh.args.Option 
 77   
 78  # CLI 
def _agg():
    """Create a new agg command object for command-line use."""
    command = _Agg()
    return command
81 82 # API
def agg(initial_value, aggregator, group = None, consecutive = None, running = False):
    """Combine inputs into a smaller number of outputs.

    If neither C{group} nor C{consecutive} is specified, there is a single
    accumulator, initialized to C{initial_value}. C{aggregator} combines the
    current value of the accumulator with one input to produce the next value
    of the accumulator; its arguments are the elements of the accumulator
    followed by the elements of one piece of input.

    If C{group} is specified, there is one accumulator per group value, where
    the group value of an input is obtained by applying the function C{group}
    to it. C{consecutive} is just like C{group}, except that group values are
    assumed to be adjacent in the input sequence. At most one of C{group} and
    C{consecutive} may be specified.

    If C{running} is false, the output contains one object per group, holding
    the aggregate value. (If neither C{group} nor C{consecutive} is provided,
    there is just one group, representing the aggregate for the entire input
    stream.) If C{running} is true, the aggregate value for the group is
    written out with each input object -- i.e., the output contains "running
    totals". In that case the aggregate values appear before the input values
    in each output object.
    """
    # Only options that were actually requested are passed on, mirroring the
    # CLI syntax: agg [-g F] [-c F] [-r] INITIAL_VALUE AGGREGATION_FUNCTION.
    options = []
    if group:
        options.append(Option('-g', group))
    if consecutive:
        options.append(Option('-c', consecutive))
    if running:
        options.append(Option('-r'))
    return _Agg().process_args(initial_value, aggregator, *options)
109
class _Agg(osh.core.Op):
    """The agg command.

    setup() parses the command arguments and selects one aggregation
    strategy: _GroupingAggregate for -g, _ConsecutiveGroupingAggregate for -c,
    or _SimpleAggregate when neither is given. receive() and
    receive_complete() simply forward to that strategy object.
    """

    # Strategy object chosen by setup().
    _aggregate = None


    # object interface

    def __init__(self):
        # 'g:c:r' declares the -g, -c (with values) and -r options; (2, 2)
        # presumably bounds the positional argument count to exactly two
        # (INITIAL_VALUE and AGGREGATION_FUNCTION) -- confirm in osh.core.Op.
        osh.core.Op.__init__(self, 'g:c:r', (2, 2))


    # BaseOp interface

    def doc(self):
        # Inside a method, __doc__ resolves to the module docstring, which
        # holds the user-facing documentation for agg.
        return __doc__

    def setup(self):
        """Parse arguments and choose the aggregation strategy.

        Calls self.usage() on invalid argument combinations: both -g and -c,
        a missing initial value or aggregation function, or extra arguments.
        NOTE(review): this assumes usage() does not return normally (e.g. it
        raises); otherwise setup() would continue after reporting the error.
        """
        args = self.args()
        grouping_function = args.function_arg('-g')
        consecutive_grouping_function = args.function_arg('-c')
        running_totals = args.flag('-r')
        if running_totals is None:
            running_totals = False
        # The accumulator is handled as a sequence (the strategies call
        # tuple() on it), so a scalar initial value is wrapped here.
        initial_value = _wrap_if_necessary(args.next_eval())
        aggregation_function = args.next_function()
        # -g and -c are mutually exclusive. This is checked once, up front;
        # the original repeated the same check again further down.
        if grouping_function and consecutive_grouping_function:
            self.usage()
        # NOTE(review): initial_value has already been wrapped, so this None
        # test may never be true for it -- confirm wrap_if_necessary(None).
        if initial_value is None or aggregation_function is None:
            self.usage()
        if args.has_next():
            self.usage()
        if grouping_function:
            self._aggregate = _GroupingAggregate(
                self,
                running_totals,
                grouping_function,
                initial_value,
                aggregation_function)
        elif consecutive_grouping_function:
            self._aggregate = _ConsecutiveGroupingAggregate(
                self,
                running_totals,
                consecutive_grouping_function,
                initial_value,
                aggregation_function)
        else:
            self._aggregate = _SimpleAggregate(
                self,
                running_totals,
                initial_value,
                aggregation_function)


    def receive(self, object):
        # Delegate each input object to the selected strategy.
        self._aggregate.receive(object)

    def receive_complete(self):
        # Delegate end-of-stream handling; the strategy emits any final
        # aggregates and calls send_complete().
        self._aggregate.receive_complete()
170 171
172 -class _GroupingAggregate(object):
173 _running_totals = None 174 _command = None 175 _group_function = None 176 _initial_value = None 177 _aggregate_function = None 178 _sum = None 179
180 - def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
181 self._running_totals = running_totals 182 self._command = command 183 self._group_function = group_function 184 self._initial_value = initial_value 185 self._aggregate_function = aggregate_function 186 self._sum = {}
187
188 - def receive(self, object):
189 group = self._group_function(*object) 190 sum = self._sum.get(group, self._initial_value) 191 tuple_object = tuple(object) 192 new_sum = self._aggregate_function(*(tuple(sum) + tuple_object)) 193 self._sum[group] = _wrap_if_necessary(new_sum) 194 if self._running_totals: 195 self._command.send((new_sum,) + tuple_object)
196
197 - def receive_complete(self):
198 if not self._running_totals: 199 for group, sum in self._sum.iteritems(): 200 self._command.send(_wrap_if_necessary(group) + tuple(sum)) 201 self._command.send_complete()
202
203 -class _ConsecutiveGroupingAggregate(object):
204 _running_totals = None 205 _command = None 206 _group_function = None 207 _initial_value = None 208 _aggregate_function = None 209 _group = None 210 _sum = None 211
212 - def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
213 self._running_totals = running_totals 214 self._command = command 215 self._group_function = group_function 216 self._initial_value = initial_value 217 self._aggregate_function = aggregate_function 218 self._group = None 219 self._sum = None
220
221 - def receive(self, object):
222 new_group = self._group_function(*object) 223 if self._group is None or self._group != new_group: 224 if self._group is not None and not self._running_totals: 225 self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum)) 226 self._group = new_group 227 self._sum = self._initial_value 228 tuple_object = tuple(object) 229 new_sum = self._aggregate_function(*(tuple(self._sum) + tuple_object)) 230 self._sum = _wrap_if_necessary(new_sum) 231 if self._running_totals: 232 self._command.send((new_sum,) + tuple_object)
233
234 - def receive_complete(self):
235 if (not self._running_totals) and self._group is not None: 236 self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum)) 237 self._command.send_complete()
238
239 -class _SimpleAggregate(object):
240 _command = None 241 _initial_value = None 242 _aggregate_function = None 243 _sum = None 244
245 - def __init__(self, command, running_totals, initial_value, aggregate_function):
246 self._running_totals = running_totals 247 self._command = command 248 self._initial_value = initial_value 249 self._aggregate_function = aggregate_function 250 self._sum = initial_value
251
252 - def receive(self, object):
253 tuple_object = tuple(object) 254 new_sum = self._aggregate_function(*(tuple(self._sum) + tuple_object)) 255 self._sum = _wrap_if_necessary(new_sum) 256 if self._running_totals: 257 self._command.send((new_sum,) + tuple_object)
258
259 - def receive_complete(self):
260 if not self._running_totals: 261 self._command.send(self._sum) 262 self._command.send_complete()
263