揭开xenomai双核系统下clock机制的面纱-双内核技术

2.3 ipipe tick设备管理

linux时间系统中说到有多少个硬件timer,就会注册多少个clock event device,最后linux会为每个cpu选择一个合适的clock event来为tick device产生event。xenomai系统的运行也需要这么一个合适的硬件timer来产生event,由于xenomai需要的硬件都是由ipipe来提供,所以ipipe需要知道系统中有哪些clock event device被注册,然后ipipe为每一个cpu核选择一个合适的。

ipipe将linux中clock event device按xenomai系统需要重新抽象为结构体struct ipipe_timer,系统中有一个全局链表timer,当底层驱动调用clockevents_register_device,注册clock event设备时ipipe对应的创建一个ipipe_timer插入链表timer。struct ipipe_timer如下: structipipe_timer{ intirq; void(*request)(structipipe_timer*timer,intsteal); int(*set)(unsignedlongticks,void*timer); void(*ack)(void); void(*release)(structipipe_timer*timer); /*Onlyifregisteringatimerdirectly*/ constchar*name; unsignedrating; unsignedlongfreq; unsignedlongmin_delay_ticks; unsignedlongmax_delay_ticks; conststructcpumask*cpumask; /*Forinternaluse*/ void*timer_set;/*pointerpassedto->set()callback*/ structclock_event_device*host_timer;/*依赖的clockevent*/ structlist_headlink; unsignedc2t_integ; unsignedc2t_frac; /*Forclockeventinterception*/ u32real_mult; u32real_shift; void(*mode_handler)(enumclock_event_modemode, structclock_event_device*); intorig_mode; int(*orig_set_state_periodic)(structclock_event_device*); int(*orig_set_state_oneshot)(structclock_event_device*); int(*orig_set_state_oneshot_stopped)(structclock_event_device*); int(*orig_set_state_shutdown)(structclock_event_device*); int(*orig_set_next_event)(unsignedlongevt, structclock_event_device*cdev); unsignedint(*refresh_freq)(void); };

irq

:该ipipe_timer所依赖的clock_event_device的中断号,产生中断时ipipe将中断分配给谁处理用到;request

:设定clock_event_device模式的函数set

:设置下一个定时中断的函数,这个就是上面启动xntimer时的那个函数ack

:产生中断后中断清除函数rating

:该clock_event_device的raning级别freq

:该clock_event_device的运行频率min_delay_ticks、max_delay_ticks

:最小、最大定时时间cpumask

:cpu掩码,标识可以为哪个cpu提供定时服务host_timer

:这个ipipe_timer对应是哪个clock_event_devicelink

:链表节点,加入全局链表timer时使用 orig_set_state_periodic、orig_set_state_oneshot、orig_set_state_oneshot_stopped、orig_set_next_event,为xenomai提供服务需要将clock_event_device中一些已经设置的函数替换,这些用来备份原clock_event_device中的函数。

再来看一看clock xevent注册函数clockevents_register_device(),ipipe补丁在其中插入了一个注册函数ipipe_host_timer_register()先把clock xevent管理起来: voidclockevents_register_device(structclock_event_device*dev) { unsignedlongflags; …… ipipe_host_timer_register(dev); …. } staticintget_dev_mode(structclock_event_device*evtdev) { if(clockevent_state_oneshot(evtdev)|| clockevent_state_oneshot_stopped(evtdev)) returnCLOCK_EVT_MODE_ONESHOT; if(clockevent_state_periodic(evtdev)) returnCLOCK_EVT_MODE_PERIODIC; if(clockevent_state_shutdown(evtdev)) returnCLOCK_EVT_MODE_SHUTDOWN; returnCLOCK_EVT_MODE_UNUSED; } voidipipe_host_timer_register(structclock_event_device*evtdev) { structipipe_timer*timer=evtdev->ipipe_timer; if(timer==NULL) return; timer->orig_mode=CLOCK_EVT_MODE_UNUSED; if(timer->request==NULL) timer->request=ipipe_timer_default_request;/*设置request函数*/ /* *Bydefault,usethesamemethodaslinuxtimer,onARMat *least,mostset_next_eventmethodsaresafetobecalled *fromXenomaidomainanyway. */ if(timer->set==NULL){ timer->timer_set=evtdev; timer->set=(typeof(timer->set))evtdev->set_next_event;/*设定的counter的cycle数值*/ } if(timer->release==NULL) timer->release=ipipe_timer_default_release; if(timer->name==NULL) timer->name=evtdev->name; if(timer->rating==0) timer->rating=evtdev->rating; timer->freq=(1000000000ULL*evtdev->mult)>>evtdev->shift;/*1G*mult>>shift*/ if(timer->min_delay_ticks==0) timer->min_delay_ticks= (evtdev->min_delta_ns*evtdev->mult)>>evtdev->shift; if(timer->max_delay_ticks==0) timer->max_delay_ticks= (evtdev->max_delta_ns*evtdev->mult)>>evtdev->shift; if(timer->cpumask==NULL) timer->cpumask=evtdev->cpumask; timer->host_timer=evtdev; ipipe_timer_register(timer); }

