Package osh :: Package command :: Module agg
[frames | no frames]

Source Code for Module osh.command.agg

  1  # osh 
  2  # Copyright (C) 2005 Jack Orenstein <jao@geophile.com> 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or 
  7  # (at your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  # GNU General Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 17   
 18  """C{agg [-g GROUPING_FUNCTION] INITIAL_VALUE AGGREGATOR} 
 19   
 20  C{agg [-c GROUPING_FUNCTION] INITIAL_VALUE AGGREGATOR} 
 21   
 22  Combines inputs into a smaller number of outputs. For example, if 
 23  the input is a stream of numbers, then C{agg} can be used to generate 
 24  output comprising the sum of the inputs. 
 25   
 26  If C{GROUPING_FUNCTION} is omitted, then one output object is 
 27  generated by initializing an accumulator to C{INITIAL_VALUE} and then 
 28  combining the accumulator with input objects using C{AGGREGATOR}.  The 
 29  inputs to C{AGGREGATOR} are the current values of the accumulator 
 30  followed by the members of the input object. 
 31   
 32  B{Example}: If the input objects are integers C{1, 2, 3}, then the sum of 
 33  the integers is computed by:: 
 34   
 35      agg 0 'sum, x: sum + x' 
 36   
 37  which yields C{6}. 
 38   
 39  B{Example}: The sum and sum of squares of a sequence of input numbers can be computed 
 40  by:: 
 41   
 42      agg '(0, 0)' 'sum, sum_squares, input: (sum + input, sum_squares + input ** 2)' 
 43   
 44  In this case, the accumulator is a tuple containing two integers, the 
 45  sum and the sum of squares. The initial value of this accumulator is 
 46  C{(0, 0)}.  Each input consists of a single number. The aggregator 
 47  function takes as input the two parts of the accumulator followed by 
 48  the next number of the input, and outputs a tuple containing the 
 49  updated accumulator value. 
 50   
 51  If C{GROUPING_FUNCTION} is specified, then a set of accumulators is 
 52  maintained, one for each value of C{GROUPING_FUNCTION}. Each output 
 53  value consists of the group value followed by the accumulated values 
 54  for the group ("values" not "value" because the accumulator may be a 
 55  sequence.) 
 56   
 57  B{Example}: If the input objects are C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then 
 58  the sum of ints for each string is computed by:: 
 59   
 60      agg -g 'label, number: label' 0 'sum, label, number: sum + number' 
 61   
 62  which yields C{('a', 3), ('b', 7)}. 
 63   
 64  If C{GROUPING_FUNCTION} is specified with the C{-g} flag, then C{agg} 
 65  generates its output when the input stream has ended. (It can't 
 66  generate output earlier, because group members may appear in any 
 67  order.) In some situations however, group members appear 
 68  consecutively, and it is useful to get output earlier. If group 
 69  members are known to be consecutive, then C{GROUPING_FUNCTION} can be 
 70  specified using the C{-c} flag. 
 71  """ 
 72   
 73  import osh.core 
 74  import osh.args 
 75   
 76  _wrap_if_necessary = osh.core.wrap_if_necessary 
 77  Option = osh.args.Option 
 78   
 79  # CLI 
def _agg():
    """Command-line entry point for C{agg}.

    Returns a newly constructed L{_Agg} operator; no arguments are
    processed here.
    """
    operator = _Agg()
    return operator
82 83 # API
def agg(initial_value, aggregator, group = None, consecutive = None):
    """Build an C{agg} operator that folds inputs into fewer outputs.

    With neither C{group} nor C{consecutive} given, a single accumulator
    is used. It starts at C{initial_value}, and C{aggregator} is applied
    to the elements of the accumulator followed by the elements of one
    input to produce the accumulator's next value.

    When C{group} is given, a separate accumulator is kept for each
    distinct value returned by applying C{group} to an input.
    C{consecutive} works the same way, but assumes that inputs sharing a
    group value are adjacent in the input sequence.

    At most one of C{group} and C{consecutive} may be specified.

    @param initial_value: starting value of every accumulator.
    @param aggregator: function combining an accumulator with one input.
    @param group: optional grouping function, passed on as option C{-g}.
    @param consecutive: optional grouping function for adjacent groups,
        passed on as option C{-c}.
    @return: the result of C{_Agg().process_args(...)}.
    """
    op_args = [initial_value, aggregator]
    # Options follow the two positional arguments: -g first, then -c.
    for flag, grouping in (('-g', group), ('-c', consecutive)):
        if grouping:
            op_args.append(Option(flag, grouping))
    return _Agg().process_args(*op_args)
