Module sverchok.utils.sv_itertools
Expand source code
from itertools import chain, repeat, zip_longest
from sverchok.data_structure import levels_of_list_or_np, list_match_func
# the class based should be slower but kept until tested
class SvZipExhausted(Exception):
pass
class SvSentinel:
def __init__(self, fl, top):
self.fl = fl
self.top = top
self.done = False
def __next__(self):
if self.done:
raise StopIteration
self.top.counter -= 1
if not self.top.counter:
raise SvZipExhausted
self.done = True
return self.fl
def __iter__(self):
return self
class sv_zip_longest:
def __init__(self, *args):
self.counter = len(args)
self.iterators = []
for lst in args:
fl = lst[-1]
filler = repeat(fl)
self.iterators.append(chain(lst, SvSentinel(fl,self), filler))
def __next__(self):
try:
if self.counter:
return tuple(map(next, self.iterators))
else:
raise StopIteration
except SvZipExhausted:
raise StopIteration
def __iter__(self):
return self
def sv_zip_longest2(*args):
# by zeffi
longest = max([len(i) for i in args])
itrs = [iter(sl) for sl in args]
for i in range(longest):
yield tuple((next(iterator, args[idx][-1]) for idx, iterator in enumerate(itrs)))
def recurse_fx(l, f):
if isinstance(l, (list, tuple)):
return [recurse_fx(i, f) for i in l]
else:
return f(l)
def recurse_fxy(l1, l2, f):
l1_type = isinstance(l1, (list, tuple))
l2_type = isinstance(l2, (list, tuple))
if not (l1_type or l2_type):
return f(l1, l2)
elif l1_type and l2_type:
fl = l2[-1] if len(l1) > len(l2) else l1[-1]
res = []
res_append = res.append
for x, y in zip_longest(l1, l2, fillvalue=fl):
res_append(recurse_fxy(x, y, f))
return res
elif l1_type and not l2_type:
return [recurse_fxy(x, l2, f) for x in l1]
else: #not l1_type and l2_type
return [recurse_fxy(l1, y, f) for y in l2]
def recurse_f_multipar(params, f, matching_f):
'''params will spread using the matching function (matching_f)
on the lowest level applies f (function)'''
is_list = [isinstance(l, (list, tuple)) for l in params]
if any(is_list):
res = []
if not all(is_list):
l_temp = []
for l in params:
if not isinstance(l, (list, tuple)):
l_temp.append([l])
else:
l_temp.append(l)
params = l_temp
params = matching_f(params)
for z in zip(*params):
res.append(recurse_f_multipar(z,f, matching_f))
return res
else:
return f(params)
def recurse_f_multipar_const(params, const, f, matching_f):
'''params will spread using the matching function, the const is a constant
parameter that you dont want to spread '''
is_list = [isinstance(l, (list, tuple)) for l in params]
if any(is_list):
res = []
if not all(is_list):
l_temp = []
for l in params:
if not isinstance(l, (list, tuple)):
l_temp.append([l])
else:
l_temp.append(l)
params = l_temp
params = matching_f(params)
for z in zip(*params):
res.append(recurse_f_multipar_const(z, const, f, matching_f))
return res
else:
return f(params, const)
def recurse_f_level_control(params, constant, main_func, matching_f, desired_levels, concatenate="APPEND"):
'''params will spread using the matching function (matching_f), the const is a constant
parameter that you dont want to spread , the main_func is the function to apply
and the desired_levels should be like [1, 2, 1, 3...] one level per parameter'''
input_levels = [levels_of_list_or_np(p) for p in params]
over_levels = [lv > dl for lv, dl in zip(input_levels, desired_levels)]
if any(over_levels):
p_temp = []
result = []
result_add = result.extend if concatenate == 'EXTEND' else result.append
for p, lv, dl in zip(params, input_levels, desired_levels):
if lv <= dl:
p_temp.append([p])
else:
p_temp.append(p)
params = matching_f(p_temp)
for g in zip(*params):
result_add(recurse_f_level_control(g, constant, main_func, matching_f, desired_levels, concatenate=concatenate))
else:
result = main_func(params, constant, matching_f)
return result
def process_matched(params, main_func, matching_mode, input_nesting, outputs_num):
'''params will spread using the matching fmode,
the main_func is the function to apply
the input_nesting should be like an Integer List like [1, 2, 1, 3...] one level per parameter
outputs_num: number of outputs (Integer)
'''
input_levels = [levels_of_list_or_np(p) for p in params]
over_levels = [lv > dl for lv, dl in zip(input_levels, input_nesting)]
one_output = outputs_num == 1
matching_f = list_match_func[matching_mode]
if one_output:
result = []
else:
result = [[] for l in range(outputs_num)]
if any(over_levels):
p_temp = []
for p, lv, dl in zip(params, input_levels, input_nesting):
if lv <= dl:
p_temp.append([p])
else:
p_temp.append(p)
params = matching_f(p_temp)
for g in zip(*params):
local_result = process_matched(g, main_func, matching_mode, input_nesting, outputs_num)
append_result(result, local_result, one_output)
else:
result = main_func(matching_f(params))
return result
def append_result(result, local_result, one_output):
if one_output:
result.append(local_result)
else:
for r, r_local in zip(result, local_result):
r.append(r_local)
def extend_result(result, local_result, one_output):
if one_output:
result.extend(local_result)
else:
for r, r_local in zip(result, local_result):
r.extend(r_local)
def extend_if_needed(vl, wl, default=0.5):
# match wl to correspond with vl
try:
last_value = wl[-1][-1]
except:
last_value = default
if (len(vl) > len(wl)):
num_new_empty_lists = len(vl) - len(wl)
for emlist in range(num_new_empty_lists):
wl.append([])
# extend each sublist in wl to match quantity found in sublists of v1
for i, vlist in enumerate(vl):
if (len(vlist) > len(wl[i])):
num_new_repeats = len(vlist) - len(wl[i])
for n in range(num_new_repeats):
wl[i].append(last_value)
return wl
Functions
def append_result(result, local_result, one_output)
-
Expand source code
def append_result(result, local_result, one_output): if one_output: result.append(local_result) else: for r, r_local in zip(result, local_result): r.append(r_local)
def extend_if_needed(vl, wl, default=0.5)
-
Expand source code
def extend_if_needed(vl, wl, default=0.5): # match wl to correspond with vl try: last_value = wl[-1][-1] except: last_value = default if (len(vl) > len(wl)): num_new_empty_lists = len(vl) - len(wl) for emlist in range(num_new_empty_lists): wl.append([]) # extend each sublist in wl to match quantity found in sublists of v1 for i, vlist in enumerate(vl): if (len(vlist) > len(wl[i])): num_new_repeats = len(vlist) - len(wl[i]) for n in range(num_new_repeats): wl[i].append(last_value) return wl
def extend_result(result, local_result, one_output)
-
Expand source code
def extend_result(result, local_result, one_output): if one_output: result.extend(local_result) else: for r, r_local in zip(result, local_result): r.extend(r_local)
def process_matched(params, main_func, matching_mode, input_nesting, outputs_num)
-
params will spread using the matching fmode, the main_func is the function to apply the input_nesting should be like an Integer List like [1, 2, 1, 3…] one level per parameter outputs_num: number of outputs (Integer)
Expand source code
def process_matched(params, main_func, matching_mode, input_nesting, outputs_num): '''params will spread using the matching fmode, the main_func is the function to apply the input_nesting should be like an Integer List like [1, 2, 1, 3...] one level per parameter outputs_num: number of outputs (Integer) ''' input_levels = [levels_of_list_or_np(p) for p in params] over_levels = [lv > dl for lv, dl in zip(input_levels, input_nesting)] one_output = outputs_num == 1 matching_f = list_match_func[matching_mode] if one_output: result = [] else: result = [[] for l in range(outputs_num)] if any(over_levels): p_temp = [] for p, lv, dl in zip(params, input_levels, input_nesting): if lv <= dl: p_temp.append([p]) else: p_temp.append(p) params = matching_f(p_temp) for g in zip(*params): local_result = process_matched(g, main_func, matching_mode, input_nesting, outputs_num) append_result(result, local_result, one_output) else: result = main_func(matching_f(params)) return result
def recurse_f_level_control(params, constant, main_func, matching_f, desired_levels, concatenate='APPEND')
-
params will spread using the matching function (matching_f), the const is a constant parameter that you dont want to spread , the main_func is the function to apply and the desired_levels should be like [1, 2, 1, 3…] one level per parameter
Expand source code
def recurse_f_level_control(params, constant, main_func, matching_f, desired_levels, concatenate="APPEND"): '''params will spread using the matching function (matching_f), the const is a constant parameter that you dont want to spread , the main_func is the function to apply and the desired_levels should be like [1, 2, 1, 3...] one level per parameter''' input_levels = [levels_of_list_or_np(p) for p in params] over_levels = [lv > dl for lv, dl in zip(input_levels, desired_levels)] if any(over_levels): p_temp = [] result = [] result_add = result.extend if concatenate == 'EXTEND' else result.append for p, lv, dl in zip(params, input_levels, desired_levels): if lv <= dl: p_temp.append([p]) else: p_temp.append(p) params = matching_f(p_temp) for g in zip(*params): result_add(recurse_f_level_control(g, constant, main_func, matching_f, desired_levels, concatenate=concatenate)) else: result = main_func(params, constant, matching_f) return result
def recurse_f_multipar(params, f, matching_f)
-
params will spread using the matching function (matching_f) on the lowest level applies f (function)
Expand source code
def recurse_f_multipar(params, f, matching_f): '''params will spread using the matching function (matching_f) on the lowest level applies f (function)''' is_list = [isinstance(l, (list, tuple)) for l in params] if any(is_list): res = [] if not all(is_list): l_temp = [] for l in params: if not isinstance(l, (list, tuple)): l_temp.append([l]) else: l_temp.append(l) params = l_temp params = matching_f(params) for z in zip(*params): res.append(recurse_f_multipar(z,f, matching_f)) return res else: return f(params)
def recurse_f_multipar_const(params, const, f, matching_f)
-
params will spread using the matching function, the const is a constant parameter that you dont want to spread
Expand source code
def recurse_f_multipar_const(params, const, f, matching_f): '''params will spread using the matching function, the const is a constant parameter that you dont want to spread ''' is_list = [isinstance(l, (list, tuple)) for l in params] if any(is_list): res = [] if not all(is_list): l_temp = [] for l in params: if not isinstance(l, (list, tuple)): l_temp.append([l]) else: l_temp.append(l) params = l_temp params = matching_f(params) for z in zip(*params): res.append(recurse_f_multipar_const(z, const, f, matching_f)) return res else: return f(params, const)
def recurse_fx(l, f)
-
Expand source code
def recurse_fx(l, f): if isinstance(l, (list, tuple)): return [recurse_fx(i, f) for i in l] else: return f(l)
def recurse_fxy(l1, l2, f)
-
Expand source code
def recurse_fxy(l1, l2, f): l1_type = isinstance(l1, (list, tuple)) l2_type = isinstance(l2, (list, tuple)) if not (l1_type or l2_type): return f(l1, l2) elif l1_type and l2_type: fl = l2[-1] if len(l1) > len(l2) else l1[-1] res = [] res_append = res.append for x, y in zip_longest(l1, l2, fillvalue=fl): res_append(recurse_fxy(x, y, f)) return res elif l1_type and not l2_type: return [recurse_fxy(x, l2, f) for x in l1] else: #not l1_type and l2_type return [recurse_fxy(l1, y, f) for y in l2]
def sv_zip_longest2(*args)
-
Expand source code
def sv_zip_longest2(*args): # by zeffi longest = max([len(i) for i in args]) itrs = [iter(sl) for sl in args] for i in range(longest): yield tuple((next(iterator, args[idx][-1]) for idx, iterator in enumerate(itrs)))
Classes
class SvSentinel (fl, top)
-
Expand source code
class SvSentinel: def __init__(self, fl, top): self.fl = fl self.top = top self.done = False def __next__(self): if self.done: raise StopIteration self.top.counter -= 1 if not self.top.counter: raise SvZipExhausted self.done = True return self.fl def __iter__(self): return self
class SvZipExhausted (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class SvZipExhausted(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class sv_zip_longest (*args)
-
Expand source code
class sv_zip_longest: def __init__(self, *args): self.counter = len(args) self.iterators = [] for lst in args: fl = lst[-1] filler = repeat(fl) self.iterators.append(chain(lst, SvSentinel(fl,self), filler)) def __next__(self): try: if self.counter: return tuple(map(next, self.iterators)) else: raise StopIteration except SvZipExhausted: raise StopIteration def __iter__(self): return self