这里面通过evtdev直接将一些结构体成员赋值,这里需要注意的的是timer->set = (typeof(timer->set))evtdev->set_next_event;对于lapic-timer来说timer->set=lapic_next_event,如果CPU支持tsc deadline特性则是timer->set=lapic_next_deadline,TSC-deadline模式允许软件使用本地APIC timer 在绝对时间发出中断信号,使用tsc来设置deadline,为了全文统一,使用apic-timer,这决定了xenomai是否能直接控制硬件,然后调用ipipe_timer_register()将ipipe_timer添加到链表timer完成注册: voidipipe_timer_register(structipipe_timer*timer) { structipipe_timer*t; unsignedlongflags; if(timer->timer_set==NULL) timer->timer_set=timer; if(timer->cpumask==NULL) timer->cpumask=cpumask_of(smp_processor_id()); raw_spin_lock_irqsave(&lock,flags); list_for_each_entry(t,&timers,link){/*按插入链表*/ if(t->rating<= timer->rating){ __list_add(&timer->link,t->link.prev,&t->link); gotodone; } } list_add_tail(&timer->link,&timers);/*按插入全局链表尾*/ done: raw_spin_unlock_irqrestore(&lock,flags); }

xenomai在每一个cpu核都需要一个ipipe_timer 来推动调度、定时等,ipipe为每个CPU分配了一个ipipe_timer指针percpu_timer,链表timers记录了所有ipipe_timer,这样就可以从链表中选择可供xenomai使用的ipipe_timer: staticDEFINE_PER_CPU(structipipe_timer*,percpu_timer);

另外,在3.ipipe domian管理说到每个cpu上管理不同域的结构体ipipe_percpu_data,里面有一个成员变量int hrtimer_irq,这个hrtimer_irq是用来存放为这个cpu提供event的硬件timer的中断号的,用于将ipipe_percpu_data与ipipe_timer联系起来,介绍完相关数据结构下面来看xenomai 时钟系统初始化流程。 DECLARE_PER_CPU(structipipe_percpu_data,ipipe_percpu);

2.4 xenomai 时钟系统初始化流程