102
class _Agg(osh.core.Op):
    """Operator implementing C{agg}.

    All stream handling is delegated to a strategy object selected in
    L{setup}: L{_GroupingAggregate} for C{-g},
    L{_ConsecutiveGroupingAggregate} for C{-c}, and L{_SimpleAggregate}
    when no grouping function is given.
    """

    # Strategy object chosen by setup(); stays None until setup() runs.
    _aggregate = None

    # object interface

    def __init__(self):
        # NOTE(review): 'g:c:' looks like a getopt-style spec (-g and -c
        # each take a value) and (2, 2) like the min/max number of
        # positional arguments -- confirm against osh.core.Op.
        osh.core.Op.__init__(self, 'g:c:', (2, 2))

    # BaseOp interface

    def doc(self):
        """Return this module's docstring."""
        return __doc__

    def setup(self):
        """Read the arguments and select the aggregation strategy.

        C{self.usage()} is called if both C{-g} and C{-c} are present,
        if the initial value or the aggregator is missing, or if
        arguments are left over.
        """
        args = self.args()
        by_group = args.function_arg('-g')
        by_run = args.function_arg('-c')
        start = _wrap_if_necessary(args.next_eval())
        combiner = args.next_function()
        conflicting = by_group and by_run
        if conflicting:
            self.usage()
        if start is None or combiner is None:
            self.usage()
        if args.has_next():
            self.usage()
        # The conflict test is deliberately repeated as the first branch
        # here, so the sequence of usage() calls matches the original.
        if conflicting:
            self.usage()
        elif by_group:
            self._aggregate = _GroupingAggregate(
                self, by_group, start, combiner)
        elif by_run:
            self._aggregate = _ConsecutiveGroupingAggregate(
                self, by_run, start, combiner)
        else:
            self._aggregate = _SimpleAggregate(self, start, combiner)

    def receive(self, object):
        """Pass one input object to the selected strategy."""
        self._aggregate.receive(object)

    def receive_complete(self):
        """Tell the selected strategy that the input stream has ended."""
        self._aggregate.receive_complete()
157 158
class _GroupingAggregate(object):
    """Aggregation strategy for C{agg -g}.

    Keeps one accumulator per group value, where the group of an input
    is the result of applying the grouping function to it. Nothing is
    sent downstream until the input stream ends.
    """

    _command = None             # operator used to send output
    _group_function = None      # maps an input to its group value
    _initial_value = None       # starting accumulator for a new group
    _aggregate_function = None  # combines an accumulator with an input
    _sum = None                 # dict: group value -> accumulator

    def __init__(self, command, group_function, initial_value, aggregate_function):
        """Store the collaborators and start with no groups.

        @param command: object whose C{send} and C{send_complete} are
            used to emit results.
        @param group_function: called with the elements of an input;
            its result is used as a dictionary key.
        @param initial_value: accumulator used the first time a group
            is seen.
        @param aggregate_function: called with the elements of the
            accumulator followed by the elements of the input.
        """
        self._command = command
        self._group_function = group_function
        self._initial_value = initial_value
        self._aggregate_function = aggregate_function
        self._sum = {}

    def receive(self, object):
        """Fold one input into the accumulator of its group."""
        group = self._group_function(*object)
        # Named 'accumulator' rather than 'sum' to avoid shadowing the
        # builtin.
        accumulator = self._sum.get(group, self._initial_value)
        combined_args = tuple(accumulator) + tuple(object)
        self._sum[group] = _wrap_if_necessary(
            self._aggregate_function(*combined_args))

    def receive_complete(self):
        """Send one output per group, then signal completion.

        Each output is the group value followed by the elements of that
        group's accumulator.
        """
        # items() rather than iteritems(): same iteration on Python 2,
        # and it also exists on Python 3, where iteritems() was removed.
        for group, accumulator in self._sum.items():
            self._command.send(_wrap_if_necessary(group) + tuple(accumulator))
        self._command.send_complete()