xenomai内核系统初始化源码文件:kernelxenomaiinit.c,时钟系统在xenomai初始化流程中调用mach_setup()完成硬件相关初始化: xenomai_init(void) ->mach_setup() staticint__initmach_setup(void) { structipipe_sysinfosysinfo; intret,virq; ret=ipipe_select_timers(&xnsched_realtime_cpus); … ipipe_get_sysinfo(&sysinfo);/*获取系统ipipe信息*/ if(timerfreq_arg==0) timerfreq_arg=sysinfo.sys_hrtimer_freq; if(clockfreq_arg==0) clockfreq_arg=sysinfo.sys_hrclock_freq; cobalt_pipeline.timer_freq=timerfreq_arg; cobalt_pipeline.clock_freq=clockfreq_arg; if(cobalt_machine.init){ ret=cobalt_machine.init();/*mach_x86_init*/ if(ret) returnret; } ipipe_register_head(&xnsched_realtime_domain,”Xenomai”); …… ret=xnclock_init(cobalt_pipeline.clock_freq);/*初始化xnclock,为Cobalt提供clock服务时钟*/ return0;

首先调用ipipe_select_timers()来为每个cpu选择一个ipipe_timer。 intipipe_select_timers(conststructcpumask*mask) { unsignedhrclock_freq; unsignedlonglongtmp; structipipe_timer*t; structclock_event_device*evtdev; unsignedlongflags; unsignedcpu; cpumask_tfixup; ……. if(__ipipe_hrclock_freq>UINT_MAX){ tmp=__ipipe_hrclock_freq; do_div(tmp,1000); hrclock_freq=tmp; }else hrclock_freq=__ipipe_hrclock_freq;/*1000ULL*cpu_khz*/ ……. for_each_cpu(cpu,mask){/*从timers为每一个CPU选择一个percpu_timer*/ list_for_each_entry(t,&timers,link){/*遍历ipipe全局timer链表*/ if(!cpumask_test_cpu(cpu,t->cpumask)) continue; evtdev=t->host_timer; if(evtdev&&clockevent_state_shutdown(evtdev))/*该CPUtimer被软件shutdown则跳过*/ continue; gotofound; } …. gotoerr_remove_all; found: install_pcpu_timer(cpu,hrclock_freq,t);/*设置每一个CPU的timer*/ } ……. flags=ipipe_critical_enter(ipipe_timer_request_sync); ipipe_timer_request_sync();/*如果支持,则切换到单触发模式。*/ ipipe_critical_exit(flags); ……. }

先得到从全局变量cpu_khz得到tsc频率保存到hrclock_freq,然后为xenomai运行的每一个cpu核进行ippie_timer选择,对每一个遍历全局链表timers,取出evtdev,看是否能为该cpu服务,并且没有处于关闭状态。evtdev在Linux没有被使用就会被Linux关闭。最后选出来的也就是lapic-timer 。 找到合适的tevtdev后调用install_pcpu_timer(cpu, hrclock_freq, t),为该cpu设置ipipe_timer: staticvoidinstall_pcpu_timer(unsignedcpu,unsignedhrclock_freq, structipipe_timer*t) { per_cpu(ipipe_percpu.hrtimer_irq,cpu)=t->irq; per_cpu(percpu_timer,cpu)=t; config_pcpu_timer(t,hrclock_freq); }

主要是设置几个xenomai相关的precpu变量,ipipe_percpu.hrtimer_irq设置为该evtdev的irq,percpu_timer为该evtdev对应的ipipe_timer,然后计算ipipe_timer中lapic-timer与tsc频率之间的转换因子c2t_integ、c2t_frac;

回到ipipe_select_timers(),通过ipipe给每一个cpu发送一个中断IPIPE_CRITICAL_IPI,将每一个lapic-timer通过ipipe_timer->request设置为oneshot模式。

回到mach_setup(),为每个cpu选出ipipe_timer后获取此时系统信息:ipipe_get_sysinfo(&sysinfo) intipipe_get_sysinfo(structipipe_sysinfo*info) { info->sys_nr_cpus=num_online_cpus();/*运行的cpu数据*/ info->sys_cpu_freq=__ipipe_cpu_freq;/*1000ULL*cpu_khz*/ info->sys_hrtimer_irq=per_cpu(ipipe_percpu.hrtimer_irq,0);/*cpu0的ipipe_timer中断号*/ info->sys_hrtimer_freq=__ipipe_hrtimer_freq;/*time的频率*/ info->sys_hrclock_freq=__ipipe_hrclock_freq;/*1000ULL*cpu_khz*/ return0; }

在这里还是觉得有问题,CPU和TSC、timer三者频率不一定相等。

这几个变量在接下来初始化xnclock中使用。xnclock_init(cobalt_pipeline.clock_freq): int__initxnclock_init(unsignedlonglongfreq) { xnclock_update_freq(freq); nktimerlat=xnarch_timer_calibrate(); xnclock_reset_gravity(&nkclock);/*reset_core_clock_gravity*/ xnclock_register(&nkclock,&xnsched_realtime_cpus); return0; }

xnclock_update_freq(freq)计算出tsc频率与时间ns单位的转换因子tsc_scale,tsc_shift,计算流程可参考文档实时内核与linux内核时钟漂移过大原因.docx

xnarch_timer_calibrate()计算出每次对硬件timer编程这个执行过程需要多长时间,也就是测量ipipe_timer_set()这个函数的执行时间nktimerlat,计算方法是这样先确保测量这段时间timer不会触发中断干扰,所以先用ipipe_timer_set()给硬件timer设置一个很长的超时值,然后开始测量,先从TSC读取现在的时间tick值t0,然后循环执行100次ipipe_timer_set(),接着从TSC读取现在的时间tick值t1,ipipe_timer_set()平均每次的执行时间是揭开xenomai双核系统下clock机制的面纱-双内核技术,为了算上其他可能的延迟5%,nktimerlat=(t1−t0)/105;

下面计算对kernel、user、irq xntimer精确定时的gravity,上面已经说过为甚需要这个,xnclock_reset_gravity(&nkclock)调用执行xnclock->ops.reset_gravity(),也就是reset_core_clock_gravity()函数: staticvoidreset_core_clock_gravity(structxnclock*clock) { structxnclock_gravitygravity; xnarch_get_latencies(&gravity); gravity.user+=nktimerlat; if(gravity.kernel==0) gravity.kernel=gravity.user; if(gravity.irq==0) gravity.irq=nktimerlat; set_core_clock_gravity(clock,&gravity); }

首先通过xnarch_get_latencies()函数来计算各空间的gravity,其实这个函数里没有具体的计算流程,给的都是一些经验值,要么我们自己编译时配置: staticinlinevoidxnarch_get_latencies(structxnclock_gravity*p) { unsignedlongsched_latency; #ifCONFIG_XENO_OPT_TIMING_SCHEDLAT!=0 sched_latency=CONFIG_XENO_OPT_TIMING_SCHEDLAT; #else/*!CONFIG_XENO_OPT_TIMING_SCHEDLAT*/ if(strcmp(ipipe_timer_name(),”lapic”)==0){ #ifdefCONFIG_SMP if(num_online_cpus()>1) sched_latency=3350; else sched_latency=2000; #else/*!SMP*/ sched_latency=1000; #endif/*!SMP*/ }elseif(strcmp(ipipe_timer_name(),”pit”)){/*HPET*/ #ifdefCONFIG_SMP if(num_online_cpus()>1) sched_latency=3350; else sched_latency=1500; #else/*!SMP*/ sched_latency=1000; #endif/*!SMP*/ }else{ sched_latency=(__get_bogomips()< 250 ? 17000 :                  __get_bogomips() < 2500 ? 4200 :                  3500); #ifdef CONFIG_SMP         sched_latency += 1000; #endif /* CONFIG_SMP */     } #endif /* !CONFIG_XENO_OPT_TIMING_SCHEDLAT */     p->user=sched_latency; p->kernel=CONFIG_XENO_OPT_TIMING_KSCHEDLAT; p->irq=CONFIG_XENO_OPT_TIMING_IRQLAT; }

首先判断宏CONFIG_XENO_OPT_TIMING_SCHEDLAT 如果不等于0,说明我们自己配置了这个数,直接赋值就行,否则的话,根据xenomai使用的定时器是lapic 还是hept给不同的一些经验值了: p->user=CONFIG_XENO_OPT_TIMING_SCHEDLAT; p->kernel=CONFIG_XENO_OPT_TIMING_KSCHEDLAT; p->irq=CONFIG_XENO_OPT_TIMING_IRQLAT;

CONFIG_XENO_OPT_TIMING_SCHEDLAT宏在内核编译时设置,默认为0,使用已有的经验值: [*]Xenomai/cobalt—> Latencysettings—> (0)Userschedulinglatency(ns) (0)Intra-kernelschedulinglatency(ns) (0)Interruptlatency(ns)

实际使用后发现这个经验值也不太准,从测试数据看i5处理器与赛扬就存在差别,如果开启内核trace,就更不准了

. 计算出gravity后加上ipipe_timer_set()执行需要的时间nktimerlat,就是最终的gravity。以用户空间实时程序定时为例如下(图中时间段与比例无关):

揭开xenomai双核系统下clock机制的面纱-双内核技术 到此mach_setup()函数中上层软件时钟相关初始化完了,但xenomai还不能直接对硬件timer,此时xenomai进程调度还没初始化,硬件timer与内核调度等息息相关,xenomai内核还不能掌管硬件timer,不能保证linux愉快运行,硬抢过来只能一起阵亡。等xenomai内核任务管理等初始化完毕,给Linux舒适的运行空间,就可以直接控制硬件timer了,下面继续解析这个函数sys_init();

2.5 xenomai接管lapic-timer

sys_init()涉及每个CPU上的调度结构体初始化等。先插以点内容,每个cpu上的xenomai 调度由对象xnsched来管理,xnsched对象每个cpu有一个,其中包含各类sched class,还包含两个xntimer,一个host timer —htimer,主要给linux定时,另一个循环计时timer rrbtimer;一个xnthead结构rootcb,xenomai调度的是线程,每个实时线程使用xnthead结构表示,这个rootcb表示本cpu上xenomai调度的linux,在双核下linux只是xenomai的一个idle任务,cpu0上xnsched结构如下:

揭开xenomai双核系统下clock机制的面纱-双内核技术

详细的结构后面会分析,这里只解析时钟相关部分。 static__initintsys_init(void) { structxnsched*sched; void*heapaddr; intret,cpu; if(sysheap_size_arg==0) sysheap_size_arg=CONFIG_XENO_OPT_SYS_HEAPSZ;/**/ heapaddr=xnheap_vmalloc(sysheap_size_arg*1024);/*256*1024*/ ….. xnheap_set_name(&cobalt_heap,”systemheap”); for_each_online_cpu(cpu){ sched=&per_cpu(nksched,cpu); xnsched_init(sched,cpu); } #ifdefCONFIG_SMP ipipe_request_irq(&xnsched_realtime_domain, IPIPE_RESCHEDULE_IPI, (ipipe_irq_handler_t)__xnsched_run_handler, NULL,NULL); #endif xnregistry_init(); /* *Ifstartinginstoppedmode,doallinitializations,butdo *notenablethecoretimer. */ if(realtime_core_state()==COBALT_STATE_WARMUP){ ret=xntimer_grab_hardware();/*霸占硬件host定时器*/ ….. set_realtime_core_state(COBALT_STATE_RUNNING);/*更新实时内核状态*/ } return0; }

sys_init()中先初始化内核堆空间,初始化每个CPU上的调度结构体xnsched、创建idle线程,也就是上面说到的roottcb,多cpu核调度等,经过这一些步骤,LInux已经变成xenomai的一个idle线程了,最后调用xntimer_grab_hardware(),接管硬件timer: intxntimer_grab_hardware(void) { structxnsched*sched; intret,cpu,_cpu; spl_ts; ……. nkclock.wallclock_offset= xnclock_get_host_time()-xnclock_read_monotonic(&nkclock); ret=xntimer_setup_ipi();ipipe_request_irq(&xnsched_realtime_domain,IPIPE_HRTIMER_IPI, (ipipe_irq_handler_t)xnintr_core_clock_handler,NULL,NULL); for_each_realtime_cpu(cpu){ ret=grab_hardware_timer(cpu); if(ret< 0)             goto fail;         xnlock_get_irqsave(&nklock, s);         sched = xnsched_struct(cpu);         if (ret >1) xntimer_start(&sched->htimer,ret,ret,XN_RELATIVE); elseif(ret==1) xntimer_start(&sched->htimer,0,0,XN_RELATIVE); #ifdefCONFIG_XENO_OPT_WATCHDOG/*启动看门狗定时器*/ xntimer_start(&sched->wdtimer,1000000000UL,1000000000UL,XN_RELATIVE); xnsched_reset_watchdog(sched); #endif xnlock_put_irqrestore(&nklock,s); } …… returnret; }

注册xnclock时nkclock.wallclock_offset没有设置,现在设置也就是walltime的时间与tsc 的时间偏移。然后注册IPIPE_HRTIMER_IPI中断到xnsched_realtime_domain,9.2xntimer那一节启动一个xntimer需要通知其他cpu处理时发送的IPIPE_HRTIMER_IPI: intxntimer_setup_ipi(void) { returnipipe_request_irq(&xnsched_realtime_domain, IPIPE_HRTIMER_IPI, (ipipe_irq_handler_t)xnintr_core_clock_handler, NULL,NULL); }

接下来就是重要的为每个cpu接管硬件timer了,其实过程也简单,就是将原来lcock event的一些操作函数替换来达到目的,每个cpu上xenomai调度管理结构xnsched,每个xnsched中有一个定时器htimer,这个xntimer就是为linux服务的,根据底层timer的类型,后启动htimer,htimer 推动linux继时间子系统运行。这些后面会详细解析。回到接管timer函数grab_hardware_timer(cpu): staticintgrab_hardware_timer(intcpu) { inttickval,ret; ret=ipipe_timer_start(xnintr_core_clock_handler, switch_htick_mode,program_htick_shot,cpu); switch(ret){ caseCLOCK_EVT_MODE_PERIODIC: /* *Oneshottickemulationcallbackwontbeused,ask *thecallertostartaninternaltimerforemulating *aperiodictick. */ tickval=1000000000UL/HZ; break; caseCLOCK_EVT_MODE_ONESHOT: /*oneshottickemulation*/ tickval=1; break; caseCLOCK_EVT_MODE_UNUSED: /*wedontneedtoemulatethetickatall.*/ tickval=0; break; caseCLOCK_EVT_MODE_SHUTDOWN: return-ENODEV; default: returnret; } returntickval; }

主要的操作在ipipe_timer_start(xnintr_core_clock_handler,switch_htick_mode, program_htick_shot, cpu),后面的就是判断这个timer工作在什么模式,相应的返回好根据模式设置htimer为linux服务;

ipipe_timer_start(xnintr_core_clock_handler,switch_htick_mode, program_htick_shot, cpu)其中的xnintr_core_clock_handler是lapic-timer 产生中断时xenomai内核的处理函数,里面会去处理每个xntimer以及xenomai调度;switch_htick_mode是lapic-timer工作模式切换函数,program_htick_shot函数是对sched->htimer重新定时的函数,这个函数对linux来说特别重要,以后linux就不直接对硬件timer设置定时了,而是给xenomai中的sched->htimer设置。下面是ipipe_timer_start代码: intipipe_timer_start(void(*tick_handler)(void), void(*emumode)(enumclock_event_modemode, structclock_event_device*cdev), int(*emutick)(unsignedlongevt, structclock_event_device*cdev), unsignedintcpu) { structgrab_timer_datadata; intret; data.tick_handler=tick_handler;/*xnintr_core_clock_handler*/ data.emutick=emutick;/*program_htick_shot*/ data.emumode=emumode;/*switch_htick_mode*/ data.retval=-EINVAL; ret=smp_call_function_single(cpu,grab_timer,&data,true);/*执行grab_timer*/ returnret?:data.retval; }

先将传入的几个函数指正存到结构体data,然后调用smp_call_function_single传给函数grab_timer处理,smp_call_function_single中的smp表示给指定的cpu去执行grab_timer,对应的cpu执行grab_timer(&data): staticvoidgrab_timer(void*arg) { structgrab_timer_data*data=arg; structclock_event_device*evtdev; structipipe_timer*timer; structirq_desc*desc; unsignedlongflags; intsteal,ret; flags=hard_local_irq_save(); timer=this_cpu_read(percpu_timer); evtdev=timer->host_timer; ret=ipipe_request_irq(ipipe_head_domain,timer->irq, (ipipe_irq_handler_t)data->tick_handler, NULL,__ipipe_ack_hrtimer_irq); if(ret< 0 && ret != -EBUSY) {         hard_local_irq_restore(flags);         data->retval=ret; return; } steal=evtdev!=NULL&&!clockevent_state_detached(evtdev); if(steal&&evtdev->ipipe_stolen==0){ timer->real_mult=evtdev->mult; timer->real_shift=evtdev->shift; timer->orig_set_state_periodic=evtdev->set_state_periodic; timer->orig_set_state_oneshot=evtdev->set_state_oneshot; timer->orig_set_state_oneshot_stopped=evtdev->set_state_oneshot_stopped; timer->orig_set_state_shutdown=evtdev->set_state_shutdown; timer->orig_set_next_event=evtdev->set_next_event; timer->mode_handler=data->emumode;/*switch_htick_mode*/ evtdev->mult=1; evtdev->shift=0; evtdev->max_delta_ns=UINT_MAX; if(timer->orig_set_state_periodic) evtdev->set_state_periodic=do_set_periodic; if(timer->orig_set_state_oneshot) evtdev->set_state_oneshot=do_set_oneshot; if(timer->orig_set_state_oneshot_stopped) evtdev->set_state_oneshot_stopped=do_set_oneshot_stopped; if(timer->orig_set_state_shutdown) evtdev->set_state_shutdown=do_set_shutdown; evtdev->set_next_event=data->emutick;/*program_htick_shot*/ evtdev->ipipe_stolen=1; } hard_local_irq_restore(flags); data->retval=get_dev_mode(evtdev); desc=irq_to_desc(timer->irq); if(desc&&irqd_irq_disabled(&desc->irq_data)) ipipe_enable_irq(timer->irq); if(evtdev->ipipe_stolen&&clockevent_state_oneshot(evtdev)){/*启动oneshot*/ ret=clockevents_program_event(evtdev, evtdev->next_event,true); if(ret) data->retval=ret; } }

首先从percpu_timer取出我们在ipipe_select_timers选择的那个clockevent device evtdev,现在要这个evtdev为xenomai服务,所以将它的中断注册到ipipe_head_domain,当中断来的时候后ipipe会交给ipipe_head_domain调用data->tick_handler也就是xnintr_core_clock_handler处理,xnintr_core_clock_handler中处理xenomai在本CPU当上的调度、定时等。

在struct clock_event_device中ipipe添加了一个标志位ipipe_stolen用来表示该evtdev是不是已经为实时系统服务,是就是1,否则为0,这里当然为0,先将原来evtdev的操作函数备份到’orig_‘打头的成员变量中,设置ipipe_timer的real_mult、real_shift为evtdev的mult、shift,原evtdev的mult、shift设置为1、0,linux计算的时候才能与xntimer定时时间对应起来。

最重要的是把原来evtdev->set_next_event设置成了program_htick_shot,program_htick_shot如下,从此linux就是对shched->htimer 定时器设置定时,来替代原来的evtdev: staticintprogram_htick_shot(unsignedlongdelay, structclock_event_device*cdev) { structxnsched*sched; intret; spl_ts; xnlock_get_irqsave(&nklock,s); sched=xnsched_current(); ret=xntimer_start(&sched->htimer,delay,XN_INFINITE,XN_RELATIVE);/*相对,单次定时*/ xnlock_put_irqrestore(&nklock,s); returnret?-ETIME:0; }

其余的最后如果evtdev中断没有使能就使能中断,evtdev是oneshot状态启动oneshot,到此xenomai掌管了lpic-tiemr,从此xenomai内核直接设置lpic-tiemr,lpic-tiemr到时产生中断,ipipe调用执行xnintr_core_clock_handler处理lpic-tiemr中断,xnintr_core_clock_handler处理xenomai时钟系统: voidxnintr_core_clock_handler(void) { structxnsched*sched=xnsched_current(); intcpu__maybe_unused=xnsched_cpu(sched); xnstat_exectime_t*prev; if(!xnsched_supported_cpu(cpu)){ #ifdefXNARCH_HOST_TICK_IRQ ipipe_post_irq_root(XNARCH_HOST_TICK_IRQ); #endif return; } …… ++sched->inesting;/*中断嵌套++*/ sched->lflags|=XNINIRQ;/*在中断上下文状态*/ xnlock_get(&nklock); xnclock_tick(&nkclock);/*处理一个时钟tick*/ xnlock_put(&nklock); trace_cobalt_clock_exit(per_cpu(ipipe_percpu.hrtimer_irq,cpu)); xnstat_exectime_switch(sched,prev); if(–sched->inesting==0){/*如果没有其他中断嵌套,执行从新调度*/ sched->lflags&=~XNINIRQ; xnsched_run();/*调度*/ sched=xnsched_current(); } /* *Ifthecoreclockinterruptpreemptedareal-timethread, *anytransitiontotherootthreadhasalreadytriggereda *hosttickpropagationfromxnsched_run(),soatthispoint, *weonlyneedtopropagatethehosttickincasethe *interruptpreemptedtherootthread. */ if((sched->lflags&XNHTICK)&& xnthread_test_state(sched->curr,XNROOT)) xnintr_host_tick(sched); }

xnintr_core_clock_handler中,首先判断产生这个中断的cpu属不属于实时调度cpu,如果不属于,那就把中断post到root域后直接返回,ipipe会在root域上挂起这个中断给linux处理。

如果这是运行xenomai的cpu,接下来调用xnclock_tick(&nkclock),来处理一个时钟tick,里面就是看该cpu上哪些xntimer到期了做相应处理: voidxnclock_tick(structxnclock*clock) { structxnsched*sched=xnsched_current(); structxntimer*timer; xnsticks_tdelta; xntimerq_t*tmq; xnticks_tnow; xntimerh_t*h; atomic_only(); …… tmq=&xnclock_this_timerdata(clock)->q;/**/ /* *Optimisation:anylocaltimerreprogrammingtriggeredby *invokedtimerhandlerscanwaituntilweleavethetick *handler.Usethisstatusflagashinttoxntimer_start(). */ sched->status|=XNINTCK; now=xnclock_read_raw(clock); while((h=xntimerq_head(tmq))!=NULL){ timer=container_of(h,structxntimer,aplink); delta=(xnsticks_t)(xntimerh_date(&timer->aplink)-now); if(delta>0) break; trace_cobalt_timer_expire(timer); xntimer_dequeue(timer,tmq); xntimer_account_fired(timer);/*timer->fired++*/ /* *Bypostponingthepropagationofthelow-priority *hostticktotheinterruptepilogue(see *xnintr_irq_handler()),wesavesomeI-cache,which *translatesintopreciousmicrosecsonlow-endhw. */ if(unlikely(timer==&sched->htimer)){ sched->lflags|=XNHTICK; sched->lflags&=~XNHDEFER; if(timer->status&XNTIMER_PERIODIC) gotoadvance; continue; } /*Checkforalockedclockstate(i.e.ptracing).*/ if(unlikely(nkclock_lock>0)){ if(timer->status&XNTIMER_NOBLCK) gotofire; if(timer->status&XNTIMER_PERIODIC) gotoadvance; /* *Wehavenoperiodforthisblockedtimer, *sohaveittickagainatareasonablyclose *dateinthefuture,waitingfortheclock *tobeunlockedatsomepoint.Sinceclocks *areblockedwhensingle-steppingintoan *applicationusingadebugger,itisfineto *waitfor250msfortheusertocontinue *programexecution. */ xntimerh_date(&timer->aplink)+= xnclock_ns_to_ticks(xntimer_clock(timer), 250000000); gotorequeue; } fire: timer->handler(timer);/******************************/ now=xnclock_read_raw(clock); timer->status|=XNTIMER_FIRED; /* *Onlyrequeueperiodictimerswhichhavenotbeen *requeued,stoppedorkilled. */ if((timer->status& (XNTIMER_PERIODIC|XNTIMER_DEQUEUED|XNTIMER_KILLED|XNTIMER_RUNNING))!= (XNTIMER_PERIODIC|XNTIMER_DEQUEUED|XNTIMER_RUNNING)) continue; advance: do{ timer->periodic_ticks++; xntimer_update_date(timer); }while(xntimerh_date(&timer->aplink)< now);     requeue: #ifdef CONFIG_SMP         /*          * If the timer was migrated over its timeout handler,          * xntimer_migrate() re-queued it already.          */         if (unlikely(timer->sched!=sched)) continue; #endif xntimer_enqueue(timer,tmq); } sched->status&=~XNINTCK; xnclock_program_shot(clock,sched); }

xnclock_tick里主要处理各种类型的xntimer,首先取出本cpu上管理xntimer红黑树的根节点xntimerq_t,然后开始处理,为了安全设置sched状态标识status为XNINTCK,标识该sched正在处理tick,得到现在tsc值now,然后一个while循环,取出红黑树上定时最小的那个xntimer,得到这个xntimer的时间date,如果date减去now大于0,说明最短定时的xntimer都没有到期,那就不需要继续处理,直接跳出循环,执行xnclock_program_shot(clock, sched)设置定时器下一个中断触发时间。 如果有xntimer到期,date减去now小于等于0,首先从红黑树中删除,然后xntimer.fire加1,表示xntimer到期次数,然后处理,这里逻辑有点绕:

1.如果是sched->htimer,就是为Linux定时的,先设置sched->lflags |= XNHTICK,这个标志设置的是lflags不是status,因为linux的不是紧急的,后面本cpu没有高优先级实时任务运行才会给linux处理。接着判断是不是一个周期timer,如果是,goto到advance更新timer时间date,可能已将过去几个周期时间了,所有使用循环一个一个周期的增加直到现在时间now,然后重新插入红黑树。

2.如果这个xntimer是一个非阻塞timer,直接跳转fire执行handler,并设置状态已经FIRED。

3.如果这是一个非htimer的周期定时器,那同样更新时间后重新加入红黑树。

4.以上都不是就将xntimer重新定时250ms,加入红黑树。

xnclock_tick执行返回后,xnstat_exectime_switch()更新该cpu上每个域的执行时间,然后如果没有其他中断嵌套则进行任务调度xnsched_run();

不知经过多少个rt任务切换后回到这个上下文,并且当前cpu运行linux,上次离开这linux的定时器htimer还没处理呢,检查如果当前cpu上运行linux,并且sched->lflags中有XNHTICK标志,那将中断通过ipipe post给linux处理,并清除lflags中的XNHTICK,linux中断子系统就会去只执行eventhandler,处理linux时间子系统。 voidxnintr_host_tick(structxnsched*sched)/*Interruptsoff.*/ { sched->lflags&=~XNHTICK; #ifdefXNARCH_HOST_TICK_IRQ ipipe_post_irq_root(XNARCH_HOST_TICK_IRQ); #endif }

揭开xenomai双核系统下clock机制的面纱-双内核技术

2.6 xenomai内核下Linux时钟工作流程

到此时钟系统中除调度相关的外,一个CPU上双核系统时钟流程如下图所示:

揭开xenomai双核系统下clock机制的面纱-双内核技术

总结:xenomai内核启动时,grab_timer()结合ipipe通过替换回调函数将原linux系统timer lapic-timer作为xenomai 系统timer,xenomai直接对层硬件lapic-timer编程,linux退化为xenomai的idle任务,idle任务的主时钟就变成linux的时钟来源,由linux直接对层硬件lapic-timer编程变成对idle hrtimer编程。idle hrtimer依附于xenomai时钟xnclock,xnclock运作来源于底层硬件lapic-timer。

2.7 gravity

为什么要设置gravity呢?

xenomai是个实时系统必须保证定时器的精确,xntimer都是由硬件timer产生中断后处理的,如果没有gravity,对于用户空间实时任务RT:假如此时时间刻度是0,该任务定时10us后触发定时器,10us后,产生了中断,此时时间刻度为10us,开始处理xntimer,然后切换回内核空间执行调度,最后切换回用户空间,从定时器到期到最后切换回RT也是需要时间的,已经超过RT所定的10us,因此,需要得到定时器超时->回到用户空间的这段时间gravity;不同空间的任务经过的路径不一样,所以针对kernel、user和irq分别计算gravity,当任务定时,定时器到期时间date-gravity才是xntimer的触发时间。当切换回原来的任务时刚好是定时时间。

揭开xenomai双核系统下clock机制的面纱-双内核技术总结来说是,CPU执行代码需要时间,调度度上下切换需要时间,中断、内核态、用户态需要的时间不一样,需要将中间的这些时间排除,这些时间就是gravity。

2.8 autotune

gravity可以使用xenomai 内核代码中的经验值,还可以内核编译时自定义,除这两种之外,xenomai还提供了一种自动计算的程序autotune,它的使用需要配合内核模块autotune,编译内核时选中编译: []Xenomai/cobalt—> Corefeatures—> <>Auto-tuning

程序autotune位于/usr/xenomai/sbin目录下,直接执行会分别计算irq、kernel、user的gravity;

审核编辑:刘清

免责声明:文章内容来自互联网,本站不对其真实性负责,也不承担任何法律责任,如有侵权等情况,请与本站联系删除。
转载请注明出处:揭开xenomai双核系统下clock机制的面纱-双内核技术 https://www.yhzz.com.cn/a/6307.html

上一篇 2023-04-13 13:30:33
下一篇 2023-04-13 13:41:35

相关推荐

联系云恒

在线留言: 我要留言
客服热线:400-600-0310
工作时间:周一至周六,08:30-17:30,节假日休息。