182
class _ConsecutiveGroupingAggregate(object):
    """Aggregation strategy for C{agg -c}.

    Like grouping with C{-g}, but inputs belonging to the same group
    are taken to be adjacent, so only one accumulator is kept at a time
    and a group's result is sent as soon as the group value changes.
    """

    _command = None             # operator used to send output
    _group_function = None      # maps an input to its group value
    _initial_value = None       # starting accumulator for each group
    _aggregate_function = None  # combines an accumulator with an input
    _group = None               # group value of the run in progress
    _sum = None                 # accumulator of the run in progress
    _has_group = False          # True once a run is in progress

    def __init__(self, command, group_function, initial_value, aggregate_function):
        """Store the collaborators; no run is in progress yet.

        @param command: object whose C{send} and C{send_complete} are
            used to emit results.
        @param group_function: called with the elements of an input to
            obtain its group value.
        @param initial_value: accumulator used at the start of each run.
        @param aggregate_function: called with the elements of the
            accumulator followed by the elements of the input.
        """
        self._command = command
        self._group_function = group_function
        self._initial_value = initial_value
        self._aggregate_function = aggregate_function
        self._group = None
        self._sum = None
        self._has_group = False

    def _send_current(self):
        """Send the group value followed by its accumulated values."""
        # tuple() guards against a group value that is wrapped as a
        # non-tuple sequence, the same way the accumulator is treated.
        # NOTE(review): assumes _wrap_if_necessary may hand back a list
        # -- confirm against osh.core.wrap_if_necessary.
        self._command.send(
            tuple(_wrap_if_necessary(self._group)) + tuple(self._sum))

    def receive(self, object):
        """Fold one input into the current run, starting a new run
        (and emitting the finished one) when the group value changes."""
        new_group = self._group_function(*object)
        # An explicit flag, not "self._group is None", marks whether a
        # run is in progress. With the None test, inputs whose group
        # value is None were reset on every input and never emitted.
        if not self._has_group or self._group != new_group:
            if self._has_group:
                self._send_current()
            self._group = new_group
            self._has_group = True
            self._sum = self._initial_value
        combined_args = tuple(self._sum) + tuple(object)
        self._sum = _wrap_if_necessary(
            self._aggregate_function(*combined_args))

    def receive_complete(self):
        """Send the last run, if any input was received, then signal
        completion."""
        if self._has_group:
            self._send_current()
        self._command.send_complete()
212
class _SimpleAggregate(object):
    """Aggregation strategy for C{agg} without a grouping function.

    A single accumulator is updated by every input and sent once, when
    the input stream ends.
    """

    _command = None             # operator used to send output
    _initial_value = None       # starting value of the accumulator
    _aggregate_function = None  # combines the accumulator with an input
    _sum = None                 # current accumulator

    def __init__(self, command, initial_value, aggregate_function):
        """Store the collaborators and seed the accumulator.

        @param command: object whose C{send} and C{send_complete} are
            used to emit the result.
        @param initial_value: starting value of the accumulator.
        @param aggregate_function: called with the elements of the
            accumulator followed by the elements of the input.
        """
        self._command = command
        self._aggregate_function = aggregate_function
        self._initial_value = self._sum = initial_value

    def receive(self, object):
        """Fold one input into the accumulator."""
        combined_args = tuple(self._sum) + tuple(object)
        updated = self._aggregate_function(*combined_args)
        self._sum = _wrap_if_necessary(updated)

    def receive_complete(self):
        """Send the accumulator, then signal completion."""
        command = self._command
        command.send(self._sum)
        command.send_complete()
231